Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 316 additions & 0 deletions python/templates/python/.cursor/rules/restate.mdc
Original file line number Diff line number Diff line change
@@ -0,0 +1,316 @@
---
alwaysApply: true
---
rules:
- id: restate-python-determinism-overview
message: |
Restate services must be deterministic to ensure correct recovery and replay. This means the code must produce the same result every time it is executed with the same input.
Any operation that interacts with the outside world or relies on internal, non-constant state (like I/O, system clock, or random number generators) is non-deterministic and must be managed through the Restate Context (`ctx`).
kind: "info"
level: "info"

- id: restate-python-no-direct-random
message: |
Do not use Python's standard `random` module. Its results are non-deterministic and will break replay logic, causing unpredictable behavior during recovery. Restate cannot journal the random numbers it produces.

Always use the `ctx.random()` object to generate deterministic random numbers. Restate journals the generated number, ensuring that during a replay, the exact same "random" number is produced, preserving deterministic execution.
kind: "problem"
level: "error"
bad: |
import random
from restate_sdk import ObjectContext

async def my_handler(ctx: ObjectContext):
# BAD: Non-deterministic, breaks replay
user_id = random.randint(1, 1000)
...
good: |
from restate_sdk import ObjectContext

async def my_handler(ctx: ObjectContext):
# GOOD: Deterministic and safe for replay
user_id = ctx.random().randint(1, 1000)
...

- id: restate-python-no-direct-time
message: |
Do not use `datetime.now()`, `time.time()`, or other standard library functions to get the current time. These are non-deterministic and will return different values during replay than during the original execution, leading to incorrect logic.

Always use `ctx.time()` to get a deterministic representation of the current time. Restate records the timestamp on the first execution and provides the same recorded value during any subsequent replays.
kind: "problem"
level: "error"
bad: |
import datetime
from restate_sdk import ObjectContext

async def my_handler(ctx: ObjectContext):
# BAD: Non-deterministic, breaks replay
created_at = datetime.datetime.now(datetime.UTC)
...
good: |
from restate_sdk import ObjectContext

async def my_handler(ctx: ObjectContext):
# GOOD: Deterministic and safe for replay
created_at = ctx.time()
...

- id: restate-python-unsafe-code-side-effect
message: |
Code that interacts with external systems (e.g., calling a third-party API, querying a database) is a non-deterministic side effect. If not managed by Restate, it might be re-executed during replay, leading to duplicate actions and inconsistent state.

Wrap these operations in `ctx.run_typed()` (also known as a "side effect"). Restate journals the result of the `ctx.run_typed()` block after its first successful execution. During a replay, Restate will not re-execute the code inside `ctx.run_typed()`; instead, it will return the journaled result directly. This guarantees the side effect happens exactly once.
kind: "problem"
level: "error"
bad: |
import httpx
from restate_sdk import ObjectContext

async def my_handler(ctx: ObjectContext, order_id: str):
# BAD: API call is not wrapped, will be re-executed on recovery
async with httpx.AsyncClient() as client:
payment_response = await client.post("https://api.stripe.com/v1/charges", json={...})
...
good: |
import httpx
from restate_sdk import ObjectContext

async def charge_credit_card(order_id: str) -> dict:
async with httpx.AsyncClient() as client:
payment_response = await client.post("https://api.stripe.com/v1/charges", json={...})
payment_response.raise_for_status()
return payment_response.json()

async def my_handler(ctx: ObjectContext, order_id: str):
# GOOD: The external API call is wrapped in ctx.run_typed()
# This ensures it executes only once.
payment_result = await ctx.run_typed("charge_credit_card", charge_credit_card, order_id=order_id)
...

- id: restate-python-terminal-error
message: |
By default, Restate retries failed invocations indefinitely with exponential backoff. This is ideal for transient errors (e.g., network issues, temporary service unavailability). However, for permanent, business-logic failures (e.g., invalid input, payment declined), retrying is pointless and can hide the root cause.

Throw a `restate_sdk.TerminalError` for failures that should not be retried. This immediately stops the invocation, marks it as failed, and propagates the error to the caller. Use this to handle business rule violations and unrecoverable errors.
kind: "problem"
level: "warning"
bad: |
from restate_sdk import ObjectContext

async def my_handler(ctx: ObjectContext, amount: float):
# BAD: A business logic failure raises a generic exception,
# causing infinite retries.
if amount < 0:
raise ValueError("Amount cannot be negative.")
...
good: |
from restate_sdk import ObjectContext, TerminalError

async def my_handler(ctx: ObjectContext, amount: float):
# GOOD: A TerminalError is thrown for a business logic failure,
# preventing useless retries.
if amount < 0:
raise TerminalError("Amount cannot be negative.", status_code=400)
...

- id: restate-python-serialization-pydantic
message: |
While Restate's default JSON serialization works for basic types, it lacks type safety for complex objects. Using dictionaries for state or handler inputs/outputs can lead to runtime errors if the data structure changes or is incorrect.

Use Pydantic models (or Python dataclasses) for handler inputs, outputs, and state. The Restate SDK automatically handles serialization and deserialization, providing static type checking, data validation, and clearer code. Make sure to install the SDK with the `serde` extra: `pip install "restate-sdk[serde]"`.
kind: "problem"
level: "warning"
bad: |
from restate_sdk import ObjectContext

async def my_handler(ctx: ObjectContext, user_data: dict):
# BAD: Using a dict provides no type safety.
# Is "email" guaranteed to be present? What is its type?
email = user_data.get("email")
await ctx.set("user_profile", user_data)
...
good: |
from pydantic import BaseModel
from restate_sdk import ObjectContext

class UserProfile(BaseModel):
username: str
email: str

async def my_handler(ctx: ObjectContext, user: UserProfile) -> UserProfile:
# GOOD: Pydantic model provides validation and type hints.
# Accessing user.email is safe.
await ctx.set("user_profile", user)
profile = await ctx.get("user_profile", type_hint=UserProfile)
return profile

- id: restate-python-virtual-object-state
message: |
Do not use global variables or in-memory instance variables within a Virtual Object class to store state across invocations. The service can be scaled horizontally, and there is no guarantee that subsequent invocations for the same key will land on the same instance. This leads to data loss and inconsistency.

Always use `ctx.get()` and `ctx.set()` within a Virtual Object handler (`ObjectContext`) to manage its state durably. Restate ensures that this state is correctly scoped to the object's key and is persisted across failures and invocations.
kind: "problem"
level: "error"
bad: |
from restate_sdk import ObjectContext, VirtualObject

counter_object = VirtualObject("Counter")

# BAD: This state is not durable and not shared across replicas.
in_memory_counters = {}

@counter_object.handler()
async def increment(ctx: ObjectContext, by: int):
key = ctx.key()
current_value = in_memory_counters.get(key, 0)
in_memory_counters[key] = current_value + by
good: |
from restate_sdk import ObjectContext, VirtualObject

counter_object = VirtualObject("Counter")

@counter_object.handler()
async def increment(ctx: ObjectContext, by: int):
# GOOD: State is managed by Restate and is durable.
current_value = await ctx.get("count", type_hint=int) or 0
await ctx.set("count", current_value + by)

- id: restate-python-ctx-sleep
message: |
Do not use `sleep` or `asyncio.sleep` instead consider using `ctx.sleep(timedelta(...))` as this would use restate's durable timer.
Always use `ctx.sleep()` outside of a `ctx.run_typed` block.
kind: "problem"
level: "warning"
bad: |
import asyncio
from restate_sdk import ObjectContext

async def my_handler(ctx: ObjectContext):
# BAD: This will not be durable and will not survive process restarts
await asyncio.sleep(5)
...
good: |
from datetime import timedelta
from restate_sdk import ObjectContext

async def my_handler(ctx: ObjectContext):
# GOOD: This uses Restate's durable timer and is safe for replay
await ctx.sleep(timedelta(seconds=5))
...

- id: restate-python-concurrency-overview
message: |
Restate provides deterministic concurrency primitives. Always use the Restate combinators (`restate.select`, `restate.wait_completed`, `restate.gather`, `restate.as_completed`) instead of Python's native `asyncio` or threading constructs: those are non-deterministic from Restate's replay/journal perspective.
These combinators operate on `RestateDurableFuture` objects and journal the order of completion, so recovery and replay produce the same sequence of events.
kind: "info"
level: "info"

- id: restate-python-use-select
message: |
Use `restate.select()` to race multiple operations and handle whichever completes first.
`select` returns immediately when the first future completes and yields a labeled result that you should pattern-match against.
Use `select` for timeouts and waiting for external confirmations.
kind: "problem"
level: "error"
bad: |
# BAD: using asyncio.wait (non-deterministic outside Restate)
import asyncio
async def handler(ctx):
claude = ctx.service_call(claude_sonnet, arg="What is the weather?")
openai = ctx.service_call(open_ai, arg="What is the weather?")
done, pending = await asyncio.wait([claude, openai], return_when=asyncio.FIRST_COMPLETED)
# This will not be journaled deterministically by Restate
good: |
# GOOD: using restate.select (deterministic and journaled)
_, confirmation_future = ctx.awakeable(type_hint=str)
match await restate.select(
confirmation=confirmation_future, timeout=ctx.sleep(timedelta(days=1))
):
case ["confirmation", "ok"]:
return "success!"
case ["confirmation", "deny"]:
raise TerminalError("Confirmation was denied!")
case ["timeout", _]:
raise TerminalError("Verification timer expired!")

- id: restate-python-wait-completed
message: |
Use `restate.wait_completed()` when you want to wait until at least one provided future completes, and then inspect both completed and still-pending futures.
It returns two lists: `(pending, done)` where `done` are the completed futures and `pending` are the futures that have not finished.
This is useful when you want to immediately process completed results and optionally cancel or manage the remaining calls.
kind: "problem"
level: "warning"
bad: |
# BAD: assuming wait_completed returns a single future or blocking for all results
claude = ctx.service_call(claude_sonnet, arg="What is the weather?")
openai = ctx.service_call(open_ai, arg="What is the weather?")
# Incorrect assumption: expecting a single future rather than (pending, done)
done = await restate.wait_completed(claude, openai)
result = await done # Wrong: done is a list
good: |
# GOOD: handle both pending and done properly, and cancel if desired
claude = ctx.service_call(claude_sonnet, arg="What is the weather?")
openai = ctx.service_call(open_ai, arg="What is the weather?")
pending, done = await restate.wait_completed(claude, openai)
results = [await f for f in done]
for f in pending:
await f.cancel_invocation()

- id: restate-python-cancel-pending
message: |
After `wait_completed()` you may want to cancel any remaining pending `RestateDurableFuture` objects.
Use `await future.cancel_invocation()` to request cancellation; this operation is journaled and safe for replay.
Do not use native task cancellation mechanisms (like `task.cancel()`) on `RestateDurableFuture` objects.
kind: "problem"
level: "warning"
bad: |
# BAD: cancelling an underlying asyncio Task (not journaled / not safe)
task = asyncio.create_task(some_async_call())
task.cancel()
good: |
# GOOD: use the Restate cancellation API
pending, done = await restate.wait_completed(claude, openai)
for f in pending:
await f.cancel_invocation()

- id: restate-python-gather
message: |
Use `restate.gather()` when you need to wait for all provided futures to finish before proceeding.
This is the deterministic equivalent of `asyncio.gather()` for Restate futures.
kind: "info"
level: "info"
bad: |
# BAD: using asyncio.gather directly on Restate futures or native coroutines
results = await asyncio.gather(call1, call2)
good: |
# GOOD: use restate.gather to ensure results are journaled/deterministic
claude = ctx.service_call(claude_sonnet, arg="What is the weather?")
openai = ctx.service_call(open_ai, arg="What is the weather?")
results_done = await restate.gather(claude, openai)
results = [await result for result in results_done]

- id: restate-python-as-completed
message: |
Use `restate.as_completed()` to process Restate futures in the order they finish. This streams results one-by-one in completion order and is journaled by Restate.
Do not use `asyncio.as_completed()` on `RestateDurableFuture` objects.
kind: "problem"
level: "warning"
bad: |
# BAD: using asyncio.as_completed (not journaled)
for future in asyncio.as_completed([call1, call2]):
print(await future)
good: |
# GOOD: use restate.as_completed for deterministic streaming processing
call1 = ctx.run_typed("LLM call", call_llm, prompt="What is the weather?", model="gpt-4")
call2 = ctx.run_typed("LLM call", call_llm, prompt="What is the weather?", model="gpt-3.5-turbo")
async for future in restate.as_completed(call1, call2):
print(await future)

- id: restate-python-select-vs-waitcompleted
message: |
Key differences between `select` and `wait_completed`:
* `select` returns immediately when the first future completes and provides a labeled result suitable for pattern matching. Use it for racing and timeouts.
* `wait_completed` returns two lists `(pending, done)` after at least one completes, allowing you to inspect completed futures and manage or cancel pending ones.
Choose the primitive that matches your control-flow needs; misusing one can lead to incorrect assumptions about ordering or pending state.
kind: "info"
level: "info"
Loading