Properly Handle the Case when SSE Data is Split into Multiple Chunks #1403
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Context
I was trying to use Upstash SDK with Redis PubSub commands on Cloudflare Workers and was not able to receive the messages. After digging into the Upstash SDK source code and network packets I was able to find that the worker runtime was splitting the SSE message into multiple chunks in the middle of the line (which should be permitted), but the Upstash SDK was not properly handle this situation, causing the message to be silently dropped.
Problem
The streaming parser in the Upstash SDK processed each network chunk independently, splitting on newlines without buffering partial lines. This was incorrect because the stream reader is not guaranteed to emit chunks aligned with newlines (as in the Cloudflare case above). Chunks can be generated at arbitrary byte boundaries, including mid-line. When a message arrived split across multiple chunks, the parser attempted to process incomplete data, resulting in JSON parse errors and lost messages.
Example scenario:
A subscription receives the following message split across two chunks:
Chunk 1:
data: {"id":"msg1","coChunk 2:
ntent":"hello"}\nPrevious behavior:
{"id":"msg1","co→ incomplete message, dropped.ntent":"hello"}→ incomplete message, droppedFix
Implemented line buffering to accumulate partial data between chunks:
buffervariable to store incomplete lines{ stream: true }for proper streaming text decoding (see TextDecoder.decode() documentation)\nto extract complete linesThe
stream: trueoption indicates that additional data will follow in subsequent calls todecode(), which is essential for correctly handling multi-byte characters that may be split across chunk boundaries.