Skip to content

Commit

Permalink
feat: create direct controller for DataflowFlexTemplateJob
Browse files Browse the repository at this point in the history
  • Loading branch information
justinsb committed Aug 27, 2024
1 parent 27ad0c6 commit 9d0f76e
Show file tree
Hide file tree
Showing 9 changed files with 631 additions and 5 deletions.
10 changes: 10 additions & 0 deletions apis/refs/v1beta1/projectref.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ type Project struct {
ProjectID string
}

// ResolveProjectFromAnnotation resolves the projectID to use for a resource,
// it should be used for resources which do not have a projectRef
func ResolveProjectFromAnnotation(ctx context.Context, reader client.Reader, src client.Object) (*Project, error) {
if projectID := src.GetAnnotations()["cnrm.cloud.google.com/project-id"]; projectID != "" {
return &Project{ProjectID: projectID}, nil
}

return nil, fmt.Errorf("project-id annotation not set on resource")
}

// ResolveProject will resolve a ProjectRef to a Project, with the ProjectID.
func ResolveProject(ctx context.Context, reader client.Reader, src client.Object, ref *ProjectRef) (*Project, error) {
if ref == nil {
Expand Down
1 change: 1 addition & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ cloud.google.com/go/containeranalysis v0.12.1/go.mod h1:+/lcJIQSFt45TC0N9Nq7/dPb
cloud.google.com/go/datacatalog v1.20.3/go.mod h1:AKC6vAy5urnMg5eJK3oUjy8oa5zMbiY33h125l8lmlo=
cloud.google.com/go/datacatalog v1.20.5/go.mod h1:DB0QWF9nelpsbB0eR/tA0xbHZZMvpoFD1XFy3Qv/McI=
cloud.google.com/go/dataflow v0.9.9/go.mod h1:Wk/92E1BvhV7qs/dWb+3dN26uGgyp/H1Jr5ZJxeD3dw=
cloud.google.com/go/dataflow v0.9.11 h1:YIhStasKFDESaUdpnsHsp/5bACYL/yvW0OuZ6zPQ6nY=
cloud.google.com/go/dataflow v0.9.11/go.mod h1:CCLufd7I4pPfyp54qMgil/volrL2ZKYjXeYLfQmBGJs=
cloud.google.com/go/dataform v0.9.6/go.mod h1:JKDPMfcYMu9oUMubIvvAGWTBX0sw4o/JIjCcczzbHmk=
cloud.google.com/go/datafusion v1.7.9/go.mod h1:ciYV8FL0JmrwgoJ7CH64oUHiI0oOf2VLE45LWKT51Ls=
Expand Down
2 changes: 1 addition & 1 deletion hack/compare-mock
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ E2E_GCP_TARGET=mock \
GOLDEN_REQUEST_CHECKS=1 \
GOLDEN_OBJECT_CHECKS=1 \
WRITE_GOLDEN_OUTPUT=1 \
KCC_USE_DIRECT_RECONCILERS="SQLInstance" \
KCC_USE_DIRECT_RECONCILERS="SQLInstance,DataflowFlexTemplateJob" \
RUN_E2E=1 \
go test ./tests/e2e -timeout 3600s -v -run $RUN_TESTS
8 changes: 4 additions & 4 deletions mockgcp/mockdataflow/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (r *MockService) StopJob(fqn string) error {
obj.CurrentStateTime = timestamppb.New(now)
obj.RequestedState = pb.JobState_JOB_STATE_UNKNOWN
default:
return fmt.Errorf("unexpected state for job %q: %v", fqn, obj.CurrentState)
return fmt.Errorf("unexpected state for job, got=%q, expected=JobState_JOB_STATE_CANCELLING: %v", fqn, obj.CurrentState)
}

if err := r.storage.Update(ctx, fqn, obj); err != nil {
Expand Down Expand Up @@ -85,7 +85,7 @@ func (r *MockService) StartJob(fqn string, project *projects.ProjectData, req *p
}

default:
return fmt.Errorf("unexpected state for job %q: %v", fqn, job.CurrentState)
return fmt.Errorf("unexpected state for job, got=%q, expected=JobState_JOB_STATE_UNKNOWN: %v", fqn, job.CurrentState)
}

if err := r.storage.Update(ctx, fqn, job); err != nil {
Expand All @@ -112,7 +112,7 @@ func (r *MockService) StartJob(fqn string, project *projects.ProjectData, req *p
job.Environment.Experiments = buildExperiments()

default:
return fmt.Errorf("unexpected state for job %q: %v", fqn, job.CurrentState)
return fmt.Errorf("unexpected state for job, got=%q, expected=JobState_JOB_STATE_QUEUED: %v", fqn, job.CurrentState)
}

if err := r.storage.Update(ctx, fqn, job); err != nil {
Expand Down Expand Up @@ -149,7 +149,7 @@ func (r *MockService) StartJob(fqn string, project *projects.ProjectData, req *p
job.Environment.Version = &structpb.Struct{}
job.Environment.WorkerPools = []*pb.WorkerPool{{Kind: "placeholder"}}
default:
return fmt.Errorf("unexpected state for job %q: %v", fqn, job.CurrentState)
return fmt.Errorf("unexpected state for job, got=%q, expected=JobState_JOB_STATE_PENDING: %v", fqn, job.CurrentState)
}

if err := r.storage.Update(ctx, fqn, job); err != nil {
Expand Down
80 changes: 80 additions & 0 deletions pkg/controller/direct/dataflow/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataflow

import (
"context"
"fmt"

api "cloud.google.com/go/dataflow/apiv1beta3"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/config"
"google.golang.org/api/option"
)

type gcpClient struct {
config config.ControllerConfig
}

func newGCPClient(ctx context.Context, config *config.ControllerConfig) (*gcpClient, error) {
gcpClient := &gcpClient{
config: *config,
}
return gcpClient, nil
}

func (m *gcpClient) options() ([]option.ClientOption, error) {
var opts []option.ClientOption
if m.config.UserAgent != "" {
opts = append(opts, option.WithUserAgent(m.config.UserAgent))
}
if m.config.HTTPClient != nil {
// TODO: Set UserAgent in this scenario (error is: WithHTTPClient is incompatible with gRPC dial options)
opts = append(opts, option.WithHTTPClient(m.config.HTTPClient))
}
if m.config.UserProjectOverride && m.config.BillingProject != "" {
opts = append(opts, option.WithQuotaProject(m.config.BillingProject))
}

// TODO: support endpoints?
// if m.config.Endpoint != "" {
// opts = append(opts, option.WithEndpoint(m.config.Endpoint))
// }

return opts, nil
}

func (m *gcpClient) newFlexTemplatesClient(ctx context.Context) (*api.FlexTemplatesClient, error) {
opts, err := m.options()
if err != nil {
return nil, err
}
client, err := api.NewFlexTemplatesRESTClient(ctx, opts...)
if err != nil {
return nil, fmt.Errorf("building dataflow flexTemplates client: %w", err)
}
return client, err
}

func (m *gcpClient) newJobsClient(ctx context.Context) (*api.JobsV1Beta3Client, error) {
opts, err := m.options()
if err != nil {
return nil, err
}
client, err := api.NewJobsV1Beta3RESTClient(ctx, opts...)
if err != nil {
return nil, fmt.Errorf("building dataflow jobs client: %w", err)
}
return client, err
}
Loading

0 comments on commit 9d0f76e

Please sign in to comment.