28 changes: 25 additions & 3 deletions v2/client/client.go
@@ -101,6 +101,7 @@ type ceClient struct {
	eventDefaulterFns  []EventDefaulter
	pollGoroutines     int
	blockingCallback   bool
	parallelGoroutines int
	ackMalformedEvent  bool
}

@@ -238,6 +239,10 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
	wg.Add(1)
	go func() {
		defer wg.Done()
		var parallel chan struct{}
		if c.parallelGoroutines > 0 {
			parallel = make(chan struct{}, c.parallelGoroutines)
		}
		for {
			var msg binding.Message
			var respFn protocol.ResponseFn
@@ -265,16 +270,33 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
				}
			}

			// If the blockingCallback option is set, wait for the callback to
			// finish before receiving the next message.
			if c.blockingCallback {
				callback()
				continue
			}

			// If the parallelGoroutines option is set, bound the number of
			// concurrent callback goroutines with a semaphore channel.
			if parallel != nil {
Contributor:

Am I reading this right... we don't support blocking any more?

Contributor Author:

I changed the value of WithParallelGoroutines to 1, which achieves the same effect. Moreover, it is more versatile.

Contributor:

It may not matter, but you're blocking at a different place now... if I'm reading this right.
Before it was: read -> block while processing
Now it's: read -> func(process)() -> read -> wait for a free spot to open in the channel
Right? This means that we're getting the next event BEFORE we're ready, where before we could only get one AFTER we were ready. Not sure it matters, but something to think about.

Also, we need test cases.

Contributor Author:

> Before it was: read -> block while processing
> Now it's: read -> func(process)() -> read -> wait for a free spot to open in the channel

You're right, indeed. Given this situation, should we keep the original approach or treat this as a breaking change? I'd like to confirm our final approach so I can make adjustments accordingly.

PS: I'll add the unit tests later.

Contributor:

I think it depends on whether calling:

  • c.responder.Respond(ctx)
  • c.receiver.Receive(ctx)

before we're ready is risky or not. For example, if we get the next "msg" and then pause, and this entire for-loop stops, and another process tries to continue with the same responder/receiver, that one "msg" will be lost.

Abstractly it sounds like a bug. But in practice I don't know if the idea of continuing to get messages outside of this instance of the for-loop makes any sense. E.g. if we're pulling messages from a queue during the "receiver.Receive()" call, then I think this situation might occur.
Contributor Author:

We can reason from the logic of WithPollGoroutines: when multiple asynchronous fetches occur simultaneously, the scenario you just described is essentially already happening.

Can I understand it this way: event messages, when distributed across different processes (goroutines or otherwise), inherently risk such situations? At the very least, the message currently being read appears "lost" or "non-existent" to the other processes.

Therefore, I believe this scenario might not be a cause for concern.

Of course, there is one extreme case where the user specifically wants to read messages "sequentially", meaning no other process (goroutine or otherwise) may handle the "next" message.

However, I think that situation is similar to using blockingCallback, where the user makes such a choice intentionally. In that case, there is no scenario where another process misses a message while it waits for the channel to become available.

Contributor:

A couple of things come to mind:

1 - I make a distinction between "lost a message because the processor of that message crashed" and "lost a message because it was pulled from a queue and never even tried to be processed". The former is kind of expected. The latter is a bug IMO. While the latter isn't 100% "purposely dropped it"... the logic comes close, since we don't take into account the situation where this entire Go process dies while that message is "on hold". The queue it's being pulled from should be where it's "on hold".

2 - If we're pulling messages from a queue, then the next message could be processed before this "on hold" one if there's another processor pulling from that queue. Now, I'm not suggesting that it's our job to synchronize things in such a scenario; however, I think someone would find it really odd that this "on hold" message might be delayed significantly before it even starts to be processed while the next message is processed immediately.

3 - "this situation is similar to using blockingCallback"... exactly. We don't support "blocking" any more. At least not with the same "when are things pulled off of the original queue" semantics.

I'm curious to know @embano1's thoughts, but I'm leaning towards an "only pull the next message when we're ready" model; otherwise it kind of feels like a potential bug, or we're introducing another queue into the flow... granted, a queue of size one, but still a queue, with zero persistence/resiliency to it.

Contributor Author:

I think you're right. We need to keep event handling predictable for end users; hiding certain event-related operations might make users feel things are beyond their control.

Perhaps we could slightly adjust the channel's position; that might preserve the original semantics. Of course, this is just my initial thought. I'll look into it more carefully when I have time to confirm.

				wg.Add(1)
				parallel <- struct{}{}
				go func() {
					defer func() {
						<-parallel
						wg.Done()
					}()
					callback()
				}()
				continue
			}

			// Otherwise, invoke the callback in a new goroutine without blocking.
			wg.Add(1)
			go func() {
				defer wg.Done()
				callback()
			}()
		}
	}()
}
18 changes: 18 additions & 0 deletions v2/client/options.go
@@ -7,6 +7,7 @@ package client

import (
	"context"
	"errors"
	"fmt"

	"github.com/cloudevents/sdk-go/v2/binding"
@@ -127,6 +128,23 @@ func WithBlockingCallback() Option {
	}
}

// WithParallelGoroutines causes the callback passed to StartReceiver to be
// executed asynchronously by a bounded number of goroutines.
// WithBlockingCallback takes precedence over this configuration.
func WithParallelGoroutines(num int) Option {
	return func(i interface{}) error {
		if num <= 0 {
			return errors.New("number of parallel goroutines must be greater than 0")
		}

		if c, ok := i.(*ceClient); ok {
			c.parallelGoroutines = num
		}

		return nil
	}
}

// WithAckMalformedevents causes malformed events received within StartReceiver to be acknowledged
// rather than being permanently not-acknowledged. This can be useful when a protocol does not
// provide a responder implementation and would otherwise cause the receiver to be partially or