Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions examples/foundational/26a-gemini-multimodal-live-transcription.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,19 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")

llm = GeminiMultimodalLiveLLMService(
# model="models/gemini-2.5-flash-preview-native-audio-dialog",
api_key=os.getenv("GOOGLE_API_KEY"),
voice_id="Aoede", # Puck, Charon, Kore, Fenrir, Aoede
# system_instruction="Talk like a pirate."
# inference_on_context_initialization=False,
# params=InputParams(
# session_resumption=SessionResumptionParams(
# ),
# context_window_compression=ContextWindowCompressionParams(
# enabled=True,
# ),
# media_resolution=GeminiMediaResolution.LOW
# ),
)

context = OpenAILLMContext(
Expand Down Expand Up @@ -109,6 +118,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
),
Expand Down
39 changes: 39 additions & 0 deletions src/pipecat/services/gemini_multimodal_live/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,20 @@ class AudioTranscriptionConfig(BaseModel):
pass


class ContextWindowCompressionConfig(BaseModel):
    """Configuration for context window compression in the Live session setup.

    Attributes:
        sliding_window: Sliding-window compression settings. The Live API
            expects an object here (an empty dict enables the server-side
            defaults). The previous ``Field(default=True)`` did not match
            the ``Optional[dict]`` annotation and would serialize a bare
            boolean instead of an object.
        trigger_tokens: Token count at which compression is triggered.
            Defaults to None (use the server default).
    """

    sliding_window: Optional[dict] = Field(default_factory=dict)
    trigger_tokens: Optional[int] = Field(default=None)


class SessionResumptionConfig(BaseModel):
    """Configuration for session resumption in the Live session setup.

    Attributes:
        transparent: Whether transparent session resumption is requested.
            Defaults to None (omitted from the setup message).
        handle: Resumption handle from a previous session, used to restore
            that session's state. Defaults to None.
    """

    transparent: Optional[bool] = None
    handle: Optional[str] = None


class Setup(BaseModel):
"""Setup configuration for the Gemini Live session.

Expand All @@ -247,6 +261,8 @@ class Setup(BaseModel):
input_audio_transcription: Input audio transcription config. Defaults to None.
output_audio_transcription: Output audio transcription config. Defaults to None.
realtime_input_config: Realtime input configuration. Defaults to None.
context_window_compression: context_window_compression. Defaults to None.
session_resumption: session_resumption. Defaults to None.
"""

model: str
Expand All @@ -256,6 +272,8 @@ class Setup(BaseModel):
input_audio_transcription: Optional[AudioTranscriptionConfig] = None
output_audio_transcription: Optional[AudioTranscriptionConfig] = None
realtime_input_config: Optional[RealtimeInputConfig] = None
context_window_compression: Optional[ContextWindowCompressionConfig] = None
session_resumption: Optional[SessionResumptionConfig] = None


class Config(BaseModel):
Expand Down Expand Up @@ -392,6 +410,11 @@ class BidiGenerateContentTranscription(BaseModel):
text: str


class Duration(BaseModel):
    """A span of time split into whole seconds and nanoseconds.

    Mirrors the protobuf ``Duration`` message shape used by the API.
    """

    # Whole seconds of the duration.
    seconds: int
    # Fractional remainder of the duration, in nanoseconds.
    nanos: int


class ServerContent(BaseModel):
"""Content sent from server to client.

Expand Down Expand Up @@ -485,6 +508,20 @@ class UsageMetadata(BaseModel):
toolUsePromptTokensDetails: Optional[List[ModalityTokenCount]] = None


class GoAway(BaseModel):
    """Server will not be able to service client soon."""

    # Time remaining before the server disconnects. Received as a string;
    # presumably a duration literal like "10s" — verify against the API.
    timeLeft: str


class SessionResumptionUpdate(BaseModel):
    """Update of the session resumption state.

    Only sent if BidiGenerateContentSetup.session_resumption was set.
    """

    # New handle the client can present later to resume this session.
    newHandle: Optional[str] = None
    # Whether the session is currently resumable.
    resumable: Optional[bool] = None
    # Index of the last client message the server has consumed.
    lastConsumedClientMessageIndex: Optional[int] = None


class ServerEvent(BaseModel):
"""Server event received from the Gemini Live API.

Expand All @@ -499,6 +536,8 @@ class ServerEvent(BaseModel):
serverContent: Optional[ServerContent] = None
toolCall: Optional[ToolCall] = None
usageMetadata: Optional[UsageMetadata] = None
goAway: Optional[GoAway] = None
sessionResumptionUpdate: Optional[SessionResumptionUpdate] = None


def parse_server_event(str):
Expand Down
39 changes: 39 additions & 0 deletions src/pipecat/services/gemini_multimodal_live/gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,12 @@ class ContextWindowCompressionParams(BaseModel):
) # None = use default (80% of context window)


class SessionResumptionParams(BaseModel):
    """Parameters for session resumption.

    Attributes:
        transparent: Whether to request transparent session resumption.
            Defaults to None (not sent).
        handle: Handle obtained from a previous session, used to resume it.
            Defaults to None.
    """

    transparent: Optional[bool] = None
    handle: Optional[str] = None

class InputParams(BaseModel):
"""Input parameters for Gemini Multimodal Live generation.

Expand Down Expand Up @@ -464,6 +470,7 @@ class InputParams(BaseModel):
)
vad: Optional[GeminiVADParams] = Field(default=None)
context_window_compression: Optional[ContextWindowCompressionParams] = Field(default=None)
session_resumption: Optional[SessionResumptionParams] = Field(default=None)
extra: Optional[Dict[str, Any]] = Field(default_factory=dict)


Expand Down Expand Up @@ -565,6 +572,9 @@ def __init__(
"language": self._language_code,
"media_resolution": params.media_resolution,
"vad": params.vad,
"session_resumption": params.session_resumption.model_dump()
if params.session_resumption
else None,
"context_window_compression": params.context_window_compression.model_dump()
if params.context_window_compression
else {},
Expand Down Expand Up @@ -842,6 +852,20 @@ async def _connect(self):

config_data["setup"]["context_window_compression"] = compression_config

if self._settings.get("session_resumption"):
session_resumption_config = {}

transparent = self._settings.get("session_resumption").get("transparent")
handle = self._settings.get("session_resumption").get("handle")

if transparent is not None:
session_resumption_config["transparent"] = transparent

if handle is not None:
session_resumption_config["handle"] = handle

config_data["setup"]["session_resumption"] = session_resumption_config

# Add VAD configuration if provided
if self._settings.get("vad"):
vad_config = {}
Expand Down Expand Up @@ -886,6 +910,9 @@ async def _connect(self):
logger.debug(f"Gemini is configuring to use tools{self._tools}")
config.setup.tools = self.get_llm_adapter().from_standard_tools(self._tools)

logger.debug(f"settings {self._settings}")
logger.debug(f"config {config.model_dump(exclude_none=True)}")

# Send the configuration
await self.send_client_event(config)

Expand Down Expand Up @@ -953,6 +980,10 @@ async def _receive_task_handler(self):
await self._handle_evt_grounding_metadata(evt)
elif evt.toolCall:
await self._handle_evt_tool_call(evt)
elif evt.goAway:
await self._handle_evt_go_away(evt)
elif evt.sessionResumptionUpdate:
await self._handle_evt_session_resumption_update(evt)
elif False: # !!! todo: error events?
await self._handle_evt_error(evt)
# errors are fatal, so exit the receive loop
Expand Down Expand Up @@ -1192,6 +1223,14 @@ async def _handle_evt_tool_call(self, evt):

await self.run_function_calls(function_calls_llm)

@traced_gemini_live(operation="llm_go_away")
async def _handle_evt_go_away(self, evt):
    """Handle a goAway server event: the connection will be closed soon.

    Currently only logs the remaining time reported by the server;
    no reconnection is attempted here.
    """
    logger.debug(f"Gemini is going away in {evt.goAway.timeLeft}")

@traced_gemini_live(operation="llm_session_resumption_update")
async def _handle_evt_session_resumption_update(self, evt):
    """Handle a sessionResumptionUpdate server event.

    Currently only logs the update; the new resumption handle is not
    stored for later reconnection — NOTE(review): consider persisting
    evt.sessionResumptionUpdate.newHandle if resumption is to be used.
    """
    logger.debug(f"Gemini session resumption update: {evt.sessionResumptionUpdate}")

@traced_gemini_live(operation="llm_response")
async def _handle_evt_turn_complete(self, evt):
"""Handle the turn complete event."""
Expand Down