Skip to content
Open
8 changes: 4 additions & 4 deletions examples/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@ async def main():


def register_receiver_methods(greeters_room: rtc.Room, math_genius_room: rtc.Room):
@greeters_room.local_participant.register_rpc_method("arrival")
@greeters_room.register_rpc_method("arrival")
async def arrival_method(
data: RpcInvocationData,
):
print(f'[Greeter] Oh {data.caller_identity} arrived and said "{data.payload}"')
await asyncio.sleep(2)
return "Welcome and have a wonderful day!"

@math_genius_room.local_participant.register_rpc_method("square-root")
@math_genius_room.register_rpc_method("square-root")
async def square_root_method(
data: RpcInvocationData,
):
Expand All @@ -110,7 +110,7 @@ async def square_root_method(
print(f"[Math Genius] Aha! It's {result}")
return json.dumps({"result": result})

@math_genius_room.local_participant.register_rpc_method("divide")
@math_genius_room.register_rpc_method("divide")
async def divide_method(
data: RpcInvocationData,
):
Expand All @@ -122,7 +122,7 @@ async def divide_method(
result = dividend / divisor
return json.dumps({"result": result})

@math_genius_room.local_participant.register_rpc_method("long-calculation")
@math_genius_room.register_rpc_method("long-calculation")
async def long_calculation_method(
data: RpcInvocationData,
):
Expand Down
277 changes: 277 additions & 0 deletions examples/rpc_deprecated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
from livekit import rtc, api
import os
import json
import asyncio
from dotenv import load_dotenv
from livekit.rtc.rpc import RpcInvocationData

load_dotenv(dotenv_path=".env.local", override=False)
LIVEKIT_API_KEY = os.getenv("LIVEKIT_API_KEY")
LIVEKIT_API_SECRET = os.getenv("LIVEKIT_API_SECRET")
LIVEKIT_URL = os.getenv("LIVEKIT_URL")
if not LIVEKIT_API_KEY or not LIVEKIT_API_SECRET or not LIVEKIT_URL:
raise ValueError("Missing required environment variables. Please check your .env.local file.")


async def main():
rooms = [] # Keep track of all rooms for cleanup
try:
room_name = f"rpc-test-{os.urandom(4).hex()}"
print(f"Connecting participants to room: {room_name}")

callers_room, greeters_room, math_genius_room = await asyncio.gather(
connect_participant("caller", room_name),
connect_participant("greeter", room_name),
connect_participant("math-genius", room_name),
)
rooms = [callers_room, greeters_room, math_genius_room]

register_receiver_methods(greeters_room, math_genius_room)

try:
print("\n\nRunning greeting example...")
await asyncio.gather(perform_greeting(callers_room))
except Exception as error:
print("Error:", error)

try:
print("\n\nRunning error handling example...")
await perform_divide(callers_room)
except Exception as error:
print("Error:", error)

try:
print("\n\nRunning math example...")
await perform_square_root(callers_room)
await asyncio.sleep(2)
await perform_quantum_hypergeometric_series(callers_room)
except Exception as error:
print("Error:", error)

try:
print("\n\nRunning long calculation with timeout...")
await asyncio.create_task(perform_long_calculation(callers_room))
except Exception as error:
print("Error:", error)

try:
print("\n\nRunning long calculation with disconnect...")
# Start the long calculation
long_calc_task = asyncio.create_task(perform_long_calculation(callers_room))
# Wait a bit then disconnect the math genius
await asyncio.sleep(5)
print("\nDisconnecting math genius early...")
await math_genius_room.disconnect()
# Wait for the calculation to fail
await long_calc_task
except Exception as error:
print("Error:", error)

print("\n\nParticipants done, disconnecting remaining participants...")
await callers_room.disconnect()
await greeters_room.disconnect()

print("Participants disconnected. Example completed.")

except KeyboardInterrupt:
print("\nReceived interrupt signal, cleaning up...")
except Exception as e:
print(f"Unexpected error: {e}")
finally:
# Clean up all rooms
print("Disconnecting all participants...")
await asyncio.gather(*(room.disconnect() for room in rooms), return_exceptions=True)
print("Cleanup complete")


def register_receiver_methods(greeters_room: rtc.Room, math_genius_room: rtc.Room):
@greeters_room.local_participant.register_rpc_method("arrival")
async def arrival_method(
data: RpcInvocationData,
):
print(f'[Greeter] Oh {data.caller_identity} arrived and said "{data.payload}"')
await asyncio.sleep(2)
return "Welcome and have a wonderful day!"

@math_genius_room.local_participant.register_rpc_method("square-root")
async def square_root_method(
data: RpcInvocationData,
):
json_data = json.loads(data.payload)
number = json_data["number"]
print(
f"[Math Genius] I guess {data.caller_identity} wants the square root of {number}. I've only got {data.response_timeout} seconds to respond but I think I can pull it off."
)

print("[Math Genius] *doing math*…")
await asyncio.sleep(2)

result = number**0.5
print(f"[Math Genius] Aha! It's {result}")
return json.dumps({"result": result})

@math_genius_room.local_participant.register_rpc_method("divide")
async def divide_method(
data: RpcInvocationData,
):
json_data = json.loads(data.payload)
dividend = json_data["dividend"]
divisor = json_data["divisor"]
print(f"[Math Genius] {data.caller_identity} wants to divide {dividend} by {divisor}.")

result = dividend / divisor
return json.dumps({"result": result})

@math_genius_room.local_participant.register_rpc_method("long-calculation")
async def long_calculation_method(
data: RpcInvocationData,
):
print(f"[Math Genius] Starting a very long calculation for {data.caller_identity}")
print(
f"[Math Genius] This will take 30 seconds even though you're only giving me {data.response_timeout} seconds"
)
await asyncio.sleep(30)
return json.dumps({"result": "Calculation complete!"})


async def perform_greeting(room: rtc.Room):
print("[Caller] Letting the greeter know that I've arrived")
try:
response = await room.local_participant.perform_rpc(
destination_identity="greeter", method="arrival", payload="Hello"
)
print(f'[Caller] That\'s nice, the greeter said: "{response}"')
except Exception as error:
print(f"[Caller] RPC call failed: {error}")
raise


async def perform_square_root(room: rtc.Room):
print("[Caller] What's the square root of 16?")
try:
response = await room.local_participant.perform_rpc(
destination_identity="math-genius",
method="square-root",
payload=json.dumps({"number": 16}),
)
parsed_response = json.loads(response)
print(f"[Caller] Nice, the answer was {parsed_response['result']}")
except Exception as error:
print(f"[Caller] RPC call failed: {error}")
raise


async def perform_quantum_hypergeometric_series(room: rtc.Room):
print("[Caller] What's the quantum hypergeometric series of 42?")
try:
response = await room.local_participant.perform_rpc(
destination_identity="math-genius",
method="quantum-hypergeometric-series",
payload=json.dumps({"number": 42}),
)
parsed_response = json.loads(response)
print(f"[Caller] genius says {parsed_response['result']}!")
except rtc.RpcError as error:
if error.code == rtc.RpcError.ErrorCode.UNSUPPORTED_METHOD:
print("[Caller] Aww looks like the genius doesn't know that one.")
return
print("[Caller] Unexpected error:", error)
raise
except Exception as error:
print("[Caller] Unexpected error:", error)
raise


async def perform_divide(room: rtc.Room):
print("[Caller] Let's divide 10 by 0.")
try:
response = await room.local_participant.perform_rpc(
destination_identity="math-genius",
method="divide",
payload=json.dumps({"dividend": 10, "divisor": 0}),
)
parsed_response = json.loads(response)
print(f"[Caller] The result is {parsed_response['result']}")
except rtc.RpcError as error:
if error.code == rtc.RpcError.ErrorCode.APPLICATION_ERROR:
print("[Caller] Aww something went wrong with that one, lets try something else.")
else:
print(f"[Caller] RPC call failed with unexpected RpcError: {error}")
except Exception as error:
print(f"[Caller] RPC call failed with unexpected error: {error}")


async def perform_long_calculation(room: rtc.Room):
print("[Caller] Giving the math genius 10s to complete a long calculation")
try:
response = await room.local_participant.perform_rpc(
destination_identity="math-genius",
method="long-calculation",
payload=json.dumps({}),
response_timeout=10,
)
parsed_response = json.loads(response)
print(f"[Caller] Result: {parsed_response['result']}")
except rtc.RpcError as error:
if error.code == rtc.RpcError.ErrorCode.RESPONSE_TIMEOUT:
print("[Caller] Math genius took too long to respond")
elif error.code == rtc.RpcError.ErrorCode.RECIPIENT_DISCONNECTED:
print("[Caller] Math genius disconnected before response was received")
else:
print(f"[Caller] Unexpected RPC error: {error}")
except Exception as error:
print(f"[Caller] Unexpected error: {error}")


def create_token(identity: str, room_name: str):
token = (
api.AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET)
.with_identity(identity)
.with_grants(
api.VideoGrants(
room=room_name,
room_join=True,
can_publish=True,
can_subscribe=True,
)
)
)
return token.to_jwt()


async def connect_participant(identity: str, room_name: str) -> rtc.Room:
room = rtc.Room()
token = create_token(identity, room_name)

def on_disconnected(reason: str):
print(f"[{identity}] Disconnected from room: {reason}")

room.on("disconnected", on_disconnected)

await room.connect(LIVEKIT_URL, token)

async def wait_for_participants():
if room.remote_participants:
return
participant_connected = asyncio.Event()

def _on_participant_connected(participant: rtc.RemoteParticipant):
room.off("participant_connected", _on_participant_connected)
participant_connected.set()

room.on("participant_connected", _on_participant_connected)
await participant_connected.wait()

try:
await asyncio.wait_for(wait_for_participants(), timeout=5.0)
except asyncio.TimeoutError:
raise TimeoutError("Timed out waiting for participants")

return room


if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nProgram terminated by user")
36 changes: 18 additions & 18 deletions livekit-rtc/livekit/rtc/_proto/rpc_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading