Skip to content

Commit 8f1e9e9

Browse files
authored
Add Decision Events Iterator implementation (#32)
<!-- Describe what has changed in this PR --> **What changed?** - Added `DecisionEventsIterator` class as an async iterator for processing workflow decision events - The Decision Events Iterator works with the existing history_event_iterator for pagination. - Implemented `HistoryHelper` interface and `DecisionEvents` dataclass for workflow replay functionality <!-- Tell your future self why you have made these changes --> **Why?** We need support for iterating decision events for the task handler. This implementation is based on the Java Cadence client's DecisionEventsIterator pattern: - Java HistoryHelper: https://github.com/cadence-workflow/cadence-java-client/blob/0e1916eceee9bd73a9d41cca31c0d2afd413763e/src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Are there any documentation updates that should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open a PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: Tim Li <[email protected]>
1 parent 884c64f commit 8f1e9e9

File tree

2 files changed

+606
-0
lines changed

2 files changed

+606
-0
lines changed
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Decision Events Iterator for Cadence workflow orchestration.
4+
5+
This module provides functionality to iterate through workflow history events,
6+
particularly focusing on decision-related events for replay and execution.
7+
"""
8+
9+
from dataclasses import dataclass, field
10+
from typing import List, Optional
11+
12+
from cadence.api.v1.history_pb2 import HistoryEvent
13+
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
14+
from cadence.client import Client
15+
from cadence._internal.workflow.history_event_iterator import iterate_history_events
16+
17+
18+
@dataclass
class DecisionEvents:
    """
    Container for the history events that make up one decision iteration.

    The accessor methods simply expose the corresponding fields; no copies
    are made, so callers receive the same list objects stored here.
    """
    # Every event collected for this iteration, in the order it was appended.
    events: List[HistoryEvent] = field(default_factory=list)
    # The subset of collected events that were classified as markers.
    markers: List[HistoryEvent] = field(default_factory=list)
    # True while the iteration is considered a replay of past history.
    replay: bool = False
    # Replay clock value in milliseconds, or None when not set.
    replay_current_time_milliseconds: Optional[int] = None
    # Event ID recorded for the next decision, or None when not set.
    next_decision_event_id: Optional[int] = None

    def get_events(self) -> List[HistoryEvent]:
        """Give back the list of events held by this iteration."""
        return self.events

    def get_markers(self) -> List[HistoryEvent]:
        """Give back the list of marker events held by this iteration."""
        return self.markers

    def is_replay(self) -> bool:
        """Report the value of the replay flag."""
        return self.replay

    def get_event_by_id(self, event_id: int) -> Optional[HistoryEvent]:
        """Look up the first stored event whose ``event_id`` equals *event_id*.

        Events that expose no ``event_id`` attribute are skipped. Returns
        None when nothing matches.
        """
        candidates = (
            candidate
            for candidate in self.events
            if hasattr(candidate, 'event_id') and candidate.event_id == event_id
        )
        return next(candidates, None)
48+
49+
class DecisionEventsIterator:
    """
    Async iterator that groups workflow history into decision iterations.

    The complete history is loaded lazily, on first use, through
    ``iterate_history_events``. Each iteration then produces a
    ``DecisionEvents`` batch that begins at a DecisionTaskStarted event and
    runs up to and including the next DecisionTaskCompleted/Failed/TimedOut
    event, or to the end of the loaded history when no such event follows.
    """

    def __init__(self, decision_task: PollForDecisionTaskResponse, client: Client):
        self._client = client
        self._decision_task = decision_task
        self._events: List[HistoryEvent] = []
        # Index into self._events from which the next batch search starts.
        self._event_index = 0
        self._decision_task_started_event: Optional[HistoryEvent] = None
        self._next_decision_event_id = 1
        self._replay = True
        self._replay_current_time_milliseconds: Optional[int] = None
        self._initialized = False

    @staticmethod
    def _has_populated_field(event: HistoryEvent, field_name: str) -> bool:
        """Return True when *event* exposes *field_name* and reports it as set."""
        return hasattr(event, field_name) and event.HasField(field_name)

    @staticmethod
    def _is_decision_task_started(event: HistoryEvent) -> bool:
        """Check if event is DecisionTaskStarted."""
        return DecisionEventsIterator._has_populated_field(
            event, 'decision_task_started_event_attributes'
        )

    @staticmethod
    def _is_decision_task_completed(event: HistoryEvent) -> bool:
        """Check if event is DecisionTaskCompleted."""
        return DecisionEventsIterator._has_populated_field(
            event, 'decision_task_completed_event_attributes'
        )

    @staticmethod
    def _is_decision_task_failed(event: HistoryEvent) -> bool:
        """Check if event is DecisionTaskFailed."""
        return DecisionEventsIterator._has_populated_field(
            event, 'decision_task_failed_event_attributes'
        )

    @staticmethod
    def _is_decision_task_timed_out(event: HistoryEvent) -> bool:
        """Check if event is DecisionTaskTimedOut."""
        return DecisionEventsIterator._has_populated_field(
            event, 'decision_task_timed_out_event_attributes'
        )

    @staticmethod
    def _is_marker_recorded(event: HistoryEvent) -> bool:
        """Check if event is MarkerRecorded."""
        return DecisionEventsIterator._has_populated_field(
            event, 'marker_recorded_event_attributes'
        )

    @staticmethod
    def _is_decision_task_completion(event: HistoryEvent) -> bool:
        """Check if event is any kind of decision task completion."""
        return (DecisionEventsIterator._is_decision_task_completed(event) or
                DecisionEventsIterator._is_decision_task_failed(event) or
                DecisionEventsIterator._is_decision_task_timed_out(event))

    def _find_decision_task_started(self, from_index: int) -> Optional[int]:
        """Return the index of the first DecisionTaskStarted event at or after
        *from_index*, or None when there is none."""
        for index in range(from_index, len(self._events)):
            if self._is_decision_task_started(self._events[index]):
                return index
        return None

    async def _ensure_initialized(self):
        """Load all history events once and position on the first decision task."""
        if self._initialized:
            return

        # Use existing iterate_history_events function
        events_iterator = iterate_history_events(self._decision_task, self._client)
        self._events = [event async for event in events_iterator]
        self._initialized = True

        # Start from the first DecisionTaskStarted event; when the history has
        # none, the index stays where it was.
        first_started = self._find_decision_task_started(0)
        if first_started is not None:
            self._event_index = first_started

    async def has_next_decision_events(self) -> bool:
        """Check if there are more decision events to process."""
        await self._ensure_initialized()
        return self._find_decision_task_started(self._event_index) is not None

    async def next_decision_events(self) -> DecisionEvents:
        """
        Get the next set of decision events.

        The batch starts at the next DecisionTaskStarted event and collects
        every following event up to and including the first
        DecisionTaskCompleted/Failed/TimedOut event (or the end of history).

        Returns:
            The populated ``DecisionEvents`` batch.

        Raises:
            StopAsyncIteration: when no DecisionTaskStarted event remains.
                ``StopIteration`` cannot be used here: raising it inside a
                coroutine is converted by Python into ``RuntimeError``.
        """
        await self._ensure_initialized()

        start_index = self._find_decision_task_started(self._event_index)
        if start_index is None:
            raise StopAsyncIteration("No more decision events")

        decision_events = DecisionEvents()
        decision_events.replay = self._replay
        decision_events.replay_current_time_milliseconds = self._replay_current_time_milliseconds
        decision_events.next_decision_event_id = self._next_decision_event_id

        # Process DecisionTaskStarted event
        decision_task_started = self._events[start_index]
        self._decision_task_started_event = decision_task_started
        decision_events.events.append(decision_task_started)

        # Update replay time if available. Whole seconds and the sub-second
        # nanos part are combined so the result really has millisecond
        # resolution.
        if hasattr(decision_task_started, 'event_time') and decision_task_started.event_time:
            event_time = decision_task_started.event_time
            self._replay_current_time_milliseconds = (
                getattr(event_time, 'seconds', 0) * 1000
                + getattr(event_time, 'nanos', 0) // 1_000_000
            )
            decision_events.replay_current_time_milliseconds = self._replay_current_time_milliseconds

        # Collect subsequent events until the decision task completion.
        current_index = start_index + 1
        while current_index < len(self._events):
            event = self._events[current_index]
            decision_events.events.append(event)
            current_index += 1  # the event is consumed either way

            if self._is_marker_recorded(event):
                decision_events.markers.append(event)
            elif self._is_decision_task_completion(event):
                # This marks the end of this decision iteration
                self._process_decision_completion_event(event, decision_events)
                break

        # Update the event index for next iteration
        self._event_index = current_index

        # The batch always holds at least the DecisionTaskStarted event.
        last_event = decision_events.events[-1]
        if hasattr(last_event, 'event_id'):
            self._next_decision_event_id = last_event.event_id + 1

        # The final batch is the one being executed now, so it is not a replay.
        if self._find_decision_task_started(self._event_index) is None:
            self._replay = False
            decision_events.replay = False

        return decision_events

    def _process_decision_completion_event(self, event: HistoryEvent, decision_events: DecisionEvents):
        """Leave replay mode once a completion event reaches the current task.

        Replay is switched off on both the iterator and *decision_events* when
        the completion event's ID is greater than or equal to the started
        event ID of the decision task this iterator was created with.
        """
        if not (self._decision_task_started_event and
                hasattr(self._decision_task_started_event, 'event_id') and
                hasattr(event, 'event_id')):
            return

        # started_event_id may be a plain integer or a wrapper exposing
        # ``.value``; reading ``.value`` off a plain integer used to fall back
        # to 0, which made the comparison below always true.
        started_event_id = getattr(self._decision_task, 'started_event_id', 0)
        current_task_started_id = getattr(started_event_id, 'value', started_event_id)

        if event.event_id >= current_task_started_id:
            self._replay = False
            decision_events.replay = False

    def get_replay_current_time_milliseconds(self) -> Optional[int]:
        """Get the current replay time in milliseconds."""
        return self._replay_current_time_milliseconds

    def is_replay_mode(self) -> bool:
        """Check if the iterator is currently in replay mode."""
        return self._replay

    def __aiter__(self):
        return self

    async def __anext__(self) -> DecisionEvents:
        if not await self.has_next_decision_events():
            raise StopAsyncIteration
        return await self.next_decision_events()
241+
242+
243+
244+
245+
# Utility functions
246+
def is_decision_event(event: HistoryEvent) -> bool:
    """Tell whether *event* is a decision task started, completed, failed or
    timed-out event, as judged by the ``DecisionEventsIterator`` predicates."""
    predicates = (
        DecisionEventsIterator._is_decision_task_started,
        DecisionEventsIterator._is_decision_task_completed,
        DecisionEventsIterator._is_decision_task_failed,
        DecisionEventsIterator._is_decision_task_timed_out,
    )
    # any() stops at the first predicate that matches, like the `or` chain.
    return any(predicate(event) for predicate in predicates)
252+
253+
254+
def is_marker_event(event: HistoryEvent) -> bool:
    """Tell whether *event* is a MarkerRecorded event.

    Delegates to ``DecisionEventsIterator._is_marker_recorded``.
    """
    marker_check = DecisionEventsIterator._is_marker_recorded
    return marker_check(event)
257+
258+
259+
def extract_event_timestamp_millis(event: HistoryEvent) -> Optional[int]:
    """Extract timestamp from an event in milliseconds.

    Whole seconds and the sub-second ``nanos`` part of ``event_time`` are
    combined, so the result keeps millisecond precision instead of being
    truncated to whole seconds.

    Returns:
        The timestamp in milliseconds, or None when the event has no
        ``event_time`` set or its ``seconds`` value is not positive.
    """
    if not (hasattr(event, 'event_time') and event.HasField('event_time')):
        return None

    event_time = event.event_time
    seconds = getattr(event_time, 'seconds', 0)
    if seconds <= 0:
        return None

    nanos = getattr(event_time, 'nanos', 0)
    return seconds * 1000 + nanos // 1_000_000

0 commit comments

Comments
 (0)