Skip to content

Commit

Permalink
Merge pull request #118 from ferryproxy/feat/load-balance
Browse files Browse the repository at this point in the history
Support load balance
  • Loading branch information
wzshiming authored Sep 2, 2022
2 parents 0a2930e + 59f0bf6 commit 2990b70
Show file tree
Hide file tree
Showing 24 changed files with 720 additions and 529 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,30 @@ jobs:
TARGET_1: web-1
TARGET_2: web-0

- name: Test control-plane and cluster-1 and cluster-2
run: ./test/test/test-load-balance.sh
env:
ROUTE_NAME: ferry-test
CONTROL_PLANE: control-plane
CLUSTER_1: control-plane
CLUSTER_2: cluster-1
CLUSTER_3: cluster-2
TARGET_1: web-0
TARGET_2: web-1
TARGET_3: web-2

- name: Test cluster-1 and cluster-2 and control-plane
run: ./test/test/test-load-balance.sh
env:
ROUTE_NAME: ferry-test
CONTROL_PLANE: control-plane
CLUSTER_1: cluster-1
CLUSTER_2: cluster-2
CLUSTER_3: control-plane
TARGET_1: web-1
TARGET_2: web-2
TARGET_3: web-0

test-not-controller-cases:
continue-on-error: true
strategy:
Expand Down
18 changes: 9 additions & 9 deletions cmd/ferry-tunnel-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ import (
)

var (
serviceName = env.GetEnv("SERVICE_NAME", consts.FerryTunnelName)
namespace = env.GetEnv("NAMESPACE", consts.FerryTunnelNamespace)
labelSelector = env.GetEnv("LABEL_SELECTOR", "tunnel.ferryproxy.io/service=inject")
master = env.GetEnv("MASTER", "")
kubeconfig = env.GetEnv("KUBECONFIG", "")
serviceName = env.GetEnv("SERVICE_NAME", consts.FerryTunnelName)
namespace = env.GetEnv("NAMESPACE", consts.FerryTunnelNamespace)
master = env.GetEnv("MASTER", "")
kubeconfig = env.GetEnv("KUBECONFIG", "")
)

const (
Expand Down Expand Up @@ -72,10 +71,11 @@ func main() {
}()

if serviceName != "" {
svcSyncer := controller.NewServiceSyncer(&controller.ServiceSyncerConfig{
svcSyncer := controller.NewDiscoveryController(&controller.DiscoveryControllerConfig{
Clientset: clientset,
Logger: log.WithName("service-syncer"),
LabelSelector: labelSelector,
Logger: log.WithName("discovery-controller"),
Namespace: namespace,
LabelSelector: consts.TunnelDiscoverConfigMapsKey + "=" + consts.TunnelDiscoverConfigMapsValue,
})

epWatcher := controller.NewEndpointWatcher(&controller.EndpointWatcherConfig{
Expand All @@ -102,7 +102,7 @@ func main() {

ctr := controller.NewRuntimeController(&controller.RuntimeControllerConfig{
Namespace: namespace,
LabelSelector: labelSelector,
LabelSelector: consts.TunnelRulesConfigMapsKey + "=" + consts.TunnelRulesConfigMapsValue,
Clientset: clientset,
Logger: log.WithName("runtime-controller"),
Conf: conf,
Expand Down
26 changes: 12 additions & 14 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,21 @@ const (
FerryTunnelName = FerryName + "-tunnel"
FerryTunnelNamespace = FerryTunnelName + "-system"

LabelPrefix = "traffic.ferryproxy.io/"
LabelFerryExportedFromKey = LabelPrefix + "exported-from"
LabelFerryExportedFromNamespaceKey = LabelPrefix + "exported-from-namespace"
LabelFerryExportedFromNameKey = LabelPrefix + "exported-from-name"
LabelFerryExportedFromPortsKey = LabelPrefix + "exported-from-ports"
LabelFerryImportedToKey = LabelPrefix + "imported-to"
LabelFerryManagedByKey = LabelPrefix + "managed-by"
LabelFerryManagedByValue = "ferry-controller"
LabelPrefix = "traffic.ferryproxy.io/"
LabelFerryExportedFromKey = LabelPrefix + "exported-from"
LabelFerryImportedToKey = LabelPrefix + "imported-to"
LabelFerryManagedByValue = "ferry"

LabelFerryTunnelKey = "tunnel.ferryproxy.io/service"
LabelFerryTunnelValue = "inject"

LabelGeneratedKey = "generated.ferryproxy.io"
LabelGeneratedValue = "ferry-controller"
LabelGeneratedKey = "generated.ferryproxy.io"
LabelGeneratedValue = "ferry-controller"
LabelGeneratedTunnelValue = "ferry-tunnel"

LabelMCSMarkHubKey = "mcs.traffic.ferryproxy.io/service"
LabelMCSMarkHubValue = "enabled"

TunnelRulesKey = "tunnel"
TunnelRulesKey = "tunnel"
TunnelRulesConfigMapsKey = "tunnel.ferryproxy.io/rules"
TunnelRulesConfigMapsValue = "enabled"
TunnelDiscoverConfigMapsKey = "tunnel.ferryproxy.io/service"
TunnelDiscoverConfigMapsValue = "enabled"
)
1 change: 1 addition & 0 deletions pkg/ferry-controller/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (c *Controller) Run(ctx context.Context) error {

mcsController := mcs.NewMCSController(&mcs.MCSControllerConfig{
Config: c.config,
Namespace: c.namespace,
ClusterCache: hubController,
Logger: c.logger.WithName("mcs"),
})
Expand Down
5 changes: 3 additions & 2 deletions pkg/ferry-controller/controller/hub/hub_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
externalversions "github.com/ferryproxy/client-go/generated/informers/externalversions"
"github.com/ferryproxy/ferry/pkg/client"
"github.com/ferryproxy/ferry/pkg/consts"
"github.com/ferryproxy/ferry/pkg/services"
"github.com/ferryproxy/ferry/pkg/utils/objref"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -180,10 +181,10 @@ func (c *HubController) UnregistryServiceCallback(exportHubName, importHubName s
c.cacheService[exportHubName].UnregistryCallback(importHubName)
}

func (c *HubController) LoadPortPeer(importHubName string, list *corev1.ServiceList) {
func (c *HubController) LoadPortPeer(importHubName string, cluster, namespace, name string, ports []services.MappingPort) {
c.mut.RLock()
defer c.mut.RUnlock()
c.cacheTunnelPorts[importHubName].LoadPortPeer(list)
c.cacheTunnelPorts[importHubName].LoadPortPeer(cluster, namespace, name, ports)
}

func (c *HubController) GetPortPeer(importHubName string, cluster, namespace, name string, port int32) int32 {
Expand Down
66 changes: 10 additions & 56 deletions pkg/ferry-controller/controller/hub/tunnel_ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@ limitations under the License.
package hub

import (
"strconv"
"strings"

"github.com/ferryproxy/ferry/pkg/consts"
"github.com/ferryproxy/ferry/pkg/services"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
)

type portPeer struct {
Expand Down Expand Up @@ -81,73 +77,31 @@ func (d *tunnelPorts) GetPort(cluster, namespace, name string, port int32) int32
return p
}

func (d *tunnelPorts) LoadPortPeer(list *corev1.ServiceList) {
for _, item := range list.Items {
d.loadPortPeerForService(&item)
}
}

func (d *tunnelPorts) loadPortPeerForService(svc *corev1.Service) {
if svc.Labels == nil ||
svc.Labels[consts.LabelFerryExportedFromKey] == "" ||
svc.Labels[consts.LabelFerryExportedFromNamespaceKey] == "" ||
svc.Labels[consts.LabelFerryExportedFromNameKey] == "" ||
svc.Labels[consts.LabelFerryExportedFromPortsKey] == "" {
return
}
cluster := svc.Labels[consts.LabelFerryExportedFromKey]
namespace := svc.Labels[consts.LabelFerryExportedFromNamespaceKey]
name := svc.Labels[consts.LabelFerryExportedFromNameKey]
ports := strings.Split(svc.Labels[consts.LabelFerryExportedFromPortsKey], "-")
logger := d.logger.WithValues(
"cluster", cluster,
"namespace", namespace,
"name", name,
)
for _, portStr := range ports {
portRaw, err := strconv.ParseInt(portStr, 10, 32)
if err != nil {
logger.Error(err, "Failed to parse port")
continue
}

var serverPort int32
for _, svcPort := range svc.Spec.Ports {
if svcPort.TargetPort.String() == portStr {
serverPort = svcPort.Port
break
}
}

if serverPort == 0 {
logger.Info("no match service port")
continue
}

port := int32(portRaw)
func (d *tunnelPorts) LoadPortPeer(cluster, namespace, name string, ports []services.MappingPort) {
for _, port := range ports {
peer := portPeer{
Cluster: cluster,
Namespace: namespace,
Name: name,
Port: serverPort,
Port: port.TargetPort,
}

if v, ok := d.portToPeer[port]; ok {
if v, ok := d.portToPeer[port.Port]; ok {
if v != peer {
logger.Info("duplicate port", "port", port, "peer", peer, "duplicate", v)
d.logger.Info("duplicate port", "port", port.Port, "peer", peer, "duplicate", v)
continue
}
} else {
d.portToPeer[port] = peer
d.portToPeer[port.Port] = peer
}

if v, ok := d.peerToPort[peer]; ok {
if v != port {
logger.Info("duplicate peer", "port", port, "peer", peer, "duplicate", v)
if v != port.Port {
d.logger.Info("duplicate peer", "port", port.Port, "peer", peer, "duplicate", v)
continue
}
} else {
d.peerToPort[peer] = port
d.peerToPort[peer] = port.Port
}
}
}
32 changes: 13 additions & 19 deletions pkg/ferry-controller/controller/mapping/mapping_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ferryproxy/ferry/pkg/consts"
"github.com/ferryproxy/ferry/pkg/ferry-controller/router"
"github.com/ferryproxy/ferry/pkg/ferry-controller/router/resource"
"github.com/ferryproxy/ferry/pkg/services"
"github.com/ferryproxy/ferry/pkg/utils/diffobjs"
"github.com/ferryproxy/ferry/pkg/utils/trybuffer"
"github.com/go-logr/logr"
Expand All @@ -40,7 +41,7 @@ type ClusterCache interface {
GetHubGateway(hubName string, forHub string) v1alpha2.HubSpecGateway
GetIdentity(name string) string
Clientset(name string) kubernetes.Interface
LoadPortPeer(importHubName string, list *corev1.ServiceList)
LoadPortPeer(importHubName string, cluster, namespace, name string, ports []services.MappingPort)
GetPortPeer(importHubName string, cluster, namespace, name string, port int32) int32
RegistryServiceCallback(exportHubName, importHubName string, cb func())
UnregistryServiceCallback(exportHubName, importHubName string)
Expand Down Expand Up @@ -121,11 +122,6 @@ func (d *MappingController) Start(ctx context.Context) error {
}
}

err = d.loadLastService(ctx, way[len(way)-1], opt)
if err != nil {
return err
}

d.try = trybuffer.NewTryBuffer(d.sync, time.Second/10)

d.clusterCache.RegistryServiceCallback(d.exportHubName, d.importHubName, d.Sync)
Expand Down Expand Up @@ -171,31 +167,29 @@ func (d *MappingController) loadLastConfigMap(ctx context.Context, name string,
for _, item := range cmList.Items {
d.cacheResources[name] = append(d.cacheResources[name], resource.ConfigMap{item.DeepCopy()})
}
for _, item := range cmList.Items {
if item.Labels != nil && item.Labels[consts.TunnelDiscoverConfigMapsKey] == consts.TunnelDiscoverConfigMapsValue {
d.loadPorts(name, &item)
}
}
return nil
}

func (d *MappingController) loadLastService(ctx context.Context, name string, opt metav1.ListOptions) error {
svcList, err := d.clusterCache.Clientset(name).
CoreV1().
Services("").
List(ctx, opt)
func (d *MappingController) loadPorts(importHubName string, cm *corev1.ConfigMap) {
data, err := services.ServiceFrom(cm.Data)
if err != nil {
return err
}
for _, item := range svcList.Items {
d.cacheResources[name] = append(d.cacheResources[name], resource.Service{item.DeepCopy()})
d.logger.Error(err, "ServiceFrom")
return
}

d.clusterCache.LoadPortPeer(name, svcList)
return nil
d.clusterCache.LoadPortPeer(importHubName, data.ExportHubName, data.ExportServiceNamespace, data.ExportServiceName, data.Ports)
}

func (d *MappingController) getLabel() map[string]string {
if d.labels != nil {
return d.labels
}
d.labels = map[string]string{
consts.LabelFerryManagedByKey: consts.LabelFerryManagedByValue,
consts.LabelGeneratedKey: consts.LabelGeneratedValue,
consts.LabelFerryExportedFromKey: d.exportHubName,
consts.LabelFerryImportedToKey: d.importHubName,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ferry-controller/controller/mcs/mcs_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (m *MCSController) Sync(ctx context.Context) {
m.mut.Lock()
defer m.mut.Unlock()

importMap, exportMap := m.clusterCache.ListMCS(m.namespace)
importMap, exportMap := m.clusterCache.ListMCS("")

updated := mcsToRoutePolicies(importMap, exportMap)

Expand Down
67 changes: 0 additions & 67 deletions pkg/ferry-controller/router/discovery.go

This file was deleted.

Loading

0 comments on commit 2990b70

Please sign in to comment.