diff --git a/CHANGELOG.md b/CHANGELOG.md
index c882fa7c5d..e411f5cbf1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Added `OpenAIRealtimeLLMService` and `AzureRealtimeLLMService` which provide
   access to OpenAI Realtime.
 
+- Added `BeyVideoService`, an integration for Beyond Presence AI video avatars.
+  (see )
+
 ### Removed
 
 - Remove `VisionImageRawFrame` in favor of context frames (`LLMContextFrame` or
diff --git a/env.example b/env.example
index 3690b55239..c0ff8c2702 100644
--- a/env.example
+++ b/env.example
@@ -155,3 +155,6 @@ NVIDIA_API_KEY=...
 
 # Qwen
 QWEN_API_KEY=...
+
+# Beyond Presence
+BEY_API_KEY=...
diff --git a/examples/foundational/45-bey-video-service.py b/examples/foundational/45-bey-video-service.py
new file mode 100644
index 0000000000..1ac43be6a1
--- /dev/null
+++ b/examples/foundational/45-bey-video-service.py
@@ -0,0 +1,165 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import os
+from typing import Any
+
+import aiohttp
+from dotenv import load_dotenv
+from loguru import logger
+
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.frames.frames import LLMRunFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.runner.types import RunnerArguments
+from pipecat.runner.utils import create_transport
+from pipecat.services.bey.video import BeyVideoService
+from pipecat.services.openai.llm import OpenAILLMService
+from pipecat.services.openai.stt import OpenAISTTService
+from pipecat.services.openai.tts import OpenAITTSService
+from pipecat.transports.base_transport import BaseTransport, TransportParams
+from pipecat.transports.daily.transport import DailyParams, DailyTransport
+from pipecat.transports.daily.utils import DailyRESTHelper
+
+
+load_dotenv(override=True)
+
+# Ege stock avatar
+# Ref: https://docs.bey.dev/get-started/avatars/default
+AVATAR_ID = "b9be11b8-89fb-4227-8f86-4a881393cbdb"
+
+# We store functions here so that objects (e.g. SileroVADAnalyzer) aren't
+# instantiated up front. The function is called once the desired transport
+# has been selected.
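+#
+# Note: audio and video output are disabled in the transport params below
+# because the avatar's audio and video are published into the Daily room by
+# the Beyond Presence worker, not by this process. BeyVideoService forwards
+# the pipeline's TTS audio to that worker over a custom audio track instead.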
+transport_params = {
+    "daily": lambda: DailyParams(
+        audio_in_enabled=True,
+        audio_out_enabled=False,
+        video_out_enabled=False,
+        video_out_is_live=False,
+        microphone_out_enabled=False,
+        vad_analyzer=SileroVADAnalyzer(),
+    ),
+    "webrtc": lambda: TransportParams(
+        audio_in_enabled=True,
+        audio_out_enabled=False,
+        video_out_enabled=False,
+        video_out_is_live=False,
+        vad_analyzer=SileroVADAnalyzer(),
+    ),
+}
+
+
+async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
+    logger.info("Starting bot")
+
+    if not isinstance(transport, DailyTransport):
+        raise ValueError("This example only supports Daily transport")
+    # TODO: Support Small WebRTC transport
+
+    async with aiohttp.ClientSession() as session:
+        stt = OpenAISTTService(api_key=os.getenv("OPENAI_API_KEY"))
+
+        tts = OpenAITTSService(api_key=os.getenv("OPENAI_API_KEY"))
+
+        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
+
+        daily_rest_helper = DailyRESTHelper(
+            daily_api_key=os.getenv("DAILY_API_KEY"),
+            aiohttp_session=session,
+        )
+
+        video_bot_name = "My Video Bot"
+
+        bey_video = BeyVideoService(
+            api_key=os.getenv("BEY_API_KEY"),
+            avatar_id=AVATAR_ID,
+            bot_name=video_bot_name,
+            # We stream audio to the video bot in the Daily room, so we need this.
+            transport_client=transport._client,
+            # The video bot joins the room remotely on demand; we need these to manage it.
+            rest_helper=daily_rest_helper,
+            session=session,
+        )
+
+        messages = [
+            {
+                "role": "system",
+                "content": "You are a helpful assistant. Your output will be converted to audio so don't include special characters in your answers. Be succinct and respond to what the user said in a creative and helpful way.",
+            },
+        ]
+
+        context = OpenAILLMContext(messages)
+        context_aggregator = llm.create_context_aggregator(context)
+
+        pipeline = Pipeline(
+            [
+                transport.input(),  # Transport user input
+                stt,  # STT
+                context_aggregator.user(),  # User responses
+                llm,  # LLM
+                tts,  # TTS
+                bey_video,  # Bey Video Avatar
+                transport.output(),  # Transport bot output
+                context_aggregator.assistant(),  # Assistant spoken responses
+            ]
+        )
+
+        task = PipelineTask(
+            pipeline,
+            params=PipelineParams(
+                enable_metrics=True,
+                enable_usage_metrics=True,
+            ),
+            idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
+        )
+
+        @transport.event_handler("on_participant_joined")
+        async def on_participant_joined(transport, participant: dict[str, Any]):
+            logger.info(f"Participant joined: {participant['info']['userName']}")
+
+            if participant["info"]["userName"] == video_bot_name:
+                # Unsubscribe from the avatar's microphone so the pipeline
+                # doesn't hear (and transcribe) its own speech.
+                await transport.update_subscriptions(
+                    participant_settings={
+                        participant["id"]: {"media": {"microphone": "unsubscribed"}}
+                    }
+                )
+                return
+
+            # Kick off the conversation.
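+            # (Any non-avatar participant is treated as a human user. Queuing
+            # an LLMRunFrame runs the LLM on the current context, so the bot
+            # speaks first without waiting for user speech.)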
+            messages.append(
+                {
+                    "role": "system",
+                    "content": "Please introduce yourself to the user.",
+                }
+            )
+            await task.queue_frames([LLMRunFrame()])
+
+        @transport.event_handler("on_client_disconnected")
+        async def on_client_disconnected(transport, client):
+            logger.info("Client disconnected")
+            await task.cancel()
+
+        runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
+
+        await runner.run(task)
+
+
+async def bot(runner_args: RunnerArguments):
+    """Main bot entry point compatible with Pipecat Cloud."""
+    transport = await create_transport(runner_args, transport_params)
+    await run_bot(transport, runner_args)
+
+
+if __name__ == "__main__":
+    from pipecat.runner.run import main
+
+    main()
diff --git a/src/pipecat/services/bey/video.py b/src/pipecat/services/bey/video.py
new file mode 100644
index 0000000000..d1ea68c13c
--- /dev/null
+++ b/src/pipecat/services/bey/video.py
@@ -0,0 +1,166 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+"""Beyond Presence implementation for Pipecat.
+
+This module provides integration with the Beyond Presence API to generate
+avatar video from the audio output of voice agents.
+"""
+
+import aiohttp
+
+from pipecat.audio.utils import create_stream_resampler
+from pipecat.frames.frames import (
+    BotStartedSpeakingFrame,
+    Frame,
+    SpeechOutputAudioRawFrame,
+    StartFrame,
+    StartInterruptionFrame,
+    TransportMessageFrame,
+    TTSAudioRawFrame,
+    TTSStartedFrame,
+)
+from pipecat.processors.frame_processor import FrameDirection
+from pipecat.services.ai_service import AIService
+from pipecat.transports.daily.transport import DailyTransportClient
+from pipecat.transports.daily.utils import (
+    DailyMeetingTokenParams,
+    DailyMeetingTokenProperties,
+    DailyRESTHelper,
+)
+
+BASE_API_URL = "https://api.bey.dev/v1"
+FRAME_RATE = 25
+
+
+class BeyVideoService(AIService):
+    """A service that integrates Beyond Presence's avatar video generation into the pipeline.
+
+    Converts the pipeline's audio stream into an avatar video stream that is posted
+    directly to a Daily room by an external worker managed by Beyond Presence.
+    """
+
+    def __init__(
+        self,
+        api_key: str,
+        avatar_id: str,
+        bot_name: str,
+        transport_client: DailyTransportClient,
+        rest_helper: DailyRESTHelper,
+        session: aiohttp.ClientSession,
+        **kwargs,
+    ) -> None:
+        """Initialize the Beyond Presence speech-to-video service.
+
+        Args:
+            api_key: Beyond Presence API key used for authentication.
+            avatar_id: ID of the Beyond Presence avatar to use for video synthesis.
+            bot_name: Name of the bot as it appears in the Daily room.
+            transport_client: DailyTransportClient for managing WebRTC connections.
+            rest_helper: DailyRESTHelper for REST API interactions with Daily.
+            session: Async HTTP session used for communication with Beyond Presence.
+            **kwargs: Additional arguments passed to the parent AIService class.
+        """
+        super().__init__(**kwargs)
+
+        self._api_key = api_key
+        self._avatar_id = avatar_id
+        self._bot_name = bot_name
+        self._transport_client = transport_client
+        self._rest_helper = rest_helper
+        self._session = session
+
+        self._resampler = create_stream_resampler()
+        self._out_sample_rate = 24000
+        self._audio_buffer = bytearray()
+        self._transport_destination: str = "bey-custom-track"
+
+    async def process_frame(self, frame: Frame, direction: FrameDirection):
+        """Process frames through the service.
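+
+        Handles ``StartFrame`` (obtains a Daily meeting token and starts the
+        Beyond Presence session), ``StartInterruptionFrame`` (notifies the
+        remote avatar worker), ``TTSAudioRawFrame`` (resamples the audio and
+        writes it to the custom audio track), and ``TTSStartedFrame`` /
+        ``BotStartedSpeakingFrame`` (TTFB metrics). All other frames are
+        pushed through unchanged.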
+
+        Args:
+            frame: The frame to process.
+            direction: The direction of frame processing.
+        """
+        await super().process_frame(frame, direction)
+
+        if isinstance(frame, StartFrame):
+            token = await self._rest_helper.get_token(
+                room_url=self._transport_client.room_url,
+                params=DailyMeetingTokenParams(
+                    properties=DailyMeetingTokenProperties(user_name=self._bot_name),
+                ),
+                expiry_time=3600,  # 1 hour
+            )
+            await self._start_session(
+                room_url=self._transport_client.room_url,
+                token=token,
+            )
+            await self._transport_client.register_audio_destination(self._transport_destination)
+            await self.push_frame(frame, direction)
+        elif isinstance(frame, StartInterruptionFrame):
+            frame.transport_destination = self._transport_destination
+            transport_frame = TransportMessageFrame(message="interrupt")
+            await self._transport_client.send_message(transport_frame)
+            # Also push the interruption downstream so the rest of the
+            # pipeline reacts to it.
+            await self.push_frame(frame, direction)
+        elif isinstance(frame, TTSAudioRawFrame):
+            in_sample_rate = frame.sample_rate
+            # 16-bit (2 bytes/sample) PCM: send one video frame's worth of
+            # audio per chunk.
+            chunk_size = int((self._out_sample_rate * 2) / FRAME_RATE)
+
+            # TTS audio is consumed here and written to the custom track for
+            # the avatar worker instead of being pushed downstream.
+            resampled = await self._resampler.resample(
+                frame.audio, in_sample_rate, self._out_sample_rate
+            )
+            self._audio_buffer.extend(resampled)
+            while len(self._audio_buffer) >= chunk_size:
+                chunk = SpeechOutputAudioRawFrame(
+                    bytes(self._audio_buffer[:chunk_size]),
+                    sample_rate=self._out_sample_rate,
+                    num_channels=frame.num_channels,
+                )
+                chunk.transport_destination = self._transport_destination
+                self._audio_buffer = self._audio_buffer[chunk_size:]
+                await self._transport_client.write_audio_frame(chunk)
+        elif isinstance(frame, TTSStartedFrame):
+            await self.start_ttfb_metrics()
+        elif isinstance(frame, BotStartedSpeakingFrame):
+            # We constantly receive audio through WebRTC, but most of the time
+            # it is silence. As soon as actual audio arrives, the base output
+            # transport creates a BotStartedSpeakingFrame, which we use as the
+            # signal to stop the TTFB metrics.
+            await self.stop_ttfb_metrics()
+        else:
+            await self.push_frame(frame, direction)
+
+    def can_generate_metrics(self) -> bool:
+        """Check if the service can generate metrics.
+
+        Returns:
+            True if metrics generation is supported.
+        """
+        return True
+
+    async def _start_session(self, room_url: str, token: str) -> None:
+        """Ask the Beyond Presence API to start an avatar session in the given Daily room."""
+        async with self._session.post(
+            f"{BASE_API_URL}/session",
+            headers={
+                "x-api-key": self._api_key,
+            },
+            json={
+                "avatar_id": self._avatar_id,
+                "transport_type": "pipecat",
+                # TODO: we might want to rename these to just url and token
+                "pipecat_url": room_url,
+                "pipecat_token": token,
+            },
+        ) as response:
+            if not response.ok:
+                text = await response.text()
+                raise Exception(f"Server returned error {response.status}: {text}")