Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `OpenAIRealtimeLLMService` and `AzureRealtimeLLMService` which provide
access to OpenAI Realtime.

- Added `BeyVideoService`, an integration for Beyond Presence AI video avatars
(see <https://beyondpresence.ai>).

### Removed

- Remove `VisionImageRawFrame` in favor of context frames (`LLMContextFrame` or
Expand Down
3 changes: 3 additions & 0 deletions env.example
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,6 @@ NVIDIA_API_KEY=...

# Qwen
QWEN_API_KEY=...

# Beyond Presence
BEY_API_KEY=...
165 changes: 165 additions & 0 deletions examples/foundational/45-bey-video-service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import os
from typing import Any

import aiohttp
from dotenv import load_dotenv
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.bey.video import BeyVideoService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.openai.stt import OpenAISTTService
from pipecat.services.openai.tts import OpenAITTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams, DailyTransport
from pipecat.transports.daily.utils import (
DailyRESTHelper,
DailyMeetingTokenParams,
DailyMeetingTokenProperties,
)


load_dotenv(override=True)

# Stock "Ege" avatar.
# Ref: https://docs.bey.dev/get-started/avatars/default
AVATAR_ID = "b9be11b8-89fb-4227-8f86-4a881393cbdb"


def _daily_params() -> DailyParams:
    """Build the Daily transport parameters: audio input with VAD, every output disabled."""
    return DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=False,
        video_out_enabled=False,
        video_out_is_live=False,
        microphone_out_enabled=False,
        vad_analyzer=SileroVADAnalyzer(),
    )


def _webrtc_params() -> TransportParams:
    """Build the WebRTC transport parameters: audio input with VAD, every output disabled."""
    return TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=False,
        video_out_enabled=False,
        video_out_is_live=False,
        vad_analyzer=SileroVADAnalyzer(),
    )


# Only the factories are stored here, so heavyweight objects (e.g. the
# SileroVADAnalyzer) are not instantiated at import time. The matching factory
# is called once the desired transport has been selected.
transport_params = {
    "daily": _daily_params,
    "webrtc": _webrtc_params,
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Run the Beyond Presence avatar example on an already-created transport.

    Assembles an STT -> LLM -> TTS -> avatar pipeline, registers the transport
    event handlers and blocks until the pipeline task finishes.

    Args:
        transport: Transport to run on. Must be a ``DailyTransport``.
        runner_args: Runner arguments; supplies the pipeline idle timeout and
            the SIGINT handling flag.

    Raises:
        ValueError: If ``transport`` is not a ``DailyTransport``.
    """
    logger.info("Starting bot")

    # TODO: Support Small WebRTC transport
    if not isinstance(transport, DailyTransport):
        raise ValueError("This example only supports Daily transport")

    openai_api_key = os.getenv("OPENAI_API_KEY")
    avatar_name = "My Video Bot"

    messages = [
        {
            "role": "system",
            "content": (
                "You are a helpful assistant. "
                "Your output will be converted to audio so don't include special characters in your answers. "
                "Be succinct and respond to what the user said in a creative and helpful way."
            ),
        },
    ]

    async with aiohttp.ClientSession() as http:
        stt = OpenAISTTService(api_key=openai_api_key)
        tts = OpenAITTSService(api_key=openai_api_key)
        llm = OpenAILLMService(api_key=openai_api_key)

        rest_helper = DailyRESTHelper(
            daily_api_key=os.getenv("DAILY_API_KEY"),
            aiohttp_session=http,
        )

        avatar = BeyVideoService(
            api_key=os.getenv("BEY_API_KEY"),
            avatar_id=AVATAR_ID,
            bot_name=avatar_name,
            # Audio is streamed to a video bot in the Daily room, which needs
            # the transport's underlying client.
            transport_client=transport._client,
            # The video bot joins the room remotely on demand; these are
            # needed to manage it.
            rest_helper=rest_helper,
            session=http,
        )

        context = OpenAILLMContext(messages)
        aggregators = llm.create_context_aggregator(context)

        stages = [
            transport.input(),  # Transport user input
            stt,  # STT
            aggregators.user(),  # User responses
            llm,  # LLM
            tts,  # TTS
            avatar,  # Bey Video Avatar
            transport.output(),  # Transport bot output
            aggregators.assistant(),  # Assistant spoken responses
        ]

        task = PipelineTask(
            Pipeline(stages),
            params=PipelineParams(
                enable_metrics=True,
                enable_usage_metrics=True,
            ),
            idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
        )

        @transport.event_handler("on_participant_joined")
        async def on_participant_joined(transport, participant: dict[str, Any]):
            joined_name = participant["info"]["userName"]
            logger.info(f"Participant joined: {joined_name}")

            if joined_name != avatar_name:
                # Someone other than the avatar joined: kick off the conversation.
                messages.append(
                    {
                        "role": "system",
                        "content": "Please introduce yourself to the user.",
                    }
                )
                await task.queue_frames([LLMRunFrame()])
            else:
                # The avatar itself joined: unsubscribe from its microphone
                # (presumably so its speech is not fed back into the pipeline).
                mic_off = {"media": {"microphone": "unsubscribed"}}
                await transport.update_subscriptions(
                    participant_settings={participant["id"]: mic_off}
                )

        @transport.event_handler("on_client_disconnected")
        async def on_client_disconnected(transport, client):
            logger.info("Client disconnected")
            await task.cancel()

        runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
        await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Entry point compatible with Pipecat Cloud.

    Creates the transport selected by ``runner_args`` from the
    ``transport_params`` factories and hands it to ``run_bot``.
    """
    await run_bot(await create_transport(runner_args, transport_params), runner_args)


if __name__ == "__main__":
    # The runner is imported only when this file is executed as a script.
    from pipecat.runner.run import main as launch_runner

    launch_runner()
166 changes: 166 additions & 0 deletions src/pipecat/services/bey/video.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Beyond Presence implementation for Pipecat.

This module provides integration with the Beyond Presence API to generate avatar video
driven by the audio output of a voice agent.
"""

import asyncio

import aiohttp

from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
Frame,
SpeechOutputAudioRawFrame,
StartFrame,
StartInterruptionFrame,
TransportMessageFrame,
TTSAudioRawFrame,
TTSStartedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_service import AIService
from pipecat.transports.daily.transport import DailyTransportClient
from pipecat.transports.daily.utils import (
DailyRESTHelper,
DailyMeetingTokenParams,
DailyMeetingTokenProperties,
)

# Base URL of the Beyond Presence REST API; sessions are created by POSTing to
# `{BASE_API_URL}/session`.
BASE_API_URL = "https://api.bey.dev/v1"
# Number of audio chunks sent per second of audio. BeyVideoService sizes each
# chunk as `sample_rate * 2 / FRAME_RATE` bytes, which is 1/FRAME_RATE seconds
# if the audio is 16-bit mono.
# NOTE(review): presumably matches the avatar's video frame rate — confirm.
FRAME_RATE = 25


class BeyVideoService(AIService):
    """A service that integrates Beyond Presence's avatar video generation into the pipeline.

    Converts the audio stream from the pipeline into an avatar video stream that is
    posted directly to a Daily room from an external worker managed by Beyond Presence.

    TTS audio frames are consumed by this service: they are resampled, cut into
    fixed-size chunks and written to a dedicated audio destination of the Daily
    transport client instead of being pushed downstream.
    """

    def __init__(
        self,
        api_key: str,
        avatar_id: str,
        bot_name: str,
        transport_client: DailyTransportClient,
        rest_helper: DailyRESTHelper,
        session: aiohttp.ClientSession,
        **kwargs,
    ) -> None:
        """Initialize the Beyond Presence speech-to-video service.

        Args:
            api_key: Beyond Presence API key used for authentication.
            avatar_id: ID of the Beyond Presence avatar to use for video synthesis.
            bot_name: Name of the bot as it appears in the Daily room.
            transport_client: DailyTransportClient for managing WebRTC connections.
            rest_helper: DailyRESTHelper for REST API interactions with Daily.
            session: Async HTTP session used for communication with Beyond Presence.
            **kwargs: Additional arguments passed to the parent AIService class.
        """
        super().__init__(**kwargs)

        self._api_key = api_key
        self._avatar_id = avatar_id
        self._bot_name = bot_name
        self._transport_client = transport_client
        self._rest_helper = rest_helper
        self._session = session

        self._resampler = create_stream_resampler()
        # Sample rate of the audio written to the avatar's audio destination.
        self._out_sample_rate = 24000
        # Resampled audio that has not yet filled a complete chunk.
        self._audio_buffer = bytearray()
        self._transport_destination: str = "bey-custom-track"

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process frames through the service.

        ``TTSAudioRawFrame``, ``TTSStartedFrame`` and ``BotStartedSpeakingFrame``
        are consumed here; every other frame is pushed on in ``direction``.

        Args:
            frame: The frame to process.
            direction: The direction of frame processing.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, StartFrame):
            await self._handle_start(frame, direction)
        elif isinstance(frame, StartInterruptionFrame):
            await self._handle_interruption(frame, direction)
        elif isinstance(frame, TTSAudioRawFrame):
            await self._handle_tts_audio(frame)
        elif isinstance(frame, TTSStartedFrame):
            await self.start_ttfb_metrics()
        elif isinstance(frame, BotStartedSpeakingFrame):
            # We constantly receive audio through WebRTC, but most of the time it is silence.
            # As soon as we receive actual audio, the base output transport will create a
            # BotStartedSpeakingFrame, which we can use as a signal for the TTFB metrics.
            await self.stop_ttfb_metrics()
        else:
            await self.push_frame(frame, direction)

    def can_generate_metrics(self) -> bool:
        """Check if the service can generate metrics.

        Returns:
            True if metrics generation is supported.
        """
        return True

    async def _handle_start(self, frame: StartFrame, direction: FrameDirection) -> None:
        """Start the avatar session and register its audio destination, then forward the frame."""
        room_url = self._transport_client.room_url
        token = await self._rest_helper.get_token(
            room_url=room_url,
            params=DailyMeetingTokenParams(
                properties=DailyMeetingTokenProperties(user_name=self._bot_name),
            ),
            expiry_time=3600,  # 1 hour
        )
        await self._start_session(room_url=room_url, token=token)
        await self._transport_client.register_audio_destination(self._transport_destination)
        await self.push_frame(frame, direction)

    async def _handle_interruption(
        self, frame: StartInterruptionFrame, direction: FrameDirection
    ) -> None:
        """Drop pending audio, send the "interrupt" message, and forward the frame."""
        # Discard the buffered partial chunk; otherwise leftover audio from the
        # interrupted utterance would be prepended to the next one.
        self._audio_buffer.clear()
        await self._transport_client.send_message(TransportMessageFrame(message="interrupt"))
        # Forward the frame unmodified so downstream processors also observe
        # the interruption instead of having it silently swallowed here.
        await self.push_frame(frame, direction)

    async def _handle_tts_audio(self, frame: TTSAudioRawFrame) -> None:
        """Resample TTS audio and write it to the avatar's audio destination in fixed-size chunks."""
        # Bytes per chunk: one 1/FRAME_RATE-second slice at 2 bytes per sample.
        # NOTE(review): assumes 16-bit mono audio — confirm for multi-channel TTS.
        chunk_size = int((self._out_sample_rate * 2) / FRAME_RATE)

        resampled = await self._resampler.resample(
            frame.audio, frame.sample_rate, self._out_sample_rate
        )
        self._audio_buffer.extend(resampled)

        while len(self._audio_buffer) >= chunk_size:
            chunk = SpeechOutputAudioRawFrame(
                bytes(self._audio_buffer[:chunk_size]),
                sample_rate=self._out_sample_rate,
                num_channels=frame.num_channels,
            )
            chunk.transport_destination = self._transport_destination

            # Remove the consumed bytes in place rather than copying the remainder.
            del self._audio_buffer[:chunk_size]
            await self._transport_client.write_audio_frame(chunk)

    async def _start_session(self, room_url: str, token: str) -> None:
        """Create a Beyond Presence session for the given Daily room.

        Args:
            room_url: URL of the Daily room, sent to Beyond Presence in the request.
            token: Daily meeting token, sent to Beyond Presence in the request.

        Raises:
            Exception: If the Beyond Presence API responds with a non-OK status.
        """
        async with self._session.post(
            f"{BASE_API_URL}/session",
            headers={
                "x-api-key": self._api_key,
            },
            json={
                "avatar_id": self._avatar_id,
                "transport_type": "pipecat",
                # TODO: we might want to rename these to just url and token
                "pipecat_url": room_url,
                "pipecat_token": token,
            },
        ) as response:
            if not response.ok:
                text = await response.text()
                raise Exception(f"Server returned error {response.status}: {text}")