|
23 | 23 | from typing import ( |
24 | 24 | TYPE_CHECKING, |
25 | 25 | Any, |
| 26 | + Awaitable, |
26 | 27 | Callable, |
27 | 28 | Collection, |
28 | 29 | Dict, |
|
80 | 81 | make_deferred_yieldable, |
81 | 82 | run_in_background, |
82 | 83 | ) |
83 | | -from synapse.metrics.background_process_metrics import run_as_background_process |
| 84 | +from synapse.metrics.background_process_metrics import ( |
| 85 | + run_as_background_process as _run_as_background_process, |
| 86 | +) |
84 | 87 | from synapse.module_api.callbacks.account_validity_callbacks import ( |
85 | 88 | IS_USER_EXPIRED_CALLBACK, |
86 | 89 | ON_LEGACY_ADMIN_REQUEST, |
|
158 | 161 | from synapse.util.frozenutils import freeze |
159 | 162 |
|
160 | 163 | if TYPE_CHECKING: |
| 164 | + # Old versions don't have `LiteralString` |
| 165 | + from typing_extensions import LiteralString |
| 166 | + |
161 | 167 | from synapse.app.generic_worker import GenericWorkerStore |
162 | 168 | from synapse.server import HomeServer |
163 | 169 |
|
@@ -216,6 +222,65 @@ class UserIpAndAgent: |
216 | 222 | last_seen: int |
217 | 223 |
|
218 | 224 |
|
| 225 | +def run_as_background_process( |
| 226 | + desc: "LiteralString", |
| 227 | + func: Callable[..., Awaitable[Optional[T]]], |
| 228 | + *args: Any, |
| 229 | + bg_start_span: bool = True, |
| 230 | + **kwargs: Any, |
| 231 | +) -> "defer.Deferred[Optional[T]]": |
| 232 | + """ |
| 233 | + XXX: Deprecated: use `ModuleApi.run_as_background_process` instead. |
| 234 | +
|
| 235 | + Run the given function in its own logcontext, with resource metrics |
| 236 | +
|
| 237 | + This should be used to wrap processes which are fired off to run in the |
| 238 | + background, instead of being associated with a particular request. |
| 239 | +
|
| 240 | + It returns a Deferred which completes when the function completes, but it doesn't |
| 241 | + follow the synapse logcontext rules, which makes it appropriate for passing to |
| 242 | + clock.looping_call and friends (or for firing-and-forgetting in the middle of a |
| 243 | + normal synapse async function). |
| 244 | +
|
| 245 | + Args: |
| 246 | + desc: a description for this background process type |
| 247 | + server_name: The homeserver name that this background process is being run for |
| 248 | + (this should be `hs.hostname`). |
| 249 | + func: a function, which may return a Deferred or a coroutine |
| 250 | + bg_start_span: Whether to start an opentracing span. Defaults to True. |
| 251 | + Should only be disabled for processes that will not log to or tag |
| 252 | + a span. |
| 253 | + args: positional args for func |
| 254 | + kwargs: keyword args for func |
| 255 | +
|
| 256 | + Returns: |
| 257 | + Deferred which returns the result of func, or `None` if func raises. |
| 258 | + Note that the returned Deferred does not follow the synapse logcontext |
| 259 | + rules. |
| 260 | + """ |
| 261 | + |
| 262 | + logger.warning( |
| 263 | + "Using deprecated `run_as_background_process` that's exported from the Module API. " |
| 264 | + "Prefer `ModuleApi.run_as_background_process` instead.", |
| 265 | + ) |
| 266 | + |
| 267 | + # Historically, since this function is exported from the module API, we can't just |
| 268 | + # change the signature to require a `server_name` argument. Since |
| 269 | + # `run_as_background_process` internally in Synapse requires `server_name` now, we |
| 270 | + # just have to stub this out with a placeholder value and tell people to use the new |
| 271 | + # function instead. |
| 272 | + stub_server_name = "synapse_module_running_from_unknown_server" |
| 273 | + |
| 274 | + return _run_as_background_process( |
| 275 | + desc, |
| 276 | + stub_server_name, |
| 277 | + func, |
| 278 | + *args, |
| 279 | + bg_start_span=bg_start_span, |
| 280 | + **kwargs, |
| 281 | + ) |
| 282 | + |
| 283 | + |
219 | 284 | def cached( |
220 | 285 | *, |
221 | 286 | max_entries: int = 1000, |
@@ -1323,10 +1388,9 @@ def looping_background_call( |
1323 | 1388 |
|
1324 | 1389 | if self._hs.config.worker.run_background_tasks or run_on_all_instances: |
1325 | 1390 | self._clock.looping_call( |
1326 | | - run_as_background_process, |
| 1391 | + self.run_as_background_process, |
1327 | 1392 | msec, |
1328 | 1393 | desc, |
1329 | | - self.server_name, |
1330 | 1394 | lambda: maybe_awaitable(f(*args, **kwargs)), |
1331 | 1395 | ) |
1332 | 1396 | else: |
@@ -1382,9 +1446,8 @@ def delayed_background_call( |
1382 | 1446 | return self._clock.call_later( |
1383 | 1447 | # convert ms to seconds as needed by call_later. |
1384 | 1448 | msec * 0.001, |
1385 | | - run_as_background_process, |
| 1449 | + self.run_as_background_process, |
1386 | 1450 | desc, |
1387 | | - self.server_name, |
1388 | 1451 | lambda: maybe_awaitable(f(*args, **kwargs)), |
1389 | 1452 | ) |
1390 | 1453 |
|
@@ -1590,6 +1653,44 @@ async def get_room_state( |
1590 | 1653 |
|
1591 | 1654 | return {key: state_events[event_id] for key, event_id in state_ids.items()} |
1592 | 1655 |
|
| 1656 | + def run_as_background_process( |
| 1657 | + self, |
| 1658 | + desc: "LiteralString", |
| 1659 | + func: Callable[..., Awaitable[Optional[T]]], |
| 1660 | + *args: Any, |
| 1661 | + bg_start_span: bool = True, |
| 1662 | + **kwargs: Any, |
| 1663 | + ) -> "defer.Deferred[Optional[T]]": |
| 1664 | + """Run the given function in its own logcontext, with resource metrics |
| 1665 | +
|
| 1666 | + This should be used to wrap processes which are fired off to run in the |
| 1667 | + background, instead of being associated with a particular request. |
| 1668 | +
|
| 1669 | + It returns a Deferred which completes when the function completes, but it doesn't |
| 1670 | + follow the synapse logcontext rules, which makes it appropriate for passing to |
| 1671 | + clock.looping_call and friends (or for firing-and-forgetting in the middle of a |
| 1672 | + normal synapse async function). |
| 1673 | +
|
| 1674 | + Args: |
| 1675 | + desc: a description for this background process type |
| 1676 | + server_name: The homeserver name that this background process is being run for |
| 1677 | + (this should be `hs.hostname`). |
| 1678 | + func: a function, which may return a Deferred or a coroutine |
| 1679 | + bg_start_span: Whether to start an opentracing span. Defaults to True. |
| 1680 | + Should only be disabled for processes that will not log to or tag |
| 1681 | + a span. |
| 1682 | + args: positional args for func |
| 1683 | + kwargs: keyword args for func |
| 1684 | +
|
| 1685 | + Returns: |
| 1686 | + Deferred which returns the result of func, or `None` if func raises. |
| 1687 | + Note that the returned Deferred does not follow the synapse logcontext |
| 1688 | + rules. |
| 1689 | + """ |
| 1690 | + return _run_as_background_process( |
| 1691 | + desc, self.server_name, func, *args, bg_start_span=bg_start_span, **kwargs |
| 1692 | + ) |
| 1693 | + |
1593 | 1694 | async def defer_to_thread( |
1594 | 1695 | self, |
1595 | 1696 | f: Callable[P, T], |
|
0 commit comments