-
Notifications
You must be signed in to change notification settings - Fork 101
Move RPC handlers to room #395
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
typester
wants to merge
17
commits into
main
Choose a base branch
from
typester/rpc-update
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 12 commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
2fc5217
update rust-sdk
typester 9ecf29a
move some rpc functionality to room
typester 5ae87c7
fmt
typester 2c225b4
fix types
typester 4a87383
fix types
typester 22a4fbe
generated protobuf
github-actions[bot] a761391
forgot to add this
typester ddf8f8e
example
typester 0a6be2d
type check
typester 81ad61c
type check
typester 92a2b7c
add a comment
typester a8dc08f
fmt
typester b772e5d
remove deprecated example
typester 12c063e
generated protobuf
github-actions[bot] 04e5e06
use deprecated module
typester ed3904d
Merge remote-tracking branch 'origin/main' into typester/rpc-update
typester 0dfb13b
ruff
typester File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,277 @@ | ||
from livekit import rtc, api | ||
import os | ||
import json | ||
import asyncio | ||
from dotenv import load_dotenv | ||
from livekit.rtc.rpc import RpcInvocationData | ||
|
||
load_dotenv(dotenv_path=".env.local", override=False) | ||
LIVEKIT_API_KEY = os.getenv("LIVEKIT_API_KEY") | ||
LIVEKIT_API_SECRET = os.getenv("LIVEKIT_API_SECRET") | ||
LIVEKIT_URL = os.getenv("LIVEKIT_URL") | ||
if not LIVEKIT_API_KEY or not LIVEKIT_API_SECRET or not LIVEKIT_URL: | ||
raise ValueError("Missing required environment variables. Please check your .env.local file.") | ||
|
||
|
||
async def main(): | ||
rooms = [] # Keep track of all rooms for cleanup | ||
try: | ||
room_name = f"rpc-test-{os.urandom(4).hex()}" | ||
print(f"Connecting participants to room: {room_name}") | ||
|
||
callers_room, greeters_room, math_genius_room = await asyncio.gather( | ||
connect_participant("caller", room_name), | ||
connect_participant("greeter", room_name), | ||
connect_participant("math-genius", room_name), | ||
) | ||
rooms = [callers_room, greeters_room, math_genius_room] | ||
|
||
register_receiver_methods(greeters_room, math_genius_room) | ||
|
||
try: | ||
print("\n\nRunning greeting example...") | ||
await asyncio.gather(perform_greeting(callers_room)) | ||
except Exception as error: | ||
print("Error:", error) | ||
|
||
try: | ||
print("\n\nRunning error handling example...") | ||
await perform_divide(callers_room) | ||
except Exception as error: | ||
print("Error:", error) | ||
|
||
try: | ||
print("\n\nRunning math example...") | ||
await perform_square_root(callers_room) | ||
await asyncio.sleep(2) | ||
await perform_quantum_hypergeometric_series(callers_room) | ||
except Exception as error: | ||
print("Error:", error) | ||
|
||
try: | ||
print("\n\nRunning long calculation with timeout...") | ||
await asyncio.create_task(perform_long_calculation(callers_room)) | ||
except Exception as error: | ||
print("Error:", error) | ||
|
||
try: | ||
print("\n\nRunning long calculation with disconnect...") | ||
# Start the long calculation | ||
long_calc_task = asyncio.create_task(perform_long_calculation(callers_room)) | ||
# Wait a bit then disconnect the math genius | ||
await asyncio.sleep(5) | ||
print("\nDisconnecting math genius early...") | ||
await math_genius_room.disconnect() | ||
# Wait for the calculation to fail | ||
await long_calc_task | ||
except Exception as error: | ||
print("Error:", error) | ||
|
||
print("\n\nParticipants done, disconnecting remaining participants...") | ||
await callers_room.disconnect() | ||
await greeters_room.disconnect() | ||
|
||
print("Participants disconnected. Example completed.") | ||
|
||
except KeyboardInterrupt: | ||
print("\nReceived interrupt signal, cleaning up...") | ||
except Exception as e: | ||
print(f"Unexpected error: {e}") | ||
finally: | ||
# Clean up all rooms | ||
print("Disconnecting all participants...") | ||
await asyncio.gather(*(room.disconnect() for room in rooms), return_exceptions=True) | ||
print("Cleanup complete") | ||
|
||
|
||
def register_receiver_methods(greeters_room: rtc.Room, math_genius_room: rtc.Room): | ||
@greeters_room.local_participant.register_rpc_method("arrival") | ||
async def arrival_method( | ||
data: RpcInvocationData, | ||
): | ||
print(f'[Greeter] Oh {data.caller_identity} arrived and said "{data.payload}"') | ||
await asyncio.sleep(2) | ||
return "Welcome and have a wonderful day!" | ||
|
||
@math_genius_room.local_participant.register_rpc_method("square-root") | ||
async def square_root_method( | ||
data: RpcInvocationData, | ||
): | ||
json_data = json.loads(data.payload) | ||
number = json_data["number"] | ||
print( | ||
f"[Math Genius] I guess {data.caller_identity} wants the square root of {number}. I've only got {data.response_timeout} seconds to respond but I think I can pull it off." | ||
) | ||
|
||
print("[Math Genius] *doing math*…") | ||
await asyncio.sleep(2) | ||
|
||
result = number**0.5 | ||
print(f"[Math Genius] Aha! It's {result}") | ||
return json.dumps({"result": result}) | ||
|
||
@math_genius_room.local_participant.register_rpc_method("divide") | ||
async def divide_method( | ||
data: RpcInvocationData, | ||
): | ||
json_data = json.loads(data.payload) | ||
dividend = json_data["dividend"] | ||
divisor = json_data["divisor"] | ||
print(f"[Math Genius] {data.caller_identity} wants to divide {dividend} by {divisor}.") | ||
|
||
result = dividend / divisor | ||
return json.dumps({"result": result}) | ||
|
||
@math_genius_room.local_participant.register_rpc_method("long-calculation") | ||
async def long_calculation_method( | ||
data: RpcInvocationData, | ||
): | ||
print(f"[Math Genius] Starting a very long calculation for {data.caller_identity}") | ||
print( | ||
f"[Math Genius] This will take 30 seconds even though you're only giving me {data.response_timeout} seconds" | ||
) | ||
await asyncio.sleep(30) | ||
return json.dumps({"result": "Calculation complete!"}) | ||
|
||
|
||
async def perform_greeting(room: rtc.Room): | ||
print("[Caller] Letting the greeter know that I've arrived") | ||
try: | ||
response = await room.local_participant.perform_rpc( | ||
destination_identity="greeter", method="arrival", payload="Hello" | ||
) | ||
print(f'[Caller] That\'s nice, the greeter said: "{response}"') | ||
except Exception as error: | ||
print(f"[Caller] RPC call failed: {error}") | ||
raise | ||
|
||
|
||
async def perform_square_root(room: rtc.Room): | ||
print("[Caller] What's the square root of 16?") | ||
try: | ||
response = await room.local_participant.perform_rpc( | ||
destination_identity="math-genius", | ||
method="square-root", | ||
payload=json.dumps({"number": 16}), | ||
) | ||
parsed_response = json.loads(response) | ||
print(f"[Caller] Nice, the answer was {parsed_response['result']}") | ||
except Exception as error: | ||
print(f"[Caller] RPC call failed: {error}") | ||
raise | ||
|
||
|
||
async def perform_quantum_hypergeometric_series(room: rtc.Room): | ||
print("[Caller] What's the quantum hypergeometric series of 42?") | ||
try: | ||
response = await room.local_participant.perform_rpc( | ||
destination_identity="math-genius", | ||
method="quantum-hypergeometric-series", | ||
payload=json.dumps({"number": 42}), | ||
) | ||
parsed_response = json.loads(response) | ||
print(f"[Caller] genius says {parsed_response['result']}!") | ||
except rtc.RpcError as error: | ||
if error.code == rtc.RpcError.ErrorCode.UNSUPPORTED_METHOD: | ||
print("[Caller] Aww looks like the genius doesn't know that one.") | ||
return | ||
print("[Caller] Unexpected error:", error) | ||
raise | ||
except Exception as error: | ||
print("[Caller] Unexpected error:", error) | ||
raise | ||
|
||
|
||
async def perform_divide(room: rtc.Room): | ||
print("[Caller] Let's divide 10 by 0.") | ||
try: | ||
response = await room.local_participant.perform_rpc( | ||
destination_identity="math-genius", | ||
method="divide", | ||
payload=json.dumps({"dividend": 10, "divisor": 0}), | ||
) | ||
parsed_response = json.loads(response) | ||
print(f"[Caller] The result is {parsed_response['result']}") | ||
except rtc.RpcError as error: | ||
if error.code == rtc.RpcError.ErrorCode.APPLICATION_ERROR: | ||
print("[Caller] Aww something went wrong with that one, lets try something else.") | ||
else: | ||
print(f"[Caller] RPC call failed with unexpected RpcError: {error}") | ||
except Exception as error: | ||
print(f"[Caller] RPC call failed with unexpected error: {error}") | ||
|
||
|
||
async def perform_long_calculation(room: rtc.Room): | ||
print("[Caller] Giving the math genius 10s to complete a long calculation") | ||
try: | ||
response = await room.local_participant.perform_rpc( | ||
destination_identity="math-genius", | ||
method="long-calculation", | ||
payload=json.dumps({}), | ||
response_timeout=10, | ||
) | ||
parsed_response = json.loads(response) | ||
print(f"[Caller] Result: {parsed_response['result']}") | ||
except rtc.RpcError as error: | ||
if error.code == rtc.RpcError.ErrorCode.RESPONSE_TIMEOUT: | ||
print("[Caller] Math genius took too long to respond") | ||
elif error.code == rtc.RpcError.ErrorCode.RECIPIENT_DISCONNECTED: | ||
print("[Caller] Math genius disconnected before response was received") | ||
else: | ||
print(f"[Caller] Unexpected RPC error: {error}") | ||
except Exception as error: | ||
print(f"[Caller] Unexpected error: {error}") | ||
|
||
|
||
def create_token(identity: str, room_name: str): | ||
token = ( | ||
api.AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET) | ||
.with_identity(identity) | ||
.with_grants( | ||
api.VideoGrants( | ||
room=room_name, | ||
room_join=True, | ||
can_publish=True, | ||
can_subscribe=True, | ||
) | ||
) | ||
) | ||
return token.to_jwt() | ||
|
||
|
||
async def connect_participant(identity: str, room_name: str) -> rtc.Room: | ||
room = rtc.Room() | ||
token = create_token(identity, room_name) | ||
|
||
def on_disconnected(reason: str): | ||
print(f"[{identity}] Disconnected from room: {reason}") | ||
|
||
room.on("disconnected", on_disconnected) | ||
|
||
await room.connect(LIVEKIT_URL, token) | ||
|
||
async def wait_for_participants(): | ||
if room.remote_participants: | ||
return | ||
participant_connected = asyncio.Event() | ||
|
||
def _on_participant_connected(participant: rtc.RemoteParticipant): | ||
room.off("participant_connected", _on_participant_connected) | ||
participant_connected.set() | ||
|
||
room.on("participant_connected", _on_participant_connected) | ||
await participant_connected.wait() | ||
|
||
try: | ||
await asyncio.wait_for(wait_for_participants(), timeout=5.0) | ||
except asyncio.TimeoutError: | ||
raise TimeoutError("Timed out waiting for participants") | ||
|
||
return room | ||
|
||
|
||
if __name__ == "__main__": | ||
try: | ||
asyncio.run(main()) | ||
except KeyboardInterrupt: | ||
print("\nProgram terminated by user") |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.