diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index db0f92bf..d8766309 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -17,7 +17,7 @@ import ctypes import logging from dataclasses import dataclass, field -from typing import Callable, Dict, Literal, Optional, cast, Mapping +from typing import Callable, Dict, Literal, Optional, cast, Mapping, Any from .event_emitter import EventEmitter from ._ffi_client import FfiClient, FfiHandle @@ -753,7 +753,11 @@ def _handle_stream_header( text_reader = TextStreamReader(header) self._text_stream_readers[header.stream_id] = text_reader - text_stream_handler(text_reader, participant_identity) + task: asyncio.Task[Any] = asyncio.create_task( + text_stream_handler(text_reader, participant_identity) # type: ignore + ) + self._data_stream_tasks.add(task) + task.add_done_callback(self._data_stream_tasks.discard) elif stream_type == "byte_header": byte_stream_handler = self._byte_stream_handlers.get(header.topic) if byte_stream_handler is None: @@ -765,7 +769,11 @@ def _handle_stream_header( byte_reader = ByteStreamReader(header) self._byte_stream_readers[header.stream_id] = byte_reader - byte_stream_handler(byte_reader, participant_identity) + task = asyncio.create_task( + byte_stream_handler(byte_reader, participant_identity) # type: ignore + ) + self._data_stream_tasks.add(task) + task.add_done_callback(self._data_stream_tasks.discard) else: logging.warning("received unknown header type, %s", stream_type) pass