#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

| 7 | +"""Beyond Presence implementation for Pipecat. |
| 8 | +
|
| 9 | +This module provides integration with the Beyond Presence API to generate avatar videos |
| 10 | +starting from voice agents. |
| 11 | +""" |

import asyncio
import os
from typing import Optional

import aiohttp

from pipecat.audio.utils import create_stream_resampler
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
    BotStartedSpeakingFrame,
    Frame,
    SpeechOutputAudioRawFrame,
    StartFrame,
    StartInterruptionFrame,
    TransportMessageFrame,
    TTSAudioRawFrame,
    TTSStartedFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup
from pipecat.services.ai_service import AIService
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyTransportClient
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper

BASE_API_URL = "https://api.bey.dev/v1"
# Video frame rate used to size audio chunks (one audio buffer per video frame).
FRAME_RATE = 25


class BeyVideoService(AIService):
    """A service that integrates Beyond Presence's avatar video generation into the pipeline.

    Converts the pipeline's audio stream into an avatar video stream published directly
    to a Daily room by an external worker managed by Beyond Presence.
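
    Example (a minimal sketch; ``BEY_API_KEY`` is a hypothetical environment variable
    holding the Beyond Presence API key, and ``room_url`` must point at an existing
    Daily room)::

        async with aiohttp.ClientSession() as session:
            avatar = BeyVideoService(
                api_key=os.getenv("BEY_API_KEY"),
                avatar_id="your-avatar-id",
                room_url=room_url,
                session=session,
            )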
    """

    def __init__(
        self,
        api_key: str,
        avatar_id: str,
        # TODO: Is it possible to elegantly infer this from the pipeline's transport?
        # NOTE: Unlike other providers, bey posts video directly to the room,
        # likely resulting in lower latency
        room_url: str,
        session: aiohttp.ClientSession,
        **kwargs,
    ) -> None:
        """Initialize the Beyond Presence speech-to-video service.

        Args:
            api_key: Beyond Presence API key used for authentication.
            avatar_id: ID of the Beyond Presence avatar to use for video synthesis.
            room_url: URL of the Daily room the speech-to-video service will connect to.
            session: Async HTTP session used for communication with Beyond Presence.
            **kwargs: Additional arguments passed to the parent AIService class.
        """
        super().__init__(**kwargs)

        self._api_key = api_key
        self._room_url = room_url
        self._avatar_id = avatar_id
        self._session = session

        self._client: Optional[DailyTransportClient] = None

        self._resampler = create_stream_resampler()
        self._queue = asyncio.Queue()
        # Audio sent to the avatar worker is resampled to 16 kHz.
        self._out_sample_rate = 16000
        self._audio_buffer = bytearray()
        self._transport_destination: str = "bey-custom-track"
        self._http_session: Optional[aiohttp.ClientSession] = None

    async def setup(self, setup: FrameProcessorSetup):
        """Set up the Beyond Presence video service.

        Args:
            setup: Frame processor setup configuration.
        """
        await super().setup(setup)

        daily_rest_helper = DailyRESTHelper(
            # NOTE: assumes the Daily API key is available in the environment;
            # adjust if your deployment provides it differently.
            daily_api_key=os.getenv("DAILY_API_KEY", ""),
            aiohttp_session=self._session,
        )
        token_expiry_time: float = 60 * 60  # 1 hour
        token = await daily_rest_helper.get_token(self._room_url, token_expiry_time)
        # TODO: Fix this hacky way of obtaining the DailyTransportClient
        self._client = DailyTransport(
            self._room_url,
            token,
            "Bey example Bot",
            DailyParams(
                audio_in_enabled=True,
                video_out_enabled=False,
                video_out_is_live=False,
                microphone_out_enabled=False,
                vad_analyzer=SileroVADAnalyzer(),
            ),
        )._client
        await self._client.setup(setup)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process frames through the service.

        Args:
            frame: The frame to process.
            direction: The direction of frame processing.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, StartFrame):
            await self._start_session(room_url=self._client.room_url, token=self._client._token)
            await self._client.register_audio_destination(self._transport_destination)
            await self.push_frame(frame, direction)
        elif isinstance(frame, StartInterruptionFrame):
            # Notify the Beyond Presence worker so it can stop the current avatar utterance.
            transport_frame = TransportMessageFrame(message="interrupt")
            transport_frame.transport_destination = self._transport_destination
            await self._client.send_message(transport_frame)
            await self.push_frame(frame, direction)
        elif isinstance(frame, TTSAudioRawFrame):
            in_sample_rate = frame.sample_rate
            # One video frame's worth of 16-bit mono audio (2 bytes per sample).
            chunk_size = int((self._out_sample_rate * 2) / FRAME_RATE)

            resampled = await self._resampler.resample(
                frame.audio, in_sample_rate, self._out_sample_rate
            )
            self._audio_buffer.extend(resampled)
            while len(self._audio_buffer) >= chunk_size:
                chunk = SpeechOutputAudioRawFrame(
                    bytes(self._audio_buffer[:chunk_size]),
                    sample_rate=self._out_sample_rate,
                    num_channels=frame.num_channels,
                )
                chunk.transport_destination = self._transport_destination
                self._audio_buffer = self._audio_buffer[chunk_size:]
                await self._client.write_audio_frame(chunk)
        elif isinstance(frame, TTSStartedFrame):
            await self.start_ttfb_metrics()
        elif isinstance(frame, BotStartedSpeakingFrame):
            # We constantly receive audio through WebRTC, but most of the time it is silence.
            # As soon as we receive actual audio, the base output transport will create a
            # BotStartedSpeakingFrame, which we can use as a signal for the TTFB metrics.
            await self.stop_ttfb_metrics()
        else:
            await self.push_frame(frame, direction)

    def can_generate_metrics(self) -> bool:
        """Check if the service can generate metrics.

        Returns:
            True if metrics generation is supported.
        """
        return True

    async def _start_session(self, room_url: str, token: str) -> None:
        async with self._session.post(
            f"{BASE_API_URL}/session",
            headers={
                "x-api-key": self._api_key,
            },
            json={
                "avatar_id": self._avatar_id,
                "transport_type": "pipecat",
                # TODO: we might want to rename these to just url and token
                "pipecat_url": room_url,
                "pipecat_token": token,
            },
        ) as response:
            if not response.ok:
                text = await response.text()
                raise Exception(
                    f"Server returned an error (status {response.status}): {text}"
                )
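

# A minimal wiring sketch (hypothetical, not part of the service). `stt`, `llm`, `tts`,
# and `transport` are assumed to be configured elsewhere, and `avatar` is a
# BeyVideoService instance constructed as in the class docstring example:
#
#     pipeline = Pipeline(
#         [
#             transport.input(),
#             stt,
#             llm,
#             tts,
#             avatar,  # the avatar worker publishes audio/video directly to the room
#         ]
#     )
#     await PipelineRunner().run(PipelineTask(pipeline))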