[Bug]: WriteToPubSub Sink breaks in batch mode #35990

@kaanonuk

What happened?

I am not able to publish data when Pub/Sub is used as a sink in a batch pipeline.

# Publish to Pub/Sub with rate limiting
_ = messages | "PublishToPubSub" >> WriteToPubSub(
    topic=known_args.pubsub_topic, with_attributes=True
)

import json
from typing import Any

from apache_beam.io.gcp.pubsub import PubsubMessage


def format_pubsub_message(record: dict[str, Any]) -> PubsubMessage:
    """Format a parquet record as a Pub/Sub message."""
    # Strip stray whitespace from the URL before serializing.
    message = {"url": record["url"].strip(), "source": "parquet_ingestion"}
    data = json.dumps(message).encode("utf-8")
    attributes = {"source": "parquet_ingestion"}

    return PubsubMessage(data=data, attributes=attributes)
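For anyone trying to reproduce this, the payload built by format_pubsub_message can be checked without Beam installed. The sketch below mirrors the function body using only the standard library. The name build_payload and the sample record are hypothetical and used only for illustration; they are not part of the original pipeline.

```python
import json
from typing import Any


def build_payload(record: dict[str, Any]) -> tuple[bytes, dict[str, str]]:
    """Mirror format_pubsub_message, returning (data, attributes) instead of a PubsubMessage."""
    message = {"url": record["url"].strip(), "source": "parquet_ingestion"}
    data = json.dumps(message).encode("utf-8")
    attributes = {"source": "parquet_ingestion"}
    return data, attributes


# Hypothetical sample record with surrounding whitespace in the URL.
data, attributes = build_payload({"url": "  https://example.com/page \n"})
print(data)
print(attributes)
```

The data field is UTF-8 encoded JSON bytes and the attributes are plain strings, which is the shape WriteToPubSub expects when with_attributes=True is set.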

Setting standard_options.streaming = True was required to make this work. That is suboptimal because, from a customer's perspective, whether a pipeline is streaming is determined by its source (bounded or unbounded), not by its sink.
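As a possible variant of the workaround, and assuming the pipeline options are built from command-line arguments, the same setting can be supplied through Beam's --streaming flag instead of assigning the attribute in code. The helper below is a minimal sketch; ensure_streaming_flag and the sample argument list are hypothetical names, not part of Beam or of the original pipeline.

```python
def ensure_streaming_flag(pipeline_args: list[str]) -> list[str]:
    """Return a copy of pipeline_args that contains --streaming exactly once."""
    if "--streaming" in pipeline_args:
        return list(pipeline_args)
    return [*pipeline_args, "--streaming"]


# Hypothetical arguments, for illustration only.
args = ensure_streaming_flag(["--runner=DataflowRunner"])
print(args)

# With Beam installed, the arguments would then be passed on unchanged:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(args)
```

This only relocates the workaround; it does not address the underlying problem that a bounded source with a Pub/Sub sink fails in batch mode.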

Besides the fix, can we update the documentation (https://cloud.google.com/dataflow/docs/guides/write-to-pubsub) to state this requirement explicitly, and add a more descriptive error message to help with debugging?

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
