diff --git a/bigquery/main.go b/bigquery/main.go index 33dadb9c..2af9dcf0 100644 --- a/bigquery/main.go +++ b/bigquery/main.go @@ -75,6 +75,7 @@ type bqRow struct { Env []string LogURL string Substitutions []*substitution + NewField string } type substitution struct { @@ -278,6 +279,7 @@ func (n *bqNotifier) SendNotification(ctx context.Context, build *cbpb.Build) er Env: build.Options.Env, LogURL: logURL, Substitutions: substitutions, + NewField: "hello", } return n.client.WriteRow(ctx, newRow) } @@ -300,6 +302,7 @@ func (bq *actualBQ) EnsureTable(ctx context.Context, tableName string) error { // Check for existence of table, create if false bq.table = bq.dataset.Table(tableName) schema, err := bigquery.InferSchema(bqRow{}) + schema = *bq.relaxSchema(&schema) if err != nil { return fmt.Errorf("Failed to infer schema: %v", err) } @@ -318,11 +321,54 @@ func (bq *actualBQ) EnsureTable(ctx context.Context, tableName string) error { if _, err := bq.table.Update(ctx, update, metadata.ETag); err != nil { return fmt.Errorf("Error: unable to update schema of table: %v", err) } + + } else { + existingFields := map[string]bigquery.FieldType{} + for _, field := range metadata.Schema { + existingFields[field.Name] = field.Type + } + for _, field := range schema { + if val, exists := existingFields[field.Name]; !exists { + // field not found, attempting to update table schema + if err := bq.updateTableAddColumn(ctx, field); err != nil { + return fmt.Errorf("Failed to update table schema: %q", err) + } + log.Warningf("Sucessfully added new field to schema: %q", val) + } else if val != field.Type { + return fmt.Errorf("Found existing field %q with incorrect type: %q", field.Name, val) + } + } } return nil } +func (bq *actualBQ) relaxSchema(schema *bigquery.Schema) *bigquery.Schema { + requiredFields := map[string]bool{"ProjectID": true, "ID": true} + relaxed := schema.Relax() + for index, field := range relaxed { + if _, required := requiredFields[field.Name]; required { + relaxed[index].Required = true + } + } + return &relaxed +} + +func (bq *actualBQ) updateTableAddColumn(ctx context.Context, field *bigquery.FieldSchema) error { + tableRef := bq.table + metadata, err := tableRef.Metadata(ctx) + if err != nil { + return err + } + newSchema := append(metadata.Schema, field) + newSchema = *bq.relaxSchema(&newSchema) + update := bigquery.TableMetadataToUpdate{Schema: newSchema} + if _, err := tableRef.Update(ctx, update, metadata.ETag); err != nil { + return err + } + return nil +} + func (bq *actualBQ) WriteRow(ctx context.Context, row *bqRow) error { ins := bq.table.Inserter() log.V(2).Infof("Writing row: %v", row)