AppendRows cannot be used on the second or subsequent PENDING write streams - UNKNOWN: failed to append rows: stream is already finalized #352

Open
wataruko opened this issue Aug 25, 2024 · 0 comments
Labels: bug (Something isn't working)

What happened?

I am using ghcr.io/goccy/bigquery-emulator/bigquery-emulator in a local development environment to verify code that registers data through a PENDING write stream. When I run the exact same process two or more times, the second and subsequent runs fail with the following error: rpc error: code = Unknown desc = failed to append rows: stream is already finalized
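
For context, each run performs the standard PENDING-stream sequence sketched below (a minimal sketch, assuming an already-connected managedwriter.Client, a table path of the form projects/.../datasets/.../tables/..., and a descriptorpb.DescriptorProto from adapt.NormalizeDescriptor; the helper name appendViaPendingStream is made up for illustration). Against the emulator the first invocation succeeds, but the second one fails as described above:

// appendViaPendingStream is a hypothetical helper showing the per-run flow.
func appendViaPendingStream(ctx context.Context, client *managedwriter.Client, table string, desc *descriptorpb.DescriptorProto, rows [][]byte) error {
	// Create a brand-new PENDING write stream for this run.
	ws, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent:      table,
		WriteStream: &storagepb.WriteStream{Type: storagepb.WriteStream_PENDING},
	})
	if err != nil {
		return err
	}
	ms, err := client.NewManagedStream(ctx,
		managedwriter.WithStreamName(ws.GetName()),
		managedwriter.WithSchemaDescriptor(desc),
	)
	if err != nil {
		return err
	}
	// Append rows and wait for the acknowledgement. On the second run the
	// emulator rejects this (surfacing at GetResult) with
	// "failed to append rows: stream is already finalized".
	res, err := ms.AppendRows(ctx, rows, managedwriter.WithOffset(0))
	if err != nil {
		return err
	}
	if _, err := res.GetResult(ctx); err != nil {
		return err
	}
	// Finalize this stream and commit it so the pending rows become visible.
	if _, err := ms.Finalize(ctx); err != nil {
		return err
	}
	_, err = client.BatchCommitWriteStreams(ctx, &storagepb.BatchCommitWriteStreamsRequest{
		Parent:       managedwriter.TableParentFromStreamName(ms.StreamName()),
		WriteStreams: []string{ms.StreamName()},
	})
	return err
}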

What did you expect to happen?

Data should also be registered successfully from the second and subsequent PENDING write streams.

How can we reproduce it (as minimally and precisely as possible)?

The issue can be reproduced by changing TestStorageWrite to the following code.

  • The changes from the original test are that the TestServer is started only once at the beginning, every test case connects to that already running TestServer, and there are now two test cases that each use a PENDING stream.
  • Test case 1 (1st pending stream) passes, but test case 2 (2nd pending stream) fails on the GetResult of the second AppendRows with this error: rpc error: code = Unknown desc = failed to append rows: stream is already finalized
func TestStorageWrite(t *testing.T) {
	const (
		projectID = "test"
		datasetID = "test"
		tableID   = "sample"
	)

	ctx := context.Background()
	bqServer, err := server.New(server.TempStorage)
	if err != nil {
		t.Fatal(err)
	}
	if err := bqServer.Load(
		server.StructSource(
			types.NewProject(
				projectID,
				types.NewDataset(
					datasetID,
					types.NewTable(
						tableID,
						[]*types.Column{
							types.NewColumn("bool_col", types.BOOL),
							types.NewColumn("bytes_col", types.BYTES),
							types.NewColumn("float64_col", types.FLOAT64),
							types.NewColumn("int64_col", types.INT64),
							types.NewColumn("string_col", types.STRING),
							types.NewColumn("date_col", types.DATE),
							types.NewColumn("datetime_col", types.DATETIME),
							types.NewColumn("geography_col", types.GEOGRAPHY),
							types.NewColumn("numeric_col", types.NUMERIC),
							types.NewColumn("bignumeric_col", types.BIGNUMERIC),
							types.NewColumn("time_col", types.TIME),
							types.NewColumn("timestamp_col", types.TIMESTAMP),
							types.NewColumn("int64_list", types.INT64, types.ColumnMode(types.RepeatedMode)),
							types.NewColumn(
								"struct_col",
								types.STRUCT,
								types.ColumnFields(
									types.NewColumn("sub_int_col", types.INT64),
								),
							),
							types.NewColumn(
								"struct_list",
								types.STRUCT,
								types.ColumnFields(
									types.NewColumn("sub_int_col", types.INT64),
								),
								types.ColumnMode(types.RepeatedMode),
							),
						},
						nil,
					),
				),
			),
		),
	); err != nil {
		t.Fatal(err)
	}
	testServer := bqServer.TestServer()
	defer func() {
		testServer.Close()
		bqServer.Close()
	}()
	opts, err := testServer.GRPCClientOptions(ctx)
	if err != nil {
		t.Fatal(err)
	}

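	// The managedwriter client is created once and shared by all test cases below.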
	client, err := managedwriter.NewClient(ctx, projectID, opts...)
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()
	for _, test := range []struct {
		name                            string
		streamType                      storagepb.WriteStream_Type
		isDefaultStream                 bool
		expectedRowsAfterFirstWrite     int
		expectedRowsAfterSecondWrite    int
		expectedRowsAfterThirdWrite     int
		expectedRowsAfterExplicitCommit int
	}{
		{
			name:                            "1st pending stream",
			streamType:                      storagepb.WriteStream_PENDING,
			expectedRowsAfterFirstWrite:     0,
			expectedRowsAfterSecondWrite:    0,
			expectedRowsAfterThirdWrite:     0,
			expectedRowsAfterExplicitCommit: 4,
		},
		{
			name:                            "2nd pending stream",
			streamType:                      storagepb.WriteStream_PENDING,
			expectedRowsAfterFirstWrite:     4,
			expectedRowsAfterSecondWrite:    4,
			expectedRowsAfterThirdWrite:     4,
			expectedRowsAfterExplicitCommit: 8,
		},
	} {
		t.Run(test.name, func(t *testing.T) {
			var writeStreamName string
			fullTableName := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID)
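			// Each non-default test case creates a fresh PENDING write stream on the shared test server.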
			if !test.isDefaultStream {
				writeStream, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
					Parent: fullTableName,
					WriteStream: &storagepb.WriteStream{
						Type: test.streamType,
					},
				})
				if err != nil {
					t.Fatalf("CreateWriteStream: %v", err)
				}
				writeStreamName = writeStream.GetName()
			}
			m := &exampleproto.SampleData{}
			descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
			if err != nil {
				t.Fatalf("NormalizeDescriptor: %v", err)
			}
			var writerOptions []managedwriter.WriterOption
			if test.isDefaultStream {
				writerOptions = append(writerOptions, managedwriter.WithType(managedwriter.DefaultStream))
				writerOptions = append(writerOptions, managedwriter.WithDestinationTable(fullTableName))
			} else {
				writerOptions = append(writerOptions, managedwriter.WithStreamName(writeStreamName))
			}
			writerOptions = append(writerOptions, managedwriter.WithSchemaDescriptor(descriptorProto))
			managedStream, err := client.NewManagedStream(
				ctx,
				writerOptions...,
			)
			if err != nil {
				t.Fatalf("NewManagedStream: %v", err)
			}

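			// A separate bigquery client, pointed at the emulator's HTTP endpoint, is used only to count the rows currently visible in the table.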
			bqClient, err := bigquery.NewClient(
				ctx,
				projectID,
				option.WithEndpoint(testServer.URL),
				option.WithoutAuthentication(),
			)
			if err != nil {
				t.Fatal(err)
			}
			defer bqClient.Close()

			rows, err := generateExampleMessages(1)
			if err != nil {
				t.Fatalf("generateExampleMessages: %v", err)
			}

			var (
				curOffset int64
				results   []*managedwriter.AppendResult
			)
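			// First append at offset 0. For a PENDING stream the appended rows should stay invisible until the stream is committed.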
			result, err := managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(0))
			if err != nil {
				t.Fatalf("AppendRows first call error: %v", err)
			}

			iter := bqClient.Dataset(datasetID).Table(tableID).Read(ctx)
			resultRowCount := countRows(t, iter)
			if resultRowCount != test.expectedRowsAfterFirstWrite {
				t.Fatalf("expected the number of rows after first AppendRows %d but got %d", test.expectedRowsAfterFirstWrite, resultRowCount)
			}

			results = append(results, result)
			curOffset = curOffset + 1
			rows, err = generateExampleMessages(3)
			if err != nil {
				t.Fatalf("generateExampleMessages: %v", err)
			}
			result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset))
			if err != nil {
				t.Fatalf("AppendRows second call error: %v", err)
			}

			iter = bqClient.Dataset(datasetID).Table(tableID).Read(ctx)
			resultRowCount = countRows(t, iter)
			if resultRowCount != test.expectedRowsAfterSecondWrite {
				t.Fatalf("expected the number of rows after second AppendRows %d but got %d", test.expectedRowsAfterSecondWrite, resultRowCount)
			}

			results = append(results, result)

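			// GetResult blocks until the server acknowledges each append. For the "2nd pending stream" case this is where the emulator returns "stream is already finalized".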
			for k, v := range results {
				recvOffset, err := v.GetResult(ctx)
				if err != nil {
					t.Fatalf("append %d returned error: %v", k, err)
				}
				t.Logf("Successfully appended data at offset %d", recvOffset)
			}

			iter = bqClient.Dataset(datasetID).Table(tableID).Read(ctx)
			resultRowCount = countRows(t, iter)
			if resultRowCount != test.expectedRowsAfterThirdWrite {
				t.Fatalf("expected the number of rows after third AppendRows %d but got %d", test.expectedRowsAfterThirdWrite, resultRowCount)
			}

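			// Finalize marks this write stream as complete; no further appends are allowed on it afterwards.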
			rowCount, err := managedStream.Finalize(ctx)
			if err != nil {
				t.Fatalf("error during Finalize: %v", err)
			}

			t.Logf("Stream %s finalized with %d rows", managedStream.StreamName(), rowCount)

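			// BatchCommitWriteStreams commits the finalized stream so its buffered rows become visible in the table.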
			req := &storagepb.BatchCommitWriteStreamsRequest{
				Parent:       managedwriter.TableParentFromStreamName(managedStream.StreamName()),
				WriteStreams: []string{managedStream.StreamName()},
			}

			resp, err := client.BatchCommitWriteStreams(ctx, req)
			if err != nil {
				t.Fatalf("client.BatchCommit: %v", err)
			}
			if len(resp.GetStreamErrors()) > 0 {
				t.Fatalf("stream errors present: %v", resp.GetStreamErrors())
			}

			iter = bqClient.Dataset(datasetID).Table(tableID).Read(ctx)
			resultRowCount = countRows(t, iter)
			if resultRowCount != test.expectedRowsAfterExplicitCommit {
				t.Fatalf("expected the number of rows after Finalize %d but got %d", test.expectedRowsAfterExplicitCommit, resultRowCount)
			}

			t.Logf("Table data committed at %s", resp.GetCommitTime().AsTime().Format(time.RFC3339Nano))
		})
	}
}

Anything else we need to know?

No response

wataruko added the bug label on Aug 25, 2024