Storage write api - support default stream #226

Merged: 6 commits, Jun 12, 2024
Changes from 3 commits
73 changes: 68 additions & 5 deletions server/storage_handler.go
@@ -409,7 +409,6 @@ func (s *storageWriteServer) CreateWriteStream(ctx context.Context, req *storage
TableSchema: schema,
WriteMode: storagepb.WriteStream_INSERT,
}

s.mu.Lock()
s.streamMap[streamName] = &writeStreamStatus{
streamType: streamType,
@@ -488,7 +487,6 @@ func (s *storageWriteServer) appendRows(req *storagepb.AppendRowsRequest, msgDes
if status.finalized {
return fmt.Errorf("stream is already finalized")
}
offset := req.GetOffset().Value
rows := req.GetProtoRows().GetRows().GetSerializedRows()
data, err := s.decodeData(msgDesc, rows)
if err != nil {
@@ -514,10 +512,19 @@
s.sendErrorMessage(stream, streamName, err)
return err
}
if err := tx.Commit(); err != nil {
s.sendErrorMessage(stream, streamName, err)
return err
}
} else {
status.rows = append(status.rows, data...)
}
return s.sendResult(stream, streamName, offset+int64(len(rows)))
offset := req.GetOffset()
if offset != nil {
return s.sendResult(stream, streamName, offset.Value+int64(len(rows)))
}
return s.sendResult(stream, streamName, 0)
}
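
For context, clients writing to the default stream typically omit the offset entirely, which is the case the nil guard above handles. A minimal, hypothetical sketch of the shape of such a client-side request follows; descriptor and serializedRows are placeholders, as is the projects/p/datasets/d/tables/t path:

package sketch

import (
	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"google.golang.org/protobuf/types/known/descriptorpb"
)

// buildDefaultStreamRequest shows the shape of an AppendRowsRequest aimed at
// the default stream. Offset is deliberately left nil: appends to _default
// are not offset-tracked, so the server falls back to sendResult(..., 0).
func buildDefaultStreamRequest(descriptor *descriptorpb.DescriptorProto, serializedRows [][]byte) *storagepb.AppendRowsRequest {
	return &storagepb.AppendRowsRequest{
		// The default stream is addressed by name and never created explicitly.
		WriteStream: "projects/p/datasets/d/tables/t/streams/_default",
		Rows: &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				WriterSchema: &storagepb.ProtoSchema{ProtoDescriptor: descriptor},
				Rows:         &storagepb.ProtoRows{SerializedRows: serializedRows},
			},
		},
	}
}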

func (s *storageWriteServer) sendResult(stream storagepb.BigQueryWrite_AppendRowsServer, streamName string, offset int64) error {
@@ -670,10 +677,14 @@ func (s *storageWriteServer) insertTableData(ctx context.Context, tx *connection

func (s *storageWriteServer) GetWriteStream(ctx context.Context, req *storagepb.GetWriteStreamRequest) (*storagepb.WriteStream, error) {
s.mu.RLock()
defer s.mu.RUnlock()
status, exists := s.streamMap[req.Name]
s.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("failed to find stream from %s", req.Name)
stream, err := s.createDefaultStream(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to find stream from %s", req.Name)
}
return stream, err
}
return status.stream, nil
}
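
Note on locking: the read lock is now released explicitly before the fallback rather than via defer, because createDefaultStream acquires the write lock on s.mu and sync.RWMutex is not reentrant; keeping the read lock held across that call would deadlock.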
@@ -768,6 +779,58 @@ func (s *storageWriteServer) FlushRows(ctx context.Context, req *storagepb.Flush
}, nil
}

/*
According to the Google documentation (https://pkg.go.dev/cloud.google.com/go/bigquery/storage/apiv1#BigQueryWriteClient.GetWriteStream),
every table has a special stream named '_default' to which data can be written. This stream doesn't need to be created using CreateWriteStream.

Here we create the default stream and add it to the map if it does not exist yet. The GetWriteStreamRequest given as the second
argument should have a Name in this format: projects/<projectId>/datasets/<datasetId>/tables/<tableId>/streams/_default
*/
func (s *storageWriteServer) createDefaultStream(ctx context.Context, req *storagepb.GetWriteStreamRequest) (*storagepb.WriteStream, error) {
streamId := req.Name
suffix := "_default"
streams := "/streams/"
if !strings.HasSuffix(streamId, suffix) {
return nil, fmt.Errorf("unexpected stream id: %s, expected '%s' suffix", streamId, suffix)
}
index := strings.LastIndex(streamId, streams)
if index == -1 {
return nil, fmt.Errorf("unexpected stream id: %s, expected containg '%s'", streamId, streams)
}
streamPart := streamId[:index]
writeStreamReq := &storagepb.CreateWriteStreamRequest{
Parent: streamPart,
WriteStream: &storagepb.WriteStream{
Type: storagepb.WriteStream_COMMITTED,
},
}
stream, err := s.CreateWriteStream(ctx, writeStreamReq)
if err != nil {
return nil, err
}
projectID, datasetID, tableID, err := getIDsFromPath(streamPart)
if err != nil {
return nil, err
}
tableMetadata, err := getTableMetadata(ctx, s.server, projectID, datasetID, tableID)
if err != nil {
return nil, err
}
streamStatus := &writeStreamStatus{
streamType: storagepb.WriteStream_COMMITTED,
stream: stream,
projectID: projectID,
datasetID: datasetID,
tableID: tableID,
tableMetadata: tableMetadata,
}
s.mu.Lock()
defer s.mu.Unlock()
s.streamMap[streamId] = streamStatus
return stream, nil
}
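
End to end, a client can now address the default stream without creating it first: GetWriteStream on a .../streams/_default name lazily materializes it via the branch above. A sketch using the managedwriter wrapper, assuming its helpers behave as documented and that the emulator is wired up as the endpoint; the project, dataset, and table IDs below are placeholders:

package sketch

import (
	"context"

	"cloud.google.com/go/bigquery/storage/managedwriter"
	"google.golang.org/protobuf/types/known/descriptorpb"
)

// writeViaDefaultStream appends rows through the special _default stream.
func writeViaDefaultStream(ctx context.Context, descriptor *descriptorpb.DescriptorProto, rows [][]byte) error {
	client, err := managedwriter.NewClient(ctx, "test-project")
	if err != nil {
		return err
	}
	defer client.Close()

	ms, err := client.NewManagedStream(ctx,
		managedwriter.WithType(managedwriter.DefaultStream),
		managedwriter.WithDestinationTable(
			managedwriter.TableParentFromParts("test-project", "test-dataset", "test-table")),
		managedwriter.WithSchemaDescriptor(descriptor),
	)
	if err != nil {
		return err
	}

	result, err := ms.AppendRows(ctx, rows)
	if err != nil {
		return err
	}
	// For the default stream the returned offset is not meaningful
	// (NoStreamOffset); we only care that the append was acknowledged.
	_, err = result.GetResult(ctx)
	return err
}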

func getIDsFromPath(path string) (string, string, string, error) {
paths := strings.Split(path, "/")
if len(paths)%2 != 0 {