Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,16 @@ Client implementation and command-line tool for the Linera blockchain
Default value: `0.2`
* `--blob-download-timeout-ms <BLOB_DOWNLOAD_TIMEOUT>` — The delay when downloading a blob, after which we try a second validator, in milliseconds

Default value: `1000`
* `--cert-batch-download-timeout-ms <CERTIFICATE_BATCH_DOWNLOAD_TIMEOUT>` — The delay when downloading a batch of certificates, after which we try a second validator, in milliseconds

Default value: `1000`
* `--certificate-download-batch-size <CERTIFICATE_DOWNLOAD_BATCH_SIZE>` — Maximum number of certificates that we download at a time from one validator when synchronizing one of our chains

Default value: `500`
* `--sender-certificate-download-batch-size <SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE>` — Maximum number of sender certificates we try to download and receive in one go when syncing sender chains

Default value: `20000`
* `--max-joined-tasks <MAX_JOINED_TASKS>` — Maximum number of tasks that can are joined concurrently in the client

Default value: `100`
Expand Down Expand Up @@ -747,9 +753,6 @@ Run a GraphQL service to explore and extend the chains of the wallet

Default value: `0`
* `--port <PORT>` — The port on which to run the server
* `--sync-sleep-ms <SYNC_SLEEP_MS>` — Milliseconds to sleep between batches during background certificate synchronization

Default value: `500`



Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ impl TransactionMetadata {
}

/// A chain ID with a block height.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, SimpleObject)]
#[derive(
Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, SimpleObject,
)]
pub struct ChainAndHeight {
pub chain_id: ChainId,
pub height: BlockHeight,
Expand Down
2 changes: 1 addition & 1 deletion linera-client/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl<Env: Environment> Benchmark<Env> {
let barrier = Arc::new(Barrier::new(num_chains + 1));

let chain_listener_result = chain_listener
.run(Some(500)) // Default sync_sleep_ms for benchmarks
.run(true) // Enabling background sync for benchmarks
.await;

let chain_listener_handle =
Expand Down
37 changes: 6 additions & 31 deletions linera-client/src/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl<C: ClientContext + 'static> ChainListener<C> {
#[instrument(skip(self))]
pub async fn run(
mut self,
sync_sleep_ms: Option<u64>,
enable_background_sync: bool,
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
let chain_ids = {
let guard = self.context.lock().await;
Expand All @@ -223,8 +223,8 @@ impl<C: ClientContext + 'static> ChainListener<C> {
};

// Start background tasks to sync received certificates for each chain,
// if enabled (sync_sleep_ms is Some).
if let Some(sync_sleep_ms) = sync_sleep_ms {
// if enabled.
if enable_background_sync {
let context = Arc::clone(&self.context);
let cancellation_token = self.cancellation_token.clone();
for chain_id in &chain_ids {
Expand All @@ -234,7 +234,6 @@ impl<C: ClientContext + 'static> ChainListener<C> {
drop(linera_base::task::spawn(async move {
if let Err(e) = Self::background_sync_received_certificates(
context,
sync_sleep_ms,
chain_id,
cancellation_token,
)
Expand Down Expand Up @@ -365,39 +364,15 @@ impl<C: ClientContext + 'static> ChainListener<C> {
#[instrument(skip(context, cancellation_token))]
async fn background_sync_received_certificates(
context: Arc<Mutex<C>>,
sync_sleep_ms: u64,
chain_id: ChainId,
cancellation_token: CancellationToken,
) -> Result<(), Error> {
info!("Starting background certificate sync for chain {chain_id}");
let client = context.lock().await.make_chain_client(chain_id);

loop {
// Check if we should stop.
if cancellation_token.is_cancelled() {
info!("Background sync cancelled for chain {chain_id}");
break;
}

// Sleep to avoid overwhelming the system.
Self::sleep(sync_sleep_ms).await;

// Sync one batch.
match client.sync_received_certificates_batch().await {
Ok(has_more) => {
if !has_more {
info!("Background sync completed for chain {chain_id}");
break;
}
}
Err(e) => {
warn!("Error syncing batch for chain {chain_id}: {e}. Will retry.");
// Continue trying despite errors - validators might be temporarily unavailable.
}
}
}

Ok(())
Ok(client
.find_received_certificates(Some(cancellation_token))
.await?)
}

/// Starts listening for notifications about the given chain.
Expand Down
23 changes: 21 additions & 2 deletions linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use linera_base::{
use linera_core::{
client::{
BlanketMessagePolicy, ChainClientOptions, MessagePolicy,
DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE, DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
},
node::CrossChainMessageDelivery,
DEFAULT_GRACE_PERIOD,
Expand Down Expand Up @@ -84,7 +84,7 @@ pub struct ClientContextOptions {
/// The duration, in milliseconds, after which an idle sender chain worker will
/// free its memory.
#[arg(
long = "sender-chain-worker-ttl-ms",
long = "sender-chain-worker-ttl-ms",
default_value = "1000",
env = "LINERA_SENDER_CHAIN_WORKER_TTL_MS",
value_parser = util::parse_millis
Expand Down Expand Up @@ -158,6 +158,15 @@ pub struct ClientContextOptions {
)]
pub blob_download_timeout: Duration,

/// The delay when downloading a batch of certificates, after which we try a second validator,
/// in milliseconds.
#[arg(
long = "cert-batch-download-timeout-ms",
default_value = "1000",
value_parser = util::parse_millis
)]
pub certificate_batch_download_timeout: Duration,

/// Maximum number of certificates that we download at a time from one validator when
/// synchronizing one of our chains.
#[arg(
Expand All @@ -166,6 +175,14 @@ pub struct ClientContextOptions {
)]
pub certificate_download_batch_size: u64,

/// Maximum number of sender certificates we try to download and receive in one go
/// when syncing sender chains.
#[arg(
long,
default_value_t = DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
)]
pub sender_certificate_download_batch_size: usize,

/// Maximum number of tasks that can are joined concurrently in the client.
#[arg(long, default_value = "100")]
pub max_joined_tasks: usize,
Expand All @@ -186,7 +203,9 @@ impl ClientContextOptions {
cross_chain_message_delivery,
grace_period: self.grace_period,
blob_download_timeout: self.blob_download_timeout,
certificate_batch_download_timeout: self.certificate_batch_download_timeout,
certificate_download_batch_size: self.certificate_download_batch_size,
sender_certificate_download_batch_size: self.sender_certificate_download_batch_size,
max_joined_tasks: self.max_joined_tasks,
}
}
Expand Down
4 changes: 2 additions & 2 deletions linera-client/src/unit_tests/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async fn test_chain_listener() -> anyhow::Result<()> {
let cancellation_token = CancellationToken::new();
let child_token = cancellation_token.child_token();
let chain_listener = ChainListener::new(config, context, storage, child_token)
.run(None) // Unit test doesn't need background sync
.run(false) // Unit test doesn't need background sync
.await
.unwrap();

Expand Down Expand Up @@ -210,7 +210,7 @@ async fn test_chain_listener_admin_chain() -> anyhow::Result<()> {
let cancellation_token = CancellationToken::new();
let child_token = cancellation_token.child_token();
let chain_listener = ChainListener::new(config, context, storage.clone(), child_token)
.run(None) // Unit test doesn't need background sync
.run(false) // Unit test doesn't need background sync
.await
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions linera-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ test-strategy = { workspace = true, optional = true }
thiserror.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
tonic.workspace = true
tracing.workspace = true
trait-set = "0.3.0"
Expand Down
Loading
Loading