Added STT Translate Websocket Implementation #2533
| runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<0.117.0", "pipecat-ai-small-webrtc-prebuilt>=1.0.0"] | ||
| sambanova = [] | ||
| sarvam = [ "websockets>=13.1,<15.0" ] | ||
| sarvam = [ "sarvamai", "websockets>=13.1,<15.0" ] | 
Can we be more specific about the sarvamai package version? Maybe:
sarvam = [ "sarvamai>=0.1.19,<1", "websockets>=13.1,<15.0" ]
(or whatever upper bound makes sense)
| from pipecat.utils.tracing.service_decorators import traced_stt | ||
|  | ||
| try: | ||
| import httpx | 
Let's remove httpx, which would be a new dependency. I see it is only used for query param parsing. Instead, let's use:
from urllib.parse import urlencode
This import would be above, outside of the try/except block.
| ) | ||
|  | ||
| # Add query parameters | ||
| query_params = httpx.QueryParams() | 
Since we're removing httpx, this should be:
query_params = {"model": self._model, "vad_signals": "true"}
query_string = urlencode(query_params)
ws_url = ws_url + f"?{query_string}"
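To illustrate the suggestion above, here is a minimal sketch of building the query string with the standard library only. The helper name `build_ws_url` and the endpoint URL are placeholders for illustration and are not part of the PR.

```python
from urllib.parse import urlencode


def build_ws_url(base_url: str, model: str, vad_signals: bool = True) -> str:
    """Append query parameters to a WebSocket URL without httpx (hypothetical helper)."""
    query_params = {
        "model": model,
        "vad_signals": "true" if vad_signals else "false",
    }
    # urlencode percent-encodes reserved characters such as ":" in the model name.
    return f"{base_url}?{urlencode(query_params)}"


# Placeholder endpoint, used only to show the resulting shape of the URL.
url = build_ws_url("wss://example.invalid/stt", "saaras:v2.5")
print(url)
```

Note that `urlencode` escapes the colon in `saaras:v2.5`, so the model appears as `saaras%3Av2.5` in the final URL.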
| ) | ||
| except ModuleNotFoundError as e: | ||
| logger.error(f"Exception: {e}") | ||
| logger.error("In order to use Sarvam, you need to `pip install sarvamai websockets httpx`.") | 
Update to:
| logger.error("In order to use Sarvam, you need to `pip install sarvamai websockets httpx`.") | |
| logger.error("In order to use Sarvam, you need to `pip install pipecat-ai[sarvam]`.") | 
This will install the optional dependencies from the sarvam pyproject.toml entry.
| data: EventData | ||
|  | ||
|  | ||
| class SarvamSpeechToTextTranslateService(STTService): | 
Can we shorten the name to SarvamSTTService? This would be more consistent with other, similar services.
| class SarvamSpeechToTextTranslateService(STTService): | |
| class SarvamSTTService(STTService): | 
| *, | ||
| api_key: str, | ||
| model: str = "saaras:v2.5", | ||
| language_code: str = "hi-IN", | 
Pipecat provides Language enums:
from pipecat.transcriptions.language import Language
Instead, you should use:
| language_code: str = "hi-IN", | |
| language_code: str = Language.HI_IN, | 
Then, include a function that takes in a Language and returns a string. The supported languages/dialects can be included. You can find examples in many of the service files. Gladia for example: https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/gladia/stt.py#L47.
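A rough sketch of the kind of mapping function described above. To stay self-contained it defines a small stand-in `Language` enum rather than importing the real one from `pipecat.transcriptions.language`. The function name `language_to_sarvam_language` and the set of supported codes are assumptions for illustration, not the service's confirmed language list.

```python
from enum import Enum
from typing import Optional


class Language(str, Enum):
    """Stand-in subset of pipecat's Language enum, for illustration only."""

    EN_IN = "en-IN"
    FR = "fr"
    HI = "hi"
    HI_IN = "hi-IN"


def language_to_sarvam_language(language: Language) -> Optional[str]:
    """Map a Language to a service language code, or None if unsupported.

    Hypothetical helper: the supported set below is an assumption.
    """
    language_map = {
        Language.EN_IN: "en-IN",
        Language.HI: "hi-IN",  # fall back from the base language to a dialect
        Language.HI_IN: "hi-IN",
    }
    return language_map.get(language)
```

Returning `None` for unsupported languages lets the caller decide whether to fall back to a default or raise an error.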
| sample_rate: Audio sample rate. Defaults to 16000 if not specified. | ||
| **kwargs: Additional arguments passed to the parent STTService. | ||
| """ | ||
| super().__init__(sample_rate=sample_rate or 16000, **kwargs) | 
| super().__init__(sample_rate=sample_rate or 16000, **kwargs) | |
| super().__init__(sample_rate=sample_rate, **kwargs) | 
| "audio": { | ||
| "data": audio_base64, | ||
| "encoding": "audio/wav", | ||
| "sample_rate": self.sample_rate, | 
This is where the base class's sample rate value is provided. There is no need to pass a default to the constructor; the value is resolved automatically when you use the self.sample_rate property.
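The pattern described here can be sketched with stand-in classes. `BaseSTT` below is a simplified illustration and not pipecat's actual `STTService`; the `start` signature and attribute names are assumptions chosen to show why a hard-coded `or 16000` in the subclass is unnecessary.

```python
from typing import Optional


class BaseSTT:
    """Simplified stand-in for an STT base class (not pipecat's real code)."""

    def __init__(self, sample_rate: Optional[int] = None):
        self._init_sample_rate = sample_rate
        self._sample_rate = 0

    @property
    def sample_rate(self) -> int:
        return self._sample_rate

    def start(self, pipeline_sample_rate: int) -> None:
        # An explicit constructor value wins; otherwise use the pipeline's rate.
        self._sample_rate = self._init_sample_rate or pipeline_sample_rate


class ExampleSTT(BaseSTT):
    def build_audio_message(self, audio_base64: str) -> dict:
        # Read the resolved rate from the property instead of storing a default.
        return {
            "audio": {
                "data": audio_base64,
                "encoding": "audio/wav",
                "sample_rate": self.sample_rate,
            }
        }
```

With this shape, a subclass that passes `sample_rate` straight through picks up the pipeline's rate whenever the caller does not specify one.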
| self._websocket = None | ||
| self._websocket_connection = None | ||
| self._listening_task = None | ||
| self._is_connected = False | 
Why do you need to track this state? Can't you just rely on the websocket's connection state? This would be much preferred.
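One way to do this is to derive the connected state from the socket object instead of keeping a separate flag. The sketch below uses a stand-in `State` enum and `FakeWebSocket` class so it runs on its own; with a real websocket library you would check that library's own connection-state attribute, and the names here are assumptions.

```python
from enum import Enum
from typing import Optional


class State(Enum):
    """Stand-in for a websocket library's connection-state enum."""

    CONNECTING = 0
    OPEN = 1
    CLOSING = 2
    CLOSED = 3


class FakeWebSocket:
    """Minimal fake connection used only to demonstrate the pattern."""

    def __init__(self) -> None:
        self.state = State.OPEN

    def close(self) -> None:
        self.state = State.CLOSED


class ExampleService:
    def __init__(self) -> None:
        self._websocket: Optional[FakeWebSocket] = None

    @property
    def is_connected(self) -> bool:
        # Single source of truth: no _is_connected flag that could drift.
        return self._websocket is not None and self._websocket.state is State.OPEN
```

Because the property reads the socket directly, it cannot go stale if the connection drops without the service noticing.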
| self._model = model | ||
| self._language_code = language_code | ||
|  | ||
| self._client = AsyncSarvamAI(api_subscription_key=api_key) | 
Why do you need the client? It seems like you're relying mainly on websockets. Can this class just use one or the other--websocket or AsyncSarvamAI?
| timestamp = parsed.data.occured_at | ||
| logger.debug(f"VAD Signal: {signal}, Occurred at: {timestamp}") | ||
|  | ||
| if signal == VADSignal.START: | 
The STT service shouldn't push these frames. If you want to make VAD events available, I'd recommend emitting events similar to how it's done in Deepgram:
https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/deepgram/stt.py#L267-L272
Then, the application code can handle this. Again, using the Deepgram events:
https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/07c-interruptible-deepgram-vad.py#L98-L104
The issue is, you can't just push these frames from the STT. They need to be pushed throughout the entire pipeline. This is something you can do in the application code from the PipelineTask. This ensures that all processes receive the frames.
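The event-based approach can be sketched as follows. This is a self-contained illustration with a tiny hand-rolled emitter, not pipecat's event API; the event names `on_speech_started` and `on_speech_ended` and the signal strings are assumptions.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, DefaultDict, List

Handler = Callable[..., Awaitable[None]]


class EventEmitter:
    """Tiny stand-in for a framework's event-handler mechanism."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def event_handler(self, name: str):
        def decorator(fn: Handler) -> Handler:
            self._handlers[name].append(fn)
            return fn

        return decorator

    async def emit(self, name: str, *args) -> None:
        for handler in self._handlers[name]:
            await handler(self, *args)


class ExampleSTT(EventEmitter):
    async def handle_vad_signal(self, signal: str) -> None:
        # Emit an event instead of pushing frames from inside the service.
        if signal == "START_SPEECH":
            await self.emit("on_speech_started")
        elif signal == "END_SPEECH":
            await self.emit("on_speech_ended")


stt = ExampleSTT()
events: List[str] = []


@stt.event_handler("on_speech_started")
async def on_speech_started(service):
    # Application code decides what to do, e.g. queue frames on the pipeline task.
    events.append("started")


@stt.event_handler("on_speech_ended")
async def on_speech_ended(service):
    events.append("ended")


async def main() -> None:
    await stt.handle_vad_signal("START_SPEECH")
    await stt.handle_vad_signal("END_SPEECH")


asyncio.run(main())
```

The service only reports what happened; the application owns the decision to inject frames into the whole pipeline.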
| logger.error(f"Error handling Sarvam response: {e}") | ||
| await self.push_error(ErrorFrame(f"Failed to handle response: {e}")) | ||
|  | ||
| def _map_language_code_to_enum(self, language_code: str) -> Language: | 
Oh! You have a language mapping function. Let's use it, as I mentioned above.
| Hi guys, I'd love to close this out soon. Any update on this one? | 
| 
 Hey @markbackman, unfortunately @pratham-sarvam will not be able to work on this PR. | 
| Sounds good. Closing out this PR. I'll look at the other one hopefully by next week. | 
Implemented a new Sarvam speech-to-text service that performs speech-to-text translation over a streaming WebSocket.