13 changes: 11 additions & 2 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -3957,7 +3957,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // See https://github.com/sigp/lighthouse/issues/2028
         let (_, signed_block, block_data) = signed_block.deconstruct();
 
-        match self.get_blobs_or_columns_store_op(block_root, block_data) {
+        match self.get_blobs_or_columns_store_op(block_root, signed_block.slot(), block_data) {
             Ok(Some(blobs_or_columns_store_op)) => {
                 ops.push(blobs_or_columns_store_op);
             }
@@ -7163,6 +7163,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     pub(crate) fn get_blobs_or_columns_store_op(
         &self,
         block_root: Hash256,
+        block_slot: Slot,
         block_data: AvailableBlockData<T::EthSpec>,
     ) -> Result<Option<StoreOp<'_, T::EthSpec>>, String> {
         match block_data {
@@ -7175,7 +7176,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                 );
                 Ok(Some(StoreOp::PutBlobs(block_root, blobs)))
             }
-            AvailableBlockData::DataColumns(data_columns) => {
+            AvailableBlockData::DataColumns(mut data_columns) => {
+                let columns_to_custody = self.custody_columns_for_epoch(Some(
+                    block_slot.epoch(T::EthSpec::slots_per_epoch()),
+                ));
+                // Supernodes need to persist all sampled custody columns
@dapplion (Collaborator) commented on Oct 10, 2025:

Suggested change:
-                // Supernodes need to persist all sampled custody columns
+                // If `columns_to_custody` contains all possible custody groups, no need to filter

I would update the mention of supernode in the comment. In the future, nodes custodying >50% of columns and doing reconstruction may also be supernodes?

A second reviewer (Member) replied:

I think this is fine; supernode is explicitly defined in the spec:
https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#supernodes
+                if columns_to_custody.len() != self.spec.number_of_custody_groups as usize {
+                    data_columns
+                        .retain(|data_column| columns_to_custody.contains(&data_column.index));
+                }
                 debug!(
                     %block_root,
                     count = data_columns.len(),
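Read in isolation, the new filtering step reduces to the standalone sketch below. The `DataColumn` struct and the `filter_columns_for_custody` helper are simplified stand-ins for illustration only, not Lighthouse's actual types (the real sidecar type also carries cells and KZG proofs):

use std::collections::HashSet;

/// Simplified stand-in for a sampled data column sidecar.
#[derive(Debug, PartialEq)]
struct DataColumn {
    index: u64,
}

/// Keep only the columns this node custodies, unless it custodies every
/// custody group (e.g. a supernode), in which case nothing is filtered out.
fn filter_columns_for_custody(
    data_columns: &mut Vec<DataColumn>,
    columns_to_custody: &HashSet<u64>,
    number_of_custody_groups: usize,
) {
    if columns_to_custody.len() != number_of_custody_groups {
        data_columns.retain(|dc| columns_to_custody.contains(&dc.index));
    }
}

fn main() {
    let mut columns: Vec<DataColumn> = (0..8).map(|index| DataColumn { index }).collect();
    let custody: HashSet<u64> = [1, 4].into_iter().collect();
    // A regular node custodying 2 of 8 groups keeps only columns 1 and 4.
    filter_columns_for_custody(&mut columns, &custody, 8);
    assert_eq!(
        columns.iter().map(|c| c.index).collect::<Vec<_>>(),
        vec![1, 4]
    );
}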
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/historical_blocks.rs
@@ -140,7 +140,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
 
         // Store the blobs or data columns too
         if let Some(op) = self
-            .get_blobs_or_columns_store_op(block_root, block_data)
+            .get_blobs_or_columns_store_op(block_root, block.slot(), block_data)
             .map_err(|e| {
                 HistoricalBlockError::StoreError(StoreError::DBError {
                     message: format!("get_blobs_or_columns_store_op error {e:?}"),
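The slot threaded through this call pins down the custody set, since custody is computed per epoch and the epoch is just the slot divided by the slots-per-epoch constant. A minimal illustration with plain integers (Lighthouse's actual `Slot::epoch` operates on wrapper types):

/// Minimal sketch of the slot -> epoch mapping behind `block.slot().epoch(...)`;
/// plain u64 values stand in for the Slot and Epoch types.
fn epoch_of(slot: u64, slots_per_epoch: u64) -> u64 {
    slot / slots_per_epoch
}

fn main() {
    // With mainnet's 32 slots per epoch, slot 3957 falls in epoch 123.
    assert_eq!(epoch_of(3957, 32), 123);
}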
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/tests/block_verification.rs
@@ -109,6 +109,7 @@ fn get_harness(validator_count: usize) -> BeaconChainHarness<EphemeralHarnessTyp
             ..ChainConfig::default()
         })
         .keypairs(KEYPAIRS[0..validator_count].to_vec())
+        .import_all_data_columns(true)
         .fresh_ephemeral_store()
         .mock_execution_layer()
         .build();
164 changes: 163 additions & 1 deletion beacon_node/beacon_chain/tests/store_tests.rs
@@ -13,7 +13,8 @@ use beacon_chain::test_utils::{
 use beacon_chain::{
     BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot, BlockError, ChainConfig,
     NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped,
-    data_availability_checker::MaybeAvailableBlock, historical_blocks::HistoricalBlockError,
+    data_availability_checker::{AvailableBlockData, MaybeAvailableBlock},
+    historical_blocks::HistoricalBlockError,
     migrate::MigratorConfig,
 };
 use logging::create_test_tracing_subscriber;
@@ -4137,6 +4138,167 @@ async fn replay_from_split_state() {
     assert_eq!(state.slot(), split.slot);
 }

+/// Test that regular nodes filter and store only custody columns when processing blocks with data columns.
+#[tokio::test]
+async fn test_custody_column_filtering_regular_node() {
+    let db_path = tempdir().unwrap();
+    let store = get_store(&db_path);
+    let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
+
+    // Skip test if PeerDAS is not scheduled
+    if !harness.spec.is_peer_das_scheduled() {
+        return;
+    }
+
+    // Generate a block with data columns using public test utils
+    let fork_name = harness.spec.fork_name_at_epoch(Epoch::new(0));
+    let mut rng = rand::rng();
+    let (signed_block, all_data_columns) =
+        beacon_chain::test_utils::generate_rand_block_and_data_columns::<E>(
+            fork_name,
+            beacon_chain::test_utils::NumBlobs::Number(1),
+            &mut rng,
+            &harness.spec,
+        );
+
+    let block_root = signed_block.canonical_root();
+    let slot = signed_block.slot();
+
+    // Get the custody columns for this epoch - regular nodes only store a subset
+    let custody_columns = harness
+        .chain
+        .custody_columns_for_epoch(Some(slot.epoch(E::slots_per_epoch())));
+
+    // Verify that the custody set is a proper subset (a regular node does not custody all columns)
+    assert!(
+        custody_columns.len() < harness.spec.number_of_custody_groups as usize,
+        "Regular node should not custody all columns"
+    );
+
+    // Create an AvailableBlock with all data columns
+    let available_block_data = AvailableBlockData::DataColumns(all_data_columns.clone());
+    let available_block = AvailableBlock::__new_for_testing(
+        block_root,
+        Arc::new(signed_block),
+        available_block_data,
+        harness.spec.clone(),
+    );
+
+    // Import the block through the historical block batch import, which applies custody filtering internally
+    harness
+        .chain
+        .import_historical_block_batch(vec![available_block])
+        .expect("should import block with data columns");
+
+    // Check what actually got stored in the database
+    let stored_column_keys = store
+        .get_data_column_keys(block_root)
+        .expect("should get stored column keys");
+
+    // Verify that only custody columns were stored
+    let stored_column_indices: std::collections::HashSet<_> =
+        stored_column_keys.into_iter().collect();
+    let expected_column_indices: std::collections::HashSet<_> =
+        custody_columns.iter().copied().collect();
+
+    assert_eq!(
+        stored_column_indices, expected_column_indices,
+        "Regular node should only store custody columns"
+    );
+}

+/// Test that supernodes store all data columns when processing blocks with data columns.
+#[tokio::test]
+async fn test_custody_column_filtering_supernode() {
+    let db_path = tempdir().unwrap();
+    let store = get_store(&db_path);
+    let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT);
+
+    // Skip test if PeerDAS is not scheduled
+    if !harness.spec.is_peer_das_scheduled() {
+        return;
+    }
+
+    // Generate a block with data columns using public test utils
+    let fork_name = harness.spec.fork_name_at_epoch(Epoch::new(0));
+    let mut rng = rand::rng();
+    let (signed_block, all_data_columns) =
+        beacon_chain::test_utils::generate_rand_block_and_data_columns::<E>(
+            fork_name,
+            beacon_chain::test_utils::NumBlobs::Number(3), // Generate 3 blobs to produce data columns
+            &mut rng,
+            &harness.spec,
+        );
+
+    // Skip test if no data columns are generated
+    if all_data_columns.is_empty() {
+        return;
+    }
+
+    let block_root = signed_block.canonical_root();
+
+    // Create an AvailableBlock with all data columns
+    let available_block_data = AvailableBlockData::DataColumns(all_data_columns.clone());
+    let available_block = AvailableBlock::__new_for_testing(
+        block_root,
+        Arc::new(signed_block),
+        available_block_data,
+        harness.spec.clone(),
+    );
+
+    // Import the block through the historical block batch import
+    harness
+        .chain
+        .import_historical_block_batch(vec![available_block])
+        .expect("should import block with data columns");
+
+    // Check what actually got stored in the database
+    let stored_column_keys = store
+        .get_data_column_keys(block_root)
+        .expect("should get stored column keys");
+
+    // Verify that ALL data columns were stored for the supernode
+    let stored_column_indices: std::collections::HashSet<_> =
+        stored_column_keys.into_iter().collect();
+    let all_column_indices: std::collections::HashSet<_> =
+        all_data_columns.iter().map(|dc| dc.index).collect();
+
+    assert_eq!(
+        stored_column_indices, all_column_indices,
+        "Supernode should store ALL data columns"
+    );
+
+    // Verify the count matches
+    assert_eq!(
+        stored_column_indices.len(),
+        all_data_columns.len(),
+        "Supernode should store the same number of columns as generated"
+    );
+
+    // Verify each stored column can be retrieved and matches the original data
+    let stored_columns = store
+        .get_data_columns(&block_root)
+        .expect("should get data columns")
+        .expect("data columns should exist");
+
+    for original_column in &all_data_columns {
+        let stored_column = stored_columns
+            .iter()
+            .find(|stored| stored.index == original_column.index)
+            .unwrap_or_else(|| {
+                panic!(
+                    "Column {} should be present in stored data",
+                    original_column.index
+                )
+            });
+        assert_eq!(
+            stored_column, original_column,
+            "Stored column {} should match the original",
+            original_column.index
+        );
+    }
+}

/// Checks that two chains are the same, for the purpose of these tests.
///
/// Several fields that are hard/impossible to check are ignored (e.g., the store).