Skip to content

Commit 2dfe780

Browse files
committed
[bfops/bump-version]: Merge remote-tracking branch 'origin/master' into bfops/bump-version
2 parents 43f28ea + 2c74f73 commit 2dfe780

File tree

42 files changed

+1954
-429
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1954
-429
lines changed

Cargo.lock

Lines changed: 34 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ tokio = { version = "1.37", features = ["full"] }
268268
tokio_metrics = { version = "0.4.0" }
269269
tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }
270270
tokio-stream = "0.1.17"
271-
tokio-tungstenite = { version = "0.27.0", features = ["native-tls"] }
271+
tokio-tungstenite = { version = "0.27.0", features = ["native-tls", "url"] }
272272
tokio-util = { version = "0.7.4", features = ["time"] }
273273
toml = "0.8"
274274
toml_edit = "0.22.22"

crates/cli/src/common_args.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,11 @@ pub fn yes() -> Arg {
2929
.action(SetTrue)
3030
.help("Run non-interactively wherever possible. This will answer \"yes\" to almost all prompts, but will sometimes answer \"no\" to preserve non-interactivity (e.g. when prompting whether to log in with spacetimedb.com).")
3131
}
32+
33+
pub fn confirmed() -> Arg {
34+
Arg::new("confirmed")
35+
.required(false)
36+
.long("confirmed")
37+
.action(SetTrue)
38+
.help("Instruct the server to deliver only updates of confirmed transactions")
39+
}

crates/cli/src/main.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::process::ExitCode;
33
use clap::{Arg, Command};
44
use spacetimedb_cli::*;
55
use spacetimedb_paths::cli::CliTomlPath;
6-
use spacetimedb_paths::{RootDir, SpacetimePaths};
6+
use spacetimedb_paths::RootDir;
77

88
// Note that the standalone server is invoked through standaline/src/main.rs, so you will
99
// also want to set the allocator there.
@@ -24,6 +24,8 @@ static GLOBAL: MiMalloc = MiMalloc;
2424
#[cfg(not(feature = "markdown-docs"))]
2525
#[tokio::main]
2626
async fn main() -> anyhow::Result<ExitCode> {
27+
use spacetimedb_paths::SpacetimePaths;
28+
2729
// Compute matches before loading the config, because `Config` has an observable `drop` method
2830
// (which deletes a lockfile),
2931
// and Clap calls `exit` on parse failure rather than panicking, so destructors never run.

crates/cli/src/subcommands/sql.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub fn cli() -> clap::Command {
3434
.conflicts_with("query")
3535
.help("Instead of using a query, run an interactive command prompt for `SQL` expressions"),
3636
)
37+
.arg(common_args::confirmed())
3738
.arg(common_args::anonymous())
3839
.arg(common_args::server().help("The nickname, host name or URL of the server hosting the database"))
3940
.arg(common_args::yes())
@@ -178,11 +179,15 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
178179
crate::repl::exec(con).await?;
179180
} else {
180181
let query = args.get_one::<String>("query").unwrap();
182+
let confirmed = args.get_flag("confirmed");
181183

182184
let con = parse_req(config, args).await?;
183-
let api = ClientApi::new(con);
185+
let mut api = ClientApi::new(con).sql();
186+
if confirmed {
187+
api = api.query(&[("confirmed", "true")]);
188+
}
184189

185-
run_sql(api.sql(), query, false).await?;
190+
run_sql(api, query, false).await?;
186191
}
187192
Ok(())
188193
}

crates/cli/src/subcommands/subscribe.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::Context;
22
use clap::{value_parser, Arg, ArgAction, ArgMatches};
33
use futures::{Sink, SinkExt, TryStream, TryStreamExt};
44
use http::header;
5-
use http::uri::Scheme;
5+
use reqwest::Url;
66
use serde_json::Value;
77
use spacetimedb_client_api_messages::websocket::{self as ws, JsonFormat};
88
use spacetimedb_data_structures::map::HashMap;
@@ -65,6 +65,7 @@ pub fn cli() -> clap::Command {
6565
.action(ArgAction::SetTrue)
6666
.help("Print the initial update for the queries."),
6767
)
68+
.arg(common_args::confirmed())
6869
.arg(common_args::anonymous())
6970
.arg(common_args::yes())
7071
.arg(common_args::server().help("The nickname, host name or URL of the server hosting the database"))
@@ -130,25 +131,26 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
130131
let num = args.get_one::<u32>("num-updates").copied();
131132
let timeout = args.get_one::<u32>("timeout").copied();
132133
let print_initial_update = args.get_flag("print_initial_update");
134+
let confirmed = args.get_flag("confirmed");
133135

134136
let conn = parse_req(config, args).await?;
135137
let api = ClientApi::new(conn);
136138
let module_def = api.module_def().await?;
137139

140+
let mut url = Url::parse(&api.con.db_uri("subscribe"))?;
138141
// Change the URI scheme from `http(s)` to `ws(s)`.
139-
let mut uri = http::Uri::try_from(api.con.db_uri("subscribe"))?.into_parts();
140-
uri.scheme = uri.scheme.map(|s| {
141-
if s == Scheme::HTTP {
142-
"ws".parse().unwrap()
143-
} else if s == Scheme::HTTPS {
144-
"wss".parse().unwrap()
145-
} else {
146-
s
147-
}
148-
});
142+
url.set_scheme(match url.scheme() {
143+
"http" => "ws",
144+
"https" => "wss",
145+
unknown => unreachable!("Invalid URL scheme in `Connection::db_uri`: {unknown}"),
146+
})
147+
.unwrap();
148+
if confirmed {
149+
url.query_pairs_mut().append_pair("confirmed", "true");
150+
}
149151

150152
// Create the websocket request.
151-
let mut req = http::Uri::from_parts(uri)?.into_client_request()?;
153+
let mut req = url.into_client_request()?;
152154
req.headers_mut().insert(
153155
header::SEC_WEBSOCKET_PROTOCOL,
154156
http::HeaderValue::from_static(ws::TEXT_PROTOCOL),

crates/client-api-messages/src/name.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,27 @@ pub enum PublishResult {
106106
PermissionDenied { name: DatabaseName },
107107
}
108108

109+
#[derive(serde::Serialize, serde::Deserialize, Debug, Default)]
110+
pub enum MigrationPolicy {
111+
#[default]
112+
Compatible,
113+
BreakClients,
114+
}
115+
116+
#[derive(serde::Serialize, serde::Deserialize, Debug, Default)]
117+
pub enum PrettyPrintStyle {
118+
#[default]
119+
AnsiColor,
120+
NoColor,
121+
}
122+
123+
#[derive(serde::Serialize, serde::Deserialize, Debug)]
124+
pub struct PrintPlanResult {
125+
pub migrate_plan: Box<str>,
126+
pub break_clients: bool,
127+
pub token: spacetimedb_lib::Hash,
128+
}
129+
109130
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
110131
pub enum DnsLookupResponse {
111132
/// The lookup was successful and the domain and identity are returned.

crates/client-api/src/lib.rs

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@ use http::StatusCode;
77

88
use spacetimedb::client::ClientActorIndex;
99
use spacetimedb::energy::{EnergyBalance, EnergyQuanta};
10-
use spacetimedb::host::{HostController, ModuleHost, NoSuchModule, UpdateDatabaseResult};
10+
use spacetimedb::host::{HostController, MigratePlanResult, ModuleHost, NoSuchModule, UpdateDatabaseResult};
1111
use spacetimedb::identity::{AuthCtx, Identity};
1212
use spacetimedb::messages::control_db::{Database, HostType, Node, Replica};
1313
use spacetimedb::sql;
1414
use spacetimedb_client_api_messages::http::{SqlStmtResult, SqlStmtStats};
1515
use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, SetDomainsResult, Tld};
1616
use spacetimedb_lib::{ProductTypeElement, ProductValue};
1717
use spacetimedb_paths::server::ModuleLogsDir;
18+
use spacetimedb_schema::auto_migrate::{MigrationPolicy, PrettyPrintStyle};
1819
use tokio::sync::watch;
1920

2021
pub mod auth;
@@ -67,14 +68,15 @@ impl Host {
6768
&self,
6869
auth: AuthCtx,
6970
database: Database,
71+
confirmed_read: bool,
7072
body: String,
7173
) -> axum::response::Result<Vec<SqlStmtResult<ProductValue>>> {
7274
let module_host = self
7375
.module()
7476
.await
7577
.map_err(|_| (StatusCode::NOT_FOUND, "module not found".to_string()))?;
7678

77-
let json = self
79+
let (tx_offset, durable_offset, json) = self
7880
.host_controller
7981
.using_database(
8082
database,
@@ -115,17 +117,28 @@ impl Host {
115117
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
116118
.collect();
117119

118-
Ok(vec![SqlStmtResult {
119-
schema,
120-
rows: result.rows,
121-
total_duration_micros: total_duration.as_micros() as u64,
122-
stats: SqlStmtStats::from_metrics(&result.metrics),
123-
}])
120+
Ok((
121+
result.tx_offset,
122+
db.durable_tx_offset(),
123+
vec![SqlStmtResult {
124+
schema,
125+
rows: result.rows,
126+
total_duration_micros: total_duration.as_micros() as u64,
127+
stats: SqlStmtStats::from_metrics(&result.metrics),
128+
}],
129+
))
124130
},
125131
)
126132
.await
127133
.map_err(log_and_500)??;
128134

135+
if confirmed_read {
136+
if let Some(mut durable_offset) = durable_offset {
137+
let tx_offset = tx_offset.await.map_err(|_| log_and_500("transaction aborted"))?;
138+
durable_offset.wait_for(tx_offset).await.map_err(log_and_500)?;
139+
}
140+
}
141+
129142
Ok(json)
130143
}
131144

@@ -134,9 +147,10 @@ impl Host {
134147
database: Database,
135148
host_type: HostType,
136149
program_bytes: Box<[u8]>,
150+
policy: MigrationPolicy,
137151
) -> anyhow::Result<UpdateDatabaseResult> {
138152
self.host_controller
139-
.update_module_host(database, host_type, self.replica_id, program_bytes)
153+
.update_module_host(database, host_type, self.replica_id, program_bytes, policy)
140154
.await
141155
}
142156
}
@@ -219,8 +233,11 @@ pub trait ControlStateWriteAccess: Send + Sync {
219233
&self,
220234
publisher: &Identity,
221235
spec: DatabaseDef,
236+
policy: MigrationPolicy,
222237
) -> anyhow::Result<Option<UpdateDatabaseResult>>;
223238

239+
async fn migrate_plan(&self, spec: DatabaseDef, style: PrettyPrintStyle) -> anyhow::Result<MigratePlanResult>;
240+
224241
async fn delete_database(&self, caller_identity: &Identity, database_identity: &Identity) -> anyhow::Result<()>;
225242

226243
// Energy
@@ -309,8 +326,13 @@ impl<T: ControlStateWriteAccess + ?Sized> ControlStateWriteAccess for Arc<T> {
309326
&self,
310327
identity: &Identity,
311328
spec: DatabaseDef,
329+
policy: MigrationPolicy,
312330
) -> anyhow::Result<Option<UpdateDatabaseResult>> {
313-
(**self).publish_database(identity, spec).await
331+
(**self).publish_database(identity, spec, policy).await
332+
}
333+
334+
async fn migrate_plan(&self, spec: DatabaseDef, style: PrettyPrintStyle) -> anyhow::Result<MigratePlanResult> {
335+
(**self).migrate_plan(spec, style).await
314336
}
315337

316338
async fn delete_database(&self, caller_identity: &Identity, database_identity: &Identity) -> anyhow::Result<()> {

0 commit comments

Comments
 (0)