diff --git a/livekit-rtc/livekit/rtc/participant.py b/livekit-rtc/livekit/rtc/participant.py index 22b07435..969d5811 100644 --- a/livekit-rtc/livekit/rtc/participant.py +++ b/livekit-rtc/livekit/rtc/participant.py @@ -18,6 +18,7 @@ import asyncio import os import mimetypes +import io import aiofiles from typing import List, Union, Callable, Dict, Awaitable, Optional, Mapping, cast, TypeVar from abc import abstractmethod, ABC @@ -664,6 +665,82 @@ async def send_file( return writer.info + async def send_bytes( + self, + data: Union[bytes, bytearray, memoryview, io.IOBase], + name: str, + *, + mime_type: str = "application/octet-stream", + topic: str = "", + destination_identities: Optional[List[str]] = None, + attributes: Optional[Dict[str, str]] = None, + stream_id: str | None = None, + ) -> ByteStreamInfo: + """ + Send in-memory bytes or a file-like object as a byte stream. + + Accepts common Python byte/blob types: bytes, bytearray, memoryview, and readable io.IOBase + (e.g., io.BytesIO, buffered readers). The name is used for the stream metadata. + """ + # Bytes-like input path + if isinstance(data, (bytes, bytearray, memoryview)): + buffer = bytes(data) + total_size = len(buffer) + + writer: ByteStreamWriter = await self.stream_bytes( + name=name, + total_size=total_size, + mime_type=mime_type, + attributes=attributes, + stream_id=stream_id, + destination_identities=destination_identities, + topic=topic, + ) + + offset = 0 + while offset < total_size: + end = min(offset + STREAM_CHUNK_SIZE, total_size) + await writer.write(buffer[offset:end]) + offset = end + + await writer.aclose() + return writer.info + + # File-like input path + if isinstance(data, io.IOBase) and data.readable(): + total_size: Optional[int] = None + try: + if data.seekable(): + current_pos = data.tell() + data.seek(0, io.SEEK_END) + end_pos = data.tell() + total_size = end_pos - current_pos + data.seek(current_pos, io.SEEK_SET) + except Exception: + total_size = None + + writer = await self.stream_bytes( + name=name, + total_size=total_size, + mime_type=mime_type, + attributes=attributes, + stream_id=stream_id, + destination_identities=destination_identities, + topic=topic, + ) + + while True: + chunk = data.read(STREAM_CHUNK_SIZE) + if not chunk: + break + await writer.write(chunk) + await writer.aclose() + return writer.info + + raise TypeError( + "Unsupported data type for send_bytes. Expected bytes, bytearray, memoryview, or a readable io.IOBase." + ) + async def publish_track( self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions() ) -> LocalTrackPublication: