Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions crates/client-api-messages/src/energy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,21 @@ impl fmt::Debug for EnergyBalance {
}
}

/// A measure of energy representing the energy budget for a reducer.
/// A measure of energy representing the energy budget for a reducer or any callable function.
///
/// In contrast to [`EnergyQuanta`], this is represented by a 64-bit integer. This makes energy handling
/// for reducers easier, while still providing a unlikely-to-ever-be-reached maximum value (e.g. for wasmtime:
/// `(u64::MAX eV / 1000 eV/instruction) * 3 ns/instruction = 640 days`)
#[derive(Copy, Clone, From, Add, Sub)]
pub struct ReducerBudget(u64);
pub struct FunctionBudget(u64);

impl ReducerBudget {
impl FunctionBudget {
// 1 second of wasm runtime is roughly 2 TeV, so this is
// roughly 1 minute of wasm runtime
pub const DEFAULT_BUDGET: Self = ReducerBudget(120_000_000_000_000);
pub const DEFAULT_BUDGET: Self = FunctionBudget(120_000_000_000_000);

pub const ZERO: Self = ReducerBudget(0);
pub const MAX: Self = ReducerBudget(u64::MAX);
pub const ZERO: Self = FunctionBudget(0);
pub const MAX: Self = FunctionBudget(u64::MAX);

pub fn new(v: u64) -> Self {
Self(v)
Expand All @@ -151,13 +151,13 @@ impl ReducerBudget {
}
}

impl From<ReducerBudget> for EnergyQuanta {
fn from(value: ReducerBudget) -> Self {
impl From<FunctionBudget> for EnergyQuanta {
fn from(value: FunctionBudget) -> Self {
EnergyQuanta::new(value.0.into())
}
}

impl fmt::Debug for ReducerBudget {
impl fmt::Debug for FunctionBudget {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("ReducerBudget")
.field(&EnergyQuanta::from(*self))
Expand Down
98 changes: 46 additions & 52 deletions crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,57 +78,52 @@ impl Host {

let (tx_offset, durable_offset, json) = self
.host_controller
.using_database(
database,
self.replica_id,
move |db| -> axum::response::Result<_, (StatusCode, String)> {
tracing::info!(sql = body);

// We need a header for query results
let mut header = vec![];

let sql_start = std::time::Instant::now();
let sql_span =
tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,).entered();

let result = sql::execute::run(
// Returns an empty result set for mutations
db,
&body,
auth,
Some(&module_host.info().subscriptions),
&mut header,
)
.map_err(|e| {
log::warn!("{e}");
if let Some(auth_err) = e.get_auth_error() {
(StatusCode::UNAUTHORIZED, auth_err.to_string())
} else {
(StatusCode::BAD_REQUEST, e.to_string())
}
})?;

let total_duration = sql_start.elapsed();
sql_span.record("total_duration", tracing::field::debug(total_duration));

// Turn the header into a `ProductType`
let schema = header
.into_iter()
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
.collect();

Ok((
result.tx_offset,
db.durable_tx_offset(),
vec![SqlStmtResult {
schema,
rows: result.rows,
total_duration_micros: total_duration.as_micros() as u64,
stats: SqlStmtStats::from_metrics(&result.metrics),
}],
))
},
)
.using_database(database, self.replica_id, move |db| async move {
tracing::info!(sql = body);
let mut header = vec![];
let sql_start = std::time::Instant::now();
let sql_span = tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,);
let _guard = sql_span.enter();

let result = sql::execute::run(
&db,
&body,
auth,
Some(&module_host.info.subscriptions),
Some(&module_host),
auth.caller,
&mut header,
)
.await
.map_err(|e| {
log::warn!("{e}");
if let Some(auth_err) = e.get_auth_error() {
(StatusCode::UNAUTHORIZED, auth_err.to_string())
} else {
(StatusCode::BAD_REQUEST, e.to_string())
}
})?;

let total_duration = sql_start.elapsed();
drop(_guard);
sql_span.record("total_duration", tracing::field::debug(total_duration));

let schema = header
.into_iter()
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
.collect();

Ok::<_, (StatusCode, String)>((
result.tx_offset,
db.durable_tx_offset(),
vec![SqlStmtResult {
schema,
rows: result.rows,
total_duration_micros: total_duration.as_micros() as u64,
stats: SqlStmtStats::from_metrics(&result.metrics),
}],
))
})
.await
.map_err(log_and_500)??;

Expand All @@ -154,7 +149,6 @@ impl Host {
.await
}
}

/// Parameters for publishing a database.
///
/// See [`ControlStateDelegate::publish_database`].
Expand Down
Loading
Loading