Skip to content

Commit

Permalink
Fix a case where applied kafka intents were reported without namespac…
Browse files Browse the repository at this point in the history
…e to the cloud (#456)
  • Loading branch information
omris94 authored Jul 17, 2024
1 parent dba9212 commit 05acf89
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 19 deletions.
35 changes: 16 additions & 19 deletions src/operator/api/v2alpha1/clientintents_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,16 +459,7 @@ func (in *ClientIntents) GetDatabaseIntents() []Target {
// GetTargetServerNamespace returns target namespace for intent if exists
// or the entire resource's namespace if the specific intent has no target namespace, as it's optional
func (in *Target) GetTargetServerNamespace(intentsObjNamespace string) string {
var name string

if in.Kubernetes != nil {
name = in.Kubernetes.Name
}
if in.Service != nil {
name = in.Service.Name
}

nameWithNamespace := strings.Split(name, ".")
nameWithNamespace := strings.Split(in.GetTargetServerNameAsWritten(), ".")
if len(nameWithNamespace) == 1 {
return intentsObjNamespace
}
Expand Down Expand Up @@ -526,7 +517,15 @@ func (in *Target) IsTargetOutOfCluster() bool {

// GetTargetServerName returns server's service name, without namespace, or the Kubernetes service without the `svc:` prefix
func (in *Target) GetTargetServerName() string {
name := in.GetTargetServerNameAsWritten()
if in.IsTargetOutOfCluster() {
return name
}
nameWithNamespace := strings.Split(name, ".")
return nameWithNamespace[0]
}

func (in *Target) GetTargetServerNameAsWritten() string {
if in.Internet != nil {
return OtterizeInternetTargetName
}
Expand All @@ -547,22 +546,19 @@ func (in *Target) GetTargetServerName() string {
return in.SQL.Name
}

var name string

if in.Kafka != nil {
name = in.Kafka.Name
return in.Kafka.Name
}

if in.Service != nil {
name = in.Service.Name
return in.Service.Name
}

if in.Kubernetes != nil {
name = in.Kubernetes.Name
return in.Kubernetes.Name
}

nameWithNamespace := strings.Split(name, ".")
return nameWithNamespace[0]
panic("target server name not found")
}

func (in *Target) GetTargetServerKind() string {
Expand Down Expand Up @@ -782,7 +778,7 @@ func (in *Target) ConvertToCloudFormat(ctx context.Context, k8sClient client.Cli
if in.IsTargetServerKubernetesService() {
// alias should be the kubernetes service
alias = &graphqlclient.ServerAliasInput{
Name: lo.ToPtr(serverServiceIdentity.Name),
Name: lo.ToPtr(serverServiceIdentity.GetNameAsServer()),
Kind: lo.ToPtr(serverServiceIdentity.Kind),
}
labelSelector, ok, err := ServiceIdentityToLabelsForWorkloadSelection(ctx, k8sClient, serverServiceIdentity)
Expand Down Expand Up @@ -810,7 +806,8 @@ func (in *Target) ConvertToCloudFormat(ctx context.Context, k8sClient client.Cli
intentInput := graphqlclient.IntentInput{
ClientName: lo.ToPtr(clientServiceIdentity.Name),
ClientWorkloadKind: lo.Ternary(clientServiceIdentity.Kind != serviceidentity.KindOtterizeLegacy, lo.ToPtr(clientServiceIdentity.Kind), nil),
ServerName: lo.ToPtr(in.GetTargetServerName()),
ServerName: lo.ToPtr(serverServiceIdentity.Name),
ServerWorkloadKind: lo.Ternary(serverServiceIdentity.Kind != serviceidentity.KindOtterizeLegacy, lo.ToPtr(clientServiceIdentity.Kind), nil),
Namespace: lo.ToPtr(clientServiceIdentity.Namespace),
ServerNamespace: toPtrOrNil(in.GetTargetServerNamespace(clientServiceIdentity.Namespace)),
ServerAlias: alias,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,21 @@ func (s *CloudReconcilerTestSuite) TestNamespaceParseSuccess() {
s.Require().Equal(lo.FromPtr(cloudIntent.ServerNamespace), "other-namespace")
}

// TestIntents With Kafka Target
func (s *CloudReconcilerTestSuite) TestKafkaTarget() {
serverName := "server.other-namespace"
intent := &otterizev2alpha1.Target{Kafka: &otterizev2alpha1.KafkaTarget{Name: serverName, Topics: []otterizev2alpha1.KafkaTopic{{Name: "test", Operations: []otterizev2alpha1.KafkaOperation{otterizev2alpha1.KafkaOperationConsume}}}}}
cloudIntent, err := intent.ConvertToCloudFormat(context.Background(), s.client, serviceidentity.ServiceIdentity{Name: clientName, Namespace: testNamespace})
s.Require().NoError(err)
s.Require().Equal(lo.FromPtr(cloudIntent.ServerName), "server")
s.Require().Equal(lo.FromPtr(cloudIntent.ServerNamespace), "other-namespace")
s.Require().Len(cloudIntent.Topics, 1)
s.Require().Equal(lo.FromPtr(cloudIntent.Topics[0].Name), "test")
s.Require().Len(cloudIntent.Topics[0].Operations, 1)
s.Require().Equal(lo.FromPtr(cloudIntent.Topics[0].Operations[0]), graphqlclient.KafkaOperationConsume)

}

func (s *CloudReconcilerTestSuite) TestTargetNamespaceAsSourceNamespace() {
serverName := "server"
intent := &otterizev2alpha1.Target{Kubernetes: &otterizev2alpha1.KubernetesTarget{Name: serverName}}
Expand Down

0 comments on commit 05acf89

Please sign in to comment.