
Commit 6273659

vidhyavmeta-codesync[bot] authored and committed

Added choose endpoint latency. (#1771)

Summary:
Pull Request resolved: #1771
As stated above. Also wrote the latency measurement as a decorator.

Reviewed By: pzhan9

Differential Revision: D86236036

fbshipit-source-id: 3892413a09cc81327e2ed946fc04937f04fd530b

1 parent 9f0274a · commit 6273659
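
The commit message's key design point is that latency measurement is applied as a decorator rather than by calling a helper at every call site. A minimal, self-contained sketch of that decorator-factory pattern follows; standalone names such as with_latency and record_latency_us are illustrative, not taken from the file, and the actual implementation is in the diff below.

import functools
import time
from typing import Any, Awaitable, Callable, TypeVar

T = TypeVar("T")


def with_latency(
    start_time_ns: int, record_latency_us: Callable[[int], None]
) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]:
    """Decorator factory: wrap an async function so the elapsed time from
    start_time_ns until its awaited result arrives is recorded in microseconds."""

    def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> T:
            try:
                return await func(*args, **kwargs)
            finally:
                record_latency_us((time.monotonic_ns() - start_time_ns) // 1_000)

        return wrapper

    return decorator


# Usage sketch: start the clock before sending, decorate the coroutine that
# awaits the reply, and hand the wrapped coroutine to whatever runs it.
start_ns = time.monotonic_ns()


@with_latency(start_ns, record_latency_us=print)
async def receive() -> str:
    return "reply"

In the actual change, the factory is a method (_with_latency_measurement) so it can capture the endpoint's method name, and the recorder is an OpenTelemetry histogram.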

File tree

1 file changed (+65, -34 lines)


python/monarch/_src/actor/endpoint.py

Lines changed: 65 additions & 34 deletions
@@ -57,12 +57,18 @@
     description="Latency of endpoint stream operations per yield in microseconds",
 )
 
+# Histogram for measuring endpoint choose latency
+endpoint_choose_latency_histogram: Histogram = METER.create_histogram(
+    name="endpoint_choose_latency.us",
+    description="Latency of endpoint choose operations in microseconds",
+)
+
 T = TypeVar("T")
 
 
 def _measure_latency(
     coro: Coroutine[Any, Any, T],
-    start_time: int,
+    start_time_ns: int,
     histogram: Histogram,
     method_name: str,
     actor_count: int,
@@ -84,7 +90,7 @@ async def _wrapper() -> T:
         try:
             return await coro
         finally:
-            duration_us = int((time.monotonic_ns() - start_time) / 1_000)
+            duration_us = int((time.monotonic_ns() - start_time_ns) / 1_000)
             histogram.record(
                 duration_us,
                 attributes={
@@ -130,6 +136,33 @@ def _get_method_name(self) -> str:
             return method_specifier.name
         return "unknown"
 
+    def _with_latency_measurement(
+        self, start_time_ns: int, histogram: Histogram, actor_count: int
+    ) -> Any:
+        """
+        Decorator factory to add latency measurement to async functions.
+
+        Args:
+            histogram: The histogram to record metrics to
+            actor_count: Number of actors involved in the operation
+
+        Returns:
+            A decorator that wraps async functions with latency measurement
+        """
+        method_name: str = self._get_method_name()
+
+        def decorator(func: Any) -> Any:
+            @functools.wraps(func)
+            def wrapper(*args: Any, **kwargs: Any) -> Any:
+                coro = func(*args, **kwargs)
+                return _measure_latency(
+                    coro, start_time_ns, histogram, method_name, actor_count
+                )
+
+            return wrapper
+
+        return decorator
+
     @abstractmethod
     def _send(
         self,
@@ -178,10 +211,20 @@ def choose(self, *args: P.args, **kwargs: P.kwargs) -> Future[R]:
         Load balanced RPC-style entrypoint for request/response messaging.
         """
 
-        p, r = self._port(once=True)
+        p, r_port = self._port(once=True)
+        r: "PortReceiver[R]" = r_port
+        start_time: int = time.monotonic_ns()
         # pyre-ignore[6]: ParamSpec kwargs is compatible with Dict[str, Any]
         self._send(args, kwargs, port=p, selection="choose")
-        return r.recv()
+
+        @self._with_latency_measurement(
+            start_time, endpoint_choose_latency_histogram, 1
+        )
+        async def process() -> R:
+            result = await r.recv()
+            return result
+
+        return Future(coro=process())
 
     def call_one(self, *args: P.args, **kwargs: P.kwargs) -> Future[R]:
         p, r_port = self._port(once=True)
@@ -194,32 +237,27 @@ def call_one(self, *args: P.args, **kwargs: P.kwargs) -> Future[R]:
                 f"Can only use 'call_one' on a single Actor but this actor has shape {extent}"
             )
 
-        method_name = self._get_method_name()
-
+        @self._with_latency_measurement(
+            start_time, endpoint_call_one_latency_histogram, 1
+        )
         async def process() -> R:
             result = await r.recv()
             return result
 
-        measured_coro = _measure_latency(
-            process(),
-            start_time,
-            endpoint_call_one_latency_histogram,
-            method_name,
-            1,
-        )
-        return Future(coro=measured_coro)
+        return Future(coro=process())
 
     def call(self, *args: P.args, **kwargs: P.kwargs) -> "Future[ValueMesh[R]]":
         from monarch._src.actor.actor_mesh import RankedPortReceiver, ValueMesh
 
+        start_time: int = time.monotonic_ns()
         p, unranked = self._port()
         r: RankedPortReceiver[R] = unranked.ranked()
-        start_time: int = time.monotonic_ns()
         # pyre-ignore[6]: ParamSpec kwargs is compatible with Dict[str, Any]
         extent: Extent = self._send(args, kwargs, port=p)
 
-        method_name = self._get_method_name()
-
+        @self._with_latency_measurement(
+            start_time, endpoint_call_latency_histogram, extent.nelements
+        )
         async def process() -> "ValueMesh[R]":
             from monarch._rust_bindings.monarch_hyperactor.shape import Shape
             from monarch._src.actor.shape import NDSlice
@@ -234,14 +272,7 @@ async def process() -> "ValueMesh[R]":
             )
             return ValueMesh(call_shape, results)
 
-        measured_coro = _measure_latency(
-            process(),
-            start_time,
-            endpoint_call_latency_histogram,
-            method_name,
-            extent.nelements,
-        )
-        return Future(coro=measured_coro)
+        return Future(coro=process())
 
     def stream(
         self, *args: P.args, **kwargs: P.kwargs
@@ -258,18 +289,18 @@ def stream(
         extent: Extent = self._send(args, kwargs, port=p)
         r: "PortReceiver[R]" = r_port
 
-        method_name: str = self._get_method_name()
+        latency_decorator: Any = self._with_latency_measurement(
+            start_time, endpoint_stream_latency_histogram, extent.nelements
+        )
 
        def _stream() -> Generator[Future[R], None, None]:
            for _ in range(extent.nelements):
-                measured_coro = _measure_latency(
-                    r._recv(),
-                    start_time,
-                    endpoint_stream_latency_histogram,
-                    method_name,
-                    extent.nelements,
-                )
-                yield Future(coro=measured_coro)
+
+                @latency_decorator
+                async def receive() -> R:
+                    return await r._recv()
+
+                yield Future(coro=receive())
 
         return _stream()
 
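For reference, the histogram added at the top of the file follows the OpenTelemetry metrics API. Below is a rough sketch of creating and recording to such a histogram; it assumes METER is an OpenTelemetry Meter obtained elsewhere in the module (its construction is not part of this diff), and the attribute keys are illustrative, not copied from the file.

from opentelemetry import metrics

# Assumption: the module obtains its Meter roughly like this; the real meter
# name is not shown in the diff.
METER = metrics.get_meter("monarch.actor.endpoint")

endpoint_choose_latency_histogram = METER.create_histogram(
    name="endpoint_choose_latency.us",
    description="Latency of endpoint choose operations in microseconds",
)

# _measure_latency records once the awaited result arrives; the attribute
# keys below are illustrative placeholders.
endpoint_choose_latency_histogram.record(
    1234,  # duration in microseconds
    attributes={"method_name": "choose", "actor_count": 1},
)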