Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion node/src/components/block_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ impl BlockAccumulator {
let block_timestamps = self.peer_block_timestamps.entry(sender).or_default();

// Prune the timestamps, so the count reflects only the most recently added acceptors.
let purge_interval = self.purge_interval;
// assume at least a 1 milli purge interval to avoid 0 purge interval mathing issues
let purge_interval = self.purge_interval.max(TimeDiff::from_millis(1));
while block_timestamps
.front()
.is_some_and(|(_, timestamp)| timestamp.elapsed() > purge_interval)
Expand Down
3 changes: 3 additions & 0 deletions node/src/components/consensus/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct Config {
/// The maximum number of blocks by which execution is allowed to lag behind finalization.
/// If it is more than that, consensus will pause, and resume once the executor has caught up.
pub max_execution_delay: u64,
/// The maximum time in millis to skip proposing an empty block.
pub empty_proposal_tolerance_interval: u64,
/// Highway-specific node configuration.
#[serde(default)]
pub highway: HighwayConfig,
Expand All @@ -39,6 +41,7 @@ impl Default for Config {
Config {
secret_key_path: External::Missing,
max_execution_delay: DEFAULT_MAX_EXECUTION_DELAY,
empty_proposal_tolerance_interval: u64::default(),
highway: HighwayConfig::default(),
zug: ZugConfig::default(),
}
Expand Down
68 changes: 57 additions & 11 deletions node/src/components/consensus/era_supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
pub(super) mod debug;
mod era;

use anyhow::Error;
use datasize::DataSize;
use futures::{Future, FutureExt};
use itertools::Itertools;
use prometheus::Registry;
use rand::Rng;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
cmp,
collections::{BTreeMap, BTreeSet, HashMap},
Expand All @@ -20,22 +27,14 @@ use std::{
sync::Arc,
time::Duration,
};

use anyhow::Error;
use datasize::DataSize;
use futures::{Future, FutureExt};
use itertools::Itertools;
use prometheus::Registry;
use rand::Rng;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tracing::{debug, error, info, trace, warn};

use casper_binary_port::{ConsensusStatus, ConsensusValidatorChanges};

use casper_types::{
Approval, AsymmetricType, BlockHash, BlockHeader, Chainspec, ConsensusProtocolName, Digest,
DisplayIter, EraId, PublicKey, RewardedSignatures, Timestamp, Transaction, TransactionHash,
ValidatorChange,
DisplayIter, EraId, PublicKey, RewardedSignatures, TimeDiff, Timestamp, Transaction,
TransactionHash, ValidatorChange,
};

use crate::{
Expand Down Expand Up @@ -76,6 +75,8 @@ use crate::{components::consensus::error::CreateNewEraError, types::InvalidPropo
const FTT_EXCEEDED_SHUTDOWN_DELAY_MILLIS: u64 = 60 * 1000;
/// A warning is printed if a timer is delayed by more than this.
const TIMER_DELAY_WARNING_MILLIS: u64 = 1000;
/// Maximum empty proposal tolerance multiple.
const MAXIMUM_EMPTY_PROPOSAL_TOLERANCE_MULTIPLE: u64 = 10;

/// The number of eras across which evidence can be cited.
/// If this is 1, you can cite evidence from the previous era, but not the one before that.
Expand Down Expand Up @@ -112,6 +113,10 @@ pub struct EraSupervisor {
next_block_height: u64,
/// The height of the next block to be executed. If this falls too far behind, we pause.
next_executed_height: u64,
/// The maximum time in millis to skip proposing an empty block.
empty_proposal_tolerance_interval_to_use: u64,
/// The last seen added block time.
last_block_time: Option<Timestamp>,
#[data_size(skip)]
metrics: Metrics,
/// The path to the folder where unit files will be stored.
Expand Down Expand Up @@ -145,15 +150,24 @@ impl EraSupervisor {
info!(our_id = %validator_matrix.public_signing_key(), "EraSupervisor pubkey",);
let metrics = Metrics::new(registry)?;

let empty_proposal_tolerance_interval_to_use = chainspec
.core_config
.minimum_block_time
.saturating_mul(MAXIMUM_EMPTY_PROPOSAL_TOLERANCE_MULTIPLE)
.millis()
.min(config.empty_proposal_tolerance_interval);

let era_supervisor = Self {
open_eras: Default::default(),
validator_matrix,
chainspec,
config,
next_block_height: 0,
last_block_time: None,
metrics,
unit_files_folder,
next_executed_height: 0,
empty_proposal_tolerance_interval_to_use,
last_progress: Timestamp::now(),
message_delay_failpoint: Failpoint::new("consensus.message_delay"),
proposal_delay_failpoint: Failpoint::new("consensus.proposal_delay"),
Expand Down Expand Up @@ -801,12 +815,43 @@ impl EraSupervisor {
block_payload,
block_context,
} = new_block_payload;

match self.current_era() {
None => {
warn!("new block payload but no initialized era");
Effects::new()
}
Some(current_era) => {
let now = Timestamp::now();

// if proposal is empty, do not send it unless too many increments of block time
// have passed. this turns down the volume of empty blocks
let is_empty_proposal = {
let lacks_transactions = block_payload.count(None) == 0;
// validator will always have their own signature for the previous block
let lacks_signatures = !block_payload.rewarded_signatures().has_at_least(2);
lacks_transactions && lacks_signatures
};
if is_empty_proposal {
// THIS BEHAVIOR ALLOWS FOR SKIPPING OF EMPTY PROPOSALS
if let Some(last_block_time) = self.last_block_time {
let threshold_to_force_proposal = last_block_time.saturating_add(
TimeDiff::from_millis(self.empty_proposal_tolerance_interval_to_use),
);
if now < threshold_to_force_proposal {
self.metrics.skipping_empty_proposal(now, last_block_time);
info!(
era = era_id.value(),
?last_block_time,
?now,
?threshold_to_force_proposal,
"SKIPPING EMPTY PROPOSAL: within tolerance for skipping an empty proposal"
);
return Effects::new();
}
}
}

if era_id.saturating_add(PAST_EVIDENCE_ERAS) < current_era
|| !self.open_eras.contains_key(&era_id)
{
Expand All @@ -815,7 +860,7 @@ impl EraSupervisor {
}
let proposed_block = ProposedBlock::new(block_payload, block_context);
self.delegate_to_era(effect_builder, rng, era_id, move |consensus, _| {
consensus.propose(proposed_block, Timestamp::now())
consensus.propose(proposed_block, now)
})
}
}
Expand All @@ -827,6 +872,7 @@ impl EraSupervisor {
rng: &mut NodeRng,
block_header: BlockHeader,
) -> Effects<Event> {
self.last_block_time = Some(block_header.timestamp());
self.last_progress = Timestamp::now();
self.next_executed_height = self
.next_executed_height
Expand Down
105 changes: 94 additions & 11 deletions node/src/components/consensus/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use prometheus::{Gauge, IntGauge, Registry};

use casper_types::Timestamp;

use crate::{types::FinalizedBlock, unregister_metric};
use casper_types::Timestamp;
use prometheus::{Gauge, Histogram, HistogramOpts, IntGauge, Registry};
use std::time::Duration;

/// Network metrics to track Consensus
#[derive(Debug)]
pub(super) struct Metrics {
/// The current era.
pub(super) consensus_current_era: IntGauge,
/// Gauge to track time between proposal and finalization.
finalization_time: Gauge,
/// Amount of finalized blocks.
Expand All @@ -15,8 +16,15 @@ pub(super) struct Metrics {
time_of_last_proposed_block: IntGauge,
/// Timestamp of the most recently finalized block.
time_of_last_finalized_block: IntGauge,
/// The current era.
pub(super) consensus_current_era: IntGauge,

/// INTENTIONALLY SKIPPED BLOCKS METRICS
/// Histogram of frequency of skipped proposals.
skipped_empty_proposal_hist: Histogram,
/// Count of skipped empty proposal.
skipped_empty_proposal: Gauge,
/// Timestamp of skipped empty proposal.
time_of_last_skipped_empty_proposal: IntGauge,

/// Registry component.
registry: Registry,
}
Expand All @@ -29,27 +37,87 @@ impl Metrics {
)?;
let finalized_block_count =
IntGauge::new("amount_of_blocks", "the number of blocks finalized so far")?;
let time_of_last_proposed_block = IntGauge::new(
"time_of_last_block_payload",
"timestamp of the most recently accepted block payload",
)?;
let time_of_last_finalized_block = IntGauge::new(
"time_of_last_finalized_block",
"timestamp of the most recently finalized block",
)?;
let consensus_current_era =
IntGauge::new("consensus_current_era", "the current era in consensus")?;
let time_of_last_proposed_block = IntGauge::new(
"time_of_last_block_payload",
"timestamp of the most recently accepted block payload",
)?;
registry.register(Box::new(finalization_time.clone()))?;
registry.register(Box::new(finalized_block_count.clone()))?;
registry.register(Box::new(consensus_current_era.clone()))?;
registry.register(Box::new(time_of_last_proposed_block.clone()))?;
registry.register(Box::new(time_of_last_finalized_block.clone()))?;

// SKIPPED EMPTY PROPOSAL METRICS
// HISTOGRAM, COUNT, and MOST RECENT INSTANCE (timestamp since epoch)

/*

In Prometheus, when using exponential_buckets to define histogram buckets, the count
variable specifies the number of buckets to generate.
Specifically:

The exponential_buckets(start, factor, count) function creates count buckets
for a histogram.

The start parameter defines the upper bound of the lowest bucket.
The factor parameter determines the multiplier for each subsequent bucket's upper bound.
Each following bucket's upper bound is factor times the previous bucket's upper bound.
The count variable directly dictates how many of these exponentially increasing buckets
will be created, excluding the implicit +Inf bucket which is always present for
histograms and handles observations exceeding the highest defined bucket.

Example: `exponential_buckets(100, 1.2, 3)`

This would create 3 buckets: Upper bound: 100, Upper bound: 100 * 1.2 = 120,
and Upper bound: 120 * 1.2 = 144.

Plus the +Inf bucket.
*/

let skipped_empty_proposal_hist = Histogram::with_opts(
HistogramOpts::new(
"skipped_empty_proposal_hist",
"histogram of skipped proposals start: 1s, factor: 1.75, buckets: 25",
)
// Create exponential buckets from one second to 1 minute with an off-step factor.
// BUCKETS (set up to accommodate a range of block times from 1s to 32s):
// 1s, 1.75s, 3.06s, 5.35s. 9.37s, 16.41s, 28.72s, 50.26s, 87.96s, 153.93s,
// 269.38s, 471.43s, 825s, 1443.75s, 2526.57s, 4421.51s, 7737.64s, 13540.87s,
// 23696.53s, 41468.93s, 72570.64s, 126998.62s, 222247.59s, 388933.29s, 680633.26s
// A given node's entries should be consistent with their configured
// empty_proposal_tolerance_interval setting and the chainspec minimum_block_time,
// allowing for spillover into the smallest eligible bucket.
// i.e. if the block time is 1s and config'd value is 0s there should be NO entries.
// however if config'd value is 10s, there may be entries in the 1s to 16.41s buckets
// and no entries in the 28.72s and up buckets; there may be entries in the 16.41s
// bucket because it is the smallest bucket skips in the 9.38s to 10s range can fit in.
.buckets(prometheus::exponential_buckets(1.0, 1.75, 25)?),
)?;
let time_of_last_skipped_empty_proposal = IntGauge::new(
"time_of_last_skipped_empty_proposal",
"timestamp of the most recently skipped empty proposal",
)?;
let skipped_empty_proposal =
Gauge::new("skipped_empty_proposal", "count of skipped empty proposals")?;
registry.register(Box::new(skipped_empty_proposal.clone()))?;
registry.register(Box::new(skipped_empty_proposal_hist.clone()))?;
registry.register(Box::new(time_of_last_skipped_empty_proposal.clone()))?;

Ok(Metrics {
consensus_current_era,
finalization_time,
finalized_block_count,
time_of_last_proposed_block,
time_of_last_finalized_block,
consensus_current_era,
skipped_empty_proposal_hist,
skipped_empty_proposal,
time_of_last_skipped_empty_proposal,
registry: registry.clone(),
})
}
Expand All @@ -69,6 +137,16 @@ impl Metrics {
self.time_of_last_proposed_block
.set(Timestamp::now().millis() as i64);
}

/// Updates the metrics and records a skipped empty proposal.
pub(super) fn skipping_empty_proposal(&mut self, now: Timestamp, last_block_time: Timestamp) {
let elapsed =
Duration::from_millis(now.saturating_diff(last_block_time).millis()).as_secs_f64();
self.skipped_empty_proposal_hist.observe(elapsed);
self.skipped_empty_proposal.inc();
self.time_of_last_skipped_empty_proposal
.set(now.millis() as i64);
}
}

impl Drop for Metrics {
Expand All @@ -78,5 +156,10 @@ impl Drop for Metrics {
unregister_metric!(self.registry, self.consensus_current_era);
unregister_metric!(self.registry, self.time_of_last_finalized_block);
unregister_metric!(self.registry, self.time_of_last_proposed_block);

// SKIPPED EMPTY PROPOSAL METRICS
unregister_metric!(self.registry, self.skipped_empty_proposal_hist);
unregister_metric!(self.registry, self.skipped_empty_proposal);
unregister_metric!(self.registry, self.time_of_last_skipped_empty_proposal);
}
}
2 changes: 1 addition & 1 deletion node/src/components/network/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,7 @@ where
| OutgoingState::Connecting { .. } => {
// We should, under normal circumstances, not receive drop notifications for
// any of these. Connection failures are handled by the dialer.
warn!("unexpected drop notification");
warn!(%outgoing.state, "unexpected drop notification");
None
}
OutgoingState::Connected { .. } => {
Expand Down
10 changes: 5 additions & 5 deletions node/src/components/transaction_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl TransactionBuffer {
.insert(transaction_hash, (expiry_time, Some(footprint)))
{
Some(prev) => {
warn!(%transaction_hash, ?prev, "TransactionBuffer: transaction upserted");
debug!(%transaction_hash, ?prev, "TransactionBuffer: transaction upserted");
}
None => {
debug!(%transaction_hash, "TransactionBuffer: new transaction buffered");
Expand Down Expand Up @@ -667,6 +667,10 @@ where
{
type Event = Event;

fn name(&self) -> &str {
COMPONENT_NAME
}

fn handle_event(
&mut self,
effect_builder: EffectBuilder<REv>,
Expand Down Expand Up @@ -806,8 +810,4 @@ where
},
}
}

fn name(&self) -> &str {
COMPONENT_NAME
}
}
2 changes: 1 addition & 1 deletion node/src/reactor/main_reactor/tests/network_general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ async fn network_should_recover_from_stall() {
}

// Ensure all nodes progress until block 3 is marked complete.
fixture.run_until_block_height(3, TEN_SECS).await;
fixture.run_until_block_height(3, ONE_MIN).await;
}

#[tokio::test]
Expand Down
14 changes: 14 additions & 0 deletions resources/local/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,20 @@ secret_key_path = 'secret_key.pem'
# If it is more than that, consensus will pause, and resume once the executor has caught up.
max_execution_delay = 3

# If a validating node is selected to propose a block but does not have any transactions to propose,
# it results in an empty block being proposed. Empty blocks require computation and network traffic
# to process and store. In periods of no to low activity on the network, a sequence of such empty
# blocks are produced on the chain, taking up disk space for no real benefit.
#
# This setting allows a validator to opt to not propose an empty block unless the time since the
# last block exceeds a time threshold. If this setting is 0, a validating node chosen to propose
# with no transactions will always propose. If this setting is greater than 0, such a node will
# only propose an empty block if the elapsed time since the last block in milliseconds is equal
# or greater. In other words, if the value is 5 a validator selected to propose will not propose
# an empty block unless it has been 5 milliseconds or more since the last block.
#
# A configured value greater than 10 x minimum_block_time is capped to 10 x minimum_block_time.
empty_proposal_tolerance_interval = 0

# =======================================
# Configuration options for Zug consensus
Expand Down
Loading