Skip to content

Commit 1abe8c5

Browse files
committed
clean up media devices
1 parent 2b32567 commit 1abe8c5

File tree

1 file changed

+32
-8
lines changed

1 file changed

+32
-8
lines changed

livekit-rtc/livekit/rtc/media_devices.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import asyncio
1818
from dataclasses import dataclass
19-
from typing import Any, Optional
19+
from typing import Any, AsyncIterator, Optional
2020

2121
import numpy as np
2222
import sounddevice as sd # type: ignore[import-not-found, import-untyped]
@@ -57,6 +57,27 @@
5757
BLOCKSIZE = 4800 # 100 ms I/O buffer size for sounddevice
5858

5959

60+
class _AudioStreamIterator:
61+
"""Adapter to convert AudioStream (AsyncIterator[AudioFrameEvent]) to AsyncIterator[AudioFrame].
62+
63+
This adapter wraps an AudioStream and extracts the frame from each AudioFrameEvent,
64+
making it compatible with AudioMixer which expects AsyncIterator[AudioFrame].
65+
"""
66+
67+
def __init__(self, audio_stream: AudioStream) -> None:
68+
self._audio_stream = audio_stream
69+
70+
def __aiter__(self) -> AsyncIterator[AudioFrame]:
71+
return self
72+
73+
async def __anext__(self) -> AudioFrame:
74+
event = await self._audio_stream.__anext__()
75+
return event.frame
76+
77+
async def aclose(self) -> None:
78+
await self._audio_stream.aclose()
79+
80+
6081
def _ensure_loop(loop: Optional[asyncio.AbstractEventLoop]) -> asyncio.AbstractEventLoop:
6182
return loop or asyncio.get_event_loop()
6283

@@ -152,7 +173,7 @@ def __init__(
152173

153174
# Internal mixer for add_track/remove_track API
154175
self._mixer: Optional[AudioMixer] = None
155-
self._track_streams: dict[str, AudioStream] = {} # track.sid -> AudioStream
176+
self._track_streams: dict[str, tuple[AudioStream, _AudioStreamIterator]] = {} # track.sid -> (AudioStream, adapter)
156177

157178
def _callback(outdata: np.ndarray, frame_count: int, time_info: Any, status: Any) -> None:
158179
# Pull PCM int16 from buffer; zero if not enough
@@ -230,9 +251,11 @@ def add_track(self, track: Track) -> None:
230251

231252
# Create audio stream for this track
232253
stream = AudioStream(track, sample_rate=self._sample_rate, num_channels=self._num_channels)
254+
# Wrap the stream with an adapter to convert AudioFrameEvent to AudioFrame
255+
stream_iterator = _AudioStreamIterator(stream)
233256

234-
self._track_streams[track.sid] = stream
235-
self._mixer.add_stream(stream)
257+
self._track_streams[track.sid] = (stream, stream_iterator)
258+
self._mixer.add_stream(stream_iterator)
236259

237260
async def remove_track(self, track: Track) -> None:
238261
"""Remove an audio track from the internal mixer.
@@ -242,13 +265,14 @@ async def remove_track(self, track: Track) -> None:
242265
Args:
243266
track: The audio track to remove.
244267
"""
245-
stream = self._track_streams.pop(track.sid, None)
246-
if stream is None:
268+
entry = self._track_streams.pop(track.sid, None)
269+
if entry is None:
247270
return
248271

272+
stream, stream_iterator = entry
249273
if self._mixer is not None:
250274
try:
251-
self._mixer.remove_stream(stream)
275+
self._mixer.remove_stream(stream_iterator)
252276
except Exception:
253277
pass
254278

@@ -309,7 +333,7 @@ async def aclose(self) -> None:
309333
pass
310334

311335
# Clean up all track streams
312-
for stream in list(self._track_streams.values()):
336+
for stream, _ in list(self._track_streams.values()):
313337
try:
314338
await stream.aclose()
315339
except Exception:

0 commit comments

Comments
 (0)