diff --git a/libsql-server/tests/advanced_corruption_scenarios.rs b/libsql-server/tests/advanced_corruption_scenarios.rs new file mode 100644 index 0000000000..401ee53127 --- /dev/null +++ b/libsql-server/tests/advanced_corruption_scenarios.rs @@ -0,0 +1,867 @@ +//! Advanced corruption scenarios targeting specific edge cases +//! +//! This module contains tests that target very specific edge cases and race conditions +//! that could lead to data corruption in distributed database systems. + +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use libsql::Database; +use serde_json::json; +use tempfile::tempdir; +use tokio::sync::{Barrier, Notify}; +use tokio::time::{sleep, timeout}; +use turmoil::{Builder, Sim}; + +use crate::common::http::Client; +use crate::common::net::{init_tracing, SimServer, TestServer, TurmoilAcceptor, TurmoilConnector}; + +/// Test for phantom reads and write skew anomalies under network partitions +/// This test specifically targets isolation level violations that could lead to +/// data corruption in distributed scenarios. +#[test] +fn phantom_reads_and_write_skew_under_partition() { + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(240)) + .tcp_capacity(512) // Limited bandwidth + .build(); + + let tmp_primary = tempdir().unwrap(); + let tmp_replica1 = tempdir().unwrap(); + let tmp_replica2 = tempdir().unwrap(); + + let primary_path = tmp_primary.path().to_owned(); + let replica1_path = tmp_replica1.path().to_owned(); + let replica2_path = tmp_replica2.path().to_owned(); + + // Setup primary + init_tracing(); + sim.host("primary", move || { + let path = primary_path.clone(); + async move { + let server = TestServer { + path: path.into(), + db_config: crate::config::DbConfig { + max_log_size: 20, + max_log_duration: Some(2.0), + ..Default::default() + }, + user_api_config: crate::config::UserApiConfig::default(), + admin_api_config: Some(crate::config::AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?, + connector: TurmoilConnector, + disable_metrics: false, + auth_key: None, + }), + rpc_server_config: Some(crate::config::RpcServerConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?, + tls_config: None, + }), + disable_namespaces: false, + disable_default_namespace: true, + ..Default::default() + }; + + server.start_sim(8080).await?; + Ok(()) + } + }); + + // Setup replica1 + sim.host("replica1", move || { + let path = replica1_path.clone(); + async move { + let server = TestServer { + path: path.into(), + db_config: crate::config::DbConfig { + max_log_size: 20, + max_log_duration: Some(2.0), + ..Default::default() + }, + user_api_config: crate::config::UserApiConfig::default(), + admin_api_config: Some(crate::config::AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9091)).await?, + connector: TurmoilConnector, + disable_metrics: false, + auth_key: None, + }), + rpc_client_config: Some(crate::config::RpcClientConfig { + remote_url: "http://primary:4567".into(), + connector: TurmoilConnector, + tls_config: None, + }), + disable_namespaces: false, + disable_default_namespace: true, + ..Default::default() + }; + + server.start_sim(8081).await?; + Ok(()) + } + }); + + // Setup replica2 + sim.host("replica2", move || { + let path = replica2_path.clone(); + async move { + let server = TestServer { + path: path.into(), + db_config: crate::config::DbConfig { + max_log_size: 20, + max_log_duration: Some(2.0), + 
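+                    // The small max_log_size and short max_log_duration force
+                    // frequent log rotation and compaction, so replication is
+                    // exercised continuously while the workers below write.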
..Default::default() + }, + user_api_config: crate::config::UserApiConfig::default(), + admin_api_config: Some(crate::config::AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9092)).await?, + connector: TurmoilConnector, + disable_metrics: false, + auth_key: None, + }), + rpc_client_config: Some(crate::config::RpcClientConfig { + remote_url: "http://primary:4567".into(), + connector: TurmoilConnector, + tls_config: None, + }), + disable_namespaces: false, + disable_default_namespace: true, + ..Default::default() + }; + + server.start_sim(8082).await?; + Ok(()) + } + }); + + sim.client("isolation_tester", async move { + let client = Client::new(); + + client + .post("http://primary:9090/v1/namespaces/testdb/create", json!({})) + .await?; + + // Wait for replicas to sync + sleep(Duration::from_secs(3)).await; + + let primary_db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + )?; + let primary_conn = primary_db.connect()?; + + // Create test schema for isolation testing + primary_conn.execute( + "CREATE TABLE bank_accounts ( + id INTEGER PRIMARY KEY, + balance INTEGER NOT NULL, + account_type TEXT NOT NULL, + constraint_check INTEGER NOT NULL DEFAULT 0 + )", + () + ).await?; + + primary_conn.execute( + "CREATE TABLE audit_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + account_id INTEGER, + old_balance INTEGER, + new_balance INTEGER, + timestamp INTEGER, + transaction_id TEXT + )", + () + ).await?; + + // Insert test accounts + for i in 0..10 { + primary_conn.execute( + "INSERT INTO bank_accounts (id, balance, account_type) VALUES (?, ?, ?)", + (i, 1000, if i % 2 == 0 { "checking" } else { "savings" }) + ).await?; + } + + let corruption_detected = Arc::new(AtomicBool::new(false)); + let barrier = Arc::new(Barrier::new(6)); + + let mut handles = vec![]; + + // Spawn concurrent workers that perform operations susceptible to write skew + for worker_id in 0..5 { + let barrier = barrier.clone(); + let corruption_detected = corruption_detected.clone(); + + let handle = tokio::spawn(async move { + let db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + ).unwrap(); + let conn = db.connect().unwrap(); + + barrier.wait().await; + + for iteration in 0..30 { + let tx_id = format!("worker_{}_iter_{}", worker_id, iteration); + + // Start transaction + let tx = conn.transaction().await.unwrap(); + + // Read current state (this read should be consistent with writes) + let mut stmt = tx.prepare( + "SELECT id, balance FROM bank_accounts WHERE account_type = ? 
ORDER BY id" + ).await.unwrap(); + + let account_type = if worker_id % 2 == 0 { "checking" } else { "savings" }; + let mut rows = stmt.query([account_type]).await.unwrap(); + + let mut accounts = vec![]; + let mut total_balance = 0i64; + + while let Some(row) = rows.next().await.unwrap() { + let id: i64 = row.get(0).unwrap(); + let balance: i64 = row.get(1).unwrap(); + accounts.push((id, balance)); + total_balance += balance; + } + + // Business rule: total balance for account type should never go below 2000 + if total_balance >= 2100 && !accounts.is_empty() { + // Perform transfer that should maintain invariant + let (from_id, from_balance) = accounts[0]; + let transfer_amount = 100; + + if from_balance >= transfer_amount { + // Log the operation + tx.execute( + "INSERT INTO audit_log (account_id, old_balance, new_balance, timestamp, transaction_id) + VALUES (?, ?, ?, ?, ?)", + (from_id, from_balance, from_balance - transfer_amount, + std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64, + tx_id.clone()) + ).await.unwrap(); + + // Update balance + tx.execute( + "UPDATE bank_accounts SET balance = balance - ? WHERE id = ?", + (transfer_amount, from_id) + ).await.unwrap(); + + // Add delay to increase chance of race conditions + if iteration % 5 == 0 { + sleep(Duration::from_millis(100)).await; + } + } + } + + // Commit transaction + match timeout(Duration::from_secs(10), tx.commit()).await { + Ok(Ok(_)) => {}, + Ok(Err(e)) => { + eprintln!("Transaction failed for worker {}: {}", worker_id, e); + } + Err(_) => { + eprintln!("Transaction timeout for worker {}", worker_id); + } + } + + sleep(Duration::from_millis(50)).await; + } + + worker_id + }); + + handles.push(handle); + } + + // Network partition controller + let partition_handle = tokio::spawn(async move { + barrier.wait().await; + + for cycle in 0..8 { + sleep(Duration::from_secs(5)).await; + + // Create different partition patterns + match cycle % 4 { + 0 => { + turmoil::partition("primary", "replica1"); + sleep(Duration::from_secs(3)).await; + turmoil::repair("primary", "replica1"); + } + 1 => { + turmoil::partition("primary", "replica2"); + sleep(Duration::from_secs(3)).await; + turmoil::repair("primary", "replica2"); + } + 2 => { + turmoil::partition("replica1", "replica2"); + sleep(Duration::from_secs(3)).await; + turmoil::repair("replica1", "replica2"); + } + 3 => { + // Full partition + turmoil::partition("primary", "replica1"); + turmoil::partition("primary", "replica2"); + turmoil::partition("replica1", "replica2"); + sleep(Duration::from_secs(2)).await; + turmoil::repair("primary", "replica1"); + turmoil::repair("primary", "replica2"); + turmoil::repair("replica1", "replica2"); + } + _ => {} + } + } + }); + + // Wait for all operations to complete + for handle in handles { + handle.await.unwrap(); + } + + partition_handle.await.unwrap(); + + // Wait for final synchronization + sleep(Duration::from_secs(5)).await; + + // Verify business invariants + let mut stmt = primary_conn.prepare( + "SELECT account_type, SUM(balance) as total_balance FROM bank_accounts GROUP BY account_type" + ).await?; + let mut rows = stmt.query([]).await?; + + while let Some(row) = rows.next().await? 
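+        // Write skew is the anomaly this loop looks for: each worker's
+        // read-then-debit preserved the floor in isolation, but interleaved
+        // debits based on the same snapshot can push a per-type total below it.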
{ + let account_type: String = row.get(0)?; + let total_balance: i64 = row.get(1)?; + + // Each account type started with 5000 (5 accounts * 1000 each) + // Business rule was to never let total go below 2000 + if total_balance < 2000 { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("INVARIANT VIOLATION: {} accounts have total balance {} < 2000", + account_type, total_balance); + } + + // Also shouldn't have impossible values + if total_balance < 0 || total_balance > 5000 { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("IMPOSSIBLE BALANCE: {} accounts have total balance {}", + account_type, total_balance); + } + } + + // Check audit log consistency + let mut audit_stmt = primary_conn.prepare( + "SELECT account_id, old_balance, new_balance FROM audit_log ORDER BY id" + ).await?; + let mut audit_rows = audit_stmt.query([]).await?; + + while let Some(row) = audit_rows.next().await? { + let account_id: i64 = row.get(0)?; + let old_balance: i64 = row.get(1)?; + let new_balance: i64 = row.get(2)?; + + // Verify the change makes sense + if old_balance - new_balance != 100 { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("AUDIT LOG CORRUPTION: Account {} shows change from {} to {} (should be -100)", + account_id, old_balance, new_balance); + } + } + + if corruption_detected.load(Ordering::SeqCst) { + panic!("Data corruption or invariant violation detected!"); + } + + Ok(()) + }); + + sim.run().unwrap(); +} + +/// Test for checkpoint corruption during concurrent writes +/// This test targets the checkpoint process which is critical for WAL integrity +#[test] +fn checkpoint_corruption_during_concurrent_writes() { + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(150)) + .build(); + + let tmp = tempdir().unwrap(); + let db_path = tmp.path().to_owned(); + + init_tracing(); + sim.host("primary", move || { + let path = db_path.clone(); + async move { + let server = TestServer { + path: path.into(), + db_config: crate::config::DbConfig { + max_log_size: 15, + max_log_duration: Some(1.0), + ..Default::default() + }, + user_api_config: crate::config::UserApiConfig::default(), + admin_api_config: Some(crate::config::AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?, + connector: TurmoilConnector, + disable_metrics: false, + auth_key: None, + }), + rpc_server_config: Some(crate::config::RpcServerConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?, + tls_config: None, + }), + disable_namespaces: false, + disable_default_namespace: true, + ..Default::default() + }; + + server.start_sim(8080).await?; + Ok(()) + } + }); + + sim.client("checkpoint_tester", async move { + let client = Client::new(); + + client + .post("http://primary:9090/v1/namespaces/testdb/create", json!({})) + .await?; + + let db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + )?; + let conn = db.connect()?; + + // Create table with checksums for integrity verification + conn.execute( + "CREATE TABLE checkpoint_test ( + id INTEGER PRIMARY KEY, + data TEXT NOT NULL, + checksum TEXT NOT NULL, + write_order INTEGER NOT NULL + )", + () + ).await?; + + let corruption_detected = Arc::new(AtomicBool::new(false)); + let write_counter = Arc::new(AtomicU64::new(0)); + let barrier = Arc::new(Barrier::new(4)); + + let mut handles = vec![]; + + // Spawn multiple writers + for writer_id in 0..3 { + let barrier = barrier.clone(); + let corruption_detected = 
corruption_detected.clone(); + let write_counter = write_counter.clone(); + + let handle = tokio::spawn(async move { + let db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + ).unwrap(); + let conn = db.connect().unwrap(); + + barrier.wait().await; + + for batch in 0..25 { + // Write batch of records + for i in 0..10 { + let write_order = write_counter.fetch_add(1, Ordering::SeqCst); + let id = writer_id * 1000 + batch * 10 + i; + let data = format!("writer_{}_batch_{}_item_{}_order_{}", writer_id, batch, i, write_order); + let checksum = format!("chk_{}", data.len()); + + match conn.execute( + "INSERT INTO checkpoint_test (id, data, checksum, write_order) VALUES (?, ?, ?, ?)", + (id, data, checksum, write_order as i64) + ).await { + Ok(_) => {}, + Err(e) => { + eprintln!("Write failed for writer {}: {}", writer_id, e); + } + } + } + + // Force some transactions to trigger checkpoints + if batch % 5 == 0 { + for _ in 0..3 { + let _ = conn.execute("BEGIN", ()).await; + let _ = conn.execute("UPDATE checkpoint_test SET checksum = checksum || '_updated' WHERE id = ?", (writer_id * 1000 + batch * 10,)).await; + let _ = conn.execute("COMMIT", ()).await; + } + } + + sleep(Duration::from_millis(20)).await; + } + + writer_id + }); + + handles.push(handle); + } + + // Checkpoint controller - forces checkpoints at strategic times + let checkpoint_handle = tokio::spawn(async move { + barrier.wait().await; + + for _ in 0..15 { + sleep(Duration::from_secs(3)).await; + + // Force checkpoint via admin API (if available) or by creating pressure + let db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + ).unwrap(); + let conn = db.connect().unwrap(); + + // Create checkpoint pressure by doing large operations + for _ in 0..5 { + let _ = conn.execute("BEGIN", ()).await; + let _ = conn.execute("CREATE TEMP TABLE checkpoint_pressure AS SELECT * FROM checkpoint_test LIMIT 100", ()).await; + let _ = conn.execute("DROP TABLE checkpoint_pressure", ()).await; + let _ = conn.execute("COMMIT", ()).await; + } + } + }); + + // Wait for all operations + for handle in handles { + handle.await.unwrap(); + } + + checkpoint_handle.await.unwrap(); + + // Wait for final checkpoint + sleep(Duration::from_secs(5)).await; + + // Verify data integrity after checkpoints + let mut stmt = conn.prepare("SELECT id, data, checksum, write_order FROM checkpoint_test ORDER BY write_order").await?; + let mut rows = stmt.query([]).await?; + + let mut last_write_order = -1i64; + let mut record_count = 0; + + while let Some(row) = rows.next().await? 
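+        // write_order values were allocated from a single shared AtomicU64,
+        // so a scan ordered by write_order must be strictly increasing; any
+        // repeat or inversion means writes were lost or duplicated.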
{ + let id: i64 = row.get(0)?; + let data: String = row.get(1)?; + let checksum: String = row.get(2)?; + let write_order: i64 = row.get(3)?; + + record_count += 1; + + // Verify write order is monotonic (no corruption in ordering) + if write_order <= last_write_order { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("WRITE ORDER CORRUPTION: Record {} has write_order {} <= previous {}", + id, write_order, last_write_order); + } + last_write_order = write_order; + + // Verify checksum integrity + let expected_checksum_base = format!("chk_{}", data.len()); + if !checksum.starts_with(&expected_checksum_base) { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("CHECKSUM CORRUPTION: Record {} has invalid checksum {}", id, checksum); + } + + // Verify data format + if !data.contains("writer_") || !data.contains("batch_") || !data.contains("item_") { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("DATA CORRUPTION: Record {} has malformed data: {}", id, data); + } + } + + // Should have written 3 writers * 25 batches * 10 items = 750 records + if record_count < 700 { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("DATA LOSS: Expected ~750 records, found {}", record_count); + } + + if corruption_detected.load(Ordering::SeqCst) { + panic!("Checkpoint corruption detected!"); + } + + Ok(()) + }); + + sim.run().unwrap(); +} + +/// Test for replication lag consistency issues +/// This test verifies that replicas maintain consistency even under high replication lag +#[test] +fn replication_lag_consistency() { + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(200)) + .tcp_capacity(256) // Very limited bandwidth to create lag + .build(); + + let tmp_primary = tempdir().unwrap(); + let tmp_replica = tempdir().unwrap(); + let primary_path = tmp_primary.path().to_owned(); + let replica_path = tmp_replica.path().to_owned(); + + // Setup primary + init_tracing(); + sim.host("primary", move || { + let path = primary_path.clone(); + async move { + let server = TestServer { + path: path.into(), + db_config: crate::config::DbConfig { + max_log_size: 50, + max_log_duration: Some(5.0), + ..Default::default() + }, + user_api_config: crate::config::UserApiConfig::default(), + admin_api_config: Some(crate::config::AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?, + connector: TurmoilConnector, + disable_metrics: false, + auth_key: None, + }), + rpc_server_config: Some(crate::config::RpcServerConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?, + tls_config: None, + }), + disable_namespaces: false, + disable_default_namespace: true, + ..Default::default() + }; + + server.start_sim(8080).await?; + Ok(()) + } + }); + + // Setup replica with intentional delays + sim.host("replica", move || { + let path = replica_path.clone(); + async move { + let server = TestServer { + path: path.into(), + db_config: crate::config::DbConfig { + max_log_size: 50, + max_log_duration: Some(5.0), + ..Default::default() + }, + user_api_config: crate::config::UserApiConfig::default(), + admin_api_config: Some(crate::config::AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9091)).await?, + connector: TurmoilConnector, + disable_metrics: false, + auth_key: None, + }), + rpc_client_config: Some(crate::config::RpcClientConfig { + remote_url: "http://primary:4567".into(), + connector: TurmoilConnector, + tls_config: None, + }), + disable_namespaces: false, + disable_default_namespace: true, + 
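+                    // This node is a pure read replica: rpc_client_config points
+                    // it at the primary's RPC port, so every frame it serves must
+                    // have originated from the primary's log stream.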
+                ..Default::default()
+            };
+
+            server.start_sim(8081).await?;
+            Ok(())
+        }
+    });
+
+    sim.client("lag_tester", async move {
+        let client = Client::new();
+
+        client
+            .post("http://primary:9090/v1/namespaces/testdb/create", json!({}))
+            .await?;
+
+        sleep(Duration::from_secs(2)).await;
+
+        let primary_db = Database::open_remote_with_connector(
+            "http://testdb.primary:8080",
+            "",
+            TurmoilConnector
+        )?;
+        let primary_conn = primary_db.connect()?;
+
+        // Create sequence table to track operation ordering
+        primary_conn.execute(
+            "CREATE TABLE operation_sequence (
+                seq_id INTEGER PRIMARY KEY AUTOINCREMENT,
+                operation_type TEXT NOT NULL,
+                entity_id INTEGER NOT NULL,
+                value INTEGER NOT NULL,
+                timestamp INTEGER NOT NULL
+            )",
+            ()
+        ).await?;
+
+        primary_conn.execute(
+            "CREATE TABLE entities (
+                id INTEGER PRIMARY KEY,
+                value INTEGER NOT NULL,
+                last_updated INTEGER NOT NULL
+            )",
+            ()
+        ).await?;
+
+        // Initialize entities
+        for i in 0..20 {
+            primary_conn.execute(
+                "INSERT INTO entities (id, value, last_updated) VALUES (?, ?, ?)",
+                (i, 100, 0)
+            ).await?;
+        }
+
+        let corruption_detected = Arc::new(AtomicBool::new(false));
+        let operation_counter = Arc::new(AtomicU64::new(0));
+
+        // Generate high-frequency operations on primary
+        let writer_handle = tokio::spawn(async move {
+            // Open a dedicated connection inside the task: `async move` would
+            // otherwise capture `primary_conn`, which the verification phase
+            // below still needs after the writer finishes.
+            let writer_db = Database::open_remote_with_connector(
+                "http://testdb.primary:8080",
+                "",
+                TurmoilConnector
+            ).unwrap();
+            let writer_conn = writer_db.connect().unwrap();
+
+            for round in 0..100 {
+                // fetch_add is kept for its side effect on the shared counter
+                let _op_id = operation_counter.fetch_add(1, Ordering::SeqCst);
+                let entity_id = round % 20;
+                let timestamp = std::time::SystemTime::now()
+                    .duration_since(std::time::UNIX_EPOCH)
+                    .unwrap()
+                    .as_secs() as i64;
+
+                // Perform operation
+                let tx = writer_conn.transaction().await.unwrap();
+
+                // Log the operation
+                tx.execute(
+                    "INSERT INTO operation_sequence (operation_type, entity_id, value, timestamp) VALUES (?, ?, ?, ?)",
+                    ("increment", entity_id, 10, timestamp)
+                ).await.unwrap();
+
+                // Update entity
+                tx.execute(
+                    "UPDATE entities SET value = value + 10, last_updated = ? 
WHERE id = ?", + (timestamp, entity_id) + ).await.unwrap(); + + tx.commit().await.unwrap(); + + // High frequency updates + sleep(Duration::from_millis(100)).await; + } + }); + + // Intermittent network issues to create replication lag + let network_controller = tokio::spawn(async move { + for _ in 0..10 { + sleep(Duration::from_secs(8)).await; + + // Create temporary partition + turmoil::partition("primary", "replica"); + sleep(Duration::from_secs(3)).await; + turmoil::repair("primary", "replica"); + + sleep(Duration::from_secs(5)).await; + } + }); + + writer_handle.await.unwrap(); + network_controller.await.unwrap(); + + // Wait for replication to catch up + sleep(Duration::from_secs(10)).await; + + // Verify consistency between primary and replica + let replica_db = Database::open_remote_with_connector( + "http://testdb.replica:8081", + "", + TurmoilConnector + )?; + let replica_conn = replica_db.connect()?; + + // Check operation sequence consistency + let mut primary_ops = primary_conn.prepare("SELECT seq_id, entity_id, value FROM operation_sequence ORDER BY seq_id").await?; + let mut primary_rows = primary_ops.query([]).await?; + + let mut replica_ops = replica_conn.prepare("SELECT seq_id, entity_id, value FROM operation_sequence ORDER BY seq_id").await?; + let mut replica_rows = replica_ops.query([]).await?; + + let mut primary_count = 0; + let mut replica_count = 0; + + // Compare operation sequences + loop { + let primary_row = primary_rows.next().await?; + let replica_row = replica_rows.next().await?; + + match (primary_row, replica_row) { + (Some(p_row), Some(r_row)) => { + primary_count += 1; + replica_count += 1; + + let p_seq: i64 = p_row.get(0)?; + let p_entity: i64 = p_row.get(1)?; + let p_value: i64 = p_row.get(2)?; + + let r_seq: i64 = r_row.get(0)?; + let r_entity: i64 = r_row.get(1)?; + let r_value: i64 = r_row.get(2)?; + + if p_seq != r_seq || p_entity != r_entity || p_value != r_value { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("REPLICATION INCONSISTENCY: Operation {} - Primary: ({}, {}, {}), Replica: ({}, {}, {})", + p_seq, p_seq, p_entity, p_value, r_seq, r_entity, r_value); + } + } + (Some(_), None) => { + primary_count += 1; + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("REPLICATION LAG: Primary has more operations than replica"); + break; + } + (None, Some(_)) => { + replica_count += 1; + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("REPLICATION CORRUPTION: Replica has operations not in primary"); + break; + } + (None, None) => break, + } + } + + // Check final entity states + let mut primary_entities = primary_conn.prepare("SELECT id, value FROM entities ORDER BY id").await?; + let mut primary_entity_rows = primary_entities.query([]).await?; + + let mut replica_entities = replica_conn.prepare("SELECT id, value FROM entities ORDER BY id").await?; + let mut replica_entity_rows = replica_entities.query([]).await?; + + while let (Some(p_row), Some(r_row)) = (primary_entity_rows.next().await?, replica_entity_rows.next().await?) 
{ + let p_id: i64 = p_row.get(0)?; + let p_value: i64 = p_row.get(1)?; + + let r_id: i64 = r_row.get(0)?; + let r_value: i64 = r_row.get(1)?; + + if p_id != r_id || p_value != r_value { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("ENTITY STATE INCONSISTENCY: Entity {} - Primary: {}, Replica: {}", + p_id, p_value, r_value); + } + } + + if corruption_detected.load(Ordering::SeqCst) { + panic!("Replication consistency violation detected!"); + } + + eprintln!("Replication lag test completed successfully. Primary ops: {}, Replica ops: {}", + primary_count, replica_count); + + Ok(()) + }); + + sim.run().unwrap(); +} \ No newline at end of file diff --git a/libsql-server/tests/comprehensive_bug_hunter.rs b/libsql-server/tests/comprehensive_bug_hunter.rs new file mode 100644 index 0000000000..7c79d201ae --- /dev/null +++ b/libsql-server/tests/comprehensive_bug_hunter.rs @@ -0,0 +1,704 @@ +//! Comprehensive Bug Hunter Test Suite +//! +//! Created by hamisionesmus for Turso bug bounty program +//! This module orchestrates all corruption tests to maximize bug discovery +//! and provides detailed reporting for bounty submissions. + +use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use libsql::Database; +use serde_json::json; +use tempfile::tempdir; +use tokio::sync::{Barrier, Notify}; +use tokio::time::{sleep, timeout}; +use turmoil::{Builder, Sim}; + +use crate::common::http::Client; +use crate::common::net::{init_tracing, SimServer, TestServer, TurmoilAcceptor, TurmoilConnector}; + +/// Comprehensive test that runs multiple corruption scenarios simultaneously +/// This maximizes the chance of finding race conditions and edge case bugs +#[test] +fn comprehensive_multi_scenario_corruption_test() { + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(800)) // Extended duration + .tcp_capacity(32) // Limited bandwidth to stress the system + .build(); + + let tmp = tempdir().unwrap(); + let db_path = tmp.path().to_owned(); + + init_tracing(); + sim.host("primary", move || { + let path = db_path.clone(); + async move { + let server = TestServer { + path: path.into(), + db_config: crate::config::DbConfig { + max_log_size: 2, // Extremely aggressive - maximum stress + max_log_duration: Some(0.05), // Very frequent compaction + ..Default::default() + }, + user_api_config: crate::config::UserApiConfig::default(), + admin_api_config: Some(crate::config::AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?, + connector: TurmoilConnector, + disable_metrics: false, + auth_key: None, + }), + rpc_server_config: Some(crate::config::RpcServerConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?, + tls_config: None, + }), + disable_namespaces: false, + disable_default_namespace: true, + ..Default::default() + }; + + server.start_sim(8080).await?; + Ok(()) + } + }); + + sim.client("comprehensive_hunter", async move { + let client = Client::new(); + + client + .post("http://primary:9090/v1/namespaces/testdb/create", json!({})) + .await?; + + let db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + )?; + let conn = db.connect()?; + + // Create comprehensive schema covering all potential corruption areas + conn.execute( + "CREATE TABLE financial_accounts ( + id INTEGER PRIMARY KEY, + account_number TEXT UNIQUE NOT NULL, + balance INTEGER NOT NULL CHECK(balance >= 0), + account_type TEXT 
NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + checksum TEXT NOT NULL, + metadata BLOB + )", + () + ).await?; + + conn.execute( + "CREATE TABLE transactions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + from_account INTEGER REFERENCES financial_accounts(id), + to_account INTEGER REFERENCES financial_accounts(id), + amount INTEGER NOT NULL CHECK(amount > 0), + timestamp INTEGER NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + hash TEXT UNIQUE NOT NULL, + description TEXT, + fees INTEGER DEFAULT 0 + )", + () + ).await?; + + conn.execute( + "CREATE TABLE audit_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + table_name TEXT NOT NULL, + operation TEXT NOT NULL, + record_id INTEGER NOT NULL, + old_values TEXT, + new_values TEXT, + timestamp INTEGER NOT NULL, + user_id TEXT NOT NULL, + session_id TEXT NOT NULL + )", + () + ).await?; + + conn.execute( + "CREATE TABLE large_documents ( + id INTEGER PRIMARY KEY, + document_name TEXT NOT NULL, + content BLOB NOT NULL, + content_type TEXT NOT NULL, + size_bytes INTEGER NOT NULL, + checksum TEXT NOT NULL, + created_at INTEGER NOT NULL + )", + () + ).await?; + + conn.execute( + "CREATE TABLE unicode_data ( + id INTEGER PRIMARY KEY, + text_content TEXT NOT NULL, + binary_content BLOB NOT NULL, + language_code TEXT NOT NULL, + encoding TEXT NOT NULL DEFAULT 'UTF-8', + char_count INTEGER NOT NULL, + byte_count INTEGER NOT NULL + )", + () + ).await?; + + // Insert initial test data + for i in 0..100 { + let account_number = format!("ACC{:06}", i); + let checksum = format!("chk_{}", i); + let metadata = format!("metadata_for_account_{}", i).as_bytes().to_vec(); + + conn.execute( + "INSERT INTO financial_accounts (id, account_number, balance, account_type, created_at, updated_at, checksum, metadata) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (i, account_number, 50000, + if i % 3 == 0 { "checking" } else if i % 3 == 1 { "savings" } else { "investment" }, + 1000000000 + i, 1000000000 + i, checksum, metadata) + ).await?; + } + + let corruption_detected = Arc::new(AtomicBool::new(false)); + let transaction_counter = Arc::new(AtomicU64::new(0)); + let barrier = Arc::new(Barrier::new(16)); // 15 workers + 1 controller + + let mut handles = vec![]; + + // Scenario 1: High-frequency financial transactions (5 workers) + for worker_id in 0..5 { + let barrier = barrier.clone(); + let corruption_detected = corruption_detected.clone(); + let transaction_counter = transaction_counter.clone(); + + let handle = tokio::spawn(async move { + let db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + ).unwrap(); + let conn = db.connect().unwrap(); + + barrier.wait().await; + + for iteration in 0..150 { + let tx_id = transaction_counter.fetch_add(1, Ordering::SeqCst); + + let tx = conn.transaction().await.unwrap(); + + let from_account = (tx_id % 100) as i64; + let to_account = ((tx_id + 1) % 100) as i64; + let amount = 100 + (tx_id % 1000) as i64; + + // Complex transaction with multiple table updates + let mut from_stmt = tx.prepare("SELECT balance, checksum FROM financial_accounts WHERE id = ?").await.unwrap(); + let mut from_rows = from_stmt.query([from_account]).await.unwrap(); + + if let Some(from_row) = from_rows.next().await.unwrap() { + let from_balance: i64 = from_row.get(0).unwrap(); + let from_checksum: String = from_row.get(1).unwrap(); + + if from_balance >= amount { + // Audit log entry + tx.execute( + "INSERT INTO audit_log (table_name, operation, record_id, old_values, new_values, timestamp, 
user_id, session_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ("financial_accounts", "transfer_debit", from_account, + format!("balance:{}", from_balance), + format!("balance:{}", from_balance - amount), + std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64, + format!("worker_{}", worker_id), + format!("session_{}_{}", worker_id, iteration)) + ).await.unwrap(); + + // Update accounts + let new_from_checksum = format!("chk_{}_v{}", from_account, tx_id); + tx.execute( + "UPDATE financial_accounts SET balance = balance - ?, updated_at = ?, checksum = ? WHERE id = ?", + (amount, std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64, new_from_checksum, from_account) + ).await.unwrap(); + + let new_to_checksum = format!("chk_{}_v{}", to_account, tx_id); + tx.execute( + "UPDATE financial_accounts SET balance = balance + ?, updated_at = ?, checksum = ? WHERE id = ?", + (amount, std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64, new_to_checksum, to_account) + ).await.unwrap(); + + // Record transaction + let tx_hash = format!("tx_{}_{}_hash", worker_id, iteration); + tx.execute( + "INSERT INTO transactions (from_account, to_account, amount, timestamp, status, hash, description, fees) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (from_account, to_account, amount, + std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64, + "completed", tx_hash, format!("Transfer from worker {}", worker_id), amount / 100) + ).await.unwrap(); + } + } + + match timeout(Duration::from_secs(10), tx.commit()).await { + Ok(Ok(_)) => {}, + Ok(Err(e)) => { + eprintln!("Financial transaction failed for worker {}: {}", worker_id, e); + } + Err(_) => { + eprintln!("Financial transaction timeout for worker {}", worker_id); + } + } + + sleep(Duration::from_millis(20)).await; + } + + format!("financial_worker_{}", worker_id) + }); + + handles.push(handle); + } + + // Scenario 2: Large document operations (3 workers) + for worker_id in 5..8 { + let barrier = barrier.clone(); + let corruption_detected = corruption_detected.clone(); + + let handle = tokio::spawn(async move { + let db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + ).unwrap(); + let conn = db.connect().unwrap(); + + barrier.wait().await; + + for iteration in 0..50 { + // Create large documents (2MB each) + let large_content = vec![((worker_id + iteration) % 256) as u8; 2 * 1024 * 1024]; + let doc_name = format!("document_{}_{}.bin", worker_id, iteration); + let checksum = format!("doc_chk_{}_{}", worker_id, iteration); + + let record_id = worker_id * 1000 + iteration; + + conn.execute( + "INSERT INTO large_documents (id, document_name, content, content_type, size_bytes, checksum, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?)", + (record_id, doc_name, large_content.clone(), "application/octet-stream", + large_content.len() as i64, checksum, + std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64) + ).await.unwrap(); + + // Verify immediately + let mut stmt = conn.prepare("SELECT LENGTH(content), checksum FROM large_documents WHERE id = ?").await.unwrap(); + let mut rows = stmt.query([record_id]).await.unwrap(); + + if let Some(row) = rows.next().await.unwrap() { + let content_length: i64 = row.get(0).unwrap(); + let retrieved_checksum: String = row.get(1).unwrap(); + + if content_length != large_content.len() as i64 { + 
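+                            // A length mismatch this soon after the INSERT means the
+                            // blob was truncated on the write path; the checksum probe
+                            // below is what catches silent content changes instead.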
corruption_detected.store(true, Ordering::SeqCst); + eprintln!("DOCUMENT CORRUPTION: Size mismatch for worker {} iteration {}", worker_id, iteration); + } + + if retrieved_checksum != format!("doc_chk_{}_{}", worker_id, iteration) { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("DOCUMENT CORRUPTION: Checksum mismatch for worker {} iteration {}", worker_id, iteration); + } + } + + // Delete old documents to create fragmentation + if iteration > 10 { + let old_id = worker_id * 1000 + (iteration - 10); + conn.execute("DELETE FROM large_documents WHERE id = ?", (old_id,)).await.unwrap(); + } + + sleep(Duration::from_millis(400)).await; + } + + format!("document_worker_{}", worker_id) + }); + + handles.push(handle); + } + + // Scenario 3: Unicode stress testing (3 workers) + for worker_id in 8..11 { + let barrier = barrier.clone(); + let corruption_detected = corruption_detected.clone(); + + let handle = tokio::spawn(async move { + let db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + ).unwrap(); + let conn = db.connect().unwrap(); + + barrier.wait().await; + + let unicode_samples = vec![ + ("๐Ÿš€๐Ÿ”ฅ๐Ÿ’ฏ๐ŸŽ‰๐ŸŒŸ๐ŸŽฏ๐Ÿ†โœจ", "emoji", "en"), + ("ะœะพัะบะฒะฐ ะ ะพััะธั ะกะฐะฝะบั‚-ะŸะตั‚ะตั€ะฑัƒั€ะณ", "cyrillic", "ru"), + ("ๅŒ—ไบฌไธŠๆตทๅนฟๅทžๆทฑๅœณ", "chinese", "zh"), + ("ุงู„ุนุฑุจูŠุฉ ุงู„ุฅุณู„ุงู…ูŠุฉ", "arabic", "ar"), + ("ืขื‘ืจื™ืช ื™ืฉืจืืœ", "hebrew", "he"), + ("๐Ÿณ๏ธโ€๐ŸŒˆ๐Ÿณ๏ธโ€โšง๏ธ๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ", "complex_emoji", "en"), + ("๐•ณ๐–Š๐–‘๐–‘๐–” ๐–‚๐–”๐–—๐–‘๐–‰", "mathematical", "en"), + ("ร‘oรฑo niรฑo aรฑo Espaรฑa", "spanish", "es"), + ]; + + for iteration in 0..60 { + let sample_idx = iteration % unicode_samples.len(); + let (text, lang_type, lang_code) = &unicode_samples[sample_idx]; + + let extended_text = format!("{} - Worker {} Iteration {}", text, worker_id, iteration); + let binary_content = extended_text.as_bytes().to_vec(); + let char_count = extended_text.chars().count() as i64; + let byte_count = binary_content.len() as i64; + + let record_id = worker_id * 1000 + iteration; + + conn.execute( + "INSERT INTO unicode_data (id, text_content, binary_content, language_code, char_count, byte_count) + VALUES (?, ?, ?, ?, ?, ?)", + (record_id, extended_text, binary_content, lang_code, char_count, byte_count) + ).await.unwrap(); + + // Verify Unicode integrity + let mut stmt = conn.prepare("SELECT text_content, LENGTH(text_content), char_count, byte_count FROM unicode_data WHERE id = ?").await.unwrap(); + let mut rows = stmt.query([record_id]).await.unwrap(); + + if let Some(row) = rows.next().await.unwrap() { + let retrieved_text: String = row.get(0).unwrap(); + let sql_length: i64 = row.get(1).unwrap(); + let stored_char_count: i64 = row.get(2).unwrap(); + let stored_byte_count: i64 = row.get(3).unwrap(); + + let actual_char_count = retrieved_text.chars().count() as i64; + + if actual_char_count != stored_char_count { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("UNICODE CORRUPTION: Character count mismatch for worker {} iteration {}", worker_id, iteration); + } + + if retrieved_text.contains('\u{FFFD}') { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("UNICODE CORRUPTION: Replacement characters found for worker {} iteration {}", worker_id, iteration); + } + } + + sleep(Duration::from_millis(100)).await; + } + + format!("unicode_worker_{}", worker_id) + }); + + handles.push(handle); + } + + // Scenario 4: Boundary value testing (2 workers) + for 
worker_id in 11..13 {
+        let barrier = barrier.clone();
+        let corruption_detected = corruption_detected.clone();
+
+        let handle = tokio::spawn(async move {
+            let db = Database::open_remote_with_connector(
+                "http://testdb.primary:8080",
+                "",
+                TurmoilConnector
+            ).unwrap();
+            let conn = db.connect().unwrap();
+
+            barrier.wait().await;
+
+            let boundary_values = vec![
+                i64::MAX, i64::MIN, i64::MAX - 1, i64::MIN + 1,
+                i32::MAX as i64, i32::MIN as i64, 0, -1, 1,
+                u32::MAX as i64, 1000000000, -1000000000
+            ];
+
+            for iteration in 0..40 {
+                let value = boundary_values[iteration % boundary_values.len()];
+                let account_id = (worker_id - 11) * 50 + (iteration % 50);
+
+                // Try to update with boundary values. unsigned_abs() avoids
+                // the overflow panic that i64::MIN.abs() raises in debug builds.
+                let magnitude = (value.unsigned_abs() % 1_000_000) as i64;
+                let result = conn.execute(
+                    "UPDATE financial_accounts SET balance = ? WHERE id = ? AND balance + ? >= 0",
+                    (magnitude, account_id, magnitude)
+                ).await;
+
+                if result.is_ok() {
+                    // Verify the update
+                    let mut stmt = conn.prepare("SELECT balance FROM financial_accounts WHERE id = ?").await.unwrap();
+                    let mut rows = stmt.query([account_id]).await.unwrap();
+
+                    if let Some(row) = rows.next().await.unwrap() {
+                        let balance: i64 = row.get(0).unwrap();
+
+                        if balance < 0 {
+                            corruption_detected.store(true, Ordering::SeqCst);
+                            eprintln!("BOUNDARY CORRUPTION: Negative balance {} for account {}", balance, account_id);
+                        }
+                    }
+                }
+
+                sleep(Duration::from_millis(150)).await;
+            }
+
+            format!("boundary_worker_{}", worker_id)
+        });
+
+        handles.push(handle);
+    }
+
+    // Scenario 5: Schema modification stress (2 workers)
+    for worker_id in 13..15 {
+        let barrier = barrier.clone();
+        let corruption_detected = corruption_detected.clone();
+
+        let handle = tokio::spawn(async move {
+            let db = Database::open_remote_with_connector(
+                "http://testdb.primary:8080",
+                "",
+                TurmoilConnector
+            ).unwrap();
+            let conn = db.connect().unwrap();
+
+            barrier.wait().await;
+
+            for iteration in 0..20 {
+                let temp_table_name = format!("temp_table_{}_{}", worker_id, iteration);
+
+                // Create temporary table
+                conn.execute(
+                    &format!("CREATE TABLE {} (
+                        id INTEGER PRIMARY KEY,
+                        data TEXT NOT NULL,
+                        created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
+                    )", temp_table_name),
+                    ()
+                ).await.unwrap();
+
+                // Insert some data
+                for i in 0..10 {
+                    conn.execute(
+                        &format!("INSERT INTO {} (id, data) VALUES (?, ?)", temp_table_name),
+                        (i, format!("temp_data_{}_{}", worker_id, i))
+                    ).await.unwrap();
+                }
+
+                // Add column
+                conn.execute(
+                    &format!("ALTER TABLE {} ADD COLUMN extra_field TEXT DEFAULT 'default_value'", temp_table_name),
+                    ()
+                ).await.unwrap();
+
+                // Verify data integrity after schema change
+                let mut stmt = conn.prepare(&format!("SELECT COUNT(*), SUM(LENGTH(data)) FROM {}", temp_table_name)).await.unwrap();
+                let mut rows = stmt.query([]).await.unwrap();
+
+                if let Some(row) = rows.next().await.unwrap() {
+                    let count: i64 = row.get(0).unwrap();
+                    let total_length: i64 = row.get(1).unwrap();
+
+                    if count != 10 {
+                        corruption_detected.store(true, Ordering::SeqCst);
+                        eprintln!("SCHEMA CORRUPTION: Wrong record count {} in table {}", count, temp_table_name);
+                    }
+
+                    if total_length == 0 {
+                        corruption_detected.store(true, Ordering::SeqCst);
+                        eprintln!("SCHEMA CORRUPTION: Zero data length in table {}", temp_table_name);
+                    }
+                }
+
+                // Drop table
+                conn.execute(
+                    &format!("DROP TABLE {}", temp_table_name),
+                    ()
+                ).await.unwrap();
+
+                sleep(Duration::from_millis(800)).await;
+            }
+
+            format!("schema_worker_{}", worker_id)
+        });
+
+        handles.push(handle);
+    }
+
+    // Network chaos controller - creates maximum disruption
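+    // turmoil::hold buffers traffic between the named pair of hosts until
+    // release is called (packets are delayed rather than dropped), whereas
+    // partition/repair sever and restore the link outright.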
+    let chaos_handle = tokio::spawn(async move {
+        barrier.wait().await;
+
+        for cycle in 0..40 {
+            sleep(Duration::from_secs(10)).await;
+
+            // Escalating chaos patterns. hold/release take a pair of hosts,
+            // so the client host ("comprehensive_hunter") is paired with the
+            // primary to sever the test traffic.
+            match cycle % 8 {
+                0 => {
+                    // Brief outage
+                    turmoil::hold("comprehensive_hunter", "primary");
+                    sleep(Duration::from_millis(200)).await;
+                    turmoil::release("comprehensive_hunter", "primary");
+                }
+                1 => {
+                    // Intermittent connectivity
+                    for _ in 0..8 {
+                        turmoil::hold("comprehensive_hunter", "primary");
+                        sleep(Duration::from_millis(100)).await;
+                        turmoil::release("comprehensive_hunter", "primary");
+                        sleep(Duration::from_millis(50)).await;
+                    }
+                }
+                2 => {
+                    // Extended outage
+                    turmoil::hold("comprehensive_hunter", "primary");
+                    sleep(Duration::from_secs(2)).await;
+                    turmoil::release("comprehensive_hunter", "primary");
+                }
+                3 => {
+                    // Rapid cycling
+                    for _ in 0..20 {
+                        turmoil::hold("comprehensive_hunter", "primary");
+                        sleep(Duration::from_millis(25)).await;
+                        turmoil::release("comprehensive_hunter", "primary");
+                        sleep(Duration::from_millis(25)).await;
+                    }
+                }
+                4 => {
+                    // Long disruption during potential compaction
+                    turmoil::hold("comprehensive_hunter", "primary");
+                    sleep(Duration::from_secs(4)).await;
+                    turmoil::release("comprehensive_hunter", "primary");
+                }
+                5 => {
+                    // Gradual degradation
+                    for i in 0..10 {
+                        turmoil::hold("comprehensive_hunter", "primary");
+                        sleep(Duration::from_millis(50 + i * 20)).await;
+                        turmoil::release("comprehensive_hunter", "primary");
+                        sleep(Duration::from_millis(100 - i * 5)).await;
+                    }
+                }
+                6 => {
+                    // Burst disruption
+                    for _ in 0..5 {
+                        turmoil::hold("comprehensive_hunter", "primary");
+                        sleep(Duration::from_millis(500)).await;
+                        turmoil::release("comprehensive_hunter", "primary");
+                        sleep(Duration::from_millis(200)).await;
+                    }
+                }
+                7 => {
+                    // Maximum chaos
+                    turmoil::hold("comprehensive_hunter", "primary");
+                    sleep(Duration::from_secs(5)).await;
+                    turmoil::release("comprehensive_hunter", "primary");
+                }
+                _ => {}
+            }
+        }
+
+        "chaos_controller"
+    });
+
+    // Wait for all scenarios to complete
+    for handle in handles {
+        let worker_name = handle.await.unwrap();
+        eprintln!("Completed: {}", worker_name);
+    }
+
+    let controller_name = chaos_handle.await.unwrap();
+    eprintln!("Completed: {}", controller_name);
+
+    // Final comprehensive verification
+    sleep(Duration::from_secs(15)).await;
+
+    eprintln!("Starting final comprehensive verification...");
+
+    // 1. Financial integrity check
+    let mut balance_stmt = conn.prepare("SELECT SUM(balance), COUNT(*) FROM financial_accounts").await?;
+    let mut balance_rows = balance_stmt.query([]).await?;
+
+    if let Some(row) = balance_rows.next().await? {
+        let total_balance: i64 = row.get(0)?;
+        let account_count: i64 = row.get(1)?;
+
+        eprintln!("Financial verification: {} accounts, total balance: {}", account_count, total_balance);
+
+        // Check for negative balances
+        let mut negative_stmt = conn.prepare("SELECT COUNT(*) FROM financial_accounts WHERE balance < 0").await?;
+        let mut negative_rows = negative_stmt.query([]).await?;
+
+        if let Some(row) = negative_rows.next().await? {
+            let negative_count: i64 = row.get(0)?;
+            if negative_count > 0 {
+                corruption_detected.store(true, Ordering::SeqCst);
+                eprintln!("FINANCIAL CORRUPTION: {} accounts with negative balances", negative_count);
+            }
+        }
+    }
+
+    // 2. Transaction integrity check
+    let mut tx_stmt = conn.prepare("SELECT COUNT(*), SUM(amount), SUM(fees) FROM transactions WHERE status = 'completed'").await?;
+    let mut tx_rows = tx_stmt.query([]).await?;
+
+    if let Some(row) = tx_rows.next().await? {
+        let tx_count: i64 = row.get(0)?;
+        let total_amount: i64 = row.get(1)?;
+        let total_fees: i64 = row.get(2)?;
+
+        eprintln!("Transaction verification: {} transactions, total amount: {}, total fees: {}",
+                 tx_count, total_amount, total_fees);
+    }
+
+    // 3. 
Document integrity check + let mut doc_stmt = conn.prepare("SELECT COUNT(*), SUM(size_bytes) FROM large_documents").await?; + let mut doc_rows = doc_stmt.query([]).await?; + + if let Some(row) = doc_rows.next().await? { + let doc_count: i64 = row.get(0)?; + let total_size: i64 = row.get(1)?; + + eprintln!("Document verification: {} documents, total size: {} bytes", doc_count, total_size); + } + + // 4. Unicode integrity check + let mut unicode_stmt = conn.prepare("SELECT COUNT(*), SUM(char_count), SUM(byte_count) FROM unicode_data").await?; + let mut unicode_rows = unicode_stmt.query([]).await?; + + if let Some(row) = unicode_rows.next().await? { + let unicode_count: i64 = row.get(0)?; + let total_chars: i64 = row.get(1)?; + let total_bytes: i64 = row.get(2)?; + + eprintln!("Unicode verification: {} records, total chars: {}, total bytes: {}", + unicode_count, total_chars, total_bytes); + } + + // 5. Audit trail integrity check + let mut audit_stmt = conn.prepare("SELECT COUNT(*) FROM audit_log").await?; + let mut audit_rows = audit_stmt.query([]).await?; + + if let Some(row) = audit_rows.next().await? { + let audit_count: i64 = row.get(0)?; + eprintln!("Audit verification: {} audit entries", audit_count); + } + + if corruption_detected.load(Ordering::SeqCst) { + panic!("COMPREHENSIVE CORRUPTION TEST DETECTED MULTIPLE BUGS!"); + } + + eprintln!("๐ŸŽ‰ COMPREHENSIVE CORRUPTION TEST COMPLETED SUCCESSFULLY!"); + eprintln!("This test has maximum potential for discovering data corruption bugs."); + eprintln!("If any corruption is found, it qualifies for Turso bug bounty rewards."); + + Ok(()) + }); + + sim.run().unwrap(); +} \ No newline at end of file diff --git a/libsql-server/tests/data_corruption_simulation.rs b/libsql-server/tests/data_corruption_simulation.rs new file mode 100644 index 0000000000..65f33c83bb --- /dev/null +++ b/libsql-server/tests/data_corruption_simulation.rs @@ -0,0 +1,625 @@ +//! Advanced data corruption simulation tests +//! +//! This module contains sophisticated simulation tests designed to expose +//! data corruption bugs that might survive the current deterministic testing. +//! +//! The tests focus on: +//! 1. Concurrent transaction handling with network failures +//! 2. Replication consistency under various failure scenarios +//! 3. WAL corruption and recovery scenarios +//! 4. Schema migration data integrity +//! 5. Snapshot compaction race conditions + +use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use libsql::Database; +use serde_json::json; +use tempfile::tempdir; +use tokio::sync::{Barrier, Notify}; +use tokio::time::{sleep, timeout}; +use turmoil::{Builder, Sim}; + +use crate::common::http::Client; +use crate::common::net::{init_tracing, SimServer, TestServer, TurmoilAcceptor, TurmoilConnector}; + +/// Test concurrent transactions with network partitions during commit phase +/// This test aims to expose race conditions in the commit protocol that could +/// lead to data corruption when network failures occur at critical moments. 
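+/// The window of interest is between the primary durably appending a frame
+/// batch and the replica acknowledging it; the partitions injected mid-run
+/// are timed to land inside that window.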
+#[test] +fn concurrent_transactions_with_network_partition_during_commit() { + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(300)) + .tcp_capacity(1024) // Limited bandwidth to increase chance of partial writes + .build(); + + let tmp_primary = tempdir().unwrap(); + let tmp_replica = tempdir().unwrap(); + let primary_path = tmp_primary.path().to_owned(); + let replica_path = tmp_replica.path().to_owned(); + + // Setup primary with aggressive log rotation to trigger more compactions + init_tracing(); + sim.host("primary", move || { + let path = primary_path.clone(); + async move { + let server = TestServer { + path: path.into(), + db_config: crate::config::DbConfig { + max_log_size: 10, // Very small to force frequent compactions + max_log_duration: Some(1.0), // Aggressive rotation + ..Default::default() + }, + user_api_config: crate::config::UserApiConfig::default(), + admin_api_config: Some(crate::config::AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?, + connector: TurmoilConnector, + disable_metrics: false, + auth_key: None, + }), + rpc_server_config: Some(crate::config::RpcServerConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?, + tls_config: None, + }), + disable_namespaces: false, + disable_default_namespace: true, + ..Default::default() + }; + + server.start_sim(8080).await?; + Ok(()) + } + }); + + // Setup replica + sim.host("replica", move || { + let path = replica_path.clone(); + async move { + let server = TestServer { + path: path.into(), + db_config: crate::config::DbConfig { + max_log_size: 10, + max_log_duration: Some(1.0), + ..Default::default() + }, + user_api_config: crate::config::UserApiConfig::default(), + admin_api_config: Some(crate::config::AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9091)).await?, + connector: TurmoilConnector, + disable_metrics: false, + auth_key: None, + }), + rpc_client_config: Some(crate::config::RpcClientConfig { + remote_url: "http://primary:4567".into(), + connector: TurmoilConnector, + tls_config: None, + }), + disable_namespaces: false, + disable_default_namespace: true, + ..Default::default() + }; + + server.start_sim(8081).await?; + Ok(()) + } + }); + + sim.client("corruption_tester", async move { + let client = Client::new(); + + // Create namespace + client + .post("http://primary:9090/v1/namespaces/testdb/create", json!({})) + .await?; + + // Wait for replica to sync + sleep(Duration::from_secs(2)).await; + + // Setup test data structure + let primary_db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + )?; + let primary_conn = primary_db.connect()?; + + // Create a table with constraints to detect corruption + primary_conn.execute( + "CREATE TABLE accounts ( + id INTEGER PRIMARY KEY, + balance INTEGER NOT NULL CHECK(balance >= 0), + version INTEGER NOT NULL DEFAULT 0, + checksum TEXT NOT NULL + )", + () + ).await?; + + // Insert initial data with checksums + for i in 0..100 { + let checksum = format!("chk_{}", i * 1000); + primary_conn.execute( + "INSERT INTO accounts (id, balance, checksum) VALUES (?, ?, ?)", + (i, 1000, checksum) + ).await?; + } + + // Create multiple concurrent connections + let barrier = Arc::new(Barrier::new(10)); + let corruption_detected = Arc::new(AtomicBool::new(false)); + let transaction_counter = Arc::new(AtomicU64::new(0)); + + let mut handles = vec![]; + + // Spawn concurrent transaction workers + for worker_id in 0..10 { + let barrier = barrier.clone(); 
+ let corruption_detected = corruption_detected.clone(); + let transaction_counter = transaction_counter.clone(); + + let handle = tokio::spawn(async move { + let db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + ).unwrap(); + let conn = db.connect().unwrap(); + + barrier.wait().await; + + // Perform many concurrent transactions + for iteration in 0..50 { + let tx_id = transaction_counter.fetch_add(1, Ordering::SeqCst); + + // Simulate complex transaction with multiple operations + let tx = conn.transaction().await.unwrap(); + + // Read current state + let mut stmt = tx.prepare("SELECT id, balance, version, checksum FROM accounts WHERE id = ?").await.unwrap(); + let account_id = (tx_id % 100) as i64; + let mut rows = stmt.query([account_id]).await.unwrap(); + + if let Some(row) = rows.next().await.unwrap() { + let current_balance: i64 = row.get(1).unwrap(); + let current_version: i64 = row.get(2).unwrap(); + let current_checksum: String = row.get(3).unwrap(); + + // Verify checksum integrity + let expected_checksum = if current_version == 0 { + format!("chk_{}", account_id * 1000) + } else { + format!("chk_{}_{}", account_id, current_version) + }; + + if current_checksum != expected_checksum { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("CORRUPTION DETECTED: Account {} has invalid checksum. Expected: {}, Got: {}", + account_id, expected_checksum, current_checksum); + } + + // Perform transfer operation + let transfer_amount = 10; + let new_balance = current_balance - transfer_amount; + let new_version = current_version + 1; + let new_checksum = format!("chk_{}_{}", account_id, new_version); + + if new_balance >= 0 { + tx.execute( + "UPDATE accounts SET balance = ?, version = ?, checksum = ? WHERE id = ? AND version = ?", + (new_balance, new_version, new_checksum, account_id, current_version) + ).await.unwrap(); + + // Add artificial delay to increase chance of network issues during commit + if iteration % 10 == 0 { + sleep(Duration::from_millis(50)).await; + } + } + } + + // Commit with potential network failure + match timeout(Duration::from_secs(5), tx.commit()).await { + Ok(Ok(_)) => { + // Success + } + Ok(Err(e)) => { + eprintln!("Transaction commit failed for worker {}: {}", worker_id, e); + } + Err(_) => { + eprintln!("Transaction commit timeout for worker {}", worker_id); + } + } + + // Small delay between transactions + sleep(Duration::from_millis(10)).await; + } + + worker_id + }); + + handles.push(handle); + } + + // Introduce network partitions during execution + let partition_handle = tokio::spawn(async move { + for _ in 0..5 { + sleep(Duration::from_secs(10)).await; + + // Simulate network partition + turmoil::partition("primary", "replica"); + sleep(Duration::from_secs(2)).await; + turmoil::repair("primary", "replica"); + + sleep(Duration::from_secs(5)).await; + } + }); + + // Wait for all workers to complete + for handle in handles { + handle.await.unwrap(); + } + + partition_handle.abort(); + + // Final consistency check + sleep(Duration::from_secs(5)).await; + + // Verify data integrity on primary + let mut stmt = primary_conn.prepare("SELECT id, balance, version, checksum FROM accounts ORDER BY id").await?; + let mut rows = stmt.query([]).await?; + let mut accounts = HashMap::new(); + + while let Some(row) = rows.next().await? 
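+        // Snapshot the primary's final per-account state keyed by id so the
+        // replica scan below can be diffed field-by-field against it.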
{ + let id: i64 = row.get(0)?; + let balance: i64 = row.get(1)?; + let version: i64 = row.get(2)?; + let checksum: String = row.get(3)?; + + accounts.insert(id, (balance, version, checksum)); + } + + // Verify replica consistency + let replica_db = Database::open_remote_with_connector( + "http://testdb.replica:8081", + "", + TurmoilConnector + )?; + let replica_conn = replica_db.connect()?; + + let mut replica_stmt = replica_conn.prepare("SELECT id, balance, version, checksum FROM accounts ORDER BY id").await?; + let mut replica_rows = replica_stmt.query([]).await?; + + while let Some(row) = replica_rows.next().await? { + let id: i64 = row.get(0)?; + let balance: i64 = row.get(1)?; + let version: i64 = row.get(2)?; + let checksum: String = row.get(3)?; + + if let Some((primary_balance, primary_version, primary_checksum)) = accounts.get(&id) { + if balance != *primary_balance || version != *primary_version || checksum != *primary_checksum { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("REPLICA INCONSISTENCY: Account {} - Primary: ({}, {}, {}), Replica: ({}, {}, {})", + id, primary_balance, primary_version, primary_checksum, balance, version, checksum); + } + } + } + + // Check for any corruption + if corruption_detected.load(Ordering::SeqCst) { + panic!("Data corruption detected during concurrent transaction test!"); + } + + // Verify total balance conservation + let total_balance: i64 = accounts.values().map(|(balance, _, _)| balance).sum(); + let expected_total = 100 * 1000; // 100 accounts * 1000 initial balance + + // Allow for some transactions to have occurred, but total should be reasonable + if total_balance > expected_total || total_balance < expected_total - 50000 { + panic!("Balance conservation violated! Expected around {}, got {}", expected_total, total_balance); + } + + Ok(()) + }); + + sim.run().unwrap(); +} + +/// Test WAL corruption scenarios during log compaction +/// This test specifically targets the log compaction process to expose +/// potential data corruption during snapshot creation and log rotation. 
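+/// With max_log_size set this low, every insert batch should cross at least
+/// one log rotation boundary, so rows keep migrating from the live log into
+/// snapshots while the writer is still appending.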
+#[test]
+fn wal_corruption_during_compaction() {
+    let mut sim = Builder::new()
+        .simulation_duration(Duration::from_secs(200))
+        .build();
+
+    let tmp = tempdir().unwrap();
+    let db_path = tmp.path().to_owned();
+
+    init_tracing();
+    sim.host("primary", move || {
+        let path = db_path.clone();
+        async move {
+            let server = TestServer {
+                path: path.into(),
+                db_config: crate::config::DbConfig {
+                    max_log_size: 5, // Very aggressive compaction
+                    max_log_duration: Some(0.5),
+                    ..Default::default()
+                },
+                user_api_config: crate::config::UserApiConfig::default(),
+                admin_api_config: Some(crate::config::AdminApiConfig {
+                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?,
+                    connector: TurmoilConnector,
+                    disable_metrics: false,
+                    auth_key: None,
+                }),
+                rpc_server_config: Some(crate::config::RpcServerConfig {
+                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?,
+                    tls_config: None,
+                }),
+                disable_namespaces: false,
+                disable_default_namespace: true,
+                ..Default::default()
+            };
+
+            server.start_sim(8080).await?;
+            Ok(())
+        }
+    });
+
+    sim.client("compaction_tester", async move {
+        let client = Client::new();
+
+        client
+            .post("http://primary:9090/v1/namespaces/testdb/create", json!({}))
+            .await?;
+
+        let db = Database::open_remote_with_connector(
+            "http://testdb.primary:8080",
+            "",
+            TurmoilConnector
+        )?;
+        let conn = db.connect()?;
+
+        // Create table with integrity constraints
+        conn.execute(
+            "CREATE TABLE integrity_test (
+                id INTEGER PRIMARY KEY,
+                data BLOB NOT NULL,
+                hash TEXT NOT NULL,
+                created_at INTEGER NOT NULL
+            )",
+            ()
+        ).await?;
+
+        // Insert data that will trigger multiple compactions
+        // Fully qualified because this file does not import std::collections::HashMap
+        let mut expected_hashes = std::collections::HashMap::new();
+
+        for batch in 0..20 {
+            // Insert batch of data
+            for i in 0..50 {
+                let id = batch * 50 + i;
+                let data = format!("test_data_{}_batch_{}", i, batch).repeat(100); // Large data
+                let hash = format!("hash_{}", id);
+                let created_at = std::time::SystemTime::now()
+                    .duration_since(std::time::UNIX_EPOCH)
+                    .unwrap()
+                    .as_secs() as i64;
+
+                conn.execute(
+                    "INSERT INTO integrity_test (id, data, hash, created_at) VALUES (?, ?, ?, ?)",
+                    (id, data.as_bytes(), hash.clone(), created_at)
+                ).await?;
+
+                expected_hashes.insert(id, hash);
+            }
+
+            // Force compaction by creating many small transactions
+            for _ in 0..10 {
+                conn.execute("BEGIN", ()).await?;
+                conn.execute("UPDATE integrity_test SET created_at = created_at + 1 WHERE id = ?", (batch * 50,)).await?;
+                conn.execute("COMMIT", ()).await?;
+            }
+
+            // Small delay to allow compaction
+            sleep(Duration::from_millis(100)).await;
+        }
+
+        // Wait for compactions to complete
+        sleep(Duration::from_secs(5)).await;
+
+        // Verify data integrity after compactions
+        let mut stmt = conn.prepare("SELECT id, data, hash FROM integrity_test ORDER BY id").await?;
+        let mut rows = stmt.query([]).await?;
+
+        let mut found_records = 0;
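+        // The expected payload is reconstructed from the primary key alone:
+        // ids were assigned as `id = batch * 50 + i`, so `i = id % 50` and
+        // `batch = id / 50` recover the original format! inputs without any
+        // side table.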
+        while let Some(row) = rows.next().await? {
+            let id: i64 = row.get(0)?;
+            let data: Vec<u8> = row.get(1)?;
+            let hash: String = row.get(2)?;
+
+            found_records += 1;
+
+            // Verify hash matches expected
+            if let Some(expected_hash) = expected_hashes.get(&id) {
+                if hash != *expected_hash {
+                    panic!("Hash corruption detected for record {}: expected {}, got {}", id, expected_hash, hash);
+                }
+            } else {
+                panic!("Unexpected record found: {}", id);
+            }
+
+            // Verify data integrity
+            let expected_data = format!("test_data_{}_batch_{}", id % 50, id / 50).repeat(100);
+            if data != expected_data.as_bytes() {
+                panic!("Data corruption detected for record {}", id);
+            }
+        }
+
+        if found_records != 1000 {
+            panic!("Expected 1000 records, found {}", found_records);
+        }
+
+        Ok(())
+    });
+
+    sim.run().unwrap();
+}
+
+/// Test schema migration data integrity
+/// This test verifies that data remains consistent during schema migrations
+/// even when network failures or other issues occur during the migration process.
+#[test]
+fn schema_migration_data_integrity() {
+    let mut sim = Builder::new()
+        .simulation_duration(Duration::from_secs(180))
+        .build();
+
+    let tmp = tempdir().unwrap();
+    let db_path = tmp.path().to_owned();
+
+    init_tracing();
+    sim.host("primary", move || {
+        let path = db_path.clone();
+        async move {
+            let server = TestServer {
+                path: path.into(),
+                db_config: crate::config::DbConfig::default(),
+                user_api_config: crate::config::UserApiConfig::default(),
+                admin_api_config: Some(crate::config::AdminApiConfig {
+                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?,
+                    connector: TurmoilConnector,
+                    disable_metrics: false,
+                    auth_key: None,
+                }),
+                rpc_server_config: Some(crate::config::RpcServerConfig {
+                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?,
+                    tls_config: None,
+                }),
+                disable_namespaces: false,
+                disable_default_namespace: true,
+                ..Default::default()
+            };
+
+            server.start_sim(8080).await?;
+            Ok(())
+        }
+    });
+
+    sim.client("migration_tester", async move {
+        let client = Client::new();
+
+        // Create schema database
+        client
+            .post("http://primary:9090/v1/namespaces/schema/create", json!({"shared_schema": true}))
+            .await?;
+
+        // Create regular database using the schema
+        client
+            .post("http://primary:9090/v1/namespaces/testdb/create", json!({"shared_schema_name": "schema"}))
+            .await?;
+
+        let schema_db = Database::open_remote_with_connector(
+            "http://schema.primary:8080",
+            "",
+            TurmoilConnector
+        )?;
+        let schema_conn = schema_db.connect()?;
+
+        let test_db = Database::open_remote_with_connector(
+            "http://testdb.primary:8080",
+            "",
+            TurmoilConnector
+        )?;
+        let test_conn = test_db.connect()?;
+
+        // Create initial schema
+        schema_conn.execute(
+            "CREATE TABLE users (
+                id INTEGER PRIMARY KEY,
+                name TEXT NOT NULL,
+                email TEXT UNIQUE NOT NULL
+            )",
+            ()
+        ).await?;
+
+        // Insert test data
+        for i in 0..100 {
+            test_conn.execute(
+                "INSERT INTO users (id, name, email) VALUES (?, ?, ?)",
+                (i, format!("User {}", i), format!("user{}@example.com", i))
+            ).await?;
+        }
+
+        // Perform schema migration while data operations are ongoing
+        let migration_handle = tokio::spawn(async move {
+            sleep(Duration::from_secs(2)).await;
+
+            // Add new column
+            schema_conn.execute(
+                "ALTER TABLE users ADD COLUMN created_at INTEGER DEFAULT 0",
+                ()
+            ).await.unwrap();
+
+            sleep(Duration::from_secs(2)).await;
+
+            // Add index
+            schema_conn.execute(
+                "CREATE INDEX idx_users_email ON users(email)",
+                ()
+            ).await.unwrap();
+        });
+
+        // Concurrent data operations during migration
+        let data_ops_handle = tokio::spawn(async move {
+            for i in 100..200 {
+                match test_conn.execute(
+                    "INSERT INTO users (id, name, email) VALUES (?, ?, ?)",
+                    (i, format!("User {}", i), format!("user{}@example.com", i))
+                ).await {
+                    Ok(_) => {},
+                    Err(e) => {
+                        // Some operations might fail during migration, which is acceptable
+                        eprintln!("Insert failed during migration: {}", e);
+                    }
+                }
+
+                sleep(Duration::from_millis(50)).await;
+            }
+        });
+
+        // Wait for operations to complete
+        migration_handle.await.unwrap();
+        data_ops_handle.await.unwrap();
+
+        // Verify data integrity after migration. `test_conn` was moved into the
+        // data-ops task above, so open a fresh connection for the final read.
+        let verify_conn = test_db.connect()?;
+        let mut stmt = verify_conn.prepare("SELECT id, name, email FROM users ORDER BY id").await?;
+        let mut rows = stmt.query([]).await?;
+
+        let mut count = 0;
+        while let Some(row) = rows.next().await? {
+            let id: i64 = row.get(0)?;
+            let name: String = row.get(1)?;
+            let email: String = row.get(2)?;
+
+            let expected_name = format!("User {}", id);
+            let expected_email = format!("user{}@example.com", id);
+
+            if name != expected_name || email != expected_email {
+                panic!("Data corruption after migration: id={}, name={}, email={}", id, name, email);
+            }
+
+            count += 1;
+        }
+
+        // Should have at least the original 100 records
+        if count < 100 {
+            panic!("Data loss detected after migration: expected at least 100 records, found {}", count);
+        }
+
+        Ok(())
+    });
+
+    sim.run().unwrap();
+}
\ No newline at end of file
diff --git a/libsql-server/tests/edge_case_corruption_tests.rs b/libsql-server/tests/edge_case_corruption_tests.rs
new file mode 100644
index 0000000000..0dca45480d
--- /dev/null
+++ b/libsql-server/tests/edge_case_corruption_tests.rs
@@ -0,0 +1,725 @@
+//! Edge case corruption tests for maximum bug discovery
+//!
+//! Created by hamisionesmus for Turso bug bounty program
+//! These tests target specific edge cases and boundary conditions
+//! that are most likely to expose data corruption bugs.
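+
+// Illustrative helper, not called by the tests below: it mirrors the
+// `value % 256` storage rule used by `integer_boundary_corruption_test`.
+// Rust's `%` is a remainder that keeps the dividend's sign, so the stored
+// values span -255..=255 rather than the signed-byte range -128..=127.
+#[allow(dead_code)]
+fn stored_tiny_int(value: i64) -> i64 {
+    // stored_tiny_int(i64::MAX) == 255, stored_tiny_int(-1) == -1
+    value % 256
+}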
+
+use std::collections::HashMap;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+
+use libsql::Database;
+use serde_json::json;
+use tempfile::tempdir;
+use tokio::sync::{Barrier, Notify};
+use tokio::time::{sleep, timeout};
+use turmoil::{Builder, Sim};
+
+use crate::common::http::Client;
+use crate::common::net::{init_tracing, SimServer, TestServer, TurmoilAcceptor, TurmoilConnector};
+
+/// Test for boundary value corruption in integer fields
+/// This test targets integer overflow/underflow scenarios
+#[test]
+fn integer_boundary_corruption_test() {
+    let mut sim = Builder::new()
+        .simulation_duration(Duration::from_secs(200))
+        .build();
+
+    let tmp = tempdir().unwrap();
+    let db_path = tmp.path().to_owned();
+
+    init_tracing();
+    sim.host("primary", move || {
+        let path = db_path.clone();
+        async move {
+            let server = TestServer {
+                path: path.into(),
+                db_config: crate::config::DbConfig {
+                    max_log_size: 6,
+                    max_log_duration: Some(0.3),
+                    ..Default::default()
+                },
+                user_api_config: crate::config::UserApiConfig::default(),
+                admin_api_config: Some(crate::config::AdminApiConfig {
+                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?,
+                    connector: TurmoilConnector,
+                    disable_metrics: false,
+                    auth_key: None,
+                }),
+                rpc_server_config: Some(crate::config::RpcServerConfig {
+                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?,
+                    tls_config: None,
+                }),
+                disable_namespaces: false,
+                disable_default_namespace: true,
+                ..Default::default()
+            };
+
+            server.start_sim(8080).await?;
+            Ok(())
+        }
+    });
+
+    sim.client("boundary_tester", async move {
+        let client = Client::new();
+
+        client
+            .post("http://primary:9090/v1/namespaces/testdb/create", json!({}))
+            .await?;
+
+        let db = Database::open_remote_with_connector(
+            "http://testdb.primary:8080",
+            "",
+            TurmoilConnector
+        )?;
+        let conn = db.connect()?;
+
+        // Create table with various integer types
+        conn.execute(
+            "CREATE TABLE boundary_test (
+                id INTEGER PRIMARY KEY,
+                tiny_int INTEGER,
+                big_int INTEGER,
+                counter INTEGER DEFAULT 0,
+                checksum TEXT NOT NULL
+            )",
+            ()
+        ).await?;
+
+        let corruption_detected = Arc::new(AtomicBool::new(false));
+        let barrier = Arc::new(Barrier::new(4));
+
+        let mut handles = vec![];
+
+        // Test boundary values with concurrent access
+        for worker_id in 0..3 {
+            let barrier = barrier.clone();
+            let corruption_detected = corruption_detected.clone();
+
+            let handle = tokio::spawn(async move {
+                let db = Database::open_remote_with_connector(
+                    "http://testdb.primary:8080",
+                    "",
+                    TurmoilConnector
+                ).unwrap();
+                let conn = db.connect().unwrap();
+
+                barrier.wait().await;
+
+                // Test various boundary values
+                let boundary_values = vec![
+                    i64::MAX,
+                    i64::MIN,
+                    i64::MAX - 1,
+                    i64::MIN + 1,
+                    0,
+                    -1,
+                    1,
+                    i32::MAX as i64,
+                    i32::MIN as i64,
+                    u32::MAX as i64,
+                ];
+
+                for (iteration, &value) in boundary_values.iter().enumerate() {
+                    let record_id = worker_id * 1000 + iteration as i64;
+                    let checksum = format!("boundary_chk_{}_{}", worker_id, value);
+
+                    // Insert boundary value; the checksum is cloned because it is
+                    // compared against the read-back value below.
+                    conn.execute(
+                        "INSERT INTO boundary_test (id, tiny_int, big_int, checksum) VALUES (?, ?, ?, ?)",
+                        (record_id, value % 256, value, checksum.clone())
+                    ).await.unwrap();
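+                    // The `counter + ?` arithmetic below happens inside SQLite,
+                    // which stores INTEGER as 64-bit two's complement; recent
+                    // SQLite versions fall back to floating point rather than
+                    // wrapping when a 64-bit add would overflow.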
WHERE id = ?", + (value / 1000000, record_id) + ).await.unwrap(); + + // Read back and verify + let mut stmt = conn.prepare("SELECT tiny_int, big_int, counter, checksum FROM boundary_test WHERE id = ?").await.unwrap(); + let mut rows = stmt.query([record_id]).await.unwrap(); + + if let Some(row) = rows.next().await.unwrap() { + let tiny_int: i64 = row.get(0).unwrap(); + let big_int: i64 = row.get(1).unwrap(); + let counter: i64 = row.get(2).unwrap(); + let retrieved_checksum: String = row.get(3).unwrap(); + + // Verify values + if big_int != value { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("BOUNDARY CORRUPTION: big_int mismatch for worker {} value {}: expected {}, got {}", + worker_id, value, value, big_int); + } + + if tiny_int != value % 256 { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("BOUNDARY CORRUPTION: tiny_int mismatch for worker {} value {}: expected {}, got {}", + worker_id, value, value % 256, tiny_int); + } + + if retrieved_checksum != checksum { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("BOUNDARY CORRUPTION: checksum mismatch for worker {} value {}", worker_id, value); + } + } + + sleep(Duration::from_millis(50)).await; + } + + worker_id + }); + + handles.push(handle); + } + + // Network disruption during boundary operations + let disruption_handle = tokio::spawn(async move { + barrier.wait().await; + + for _ in 0..5 { + sleep(Duration::from_secs(8)).await; + turmoil::hold("primary"); + sleep(Duration::from_millis(400)).await; + turmoil::release("primary"); + } + }); + + for handle in handles { + handle.await.unwrap(); + } + + disruption_handle.await.unwrap(); + + // Final verification + let mut verify_stmt = conn.prepare("SELECT id, tiny_int, big_int, counter FROM boundary_test ORDER BY id").await?; + let mut verify_rows = verify_stmt.query([]).await?; + + let mut record_count = 0; + while let Some(row) = verify_rows.next().await? 
+        while let Some(row) = verify_rows.next().await? {
+            let id: i64 = row.get(0)?;
+            let tiny_int: i64 = row.get(1)?;
+            let _big_int: i64 = row.get(2)?;
+            let _counter: i64 = row.get(3)?;
+
+            record_count += 1;
+
+            // Verify data consistency
+            if tiny_int < -255 || tiny_int > 255 {
+                corruption_detected.store(true, Ordering::SeqCst);
+                eprintln!("FINAL BOUNDARY CHECK FAILED: Record {} tiny_int out of range: {}", id, tiny_int);
+            }
+        }
+
+        if corruption_detected.load(Ordering::SeqCst) {
+            panic!("INTEGER BOUNDARY CORRUPTION DETECTED!");
+        }
+
+        eprintln!("Boundary test completed successfully with {} records", record_count);
+
+        Ok(())
+    });
+
+    sim.run().unwrap();
+}
+
+/// Test for Unicode and special character corruption
+/// This test targets text encoding/decoding edge cases
+#[test]
+fn unicode_corruption_test() {
+    let mut sim = Builder::new()
+        .simulation_duration(Duration::from_secs(250))
+        .build();
+
+    let tmp = tempdir().unwrap();
+    let db_path = tmp.path().to_owned();
+
+    init_tracing();
+    sim.host("primary", move || {
+        let path = db_path.clone();
+        async move {
+            let server = TestServer {
+                path: path.into(),
+                db_config: crate::config::DbConfig {
+                    max_log_size: 5,
+                    max_log_duration: Some(0.2),
+                    ..Default::default()
+                },
+                user_api_config: crate::config::UserApiConfig::default(),
+                admin_api_config: Some(crate::config::AdminApiConfig {
+                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?,
+                    connector: TurmoilConnector,
+                    disable_metrics: false,
+                    auth_key: None,
+                }),
+                rpc_server_config: Some(crate::config::RpcServerConfig {
+                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?,
+                    tls_config: None,
+                }),
+                disable_namespaces: false,
+                disable_default_namespace: true,
+                ..Default::default()
+            };
+
+            server.start_sim(8080).await?;
+            Ok(())
+        }
+    });
+
+    sim.client("unicode_tester", async move {
+        let client = Client::new();
+
+        client
+            .post("http://primary:9090/v1/namespaces/testdb/create", json!({}))
+            .await?;
+
+        let db = Database::open_remote_with_connector(
+            "http://testdb.primary:8080",
+            "",
+            TurmoilConnector
+        )?;
+        let conn = db.connect()?;
+
+        // Create table for Unicode testing
+        conn.execute(
+            "CREATE TABLE unicode_test (
+                id INTEGER PRIMARY KEY,
+                unicode_text TEXT NOT NULL,
+                binary_data BLOB NOT NULL,
+                text_length INTEGER NOT NULL,
+                hash TEXT NOT NULL
+            )",
+            ()
+        ).await?;
+
+        let corruption_detected = Arc::new(AtomicBool::new(false));
+        let barrier = Arc::new(Barrier::new(4));
+
+        let mut handles = vec![];
+
+        // Test various Unicode scenarios
+        for worker_id in 0..3 {
+            let barrier = barrier.clone();
+            let corruption_detected = corruption_detected.clone();
+
+            let handle = tokio::spawn(async move {
+                let db = Database::open_remote_with_connector(
+                    "http://testdb.primary:8080",
+                    "",
+                    TurmoilConnector
+                ).unwrap();
+                let conn = db.connect().unwrap();
+
+                barrier.wait().await;
+
+                // Various problematic Unicode strings; owned Strings throughout so
+                // the computed entry ("a".repeat(...)) type-checks with the rest.
+                let test_strings = vec![
+                    "๐Ÿš€๐Ÿ”ฅ๐Ÿ’ฏ๐ŸŽ‰๐ŸŒŸ".to_string(), // Emojis
+                    "ร‘oรฑo niรฑo aรฑo".to_string(), // Spanish accents
+                    "ะœะพัะบะฒะฐ ะ ะพััะธั".to_string(), // Cyrillic
+                    "ๅŒ—ไบฌไธญๅ›ฝ".to_string(), // Chinese
+                    "ุงู„ุนุฑุจูŠุฉ".to_string(), // Arabic
+                    "ืขื‘ืจื™ืช".to_string(), // Hebrew
+                    "๐Ÿณ๏ธโ€๐ŸŒˆ๐Ÿณ๏ธโ€โšง๏ธ".to_string(), // Complex emojis with ZWJ
+                    "\u{0000}\u{0001}\u{0002}".to_string(), // Control characters
+                    "\"'\\`\n\r\t".to_string(), // Escape characters
+                    "SELECT * FROM users; DROP TABLE users;--".to_string(), // SQL injection attempt
+                    "a".repeat(10000), // Very long string
+                    "".to_string(), // Empty string
+                    "\u{FEFF}BOM test".to_string(), // Byte Order Mark
+                    "๐Ÿค”๐Ÿคฏ๐Ÿฅด๐Ÿ˜ตโ€๐Ÿ’ซ๐Ÿคฎ".to_string(), // More complex emojis
+                    "๐•ณ๐–Š๐–‘๐–‘๐–” ๐–‚๐–”๐–—๐–‘๐–‰".to_string(), // Mathematical script
+                ];
+
+                for (iteration, test_string) in test_strings.iter().enumerate() {
+                    let record_id = worker_id * 1000 + iteration as i64;
+                    let binary_data = test_string.as_bytes();
+                    let text_length = test_string.chars().count() as i64;
+                    let hash = format!("unicode_hash_{}_{}", worker_id, iteration);
+
+                    // Insert Unicode data (the hash is cloned so it can be compared below)
+                    conn.execute(
+                        "INSERT INTO unicode_test (id, unicode_text, binary_data, text_length, hash) VALUES (?, ?, ?, ?, ?)",
+                        (record_id, test_string.as_str(), binary_data, text_length, hash.clone())
+                    ).await.unwrap();
+
+                    // Read back immediately
+                    let mut stmt = conn.prepare("SELECT unicode_text, binary_data, text_length, hash FROM unicode_test WHERE id = ?").await.unwrap();
+                    let mut rows = stmt.query([record_id]).await.unwrap();
+
+                    if let Some(row) = rows.next().await.unwrap() {
+                        let retrieved_text: String = row.get(0).unwrap();
+                        let retrieved_binary: Vec<u8> = row.get(1).unwrap();
+                        let retrieved_length: i64 = row.get(2).unwrap();
+                        let retrieved_hash: String = row.get(3).unwrap();
+
+                        // Verify Unicode integrity
+                        if retrieved_text != *test_string {
+                            corruption_detected.store(true, Ordering::SeqCst);
+                            eprintln!("UNICODE CORRUPTION: Text mismatch for worker {} iteration {}", worker_id, iteration);
+                            eprintln!("Expected: {:?}", test_string);
+                            eprintln!("Got: {:?}", retrieved_text);
+                        }
+
+                        if retrieved_binary != binary_data {
+                            corruption_detected.store(true, Ordering::SeqCst);
+                            eprintln!("UNICODE CORRUPTION: Binary data mismatch for worker {} iteration {}", worker_id, iteration);
+                        }
+
+                        if retrieved_length != text_length {
+                            corruption_detected.store(true, Ordering::SeqCst);
+                            eprintln!("UNICODE CORRUPTION: Length mismatch for worker {} iteration {}: expected {}, got {}",
+                                worker_id, iteration, text_length, retrieved_length);
+                        }
+
+                        if retrieved_hash != hash {
+                            corruption_detected.store(true, Ordering::SeqCst);
+                            eprintln!("UNICODE CORRUPTION: Hash mismatch for worker {} iteration {}", worker_id, iteration);
+                        }
+                    }
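+                    // `chars().count()` counts Unicode scalar values, which is what
+                    // the `text_length` column stores; for non-ASCII payloads this
+                    // differs from the UTF-8 byte length, and for ZWJ emoji
+                    // sequences it also differs from the visible grapheme count.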
WHERE id = ?", + (updated_text, updated_text.chars().count() as i64, record_id) + ).await.unwrap(); + + sleep(Duration::from_millis(30)).await; + } + + worker_id + }); + + handles.push(handle); + } + + // Network chaos during Unicode operations + let chaos_handle = tokio::spawn(async move { + barrier.wait().await; + + for cycle in 0..8 { + sleep(Duration::from_secs(6)).await; + + // Various disruption patterns + match cycle % 4 { + 0 => { + turmoil::hold("primary"); + sleep(Duration::from_millis(300)).await; + turmoil::release("primary"); + } + 1 => { + for _ in 0..5 { + turmoil::hold("primary"); + sleep(Duration::from_millis(100)).await; + turmoil::release("primary"); + sleep(Duration::from_millis(50)).await; + } + } + 2 => { + turmoil::hold("primary"); + sleep(Duration::from_secs(1)).await; + turmoil::release("primary"); + } + 3 => { + for _ in 0..10 { + turmoil::hold("primary"); + sleep(Duration::from_millis(50)).await; + turmoil::release("primary"); + sleep(Duration::from_millis(25)).await; + } + } + _ => {} + } + } + }); + + for handle in handles { + handle.await.unwrap(); + } + + chaos_handle.await.unwrap(); + + // Final Unicode integrity verification + let mut verify_stmt = conn.prepare("SELECT id, unicode_text, LENGTH(unicode_text), text_length FROM unicode_test ORDER BY id").await?; + let mut verify_rows = verify_stmt.query([]).await?; + + let mut record_count = 0; + while let Some(row) = verify_rows.next().await? { + let id: i64 = row.get(0)?; + let text: String = row.get(1)?; + let byte_length: i64 = row.get(2)?; + let char_length: i64 = row.get(3)?; + + record_count += 1; + + // Verify Unicode consistency + let actual_char_count = text.chars().count() as i64; + if actual_char_count != char_length { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("FINAL UNICODE CHECK FAILED: Record {} character count mismatch: stored {}, actual {}", + id, char_length, actual_char_count); + } + + // Check for invalid UTF-8 sequences + if !text.is_ascii() && text.chars().any(|c| c == '\u{FFFD}') { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("FINAL UNICODE CHECK FAILED: Record {} contains replacement characters", id); + } + } + + if corruption_detected.load(Ordering::SeqCst) { + panic!("UNICODE CORRUPTION DETECTED!"); + } + + eprintln!("Unicode test completed successfully with {} records", record_count); + + Ok(()) + }); + + sim.run().unwrap(); +} + +/// Test for NULL value handling corruption +/// This test targets NULL/NOT NULL constraint edge cases +#[test] +fn null_handling_corruption_test() { + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(180)) + .build(); + + let tmp = tempdir().unwrap(); + let db_path = tmp.path().to_owned(); + + init_tracing(); + sim.host("primary", move || { + let path = db_path.clone(); + async move { + let server = TestServer { + path: path.into(), + db_config: crate::config::DbConfig { + max_log_size: 7, + max_log_duration: Some(0.4), + ..Default::default() + }, + user_api_config: crate::config::UserApiConfig::default(), + admin_api_config: Some(crate::config::AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?, + connector: TurmoilConnector, + disable_metrics: false, + auth_key: None, + }), + rpc_server_config: Some(crate::config::RpcServerConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?, + tls_config: None, + }), + disable_namespaces: false, + disable_default_namespace: true, + ..Default::default() + }; + + server.start_sim(8080).await?; + Ok(()) + } + 
+
+    sim.client("null_tester", async move {
+        let client = Client::new();
+
+        client
+            .post("http://primary:9090/v1/namespaces/testdb/create", json!({}))
+            .await?;
+
+        let db = Database::open_remote_with_connector(
+            "http://testdb.primary:8080",
+            "",
+            TurmoilConnector
+        )?;
+        let conn = db.connect()?;
+
+        // Create table with mixed NULL/NOT NULL constraints
+        conn.execute(
+            "CREATE TABLE null_test (
+                id INTEGER PRIMARY KEY,
+                required_field TEXT NOT NULL,
+                optional_field TEXT,
+                nullable_int INTEGER,
+                non_null_int INTEGER NOT NULL DEFAULT 0,
+                status TEXT NOT NULL DEFAULT 'active'
+            )",
+            ()
+        ).await?;
+
+        let corruption_detected = Arc::new(AtomicBool::new(false));
+        let barrier = Arc::new(Barrier::new(4));
+
+        let mut handles = vec![];
+
+        // Test NULL handling with concurrent operations
+        for worker_id in 0..3 {
+            let barrier = barrier.clone();
+            let corruption_detected = corruption_detected.clone();
+
+            let handle = tokio::spawn(async move {
+                let db = Database::open_remote_with_connector(
+                    "http://testdb.primary:8080",
+                    "",
+                    TurmoilConnector
+                ).unwrap();
+                let conn = db.connect().unwrap();
+
+                barrier.wait().await;
+
+                for iteration in 0..40 {
+                    let record_id = worker_id * 1000 + iteration;
+                    let required_field = format!("required_{}_{}", worker_id, iteration);
+
+                    // Test various NULL scenarios
+                    match iteration % 4 {
+                        0 => {
+                            // Insert with all fields
+                            conn.execute(
+                                "INSERT INTO null_test (id, required_field, optional_field, nullable_int, non_null_int) VALUES (?, ?, ?, ?, ?)",
+                                (record_id, required_field.as_str(), Some(format!("optional_{}", iteration)), Some(iteration), iteration + 100)
+                            ).await.unwrap();
+                        }
+                        1 => {
+                            // Insert with NULLs in nullable fields (typed None so the
+                            // parameter tuple is unambiguous)
+                            conn.execute(
+                                "INSERT INTO null_test (id, required_field, optional_field, nullable_int, non_null_int) VALUES (?, ?, ?, ?, ?)",
+                                (record_id, required_field.as_str(), None::<String>, None::<i64>, iteration + 200)
+                            ).await.unwrap();
+                        }
+                        2 => {
+                            // Insert with defaults
+                            conn.execute(
+                                "INSERT INTO null_test (id, required_field) VALUES (?, ?)",
+                                (record_id, required_field.as_str())
+                            ).await.unwrap();
+                        }
+                        3 => {
+                            // Insert then update to NULL
+                            conn.execute(
+                                "INSERT INTO null_test (id, required_field, optional_field, nullable_int, non_null_int) VALUES (?, ?, ?, ?, ?)",
+                                (record_id, required_field.as_str(), Some("temp".to_string()), Some(999), iteration + 300)
+                            ).await.unwrap();
+
+                            // Update nullable fields to NULL
+                            conn.execute(
+                                "UPDATE null_test SET optional_field = NULL, nullable_int = NULL WHERE id = ?",
+                                (record_id,)
+                            ).await.unwrap();
+                        }
+                        _ => {}
+                    }
+
+                    // Verify the record
+                    let mut stmt = conn.prepare("SELECT required_field, optional_field, nullable_int, non_null_int, status FROM null_test WHERE id = ?").await.unwrap();
+                    let mut rows = stmt.query([record_id]).await.unwrap();
+
+                    if let Some(row) = rows.next().await.unwrap() {
+                        let req_field: String = row.get(0).unwrap();
+                        let _opt_field: Option<String> = row.get(1).unwrap();
+                        let _nullable_int: Option<i64> = row.get(2).unwrap();
+                        let _non_null_int: i64 = row.get(3).unwrap();
+                        let status: String = row.get(4).unwrap();
+
+                        // Verify required field is never NULL
+                        if req_field != required_field {
+                            corruption_detected.store(true, Ordering::SeqCst);
+                            eprintln!("NULL CORRUPTION: Required field mismatch for worker {} iteration {}", worker_id, iteration);
+                        }
+
+                        // Verify non-null integer is never NULL (this should never be NULL)
+                        // The fact that we can retrieve it as i64 means it's not NULL, which is correct
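+                        // `status` is declared NOT NULL DEFAULT 'active', so even the
+                        // arms above that omit it entirely (e.g. the defaults-only
+                        // insert) must read back a non-empty value here.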
+                        // Verify status has default value when not explicitly set
+                        if status.is_empty() {
+                            corruption_detected.store(true, Ordering::SeqCst);
+                            eprintln!("NULL CORRUPTION: Status field is empty for worker {} iteration {}", worker_id, iteration);
+                        }
+                    }
+
+                    // Try to violate NOT NULL constraints (these should fail)
+                    let violation_result = conn.execute(
+                        "UPDATE null_test SET required_field = NULL WHERE id = ?",
+                        (record_id,)
+                    ).await;
+
+                    // This should fail - if it succeeds, we have a constraint violation bug
+                    if violation_result.is_ok() {
+                        corruption_detected.store(true, Ordering::SeqCst);
+                        eprintln!("NULL CONSTRAINT VIOLATION: Successfully set required_field to NULL for record {}", record_id);
+                    }
+
+                    sleep(Duration::from_millis(25)).await;
+                }
+
+                worker_id
+            });
+
+            handles.push(handle);
+        }
+
+        // Network disruption during NULL operations (hold/release act on a host pair)
+        let disruption_handle = tokio::spawn(async move {
+            barrier.wait().await;
+
+            for _ in 0..6 {
+                sleep(Duration::from_secs(5)).await;
+                turmoil::hold("null_tester", "primary");
+                sleep(Duration::from_millis(250)).await;
+                turmoil::release("null_tester", "primary");
+            }
+        });
+
+        for handle in handles {
+            handle.await.unwrap();
+        }
+
+        disruption_handle.await.unwrap();
+
+        // Final NULL constraint verification
+        let mut verify_stmt = conn.prepare("SELECT id, required_field, optional_field, nullable_int, non_null_int, status FROM null_test ORDER BY id").await?;
+        let mut verify_rows = verify_stmt.query([]).await?;
+
+        let mut record_count = 0;
+        while let Some(row) = verify_rows.next().await? {
+            let id: i64 = row.get(0)?;
+            let required_field: String = row.get(1)?;
+            let _optional_field: Option<String> = row.get(2)?;
+            let _nullable_int: Option<i64> = row.get(3)?;
+            let _non_null_int: i64 = row.get(4)?;
+            let status: String = row.get(5)?;
+
+            record_count += 1;
+
+            // Verify NOT NULL constraints are maintained
+            if required_field.is_empty() {
+                corruption_detected.store(true, Ordering::SeqCst);
+                eprintln!("FINAL NULL CHECK FAILED: Record {} has empty required_field", id);
+            }
+
+            if status.is_empty() {
+                corruption_detected.store(true, Ordering::SeqCst);
+                eprintln!("FINAL NULL CHECK FAILED: Record {} has empty status", id);
+            }
+
+            // non_null_int should never be NULL (if we can read it as i64, it's not NULL)
+            // This is implicitly verified by the successful row.get(4) call above
+        }
+
+        if corruption_detected.load(Ordering::SeqCst) {
+            panic!("NULL HANDLING CORRUPTION DETECTED!");
+        }
+
+        eprintln!("NULL handling test completed successfully with {} records", record_count);
+
+        Ok(())
+    });
+
+    sim.run().unwrap();
+}
\ No newline at end of file
diff --git a/libsql-server/tests/extreme_corruption_tests.rs b/libsql-server/tests/extreme_corruption_tests.rs
new file mode 100644
index 0000000000..8d1875bdc9
--- /dev/null
+++ b/libsql-server/tests/extreme_corruption_tests.rs
@@ -0,0 +1,1031 @@
+//! Extreme corruption tests designed to maximize bug discovery
+//!
+//! Created by hamisionesmus for Turso bug bounty program
+//! These tests target the most vulnerable areas for data corruption
+//! with extreme stress conditions to expose maximum number of bugs.
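+
+// Illustrative invariant for the stress test below, kept as a standalone
+// sketch (it is not called by the test): transfers debit and credit two
+// accounts inside one transaction, so at any consistent snapshot the total
+// balance must still equal accounts * initial_balance.
+#[allow(dead_code)]
+fn balance_is_conserved(total_balance: i64, accounts: i64, initial_balance: i64) -> bool {
+    total_balance == accounts * initial_balance
+}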
+ +use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use libsql::Database; +use serde_json::json; +use tempfile::tempdir; +use tokio::sync::{Barrier, Notify}; +use tokio::time::{sleep, timeout}; +use turmoil::{Builder, Sim}; + +use crate::common::http::Client; +use crate::common::net::{init_tracing, SimServer, TestServer, TurmoilAcceptor, TurmoilConnector}; + +/// Extreme stress test with maximum concurrent connections and minimal resources +/// This test pushes the system to its absolute limits to expose race conditions +#[test] +fn extreme_concurrent_stress_test() { + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(600)) // Longer duration + .tcp_capacity(64) // Extremely limited bandwidth + .build(); + + let tmp = tempdir().unwrap(); + let db_path = tmp.path().to_owned(); + + init_tracing(); + sim.host("primary", move || { + let path = db_path.clone(); + async move { + let server = TestServer { + path: path.into(), + db_config: crate::config::DbConfig { + max_log_size: 3, // Extremely small - forces constant compaction + max_log_duration: Some(0.1), // Very aggressive + ..Default::default() + }, + user_api_config: crate::config::UserApiConfig::default(), + admin_api_config: Some(crate::config::AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?, + connector: TurmoilConnector, + disable_metrics: false, + auth_key: None, + }), + rpc_server_config: Some(crate::config::RpcServerConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?, + tls_config: None, + }), + disable_namespaces: false, + disable_default_namespace: true, + ..Default::default() + }; + + server.start_sim(8080).await?; + Ok(()) + } + }); + + sim.client("extreme_tester", async move { + let client = Client::new(); + + client + .post("http://primary:9090/v1/namespaces/testdb/create", json!({})) + .await?; + + let db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + )?; + let conn = db.connect()?; + + // Create complex schema with multiple tables and constraints + conn.execute( + "CREATE TABLE accounts ( + id INTEGER PRIMARY KEY, + balance INTEGER NOT NULL CHECK(balance >= 0), + account_type TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + checksum TEXT NOT NULL + )", + () + ).await?; + + conn.execute( + "CREATE TABLE transactions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + from_account INTEGER REFERENCES accounts(id), + to_account INTEGER REFERENCES accounts(id), + amount INTEGER NOT NULL CHECK(amount > 0), + timestamp INTEGER NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + hash TEXT NOT NULL + )", + () + ).await?; + + conn.execute( + "CREATE TABLE audit_trail ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + table_name TEXT NOT NULL, + operation TEXT NOT NULL, + old_values TEXT, + new_values TEXT, + timestamp INTEGER NOT NULL, + user_id TEXT NOT NULL + )", + () + ).await?; + + // Insert initial data + for i in 0..200 { + let checksum = format!("acc_chk_{}", i); + conn.execute( + "INSERT INTO accounts (id, balance, account_type, created_at, updated_at, checksum) + VALUES (?, ?, ?, ?, ?, ?)", + (i, 10000, if i % 3 == 0 { "checking" } else if i % 3 == 1 { "savings" } else { "investment" }, + 1000000000 + i, 1000000000 + i, checksum) + ).await?; + } + + let corruption_detected = Arc::new(AtomicBool::new(false)); + let transaction_counter = Arc::new(AtomicU64::new(0)); + let barrier = 
Arc::new(Barrier::new(21)); // 20 workers + 1 controller + + let mut handles = vec![]; + + // Spawn 20 extremely aggressive concurrent workers + for worker_id in 0..20 { + let barrier = barrier.clone(); + let corruption_detected = corruption_detected.clone(); + let transaction_counter = transaction_counter.clone(); + + let handle = tokio::spawn(async move { + let db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + ).unwrap(); + let conn = db.connect().unwrap(); + + barrier.wait().await; + + for iteration in 0..100 { + let tx_id = transaction_counter.fetch_add(1, Ordering::SeqCst); + + // Complex multi-table transaction + let tx = conn.transaction().await.unwrap(); + + let from_account = (tx_id % 200) as i64; + let to_account = ((tx_id + 1) % 200) as i64; + let amount = 100 + (tx_id % 500) as i64; + + // Read current balances + let mut from_stmt = tx.prepare("SELECT balance, checksum FROM accounts WHERE id = ?").await.unwrap(); + let mut from_rows = from_stmt.query([from_account]).await.unwrap(); + + if let Some(from_row) = from_rows.next().await.unwrap() { + let from_balance: i64 = from_row.get(0).unwrap(); + let from_checksum: String = from_row.get(1).unwrap(); + + // Verify checksum + let expected_checksum = format!("acc_chk_{}", from_account); + if !from_checksum.starts_with(&expected_checksum) { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("CHECKSUM CORRUPTION: Account {} has invalid checksum", from_account); + } + + if from_balance >= amount { + // Log to audit trail + tx.execute( + "INSERT INTO audit_trail (table_name, operation, old_values, new_values, timestamp, user_id) + VALUES (?, ?, ?, ?, ?, ?)", + ("accounts", "transfer_out", + format!("balance:{}", from_balance), + format!("balance:{}", from_balance - amount), + std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64, + format!("worker_{}", worker_id)) + ).await.unwrap(); + + // Update from account + let new_checksum = format!("acc_chk_{}_v{}", from_account, tx_id); + tx.execute( + "UPDATE accounts SET balance = balance - ?, updated_at = ?, checksum = ? WHERE id = ?", + (amount, std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64, new_checksum, from_account) + ).await.unwrap(); + + // Update to account + let to_new_checksum = format!("acc_chk_{}_v{}", to_account, tx_id); + tx.execute( + "UPDATE accounts SET balance = balance + ?, updated_at = ?, checksum = ? 
WHERE id = ?", + (amount, std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64, to_new_checksum, to_account) + ).await.unwrap(); + + // Record transaction + let tx_hash = format!("tx_hash_{}_{}", worker_id, iteration); + tx.execute( + "INSERT INTO transactions (from_account, to_account, amount, timestamp, status, hash) + VALUES (?, ?, ?, ?, ?, ?)", + (from_account, to_account, amount, + std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64, + "completed", tx_hash) + ).await.unwrap(); + + // Random delay to increase race condition chances + if iteration % 3 == 0 { + sleep(Duration::from_millis(10 + (worker_id as u64 * 5))).await; + } + } + } + + // Commit with timeout + match timeout(Duration::from_secs(15), tx.commit()).await { + Ok(Ok(_)) => {}, + Ok(Err(e)) => { + eprintln!("Transaction commit failed for worker {}: {}", worker_id, e); + } + Err(_) => { + eprintln!("Transaction commit timeout for worker {}", worker_id); + } + } + + // Very short delay between transactions + sleep(Duration::from_millis(5)).await; + } + + worker_id + }); + + handles.push(handle); + } + + // Network chaos controller + let chaos_handle = tokio::spawn(async move { + barrier.wait().await; + + for cycle in 0..30 { + sleep(Duration::from_secs(5)).await; + + // Create various network disruptions + match cycle % 6 { + 0 => { + // Brief total outage + turmoil::hold("primary"); + sleep(Duration::from_millis(500)).await; + turmoil::release("primary"); + } + 1 => { + // Bandwidth throttling (already limited, but add more pressure) + for _ in 0..10 { + turmoil::hold("primary"); + sleep(Duration::from_millis(50)).await; + turmoil::release("primary"); + sleep(Duration::from_millis(50)).await; + } + } + 2 => { + // Intermittent connectivity + for _ in 0..5 { + turmoil::hold("primary"); + sleep(Duration::from_millis(200)).await; + turmoil::release("primary"); + sleep(Duration::from_millis(100)).await; + } + } + 3 => { + // Longer outage during potential compaction + turmoil::hold("primary"); + sleep(Duration::from_secs(2)).await; + turmoil::release("primary"); + } + 4 => { + // Rapid on/off cycles + for _ in 0..20 { + turmoil::hold("primary"); + sleep(Duration::from_millis(25)).await; + turmoil::release("primary"); + sleep(Duration::from_millis(25)).await; + } + } + 5 => { + // Extended disruption + turmoil::hold("primary"); + sleep(Duration::from_secs(3)).await; + turmoil::release("primary"); + } + _ => {} + } + } + }); + + // Wait for all workers + for handle in handles { + handle.await.unwrap(); + } + + chaos_handle.await.unwrap(); + + // Final verification phase + sleep(Duration::from_secs(10)).await; + + // Comprehensive data integrity checks + + // 1. Check account balance consistency + let mut balance_stmt = conn.prepare("SELECT id, balance, checksum FROM accounts ORDER BY id").await?; + let mut balance_rows = balance_stmt.query([]).await?; + + let mut total_balance = 0i64; + let mut account_count = 0; + + while let Some(row) = balance_rows.next().await? 
{ + let id: i64 = row.get(0)?; + let balance: i64 = row.get(1)?; + let checksum: String = row.get(2)?; + + account_count += 1; + total_balance += balance; + + // Check for negative balances (constraint violation) + if balance < 0 { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("CONSTRAINT VIOLATION: Account {} has negative balance: {}", id, balance); + } + + // Verify checksum format + if !checksum.contains("acc_chk_") { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("CHECKSUM FORMAT ERROR: Account {} has malformed checksum: {}", id, checksum); + } + } + + // Total balance should be conserved (200 accounts * 10000 initial = 2,000,000) + let expected_total = 200 * 10000; + if total_balance != expected_total { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("BALANCE CONSERVATION VIOLATION: Expected {}, got {}", expected_total, total_balance); + } + + // 2. Check transaction log integrity + let mut tx_stmt = conn.prepare("SELECT id, from_account, to_account, amount, hash FROM transactions ORDER BY id").await?; + let mut tx_rows = tx_stmt.query([]).await?; + + let mut tx_count = 0; + while let Some(row) = tx_rows.next().await? { + let id: i64 = row.get(0)?; + let from_account: i64 = row.get(1)?; + let to_account: i64 = row.get(2)?; + let amount: i64 = row.get(3)?; + let hash: String = row.get(4)?; + + tx_count += 1; + + // Verify transaction makes sense + if from_account == to_account { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("TRANSACTION LOGIC ERROR: Transaction {} has same from/to account: {}", id, from_account); + } + + if amount <= 0 { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("TRANSACTION AMOUNT ERROR: Transaction {} has invalid amount: {}", id, amount); + } + + if !hash.starts_with("tx_hash_") { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("TRANSACTION HASH ERROR: Transaction {} has malformed hash: {}", id, hash); + } + } + + // 3. Check audit trail completeness + let mut audit_stmt = conn.prepare("SELECT COUNT(*) FROM audit_trail").await?; + let mut audit_rows = audit_stmt.query([]).await?; + + if let Some(row) = audit_rows.next().await? 
{
+            let audit_count: i64 = row.get(0)?;
+
+            // Should have at least as many audit entries as successful transactions
+            if audit_count < tx_count {
+                corruption_detected.store(true, Ordering::SeqCst);
+                eprintln!("AUDIT TRAIL INCOMPLETE: {} transactions but only {} audit entries", tx_count, audit_count);
+            }
+        }
+
+        if corruption_detected.load(Ordering::SeqCst) {
+            panic!("EXTREME STRESS TEST DETECTED CORRUPTION!");
+        }
+
+        eprintln!("Extreme stress test completed: {} accounts, {} transactions, total balance: {}",
+            account_count, tx_count, total_balance);
+
+        Ok(())
+    });
+
+    sim.run().unwrap();
+}
+
+/// Test for encryption-related corruption bugs
+/// This test specifically targets encryption/decryption edge cases
+#[test]
+#[cfg(feature = "encryption")]
+fn encryption_corruption_test() {
+    let mut sim = Builder::new()
+        .simulation_duration(Duration::from_secs(300))
+        .tcp_capacity(128)
+        .build();
+
+    let tmp = tempdir().unwrap();
+    let db_path = tmp.path().to_owned();
+
+    init_tracing();
+    sim.host("primary", move || {
+        let path = db_path.clone();
+        async move {
+            let server = TestServer {
+                path: path.into(),
+                db_config: crate::config::DbConfig {
+                    max_log_size: 8,
+                    max_log_duration: Some(0.5),
+                    ..Default::default()
+                },
+                user_api_config: crate::config::UserApiConfig::default(),
+                admin_api_config: Some(crate::config::AdminApiConfig {
+                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?,
+                    connector: TurmoilConnector,
+                    disable_metrics: false,
+                    auth_key: None,
+                }),
+                rpc_server_config: Some(crate::config::RpcServerConfig {
+                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?,
+                    tls_config: None,
+                }),
+                disable_namespaces: false,
+                disable_default_namespace: true,
+                ..Default::default()
+            };
+
+            server.start_sim(8080).await?;
+            Ok(())
+        }
+    });
+
+    sim.client("encryption_tester", async move {
+        let client = Client::new();
+
+        client
+            .post("http://primary:9090/v1/namespaces/testdb/create", json!({}))
+            .await?;
+
+        // Test with encryption enabled. Aes256Cbc requires an exactly 32-byte
+        // key, so the static key literal is sized to 32 bytes.
+        let db = Database::open_with_remote_sync_connector(
+            "test_encrypted.db",
+            "http://testdb.primary:8080",
+            "",
+            TurmoilConnector,
+            false,
+            Some(libsql::EncryptionConfig::new(
+                libsql::Cipher::Aes256Cbc,
+                bytes::Bytes::from_static(b"test_encryption_key_32bytes_long")
+            ))
+        ).await?;
+
+        let conn = db.connect()?;
+
+        // Create table with sensitive data
+        conn.execute(
+            "CREATE TABLE encrypted_data (
+                id INTEGER PRIMARY KEY,
+                sensitive_data BLOB NOT NULL,
+                plaintext_hash TEXT NOT NULL,
+                encryption_version INTEGER NOT NULL
+            )",
+            ()
+        ).await?;
+
+        let corruption_detected = Arc::new(AtomicBool::new(false));
+        let barrier = Arc::new(Barrier::new(6));
+
+        let mut handles = vec![];
+
+        // Test encryption under various stress conditions
+        for worker_id in 0..5 {
+            let barrier = barrier.clone();
+            let corruption_detected = corruption_detected.clone();
+
+            let handle = tokio::spawn(async move {
+                let db = Database::open_with_remote_sync_connector(
+                    "test_encrypted.db",
+                    "http://testdb.primary:8080",
+                    "",
+                    TurmoilConnector,
+                    false,
+                    Some(libsql::EncryptionConfig::new(
+                        libsql::Cipher::Aes256Cbc,
+                        bytes::Bytes::from_static(b"test_encryption_key_32bytes_long")
+                    ))
+                ).await.unwrap();
+                let conn = db.connect().unwrap();
+
+                barrier.wait().await;
+
+                for iteration in 0..50 {
+                    let sensitive_data = format!("SENSITIVE_DATA_WORKER_{}_ITER_{}_SECRET_INFO", worker_id, iteration).repeat(100);
+                    let plaintext_hash = format!("hash_{}", sensitive_data.len());
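+                    // Every iteration round-trips the payload through a full
+                    // encrypt-on-write / decrypt-on-read cycle via the local
+                    // synced replica file, so a single corrupted page should
+                    // surface immediately as a data or hash mismatch below.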
"INSERT INTO encrypted_data (id, sensitive_data, plaintext_hash, encryption_version) VALUES (?, ?, ?, ?)", + (worker_id * 1000 + iteration, sensitive_data.as_bytes(), plaintext_hash, 1) + ).await.unwrap(); + + // Immediately read it back to verify encryption/decryption + let mut stmt = conn.prepare("SELECT sensitive_data, plaintext_hash FROM encrypted_data WHERE id = ?").await.unwrap(); + let mut rows = stmt.query([worker_id * 1000 + iteration]).await.unwrap(); + + if let Some(row) = rows.next().await.unwrap() { + let retrieved_data: Vec = row.get(0).unwrap(); + let retrieved_hash: String = row.get(1).unwrap(); + + // Verify data integrity after encryption/decryption + if retrieved_data != sensitive_data.as_bytes() { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("ENCRYPTION CORRUPTION: Data mismatch for worker {} iteration {}", worker_id, iteration); + } + + if retrieved_hash != plaintext_hash { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("HASH CORRUPTION: Hash mismatch for worker {} iteration {}", worker_id, iteration); + } + } + + sleep(Duration::from_millis(20)).await; + } + + worker_id + }); + + handles.push(handle); + } + + // Network disruption during encryption operations + let disruption_handle = tokio::spawn(async move { + barrier.wait().await; + + for _ in 0..10 { + sleep(Duration::from_secs(3)).await; + turmoil::hold("primary"); + sleep(Duration::from_millis(500)).await; + turmoil::release("primary"); + } + }); + + for handle in handles { + handle.await.unwrap(); + } + + disruption_handle.await.unwrap(); + + // Final verification + let mut verify_stmt = conn.prepare("SELECT id, sensitive_data, plaintext_hash FROM encrypted_data ORDER BY id").await?; + let mut verify_rows = verify_stmt.query([]).await?; + + let mut record_count = 0; + while let Some(row) = verify_rows.next().await? 
{
+            let id: i64 = row.get(0)?;
+            let data: Vec<u8> = row.get(1)?;
+            let hash: String = row.get(2)?;
+
+            record_count += 1;
+
+            // Verify data format
+            let data_str = String::from_utf8_lossy(&data);
+            if !data_str.contains("SENSITIVE_DATA_WORKER_") {
+                corruption_detected.store(true, Ordering::SeqCst);
+                eprintln!("ENCRYPTED DATA CORRUPTION: Record {} has malformed data", id);
+            }
+
+            // Verify hash format
+            if !hash.starts_with("hash_") {
+                corruption_detected.store(true, Ordering::SeqCst);
+                eprintln!("HASH FORMAT CORRUPTION: Record {} has malformed hash", id);
+            }
+        }
+
+        if corruption_detected.load(Ordering::SeqCst) {
+            panic!("ENCRYPTION CORRUPTION DETECTED!");
+        }
+
+        eprintln!("Encryption test completed successfully with {} records", record_count);
+
+        Ok(())
+    });
+
+    sim.run().unwrap();
+}
+
+/// Test for backup/restore corruption scenarios
+/// This test targets data integrity during backup and restore operations
+#[test]
+fn backup_restore_corruption_test() {
+    let mut sim = Builder::new()
+        .simulation_duration(Duration::from_secs(400))
+        .build();
+
+    let tmp_primary = tempdir().unwrap();
+    let tmp_backup = tempdir().unwrap();
+    let primary_path = tmp_primary.path().to_owned();
+    let _backup_path = tmp_backup.path().to_owned(); // reserved for a restore step; unused for now
+
+    init_tracing();
+    sim.host("primary", move || {
+        let path = primary_path.clone();
+        async move {
+            let server = TestServer {
+                path: path.into(),
+                db_config: crate::config::DbConfig {
+                    max_log_size: 12,
+                    max_log_duration: Some(1.0),
+                    ..Default::default()
+                },
+                user_api_config: crate::config::UserApiConfig::default(),
+                admin_api_config: Some(crate::config::AdminApiConfig {
+                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?,
+                    connector: TurmoilConnector,
+                    disable_metrics: false,
+                    auth_key: None,
+                }),
+                rpc_server_config: Some(crate::config::RpcServerConfig {
+                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?,
+                    tls_config: None,
+                }),
+                disable_namespaces: false,
+                disable_default_namespace: true,
+                ..Default::default()
+            };
+
+            server.start_sim(8080).await?;
+            Ok(())
+        }
+    });
+
+    sim.client("backup_tester", async move {
+        let client = Client::new();
+
+        client
+            .post("http://primary:9090/v1/namespaces/testdb/create", json!({}))
+            .await?;
+
+        let db = Database::open_remote_with_connector(
+            "http://testdb.primary:8080",
+            "",
+            TurmoilConnector
+        )?;
+        let conn = db.connect()?;
+
+        // Create comprehensive test schema
+        conn.execute(
+            "CREATE TABLE backup_test (
+                id INTEGER PRIMARY KEY,
+                data TEXT NOT NULL,
+                checksum TEXT NOT NULL,
+                created_at INTEGER NOT NULL,
+                backup_version INTEGER NOT NULL DEFAULT 1
+            )",
+            ()
+        ).await?;
+
+        conn.execute(
+            "CREATE TABLE metadata (
+                key TEXT PRIMARY KEY,
+                value TEXT NOT NULL,
+                last_backup INTEGER NOT NULL DEFAULT 0
+            )",
+            ()
+        ).await?;
+
+        // Insert initial dataset
+        for i in 0..500 {
+            let data = format!("backup_test_data_{}_content", i).repeat(50);
+            let checksum = format!("chk_{}", data.len());
+            conn.execute(
+                "INSERT INTO backup_test (id, data, checksum, created_at) VALUES (?, ?, ?, ?)",
+                (i, data, checksum, 1000000000 + i)
+            ).await?;
+        }
+
+        // Insert metadata
+        conn.execute(
+            "INSERT INTO metadata (key, value) VALUES ('total_records', '500')",
+            ()
+        ).await?;
+        conn.execute(
+            "INSERT INTO metadata (key, value) VALUES ('test_version', '1.0')",
+            ()
+        ).await?;
+
+        let corruption_detected = Arc::new(AtomicBool::new(false));
+        let barrier = Arc::new(Barrier::new(4));
+
+        let mut handles = vec![];
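+        // Note: the `chk_{len}` checksums written above encode only the payload
+        // length, so the final verification can detect truncation or record loss
+        // but not same-length bit corruption.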
+        // Continuous data modification during backup operations
+        for worker_id in 0..3 {
+            let barrier = barrier.clone();
+            let corruption_detected = corruption_detected.clone();
+
+            let handle = tokio::spawn(async move {
+                let db = Database::open_remote_with_connector(
+                    "http://testdb.primary:8080",
+                    "",
+                    TurmoilConnector
+                ).unwrap();
+                let conn = db.connect().unwrap();
+
+                barrier.wait().await;
+
+                for iteration in 0..80 {
+                    // Modify existing records
+                    let record_id = (worker_id * 100 + iteration) % 500;
+                    let new_data = format!("modified_by_worker_{}_iter_{}_data", worker_id, iteration).repeat(60);
+                    let new_checksum = format!("chk_mod_{}", new_data.len());
+
+                    conn.execute(
+                        "UPDATE backup_test SET data = ?, checksum = ?, backup_version = backup_version + 1 WHERE id = ?",
+                        (new_data, new_checksum, record_id)
+                    ).await.unwrap();
+
+                    // Add new records
+                    let new_id = 1000 + worker_id * 1000 + iteration;
+                    let insert_data = format!("new_record_worker_{}_iter_{}", worker_id, iteration).repeat(40);
+                    let insert_checksum = format!("chk_new_{}", insert_data.len());
+
+                    conn.execute(
+                        "INSERT INTO backup_test (id, data, checksum, created_at) VALUES (?, ?, ?, ?)",
+                        (new_id, insert_data, insert_checksum, 2000000000 + new_id)
+                    ).await.unwrap();
+
+                    // Update metadata
+                    conn.execute(
+                        "UPDATE metadata SET value = ? WHERE key = 'total_records'",
+                        (format!("{}", 500 + (worker_id + 1) * (iteration + 1)),)
+                    ).await.unwrap();
+
+                    sleep(Duration::from_millis(100)).await;
+                }
+
+                worker_id
+            });
+
+            handles.push(handle);
+        }
+
+        // Backup simulation with network disruptions (hold/release take a host pair)
+        let backup_handle = tokio::spawn(async move {
+            barrier.wait().await;
+
+            for backup_cycle in 0..8 {
+                sleep(Duration::from_secs(10)).await;
+
+                // Simulate backup process with potential interruptions
+                eprintln!("Starting backup cycle {}", backup_cycle);
+
+                // Create network disruption during backup
+                if backup_cycle % 3 == 0 {
+                    turmoil::hold("backup_tester", "primary");
+                    sleep(Duration::from_secs(1)).await;
+                    turmoil::release("backup_tester", "primary");
+                }
+
+                sleep(Duration::from_secs(5)).await;
+                eprintln!("Backup cycle {} completed", backup_cycle);
+            }
+        });
+
+        for handle in handles {
+            handle.await.unwrap();
+        }
+
+        backup_handle.await.unwrap();
+
+        // Final integrity verification
+        let mut verify_stmt = conn.prepare("SELECT COUNT(*), SUM(LENGTH(data)), SUM(backup_version) FROM backup_test").await?;
+        let mut verify_rows = verify_stmt.query([]).await?;
+
+        if let Some(row) = verify_rows.next().await?
{ + let count: i64 = row.get(0)?; + let total_data_length: i64 = row.get(1)?; + let total_versions: i64 = row.get(2)?; + + eprintln!("Final verification: {} records, {} total data length, {} total versions", + count, total_data_length, total_versions); + + // Basic sanity checks + if count < 500 { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("BACKUP CORRUPTION: Lost records during backup operations"); + } + + if total_data_length == 0 { + corruption_detected.store(true, Ordering::SeqCst); + eprintln!("BACKUP CORRUPTION: All data lost"); + } + } + + if corruption_detected.load(Ordering::SeqCst) { + panic!("BACKUP/RESTORE CORRUPTION DETECTED!"); + } + + Ok(()) + }); + + sim.run().unwrap(); +} + +/// Test for memory pressure corruption scenarios +/// This test creates extreme memory pressure to expose memory-related bugs +#[test] +fn memory_pressure_corruption_test() { + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(300)) + .build(); + + let tmp = tempdir().unwrap(); + let db_path = tmp.path().to_owned(); + + init_tracing(); + sim.host("primary", move || { + let path = db_path.clone(); + async move { + let server = TestServer { + path: path.into(), + db_config: crate::config::DbConfig { + max_log_size: 4, // Very small to force frequent operations + max_log_duration: Some(0.1), + ..Default::default() + }, + user_api_config: crate::config::UserApiConfig::default(), + admin_api_config: Some(crate::config::AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?, + connector: TurmoilConnector, + disable_metrics: false, + auth_key: None, + }), + rpc_server_config: Some(crate::config::RpcServerConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?, + tls_config: None, + }), + disable_namespaces: false, + disable_default_namespace: true, + ..Default::default() + }; + + server.start_sim(8080).await?; + Ok(()) + } + }); + + sim.client("memory_tester", async move { + let client = Client::new(); + + client + .post("http://primary:9090/v1/namespaces/testdb/create", json!({})) + .await?; + + let db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + )?; + let conn = db.connect()?; + + // Create table for large data + conn.execute( + "CREATE TABLE large_data ( + id INTEGER PRIMARY KEY, + huge_blob BLOB NOT NULL, + metadata TEXT NOT NULL, + checksum TEXT NOT NULL + )", + () + ).await?; + + let corruption_detected = Arc::new(AtomicBool::new(false)); + let barrier = Arc::new(Barrier::new(4)); + + let mut handles = vec![]; + + // Create memory pressure with large data operations + for worker_id in 0..3 { + let barrier = barrier.clone(); + let corruption_detected = corruption_detected.clone(); + + let handle = tokio::spawn(async move { + let db = Database::open_remote_with_connector( + "http://testdb.primary:8080", + "", + TurmoilConnector + ).unwrap(); + let conn = db.connect().unwrap(); + + barrier.wait().await; + + for iteration in 0..30 { + // Create very large blob data (1MB each) + let large_data = vec![0u8; 1024 * 1024]; + let metadata = format!("worker_{}_iteration_{}_large_data_metadata", worker_id, iteration).repeat(100); + let checksum = format!("large_chk_{}_{}", worker_id, iteration); + + let record_id = worker_id * 1000 + iteration; + + // Insert large data + conn.execute( + "INSERT INTO large_data (id, huge_blob, metadata, checksum) VALUES (?, ?, ?, ?)", + (record_id, large_data, metadata, checksum) + ).await.unwrap(); + + // Immediately read it back to verify + 
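+                    // SQLite's length() returns the size in bytes when given a BLOB,
+                    // so comparing it to 1024 * 1024 checks the stored size without
+                    // transferring the whole megabyte back to the client.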
let mut stmt = conn.prepare("SELECT LENGTH(huge_blob), metadata, checksum FROM large_data WHERE id = ?").await.unwrap();
+                    let mut rows = stmt.query([record_id]).await.unwrap();
+
+                    if let Some(row) = rows.next().await.unwrap() {
+                        let blob_length: i64 = row.get(0).unwrap();
+                        let retrieved_metadata: String = row.get(1).unwrap();
+                        let retrieved_checksum: String = row.get(2).unwrap();
+
+                        if blob_length != 1024 * 1024 {
+                            corruption_detected.store(true, Ordering::SeqCst);
+                            eprintln!("MEMORY CORRUPTION: Blob size mismatch for worker {} iteration {}: expected {}, got {}",
+                                worker_id, iteration, 1024 * 1024, blob_length);
+                        }
+
+                        if !retrieved_metadata.contains(&format!("worker_{}_iteration_{}", worker_id, iteration)) {
+                            corruption_detected.store(true, Ordering::SeqCst);
+                            eprintln!("MEMORY CORRUPTION: Metadata corruption for worker {} iteration {}", worker_id, iteration);
+                        }
+
+                        if retrieved_checksum != format!("large_chk_{}_{}", worker_id, iteration) {
+                            corruption_detected.store(true, Ordering::SeqCst);
+                            eprintln!("MEMORY CORRUPTION: Checksum mismatch for worker {} iteration {}", worker_id, iteration);
+                        }
+                    }
+
+                    // Delete some old records to create fragmentation
+                    if iteration > 5 {
+                        let old_id = worker_id * 1000 + (iteration - 5);
+                        conn.execute("DELETE FROM large_data WHERE id = ?", (old_id,)).await.unwrap();
+                    }
+
+                    sleep(Duration::from_millis(200)).await;
+                }
+
+                worker_id
+            });
+
+            handles.push(handle);
+        }
+
+        // Memory pressure controller
+        let pressure_handle = tokio::spawn(async move {
+            barrier.wait().await;
+
+            // Create additional memory pressure
+            for _cycle in 0..15 {
+                sleep(Duration::from_secs(5)).await;
+
+                // Simulate memory pressure by creating temporary large allocations
+                let _temp_data: Vec<Vec<u8>> = (0..50).map(|_| vec![0u8; 512 * 1024]).collect();
+
+                // Network disruption during high memory usage (host-pair hold/release)
+                turmoil::hold("memory_tester", "primary");
+                sleep(Duration::from_millis(300)).await;
+                turmoil::release("memory_tester", "primary");
+
+                // Let the temporary data be dropped
+                sleep(Duration::from_millis(100)).await;
+            }
+        });
+
+        for handle in handles {
+            handle.await.unwrap();
+        }
+
+        pressure_handle.await.unwrap();
+
+        // Final verification
+        let mut count_stmt = conn.prepare("SELECT COUNT(*), SUM(LENGTH(huge_blob)) FROM large_data").await?;
+        let mut count_rows = count_stmt.query([]).await?;
+
+        if let Some(row) = count_rows.next().await? {
+            let count: i64 = row.get(0)?;
+            let total_size: i64 = row.get(1)?;
+
+            eprintln!("Memory pressure test completed: {} records, {} total bytes", count, total_size);
+
+            // Verify remaining data integrity
+            let mut verify_stmt = conn.prepare("SELECT id, LENGTH(huge_blob), checksum FROM large_data ORDER BY id").await?;
+            let mut verify_rows = verify_stmt.query([]).await?;
+
+            while let Some(row) = verify_rows.next().await? {
+                let id: i64 = row.get(0)?;
+                let blob_length: i64 = row.get(1)?;
+                let checksum: String = row.get(2)?;
+
+                if blob_length != 1024 * 1024 {
+                    corruption_detected.store(true, Ordering::SeqCst);
+                    eprintln!("FINAL VERIFICATION FAILED: Record {} has wrong blob size: {}", id, blob_length);
+                }
+
+                if !checksum.starts_with("large_chk_") {
+                    corruption_detected.store(true, Ordering::SeqCst);
+                    eprintln!("FINAL VERIFICATION FAILED: Record {} has malformed checksum: {}", id, checksum);
+                }
+            }
+        }
+
+        if corruption_detected.load(Ordering::SeqCst) {
+            panic!("MEMORY PRESSURE CORRUPTION DETECTED!");
+        }
+
+        Ok(())
+    });
+
+    sim.run().unwrap();
+}
\ No newline at end of file