diff --git a/Cargo.lock b/Cargo.lock
index d0df7eb2..2982cf6c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -12769,8 +12769,11 @@ dependencies = [
  "alloy-primitives",
  "alloy-provider",
  "alloy-rpc-types-eth",
+ "alloy-signer-local",
+ "alloy-sol-types",
  "eyre",
  "getrandom 0.2.16",
+ "reth-e2e-test-utils",
  "reth-tracing",
  "scroll-alloy-network",
  "serde",
diff --git a/crates/sequencer/src/lib.rs b/crates/sequencer/src/lib.rs
index 86d4f283..9958c1cc 100644
--- a/crates/sequencer/src/lib.rs
+++ b/crates/sequencer/src/lib.rs
@@ -247,7 +247,7 @@ async fn build_payload_attributes Result {
     let mut l1_messages = vec![];
     let mut cumulative_gas_used = 0;
-    let expected_index = l1_messages_queue_index;
+    let mut expected_index = l1_messages_queue_index;
 
     // Collect L1 messages to include in payload.
     let db_l1_messages = provider
@@ -277,6 +277,7 @@ async fn build_payload_attributes
-# Check if anvil is running
-if ! cast rpc eth_blockNumber --rpc-url http://localhost:8545 > /dev/null 2>&1; then
-    echo "Error: anvil failed to start"
-    exit 1
-fi
+# # Check if anvil is running
+# if ! cast rpc eth_blockNumber --rpc-url http://localhost:8545 > /dev/null 2>&1; then
+#     echo "Error: anvil failed to start"
+#     exit 1
+# fi
 
 # Set L1 system contract consensus address
-echo "Setting system contract consensus address..."
-cast rpc anvil_setStorageAt \
-    0x55B150d210356452e4E79cCb6B778b4e1B167091 \
-    0x0000000000000000000000000000000000000000000000000000000000000067 \
-    0x000000000000000000000000b674Ff99cca262c99D3eAb5B32796a99188543dA \
-    --rpc-url http://localhost:8545
+# echo "Setting system contract consensus address..."
+# cast rpc anvil_setStorageAt \
+#     0x55B150d210356452e4E79cCb6B778b4e1B167091 \
+#     0x0000000000000000000000000000000000000000000000000000000000000067 \
+#     0x000000000000000000000000b674Ff99cca262c99D3eAb5B32796a99188543dA \
+#     --rpc-url http://localhost:8545
 
 # Verify that storage was set correctly
 echo "Verifying storage..."
-storage_value=$(cast storage 0x55B150d210356452e4E79cCb6B778b4e1B167091 0x67 --rpc-url http://localhost:8545)
-expected_value="0x000000000000000000000000b674ff99cca262c99d3eab5b32796a99188543da"
+storage_value=$(cast storage "$L1_SYSTEM_CONFIG_PROXY_ADDR" 0x67 --rpc-url http://localhost:8545)
+expected_value="0x000000000000000000000000$(echo "$L1_CONSENSUS_ADDRESS" | sed 's/0x//' | tr '[:upper:]' '[:lower:]')"
 
 if [ "$storage_value" != "$expected_value" ]; then
     echo "Error: Storage verify failed"
diff --git a/tests/launch_l2geth_follower.bash b/tests/launch_l2geth_follower.bash
index 51ff1de4..9d37e411 100644
--- a/tests/launch_l2geth_follower.bash
+++ b/tests/launch_l2geth_follower.bash
@@ -9,7 +9,7 @@ exec geth --datadir=/l2geth \
     --http --http.addr 0.0.0.0 --http.port 8545 --http.vhosts "*" --http.corsdomain "*" --http.api "admin,eth,scroll,net,web3,debug" \
     --ws --ws.addr 0.0.0.0 --ws.port 8546 --ws.api "admin,eth,scroll,net,web3,debug" \
     --pprof --pprof.addr 0.0.0.0 --pprof.port 6060 --metrics --verbosity 5 --log.debug \
-    --l1.endpoint "http://l1-node:8545" --l1.confirmations finalized --l1.sync.startblock 0 \
+    --l1.endpoint "http://l1-node:8545" --l1.confirmations finalized --l1.sync.startblock 0 --l1.sync.interval 1s \
     --gcmode archive --cache.noprefetch --cache.snapshot=0 --snapshot=false \
     --gossip.enablebroadcasttoall \
     --nat extip:0.0.0.0
diff --git a/tests/launch_l2geth_sequencer.bash b/tests/launch_l2geth_sequencer.bash
index 6cd7207c..1101f47f 100644
--- a/tests/launch_l2geth_sequencer.bash
+++ b/tests/launch_l2geth_sequencer.bash
@@ -19,7 +19,7 @@ exec geth --datadir=/l2geth \
     --http --http.addr 0.0.0.0 --http.port 8545 --http.vhosts "*" --http.corsdomain "*" --http.api "admin,eth,scroll,net,web3,debug,miner" \
     --ws --ws.addr 0.0.0.0 --ws.port 8546 --ws.api "admin,eth,scroll,net,web3,debug,miner" \
     --pprof --pprof.addr 0.0.0.0 --pprof.port 6060 --metrics --verbosity 5 --log.debug \
-    --l1.endpoint "http://l1-node:8545" --l1.confirmations finalized --l1.sync.startblock 0 \
+    --l1.endpoint "http://l1-node:8545" --l1.confirmations finalized --l1.sync.startblock 0 --l1.sync.interval 1s \
    --gcmode archive --cache.noprefetch --cache.snapshot=0 --snapshot=false \
     --nat extip:0.0.0.0 \
     --gossip.enablebroadcasttoall \
diff --git a/tests/src/docker_compose.rs b/tests/src/docker_compose.rs
index f5977701..45a12baf 100644
--- a/tests/src/docker_compose.rs
+++ b/tests/src/docker_compose.rs
@@ -11,11 +11,18 @@ use tokio::{
 
 pub struct NamedProvider {
     pub provider: Box>,
     pub name: &'static str,
+    pub service_name: &'static str,
+    pub rpc_url: &'static str,
 }
 
 impl NamedProvider {
-    pub fn new(provider: Box>, name: &'static str) -> Self {
-        Self { provider, name }
+    pub fn new(
+        provider: Box>,
+        name: &'static str,
+        service_name: &'static str,
+        rpc_url: &'static str,
+    ) -> Self {
+        Self { provider, name, service_name, rpc_url }
     }
 }
@@ -88,8 +95,8 @@ impl DockerComposeEnv {
                 project_name,
                 "up",
                 "-d",
-                "--force-recreate",
-                "--build",
+                // "--force-recreate",
+                // "--build",
             ])
             .stdout(std::process::Stdio::piped())
             .stderr(std::process::Stdio::piped())
@@ -158,7 +165,12 @@ impl DockerComposeEnv {
             .connect(RN_SEQUENCER_RPC_URL)
             .await
             .map_err(|e| eyre::eyre!("Failed to connect to RN sequencer: {}", e))?;
-        Ok(NamedProvider::new(Box::new(provider), "RN Sequencer"))
+        Ok(NamedProvider::new(
+            Box::new(provider),
+            "RN Sequencer",
+            "rollup-node-sequencer",
+            RN_SEQUENCER_RPC_URL,
+        ))
     }
 
     /// Get a configured follower provider
@@ -168,7 +180,12 @@ impl DockerComposeEnv {
             .connect(RN_FOLLOWER_RPC_URL)
             .await
             .map_err(|e| eyre::eyre!("Failed to connect to RN follower: {}", e))?;
-        Ok(NamedProvider::new(Box::new(provider), "RN Follower"))
+        Ok(NamedProvider::new(
+            Box::new(provider),
+            "RN Follower",
+            "rollup-node-follower",
+            RN_FOLLOWER_RPC_URL,
+        ))
     }
 
     /// Get a configured l2geth sequencer provider
@@ -178,7 +195,12 @@ impl DockerComposeEnv {
             .connect(L2GETH_SEQUENCER_RPC_URL)
             .await
             .map_err(|e| eyre::eyre!("Failed to connect to l2geth sequencer: {}", e))?;
-        Ok(NamedProvider::new(Box::new(provider), "L2Geth Sequencer"))
+        Ok(NamedProvider::new(
+            Box::new(provider),
+            "L2Geth Sequencer",
+            "l2geth-sequencer",
+            L2GETH_SEQUENCER_RPC_URL,
+        ))
     }
 
     /// Get a configured l2geth follower provider
@@ -188,7 +210,12 @@ impl DockerComposeEnv {
             .connect(L2GETH_FOLLOWER_RPC_URL)
             .await
             .map_err(|e| eyre::eyre!("Failed to connect to l2geth follower: {}", e))?;
-        Ok(NamedProvider::new(Box::new(provider), "L2Geth Follower"))
+        Ok(NamedProvider::new(
+            Box::new(provider),
+            "L2Geth Follower",
+            "l2geth-follower",
+            L2GETH_FOLLOWER_RPC_URL,
+        ))
     }
 
     // ===== UTILITIES =====
@@ -299,6 +326,108 @@ impl DockerComposeEnv {
         Ok(ip)
     }
 
+    // ===== CONTAINER CONTROL =====
+
+    /// Stop a container
+    pub async fn stop_container(&self, provider: &NamedProvider) -> Result<()> {
+        let service_name = provider.service_name;
+        tracing::info!("🛑 Stopping container: {}", service_name);
+
+        let output = Command::new("docker")
+            .args([
+                "compose",
+                "-f",
+                &self.compose_file,
+                "-p",
+                &self.project_name,
+                "stop",
+                service_name,
+            ])
+            .output()
+            .map_err(|e| eyre::eyre!("Failed to run docker compose stop: {}", e))?;
+
+        if !output.status.success() {
+            return Err(eyre::eyre!(
+                "Failed to stop container {}: {}",
+                service_name,
+                String::from_utf8_lossy(&output.stderr)
+            ));
+        }
+
+        tracing::info!("✅ Stopped container: {}", service_name);
+        Ok(())
+    }
+
+    /// Start a container
+    pub async fn start_container(&self, provider: &NamedProvider) -> Result<()> {
+        let service_name = provider.service_name;
+        tracing::info!("▶️ Starting container: {}", service_name);
+
+        let output = Command::new("docker")
+            .args([
+                "compose",
+                "-f",
+                &self.compose_file,
+                "-p",
+                &self.project_name,
+                "start",
+                service_name,
+            ])
+            .output()
+            .map_err(|e| eyre::eyre!("Failed to run docker compose start: {}", e))?;
+
+        if !output.status.success() {
+            return Err(eyre::eyre!(
+                "Failed to start container {}: {}",
+                service_name,
+                String::from_utf8_lossy(&output.stderr)
+            ));
+        }
+
+        tracing::info!("✅ Started container: {}", service_name);
+
+        Self::wait_for_l2_node_ready(provider.rpc_url, 30).await.map_err(|e| {
+            eyre::eyre!("Container {} did not become ready after start: {}", service_name, e)
+        })?;
+
+        Ok(())
+    }
+
+    /// Restart a container
+    pub async fn restart_container(&self, provider: &NamedProvider) -> Result<()> {
+        let service_name = provider.service_name;
+        tracing::info!("🔄 Restarting container: {}", service_name);
+
+        let output = Command::new("docker")
+            .args([
+                "compose",
+                "-f",
+                &self.compose_file,
+                "-p",
+                &self.project_name,
+                "restart",
+                service_name,
+            ])
+            .output()
+            .map_err(|e| eyre::eyre!("Failed to run docker compose restart: {}", e))?;
+
+        if !output.status.success() {
+            return Err(eyre::eyre!(
+                "Failed to restart container {}: {}",
+                service_name,
+                String::from_utf8_lossy(&output.stderr)
+            ));
+        }
+
+        tracing::info!("✅ Restarted container: {}", service_name);
+
+        Self::wait_for_l2_node_ready(provider.rpc_url, 30).await.map_err(|e| {
+            eyre::eyre!("Container {} did not become ready after restart: {}", service_name, e)
+        })?;
+
+        Ok(())
+    }
+
     /// Get the rollup node sequencer enode URL with resolved IP address
     pub fn rn_sequencer_enode(&self) -> Result {
         let ip = self.get_container_ip(&self.get_full_container_name("rollup-node-sequencer"))?;
diff --git a/tests/src/utils.rs b/tests/src/utils.rs
index 06ae420a..21e6ebdd 100644
--- a/tests/src/utils.rs
+++ b/tests/src/utils.rs
@@ -1,9 +1,66 @@
-use alloy_rpc_types_eth::BlockNumberOrTag;
-use eyre::Result;
-use std::time::Duration;
+use alloy_network::TransactionBuilder;
+use alloy_primitives::{address, hex::ToHexExt, Address, Bytes, TxHash, U256};
+use alloy_provider::{Provider, ProviderBuilder};
+use alloy_rpc_types_eth::{BlockId, BlockNumberOrTag, TransactionRequest};
+use alloy_signer_local::PrivateKeySigner;
+use alloy_sol_types::{sol, SolCall};
+use eyre::{Ok, Result};
+use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet::Wallet};
+use std::{
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
+    time::Duration,
+};
+use tokio::{sync::Mutex, time::interval};
 
 use crate::docker_compose::NamedProvider;
 
+// ===== L1 CONTRACT CONSTANTS =====
+
+/// L1 node RPC URL for docker tests (port 8544 on host maps to 8545 in container)
+const L1_RPC_URL: &str = "http://localhost:8544";
+
+/// L1 deployer private key (first Anvil account)
+const L1_DEPLOYER_PRIVATE_KEY: &str =
+    "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80";
+
+/// L1 Scroll Messenger proxy address
+const L1_SCROLL_MESSENGER_PROXY_ADDR: Address =
+    address!("8A791620dd6260079BF849Dc5567aDC3F2FdC318");
+
+/// L1 Enforced Transaction Gateway proxy address
+const L1_ENFORCED_TX_GATEWAY_PROXY_ADDR: Address =
+    address!("68B1D87F95878fE05B998F19b66F4baba5De1aed");
+
+/// L1 Message Queue V2 proxy address
+const L1_MESSAGE_QUEUE_V2_PROXY_ADDR: Address =
+    address!("Dc64a140Aa3E981100a9becA4E685f962f0cF6C9");
+
+// ===== L1 CONTRACT INTERFACES =====
+
+sol! {
+    /// L1 Scroll Messenger sendMessage function
+    function sendMessage(
+        address to,
+        uint256 value,
+        bytes memory message,
+        uint256 gasLimit
+    ) external payable;
+
+    /// L1 Enforced Transaction Gateway sendTransaction function
+    function sendTransaction(
+        address target,
+        uint256 value,
+        uint256 gasLimit,
+        bytes calldata data
+    ) external payable;
+
+    /// L1 Message Queue V2 nextCrossDomainMessageIndex function
+    function nextCrossDomainMessageIndex() external view returns (uint256);
+}
+
 /// Enable automatic sequencing on a rollup node
 pub async fn enable_automatic_sequencing(provider: &NamedProvider) -> Result {
     provider
@@ -38,6 +95,78 @@ pub async fn miner_stop(provider: &NamedProvider) -> Result<()> {
         .map_err(|e| eyre::eyre!("Failed to stop miner: {}", e))
 }
 
+/// Get the latest relayed queue index from an l2geth node.
+///
+/// # Arguments
+/// * `provider` - The L2 node provider (only l2geth)
+///
+/// # Returns
+/// * `Ok(u64)` - The latest relayed queue index
+/// * `Err` - If the RPC call fails or the value cannot be parsed
+pub async fn scroll_get_latest_relayed_queue_index(provider: &NamedProvider) -> Result<u64> {
+    let result: u64 = provider
+        .client()
+        .request("scroll_getLatestRelayedQueueIndex", ())
+        .await
+        .map_err(|e| eyre::eyre!("Failed to get latest relayed queue index: {}", e))?;
+
+    Ok(result)
+}
+
+/// Wait for all provided l2geth nodes to reach the given relayed L1 message queue index.
+pub async fn wait_for_l1_message_queue_index_reached(
+    nodes: &[&NamedProvider],
+    expected_index: u64,
+) -> Result<()> {
+    let timeout_duration = Duration::from_secs(60);
+    let timeout_secs = timeout_duration.as_secs();
+
+    tracing::info!(
+        "⏳ Waiting for {} nodes to reach queue index {}... (timeout: {}s)",
+        nodes.len(),
+        expected_index,
+        timeout_secs
+    );
+    for i in 0..timeout_secs * 2 {
+        let mut all_matched = true;
+        let mut node_statuses = Vec::new();
+
+        for node in nodes {
+            let current_index = scroll_get_latest_relayed_queue_index(node).await?;
+            node_statuses.push((node.name, current_index));
+
+            if current_index < expected_index {
+                all_matched = false;
+            }
+        }
+
+        if all_matched {
+            tracing::info!("✅ All nodes reached expected queue index {}", expected_index);
+            for (name, index) in node_statuses {
+                tracing::info!("  - {}: queue index {}", name, index);
+            }
+            return Ok(());
+        }
+
+        // Log progress every 5 seconds
+        if i % 10 == 0 {
+            tracing::info!("Progress check ({}s elapsed):", i / 2);
+            for (name, index) in node_statuses {
+                tracing::info!(
+                    "  - {}: queue index {} / {} {}",
+                    name,
+                    index,
+                    expected_index,
+                    if index >= expected_index { "✅" } else { "⏳" }
+                );
+            }
+        }
+
+        tokio::time::sleep(Duration::from_millis(500)).await;
+    }
+
+    eyre::bail!("Timed out waiting for nodes to reach queue index {}", expected_index)
+}
+
 /// Waits for all provided nodes to reach the target block number.
 ///
 /// # Arguments
@@ -214,3 +343,287 @@ pub async fn admin_remove_trusted_peer(provider: &NamedProvider, enode: &str) ->
         .await
         .map_err(|e| eyre::eyre!("Failed to remove trusted peer {}: {}", enode, e))
 }
+
+/// Create a wallet wrapped in `Arc<Mutex>` for the given chain id.
+pub fn create_wallet(chain_id: u64) -> Arc<Mutex<Wallet>> {
+    Arc::new(Mutex::new(Wallet::default().with_chain_id(chain_id)))
+}
+
+/// Generate a transfer transaction with the given wallet.
+pub async fn generate_tx(wallet: Arc<Mutex<Wallet>>) -> Bytes {
+    let mut wallet = wallet.lock().await;
+    let tx_fut = TransactionTestContext::transfer_tx_nonce_bytes(
+        wallet.chain_id,
+        wallet.inner.clone(),
+        wallet.inner_nonce,
+    );
+    wallet.inner_nonce += 1;
+    tx_fut.await
+}
+
+/// Send a raw transaction to multiple nodes, optionally waiting for confirmation.
+pub async fn send_tx(
+    wallet: Arc<Mutex<Wallet>>,
+    nodes: &[&NamedProvider],
+    wait_for_confirmation: bool,
+) -> Result<()> {
+    let tx = generate_tx(wallet).await;
+
+    tracing::debug!("Sending transaction: {:?}", tx);
+    let tx: Vec<u8> = tx.into();
+    let mut pending_txs = Vec::new();
+
+    for node in nodes {
+        let builder = node.send_raw_transaction(&tx).await;
+        match builder {
+            std::result::Result::Ok(builder) => {
+                let pending_tx = builder.register().await?;
+                tracing::debug!(
+                    "Sent transaction {:?} to node: {:?}",
+                    pending_tx.tx_hash(),
+                    node.name
+                );
+                pending_txs.push(pending_tx);
+            }
+            Err(e) => {
+                if e.to_string().contains("already known") {
+                    continue;
+                }
+                eyre::bail!("Failed to send transaction to node {}: {}", node.name, e);
+            }
+        };
+    }
+
+    if wait_for_confirmation {
+        for pending_tx in pending_txs {
+            let r = pending_tx.await?;
+            tracing::debug!("Transaction confirmed: {:?}", r.encode_hex());
+        }
+    }
+
+    Ok(())
+}
+
+/// Simple L2 transaction sender that runs continuously until `stop` is set to true.
+pub async fn run_continuous_tx_sender(stop: Arc<AtomicBool>, nodes: &[&NamedProvider]) -> u64 {
+    let mut interval = interval(Duration::from_millis(50));
+    let mut tx_count = 0u64;
+
+    let wallet = create_wallet(nodes[0].get_chain_id().await.expect("Failed to get chain id"));
+
+    while !stop.load(Ordering::Relaxed) {
+        interval.tick().await;
+
+        if let Err(e) = send_tx(wallet.clone(), nodes, false).await {
+            tracing::error!("Error sending transaction: {}", e);
+        } else {
+            tx_count += 1;
+        }
+    }
+
+    tx_count
+}
+
+/// Stop the continuous L2 transaction sender and report how many transactions were sent.
+pub async fn stop_continuous_tx_sender(
+    stop: Arc<AtomicBool>,
+    tx_sender: tokio::task::JoinHandle<u64>,
+) -> Result<()> {
+    stop.store(true, Ordering::Relaxed);
+    let tx_count = tx_sender.await?;
+    tracing::info!(
+        "🔄 Stopped continuous transaction sender after sending {} transactions",
+        tx_count
+    );
+
+    Ok(())
+}
+
+/// Simple L1 message sender that runs continuously until `stop` is set to true.
+pub async fn run_continuous_l1_message_sender(stop: Arc<AtomicBool>) -> () {
+    while !stop.load(Ordering::Relaxed) {
+        if let Err(e) = send_l1_scroll_messenger_message(
+            address!("0000000000000000000000000000000000000001"),
+            U256::from(1),
+            Bytes::new(),
+            200000,
+            true,
+        )
+        .await
+        {
+            tracing::error!("Error sending L1 Scroll Messenger message: {}", e);
+        }
+
+        if let Err(e) = send_l1_enforced_tx_gateway_transaction(
+            address!("0000000000000000000000000000000000000001"),
+            U256::from(1),
+            200000,
+            Bytes::new(),
+            true,
+        )
+        .await
+        {
+            tracing::error!("Error sending L1 Enforced Tx Gateway transaction: {}", e);
+        }
+    }
+}
+
+/// Stop the continuous L1 message sender and wait for the task to finish.
+pub async fn stop_continuous_l1_message_sender(
+    stop: Arc<AtomicBool>,
+    l1_message_sender: tokio::task::JoinHandle<()>,
+) -> Result<()> {
+    stop.store(true, Ordering::Relaxed);
+    l1_message_sender.await?;
+    tracing::info!("🔄 Stopped continuous L1 message sender");
+
+    Ok(())
+}
+
+/// Send a message via the L1 Scroll Messenger contract.
+///
+/// # Arguments
+/// * `to` - The target address on L2
+/// * `value` - The amount of wei to send with the message on L2
+/// * `message` - The calldata to execute on L2
+/// * `gas_limit` - The gas limit for executing the message on L2
+pub async fn send_l1_scroll_messenger_message(
+    to: Address,
+    value: U256,
+    message: Bytes,
+    gas_limit: u64,
+    wait_for_confirmation: bool,
+) -> Result<()> {
+    // Parse the private key and create a signer
+    let signer: PrivateKeySigner = L1_DEPLOYER_PRIVATE_KEY
+        .parse()
+        .map_err(|e| eyre::eyre!("Failed to parse L1 deployer private key: {}", e))?;
+
+    // Create a provider with the wallet
+    let provider = ProviderBuilder::new()
+        .wallet(signer)
+        .connect(L1_RPC_URL)
+        .await
+        .map_err(|e| eyre::eyre!("Failed to connect to L1 RPC: {}", e))?;
+
+    // Encode the function call
+    let call = sendMessageCall { to, value, message, gasLimit: U256::from(gas_limit) };
+    let calldata = call.abi_encode();
+
+    // Build the transaction request
+    let tx = TransactionRequest::default()
+        .with_to(L1_SCROLL_MESSENGER_PROXY_ADDR)
+        .with_input(calldata)
+        .with_value(U256::from(10_000_000_000_000_000u64)) // 0.01 ether
+        .with_gas_limit(200000)
+        .with_gas_price(100_000_000); // 0.1 gwei
+
+    let pending_tx = provider.send_transaction(tx).await?;
+    tracing::debug!(
+        "📨 Sent L1 Scroll Messenger message to {:?}, tx hash: {:?}",
+        to,
+        pending_tx.tx_hash()
+    );
+
+    if wait_for_confirmation {
+        let r = pending_tx.watch().await?;
+        tracing::debug!("📨 L1 Scroll Messenger message confirmed: {:?}", r.encode_hex());
+    }
+
+    Ok(())
+}
+
+/// Send a transaction via the L1 Enforced Transaction Gateway contract.
+///
+/// # Arguments
+/// * `target` - The target address on L2 to call
+/// * `value` - The amount of wei to send with the transaction on L2
+/// * `gas_limit` - The gas limit for executing the transaction on L2
+/// * `data` - The calldata to execute on L2
+pub async fn send_l1_enforced_tx_gateway_transaction(
+    target: Address,
+    value: U256,
+    gas_limit: u64,
+    data: Bytes,
+    wait_for_confirmation: bool,
+) -> Result<()> {
+    // Parse the private key and create a signer
+    let signer: PrivateKeySigner = L1_DEPLOYER_PRIVATE_KEY
+        .parse()
+        .map_err(|e| eyre::eyre!("Failed to parse L1 deployer private key: {}", e))?;
+
+    // Create a provider with the wallet
+    let provider = ProviderBuilder::new()
+        .wallet(signer)
+        .connect(L1_RPC_URL)
+        .await
+        .map_err(|e| eyre::eyre!("Failed to connect to L1 RPC: {}", e))?;
+
+    // Encode the function call
+    let call = sendTransactionCall { target, value, gasLimit: U256::from(gas_limit), data };
+    let calldata = call.abi_encode();
+
+    // Build the transaction request
+    let tx = TransactionRequest::default()
+        .with_to(L1_ENFORCED_TX_GATEWAY_PROXY_ADDR)
+        .with_input(calldata)
+        .with_value(U256::from(10_000_000_000_000_000u64)) // 0.01 ether
+        .with_gas_limit(200000)
+        .with_gas_price(100_000_000); // 0.1 gwei
+
+    let pending_tx = provider.send_transaction(tx).await?;
+    tracing::debug!(
+        "🚀 Sent L1 Enforced Tx Gateway transaction to {:?}, tx hash: {:?}",
+        target,
+        pending_tx.tx_hash()
+    );
+
+    if wait_for_confirmation {
+        let r = pending_tx.watch().await?;
+        tracing::debug!("🚀 L1 Enforced Tx Gateway transaction confirmed: {:?}", r.encode_hex());
+    }
+
+    Ok(())
+}
+
+/// Get the L1 message index at the finalized block.
+///
+/// This function queries the `nextCrossDomainMessageIndex` from the L1 Message Queue V2 contract
+/// at the **finalized** block head. The contract returns the next message index, therefore we
+/// subtract 1.
+///
+/// # Returns
+/// * `Ok(u64)` - The index of the last L1 message that has been queued
+/// * `Err` - If the call fails or the returned value cannot be converted to u64
+pub async fn get_l1_message_index_at_finalized() -> Result<u64> {
+    // Create a provider (no signer needed for read-only calls)
+    let provider = ProviderBuilder::new()
+        .connect(L1_RPC_URL)
+        .await
+        .map_err(|e| eyre::eyre!("Failed to connect to L1 RPC: {}", e))?;
+
+    // Encode the function call
+    let call = nextCrossDomainMessageIndexCall {};
+    let calldata = call.abi_encode();
+
+    // Build the call request
+    let tx =
+        TransactionRequest::default().with_to(L1_MESSAGE_QUEUE_V2_PROXY_ADDR).with_input(calldata);
+
+    // Execute the call at the finalized block
+    let result =
+        provider.call(tx).block(BlockId::Number(BlockNumberOrTag::Finalized)).await.map_err(
+            |e| eyre::eyre!("Failed to call nextCrossDomainMessageIndex at finalized block: {}", e),
+        )?;
+
+    // Decode the result - returns U256 directly
+    let count_u256 = nextCrossDomainMessageIndexCall::abi_decode_returns(&result)
+        .map_err(|e| eyre::eyre!("Failed to decode nextCrossDomainMessageIndex result: {}", e))?;
+
+    // Convert U256 to u64
+    let count: u64 =
+        count_u256.try_into().map_err(|_| eyre::eyre!("Message count exceeds u64::MAX"))?;
+
+    if count == 0 {
+        return Ok(0);
+    }
+
+    Ok(count - 1) // Subtract 1 to get the last queued message index
+}
diff --git a/tests/tests/heterogeneous_client_sync_and_sequencer_handoff.rs b/tests/tests/heterogeneous_client_sync_and_sequencer_handoff.rs
index 167cf047..80c705ee 100644
--- a/tests/tests/heterogeneous_client_sync_and_sequencer_handoff.rs
+++ b/tests/tests/heterogeneous_client_sync_and_sequencer_handoff.rs
@@ -1,4 +1,5 @@
 use eyre::Result;
+use std::sync::{atomic::AtomicBool, Arc};
 use tests::*;
 
 /// Tests cross-client block propagation and synchronization between heterogeneous nodes.
@@ -47,7 +48,8 @@ async fn docker_test_heterogeneous_client_sync_and_sequencer_handoff() -> Result
     reth_tracing::init_test_tracing();
     tracing::info!("=== STARTING docker_test_heterogeneous_client_sync_and_sequencer_handoff ===");
 
-    let env = DockerComposeEnv::new("multi-client-propagation").await?;
+    let env = DockerComposeEnv::new("docker_test_heterogeneous_client_sync_and_sequencer_handoff")
+        .await?;
 
     let rn_sequencer = env.get_rn_sequencer_provider().await?;
     let rn_follower = env.get_rn_follower_provider().await?;
@@ -66,6 +68,21 @@ async fn docker_test_heterogeneous_client_sync_and_sequencer_handoff() -> Result
     // Enable block production on l2geth sequencer
     utils::miner_start(&l2geth_sequencer).await?;
 
+    // Start single continuous transaction sender for entire test
+    let stop = Arc::new(AtomicBool::new(false));
+    let stop_clone = stop.clone();
+    let rn_follower_clone = env.get_rn_follower_provider().await.unwrap();
+    let l2geth_follower_clone = env.get_l2geth_follower_provider().await.unwrap();
+    let tx_sender = tokio::spawn(async move {
+        utils::run_continuous_tx_sender(stop_clone, &[&rn_follower_clone, &l2geth_follower_clone])
+            .await
+    });
+    let stop_clone = stop.clone();
+    let l1_message_sender =
+        tokio::spawn(async move { utils::run_continuous_l1_message_sender(stop_clone).await });
+
+    tracing::info!("🔄 Started continuous L1 message and L2 transaction sender for entire test");
+
     // Wait for at least 10 blocks to be produced
     let target_block = 10;
     utils::wait_for_block(&[&l2geth_sequencer], target_block).await?;
@@ -107,6 +124,18 @@ async fn docker_test_heterogeneous_client_sync_and_sequencer_handoff() -> Result
     tracing::info!("Enabling sequencing on RN sequencer");
     utils::enable_automatic_sequencing(&rn_sequencer).await?;
     let target_block = latest_block + 10;
+
+    // TODO: restart RN follower here
+    // 1. disconnect from all nodes
+    // 2. get latest block and other state info
+    // 3. stop the node
+    // 4. start the node
+    // 5. check that state is the same as before
+    // 6. reconnect to nodes
+    env.stop_container(&rn_follower).await?;
+    env.start_container(&rn_follower).await?;
+    utils::admin_add_peer(&rn_follower, &env.l2geth_sequencer_enode()?).await?;
+
     utils::wait_for_block(&nodes, target_block).await?;
     utils::disable_automatic_sequencing(&rn_sequencer).await?;
@@ -141,6 +170,14 @@ async fn docker_test_heterogeneous_client_sync_and_sequencer_handoff() -> Result
         target_block + 1,
         l2geth_follower.get_block_number().await?
     );
+    // TODO: restart RN sequencer here
+    // 1. disconnect from all nodes
+    // 2. get latest block and other state info
+    // 3. stop the node
+    // 4. start the node
+    // 5. check that state is the same as before
+    // 6. reconnect to nodes
+    // 7. start sequencing again
 
     // Reconnect l2geth follower to l2geth sequencer and let them sync
     // topology:
@@ -169,5 +206,13 @@ async fn docker_test_heterogeneous_client_sync_and_sequencer_handoff() -> Result
     utils::wait_for_block(&nodes, target_block).await?;
     assert_blocks_match(&nodes, target_block).await?;
 
+    utils::stop_continuous_tx_sender(stop.clone(), tx_sender).await?;
+    utils::stop_continuous_l1_message_sender(stop, l1_message_sender).await?;
+
+    // Make sure l1 message queue is processed on all l2geth nodes
+    let q = utils::get_l1_message_index_at_finalized().await?;
+    utils::wait_for_l1_message_queue_index_reached(&[&l2geth_sequencer, &l2geth_follower], q)
+        .await?;
+
     Ok(())
 }
diff --git a/tests/tests/migrate_sequencer.rs b/tests/tests/migrate_sequencer.rs
index ae679d84..c4ff8c03 100644
--- a/tests/tests/migrate_sequencer.rs
+++ b/tests/tests/migrate_sequencer.rs
@@ -1,4 +1,6 @@
+use alloy_primitives::{address, Bytes, U256};
 use eyre::Result;
+use std::sync::{atomic::AtomicBool, Arc};
 use tests::*;
 
 #[tokio::test]
@@ -28,6 +30,21 @@ async fn docker_test_migrate_sequencer() -> Result<()> {
     utils::admin_add_peer(&rn_follower, &env.rn_sequencer_enode()?).await?;
     utils::admin_add_peer(&rn_sequencer, &env.l2geth_sequencer_enode()?).await?;
 
+    // Start single continuous transaction sender for entire test
+    let stop = Arc::new(AtomicBool::new(false));
+    let stop_clone = stop.clone();
+    let rn_follower_clone = env.get_rn_follower_provider().await.unwrap();
+    let l2geth_follower_clone = env.get_l2geth_follower_provider().await.unwrap();
+    let tx_sender = tokio::spawn(async move {
+        utils::run_continuous_tx_sender(stop_clone, &[&rn_follower_clone, &l2geth_follower_clone])
+            .await
+    });
+    let stop_clone = stop.clone();
+    let l1_message_sender =
+        tokio::spawn(async move { utils::run_continuous_l1_message_sender(stop_clone).await });
+
+    tracing::info!("🔄 Started continuous L1 message and L2 transaction sender for entire test");
+
     // Enable block production on l2geth sequencer
     utils::miner_start(&l2geth_sequencer).await?;
@@ -47,5 +64,13 @@ async fn docker_test_migrate_sequencer() -> Result<()> {
     utils::wait_for_block(&nodes, target_block).await?;
     utils::assert_blocks_match(&nodes, target_block).await?;
 
+    utils::stop_continuous_tx_sender(stop.clone(), tx_sender).await?;
+    utils::stop_continuous_l1_message_sender(stop, l1_message_sender).await?;
+
+    // Make sure l1 message queue is processed on all l2geth nodes
+    let q = utils::get_l1_message_index_at_finalized().await?;
+    utils::wait_for_l1_message_queue_index_reached(&[&l2geth_sequencer, &l2geth_follower], q)
+        .await?;
+
     Ok(())
 }