|
| 1 | +package actor |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "errors" |
| 6 | + "sync/atomic" |
| 7 | + |
| 8 | + "github.com/lightningnetwork/lnd/fn/v2" |
| 9 | +) |
| 10 | + |
| 11 | +// ErrNoActorsAvailable is returned when a router cannot find any actors |
| 12 | +// registered for its service key to forward a message to. |
| 13 | +var ErrNoActorsAvailable = errors.New("no actors available for service key") |
| 14 | + |
| 15 | +// RoutingStrategy defines the interface for selecting an actor from a list of |
| 16 | +// available actors. |
| 17 | +// The M (Message) and R (Response) type parameters ensure that the strategy |
| 18 | +// is compatible with the types of actors it will be selecting. |
| 19 | +type RoutingStrategy[M Message, R any] interface { |
| 20 | + // Select chooses an ActorRef from the provided slice. It returns the |
| 21 | + // selected actor or an error if no actor can be selected (e.g., if the |
| 22 | + // list is empty or another strategy-specific issue occurs). |
| 23 | + Select(refs []ActorRef[M, R]) (ActorRef[M, R], error) |
| 24 | +} |
| 25 | + |
| 26 | +// RoundRobinStrategy implements a round-robin selection strategy. It is generic |
| 27 | +// over M and R to match the RoutingStrategy interface, though its logic doesn't |
| 28 | +// depend on these types directly for the selection mechanism itself. |
| 29 | +type RoundRobinStrategy[M Message, R any] struct { |
| 30 | + // index is used to pick the next actor in a round-robin fashion. It |
| 31 | + // must be accessed atomically to ensure thread-safety if multiple |
| 32 | + // goroutines use the same strategy instance (which they will via the |
| 33 | + // router). |
| 34 | + index uint64 |
| 35 | +} |
| 36 | + |
| 37 | +// NewRoundRobinStrategy creates a new RoundRobinStrategy, initialized for |
| 38 | +// round-robin selection. |
| 39 | +func NewRoundRobinStrategy[M Message, R any]() *RoundRobinStrategy[M, R] { |
| 40 | + return &RoundRobinStrategy[M, R]{} |
| 41 | +} |
| 42 | + |
| 43 | +// Select picks an actor from the list using a round-robin algorithm. |
| 44 | +func (s *RoundRobinStrategy[M, R]) Select(refs []ActorRef[M, R]) (ActorRef[M, R], error) { |
| 45 | + if len(refs) == 0 { |
| 46 | + return nil, ErrNoActorsAvailable |
| 47 | + } |
| 48 | + |
| 49 | + // Atomically increment and get the current index for selection. |
| 50 | + // We subtract 1 because AddUint64 returns the new value (which is |
| 51 | + // 1-based for the first call after initialization to 0), and slice |
| 52 | + // indexing is 0-based. |
| 53 | + idx := atomic.AddUint64(&s.index, 1) - 1 |
| 54 | + selectedRef := refs[idx%uint64(len(refs))] |
| 55 | + |
| 56 | + return selectedRef, nil |
| 57 | +} |
| 58 | + |
| 59 | +// Router is a message-dispatching component that fronts multiple actors |
| 60 | +// registered under a specific ServiceKey. It uses a RoutingStrategy to |
| 61 | +// distribute messages to one of the available actors. It is generic over M |
| 62 | +// (Message type) and R (Response type) to match the actors it routes to. |
| 63 | +type Router[M Message, R any] struct { |
| 64 | + receptionist *Receptionist |
| 65 | + serviceKey ServiceKey[M, R] |
| 66 | + strategy RoutingStrategy[M, R] |
| 67 | + dlo ActorRef[Message, any] // Dead Letter Office reference. |
| 68 | +} |
| 69 | + |
| 70 | +// NewRouter creates a new Router for a given service key and strategy. The |
| 71 | +// receptionist is used to discover actors registered with the service key. |
| 72 | +// The router itself is not an actor but a message dispatcher that behaves like |
| 73 | +// an ActorRef from the sender's perspective. |
| 74 | +func NewRouter[M Message, R any](receptionist *Receptionist, |
| 75 | + key ServiceKey[M, R], strategy RoutingStrategy[M, R], |
| 76 | + dlo ActorRef[Message, any]) *Router[M, R] { |
| 77 | + |
| 78 | + return &Router[M, R]{ |
| 79 | + receptionist: receptionist, |
| 80 | + serviceKey: key, |
| 81 | + strategy: strategy, |
| 82 | + dlo: dlo, |
| 83 | + } |
| 84 | +} |
| 85 | + |
| 86 | +// getActor dynamically finds available actors for the service key and selects |
| 87 | +// one using the configured strategy. This method is called internally by Tell |
| 88 | +// and Ask on each invocation to ensure up-to-date actor discovery. |
| 89 | +func (r *Router[M, R]) getActor() (ActorRef[M, R], error) { |
| 90 | + // Discover available actors from the receptionist. |
| 91 | + availableActors := FindInReceptionist(r.receptionist, r.serviceKey) |
| 92 | + if len(availableActors) == 0 { |
| 93 | + return nil, ErrNoActorsAvailable |
| 94 | + } |
| 95 | + |
| 96 | + // Select one actor using the strategy. |
| 97 | + return r.strategy.Select(availableActors) |
| 98 | +} |
| 99 | + |
| 100 | +// Tell sends a message to one of the actors managed by the router, selected by |
| 101 | +// the routing strategy. If no actors are available or the send context is |
| 102 | +// cancelled before the message can be enqueued in the target actor's mailbox, |
| 103 | +// the message may be dropped. Errors during actor selection (e.g., |
| 104 | +// ErrNoActorsAvailable) are currently not propagated from Tell, aligning with |
| 105 | +// its fire-and-forget nature. Such errors could be logged internally if needed. |
| 106 | +func (r *Router[M, R]) Tell(ctx context.Context, msg M) { |
| 107 | + selectedActor, err := r.getActor() |
| 108 | + if err != nil { |
| 109 | + // If no actors are available for the service, and a DLO is |
| 110 | + // configured, forward the message there. |
| 111 | + if errors.Is(err, ErrNoActorsAvailable) && r.dlo != nil { |
| 112 | + r.dlo.Tell(context.Background(), msg) |
| 113 | + } |
| 114 | + return |
| 115 | + } |
| 116 | + |
| 117 | + selectedActor.Tell(ctx, msg) |
| 118 | +} |
| 119 | + |
| 120 | +// Ask sends a message to one of the actors managed by the router, selected by |
| 121 | +// the routing strategy, and returns a Future for the response. If no actors are |
| 122 | +// available (ErrNoActorsAvailable), the Future will be completed with this |
| 123 | +// error. If the send context is cancelled before the message can be enqueued in |
| 124 | +// the chosen actor's mailbox, the Future will be completed with the context's error. |
| 125 | +func (r *Router[M, R]) Ask(ctx context.Context, msg M) Future[R] { |
| 126 | + selectedActor, err := r.getActor() |
| 127 | + if err != nil { |
| 128 | + // If no actor could be selected (e.g., none available), |
| 129 | + // complete the promise immediately with the selection error. |
| 130 | + promise := NewPromise[R]() |
| 131 | + promise.Complete(fn.Err[R](err)) |
| 132 | + return promise.Future() |
| 133 | + } |
| 134 | + |
| 135 | + return selectedActor.Ask(ctx, msg) |
| 136 | +} |
| 137 | + |
| 138 | +// ID provides an identifier for the router. Since a router isn't an actor |
| 139 | +// itself but a dispatcher for a service, its ID can be based on the service |
| 140 | +// key. |
| 141 | +func (r *Router[M, R]) ID() string { |
| 142 | + return "router(" + r.serviceKey.name + ")" |
| 143 | +} |
0 commit comments