3434from monarch ._rust_bindings .monarch_hyperactor .shape import Extent
3535
3636from monarch ._src .actor .future import Future
37- from monarch ._src .actor .telemetry import METER
38- from monarch ._src .actor .tensor_engine_shim import _cached_propagation , fake_call
39-
40- from opentelemetry .metrics import Histogram
41-
42- # Histogram for measuring endpoint call latency
43- endpoint_call_latency_histogram : Histogram = METER .create_histogram (
44- name = "endpoint_call_latency.us" ,
45- description = "Latency of endpoint call operations in microseconds" ,
46- )
47-
48- # Histogram for measuring endpoint call_one latency
49- endpoint_call_one_latency_histogram : Histogram = METER .create_histogram (
50- name = "endpoint_call_one_latency.us" ,
51- description = "Latency of endpoint call_one operations in microseconds" ,
52- )
53-
54- # Histogram for measuring endpoint stream latency per yield
55- endpoint_stream_latency_histogram : Histogram = METER .create_histogram (
56- name = "endpoint_stream_latency.us" ,
57- description = "Latency of endpoint stream operations per yield in microseconds" ,
37+ from monarch ._src .actor .metrics import (
38+ endpoint_broadcast_error_counter ,
39+ endpoint_broadcast_throughput_counter ,
40+ endpoint_call_error_counter ,
41+ endpoint_call_latency_histogram ,
42+ endpoint_call_one_error_counter ,
43+ endpoint_call_one_latency_histogram ,
44+ endpoint_call_one_throughput_counter ,
45+ endpoint_call_throughput_counter ,
46+ endpoint_choose_error_counter ,
47+ endpoint_choose_latency_histogram ,
48+ endpoint_choose_throughput_counter ,
49+ endpoint_stream_latency_histogram ,
50+ endpoint_stream_throughput_counter ,
5851)
52+ from monarch ._src .actor .tensor_engine_shim import _cached_propagation , fake_call
5953
60- # Histogram for measuring endpoint choose latency
61- endpoint_choose_latency_histogram : Histogram = METER .create_histogram (
62- name = "endpoint_choose_latency.us" ,
63- description = "Latency of endpoint choose operations in microseconds" ,
64- )
54+ from opentelemetry .metrics import Counter , Histogram
6555
6656T = TypeVar ("T" )
6757
6858
69- def _measure_latency (
59+ def _observe_latency_and_error (
7060 coro : Coroutine [Any , Any , T ],
7161 start_time_ns : int ,
7262 histogram : Histogram ,
63+ error_counter : Counter ,
7364 method_name : str ,
7465 actor_count : int ,
7566) -> Coroutine [Any , Any , T ]:
7667 """
77- Decorator to measure and record latency of an async operation.
68+ Observe and record latency and errors of an async operation.
7869
7970 Args:
80- coro: The coroutine to measure
81- histogram: The histogram to record metrics to
71+ coro: The coroutine to observe
72+ histogram: The histogram to record latency metrics to
73+ error_counter: The counter to record error metrics to
8274 method_name: Name of the method being called
8375 actor_count: Number of actors involved in the call
8476
8577 Returns:
86- A wrapped coroutine that records latency metrics
78+ A wrapped coroutine that records error and latency metrics
8779 """
8880
8981 async def _wrapper () -> T :
82+ error_occurred = False
9083 try :
9184 return await coro
85+ except Exception :
86+ error_occurred = True
87+ raise
9288 finally :
9389 duration_us = int ((time .monotonic_ns () - start_time_ns ) / 1_000 )
94- histogram . record (
95- duration_us ,
96- attributes = {
97- "method" : method_name ,
98- "actor_count" : actor_count ,
99- },
100- )
90+ attributes = {
91+ "method" : method_name ,
92+ "actor_count" : actor_count ,
93+ }
94+ histogram . record ( duration_us , attributes = attributes )
95+ if error_occurred :
96+ error_counter . add ( 1 , attributes = attributes )
10197
10298 return _wrapper ()
10399
@@ -136,27 +132,37 @@ def _get_method_name(self) -> str:
136132 return method_specifier .name
137133 return "unknown"
138134
139- def _with_latency_measurement (
140- self , start_time_ns : int , histogram : Histogram , actor_count : int
135+ def _with_telemetry (
136+ self ,
137+ start_time_ns : int ,
138+ histogram : Histogram ,
139+ error_counter : Counter ,
140+ actor_count : int ,
141141 ) -> Any :
142142 """
143- Decorator factory to add latency measurement to async functions.
143+ Decorator factory to add telemetry ( latency and error tracking) to async functions.
144144
145145 Args:
146- histogram: The histogram to record metrics to
146+ histogram: The histogram to record latency metrics to
147+ error_counter: The counter to record error metrics to
147148 actor_count: Number of actors involved in the operation
148149
149150 Returns:
150- A decorator that wraps async functions with latency measurement
151+ A decorator that wraps async functions with telemetry measurement
151152 """
152153 method_name : str = self ._get_method_name ()
153154
154155 def decorator (func : Any ) -> Any :
155156 @functools .wraps (func )
156157 def wrapper (* args : Any , ** kwargs : Any ) -> Any :
157158 coro = func (* args , ** kwargs )
158- return _measure_latency (
159- coro , start_time_ns , histogram , method_name , actor_count
159+ return _observe_latency_and_error (
160+ coro ,
161+ start_time_ns ,
162+ histogram ,
163+ error_counter ,
164+ method_name ,
165+ actor_count ,
160166 )
161167
162168 return wrapper
@@ -210,20 +216,33 @@ def choose(self, *args: P.args, **kwargs: P.kwargs) -> Future[R]:
210216
211217 Load balanced RPC-style entrypoint for request/response messaging.
212218 """
219+ # Track throughput at method entry
220+ method_name : str = self ._get_method_name ()
221+ endpoint_choose_throughput_counter .add (1 , attributes = {"method" : method_name })
213222
214223 p , r_port = self ._port (once = True )
215224 r : "PortReceiver[R]" = r_port
225+ start_time_ns : int = time .monotonic_ns ()
216226 # pyre-ignore[6]: ParamSpec kwargs is compatible with Dict[str, Any]
217227 self ._send (args , kwargs , port = p , selection = "choose" )
218228
219- @self ._with_latency_measurement (endpoint_choose_latency_histogram , 1 )
229+ @self ._with_telemetry (
230+ start_time_ns ,
231+ endpoint_choose_latency_histogram ,
232+ endpoint_choose_error_counter ,
233+ 1 ,
234+ )
220235 async def process () -> R :
221236 result = await r .recv ()
222237 return result
223238
224239 return Future (coro = process ())
225240
226241 def call_one (self , * args : P .args , ** kwargs : P .kwargs ) -> Future [R ]:
242+ # Track throughput at method entry
243+ method_name : str = self ._get_method_name ()
244+ endpoint_call_one_throughput_counter .add (1 , attributes = {"method" : method_name })
245+
227246 p , r_port = self ._port (once = True )
228247 r : PortReceiver [R ] = r_port
229248 start_time : int = time .monotonic_ns ()
@@ -234,8 +253,11 @@ def call_one(self, *args: P.args, **kwargs: P.kwargs) -> Future[R]:
234253 f"Can only use 'call_one' on a single Actor but this actor has shape { extent } "
235254 )
236255
237- @self ._with_latency_measurement (
238- start_time , endpoint_call_one_latency_histogram , 1
256+ @self ._with_telemetry (
257+ start_time ,
258+ endpoint_call_one_latency_histogram ,
259+ endpoint_call_one_error_counter ,
260+ 1 ,
239261 )
240262 async def process () -> R :
241263 result = await r .recv ()
@@ -247,13 +269,19 @@ def call(self, *args: P.args, **kwargs: P.kwargs) -> "Future[ValueMesh[R]]":
247269 from monarch ._src .actor .actor_mesh import RankedPortReceiver , ValueMesh
248270
249271 start_time : int = time .monotonic_ns ()
272+ # Track throughput at method entry
273+ method_name : str = self ._get_method_name ()
274+ endpoint_call_throughput_counter .add (1 , attributes = {"method" : method_name })
250275 p , unranked = self ._port ()
251276 r : RankedPortReceiver [R ] = unranked .ranked ()
252277 # pyre-ignore[6]: ParamSpec kwargs is compatible with Dict[str, Any]
253278 extent : Extent = self ._send (args , kwargs , port = p )
254279
255- @self ._with_latency_measurement (
256- start_time , endpoint_call_latency_histogram , extent .nelements
280+ @self ._with_telemetry (
281+ start_time ,
282+ endpoint_call_latency_histogram ,
283+ endpoint_call_error_counter ,
284+ extent .nelements ,
257285 )
258286 async def process () -> "ValueMesh[R]" :
259287 from monarch ._rust_bindings .monarch_hyperactor .shape import Shape
@@ -280,14 +308,22 @@ def stream(
280308 This enables processing results from multiple actors incrementally as
281309 they become available. Returns an async generator of response values.
282310 """
311+ # Track throughput at method entry
312+ method_name : str = self ._get_method_name ()
313+ endpoint_stream_throughput_counter .add (1 , attributes = {"method" : method_name })
314+
283315 p , r_port = self ._port ()
284316 start_time : int = time .monotonic_ns ()
285317 # pyre-ignore[6]: ParamSpec kwargs is compatible with Dict[str, Any]
286318 extent : Extent = self ._send (args , kwargs , port = p )
287319 r : "PortReceiver[R]" = r_port
288320
289- latency_decorator : Any = self ._with_latency_measurement (
290- start_time , endpoint_stream_latency_histogram , extent .nelements
321+ # Note: stream doesn't track errors per-yield since errors propagate to caller
322+ latency_decorator : Any = self ._with_telemetry (
323+ start_time ,
324+ endpoint_stream_latency_histogram ,
325+ endpoint_broadcast_error_counter , # Placeholder, errors not tracked per-yield
326+ extent .nelements ,
291327 )
292328
293329 def _stream () -> Generator [Future [R ], None , None ]:
@@ -311,8 +347,18 @@ def broadcast(self, *args: P.args, **kwargs: P.kwargs) -> None:
311347 """
312348 from monarch ._src .actor .actor_mesh import send
313349
314- # pyre-ignore[6]: ParamSpec kwargs is compatible with Dict[str, Any]
315- send (self , args , kwargs )
350+ method_name : str = self ._get_method_name ()
351+ attributes = {
352+ "method" : method_name ,
353+ "actor_count" : 0 , # broadcast doesn't track specific count
354+ }
355+ try :
356+ # pyre-ignore[6]: ParamSpec kwargs is compatible with Dict[str, Any]
357+ send (self , args , kwargs )
358+ endpoint_broadcast_throughput_counter .add (1 , attributes = attributes )
359+ except Exception :
360+ endpoint_broadcast_error_counter .add (1 , attributes = attributes )
361+ raise
316362
317363 @abstractmethod
318364 def _rref (self , args : Tuple [Any , ...], kwargs : Dict [str , Any ]) -> R : ...
0 commit comments