Skip to content
8 changes: 8 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[*]
end_of_line = lf
insert_final_newline = true

[*.py]
indent_style = space
indent_size = 4
trim_trailing_whitespace = true
38 changes: 38 additions & 0 deletions examples/lambda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from simpleflow import Workflow
from simpleflow.lambda_function import LambdaFunction
from simpleflow.swf.task import LambdaFunctionTask

"""
The lambda function is:

from __future__ import print_function

import json

print('Loading function')


def lambda_handler(event, context):
print("Received event: " + json.dumps(event, indent=2))
return 42
"""


class LambdaWorkflow(Workflow):
name = 'basic'
version = 'example'
task_list = 'example'
lambda_role = 'arn:aws:iam::111111000000:role/swf-lambda' # optional, overridable (--lambda-role)

def run(self):
future = self.submit(
LambdaFunctionTask(
LambdaFunction(
'hello-world-python',
idempotent=True,
),
8,
foo='bar',
)
)
print(future.result)
10 changes: 10 additions & 0 deletions simpleflow/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ def transform_input(wf_input):
type=comma_separated_list,
required=False,
help='Tags for the workflow execution.')
@click.option('--lambda-role',
required=False,
help='Lambda role.')
@click.option('--decision-tasks-timeout',
required=False,
help='Timeout for the decision tasks.')
Expand All @@ -183,6 +186,7 @@ def start_workflow(workflow,
task_list,
execution_timeout,
tags,
lambda_role,
decision_tasks_timeout,
input,
input_file,
Expand Down Expand Up @@ -210,6 +214,7 @@ def start_workflow(workflow,
execution_timeout=execution_timeout,
input=wf_input,
tag_list=tags,
lambda_role=lambda_role or workflow_class.lambda_role,
decision_tasks_timeout=decision_tasks_timeout,
)
print('{workflow_id} {run_id}'.format(
Expand Down Expand Up @@ -481,6 +486,9 @@ def create_unique_task_list(workflow_id=''):
type=comma_separated_list,
required=False,
help='Tags identifying the workflow execution.')
@click.option('--lambda-role',
required=False,
help='Lambda role.')
@click.option('--decision-tasks-timeout',
required=False,
help='Decision tasks timeout.')
Expand Down Expand Up @@ -515,6 +523,7 @@ def standalone(context,
workflow_id,
execution_timeout,
tags,
lambda_role,
decision_tasks_timeout,
input,
input_file,
Expand Down Expand Up @@ -617,6 +626,7 @@ def standalone(context,
task_list,
execution_timeout,
tags,
lambda_role,
decision_tasks_timeout,
format.input(wf_input),
None,
Expand Down
90 changes: 90 additions & 0 deletions simpleflow/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class History(object):
:type _markers: collections.OrderedDict[str, list[dict[str, Any]]]
:ivar _timers: timer events
:type _timers: dict[str, dict[str, Any]]]
:ivar _lambda_functions: activity events
:type _lambda_functions: collections.OrderedDict[str, dict[str, Any]]
:ivar _tasks: ordered list of tasks/etc
:type _tasks: list[dict[str, Any]]
"""
Expand All @@ -40,6 +42,7 @@ def __init__(self, history):
self._signaled_workflows = collections.defaultdict(list)
self._markers = collections.OrderedDict()
self._timers = {}
self._lambda_functions = collections.OrderedDict()
self._tasks = []
self._cancel_requested = None
self._cancel_failed = None
Expand Down Expand Up @@ -119,6 +122,14 @@ def cancel_failed_decision_task_completed_event_id(self):
"""
return self._cancel_failed['decision_task_completed_event_id'] if self._cancel_failed else None

@property
def lambda_functions(self):
"""
:return: lambda_functions
:rtype: collections.OrderedDict[str, dict[str, Any]]
"""
return self._lambda_functions

@property
def signaled_workflows(self):
"""
Expand Down Expand Up @@ -639,6 +650,84 @@ def parse_decision_event(self, events, event):
if event.state == 'completed':
self.completed_decision_id = event.id

def parse_lambda_function_event(self, events, event):
"""
Parse a lambda function event.
:param events:
:param event:
"""
def get_lambda():
scheduled_event_id = events[event.scheduled_event_id - 1]
return self._lambda_functions[scheduled_event_id.lambda_id]

if event.state == 'scheduled':
lambda_function = {
'type': 'lambda_function',
'id': event.lambda_id,
'name': event.lambda_name,
'input': event.input,
'state': event.state,
'start_to_close_timeout': getattr(event, 'start_to_close_timeout', None),
'scheduled_id': event.id,
'scheduled_timestamp': event.timestamp,
}
self._lambda_functions[event.lambda_id] = lambda_function
elif event.state == 'schedule_failed':
lambda_function = {
'type': 'lambda_function',
'id': event.lambda_id,
'name': event.lambda_name,
'state': event.state,
'schedule_failed_id': event.id,
'schedule_failed_timestamp': event.timestamp,
}
self._lambda_functions[event.lambda_id] = lambda_function
elif event.state == 'started':
lambda_function = get_lambda()
lambda_function.update({
'state': event.state,
'started_id': event.id,
'started_timestamp': event.timestamp,
})
elif event.state == 'start_failed':
lambda_function = get_lambda()
lambda_function.update({
'state': event.state,
'cause': event.cause,
'message': getattr(event, 'message', ''),
'start_failed_id': event.id,
'start_failed_timestamp': event.timestamp,
'retry': lambda_function.get('retry', -1) + 1,
})
elif event.state == 'completed':
lambda_function = get_lambda()
lambda_function.update({
'state': event.state,
'result': getattr(event, 'result', None),
'completed_id': event.id,
'completed_timestamp': event.timestamp,
})
elif event.state == 'failed':
lambda_function = get_lambda()
lambda_function.update({
'state': event.state,
'reason': getattr(event, 'reason', ''),
'details': getattr(event, 'details', ''),
'failed_id': event.id,
'failed_timestamp': event.timestamp,
'retry': lambda_function.get('retry', -1) + 1,
})
elif event.state == 'timed_out':
lambda_function = get_lambda()
lambda_function.update({
'state': event.state,
'timeout_type': getattr(event, 'timeout_type', 'START_TO_CLOSE'),
'timeout_value': lambda_function['start_to_close_timeout'],
'timed_out_id': event.id,
'timed_out_timestamp': event.timestamp,
'retry': lambda_function.get('retry', -1) + 1,
})

TYPE_TO_PARSER = {
'ActivityTask': parse_activity_event,
'DecisionTask': parse_decision_event,
Expand All @@ -647,6 +736,7 @@ def parse_decision_event(self, events, event):
'ExternalWorkflowExecution': parse_external_workflow_event,
'Marker': parse_marker_event,
'Timer': parse_timer_event,
'LambdaFunction': parse_lambda_function_event,
}

def parse(self):
Expand Down
14 changes: 14 additions & 0 deletions simpleflow/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from simpleflow.base import Submittable


class LambdaFunction(Submittable):
def __init__(self,
name,
start_to_close_timeout=None,
idempotent=None,
is_python_function=True,
):
self.name = name
self.start_to_close_timeout = start_to_close_timeout
self.idempotent = idempotent
self.is_python_function = is_python_function
86 changes: 83 additions & 3 deletions simpleflow/swf/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
MarkerTask,
TimerTask,
CancelTimerTask,
LambdaFunctionTask,
)
from simpleflow.utils import (
hex_hash,
Expand Down Expand Up @@ -383,6 +384,51 @@ def _get_future_from_child_workflow_event(self, event):

return future

def _get_future_from_lambda_function_event(self, event):
"""

:param event: child workflow event
:type event: dict[str, Any]
:return:
:rtype: futures.Future
"""
future = futures.Future()
state = event['state']

if state == 'scheduled':
pass
elif state == 'schedule_failed':
logger.info('failed to schedule {}: {}'.format(
event['name'],
event['cause'],
))
return None
elif state == 'started':
future.set_running()
elif state == 'completed':
future.set_finished(format.decode(event['result']))
elif state == 'failed':
future.set_exception(exceptions.TaskFailed(
name=event['id'],
reason=event['reason'],
details=event.get('details'),
))
elif state == 'start_failed':
future.set_exception(exceptions.TaskFailed(
name=event['id'],
reason=event['cause'],
details=event.get('message'),
))
elif state == 'timed_out':
future.set_exception(exceptions.TimeoutError(
event['timeout_type'],
None,
))
else:
logger.warning('Unknown state: %s', state)

return future

def _get_future_from_marker_event(self, a_task, event):
"""Maps a marker event to a Future with the corresponding
state.
Expand Down Expand Up @@ -520,6 +566,19 @@ def find_child_workflow_event(self, a_task, history):
"""
return history.child_workflows.get(a_task.id)

def find_lambda_function_event(self, a_task, history):
"""
Get the event corresponding to a lambda function, if any.

:param a_task:
:type a_task: LambdaFunctionTask
:param history:
:type history: simpleflow.history.History
:return:
:rtype: Optional[dict]
"""
return history.lambda_functions.get(a_task.id)

def find_signal_event(self, a_task, history):
"""
Get the event corresponding to a signal, if any.
Expand Down Expand Up @@ -593,6 +652,7 @@ def find_timer_event(self, a_task, history):
MarkerTask: find_marker_event,
TimerTask: find_timer_event,
CancelTimerTask: find_timer_event,
LambdaFunctionTask: find_lambda_function_event,
}

def find_event(self, a_task, history):
Expand Down Expand Up @@ -794,12 +854,30 @@ def find_timer_associated_with(self, event, swf_task):
def get_retry_task_timer_id(swf_task):
return '__simpleflow_task_{}'.format(str(swf_task.id))

def resume_lambda_function(self, a_task, event):
"""
Resume a child workflow.

:param a_task:
:type a_task: LambdaTask
:param event:
:type event: dict
:return:
:rtype: simpleflow.futures.Future
"""
future = self._get_future_from_lambda_function_event(event)

if future.finished and future.exception:
raise future.exception

return future

def schedule_task(self, a_task, task_list=None):
"""
Let a task schedule itself.
If too many decisions are in flight, add a timer decision and raise ExecutionBlocked.
:param a_task:
:type a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask
:type a_task: ActivityTask | WorkflowTask | SignalTask [ MarkerTask [ TimerTask | CancelTimerTask | LambdaFunctionTask # noqa
:param task_list:
:type task_list: Optional[str]
:raise: exceptions.ExecutionBlocked if too many decisions waiting
Expand Down Expand Up @@ -862,6 +940,7 @@ def _add_start_timer_decision(self, id, timeout=0):
'external_workflow': get_future_from_external_workflow_event,
'marker': _get_future_from_marker_event,
'timer': _get_future_from_timer_event,
'lambda_function': resume_lambda_function,
}

def resume(self, a_task, *args, **kwargs):
Expand Down Expand Up @@ -1391,8 +1470,9 @@ def get_event_details(self, event_type, event_name):

def handle_cancel_requested(self):
decision = swf.models.decision.WorkflowExecutionDecision()
is_current_decision = self._history.completed_decision_id < self._history.cancel_requested_id
should_cancel = self._workflow.should_cancel(self._history)
history = self._history
is_current_decision = history.completed_decision_id < history.cancel_requested_id
should_cancel = self._workflow.should_cancel(history)
if not should_cancel:
return None # ignore cancel
if is_current_decision:
Expand Down
Loading