Skip to content

Conversation

@coolwednesday
Copy link
Member

@coolwednesday coolwednesday commented Dec 1, 2025

Redis PubSub: Streams Support and Default Mode Change

Description:

  • Closes Redis Pub Sub Support #2347
  • This PR adds comprehensive support for Redis Streams in GoFr's PubSub implementation and changes the default mode from pubsub to streams. The implementation follows the same patterns as other pubsub providers (NATS, EventHub, Kafka) and includes full support for consumer groups, acknowledgments, and message persistence.

Testing Redis PubSub

Prerequisites

Start Redis using Docker:

# Basic Redis instance
docker run -d \
  --name redis \
  -p 6379:6379 \
  redis:7-alpine

# Redis with password authentication
docker run -d \
  --name redis \
  -p 6379:6379 \
  redis:7-alpine redis-server --requirepass mypassword

Testing Streams Mode (Persistence) - Default

Streams mode provides persistent messaging with consumer groups and acknowledgments. Messages are stored in Redis and can be processed even after app restarts.

Configuration

Create configs/.env:

PUBSUB_BACKEND=REDIS
REDIS_HOST=localhost
REDIS_PORT=6379
# Streams mode is default, but explicitly set for clarity
REDIS_PUBSUB_MODE=streams
REDIS_STREAMS_CONSUMER_GROUP=my-group
REDIS_STREAMS_CONSUMER_NAME=my-consumer
REDIS_STREAMS_BLOCK_TIMEOUT=5s
REDIS_STREAMS_MAXLEN=1000

Test Persistence

  1. Start Publisher (using gofr/examples/using-publisher):

    cd gofr/examples/using-publisher
    go run main.go
  2. Publish messages:

    # Publish an order
    curl -X POST http://localhost:8000/publish-order \
      -H "Content-Type: application/json" \
      -d '{"orderId": "order-123", "status": "pending"}'
    
    # Publish a product
    curl -X POST http://localhost:8000/publish-product \
      -H "Content-Type: application/json" \
      -d '{"productId": "prod-456", "price": "99.99"}'
  3. Start Subscriber (using gofr/examples/using-subscriber):

    cd gofr/examples/using-subscriber
    go run main.go
  4. Verify Persistence:

    • Stop the subscriber
    • Publish more messages
    • Restart the subscriber
    • Expected: Subscriber processes all messages, including those published while it was down
    • Messages are persisted in Redis Streams and can be replayed

Testing PubSub Mode (Fire-and-Forget)

PubSub mode provides non-persistent messaging. Messages are delivered only to active subscribers and are lost if no subscriber is listening.

Configuration

Create configs/.env:

PUBSUB_BACKEND=REDIS
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PUBSUB_MODE=pubsub

Test Fire-and-Forget Behavior

  1. Start Subscriber first:

    cd gofr/examples/using-subscriber
    go run main.go
  2. Publish messages (subscriber is listening):

    curl -X POST http://localhost:8000/publish-order \
      -H "Content-Type: application/json" \
      -d '{"orderId": "order-123", "status": "pending"}'
    • Expected: Subscriber receives and processes the message immediately
  3. Stop the subscriber and publish more messages:

    # Stop subscriber (Ctrl+C)
    # Then publish
    curl -X POST http://localhost:8000/publish-order \
      -H "Content-Type: application/json" \
      -d '{"orderId": "order-456", "status": "processing"}'
  4. Restart subscriber:

    go run main.go
    • Expected: Subscriber does NOT receive messages published while it was down
    • Messages are lost (fire-and-forget behavior)

Features

✅ Streams Mode (Default)

  • Persistence: Messages are stored in Redis Streams
  • Consumer Groups: Multiple consumers can process messages
  • Acknowledgments: Messages are acknowledged after processing
  • At-least-once delivery: Messages are guaranteed to be delivered
  • Replay capability: Messages can be replayed from any point

✅ PubSub Mode

  • Fire-and-forget: Messages delivered only to active subscribers
  • No persistence: Messages are not stored
  • At-most-once delivery: Messages may be lost if no subscriber is active
  • Lightweight: Lower overhead than streams

Checklist:

  • I have formatted my code using goimport and golangci-lint.
  • All new code is covered by unit tests.
  • This PR does not decrease the overall code coverage.
  • I have reviewed the code comments and documentation for clarity.

Copy link

@manavkush manavkush left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes seem good 👍 . Didn't test yet though.
@coolwednesday the AI suggestions for the logging can be added for redacting sensitive information. Can check this in other critical flows as well.

@coolwednesday
Copy link
Member Author

Will reopen the PR after fixing the review comments, linters, tests and proper testing.

@coolwednesday coolwednesday reopened this Dec 11, 2025
Copy link
Member

@Umang01-hash Umang01-hash left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Can we rename the file metrics_interface.go inside redis directory to mock_metrics.go.
  • We have a file config_test.go but in respect to that config.go file doesn;t exist. That violates Go naming conventions and makes code harder to discover.
  • The file pubsub_query_test.go contains only 2 tests for the Query() method, but pubsub_test.go already has query tests (testChannelQuery() and testStreamQuery() in getQueryTestCases()). We can delete pubsub_query_test.go and merge the 2 tests into pubsub_test.go.

"gofr.dev/pkg/gofr/datasource/pubsub/kafka"
"gofr.dev/pkg/gofr/datasource/pubsub/mqtt"
"gofr.dev/pkg/gofr/datasource/redis"
gofrRedis "gofr.dev/pkg/gofr/datasource/redis"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason for adding this alias??

// Use the embedded PubSub from the Redis client
if c.Redis != nil {
// Type assert to access PubSub field
if redisClient, ok := c.Redis.(*gofrRedis.Redis); ok && redisClient != nil && redisClient.PubSub != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not using type assertion for initilalizing any other PubSub client then why do it for redis as it violates the existing architectural pattern. Maybe we can create separate redis.NewPubSub() constructor that returns pubsub.Client directly.

}
}

func TestGetRedisConfig_PubSubStreams_InvalidValues(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A common thing that all test methods have is they need a mockLogger and mockConfig, so can we add a testGetRedisConfig(t, configMap) helper function that can take the map and directly return us *Config from getRedisConfig

)

const (
modePubSub = "pubsub"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where this constant is being used??

}

func (m *streamMessage) Commit() {
err := m.client.XAck(context.Background(), m.stream, m.group, m.id).Err()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using context.Background() here could cause the Commit operation to hang indefinitely if Redis is slow or unresponsive, consider adding a timeout context to prevent goroutine hangs.


// PubSub constants.
defaultRetryTimeout = 10 * time.Second
messageBufferSize = 100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message buffer size of 100 is too small for production workloads and will lead to frequent message drops under moderate load. Consider a typical scenario:

  • Message rate: 500 msg/sec per topic
  • Consumer processing time: 50ms per message
  • Buffer fills in: 0.2 seconds


hostname, _ := os.Hostname()

return fmt.Sprintf("consumer-%s-%d", hostname, time.Now().UnixNano())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumer name generation using time.Now().UnixNano() can produce duplicate values when called in rapid succession (within the same nanosecond), leading to consumer ID collisions in Redis Streams consumer groups.

}

if ps.parent.logger != nil {
traceID := span.SpanContext().TraceID().String()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

traceID is calculated but never used (it's passed to logPubSub() via span, not directly).

}

addr := fmt.Sprintf("%s:%d", ps.parent.config.HostName, ps.parent.config.Port)
res.Details["addr"] = sanitizeRedisAddr(addr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sanitizeRedisAddr() function is designed to remove credentials from Redis connection URLs (e.g., redis://user:pass@host:port), but here we're passing a simple host:port string that never contains credentials.

require.NoError(t, err)

// Miniredis compatibility check
if len(res) == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not have if else conditions in test? Please remvoe it and separate it to two test methods if requried.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Redis Pub Sub Support

4 participants