70 changes: 70 additions & 0 deletions README.md
@@ -22,6 +22,76 @@ Getting started guide: https://watermill.io/docs/getting-started/

Issues: https://github.com/ThreeDotsLabs/watermill/issues

## Message consumption models

The subscriber complies with Watermill's `Subscriber` interface:

```go
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
Close() error
}
```

However, it implements several consumption models:
- one in-flight message (default)
- batch consumption
- partition concurrent

### One in-flight message (default)

This is the **default message consumption model**. In this model, when the subscription channel returns a message, it will not return another one until that message is ACKed or NACKed.

When a message is ACKed, the next one (if any) is pushed to the channel, and the partition offset is updated if there is a consumer group session.

This mode has the advantage of being simple while naturally preserving ordering.
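
For illustration, consuming under this model could look like the following sketch. It is not code from this repository: `consumeOneInFlight` is a made-up helper, and the subscriber is only assumed to be a configured watermill-kafka subscriber used through Watermill's `message.Subscriber` interface.

```go
package example

import (
	"context"
	"log"

	"github.com/ThreeDotsLabs/watermill/message"
)

// consumeOneInFlight drains a subscription where at most one message is in flight:
// the subscriber will not deliver the next message until Ack or Nack is called.
func consumeOneInFlight(ctx context.Context, subscriber message.Subscriber, topic string) error {
	messages, err := subscriber.Subscribe(ctx, topic)
	if err != nil {
		return err
	}

	for msg := range messages {
		log.Printf("received %s: %s", msg.UUID, string(msg.Payload))

		// Ack unblocks the next message and, when a consumer group session is used,
		// lets the subscriber mark the partition offset.
		msg.Ack()
	}

	return nil
}
```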

### Batch consumption

While the default model is simple to understand and safe, a greater degree of parallelism is sometimes required. For example:
- if you use a partitioner based on the message key, you can rely on a partial order: messages are not ordered globally, but they are ordered within each partition. In that case you can process several messages at once, and the default model falls short
- ordering simply does not matter for your use case

In this model, the consumer can configure a `maxBatchSize` and `maxWaitTime`. The subscriber waits until `maxBatchSize` messages are ready or `maxWaitTime` has elapsed.

It then pushes those messages to the subscription channel. That means a consumer can receive multiple messages without having to ACK / NACK the previously received ones.

This model handles ACKs and NACKs by resetting the offset of each (topic, partition) tuple to the last message ACKed before a NACK for that (topic, partition) arrived.

Some examples:
- all messages ACKed: the offset of the latest message is marked as done
- first message ACKed and second NACKed: the offset of the first message is marked as done and the second message is resent

To configure it:

```go
kafka.SubscriberConfig{
// ... other settings here
ConsumerModel: kafka.Batch, // assumed name of the batch model constant; Default keeps a single message in flight
BatchConsumerConfig: &kafka.BatchConsumerConfig{
MaxBatchSize: 10,
MaxWaitTime: 100 * time.Millisecond,
},
}

```
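
On the consumer side, this makes it possible to collect several messages before acknowledging any of them. A rough sketch under stated assumptions: `consumeBatched` and `maxBatch` are made-up names, the subscriber is assumed to be configured with the batch model as above, and depending on timing the collected batch may be smaller than the subscriber's batch.

```go
package example

import "github.com/ThreeDotsLabs/watermill/message"

// consumeBatched collects up to maxBatch messages before ACKing any of them,
// which is only possible when the subscriber delivers more than one in-flight message.
func consumeBatched(subscriber message.Subscriber, topic string, maxBatch int) error {
	messages, err := subscriber.Subscribe(nil, topic) // pass a real context in actual code
	if err != nil {
		return err
	}

	for {
		// Block for the first message of the next batch.
		msg, ok := <-messages
		if !ok {
			return nil
		}
		batch := []*message.Message{msg}

		// Collect whatever else is already in flight, without blocking.
	drain:
		for len(batch) < maxBatch {
			select {
			case next, ok := <-messages:
				if !ok {
					break drain
				}
				batch = append(batch, next)
			default:
				break drain
			}
		}

		// Process the whole batch, then ACK. A NACK would make the subscriber reset
		// the offset of that (topic, partition) to the last ACKed message and resend.
		for _, m := range batch {
			m.Ack()
		}
	}
}
```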

### Partition Concurrent

Partition Concurrent works similarly to the default consumption model. The main difference is that it allows up to N in-flight messages, where N is the number of partitions.
This allows higher processing concurrency while still easily preserving per-partition order.

To configure it:

```go
kafka.SubscriberConfig{
// ... other settings here
ConsumerModel: kafka.PartitionConcurrent,
}

```
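
Because at most one message per partition is in flight, a consumer can process deliveries concurrently and still rely on per-partition ordering. A minimal sketch (the helper and the `process` callback are illustrative, not part of the library):

```go
package example

import (
	"context"

	"github.com/ThreeDotsLabs/watermill/message"
)

// consumePartitionConcurrent handles each delivered message in its own goroutine.
// Since the subscriber keeps at most one in-flight message per partition,
// messages from the same partition are still processed in order.
func consumePartitionConcurrent(ctx context.Context, subscriber message.Subscriber, topic string, process func(*message.Message) error) error {
	messages, err := subscriber.Subscribe(ctx, topic)
	if err != nil {
		return err
	}

	for msg := range messages {
		msg := msg
		go func() {
			if err := process(msg); err != nil {
				msg.Nack() // the message is redelivered for this partition
				return
			}
			msg.Ack() // unblocks the next message from the same partition
		}()
	}

	return nil
}
```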

## Contributing

All contributions are very much welcome. If you'd like to help with Watermill development,
7 changes: 4 additions & 3 deletions go.mod
@@ -35,9 +35,10 @@ require (
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
go.opentelemetry.io/otel v1.29.0 // indirect
go.opentelemetry.io/otel/metric v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/net v0.28.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
18 changes: 10 additions & 8 deletions go.sum
@@ -22,8 +22,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -72,19 +72,21 @@ github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/f
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=
go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8=
go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc=
go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8=
go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4=
go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ=
go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=
go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY=
go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE=
go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
192 changes: 192 additions & 0 deletions pkg/kafka/batch_message_handler.go
@@ -0,0 +1,192 @@
package kafka

import (
"context"
"fmt"
"time"

"github.com/IBM/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)

// batchedMessageHandler works by fetching up to maxBatchSize messages from the provided channel,
// waiting at most maxWaitTime.
// It then pushes the collected Kafka messages, in order, to outputChannel.
// Once all of them have been ACKed or NACKed, it marks the highest ACKed offset
// for each involved partition.
type batchedMessageHandler struct {
outputChannel chan<- *message.Message

maxBatchSize int16
maxWaitTime time.Duration

nackResendSleep time.Duration

logger watermill.LoggerAdapter
closing chan struct{}
messageParser messageParser
messages chan *messageHolder
}

func NewBatchedMessageHandler(
outputChannel chan<- *message.Message,
unmarshaler Unmarshaler,
logger watermill.LoggerAdapter,
closing chan struct{},
maxBatchSize int16,
maxWaitTime time.Duration,
nackResendSleep time.Duration,
) MessageHandler {
handler := &batchedMessageHandler{
outputChannel: outputChannel,
maxBatchSize: maxBatchSize,
maxWaitTime: maxWaitTime,
nackResendSleep: nackResendSleep,
closing: closing,
logger: logger,
messageParser: messageParser{
unmarshaler: unmarshaler,
},
messages: make(chan *messageHolder),
}
go handler.startProcessing()
return handler
}

func (h *batchedMessageHandler) startProcessing() {
buffer := make([]*messageHolder, 0, h.maxBatchSize)
mustSleep := h.nackResendSleep != NoSleep
logFields := watermill.LogFields{}
sendDeadline := time.Now().Add(h.maxWaitTime)
timer := time.NewTimer(h.maxWaitTime)
for {
timerExpired := false
select {
case msg, ok := <-h.messages:
if !ok {
h.logger.Debug("Messages channel is closed", logFields)
return
}
buffer = append(buffer, msg)
case <-timer.C:
if len(buffer) > 0 {
h.logger.Trace("Timer expired, sending already fetched messages.", logFields)
}
timerExpired = true
case <-h.closing:
h.logger.Debug("Subscriber is closing, stopping messageHandler", logFields)
return
}
size := len(buffer)
if timerExpired || size == int(h.maxBatchSize) {
if size > 0 {
newBuffer, err := h.processBatch(buffer)
if err != nil {
return
}
if newBuffer == nil {
return
}
buffer = newBuffer
// if there are messages left in the buffer, it means there were NACKs, so we wait before resending
if len(buffer) > 0 && mustSleep {
time.Sleep(h.nackResendSleep)
}
}
sendDeadline = time.Now().Add(h.maxWaitTime)
timerExpired = false
}
timer.Reset(time.Until(sendDeadline))
}
}

func (h *batchedMessageHandler) ProcessMessages(
ctx context.Context,
kafkaMessages <-chan *sarama.ConsumerMessage,
sess sarama.ConsumerGroupSession,
logFields watermill.LogFields,
) error {
for {
select {
case kafkaMsg := <-kafkaMessages:
if kafkaMsg == nil {
h.logger.Debug("kafkaMsg is closed, stopping ProcessMessages", logFields)
return nil
}
msg, err := h.messageParser.prepareAndProcessMessage(ctx, kafkaMsg, h.logger, logFields, sess)
if err != nil {
return err
}
h.messages <- msg
case <-h.closing:
h.logger.Debug("Subscriber is closing, stopping messageHandler", logFields)
return nil
case <-ctx.Done():
h.logger.Debug("Ctx was cancelled, stopping messageHandler", logFields)
return nil
}
}
}

func (h *batchedMessageHandler) processBatch(
buffer []*messageHolder,
) ([]*messageHolder, error) {
waitChannels := make([]<-chan bool, 0, len(buffer))
for _, msgHolder := range buffer {
ctx, cancelCtx := context.WithCancel(msgHolder.message.Context())
msgHolder.message.SetContext(ctx)
select {
case h.outputChannel <- msgHolder.message:
h.logger.Trace("Message sent to consumer", msgHolder.logFields)
waitChannels = append(waitChannels, waitForMessage(ctx, h.logger, msgHolder, cancelCtx))
case <-h.closing:
h.logger.Trace("Closing, message discarded", msgHolder.logFields)
defer cancelCtx()
return nil, nil
case <-ctx.Done():
h.logger.Trace("Closing, ctx cancelled before message was sent to consumer", msgHolder.logFields)
defer cancelCtx()
return nil, nil
}
}

// we wait for all the messages to be ACKed or NACKed
// and we store for each partition the last message that was ACKed so we
// can mark the latest complete offset
lastCommittableMessages := make(map[string]*messageHolder)
nackedPartitions := make(map[string]struct{})
newBuffer := make([]*messageHolder, 0, h.maxBatchSize)
for idx, waitChannel := range waitChannels {
msgHolder := buffer[idx]
h.logger.Trace("Waiting for message to be acked", msgHolder.logFields)
ack, ok := <-waitChannel
h.logger.Info("Received ACK / NACK response or closed", msgHolder.logFields)
// it was aborted
if !ok {
h.logger.Info("Returning as messages were closed", msgHolder.logFields)
return nil, nil
}
topicAndPartition := fmt.Sprintf("%s-%d", msgHolder.kafkaMessage.Topic, msgHolder.kafkaMessage.Partition)
_, partitionNacked := nackedPartitions[topicAndPartition]
if !ack || partitionNacked {
newBuffer = append(newBuffer, msgHolder.Copy())
nackedPartitions[topicAndPartition] = struct{}{}
continue
}
if !partitionNacked && ack {
lastCommittableMessages[topicAndPartition] = msgHolder
}
}

// If a session is provided, we mark the latest committable message for
// each partition as done. This is required, because if we did not mark anything we might re-process
// messages unnecessarily. If we marked the latest in the bulk, we could lose NACKed messages.
for _, lastCommittable := range lastCommittableMessages {
if lastCommittable.sess != nil {
h.logger.Trace("Marking offset as complete for", lastCommittable.logFields)
lastCommittable.sess.MarkMessage(lastCommittable.kafkaMessage, "")
}
}

return newBuffer, nil
}
73 changes: 73 additions & 0 deletions pkg/kafka/batch_message_handler_bench_test.go
@@ -0,0 +1,73 @@
package kafka

import (
"context"
"testing"
"time"

"github.com/IBM/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func BenchmarkBatchMessageHandler(b *testing.B) {
b.Run("consumer group session is provided", func(b *testing.B) {
benchmarkBatchMessageHandler(b, true)
})

b.Run("no consumer group session", func(b *testing.B) {
benchmarkBatchMessageHandler(b, false)
})
}

func benchmarkBatchMessageHandler(b *testing.B, hasConsumerGroup bool) {
testConfig := testConfig{
batchWait: 100 * time.Millisecond,
maxBatchSize: 10,
hasConsumerGroup: hasConsumerGroup,
hasCountingConsumerGroup: false,
}

testBenchmark(b, testConfig, testBatchConsumption)
}

func testBenchmark(
b *testing.B,
testConfig testConfig,
buildHandler func(testConfig) (chan<- struct{}, chan *message.Message, MessageHandler),
) {
messagesInTest := 10_000
sess, _ := consumerGroupSession(testConfig)
kafkaMessages := make(chan *sarama.ConsumerMessage, messagesInTest)
defer close(kafkaMessages)
messagesToSend := make([]*sarama.ConsumerMessage, 0, messagesInTest)
for i := 0; i < messagesInTest; i++ {
msg := generateMessage("topic1", i%5, i/5)
messagesToSend = append(messagesToSend, msg)
kafkaMessages <- msg
}
closing, outputChannel, handler := buildHandler(
testConfig,
)
defer close(closing)
defer close(outputChannel)
go func() {
err := handler.ProcessMessages(context.Background(), kafkaMessages, sess, watermill.LogFields{})
assert.NoError(b, err)
}()
receivedMessages := make([]*message.Message, 0, messagesInTest)
for {
select {

Check failure on line 62 in pkg/kafka/batch_message_handler_bench_test.go (GitHub Actions / ci / lint): S1000: should use a simple channel send/receive instead of `select` with a single case (gosimple)
case msg, ok := <-outputChannel:
require.True(b, ok, "channel closed earlier than expected")
receivedMessages = append(receivedMessages, msg)
msg.Ack()
}
if len(receivedMessages) == messagesInTest {
break
}
}
testSameMessagesAndLocalOrder(b, receivedMessages, messagesToSend)
}