
Conversation

@pratham-sarvam (Contributor):

Implemented a new Sarvam speech-to-text service that provides streaming speech-to-text translation over a WebSocket.


codecov bot commented Aug 29, 2025

Codecov Report

❌ Patch coverage is 0% with 190 lines in your changes missing coverage. Please review.

Files with missing lines           | Patch %        | Lines
src/pipecat/services/sarvam/stt.py | 0.00%          | 190 Missing ⚠️

Files with missing lines           | Coverage       | Δ
src/pipecat/services/sarvam/stt.py | 0.00% <0.00%>  | (ø)

runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<0.117.0", "pipecat-ai-small-webrtc-prebuilt>=1.0.0"]
sambanova = []
sarvam = [ "websockets>=13.1,<15.0" ]
sarvam = [ "sarvamai", "websockets>=13.1,<15.0" ]
Contributor:

Can we be more specific about the sarvamai package version? Maybe:

sarvam = [ "sarvamai>=0.1.19,<1", "websockets>=13.1,<15.0" ]

(or whatever upper bound makes sense)

from pipecat.utils.tracing.service_decorators import traced_stt

try:
    import httpx
Contributor:

Let's remove httpx, which would be a new dependency. I see it's just used for query param parsing. Instead, let's use:

from urllib.parse import urlencode

This import would be above, outside of the try/except block.

)

# Add query parameters
query_params = httpx.QueryParams()
Contributor:

Since we're removing httpx, this should be:

query_params = {"model": self._model, "vad_signals": "true"}
query_string = urlencode(query_params)
ws_url = ws_url + f"?{query_string}"
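
For reference, here's a standalone sketch of what that approach produces (the endpoint URL below is a placeholder, not necessarily Sarvam's actual one):

from urllib.parse import urlencode

ws_url = "wss://example.sarvam.ai/speech-to-text-translate/ws"  # placeholder endpoint
query_string = urlencode({"model": "saaras:v2.5", "vad_signals": "true"})
ws_url = f"{ws_url}?{query_string}"
# -> wss://example.sarvam.ai/speech-to-text-translate/ws?model=saaras%3Av2.5&vad_signals=true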

)
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use Sarvam, you need to `pip install sarvamai websockets httpx`.")
Contributor:

Update to:

Suggested change
logger.error("In order to use Sarvam, you need to `pip install sarvamai websockets httpx`.")
logger.error("In order to use Sarvam, you need to `pip install pipecat-ai[sarvam]`.")

Contributor:

This will install the optional dependencies from the sarvam pyproject.toml entry.

data: EventData


class SarvamSpeechToTextTranslateService(STTService):
Contributor:

Can we shorten the name to SarvamSTTService? This would be more consistent with other, similar services.

Suggested change
class SarvamSpeechToTextTranslateService(STTService):
class SarvamSTTService(STTService):

*,
api_key: str,
model: str = "saaras:v2.5",
language_code: str = "hi-IN",
Contributor:

Pipecat provides Language enums:

from pipecat.transcriptions.language import Language

Instead, you should use:

Suggested change
language_code: str = "hi-IN",
language_code: Language = Language.HI_IN,

Then, include a function that takes in a Language and returns a string. The supported languages/dialects can be included. You can find examples in many of the service files. Gladia for example: https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/gladia/stt.py#L47.
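
For illustration, a minimal sketch of such a function, modeled on the Gladia example; the set of languages shown here is an assumption and should be replaced with Sarvam's actual supported list:

from typing import Optional

from pipecat.transcriptions.language import Language


def language_to_sarvam_language(language: Language) -> Optional[str]:
    """Map a pipecat Language enum to a Sarvam language code string."""
    # Assumed subset; extend with the languages/dialects Sarvam supports.
    LANGUAGE_MAP = {
        Language.HI_IN: "hi-IN",
        Language.EN_IN: "en-IN",
    }
    return LANGUAGE_MAP.get(language)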

sample_rate: Audio sample rate. Defaults to 16000 if not specified.
**kwargs: Additional arguments passed to the parent STTService.
"""
super().__init__(sample_rate=sample_rate or 16000, **kwargs)
Contributor:

Suggested change
super().__init__(sample_rate=sample_rate or 16000, **kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)

"audio": {
"data": audio_base64,
"encoding": "audio/wav",
"sample_rate": self.sample_rate,
Contributor:

This is where the base class's sample rate value is used. There's no need to pass it to the constructor; it's available automatically via the self.sample_rate property.

self._websocket = None
self._websocket_connection = None
self._listening_task = None
self._is_connected = False
Contributor:

Why do you need to track this state? Can't you just rely on the websocket's connection state? This would be much preferred.
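
For example, a minimal sketch that relies on the connection object itself, placed on the service class (assuming the websockets client, which exposes the connection's lifecycle via a state attribute):

from websockets.protocol import State

@property
def _connected(self) -> bool:
    # The websockets connection already tracks whether it's open.
    return self._websocket is not None and self._websocket.state is State.OPEN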

self._model = model
self._language_code = language_code

self._client = AsyncSarvamAI(api_subscription_key=api_key)
@markbackman (Contributor, Sep 3, 2025):

Why do you need the client? It seems like you're relying on websockets mainly. Can this class just use one or the other: the websocket or AsyncSarvamAI?

timestamp = parsed.data.occured_at
logger.debug(f"VAD Signal: {signal}, Occurred at: {timestamp}")

if signal == VADSignal.START:
Contributor:

The STT service shouldn't push these frames. If you want to make VAD events available, I'd recommend emitting events similar to how it's done in Deepgram:
https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/deepgram/stt.py#L267-L272

Then, the application code can handle this. Again, using the Deepgram events:
https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/07c-interruptible-deepgram-vad.py#L98-L104

The issue is that you can't just push these frames from the STT; they need to be pushed through the entire pipeline. That's something you can do in the application code from the PipelineTask, which ensures that all processors receive the frames.
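
For illustration, a minimal sketch of the event-based approach, assuming pipecat's _register_event_handler/_call_event_handler helpers that the Deepgram service uses; the event names and the VADSignal.END member are assumptions here:

# In the service's __init__:
self._register_event_handler("on_speech_started")
self._register_event_handler("on_utterance_end")

# In the websocket message handler, instead of pushing frames:
if signal == VADSignal.START:
    await self._call_event_handler("on_speech_started")
elif signal == VADSignal.END:
    await self._call_event_handler("on_utterance_end")

The application code can then register handlers on the service and push the corresponding frames from the PipelineTask.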

logger.error(f"Error handling Sarvam response: {e}")
await self.push_error(ErrorFrame(f"Failed to handle response: {e}"))

def _map_language_code_to_enum(self, language_code: str) -> Language:
@markbackman (Contributor, Sep 3, 2025):

Oh! You have a language mapping function. Let's use it, as I mentioned above.

@markbackman (Contributor):

Hi guys, I'd love to close this out soon. Any update on this one?

@shreyas-sarvam (Contributor):

> Hi guys, I'd love to close this out soon. Any update on this one?

Hey @markbackman, unfortunately @pratham-sarvam will not be able to work on this PR.
I have implemented the changes you mentioned in this PR: #2821.
Looking forward to your comments so we can close it out quickly.

@markbackman (Contributor):

Sounds good. Closing out this PR.

I'll look at the other one hopefully by next week.
