Skip to content

Commit f69c7fc

Browse files
authored
Merge pull request #12 from compas-dev/args
Fix several issues
2 parents 0f27c04 + d2caf2c commit f69c7fc

File tree

9 files changed

+188
-37
lines changed

9 files changed

+188
-37
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
* Added the option to pass arguments into the long running task of a background worker.
13+
* Added the option to manually control when the background worker task is set to **Done**.
14+
* Added dispose function to control resource deallocation in a background worker.
15+
1216
### Changed
1317

18+
* Set background threads in the background worker as daemon threads to prevent blocking the main thread.
19+
* Changed base class of `Message` from `UserDict` to `object` because in IronPython 2.7 `UserDict` is an old-style class. The behavior of dictionary-like is still preserved.
20+
1421
### Removed
1522

1623

docs/_images/background-task.png

5.95 KB
Loading

src/compas_eve/core.py

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
1-
# Python 2/3 compatibility import list
2-
try:
3-
from collections import UserDict
4-
except ImportError:
5-
from UserDict import UserDict
1+
from compas.data import json_dumps
2+
from compas.data import json_loads
63

74
DEFAULT_TRANSPORT = None
85

@@ -61,21 +58,34 @@ def unadvertise(self, topic):
6158
pass
6259

6360

64-
class Message(UserDict):
61+
class Message(object):
6562
"""Message objects used for publishing and subscribing to/from topics.
6663
6764
A message is fundamentally a dictionary and behaves as one."""
6865

66+
def __init__(self, *args, **kwargs):
67+
super(Message, self).__init__()
68+
self.data = {}
69+
self.data.update(*args, **kwargs)
70+
71+
def ToString(self):
72+
return str(self)
73+
6974
def __str__(self):
7075
return str(self.data)
7176

7277
def __getattr__(self, name):
73-
return self.__dict__["data"][name]
78+
return self.data[name]
79+
80+
def __getitem__(self, key):
81+
return self.data[key]
82+
83+
def __setitem__(self, key, value):
84+
self.data[key] = value
7485

7586
@classmethod
7687
def parse(cls, value):
77-
instance = cls()
78-
instance.update(value)
88+
instance = cls(**value)
7989
return instance
8090

8191

@@ -103,6 +113,26 @@ def __init__(self, name, message_type, **options):
103113
self.message_type = message_type
104114
self.options = options
105115

116+
def _message_to_json(self, message):
117+
"""Convert a message to a JSON string.
118+
119+
Normally, this method expects sub-classes of ``Message`` as input.
120+
However, it can deal with regular dictionaries as well as classes
121+
implementing the COMPAS data framework.
122+
"""
123+
try:
124+
data = message.data
125+
except (KeyError, AttributeError):
126+
try:
127+
data = message.__data__
128+
except (KeyError, AttributeError):
129+
data = dict(message)
130+
return json_dumps(data)
131+
132+
def _message_from_json(self, json_message):
133+
"""Converts a JSON string back into a message instance."""
134+
return self.message_type.parse(json_loads(json_message))
135+
106136

107137
class Publisher(object):
108138
"""Publisher interface."""

src/compas_eve/ghpython/background.py

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,27 @@ def do_something_long_and_complicated(worker):
5454
Grasshopper environment object
5555
long_running_function : function, optional
5656
This function will be the main entry point for the long-running task.
57+
dispose_function : function, optional
58+
If defined, this function will be called when the worker is disposed. It can be used for clean-up tasks
59+
and resource deallocation.
60+
auto_set_done : bool, optional
61+
If true, the worker state will be automatically set to ``Done`` after the function returns.
62+
Defaults to ``True``.
63+
args : tuple, optional
64+
List or tuple of arguments for the invocation of the ``long_running_function``. Defaults to ``()``.
5765
"""
5866

59-
def __init__(self, ghenv, long_running_function=None):
67+
def __init__(self, ghenv, long_running_function=None, dispose_function=None, auto_set_done=True, args=()):
6068
super(BackgroundWorker, self).__init__()
6169
self.ghenv = ghenv
6270
self._is_working = False
6371
self._is_done = False
6472
self._is_cancelled = False
6573
self._has_requested_cancellation = False
6674
self.long_running_function = long_running_function
75+
self.dispose_function = dispose_function
76+
self.auto_set_done = auto_set_done
77+
self.args = args
6778

6879
def is_working(self):
6980
"""Indicate whether the worker is currently working or not."""
@@ -86,17 +97,37 @@ def start_work(self):
8697
def _long_running_task_wrapper(worker):
8798
try:
8899
worker.set_internal_state_to_working()
89-
result = self.long_running_function(self)
90-
worker.set_internal_state_to_done(result)
100+
result = self.long_running_function(self, *self.args)
101+
102+
# There are (at least) two types of long running functions:
103+
# 1. Those that block the thread while working
104+
# (e.g. they have a busy-wait or some kind of `while` loop)
105+
# 2. Those that hookup event handlers and return immediately
106+
# so then they don't need to block the thread.
107+
# The first case means that we can set the state to "DONE"
108+
# right after calling the function because if it returned, it really
109+
# means it's done.
110+
# The second case will return immediately, and setting the state to "DONE"
111+
# would be wrong because the handlers are still going to trigger.
112+
# In that case we can set the flag `auto_set_done` to `False` so that
113+
# we don't automatically set the state to "DONE".
114+
if self.auto_set_done:
115+
worker.set_internal_state_to_done(result)
91116
except Exception as e:
92117
worker.display_message(str(e))
93118
worker.set_internal_state_to_cancelled()
94119

95120
target = _long_running_task_wrapper
96121
args = (self,)
97122
self.thread = threading.Thread(target=target, args=args)
123+
self.thread.daemon = True
98124
self.thread.start()
99125

126+
def dispose(self):
127+
"""Invoked when the worker is being disposed."""
128+
if callable(self.dispose_function):
129+
self.dispose_function(self)
130+
100131
def set_internal_state_to_working(self):
101132
"""Set the internal state to ``working``."""
102133
self._is_working = True
@@ -159,7 +190,9 @@ def ui_callback():
159190
Rhino.RhinoApp.InvokeOnUiThread(System.Action(ui_callback))
160191

161192
@classmethod
162-
def instance_by_component(cls, ghenv, long_running_function=None, force_new=False):
193+
def instance_by_component(
194+
cls, ghenv, long_running_function=None, dispose_function=None, auto_set_done=True, force_new=False, args=()
195+
):
163196
"""Get the worker instance assigned to the component.
164197
165198
This will get a persistant instance of a background worker
@@ -172,8 +205,16 @@ def instance_by_component(cls, ghenv, long_running_function=None, force_new=Fals
172205
Grasshopper environment object
173206
long_running_function : function, optional
174207
This function will be the main entry point for the long-running task.
208+
dispose_function : function, optional
209+
If defined, this function will be called when the worker is disposed.
210+
It can be used for clean-up tasks and resource deallocation.
211+
auto_set_done : bool, optional
212+
If true, the worker state will be automatically set to ``Done`` after the function returns.
213+
Defaults to ``True``.
175214
force_new : bool, optional
176215
Force the creation of a new background worker, by default False.
216+
args : tuple, optional
217+
List or tuple of arguments for the invocation of the ``long_running_function``. Defaults to ``()``.
177218
178219
Returns
179220
-------
@@ -186,11 +227,18 @@ def instance_by_component(cls, ghenv, long_running_function=None, force_new=Fals
186227

187228
if worker and force_new:
188229
worker.request_cancellation()
230+
worker.dispose()
189231
worker = None
190232
del scriptcontext.sticky[key]
191233

192234
if not worker:
193-
worker = cls(ghenv, long_running_function=long_running_function)
235+
worker = cls(
236+
ghenv,
237+
long_running_function=long_running_function,
238+
dispose_function=dispose_function,
239+
auto_set_done=auto_set_done,
240+
args=args,
241+
)
194242
scriptcontext.sticky[key] = worker
195243

196244
return worker
@@ -212,5 +260,6 @@ def stop_instance_by_component(cls, ghenv):
212260

213261
if worker:
214262
worker.request_cancellation()
263+
worker.dispose()
215264
worker = None
216265
del scriptcontext.sticky[key]

src/compas_eve/ghpython/components/Ce_BackgroundTask/code.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818

1919
class BackgroundTaskComponent(component):
20-
def RunScript(self, reset, task, on):
20+
def RunScript(self, task, reset, on):
2121
if not on:
2222
BackgroundWorker.stop_instance_by_component(ghenv) # noqa: F821
2323
return None

src/compas_eve/ghpython/components/Ce_BackgroundTask/metadata.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010
"isAdvancedMode": true,
1111
"iconDisplay": 2,
1212
"inputParameters": [
13-
{
14-
"name": "reset",
15-
"description": "Resets the background worker."
16-
},
1713
{
1814
"name": "task",
1915
"description": "A Python function that will be executed by the background worker. The function does not need to return quickly, it can even have an infinite loop and keep running, it will not block the UI."
2016
},
17+
{
18+
"name": "reset",
19+
"description": "Resets the background worker."
20+
},
2121
{
2222
"name": "on",
2323
"description": "Turn ON or OFF the background worker.",

src/compas_eve/mqtt/mqtt_cli.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,7 @@
44
from ..core import Transport
55
from ..event_emitter import EventEmitterMixin
66

7-
from compas.data import json_dumps
8-
from compas.data import json_loads
9-
107
import clr
11-
import json
128
import os
139
import sys
1410

@@ -75,11 +71,11 @@ def on_ready(self, callback):
7571
self.once("ready", callback)
7672

7773
def publish(self, topic, message):
78-
# TODO: can we avoid the additional cast to dict?
74+
json_message = topic._message_to_json(message)
7975
application_message = (
8076
MqttApplicationMessageBuilder()
8177
.WithTopic(topic.name)
82-
.WithPayload(json_dumps(dict(message)))
78+
.WithPayload(json_message)
8379
.Build()
8480
)
8581

@@ -94,7 +90,7 @@ def subscribe(self, topic, callback):
9490

9591
def _local_callback(application_message):
9692
payload = Encoding.UTF8.GetString(application_message.Payload)
97-
msg = topic.message_type.parse(json_loads(payload))
93+
msg = topic._message_from_json(payload)
9894
callback(msg)
9995

10096
def _subscribe_callback(**kwargs):

src/compas_eve/mqtt/mqtt_paho.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
from compas.data import json_dumps
2-
from compas.data import json_loads
3-
41
from ..core import Transport
52
from ..event_emitter import EventEmitterMixin
63

@@ -63,8 +60,7 @@ def publish(self, topic, message):
6360
"""
6461

6562
def _callback(**kwargs):
66-
# TODO: can we avoid the additional cast to dict?
67-
json_message = json_dumps(dict(message))
63+
json_message = topic._message_to_json(message)
6864
self.client.publish(topic.name, json_message)
6965

7066
self.on_ready(_callback)
@@ -91,7 +87,7 @@ def subscribe(self, topic, callback):
9187
subscribe_id = "{}:{}".format(event_key, id(callback))
9288

9389
def _local_callback(msg):
94-
msg = topic.message_type.parse(json_loads(msg.payload.decode()))
90+
msg = topic._message_from_json(msg.payload.decode())
9591
callback(msg)
9692

9793
def _subscribe_callback(**kwargs):

0 commit comments

Comments
 (0)