Skip to content

Commit 77e787c

Browse files
committed
go/*: various updates for protocol changes (derive-refactor)
These are mostly self-obvious, mechanical, or otherwise uninteresting changes that reflect fallout of other changes.
1 parent 7357155 commit 77e787c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+684
-1423
lines changed

go/flow/builds.go

Lines changed: 2 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@ import (
55
"database/sql"
66
"fmt"
77
"io"
8-
"io/ioutil"
9-
"net"
10-
"net/http"
118
"net/url"
129
"os"
1310
"runtime"
@@ -23,7 +20,7 @@ import (
2320
type BuildService struct {
2421
baseURL *url.URL // URL to which buildIDs are joined.
2522
builds map[string]*sharedBuild // All active builds.
26-
gsClient *storage.Client // Google storage client which is initalized on first use.
23+
gsClient *storage.Client // Google storage client which is initialized on first use.
2724
mu sync.Mutex
2825
}
2926

@@ -43,11 +40,6 @@ type sharedBuild struct {
4340
dbTempfile *os.File
4441
dbErr error
4542
dbOnce sync.Once
46-
47-
tsWorker *JSWorker
48-
tsClient *http.Client
49-
tsErr error
50-
tsOnce sync.Once
5143
}
5244

5345
// NewBuildService returns a new *BuildService.
@@ -116,14 +108,6 @@ func (b *Build) Extract(fn func(*sql.DB) error) error {
116108
return fn(b.db)
117109
}
118110

119-
// TypeScriptLocalSocket returns the TypeScript Unix Domain Socket of this Catalog.
120-
// If a TypeScript worker isn't running, one is started
121-
// and will be stopped on a future call to Build.Close().
122-
func (b *Build) TypeScriptClient() (*http.Client, error) {
123-
b.tsOnce.Do(func() { _ = b.initTypeScript() })
124-
return b.tsClient, b.tsErr
125-
}
126-
127111
// Close the Build. If this is the last remaining reference,
128112
// then all allocated resources are cleaned up.
129113
func (b *Build) Close() error {
@@ -184,46 +168,6 @@ func (b *Build) dbInit() (err error) {
184168
return nil
185169
}
186170

187-
func (b *Build) initTypeScript() (err error) {
188-
defer func() { b.tsErr = err }()
189-
190-
var npmPackage []byte
191-
if err = b.Extract(func(d *sql.DB) error {
192-
npmPackage, err = catalog.LoadNPMPackage(b.db)
193-
return err
194-
}); err != nil {
195-
return fmt.Errorf("loading NPM package: %w", err)
196-
}
197-
198-
tsWorker, err := NewJSWorker(npmPackage)
199-
if err != nil {
200-
return fmt.Errorf("starting worker: %w", err)
201-
}
202-
b.tsWorker = tsWorker
203-
204-
// HTTP/S client which dials the TypeScript server over the loopback
205-
// for both cleartext and (fake) HTTPS connections.
206-
// The latter is a requirement for HTTP/2 support over unix domain sockets.
207-
// See also: https://www.mailgun.com/blog/http-2-cleartext-h2c-client-example-go/
208-
b.tsClient = &http.Client{
209-
Transport: &http.Transport{
210-
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
211-
return net.Dial("unix", tsWorker.socketPath)
212-
},
213-
DialTLSContext: func(_ context.Context, _, _ string) (net.Conn, error) {
214-
return net.Dial("unix", tsWorker.socketPath)
215-
},
216-
// Compression isn't desired over a local UDS transport.
217-
DisableCompression: true,
218-
// MaxConnsPerHost is the maximum concurrency with which
219-
// we'll drive the lambda server.
220-
MaxConnsPerHost: 8,
221-
},
222-
}
223-
224-
return nil
225-
}
226-
227171
func (b *sharedBuild) destroy() error {
228172
if b.db == nil {
229173
// Nothing to close.
@@ -239,12 +183,6 @@ func (b *sharedBuild) destroy() error {
239183
return fmt.Errorf("removing DB tempfile: %w", err)
240184
}
241185

242-
if b.tsWorker == nil {
243-
// Nothing to stop.
244-
} else if err := b.tsWorker.Stop(); err != nil {
245-
return fmt.Errorf("stopping typescript worker: %w", err)
246-
}
247-
248186
return nil
249187
}
250188

@@ -273,7 +211,7 @@ func fetchResource(svc *BuildService, resource *url.URL) (path string, tempfile
273211
}
274212
defer r.Close()
275213

276-
if tempfile, err = ioutil.TempFile("", "build"); err != nil {
214+
if tempfile, err = os.CreateTemp("", "build"); err != nil {
277215
return "", nil, err
278216
}
279217
if _, err = io.Copy(tempfile, r); err != nil {

go/flow/builds_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"net/url"
7+
"path"
78
"runtime"
89
"testing"
910

@@ -54,18 +55,19 @@ func TestBuildReferenceCounting(t *testing.T) {
5455
}
5556

5657
func TestBuildLazyInitAndReuse(t *testing.T) {
58+
var dir = t.TempDir()
5759
var args = bindings.BuildArgs{
5860
Context: context.Background(),
5961
FileRoot: "./testdata",
6062
BuildAPI_Config: pf.BuildAPI_Config{
6163
BuildId: "a-build-id",
62-
Directory: t.TempDir(),
64+
BuildDb: path.Join(dir, "a-build-id"),
6365
Source: "file:///specs_test.flow.yaml",
6466
SourceType: pf.ContentType_CATALOG,
6567
}}
6668
require.NoError(t, bindings.BuildCatalog(args))
6769

68-
var builds, err = NewBuildService("file://" + args.Directory + "/")
70+
var builds, err = NewBuildService("file://" + dir + "/")
6971
require.NoError(t, err)
7072

7173
// Open. Expect DB is not initialized until first use.
@@ -80,7 +82,7 @@ func TestBuildLazyInitAndReuse(t *testing.T) {
8082
collection, err = catalog.LoadCollection(db, "example/collection")
8183
return err
8284
}))
83-
require.Equal(t, "example/collection", collection.Collection.String())
85+
require.Equal(t, "example/collection", collection.Name.String())
8486

8587
// Database was initialized.
8688
var db1 = b1.db
@@ -95,12 +97,6 @@ func TestBuildLazyInitAndReuse(t *testing.T) {
9597
return nil
9698
}))
9799

98-
// Our fixture doesn't build a typescript package, so initialization
99-
// fails with an error. Expect the error is shared.
100-
_, err = b1.TypeScriptClient()
101-
require.Error(t, err)
102-
require.Equal(t, err, b2.tsErr)
103-
104100
// Close both builds, dropping the reference count to zero.
105101
require.NoError(t, b1.Close())
106102
require.NoError(t, b2.Close())
@@ -114,7 +110,7 @@ func TestBuildLazyInitAndReuse(t *testing.T) {
114110
collection, err = catalog.LoadCollection(db, "example/collection")
115111
return err
116112
}))
117-
require.Equal(t, "example/collection", collection.Collection.String())
113+
require.Equal(t, "example/collection", collection.Name.String())
118114
}
119115

120116
func TestInitOfMissingBuild(t *testing.T) {

go/flow/converge.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/estuary/flow/go/labels"
1010
pf "github.com/estuary/flow/go/protocols/flow"
11+
"github.com/estuary/flow/go/protocols/ops"
1112
"go.gazette.dev/core/broker/client"
1213
pb "go.gazette.dev/core/broker/protocol"
1314
"go.gazette.dev/core/consumer"
@@ -21,7 +22,7 @@ func ListShardsRequest(task pf.Task) pc.ListRequest {
2122
Selector: pb.LabelSelector{
2223
Include: pb.MustLabelSet(
2324
labels.TaskName, task.TaskName(),
24-
labels.TaskType, taskType(task),
25+
labels.TaskType, taskType(task).String(),
2526
),
2627
},
2728
}
@@ -34,7 +35,7 @@ func ListRecoveryLogsRequest(task pf.Task) pb.ListRequest {
3435
Include: pb.MustLabelSet(
3536
glabels.ContentType, glabels.ContentType_RecoveryLog,
3637
labels.TaskName, task.TaskName(),
37-
labels.TaskType, taskType(task),
38+
labels.TaskType, taskType(task).String(),
3839
),
3940
},
4041
}
@@ -44,7 +45,7 @@ func ListRecoveryLogsRequest(task pf.Task) pb.ListRequest {
4445
func ListPartitionsRequest(collection *pf.CollectionSpec) pb.ListRequest {
4546
return pb.ListRequest{
4647
Selector: pf.LabelSelector{
47-
Include: pb.MustLabelSet(labels.Collection, collection.Collection.String()),
48+
Include: pb.MustLabelSet(labels.Collection, collection.Name.String()),
4849
},
4950
}
5051
}
@@ -356,14 +357,14 @@ func ActivationChanges(
356357
for _, collection := range collections {
357358
var resp, err = client.ListAllJournals(ctx, jc, ListPartitionsRequest(collection))
358359
if err != nil {
359-
return nil, nil, fmt.Errorf("listing partitions of %s: %w", collection.Collection, err)
360+
return nil, nil, fmt.Errorf("listing partitions of %s: %w", collection.Name, err)
360361
}
361362

362363
var desired = MapPartitionsToCurrentSplits(resp.Journals)
363364
journals, err = CollectionChanges(collection, resp.Journals, desired, journals)
364365

365366
if err != nil {
366-
return nil, nil, fmt.Errorf("processing collection %s: %w", collection.Collection, err)
367+
return nil, nil, fmt.Errorf("processing collection %s: %w", collection.Name, err)
367368
}
368369
}
369370

@@ -411,7 +412,7 @@ func DeletionChanges(
411412
for _, collection := range collections {
412413
var resp, err = client.ListAllJournals(ctx, jc, ListPartitionsRequest(collection))
413414
if err != nil {
414-
return nil, nil, fmt.Errorf("listing partitions of %s: %w", collection.Collection, err)
415+
return nil, nil, fmt.Errorf("listing partitions of %s: %w", collection.Name, err)
415416
}
416417

417418
for _, cur := range resp.Journals {
@@ -452,14 +453,14 @@ func DeletionChanges(
452453
}
453454

454455
// taskType returns the label matching this Task.
455-
func taskType(task pf.Task) string {
456+
func taskType(task pf.Task) ops.TaskType {
456457
switch task.(type) {
457458
case *pf.CaptureSpec:
458-
return labels.TaskTypeCapture
459-
case *pf.DerivationSpec:
460-
return labels.TaskTypeDerivation
459+
return ops.TaskType_capture
460+
case *pf.CollectionSpec:
461+
return ops.TaskType_derivation
461462
case *pf.MaterializationSpec:
462-
return labels.TaskTypeMaterialization
463+
return ops.TaskType_materialization
463464
default:
464465
panic(task)
465466
}

go/flow/converge_test.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7+
"path"
78
"testing"
89

910
"github.com/bradleyjkemp/cupaloy"
@@ -24,20 +25,20 @@ func TestConvergence(t *testing.T) {
2425
FileRoot: "./testdata",
2526
BuildAPI_Config: pf.BuildAPI_Config{
2627
BuildId: "fixture",
27-
Directory: t.TempDir(),
28+
BuildDb: path.Join(t.TempDir(), "build.db"),
2829
Source: "file:///specs_test.flow.yaml",
2930
SourceType: pf.ContentType_CATALOG,
3031
}}
3132
require.NoError(t, bindings.BuildCatalog(args))
3233

3334
var collection *pf.CollectionSpec
34-
var derivation *pf.DerivationSpec
35+
var derivation *pf.CollectionSpec
3536

36-
require.NoError(t, catalog.Extract(args.OutputPath(), func(db *sql.DB) (err error) {
37+
require.NoError(t, catalog.Extract(args.BuildDb, func(db *sql.DB) (err error) {
3738
if collection, err = catalog.LoadCollection(db, "example/collection"); err != nil {
3839
return err
3940
}
40-
derivation, err = catalog.LoadDerivation(db, "example/derivation")
41+
derivation, err = catalog.LoadCollection(db, "example/derivation")
4142
return err
4243
}))
4344

@@ -70,7 +71,7 @@ func TestConvergence(t *testing.T) {
7071
)))
7172
require.NoError(t, err)
7273

73-
shardSpec1, err := BuildShardSpec(derivation.ShardTemplate,
74+
shardSpec1, err := BuildShardSpec(derivation.Derivation.ShardTemplate,
7475
labels.EncodeRange(pf.RangeSpec{
7576
KeyBegin: 0x10000000,
7677
KeyEnd: 0x2fffffff,
@@ -79,9 +80,9 @@ func TestConvergence(t *testing.T) {
7980
}, pf.LabelSet{}),
8081
)
8182
require.NoError(t, err)
82-
logSpec1 := BuildRecoverySpec(derivation.RecoveryLogTemplate, shardSpec1)
83+
logSpec1 := BuildRecoverySpec(derivation.Derivation.RecoveryLogTemplate, shardSpec1)
8384

84-
shardSpec2, err := BuildShardSpec(derivation.ShardTemplate,
85+
shardSpec2, err := BuildShardSpec(derivation.Derivation.ShardTemplate,
8586
labels.EncodeRange(pf.RangeSpec{
8687
KeyBegin: 0x30000000,
8788
KeyEnd: 0x3fffffff,
@@ -90,9 +91,9 @@ func TestConvergence(t *testing.T) {
9091
}, pf.LabelSet{}),
9192
)
9293
require.NoError(t, err)
93-
logSpec2 := BuildRecoverySpec(derivation.RecoveryLogTemplate, shardSpec2)
94+
logSpec2 := BuildRecoverySpec(derivation.Derivation.RecoveryLogTemplate, shardSpec2)
9495

95-
shardSpec3, err := BuildShardSpec(derivation.ShardTemplate,
96+
shardSpec3, err := BuildShardSpec(derivation.Derivation.ShardTemplate,
9697
labels.EncodeRange(pf.RangeSpec{
9798
KeyBegin: 0x30000000,
9899
KeyEnd: 0x3fffffff,
@@ -101,7 +102,7 @@ func TestConvergence(t *testing.T) {
101102
}, pf.LabelSet{}),
102103
)
103104
require.NoError(t, err)
104-
logSpec3 := BuildRecoverySpec(derivation.RecoveryLogTemplate, shardSpec3)
105+
logSpec3 := BuildRecoverySpec(derivation.Derivation.RecoveryLogTemplate, shardSpec3)
105106

106107
var allPartitions = []pb.ListResponse_Journal{
107108
{Spec: *partitionSpec1, ModRevision: 11},
@@ -162,7 +163,7 @@ func TestConvergence(t *testing.T) {
162163
})
163164

164165
t.Run("shard-split-errors", func(t *testing.T) {
165-
var shard, err = BuildShardSpec(derivation.ShardTemplate,
166+
var shard, err = BuildShardSpec(derivation.Derivation.ShardTemplate,
166167
labels.EncodeRange(pf.RangeSpec{
167168
KeyEnd: 0x10000000,
168169
RClockEnd: 0x10000000,
@@ -330,7 +331,7 @@ func TestConvergence(t *testing.T) {
330331
var ctx = context.Background()
331332
var jc = &mockJournals{
332333
collections: map[string]*pb.ListResponse{
333-
collection.Collection.String(): {Journals: allPartitions},
334+
collection.Name.String(): {Journals: allPartitions},
334335
},
335336
logs: map[string]*pb.ListResponse{
336337
derivation.TaskName(): {Journals: allLogs},
@@ -361,7 +362,7 @@ func TestConvergence(t *testing.T) {
361362
var ctx = context.Background()
362363
var jc = &mockJournals{
363364
collections: map[string]*pb.ListResponse{
364-
collection.Collection.String(): {Journals: allPartitions},
365+
collection.Name.String(): {Journals: allPartitions},
365366
},
366367
logs: map[string]*pb.ListResponse{
367368
derivation.TaskName(): {Journals: allLogs},

go/flow/mapping.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func NewMapper(
7373
func PartitionPointers(spec *pf.CollectionSpec) []string {
7474
var ptrs = make([]string, len(spec.PartitionFields))
7575
for i, field := range spec.PartitionFields {
76-
ptrs[i] = pf.GetProjectionByField(field, spec.Projections).Ptr
76+
ptrs[i] = spec.GetProjection(field).Ptr
7777
}
7878
return ptrs
7979
}
@@ -174,7 +174,7 @@ func (m *Mapper) Map(mappable message.Mappable) (pb.Journal, string, error) {
174174
"journal": applySpec.Name,
175175
"readThrough": readThrough,
176176
}).Info("created partition")
177-
createdPartitionsCounters.WithLabelValues(msg.Spec.Collection.String()).Inc()
177+
createdPartitionsCounters.WithLabelValues(msg.Spec.Name.String()).Inc()
178178
}
179179

180180
m.journals.Mu.RLock()
@@ -274,7 +274,7 @@ func (m Mappable) SetUUID(uuid message.UUID) {
274274
func NewAcknowledgementMessage(spec *pf.CollectionSpec) Mappable {
275275
return Mappable{
276276
Spec: spec,
277-
Doc: append(json.RawMessage(nil), spec.AckJsonTemplate...),
277+
Doc: append(json.RawMessage(nil), spec.AckTemplateJson...),
278278
}
279279
}
280280

0 commit comments

Comments
 (0)