Skip to content

Conversation

chris576
Copy link

@chris576 chris576 commented Sep 16, 2025

Motivation / Background

We use Watermill with Redis to enable communication between our NestJS backend and the system drivers running on an embedded Raspberry Pi–based system.

We chose the PUBLISH/SUBSCRIBE mechanism instead of XADD with Streams for two main reasons:

  1. It is simpler to implement on the backend side.
  2. Messages are not buffered or persisted, which helps keep memory usage low.

The second point is crucial in our setup: Redis receives on average one message per second. Persisting messages would quickly increase memory usage to unsustainable levels and could overwhelm the system.


Details

To make the implementation more explicit and easier to understand, I introduced two separate packages:

  • redisstream
  • redispubsub

Both packages implement the same file structure, structs, methods, and tests.

The redispubsub package provides a Subscriber and Publisher based directly on Redis Subscribe and Publish. Since Redis Pub/Sub does not persist messages, we need to ensure that the subscriber is actively listening to the channel before returning. Otherwise, messages could be lost.

The key logic for this is in subscriber.go (around line 121):

ready := make(chan struct{})
// Ensure the goroutine is started before returning
go func() {
    defer close(consumeClosed)

    s.logger.Debug("Start listening to subscription channel.", logFields)
    pubsub := s.rc.Subscribe(ctx, channel)
    ch := pubsub.Channel()
    s.logger.Debug("Subscribed to Redis channel", logFields.Add(watermill.LogFields{"channel": channel}))
    close(ready)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant