Open
Description
What happened?
Hello, I was trying to use Managed I/O on Dataflow and had difficulty resolving this issue.
Here's my sample code for Dataflow:
import apache_beam as beam
from apache_beam import Row
from apache_beam.transforms import managed
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions

opts = PipelineOptions()  # set Google Cloud options here (omitted)

with beam.Pipeline(options=opts) as p:
    rows = (
        p
        | "CreateRows" >> beam.Create([
            {"id": "a", "name": "Alice", "value": 100},
            {"id": "b", "name": "Bob", "value": 200},
        ])
        # | "ToRow" >> beam.Map(lambda x: Row(id=str(x["id"]), name=str(x["name"]), value=int(x["value"])))  # this works.
        | "ToRow" >> beam.Map(lambda x: Row(**x))  # this does not work.
    )
    _ = rows | "WriteToBQ" >> managed.Write(
        managed.BIGQUERY,
        config={
            "table": "myproject:mydataset.mytable",
            "keep": ["id", "name", "value"],
        },
    )
The second variant, Row(**x), throws an error in managed.Write like this:
raise RuntimeError(_sanitize_java_traceback(response.error))
RuntimeError: java.lang.IllegalStateException: Cannot call getSchema when there is no schema
at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:325)
at org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider$BigQueryFileLoadsSchemaTransform.expand(BigQueryFileLoadsSchemaTransformProvider.java:93)
at org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider$BigQueryFileLoadsSchemaTransform.expand(BigQueryFileLoadsSchemaTransformProvider.java:78)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:559)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
at org.apache.beam.sdk.values.PCollectionRowTuple.apply(PCollectionRowTuple.java:215)
at org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteSchemaTransformProvider$BigQueryWriteSchemaTransform.expand(BigQueryWriteSchemaTransformProvider.java:66)
at org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteSchemaTransformProvider$BigQueryWriteSchemaTransform.expand(BigQueryWriteSchemaTransformProvider.java:55)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:559)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
at org.apache.beam.sdk.values.PCollectionRowTuple.apply(PCollectionRowTuple.java:215)
at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider$ManagedSchemaTransform.expand(ManagedSchemaTransformProvider.java:184)
at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider$ManagedSchemaTransform.expand(ManagedSchemaTransformProvider.java:155)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:559)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:507)
at org.apache.beam.sdk.expansion.service.TransformProvider.apply(TransformProvider.java:121)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:657)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:758)
at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:356)
at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:861)
at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
java.lang.IllegalStateException: Cannot call getSchema when there is no schema
I have no idea why this happens, since the only difference between the two variants is how the Row is constructed. Can anyone help me?
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner