Replies: 1 comment 2 replies
-
What I like about the stream transformer API is that it's very flexible, you can essentially implement any "man-in-the-middle" transformation on the connection to the postgres server.
But on that stream, there is no way to "intercept" messages (so that they're not seen by a
If you only need to react to incoming messages from the server by either forwarding them to the client or by responding directly, how about this: typedef InterceptedMessage = ({bool forward, BaseMessage? response});
typedef ChannelInterceptor = InterceptedMessage Function(
BaseMessage fromServer);
extension on StreamChannel<BaseMessage> {
StreamChannel<BaseMessage> intercept(ChannelInterceptor interceptor) {
return changeStream((oldStream) async* {
await for (final originalMessage in oldStream) {
final intercepted = interceptor(originalMessage);
if (intercepted.response case final BaseMessage response) {
// Respond directly to server
sink.add(response);
}
if (intercepted.forward) {
yield originalMessage;
}
}
});
}
} Since the sink isn't changed, it doesn't need to be cached either. More complex transformations could likely be implemented as well by making the sink a final field that is set to |
Beta Was this translation helpful? Give feedback.
-
This post is intended to discuss the transformer/ API (i.e.
PgSessionSettings.transformer
). The goal is to share how the API is being used as well as to get feedback if there are better approaches.Some Context:
In Streaming Replication mode, once a connection is successfully established with the server, the server will start streaming data to the client. While the majority of the messages would be just data (i.e. changes) that don't require a response, other messages require a response from the client (i.e.
PrimaryKeepAliveMessage
).Example Usage:
While updating the postgresql-dart-replication-example to support version 3, this is how I ended up utilizing the transformer API to send and receive server messages:
Discussion:
While listening to server messages is an appropriate use of the API, caching the sink feels a hacky solution since this does not seem how the API is supposed to be used (i.e. as messages interceptor).
For comparison, in V2,
PostgreSQLConnection.addMessage
was an available straightforward method to send messages to the server. Similarly,PostgreSQLConnection.messages
was the way to listen to server messages.While I don't mind using the approach above, caching the sink just didn't feel right. Maybe I am just overthinking. Anyway, I thought it's better to bring this up now than later.
Cheers
cc @simolus3 @isoos
Beta Was this translation helpful? Give feedback.
All reactions