Skip to content

Commit 1123f7b

Browse files
authored
fix(workflow worker): move event iterator out of WorkflowEngine (#47)
<!-- Describe what has changed in this PR --> **What changed?** * Move the event iteration logic out of WorkflowEngine * Move the data converter and workflow events into WorkflowInfo <!-- Tell your future self why you have made these changes --> **Why?** WorkflowEngine should be thread-safe and executed in its own thread; the event iterator holds a gRPC async client, which is not thread-safe. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit Test <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Are there any documentation updates that should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open a PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: Shijie Sheng <[email protected]>
1 parent 9410b26 commit 1123f7b

File tree

9 files changed

+140
-203
lines changed

9 files changed

+140
-203
lines changed

cadence/_internal/workflow/context.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,17 @@
77
from cadence.api.v1.common_pb2 import ActivityType
88
from cadence.api.v1.decision_pb2 import ScheduleActivityTaskDecisionAttributes
99
from cadence.api.v1.tasklist_pb2 import TaskList, TaskListKind
10-
from cadence.client import Client
1110
from cadence.data_converter import DataConverter
1211
from cadence.workflow import WorkflowContext, WorkflowInfo, ResultType, ActivityOptions
1312

1413

1514
class Context(WorkflowContext):
1615
def __init__(
1716
self,
18-
client: Client,
1917
info: WorkflowInfo,
2018
decision_helper: DecisionsHelper,
2119
decision_manager: DecisionManager,
2220
):
23-
self._client = client
2421
self._info = info
2522
self._replay_mode = True
2623
self._replay_current_time_milliseconds: Optional[int] = None
@@ -30,11 +27,8 @@ def __init__(
3027
def info(self) -> WorkflowInfo:
3128
return self._info
3229

33-
def client(self) -> Client:
34-
return self._client
35-
3630
def data_converter(self) -> DataConverter:
37-
return self._client.data_converter
31+
return self.info().data_converter
3832

3933
async def execute_activity(
4034
self,
@@ -80,7 +74,7 @@ async def execute_activity(
8074
schedule_attributes = ScheduleActivityTaskDecisionAttributes(
8175
activity_id=activity_id,
8276
activity_type=ActivityType(name=activity),
83-
domain=self._client.domain,
77+
domain=self.info().workflow_domain,
8478
task_list=TaskList(kind=TaskListKind.TASK_LIST_KIND_NORMAL, name=task_list),
8579
input=activity_input,
8680
schedule_to_close_timeout=_round_to_nearest_second(schedule_to_close),

cadence/_internal/workflow/decision_events_iterator.py

Lines changed: 64 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111

1212
from cadence.api.v1.history_pb2 import HistoryEvent
1313
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
14-
from cadence.client import Client
15-
from cadence._internal.workflow.history_event_iterator import iterate_history_events
1614

1715

1816
@dataclass
@@ -55,99 +53,36 @@ class DecisionEventsIterator:
5553
into decision iterations for proper workflow replay and execution.
5654
"""
5755

58-
def __init__(self, decision_task: PollForDecisionTaskResponse, client: Client):
59-
self._client = client
56+
def __init__(
57+
self, decision_task: PollForDecisionTaskResponse, events: List[HistoryEvent]
58+
):
6059
self._decision_task = decision_task
61-
self._events: List[HistoryEvent] = []
62-
self._event_index = 0
60+
self._events: List[HistoryEvent] = events
6361
self._decision_task_started_event: Optional[HistoryEvent] = None
6462
self._next_decision_event_id = 1
6563
self._replay = True
6664
self._replay_current_time_milliseconds: Optional[int] = None
67-
self._initialized = False
68-
69-
@staticmethod
70-
def _is_decision_task_started(event: HistoryEvent) -> bool:
71-
"""Check if event is DecisionTaskStarted."""
72-
return hasattr(
73-
event, "decision_task_started_event_attributes"
74-
) and event.HasField("decision_task_started_event_attributes")
75-
76-
@staticmethod
77-
def _is_decision_task_completed(event: HistoryEvent) -> bool:
78-
"""Check if event is DecisionTaskCompleted."""
79-
return hasattr(
80-
event, "decision_task_completed_event_attributes"
81-
) and event.HasField("decision_task_completed_event_attributes")
82-
83-
@staticmethod
84-
def _is_decision_task_failed(event: HistoryEvent) -> bool:
85-
"""Check if event is DecisionTaskFailed."""
86-
return hasattr(
87-
event, "decision_task_failed_event_attributes"
88-
) and event.HasField("decision_task_failed_event_attributes")
89-
90-
@staticmethod
91-
def _is_decision_task_timed_out(event: HistoryEvent) -> bool:
92-
"""Check if event is DecisionTaskTimedOut."""
93-
return hasattr(
94-
event, "decision_task_timed_out_event_attributes"
95-
) and event.HasField("decision_task_timed_out_event_attributes")
96-
97-
@staticmethod
98-
def _is_marker_recorded(event: HistoryEvent) -> bool:
99-
"""Check if event is MarkerRecorded."""
100-
return hasattr(event, "marker_recorded_event_attributes") and event.HasField(
101-
"marker_recorded_event_attributes"
102-
)
10365

104-
@staticmethod
105-
def _is_decision_task_completion(event: HistoryEvent) -> bool:
106-
"""Check if event is any kind of decision task completion."""
107-
return (
108-
DecisionEventsIterator._is_decision_task_completed(event)
109-
or DecisionEventsIterator._is_decision_task_failed(event)
110-
or DecisionEventsIterator._is_decision_task_timed_out(event)
111-
)
112-
113-
async def _ensure_initialized(self):
114-
"""Initialize events list using the existing iterate_history_events."""
115-
if not self._initialized:
116-
# Use existing iterate_history_events function
117-
events_iterator = iterate_history_events(self._decision_task, self._client)
118-
self._events = [event async for event in events_iterator]
119-
self._initialized = True
120-
121-
# Find first decision task started event
122-
for i, event in enumerate(self._events):
123-
if self._is_decision_task_started(event):
124-
self._event_index = i
125-
break
66+
self._event_index = 0
67+
# Find first decision task started event
68+
for i, event in enumerate(self._events):
69+
if _is_decision_task_started(event):
70+
self._event_index = i
71+
break
12672

12773
async def has_next_decision_events(self) -> bool:
128-
"""Check if there are more decision events to process."""
129-
await self._ensure_initialized()
130-
13174
# Look for the next DecisionTaskStarted event from current position
13275
for i in range(self._event_index, len(self._events)):
133-
if self._is_decision_task_started(self._events[i]):
76+
if _is_decision_task_started(self._events[i]):
13477
return True
13578

13679
return False
13780

13881
async def next_decision_events(self) -> DecisionEvents:
139-
"""
140-
Get the next set of decision events.
141-
142-
This method processes events starting from a DecisionTaskStarted event
143-
until the corresponding DecisionTaskCompleted/Failed/TimedOut event.
144-
"""
145-
await self._ensure_initialized()
146-
14782
# Find next DecisionTaskStarted event
14883
start_index = None
14984
for i in range(self._event_index, len(self._events)):
150-
if self._is_decision_task_started(self._events[i]):
85+
if _is_decision_task_started(self._events[i]):
15186
start_index = i
15287
break
15388

@@ -182,9 +117,9 @@ async def next_decision_events(self) -> DecisionEvents:
182117
decision_events.events.append(event)
183118

184119
# Categorize the event
185-
if self._is_marker_recorded(event):
120+
if _is_marker_recorded(event):
186121
decision_events.markers.append(event)
187-
elif self._is_decision_task_completion(event):
122+
elif _is_decision_task_completion(event):
188123
# This marks the end of this decision iteration
189124
self._process_decision_completion_event(event, decision_events)
190125
current_index += 1 # Move past this event
@@ -206,7 +141,7 @@ async def next_decision_events(self) -> DecisionEvents:
206141
# Check directly without calling has_next_decision_events to avoid recursion
207142
has_more = False
208143
for i in range(self._event_index, len(self._events)):
209-
if self._is_decision_task_started(self._events[i]):
144+
if _is_decision_task_started(self._events[i]):
210145
has_more = True
211146
break
212147

@@ -261,16 +196,16 @@ async def __anext__(self) -> DecisionEvents:
261196
def is_decision_event(event: HistoryEvent) -> bool:
262197
"""Check if an event is a decision-related event."""
263198
return (
264-
DecisionEventsIterator._is_decision_task_started(event)
265-
or DecisionEventsIterator._is_decision_task_completed(event)
266-
or DecisionEventsIterator._is_decision_task_failed(event)
267-
or DecisionEventsIterator._is_decision_task_timed_out(event)
199+
_is_decision_task_started(event)
200+
or _is_decision_task_completed(event)
201+
or _is_decision_task_failed(event)
202+
or _is_decision_task_timed_out(event)
268203
)
269204

270205

271206
def is_marker_event(event: HistoryEvent) -> bool:
272207
"""Check if an event is a marker event."""
273-
return DecisionEventsIterator._is_marker_recorded(event)
208+
return _is_marker_recorded(event)
274209

275210

276211
def extract_event_timestamp_millis(event: HistoryEvent) -> Optional[int]:
@@ -279,3 +214,47 @@ def extract_event_timestamp_millis(event: HistoryEvent) -> Optional[int]:
279214
seconds = getattr(event.event_time, "seconds", 0)
280215
return seconds * 1000 if seconds > 0 else None
281216
return None
217+
218+
219+
def _is_decision_task_started(event: HistoryEvent) -> bool:
220+
"""Check if event is DecisionTaskStarted."""
221+
return hasattr(event, "decision_task_started_event_attributes") and event.HasField(
222+
"decision_task_started_event_attributes"
223+
)
224+
225+
226+
def _is_decision_task_completed(event: HistoryEvent) -> bool:
227+
"""Check if event is DecisionTaskCompleted."""
228+
return hasattr(
229+
event, "decision_task_completed_event_attributes"
230+
) and event.HasField("decision_task_completed_event_attributes")
231+
232+
233+
def _is_decision_task_failed(event: HistoryEvent) -> bool:
234+
"""Check if event is DecisionTaskFailed."""
235+
return hasattr(event, "decision_task_failed_event_attributes") and event.HasField(
236+
"decision_task_failed_event_attributes"
237+
)
238+
239+
240+
def _is_decision_task_timed_out(event: HistoryEvent) -> bool:
241+
"""Check if event is DecisionTaskTimedOut."""
242+
return hasattr(
243+
event, "decision_task_timed_out_event_attributes"
244+
) and event.HasField("decision_task_timed_out_event_attributes")
245+
246+
247+
def _is_marker_recorded(event: HistoryEvent) -> bool:
248+
"""Check if event is MarkerRecorded."""
249+
return hasattr(event, "marker_recorded_event_attributes") and event.HasField(
250+
"marker_recorded_event_attributes"
251+
)
252+
253+
254+
def _is_decision_task_completion(event: HistoryEvent) -> bool:
255+
"""Check if event is any kind of decision task completion."""
256+
return (
257+
_is_decision_task_completed(event)
258+
or _is_decision_task_failed(event)
259+
or _is_decision_task_timed_out(event)
260+
)

cadence/_internal/workflow/workflow_engine.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator
99
from cadence._internal.workflow.statemachine.decision_manager import DecisionManager
1010
from cadence.api.v1.decision_pb2 import Decision
11-
from cadence.client import Client
1211
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
1312
from cadence.workflow import WorkflowInfo
1413

@@ -21,16 +20,14 @@ class DecisionResult:
2120

2221

2322
class WorkflowEngine:
24-
def __init__(self, info: WorkflowInfo, client: Client, workflow_definition=None):
23+
def __init__(self, info: WorkflowInfo, workflow_definition=None):
2524
self._workflow_definition = workflow_definition
2625
self._workflow_instance = None
2726
if workflow_definition:
2827
self._workflow_instance = workflow_definition.cls()
2928
self._decision_manager = DecisionManager()
3029
self._decisions_helper = DecisionsHelper()
31-
self._context = Context(
32-
client, info, self._decisions_helper, self._decision_manager
33-
)
30+
self._context = Context(info, self._decisions_helper, self._decision_manager)
3431
self._is_workflow_complete = False
3532

3633
async def process_decision(
@@ -65,7 +62,7 @@ async def process_decision(
6562
with self._context._activate():
6663
# Create DecisionEventsIterator for structured event processing
6764
events_iterator = DecisionEventsIterator(
68-
decision_task, self._context.client()
65+
decision_task, self._context.info().workflow_events
6966
)
7067

7168
# Process decision events using iterator-driven approach
@@ -360,10 +357,8 @@ def _extract_workflow_input(
360357
# Deserialize the input using the client's data converter
361358
try:
362359
# Use from_data method with a single type hint of None (no type conversion)
363-
input_data_list = (
364-
self._context.client().data_converter.from_data(
365-
started_attrs.input, [None]
366-
)
360+
input_data_list = self._context.data_converter().from_data(
361+
started_attrs.input, [None]
367362
)
368363
input_data = input_data_list[0] if input_data_list else None
369364
logger.debug(f"Extracted workflow input: {input_data}")

cadence/worker/_decision_task_handler.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import threading
33
from typing import Dict, Tuple
44

5+
from cadence._internal.workflow.history_event_iterator import iterate_history_events
56
from cadence.api.v1.common_pb2 import Payload
67
from cadence.api.v1.service_worker_pb2 import (
78
PollForDecisionTaskResponse,
@@ -102,13 +103,21 @@ async def _handle_task_implementation(
102103
)
103104
raise KeyError(f"Workflow type '{workflow_type_name}' not found")
104105

106+
# fetch full workflow history
107+
# TODO sticky cache
108+
workflow_events = [
109+
event async for event in iterate_history_events(task, self._client)
110+
]
111+
105112
# Create workflow info and get or create workflow engine from cache
106113
workflow_info = WorkflowInfo(
107114
workflow_type=workflow_type_name,
108115
workflow_domain=self._client.domain,
109116
workflow_id=workflow_id,
110117
workflow_run_id=run_id,
111118
workflow_task_list=self.task_list,
119+
data_converter=self._client.data_converter,
120+
workflow_events=workflow_events,
112121
)
113122

114123
# Use thread-safe cache to get or create workflow engine
@@ -118,7 +127,6 @@ async def _handle_task_implementation(
118127
if workflow_engine is None:
119128
workflow_engine = WorkflowEngine(
120129
info=workflow_info,
121-
client=self._client,
122130
workflow_definition=workflow_definition,
123131
)
124132
self._workflow_engines[cache_key] = workflow_engine

cadence/workflow.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from datetime import timedelta
66
from typing import (
77
Callable,
8+
List,
89
cast,
910
Optional,
1011
Union,
@@ -17,7 +18,7 @@
1718
)
1819
import inspect
1920

20-
from cadence.client import Client
21+
from cadence.api.v1.history_pb2 import HistoryEvent
2122
from cadence.data_converter import DataConverter
2223

2324
ResultType = TypeVar("ResultType")
@@ -169,6 +170,8 @@ class WorkflowInfo:
169170
workflow_id: str
170171
workflow_run_id: str
171172
workflow_task_list: str
173+
workflow_events: List[HistoryEvent]
174+
data_converter: DataConverter
172175

173176

174177
class WorkflowContext(ABC):
@@ -177,9 +180,6 @@ class WorkflowContext(ABC):
177180
@abstractmethod
178181
def info(self) -> WorkflowInfo: ...
179182

180-
@abstractmethod
181-
def client(self) -> Client: ...
182-
183183
@abstractmethod
184184
def data_converter(self) -> DataConverter: ...
185185

0 commit comments

Comments
 (0)