diff --git a/server/handler.go b/server/handler.go
index 692b66470..022efaf10 100644
--- a/server/handler.go
+++ b/server/handler.go
@@ -1062,6 +1062,7 @@ func (h *jobsInsertHandler) importFromGCS(ctx context.Context, r *jobsInsertRequ
 		opts = append(
 			opts,
 			option.WithEndpoint(fmt.Sprintf("%s/storage/v1/", host)),
+			storage.WithJSONReads(),
 			option.WithoutAuthentication(),
 		)
 	}
diff --git a/server/server_test.go b/server/server_test.go
index 8215a660a..69276f95a 100644
--- a/server/server_test.go
+++ b/server/server_test.go
@@ -1708,6 +1708,141 @@ func TestImportFromGCS(t *testing.T) {
 	}
 }
 
+func TestImportFromGCSEmulatorWithoutPublicHost(t *testing.T) {
+	const (
+		projectID  = "test"
+		datasetID  = "dataset1"
+		tableID    = "table_a"
+		host       = "127.0.0.1"
+		bucketName = "test-bucket"
+		sourceName = "path/to/data.json"
+	)
+
+	ctx := context.Background()
+	bqServer, err := server.New(server.TempStorage)
+	if err != nil {
+		t.Fatal(err)
+	}
+	project := types.NewProject(
+		projectID,
+		types.NewDataset(
+			datasetID,
+			types.NewTable(
+				tableID,
+				[]*types.Column{
+					types.NewColumn("id", types.INT64),
+					types.NewColumn("value", types.INT64),
+				},
+				nil,
+			),
+		),
+	)
+	if err := bqServer.Load(server.StructSource(project)); err != nil {
+		t.Fatal(err)
+	}
+
+	testServer := bqServer.TestServer()
+	var buf bytes.Buffer
+	enc := json.NewEncoder(&buf)
+	for i := 0; i < 3; i++ {
+		if err := enc.Encode(map[string]interface{}{
+			"id":    i + 1,
+			"value": i + 10,
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+	storageServer, err := fakestorage.NewServerWithOptions(fakestorage.Options{
+		InitialObjects: []fakestorage.Object{
+			{
+				ObjectAttrs: fakestorage.ObjectAttrs{
+					BucketName: bucketName,
+					Name:       sourceName,
+					Size:       int64(len(buf.Bytes())),
+				},
+				Content: buf.Bytes(),
+			},
+		},
+		Host:   host,
+		Scheme: "http",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	storageServerURL := storageServer.URL()
+	u, err := url.Parse(storageServerURL)
+	if err != nil {
+		t.Fatal(err)
+	}
+	storageEmulatorHost := fmt.Sprintf("http://%s:%s", host, u.Port())
+	t.Setenv("STORAGE_EMULATOR_HOST", storageEmulatorHost)
+
+	defer func() {
+		testServer.Close()
+		bqServer.Stop(ctx)
+		storageServer.Stop()
+	}()
+
+	client, err := bigquery.NewClient(
+		ctx,
+		projectID,
+		option.WithEndpoint(testServer.URL),
+		option.WithoutAuthentication(),
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	gcsSourceURL := fmt.Sprintf("gs://%s/%s", bucketName, sourceName)
+	gcsRef := bigquery.NewGCSReference(gcsSourceURL)
+	gcsRef.SourceFormat = bigquery.JSON
+	gcsRef.AutoDetect = true
+	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)
+	loader.WriteDisposition = bigquery.WriteTruncate
+	job, err := loader.Run(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	status, err := job.Wait(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if status.Err() != nil {
+		t.Fatal(status.Err())
+	}
+
+	query := client.Query(fmt.Sprintf("SELECT * FROM %s.%s", datasetID, tableID))
+	it, err := query.Read(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	type row struct {
+		ID    int64
+		Value int64
+	}
+	var rows []*row
+	for {
+		var r row
+		if err := it.Next(&r); err != nil {
+			if err == iterator.Done {
+				break
+			}
+			t.Fatal(err)
+		}
+		rows = append(rows, &r)
+	}
+	if diff := cmp.Diff([]*row{
+		{ID: 1, Value: 10},
+		{ID: 2, Value: 11},
+		{ID: 3, Value: 12},
+	}, rows); diff != "" {
+		t.Errorf("(-want +got):\n%s", diff)
+	}
+}
+
 func TestImportWithWildcardFromGCS(t *testing.T) {
 	const (
 		projectID = "test"