Skip to content

Commit 2a8da1d

Browse files
committed
actor: introduce generic Mailbox interface with iter.Seq support
This commit introduces a new Mailbox interface that abstracts the message queue implementation for actors. Previously, actors used a direct channel for their mailbox, which limited flexibility and made it difficult to implement alternative mailbox strategies. The new Mailbox interface provides methods for sending, receiving, and draining messages, with full context support for cancellation. The Receive method leverages Go 1.23's iter.Seq pattern, providing a clean iterator-based API that allows natural for-range loops over messages. The ChannelMailbox implementation maintains the existing channel-based behavior while conforming to the new interface. It stores the actor's context internally, ensuring both caller and actor contexts are properly respected during send and receive operations. This simplifies context handling compared to complex context merging approaches. This abstraction enables future implementations such as priority mailboxes, persistent mailboxes, or bounded mailboxes with overflow strategies, without requiring changes to the actor implementation.
1 parent bca5ca0 commit 2a8da1d

File tree

1 file changed

+176
-0
lines changed

1 file changed

+176
-0
lines changed

actor/mailbox.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package actor
2+
3+
import (
4+
"context"
5+
"iter"
6+
"sync"
7+
"sync/atomic"
8+
)
9+
10+
// Mailbox represents the message queue for an actor. It provides methods for
11+
// sending messages and receiving them via an iterator pattern.
12+
type Mailbox[M Message, R any] interface {
13+
// Send attempts to send an envelope to the mailbox with context-based
14+
// cancellation. Returns true if sent successfully, false if the
15+
// context was cancelled or the mailbox is closed.
16+
Send(ctx context.Context, env envelope[M, R]) bool
17+
18+
// TrySend attempts to send without blocking. Returns true if the
19+
// envelope was sent, false if the mailbox is full or closed.
20+
TrySend(env envelope[M, R]) bool
21+
22+
// Receive returns an iterator for consuming messages from the mailbox.
23+
// The iterator will yield messages until the mailbox is closed or the
24+
// context is cancelled.
25+
Receive(ctx context.Context) iter.Seq[envelope[M, R]]
26+
27+
// Close closes the mailbox, preventing new messages from being sent.
28+
// Any remaining messages can still be consumed via Receive.
29+
Close()
30+
31+
// IsClosed returns true if the mailbox has been closed.
32+
IsClosed() bool
33+
34+
// Drain returns an iterator that yields all remaining messages in the
35+
// mailbox after it has been closed. This is useful for cleanup.
36+
Drain() iter.Seq[envelope[M, R]]
37+
}
38+
39+
// ChannelMailbox is a channel-based implementation of the Mailbox interface.
40+
type ChannelMailbox[M Message, R any] struct {
41+
ch chan envelope[M, R]
42+
closed atomic.Bool
43+
44+
// mu protects Send/TrySend operations to prevent send-on-closed-channel
45+
// panics. Close() acquires write lock, Send/TrySend acquire read lock.
46+
mu sync.RWMutex
47+
48+
// closeOnce ensures Close() executes exactly once.
49+
closeOnce sync.Once
50+
51+
// actorCtx is the actor's context for lifecycle management.
52+
actorCtx context.Context
53+
}
54+
55+
// NewChannelMailbox creates a new channel-based mailbox with the specified
56+
// buffer capacity and actor context.
57+
func NewChannelMailbox[M Message, R any](actorCtx context.Context,
58+
capacity int) *ChannelMailbox[M, R] {
59+
60+
if capacity <= 0 {
61+
capacity = 1
62+
}
63+
return &ChannelMailbox[M, R]{
64+
ch: make(chan envelope[M, R], capacity),
65+
actorCtx: actorCtx,
66+
}
67+
}
68+
69+
// Send implements Mailbox.Send with context-aware blocking send.
70+
func (m *ChannelMailbox[M, R]) Send(ctx context.Context,
71+
env envelope[M, R]) bool {
72+
73+
m.mu.RLock()
74+
defer m.mu.RUnlock()
75+
76+
if m.IsClosed() {
77+
return false
78+
}
79+
80+
select {
81+
case m.ch <- env:
82+
return true
83+
case <-ctx.Done():
84+
return false
85+
case <-m.actorCtx.Done():
86+
// Actor is shutting down.
87+
return false
88+
}
89+
}
90+
91+
// TrySend implements Mailbox.TrySend with non-blocking send.
92+
func (m *ChannelMailbox[M, R]) TrySend(env envelope[M, R]) bool {
93+
m.mu.RLock()
94+
defer m.mu.RUnlock()
95+
96+
if m.IsClosed() {
97+
return false
98+
}
99+
100+
select {
101+
case m.ch <- env:
102+
return true
103+
default:
104+
return false
105+
}
106+
}
107+
108+
// Receive implements Mailbox.Receive using iter.Seq pattern.
109+
func (m *ChannelMailbox[M, R]) Receive(
110+
ctx context.Context) iter.Seq[envelope[M, R]] {
111+
return func(yield func(envelope[M, R]) bool) {
112+
for {
113+
select {
114+
case env, ok := <-m.ch:
115+
if !ok {
116+
return
117+
}
118+
119+
if !yield(env) {
120+
return
121+
}
122+
123+
case <-ctx.Done():
124+
return
125+
126+
case <-m.actorCtx.Done():
127+
return
128+
}
129+
}
130+
}
131+
}
132+
133+
// Close implements Mailbox.Close.
134+
func (m *ChannelMailbox[M, R]) Close() {
135+
m.closeOnce.Do(func() {
136+
m.mu.Lock()
137+
defer m.mu.Unlock()
138+
139+
m.closed.Store(true)
140+
141+
close(m.ch)
142+
})
143+
}
144+
145+
// IsClosed implements Mailbox.IsClosed.
146+
func (m *ChannelMailbox[M, R]) IsClosed() bool {
147+
return m.closed.Load()
148+
}
149+
150+
// Drain implements Mailbox.Drain for cleanup after close.
151+
func (m *ChannelMailbox[M, R]) Drain() iter.Seq[envelope[M, R]] {
152+
return func(yield func(envelope[M, R]) bool) {
153+
// Only drain if closed.
154+
if !m.IsClosed() {
155+
return
156+
}
157+
158+
// Drain all remaining messages from the channel.
159+
for {
160+
select {
161+
case env, ok := <-m.ch:
162+
// Channel closed, nothing left to drain.
163+
if !ok {
164+
return
165+
}
166+
167+
if !yield(env) {
168+
return
169+
}
170+
default:
171+
// Channel empty, done draining.
172+
return
173+
}
174+
}
175+
}
176+
}

0 commit comments

Comments
 (0)