diff --git a/.github/workflows/skylake2x-tests.yml b/.github/workflows/skylake2x-tests.yml
index 5bc73a452..e75ab2cec 100644
--- a/.github/workflows/skylake2x-tests.yml
+++ b/.github/workflows/skylake2x-tests.yml
@@ -49,3 +49,4 @@ jobs:
         bash scripts/ci.bash
       env:
         CI_MACHINE_TYPE: "skylake2x"
+      timeout-minutes: 600
diff --git a/.github/workflows/skylake4x-tests.yml b/.github/workflows/skylake4x-tests.yml
index aacca4ab2..7a64cca9b 100644
--- a/.github/workflows/skylake4x-tests.yml
+++ b/.github/workflows/skylake4x-tests.yml
@@ -48,4 +48,5 @@ jobs:
         bash setup.sh
         bash scripts/ci.bash
       env:
-        CI_MACHINE_TYPE: "skylake4x"
\ No newline at end of file
+        CI_MACHINE_TYPE: "skylake4x"
+      timeout-minutes: 600
diff --git a/kernel/run.py b/kernel/run.py
index a1fa5d4b5..50370e03b 100644
--- a/kernel/run.py
+++ b/kernel/run.py
@@ -49,6 +49,7 @@ def get_network_config(workers):
         config['tap{}'.format(2*i)] = {
             'mid': i,
             'mac': '56:b4:44:e9:62:d{:x}'.format(i),
+            'ip': f"172.31.0.1{i}"
         }
     return config

@@ -106,6 +107,8 @@ def get_network_config(workers):
 # DCM Scheduler arguments
 parser.add_argument("--dcm-path",
                     help='Path of DCM jar to use (defaults to latest release)', required=False, default=None)
+parser.add_argument("--mid",
+                    help="Machine id to set for this instance", required=False, default=None)
 # QEMU related arguments
 parser.add_argument("--qemu-nodes", type=int,
@@ -215,8 +218,10 @@ def build_kernel(args):
         build_args = ['build', '--target', KERNEL_TARGET]
         if args.no_kfeatures:
             build_args += ["--no-default-features"]
+            log(" - building with --no-default-features")
         for feature in args.kfeatures:
             build_args += ['--features', feature]
+            log(" - enable feature {}".format(feature))
         build_args += CARGO_DEFAULT_ARGS
         build_args += CARGO_NOSTD_BUILD_ARGS
         if args.verbose:
@@ -233,6 +238,16 @@ def build_user_libraries(args):
         build_args += ["--features", "rumprt"]
     if args.nic == "virtio-net-pci":
         build_args += ["--features", "virtio"]
+        log(" - enable feature virtio")
+
+    for featurelist in args.ufeatures:
+        for feature in featurelist.split(','):
+            if ':' in feature:
+                mod_part, feature_part = feature.split(':')
+                if mod_part == "libvibrio":
+                    log(" - enable feature {}".format(feature_part))
+                    build_args += ['--features', feature_part]
+
     # else: use e1000 / wm0
     build_args += CARGO_DEFAULT_ARGS
     build_args += CARGO_NOSTD_BUILD_ARGS
@@ -259,18 +274,21 @@ def build_userspace(args):
         if not (USR_PATH / module).exists():
             log("User module {} not found, skipping.".format(module))
             continue
+        log("build user-space module {}".format(module))
         with local.cwd(USR_PATH / module):
             with local.env(RUSTFLAGS=USER_RUSTFLAGS):
                 with local.env(RUST_TARGET_PATH=USR_PATH.absolute()):
                     build_args = build_args_default.copy()
-                    for feature in args.ufeatures:
-                        if ':' in feature:
-                            mod_part, feature_part = feature.split(':')
-                            if module == mod_part:
-                                build_args += ['--features', feature_part]
-                        else:
-                            build_args += ['--features', feature]
-                    log("Build user-module {}".format(module))
+                    for featurelist in args.ufeatures:
+                        for feature in featurelist.split(','):
+                            if ':' in feature:
+                                mod_part, feature_part = feature.split(':')
+                                if module == mod_part:
+                                    log(" - enable feature {}".format(feature_part))
+                                    build_args += ['--features', feature_part]
+                            else:
+                                log(" - enable feature {}".format(feature))
+                                build_args += ['--features', feature]
                     if args.verbose:
                         print("cd {}".format(USR_PATH / module))
                         print("RUSTFLAGS={} RUST_TARGET_PATH={} cargo ".format(
@@ -316,7 +334,10 @@ def deploy(args):
     # Append globally unique machine id to cmd (for rackscale)
    # as well as a number of workers (clients)
    if args.cmd and NETWORK_CONFIG[args.tap]['mid'] != None:
-        args.cmd += " mid={}".format(NETWORK_CONFIG[args.tap]['mid'])
+        if args.mid is None:
+            args.cmd += " mid={}".format(NETWORK_CONFIG[args.tap]['mid'])
+        else:
+            args.cmd += f" mid={args.mid}"
     if is_controller or is_client:
         args.cmd += " workers={}".format(args.workers)
     # Write kernel cmd-line file in ESP dir
@@ -733,20 +754,26 @@ def configure_network(args):
         sudo[ip[['link', 'set', '{}'.format(tap), 'down']]](retcode=(0, 1))
         sudo[ip[['link', 'del', '{}'.format(tap)]]](retcode=(0, 1))

-    # Need to find out how to set default=True in case workers are >0 in `args`
-    if (not 'workers' in args) or ('workers' in args and args.workers <= 1):
-        sudo[tunctl[['-t', args.tap, '-u', user, '-g', group]]]()
-        sudo[ifconfig[args.tap, NETWORK_INFRA_IP]]()
-        sudo[ip[['link', 'set', args.tap, 'up']]](retcode=(0, 1))
-    else:
-        assert args.workers <= MAX_WORKERS, "Too many workers, can't configure network"
-        sudo[ip[['link', 'add', 'br0', 'type', 'bridge']]]()
-        sudo[ip[['addr', 'add', NETWORK_INFRA_IP, 'brd', '+', 'dev', 'br0']]]()
-        for _, ncfg in zip(range(0, args.workers), NETWORK_CONFIG):
-            sudo[tunctl[['-t', ncfg, '-u', user, '-g', group]]]()
-            sudo[ip[['link', 'set', ncfg, 'up']]](retcode=(0, 1))
-            sudo[brctl[['addif', 'br0', ncfg]]]()
-        sudo[ip[['link', 'set', 'br0', 'up']]](retcode=(0, 1))
+
+    # figure out how many workers we have
+    workers = 1
+    if 'workers' in args:
+        workers = args.workers
+
+    # create the bridge
+    sudo[ip[['link', 'add', 'br0', 'type', 'bridge']]]()
+    sudo[ip[['addr', 'add', NETWORK_INFRA_IP, 'brd', '+', 'dev', 'br0']]]()
+
+    # add a network interface for every worker there is
+    for _, ncfg in zip(range(0, workers), NETWORK_CONFIG):
+        sudo[tunctl[['-t', ncfg, '-u', user, '-g', group]]]()
+        sudo[ip[['link', 'set', ncfg, 'up']]](retcode=(0, 1))
+        sudo[brctl[['addif', 'br0', ncfg]]]()
+
+    # set the link up
+    sudo[ip[['link', 'set', 'br0', 'up']]](retcode=(0, 1))
+
+    sudo[brctl[['setageing', 'br0', 600]]]()


 def configure_dcm_scheduler(args):
diff --git a/kernel/tests/dhcpd.conf b/kernel/tests/dhcpd.conf
index 26e798afc..4e46f513c 100644
--- a/kernel/tests/dhcpd.conf
+++ b/kernel/tests/dhcpd.conf
@@ -5,11 +5,11 @@ option domain-name-servers ns1.example.org, ns2.example.org;
 ddns-update-style none;

 subnet 172.31.0.0 netmask 255.255.255.0 {
-  range 172.31.0.12 172.31.0.16;
+  range 172.31.0.118 172.31.0.118;
   option routers 172.31.0.20;
   option subnet-mask 255.255.255.0;
-  default-lease-time 1;
-  max-lease-time 1;
+  default-lease-time 1000;
+  max-lease-time 1000;
 }

 host nrk1 {
@@ -20,4 +20,34 @@ host nrk1 {
 host nrk2 {
   hardware ethernet 56:b4:44:e9:62:d1;
   fixed-address 172.31.0.11;
+}
+
+host nrk3 {
+  hardware ethernet 56:b4:44:e9:62:d2;
+  fixed-address 172.31.0.12;
+}
+
+host nrk4 {
+  hardware ethernet 56:b4:44:e9:62:d3;
+  fixed-address 172.31.0.13;
+}
+
+host nrk5 {
+  hardware ethernet 56:b4:44:e9:62:d4;
+  fixed-address 172.31.0.14;
+}
+
+host nrk6 {
+  hardware ethernet 56:b4:44:e9:62:d5;
+  fixed-address 172.31.0.15;
+}
+
+host nrk7 {
+  hardware ethernet 56:b4:44:e9:62:d6;
+  fixed-address 172.31.0.16;
+}
+
+host nrk8 {
+  hardware ethernet 56:b4:44:e9:62:d7;
+  fixed-address 172.31.0.17;
 }
\ No newline at end of file
diff --git a/kernel/tests/s10_benchmarks.rs b/kernel/tests/s10_benchmarks.rs
index f590ace52..17cba2dc0 100644
--- a/kernel/tests/s10_benchmarks.rs
+++ b/kernel/tests/s10_benchmarks.rs
@@ -22,6 +22,7 @@ use serde::Serialize;

 use testutils::builder::{BuildArgs, 
Machine}; use testutils::helpers::{setup_network, spawn_dhcpd, spawn_nrk, DHCP_ACK_MATCH}; +use testutils::memcached::{parse_memcached_output, MEMCACHED_MEM_SIZE_MB, MEMCACHED_NUM_QUERIES}; use testutils::redis::{redis_benchmark, REDIS_BENCHMARK, REDIS_START_MATCH}; use testutils::runner_args::{check_for_successful_exit, wait_for_sigterm, RunnerArgs}; @@ -56,7 +57,7 @@ fn s10_redis_benchmark_virtio() { output += dhcp_server.exp_string(DHCP_ACK_MATCH)?.as_str(); output += p.exp_string(REDIS_START_MATCH)?.as_str(); - std::thread::sleep(std::time::Duration::from_secs(9)); + std::thread::sleep(std::time::Duration::from_secs(20)); let mut redis_client = redis_benchmark("virtio", 2_000_000)?; @@ -96,7 +97,7 @@ fn s10_redis_benchmark_e1000() { output += p.exp_string(REDIS_START_MATCH)?.as_str(); use std::{thread, time}; - thread::sleep(time::Duration::from_secs(9)); + thread::sleep(time::Duration::from_secs(20)); let mut redis_client = redis_benchmark("e1000", 2_000_000)?; @@ -671,8 +672,7 @@ fn memcached_benchmark( #[cfg(not(feature = "baremetal"))] #[test] fn s10_memcached_benchmark() { - let _r = - which::which(MEMASLAP_BINARY).expect("memaslap not installed on host, test will fail!"); + let _r = which::which(MEMASLAP_BINARY).expect("memslap not installed on host, test will fail!"); let max_cores = 4; let threads = if cfg!(feature = "smoke") { @@ -693,6 +693,8 @@ fn s10_memcached_benchmark() { for nic in &["virtio", "e1000"] { for thread in threads.iter() { + println!("\n# Memcached with {} threads over {}", thread, nic); + let kernel_cmdline = format!("init=memcached.bin initargs={}", *thread); let cmdline = RunnerArgs::new_with_build("userspace-smp", &build) .memory(8192) @@ -716,12 +718,50 @@ fn s10_memcached_benchmark() { dhcp_server.exp_regex(DHCP_ACK_MATCH)?; - std::thread::sleep(std::time::Duration::from_secs(6)); - let mut memaslap = memcached_benchmark(nic, *thread, 10)?; + use std::{thread, time}; + let timeout = 15; + print!("waiting {timeout} seconds to give the server time to start up. "); + for _ in 0..timeout { + let _ = std::io::stdout().flush(); + thread::sleep(time::Duration::from_secs(1)); + print!(". ") + } + println!("\nstarting benchmark"); + match memcached_benchmark(nic, *thread, 10) { + Ok(mut s) => { + let _ = s.process.kill(SIGTERM)?; + println!("benchmark done."); + } + Err(e) => { + println!("benchmark failed."); + print!("\nnrk: "); + while let Some(c) = p.try_read() { + if c == '\n' { + print!("\nnrk: "); + } else { + print!("{}", c); + } + } + println!(); + match e.kind() { + ErrorKind::EOF(_r, s, _) => { + for l in s.lines() { + println!("memslap: {}", l); + } + } + ErrorKind::Timeout(_r, s, _) => { + for l in s.lines() { + println!("memslap: {}", l); + } + } + e => { + println!("Error: {:?}", e); + } + } + } + } dhcp_server.send_control('c')?; - memaslap.process.kill(SIGTERM)?; - p.process.kill(SIGTERM) }; @@ -731,6 +771,7 @@ fn s10_memcached_benchmark() { } #[test] +#[ignore] fn s10_leveldb_benchmark() { setup_network(1); @@ -834,9 +875,12 @@ fn s10_leveldb_benchmark() { } #[test] +#[ignore] fn s10_memcached_benchmark_internal() { setup_network(1); + let is_smoke = cfg!(feature = "smoke"); + let machine = Machine::determine(); let build = BuildArgs::default() .module("rkapps") @@ -850,7 +894,7 @@ fn s10_memcached_benchmark_internal() { // Throw out everything above 28 since we have some non-deterministic // bug on larger machines that leads to threads calling sched_yield and // no readrandom is performed... 
- .filter(|&t| t <= 28) + .filter(|&t| if is_smoke { t <= 10 } else { t <= 128 }) .collect(); // memcached arguments // currently not there. @@ -858,10 +902,11 @@ fn s10_memcached_benchmark_internal() { (16 * 1024 /* MB */, 16 /* MB */, 2000000, 300_000) } else { ( - 128 * 1024, /* MB */ - 32 * 1024, /* MB */ - 50000000, - 600_000, + // keep in sync with the s11_ra + std::cmp::max(8192, 4 * MEMCACHED_MEM_SIZE_MB), /* MB */ + MEMCACHED_MEM_SIZE_MB, + MEMCACHED_NUM_QUERIES, + std::cmp::max(60_000, MEMCACHED_NUM_QUERIES) as u64, ) }; @@ -874,8 +919,11 @@ fn s10_memcached_benchmark_internal() { } println!(); + let total_cores_per_node = core::cmp::max(1, machine.max_cores() / machine.max_numa_nodes()); for thread in threads.iter() { - println!("Running memcached internal benchmark with {thread} threads, {queries} GETs and {memsize}MB memory. "); + println!("\n\nRunning memcached internal benchmark with {thread} threads, {queries} GETs and {memsize}MB memory. "); + + let num_nodes = (thread + (total_cores_per_node - 1)) / total_cores_per_node; let kernel_cmdline = format!( r#"init=memcachedbench.bin initargs={} appcmd='--x-benchmark-mem={} --x-benchmark-queries={}'"#, @@ -884,8 +932,8 @@ fn s10_memcached_benchmark_internal() { let cmdline = RunnerArgs::new_with_build("userspace-smp", &build) .timeout(timeout) - .cores(machine.max_cores()) - .nodes(2) + .cores(*thread) + .nodes(num_nodes) .use_virtio() .memory(qemu_mem) .setaffinity(Vec::new()) @@ -899,70 +947,12 @@ fn s10_memcached_benchmark_internal() { output += dhcp_server.exp_string(DHCP_ACK_MATCH)?.as_str(); - // match the title + // somehow that needs to be here ??? let (prev, matched) = p.exp_regex(r#"INTERNAL BENCHMARK CONFIGURE"#)?; - - output += prev.as_str(); - output += matched.as_str(); - - // x_benchmark_mem = 10 MB - let (prev, matched) = p.exp_regex(r#"x_benchmark_mem = (\d+) MB"#)?; - println!("> {}", matched); - let b_mem = matched.replace("x_benchmark_mem = ", "").replace(" MB", ""); - output += prev.as_str(); output += matched.as_str(); - // number of threads: 3 - let (prev, matched) = p.exp_regex(r#"number of threads: (\d+)"#)?; - println!("> {}", matched); - let b_threads = matched.replace("number of threads: ", ""); - - output += prev.as_str(); - output += matched.as_str(); - - // number of keys: 131072 - let (prev, matched) = p.exp_regex(r#"number of keys: (\d+)"#)?; - println!("> {}", matched); - - output += prev.as_str(); - output += matched.as_str(); - - let (prev, matched) = p.exp_regex(r#"Executing (\d+) queries with (\d+) threads"#)?; - println!("> {}", matched); - - output += prev.as_str(); - output += matched.as_str(); - - // benchmark took 129 seconds - let (prev, matched) = p.exp_regex(r#"benchmark took (\d+) ms"#)?; - println!("> {}", matched); - let b_time = matched.replace("benchmark took ", "").replace(" ms", ""); - - output += prev.as_str(); - output += matched.as_str(); - - // benchmark took 7937984 queries / second - let (prev, matched) = p.exp_regex(r#"benchmark took (\d+) queries / second"#)?; - println!("> {}", matched); - let b_thpt = matched - .replace("benchmark took ", "") - .replace(" queries / second", ""); - - output += prev.as_str(); - output += matched.as_str(); - - let (prev, matched) = p.exp_regex(r#"benchmark executed (\d+)"#)?; - println!("> {}", matched); - let b_queries = matched - .replace("benchmark executed ", "") - .split(' ') - .next() - .unwrap() - .to_string(); - - output += prev.as_str(); - output += matched.as_str(); + let ret = parse_memcached_output(&mut p, *thread, &mut 
output)?; // Append parsed results to a CSV file let write_headers = !Path::new(file_name).exists(); @@ -981,7 +971,7 @@ fn s10_memcached_benchmark_internal() { assert!(r.is_ok()); let out = format!( "memcached,{},{},{},{},{}", - b_threads, b_mem, b_queries, b_time, b_thpt, + ret.b_threads, ret.b_mem, ret.b_queries, ret.b_time, ret.b_thpt ); let r = csv_file.write(out.as_bytes()); assert!(r.is_ok()); diff --git a/kernel/tests/s11_rackscale_benchmarks.rs b/kernel/tests/s11_rackscale_benchmarks.rs index 349a3326e..a2e58fa86 100644 --- a/kernel/tests/s11_rackscale_benchmarks.rs +++ b/kernel/tests/s11_rackscale_benchmarks.rs @@ -8,18 +8,29 @@ //! The naming scheme of the tests ensures a somewhat useful order of test //! execution taking into account the dependency chain: //! * `s11_*`: Rackscale (distributed) benchmarks +use std::env; use std::fs::OpenOptions; use std::io::Write; -use std::path::Path; +use std::path::{Path, PathBuf}; +use std::process::Command; +use std::time::Duration; use rexpect::errors::*; +use rexpect::session::spawn_command; use rexpect::session::PtySession; use testutils::builder::{BuildArgs, Machine}; use testutils::helpers::{DCMConfig, DCMSolver}; + use testutils::rackscale_runner::{RackscaleBench, RackscaleRun}; use testutils::runner_args::RackscaleTransport; +use testutils::memcached::{ + linux_spawn_memcached, parse_memcached_output, rackscale_memcached_checkout, + MemcachedShardedConfig, MEMCACHED_MEM_SIZE_MB, MEMCACHED_NUM_QUERIES, + RACKSCALE_MEMCACHED_CSV_COLUMNS, +}; + #[test] #[cfg(not(feature = "baremetal"))] fn s11_rackscale_shmem_fxmark_benchmark() { @@ -122,7 +133,7 @@ fn rackscale_fxmark_benchmark(transport: RackscaleTransport) { test.file_name = file_name.clone(); test.arg = Some(config); - fn cmd_fn(num_cores: usize, arg: Option) -> String { + fn cmd_fn(num_cores: usize, _num_clients: usize, arg: Option) -> String { // TODO: add in arg with formatting. 
//1XmixX0 is - mix benchmark for 0% writes with 1 open file let config = arg.expect("Missing fxmark config"); @@ -134,7 +145,7 @@ fn rackscale_fxmark_benchmark(transport: RackscaleTransport) { fn timeout_fn(num_cores: usize) -> u64 { 180_000 + 5_000 * num_cores as u64 } - fn mem_fn(num_cores: usize, is_smoke: bool) -> usize { + fn mem_fn(num_cores: usize, _num_clients: usize, is_smoke: bool) -> usize { if is_smoke { 8192 } else { @@ -289,7 +300,7 @@ fn rackscale_vmops_benchmark(transport: RackscaleTransport, benchtype: VMOpsBenc test.file_name = file_name.clone(); test.arg = Some(benchtype); - fn cmd_fn(num_cores: usize, _arg: Option) -> String { + fn cmd_fn(num_cores: usize, _num_clients: usize, _arg: Option) -> String { format!("initargs={}", num_cores) } fn baseline_timeout_fn(num_cores: usize) -> u64 { @@ -298,7 +309,7 @@ fn rackscale_vmops_benchmark(transport: RackscaleTransport, benchtype: VMOpsBenc fn rackscale_timeout_fn(num_cores: usize) -> u64 { 240_000 + 1_000 * num_cores as u64 } - fn mem_fn(num_cores: usize, is_smoke: bool) -> usize { + fn mem_fn(num_cores: usize, _num_clients: usize, is_smoke: bool) -> usize { if is_smoke { 8192 } else { @@ -420,7 +431,7 @@ fn s11_rackscale_shmem_leveldb_benchmark() { test.arg = Some(config); test.run_dhcpd_for_baseline = true; - fn cmd_fn(num_cores: usize, arg: Option) -> String { + fn cmd_fn(num_cores: usize, _num_clients: usize, arg: Option) -> String { let config = arg.expect("missing leveldb config"); format!( r#"init=dbbench.bin initargs={} appcmd='--threads={} --benchmarks=fillseq,readrandom --reads={} --num={} --value_size={}'"#, @@ -436,7 +447,7 @@ fn s11_rackscale_shmem_leveldb_benchmark() { 240_000 + 500 * num_cores as u64 } - fn mem_fn(num_cores: usize, is_smoke: bool) -> usize { + fn mem_fn(num_cores: usize, _num_clients: usize, is_smoke: bool) -> usize { if is_smoke { 8192 } else { @@ -467,12 +478,12 @@ struct MemcachedInternalConfig { #[test] #[cfg(not(feature = "baremetal"))] -fn s11_rackscale_memcached_benchmark_internal() { - rackscale_memcached_benchmark(RackscaleTransport::Shmem); +fn s11_rackscale_shmem_memcached_internal_benchmark() { + rackscale_memcached_internal_benchmark(RackscaleTransport::Shmem); } #[cfg(not(feature = "baremetal"))] -fn rackscale_memcached_benchmark(transport: RackscaleTransport) { +fn rackscale_memcached_internal_benchmark(transport: RackscaleTransport) { let is_smoke = cfg!(feature = "smoke"); let file_name = format!( @@ -481,6 +492,11 @@ fn rackscale_memcached_benchmark(transport: RackscaleTransport) { ); let _ignore = std::fs::remove_file(file_name.clone()); + let baseline_file_name = "rackscale_baseline_memcached_benchmark.csv"; + if cfg!(feature = "baseline") { + let _ignore = std::fs::remove_file(baseline_file_name.clone()); + } + let built = BuildArgs::default() .module("rkapps") .user_feature("rkapps:memcached-bench") @@ -491,7 +507,7 @@ fn rackscale_memcached_benchmark(transport: RackscaleTransport) { fn controller_match_fn( proc: &mut PtySession, output: &mut String, - _cores_per_client: usize, + cores_per_client: usize, num_clients: usize, file_name: &str, is_baseline: bool, @@ -502,67 +518,12 @@ fn rackscale_memcached_benchmark(transport: RackscaleTransport) { // match the title let (prev, matched) = proc.exp_regex(r#"INTERNAL BENCHMARK CONFIGURE"#)?; - *output += prev.as_str(); - *output += matched.as_str(); - - // x_benchmark_mem = 10 MB - let (prev, matched) = proc.exp_regex(r#"x_benchmark_mem = (\d+) MB"#)?; - println!("> {}", matched); - let b_mem = 
matched.replace("x_benchmark_mem = ", "").replace(" MB", ""); - - *output += prev.as_str(); - *output += matched.as_str(); - - // number of threads: 3 - let (prev, matched) = proc.exp_regex(r#"number of threads: (\d+)"#)?; - println!("> {}", matched); - let b_threads = matched.replace("number of threads: ", ""); - - *output += prev.as_str(); - *output += matched.as_str(); - - // number of keys: 131072 - let (prev, matched) = proc.exp_regex(r#"number of keys: (\d+)"#)?; - println!("> {}", matched); - - *output += prev.as_str(); - *output += matched.as_str(); - - let (prev, matched) = proc.exp_regex(r#"Executing (\d+) queries with (\d+) threads"#)?; - println!("> {}", matched); - - *output += prev.as_str(); - *output += matched.as_str(); - - // benchmark took 129 seconds - let (prev, matched) = proc.exp_regex(r#"benchmark took (\d+) ms"#)?; - println!("> {}", matched); - let b_time = matched.replace("benchmark took ", "").replace(" ms", ""); - - *output += prev.as_str(); - *output += matched.as_str(); - - // benchmark took 7937984 queries / second - let (prev, matched) = proc.exp_regex(r#"benchmark took (\d+) queries / second"#)?; - println!("> {}", matched); - let b_thpt = matched - .replace("benchmark took ", "") - .replace(" queries / second", ""); + println!("Configured. Waiting for benchmark to start..."); *output += prev.as_str(); *output += matched.as_str(); - let (prev, matched) = proc.exp_regex(r#"benchmark executed (\d+)"#)?; - println!("> {}", matched); - let b_queries = matched - .replace("benchmark executed ", "") - .split(" ") - .next() - .unwrap() - .to_string(); - - *output += prev.as_str(); - *output += matched.as_str(); + let ret = parse_memcached_output(proc, num_clients * cores_per_client, output)?; // Append parsed results to a CSV file let write_headers = !Path::new(file_name).exists(); @@ -572,18 +533,32 @@ fn rackscale_memcached_benchmark(transport: RackscaleTransport) { .open(file_name) .expect("Can't open file"); if write_headers { - let row = "git_rev,benchmark,nthreads,mem,queries,time,thpt,num_clients,num_replicas\n"; - let r = csv_file.write(row.as_bytes()); + let r = csv_file.write(RACKSCALE_MEMCACHED_CSV_COLUMNS.as_bytes()); assert!(r.is_ok()); } - let actual_num_clients = if is_baseline { 0 } else { num_clients }; + let os_name = if is_baseline { "nros" } else { "dinos" }; + let protocol = if is_baseline { + "internal" + } else if file_name.contains(&RackscaleTransport::Ethernet.to_string()) { + "tcp" + } else { + "shmem" + }; let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes()); assert!(r.is_ok()); + let out = format!( - "memcached,{},{},{},{},{},{},{}", - b_threads, b_mem, b_queries, b_time, b_thpt, actual_num_clients, num_clients + "memcached_internal,{},{},{},{},{},{},{},{}", + os_name, + protocol, + num_clients, + ret.b_threads, + ret.b_mem, + ret.b_queries, + ret.b_time, + ret.b_thpt ); let r = csv_file.write(out.as_bytes()); assert!(r.is_ok()); @@ -599,9 +574,11 @@ fn rackscale_memcached_benchmark(transport: RackscaleTransport) { mem_size: 16, } } else { + // keep in sync with the s10_memcached_benchmark_internal configuration + // and the s11_rackscale_memcached_benchmark_sharded configuration MemcachedInternalConfig { - num_queries: 1_000_000, // TODO(rackscale): should be 100_000_000, - mem_size: 16, // TODO(rackscale): should be 32_000, + num_queries: MEMCACHED_NUM_QUERIES, + mem_size: MEMCACHED_MEM_SIZE_MB, } }; @@ -616,7 +593,18 @@ fn rackscale_memcached_benchmark(transport: RackscaleTransport) { test.arg = Some(config); 
    test.run_dhcpd_for_baseline = true;

-    fn cmd_fn(num_cores: usize, arg: Option<MemcachedInternalConfig>) -> String {
+    if !is_smoke {
+        test.shmem_size = std::cmp::max(
+            MEMCACHED_MEM_SIZE_MB * 2,
+            testutils::helpers::SHMEM_SIZE * 2,
+        );
+    }
+
+    fn cmd_fn(
+        num_cores: usize,
+        _num_clients: usize,
+        arg: Option<MemcachedInternalConfig>,
+    ) -> String {
         let config = arg.expect("missing memcached config");
         format!(
             r#"init=memcachedbench.bin initargs={} appcmd='--x-benchmark-mem={} --x-benchmark-queries={}'"#,
@@ -629,19 +617,32 @@
     }

     fn rackscale_timeout_fn(num_cores: usize) -> u64 {
-        600_000 + 6_000 * num_cores as u64
+        if cfg!(feature = "smoke") {
+            60_000
+        } else {
+            (MEMCACHED_MEM_SIZE_MB * 1000 / 10 + MEMCACHED_NUM_QUERIES / 1000) as u64
+        }
     }

-    fn mem_fn(num_cores: usize, is_smoke: bool) -> usize {
+    fn mem_fn(num_cores: usize, num_clients: usize, is_smoke: bool) -> usize {
+        let base_memory = if num_cores > 64 { 8192 } else { 4096 };
+
         if is_smoke {
-            8192
+            base_memory
         } else {
             // Memory must also be divisible by number of nodes, which could be 1, 2, 3, or 4
-            core::cmp::max(8192, 1024 * (((((num_cores + 1) / 2) + 3 - 1) / 3) * 3))
+            // memory = result of this function / num_clients - shmem_size
+            // (base_memory
+            //     + std::cmp::max(
+            //         MEMCACHED_MEM_SIZE_MB * 2,
+            //         testutils::helpers::SHMEM_SIZE * 2,
+            //     ))
+            //     * num_clients
+            base_memory
         }
     }

-    let bench = RackscaleBench {
+    let mut bench = RackscaleBench {
         test,
         cmd_fn,
         baseline_timeout_fn,
@@ -649,10 +650,12 @@
         mem_fn,
     };

+    bench.run_bench(false, is_smoke);
+
     if cfg!(feature = "baseline") {
+        bench.test.file_name = baseline_file_name.to_string();
         bench.run_bench(true, is_smoke);
     }
-    bench.run_bench(false, is_smoke);
 }

 #[ignore]
@@ -880,6 +883,501 @@ fn rackscale_memcached_dcm(transport: RackscaleTransport, dcm_config: Option<DCMConfig>
+#[test]
+#[ignore]
+#[cfg(not(feature = "baremetal"))]
+fn s11_rackscale_memcached_benchmark_sharded_linux() {
+    use rexpect::process::signal::Signal::SIGKILL;
+
+    let is_smoke = cfg!(feature = "smoke");
+
+    fn run_benchmark_internal(config: &MemcachedShardedConfig, timeout_ms: u64) -> PtySession {
+        Command::new("killall").args(&["memcached"]).status().ok();
+
+        let mut command = Command::new("taskset");
+        command.arg("--cpu-list");
+        command.arg(format!("0-{}", config.num_threads - 1).as_str());
+        command.arg("./build/bin/memcached");
+        command.arg(format!("--x-benchmark-queries={}", config.num_queries).as_str());
+        command.arg(format!("--x-benchmark-mem={}", config.mem_size).as_str());
+        command.current_dir(config.path.as_path());
+        spawn_command(command, Some(timeout_ms)).expect("failed to spawn memcached")
+    }
+
+    let file_name = "linux_memcached_sharded_benchmark.csv";
+
+    let _r = std::fs::remove_file(file_name);
+
+    let mut csv_file = OpenOptions::new()
+        .append(true)
+        .create(true)
+        .open(file_name)
+        .expect("Can't open file");
+
+    let r = csv_file.write(RACKSCALE_MEMCACHED_CSV_COLUMNS.as_bytes());
+    assert!(r.is_ok());
+
+    let machine = Machine::determine();
+    let max_cores = if is_smoke { 2 } else { machine.max_cores() };
+    let max_numa = machine.max_numa_nodes();
+    let total_cores_per_node = core::cmp::max(1, max_cores / max_numa);
+
+    // Do initial network configuration
+    let mut num_clients = 1; // num_clients == num_replicas, for baseline
+    let mut total_cores = 1;
+    while total_cores < max_cores {
+        // Round up to get the number of clients
+        let new_num_clients = (total_cores + (total_cores_per_node - 1)) / total_cores_per_node;
+
+        // Do network setup if number of clients has changed.
+        if num_clients != new_num_clients {
+            num_clients = new_num_clients;
+
+            // ensure total cores is divisible by num clients
+            total_cores = total_cores - (total_cores % num_clients);
+        }
+        let cores_per_client = total_cores / num_clients;
+
+        // Break if not enough total cores for the controller, or if we would have to split controller across nodes to make it fit
+        // We want the controller to have its own socket, so if it's not a 1-socket machine, break when the number of clients
+        // equals the number of NUMA nodes.
+        if total_cores + num_clients + 1 > machine.max_cores()
+            || num_clients == machine.max_numa_nodes()
+                && cores_per_client + num_clients + 1 > total_cores_per_node
+            || num_clients == max_numa && max_numa > 1
+        {
+            break;
+        }
+
+        eprintln!(
+            "\n\nRunning Sharded Memcached test with {:?} total core(s), {:?} (client|replica)(s) (cores_per_(client|replica)={:?})",
+            total_cores, num_clients, cores_per_client
+        );
+
+        // terminate any previous memcached
+        let _ = Command::new("killall")
+            .args(&["memcached", "-s", "SIGKILL"])
+            .output();
+
+        // run the internal configuration
+        config.num_threads = total_cores;
+
+        println!("Memcached Internal: {total_cores} cores");
+
+        let mut pty = run_benchmark_internal(&config, timeout_ms);
+        let mut output = String::new();
+        let res = parse_memcached_output(&mut pty, config.num_threads, &mut output)
+            .expect("could not parse output!");
+        let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
+        assert!(r.is_ok());
+        let out = format!(
+            "memcached_sharded,linux,{},{},{},{},{},{}\n",
+            res.b_threads, "internal", res.b_mem, res.b_queries, res.b_time, res.b_thpt,
+        );
+        let r = csv_file.write(out.as_bytes());
+        assert!(r.is_ok());
+
+        pty.process
+            .kill(SIGKILL)
+            .expect("unable to terminate memcached");
+
+        for protocol in &["tcp", "unix"] {
+            config.protocol = protocol;
+            config.num_servers = num_clients;
+            config.num_threads = cores_per_client;
+
+            println!("Memcached Sharded: {cores_per_client}x{num_clients} with {protocol}");
+
+            // terminate the memcached instance
+            let _ = Command::new("killall")
+                .args(&["memcached", "-s", "SIGKILL"])
+                .status();
+
+            // give some time so memcached can be cleaned up
+            std::thread::sleep(Duration::from_secs(5));
+
+            let mut memcached_ctrls = Vec::new();
+            for i in 0..num_clients {
+                memcached_ctrls.push(
+                    linux_spawn_memcached(i, &config, timeout_ms)
+                        .expect("could not spawn memcached"),
+                );
+            }
+
+            config.num_threads = total_cores;
+
+            let mut pty = testutils::memcached::spawn_loadbalancer(&config, timeout_ms)
+                .expect("failed to spawn load balancer");
+            let mut output = String::new();
+            use rexpect::errors::ErrorKind::Timeout;
+            match parse_memcached_output(&mut pty, config.num_threads, &mut output) {
+                Ok(res) => {
+                    let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
+                    assert!(r.is_ok());
+                    let out = format!(
+                        "memcached_sharded,linux,{},{},{},{},{},{}\n",
+                        res.b_threads, protocol, res.b_mem, res.b_queries, res.b_time, res.b_thpt,
+                    );
+                    let r = csv_file.write(out.as_bytes());
+                    assert!(r.is_ok());
+
+                    println!("{:?}", res);
+                }
+                Err(e) => {
+                    if let Timeout(expected, got, timeout) = e.0 {
+                        println!("Timeout while waiting for {} ms\n", timeout.as_millis());
+                        println!("Expected: `{expected}`\n");
+                        println!("Got:",);
+                        for l in got.lines().take(5) {
+                            println!(" > {l}");
+                        }
+                    } else {
+                        println!("error: {}", e);
+                    }
+
+                    let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
+                    assert!(r.is_ok());
+                    let out = format!(
"memcached_sharded,linux,{},{},failure,failure,failure,failure\n", + config.num_servers, protocol, + ); + let r = csv_file.write(out.as_bytes()); + assert!(r.is_ok()); + + for mc in memcached_ctrls.iter_mut() { + mc.process + .kill(rexpect::process::signal::Signal::SIGKILL) + .expect("couldn't terminate memcached"); + while let Ok(l) = mc.read_line() { + println!("MEMCACHED-OUTPUT: {}", l); + } + } + } + }; + + if total_cores == 1 { + total_cores = 0; + } + + if num_clients == 3 { + total_cores += 3; + } else { + total_cores += 4; + } + + let _ = pty.process.kill(rexpect::process::signal::Signal::SIGKILL); + } + } + + // terminate the memcached instance + let _ = Command::new("killall") + .args(&["memcached", "-s", "SIGKILL"]) + .status(); +} + +#[test] +#[ignore] +#[cfg(not(feature = "baremetal"))] +fn s11_rackscale_memcached_benchmark_sharded_nros() { + use rexpect::process::signal::Signal::SIGKILL; + + let out_dir_path = PathBuf::from(env!("CARGO_TARGET_TMPDIR")).join("sharded-memcached"); + let is_smoke = cfg!(feature = "smoke"); + + rackscale_memcached_checkout(env!("CARGO_TARGET_TMPDIR")); + + // stuff has been built, now we can run the benchmark + let mut config = if is_smoke { + MemcachedShardedConfig { + num_servers: 1, + num_queries: 100_000, + mem_size: 16, + protocol: "tcp", + is_local_host: true, + num_threads: 4, + path: out_dir_path, + } + } else { + // keep in sync with the s10_memcached_benchmark_internal configuration + MemcachedShardedConfig { + num_servers: 1, + num_queries: MEMCACHED_NUM_QUERIES, + mem_size: MEMCACHED_MEM_SIZE_MB, + protocol: "tcp", + is_local_host: true, + num_threads: 4, + path: out_dir_path, + } + }; + + // TODO: consolidate code with testutils::memcached::spawn_loadbalancer + fn spawn_loadbalancer(config: &MemcachedShardedConfig, timeout_ms: u64) -> Result { + let mut command = Command::new("./loadbalancer/loadbalancer"); + command.args(&["--binary"]); + command.arg(format!("--num-queries={}", config.num_queries).as_str()); + command.arg(format!("--num-threads={}", config.num_threads).as_str()); + command.arg(format!("--max-memory={}", config.mem_size).as_str()); + let mut servers = String::from("--servers="); + for i in 0..config.num_servers { + if i > 0 { + servers.push_str(","); + } + if config.protocol == "tcp" { + if config.is_local_host { + servers.push_str(format!("tcp://localhost:{}", 11211 + i).as_str()); + } else { + // +1 because tap0 is reserved for the controller. 
+                    let ip = 10 + i + 1;
+                    servers.push_str(format!("tcp://172.31.0.{}:{}", ip, 11211).as_str());
+                }
+            } else {
+                servers.push_str(
+                    format!("unix://{}/memcached{}.sock", config.path.display(), i).as_str(),
+                );
+            }
+        }
+        command.arg(servers.as_str());
+        command.current_dir(config.path.as_path());
+
+        // give the servers some time to be spawned
+        std::thread::sleep(Duration::from_secs(5));
+
+        println!("Spawning Loadbalancer: \n $ `{:?}`", command);
+
+        spawn_command(command, Some(timeout_ms))
+    }
+
+    let file_name = "memcached_benchmark_sharded_nros.csv";
+    let _r = std::fs::remove_file(file_name);
+
+    let mut csv_file = OpenOptions::new()
+        .append(true)
+        .create(true)
+        .open(file_name)
+        .expect("Can't open file");
+
+    let row = "git_rev,benchmark,os,nthreads,protocol,mem,queries,time,thpt\n";
+    let r = csv_file.write(row.as_bytes());
+    assert!(r.is_ok());
+
+    // run with NrOS as host
+    let built = BuildArgs::default()
+        .module("rkapps")
+        .user_feature("rkapps:memcached-bench")
+        .user_feature("rkapps:virtio")
+        .user_feature("libvibrio:virtio")
+        .kernel_feature("pages-4k")
+        .release()
+        .set_rackscale(false)
+        .build();
+
+    fn controller_run_fun(
+        config: Option<&MemcachedShardedConfig>,
+        num_servers: usize,
+        num_threads: usize,
+        timeout_ms: u64,
+    ) -> Result<PtySession> {
+        // here we should wait
+        std::thread::sleep(Duration::from_secs(15 + 2 * num_servers as u64));
+
+        let mut config = config.unwrap().clone();
+
+        config.num_servers = num_servers;
+        config.num_threads = num_servers * num_threads;
+        spawn_loadbalancer(&config, timeout_ms)
+    }
+
+    fn controller_match_fn(
+        proc: &mut PtySession,
+        output: &mut String,
+        cores_per_client: usize,
+        num_clients: usize,
+        file_name: &str,
+        _is_baseline: bool,
+        _arg: Option<MemcachedShardedConfig>,
+    ) -> Result<()> {
+        let mut csv_file = OpenOptions::new()
+            .append(true)
+            .create(true)
+            .open(file_name)
+            .expect("Can't open file");
+
+        use rexpect::errors::Error;
+        use rexpect::errors::ErrorKind::Timeout;
+        let res = match parse_memcached_output(proc, num_clients * cores_per_client, output) {
+            Ok(res) => res,
+            Err(Error(Timeout(expected, got, timeout), st)) => {
+                println!("Expected: `{expected}`\n");
+                println!("Got:",);
+                for l in got.lines().take(5) {
+                    println!(" > {l}");
+                }
+                return Err(Error(Timeout(expected, got, timeout), st));
+            }
+            Err(err) => {
+                // println!("Failed: {:?}", err);
+                return Err(err);
+            }
+        };
+
+        let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
+        assert!(r.is_ok());
+        let out = format!(
+            "memcached_sharded,nros,{},{},{},{},{},{}\n",
+            res.b_threads, "tcp", res.b_mem, res.b_queries, res.b_time, res.b_thpt,
+        );
+        let r = csv_file.write(out.as_bytes());
+        assert!(r.is_ok());
+
+        Ok(())
+    }
+
+    fn client_match_fn(
+        proc: &mut PtySession,
+        output: &mut String,
+        _cores_per_client: usize,
+        _num_clients: usize,
+        _file_name: &str,
+        _is_baseline: bool,
+        _arg: Option<MemcachedShardedConfig>,
+    ) -> Result<()> {
+        match proc.exp_regex(r#"dhcp: vioif0: adding IP address (\d+).(\d+).(\d+).(\d+)/(\d+)"#) {
+            Ok((_prev, matched)) => {
+                println!(" > Networking setup succeeded. {matched}");
+            }
+            Err(e) => {
+                println!(" > Networking setup failed. {e}");
+                return Err(e);
+            }
+        }
+
+        match proc.exp_regex(r#"INTERNAL BENCHMARK CONFIGURE"#) {
+            Ok((prev, matched)) => {
+                println!(" > Memcached started.");
+                *output += prev.as_str();
+                *output += matched.as_str();
+            }
+            Err(e) => {
+                println!(" > Memcached failed to start. {e}");
{e}"); + return Err(e); + } + } + + let (prev, matched) = proc.exp_regex(r#"x_benchmark_mem = (\d+) MB"#).unwrap(); + println!("> {}", matched); + // let b_mem = matched.replace("x_benchmark_mem = ", "").replace(" MB", ""); + + *output += prev.as_str(); + *output += matched.as_str(); + Ok(()) + } + + config.is_local_host = false; + config.protocol = "tcp"; + + let mut test = RackscaleRun::new("userspace-smp".to_string(), built); + test.controller_match_fn = controller_match_fn; + test.controller_run_fn = Some(controller_run_fun); + test.client_match_fn = client_match_fn; + test.use_qemu_huge_pages = cfg!(feature = "affinity-shmem"); + test.file_name = file_name.to_string(); + test.arg = Some(config); + test.run_dhcpd_for_baseline = true; + test.is_multi_node = true; + test.shmem_size = 0; + + fn cmd_fn(num_cores: usize, num_clients: usize, arg: Option) -> String { + let config = arg.expect("missing configuration"); + let num_threads = num_cores / num_clients; + + format!( + r#"init=memcachedbench.bin initargs={num_threads} appcmd='--x-benchmark-no-run --disable-evictions --conn-limit=1024 --threads={num_threads} --x-benchmark-mem={} --memory-limit={}'"#, + config.mem_size * 2, + config.mem_size * 4 + ) + } + + fn baseline_timeout_fn(num_cores: usize) -> u64 { + 120_000 + 500 * num_cores as u64 + } + + fn rackscale_timeout_fn(num_cores: usize) -> u64 { + 1200_000 + 60_000 * num_cores as u64 + } + + fn mem_fn(_num_cores: usize, num_clients: usize, is_smoke: bool) -> usize { + if is_smoke { + 8192 + } else { + // Memory must also be divisible by number of nodes, which could be 1, 2, 3, or 4 + // mem = result of this function / num_clients - shmem_size + (8092 + + 2 * std::cmp::max( + MEMCACHED_MEM_SIZE_MB * 2, + testutils::helpers::SHMEM_SIZE * 2, + )) + * num_clients + } + } + + println!("----------------------------------------------------------"); + + let machine = Machine::determine(); + + let mut pings = Vec::new(); + for i in 0..machine.max_numa_nodes() { + let mut command = Command::new("ping"); + command.arg(&format!("172.31.0.{}", 10 + i + 1)); + + let proc = spawn_command(command, None).unwrap(); + pings.push(proc); + } + + // construct bench and run it! 
+    let bench = RackscaleBench {
+        test,
+        cmd_fn,
+        baseline_timeout_fn,
+        rackscale_timeout_fn,
+        mem_fn,
+    };
+    bench.run_bench(false, is_smoke);
+    for mut ping in pings.into_iter() {
+        if ping.process.kill(SIGKILL).is_err() {
+            println!("Failed to kill ping process");
+        }
+    }
+}
+
 #[test]
 #[cfg(not(feature = "baremetal"))]
 fn s11_rackscale_monetdb_benchmark() {
@@ -926,7 +1424,7 @@ fn rackscale_monetdb_benchmark(transport: RackscaleTransport) {
     test.arg = None;
     test.run_dhcpd_for_baseline = true;

-    fn cmd_fn(num_cores: usize, _arg: Option<()>) -> String {
+    fn cmd_fn(num_cores: usize, _num_clients: usize, _arg: Option<()>) -> String {
         format!(
             r#"init=monetdbd.bin initargs={} appcmd='create dbfarm'"#,
             num_cores
@@ -941,7 +1439,7 @@ fn rackscale_monetdb_benchmark(transport: RackscaleTransport) {
         180_000 + 500 * num_cores as u64
     }

-    fn mem_fn(num_cores: usize, is_smoke: bool) -> usize {
+    fn mem_fn(num_cores: usize, _num_clients: usize, is_smoke: bool) -> usize {
         if is_smoke {
             8192
         } else {
diff --git a/kernel/testutils/src/helpers.rs b/kernel/testutils/src/helpers.rs
index 536df2c83..66c8cc781 100644
--- a/kernel/testutils/src/helpers.rs
+++ b/kernel/testutils/src/helpers.rs
@@ -17,7 +17,7 @@ use crate::runner_args::RunnerArgs;
 ///
 /// # Depends on
 /// - `tests/dhcpd.conf`: config file contains match of MAC to IP
-pub const DHCP_ACK_MATCH: &'static str = "DHCPACK on 172.31.0.10 to 56:b4:44:e9:62:d0 via tap0";
+pub const DHCP_ACK_MATCH: &'static str = "DHCPACK on 172.31.0.10 to 56:b4:44:e9:62:d0 via br0";
 pub const DHCP_ACK_MATCH_NRK2: &'static str = "DHCPACK on 172.31.0.11 to 56:b4:44:e9:62:d1 via br0";

 /// Default shmem region size (in MB)
@@ -214,7 +214,7 @@ pub fn spawn_dcm(cfg: Option<DCMConfig>) -> Result<PtyReplSession>
 /// Spawns a DHCP server on our host using most common interface: tap0
 pub fn spawn_dhcpd() -> Result<PtyReplSession> {
-    spawn_dhcpd_with_interface("tap0".to_string())
+    spawn_dhcpd_with_interface("br0".to_string())
 }

 /// Spawns a DHCP server on our host
diff --git a/kernel/testutils/src/lib.rs b/kernel/testutils/src/lib.rs
index be8ccbcb0..af1e94825 100644
--- a/kernel/testutils/src/lib.rs
+++ b/kernel/testutils/src/lib.rs
@@ -10,6 +10,7 @@ extern crate serde;

 pub mod builder;
 pub mod helpers;
+pub mod memcached;
 pub mod rackscale_runner;
 pub mod redis;
 pub mod runner_args;
diff --git a/kernel/testutils/src/memcached.rs b/kernel/testutils/src/memcached.rs
new file mode 100644
index 000000000..33f36afb9
--- /dev/null
+++ b/kernel/testutils/src/memcached.rs
@@ -0,0 +1,262 @@
+// Copyright © 2023 VMware, Inc. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0 OR MIT
+
+use std::env;
+use std::fs::remove_file;
+use std::io::Write;
+use std::path::PathBuf;
+use std::process::Command;
+use std::time::Duration;
+
+use rexpect::errors::*;
+use rexpect::session::{spawn_command, PtySession};
+
+pub const MEMCACHED_MEM_SIZE_MB: usize = 64 * 1024;
+pub const MEMCACHED_NUM_QUERIES: usize = 10_000_000;
+
+pub const RACKSCALE_MEMCACHED_CSV_COLUMNS: &str =
+    "git_rev,benchmark,os,protocol,npieces,nthreads,mem,queries,time,thpt\n";
+
+#[derive(Clone)]
+pub struct MemcachedShardedConfig {
+    pub num_servers: usize,
+    pub num_queries: usize,
+    pub is_local_host: bool,
+    pub mem_size: usize,
+    pub protocol: &'static str,
+    pub num_threads: usize,
+    pub path: PathBuf,
+}
+
+#[derive(Clone, Debug)]
+pub struct MemcachedResult {
+    pub b_threads: String,
+    pub b_mem: String,
+    pub b_queries: String,
+    pub b_time: String,
+    pub b_thpt: String,
+}
+
+pub fn parse_memcached_output(
+    p: &mut PtySession,
+    _num_threads: usize,
+    output: &mut String,
+) -> Result<MemcachedResult> {
+    // x_benchmark_mem = 10 MB
+    let (prev, matched) = p.exp_regex(r#"x_benchmark_mem = (\d+) MB"#)?;
+    println!("> {}", matched);
+    let b_mem = matched.replace("x_benchmark_mem = ", "").replace(" MB", "");
+
+    *output += prev.as_str();
+    *output += matched.as_str();
+
+    // number of threads: 3
+    let (prev, matched) = p.exp_regex(r#"number of threads: (\d+)"#)?;
+    println!("> {}", matched);
+    let b_threads = matched.replace("number of threads: ", "");
+
+    *output += prev.as_str();
+    *output += matched.as_str();
+
+    // number of keys: 131072
+    let (prev, matched) = p.exp_regex(r#"number of keys: (\d+)"#)?;
+    println!("> {}", matched);
+
+    *output += prev.as_str();
+    *output += matched.as_str();
+
+    let (prev, matched) = p.exp_regex(r#"Executing (\d+) queries with (\d+) threads."#)?;
+    println!("> {}", matched);
+
+    *output += prev.as_str();
+    *output += matched.as_str();
+
+    // benchmark took 129 seconds
+    let (prev, matched) = p.exp_regex(r#"benchmark took (\d+) ms"#)?;
+    println!("> {}", matched);
+    let b_time = matched.replace("benchmark took ", "").replace(" ms", "");
+
+    *output += prev.as_str();
+    *output += matched.as_str();
+
+    // benchmark took 7937984 queries / second
+    let (prev, matched) = p.exp_regex(r#"benchmark took (\d+) queries / second"#)?;
+    println!("> {}", matched);
+    let b_thpt = matched
+        .replace("benchmark took ", "")
+        .replace(" queries / second", "");
+
+    *output += prev.as_str();
+    *output += matched.as_str();
+
+    let (prev, matched) = p.exp_regex(r#"benchmark executed (\d+)"#)?;
+    println!("> {}", matched);
+    let b_queries = matched
+        .replace("benchmark executed ", "")
+        .split(' ')
+        .next()
+        .unwrap()
+        .to_string();
+
+    *output += prev.as_str();
+    *output += matched.as_str();
+
+    if output.contains("MEMORY ALLOCATION FAILURE") {
+        println!("Detected memory allocation error in memcached output");
+        Err("Memory allocation failure".into())
+    } else {
+        Ok(MemcachedResult {
+            b_threads,
+            b_mem,
+            b_queries,
+            b_time,
+            b_thpt,
+        })
+    }
+}
+
+#[cfg(not(feature = "baremetal"))]
+pub fn rackscale_memcached_checkout(tmpdir: &str) {
+    let out_dir_path = PathBuf::from(tmpdir).join("sharded-memcached");
+
+    let out_dir = out_dir_path.display().to_string();
+
+    println!("CARGO_TARGET_TMPDIR {:?}", out_dir);
+
+    // clone and build the benchmark
+    if !out_dir_path.is_dir() {
+        println!("RMDIR {:?}", out_dir_path);
+        Command::new("rm")
+            .args(&["-rf", out_dir.as_str()])
+            .status()
+            .unwrap();
+
+        println!("MKDIR {:?}", out_dir_path);
+        Command::new("mkdir")
+            .args(&["-p", out_dir.as_str()])
+            .status()
+            .unwrap();
+
+        println!("CLONE {:?}", out_dir_path);
+        let url = "https://github.com/achreto/memcached-bench.git";
+        Command::new("git")
+            .args(&["clone", "--depth=1", url, out_dir.as_str()])
+            .output()
+            .expect("failed to clone");
+    } else {
+        Command::new("git")
+            .args(&["pull"])
+            .current_dir(out_dir_path.as_path())
+            .output()
+            .expect("failed to pull");
+    }
+
+    println!(
+        "CHECKOUT 0a4f217105d994d2ce438464041546ab4f4c4b2c {:?}",
+        out_dir
+    );
+
+    let res = Command::new("git")
+        .args(&["checkout", "0a4f217105d994d2ce438464041546ab4f4c4b2c"])
+        .current_dir(out_dir_path.as_path())
+        .output()
+        .expect("git checkout failed");
+    if !res.status.success() {
+        std::io::stdout().write_all(&res.stdout).unwrap();
+        std::io::stderr().write_all(&res.stderr).unwrap();
+        panic!("git checkout failed!");
+    }
+
+    println!("BUILD {:?}", out_dir_path);
+    for (key, value) in env::vars() {
+        println!("{}: {}", key, value);
+    }
+
+    let build_args = &["-j", "8"];
+
+    // now build the benchmark
+    let status = Command::new("make")
+        .args(build_args)
+        .current_dir(&out_dir_path)
+        .output()
+        .expect("Can't make app dir");
+
+    if !status.status.success() {
+        println!("BUILD FAILED");
+        std::io::stdout().write_all(&status.stdout).unwrap();
+        std::io::stderr().write_all(&status.stderr).unwrap();
+        panic!("BUILD FAILED");
+    }
+}
+
+pub fn linux_spawn_memcached(
+    id: usize,
+    config: &MemcachedShardedConfig,
+    timeout_ms: u64,
+) -> Result<PtySession> {
+    let con_info = if config.protocol == "tcp" {
+        format!("tcp://localhost:{}", 11212 + id)
+    } else {
+        let pathname = config.path.join(format!("memcached{id}.sock"));
+        if pathname.is_file() {
+            remove_file(pathname.clone()).expect("Failed to remove path"); // make sure the socket file is removed
+        }
+        format!("unix://{}", pathname.display())
+    };
+
+    let mut command = Command::new("bash");
+
+    command.args(&[
+        "scripts/spawn-memcached-process.sh",
+        id.to_string().as_str(),
+        con_info.as_str(),
+        (2 * config.mem_size).to_string().as_str(),
+        config.num_threads.to_string().as_str(),
+    ]);
+    command.current_dir(config.path.as_path());
+
+    println!("Spawning memcached:\n $ `{:?}`", command);
+
+    let mut res = spawn_command(command, Some(timeout_ms))?;
+    std::thread::sleep(Duration::from_secs(1));
+
+    match res.exp_regex(r#"INTERNAL BENCHMARK CONFIGURE"#) {
+        Ok((_prev, _matched)) => {
+            println!(" $ OK.");
+            Ok(res)
+        }
+        Err(e) => {
+            println!(" $ FAILED. {}", e);
+            Err(e)
+        }
+    }
+}
+
+pub fn spawn_loadbalancer(config: &MemcachedShardedConfig, timeout_ms: u64) -> Result<PtySession> {
+    let mut command = Command::new("./loadbalancer/loadbalancer");
+    command.args(&["--binary"]);
+    command.arg(format!("--num-queries={}", config.num_queries).as_str());
+    command.arg(format!("--num-threads={}", config.num_threads).as_str());
+    command.arg(format!("--max-memory={}", config.mem_size).as_str());
+    let mut servers = String::from("--servers=");
+    for i in 0..config.num_servers {
+        if i > 0 {
+            servers.push_str(",");
+        }
+        if config.protocol == "tcp" {
+            if config.is_local_host {
+                servers.push_str(format!("tcp://localhost:{}", 11212 + i).as_str());
+            } else {
+                // +1 because tap0 is reserved for the controller.
+                let ip = 10 + i + 1;
+                servers.push_str(format!("tcp://172.31.0.{}:{}", ip, 11211).as_str());
+            }
+        } else {
+            servers
+                .push_str(format!("unix://{}/memcached{}.sock", config.path.display(), i).as_str());
+        }
+    }
+    command.arg(servers.as_str());
+    command.current_dir(config.path.as_path());
+    command.env("LD_LIBRARY_PATH", "build/lib");
+
+    // give the servers some time to be spawned
+    std::thread::sleep(Duration::from_secs(5));
+
+    println!("Spawning Loadbalancer: \n $ `{:?}`", command);
+
+    spawn_command(command, Some(timeout_ms))
+}
diff --git a/kernel/testutils/src/rackscale_runner.rs b/kernel/testutils/src/rackscale_runner.rs
index cda022e43..c98aee880 100644
--- a/kernel/testutils/src/rackscale_runner.rs
+++ b/kernel/testutils/src/rackscale_runner.rs
@@ -1,3 +1,4 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::mpsc::{Receiver, Sender, TryRecvError};
 use std::sync::{mpsc::channel, Arc, Mutex};
 use std::thread;
@@ -47,6 +48,13 @@ type RackscaleMatchFn<T> = fn(
     arg: Option<T>,
 ) -> Result<()>;

+type ControllerRunFn<T> = fn(
+    config: Option<&T>,
+    num_clients: usize,
+    num_threads: usize,
+    timeout_ms: u64,
+) -> Result<PtySession>;
+
 #[derive(Clone)]
 pub struct RackscaleRun<T>
 where
@@ -60,6 +68,8 @@ where
     pub controller_timeout: u64,
     /// Function that is called after the controller is spawned to match output of the controller process
     pub controller_match_fn: RackscaleMatchFn<T>,
+    /// function to start the controller
+    pub controller_run_fn: Option<ControllerRunFn<T>>,
     /// Timeout for each client process
     pub client_timeout: u64,
     /// Amount of non-shmem QEMU memory given to each QEMU instance
@@ -92,6 +102,8 @@ where
     pub use_qemu_huge_pages: bool,
     /// DCM config
     pub dcm_config: Option<DCMConfig>,
+    /// whether we're running in multi-node mode.
+    pub is_multi_node: bool,
 }

 impl<T: Clone + Send + 'static> RackscaleRun<T> {
@@ -112,6 +124,7 @@ impl<T: Clone + Send + 'static> RackscaleRun<T> {
         RackscaleRun {
             controller_timeout: 60_000,
             controller_match_fn: blank_match_fn,
+            controller_run_fn: None,
             client_timeout: 60_000,
             client_match_fn: blank_match_fn,
             memory: 1024,
@@ -130,10 +143,16 @@ impl<T: Clone + Send + 'static> RackscaleRun<T> {
             run_dhcpd_for_baseline: false,
             use_qemu_huge_pages: false,
             dcm_config: None,
+            is_multi_node: false,
         }
     }

     pub fn run_rackscale(&self) {
+        if self.is_multi_node {
+            self.run_multi_node();
+            return;
+        }
+
         // Do not allow over provisioning
         let machine = Machine::determine();
         assert!(self.cores_per_client * self.num_clients + 1 <= machine.max_cores());
@@ -418,6 +437,291 @@ impl<T: Clone + Send + 'static> RackscaleRun<T> {
         controller_ret.unwrap();
     }

+    pub fn run_multi_node(&self) {
+        // Do not allow over provisioning
+        let machine = Machine::determine();
+        assert!(self.cores_per_client * self.num_clients + 1 <= machine.max_cores());
+        let controller_cores = self.num_clients + 1;
+
+        let mut vm_cores = vec![self.cores_per_client; self.num_clients + 1];
+        vm_cores[0] = controller_cores;
+        let placement_cores = machine.rackscale_core_affinity(vm_cores);
+
+        setup_network(self.num_clients + 1);
+
+        // start the dhcp server
+        let mut dhcpd_server = crate::helpers::spawn_dhcpd_with_interface("br0".to_string())
+            .expect("could not spawn dhcpd server");
+
+        let all_outputs = Arc::new(Mutex::new(Vec::new()));
+
+        let (tx, rx) = channel();
+        let rx_mut = Arc::new(Mutex::new(rx));
+        let tx_mut = Arc::new(Mutex::new(tx));
+
+        let (tx_build_timer, _rx_build_timer) = channel();
+        let tx_build_timer_mut = Arc::new(Mutex::new(tx_build_timer));
+
+        let boot_counter = Arc::new(AtomicUsize::new(0));
+
+        // Run client in separate thread. Wait a bit to make sure controller started
+        let mut client_procs = Vec::new();
+        for i in 0..self.num_clients {
+            let client_output_array: Arc<Mutex<Vec<(String, String)>>> = all_outputs.clone();
+            let client_rx = rx_mut.clone();
+            let client_tx = tx_mut.clone();
+            let client_kernel_test = self.kernel_test.clone();
+            let client_file_name = self.file_name.clone();
+            let client_cmd = self.cmd.clone();
+            let client_placement_cores = placement_cores.clone();
+            let client_boot_counter = boot_counter.clone();
+            let state = self.clone();
+            let client_tx_build_timer = tx_build_timer_mut.clone();
+            let use_large_pages = self.use_qemu_huge_pages;
+            let client = std::thread::Builder::new()
+                .name(format!("Client{}", i + 1))
+                .spawn(move || {
+                    let mut cmdline_client =
+                        RunnerArgs::new_with_build(&client_kernel_test, &state.built)
+                            .timeout(state.client_timeout)
+                            .use_virtio()
+                            .tap(&format!("tap{}", (i + 1) * 2))
+                            .no_network_setup()
+                            .cores(state.cores_per_client)
+                            .memory(state.memory)
+                            .nobuild() // Use single build for all for consistency
+                            .cmd(&client_cmd)
+                            .machine_id(0) // always hardcoded to 0 for the sharded case
+                            .nodes(1)
+                            .node_offset(client_placement_cores[i + 1].0)
+                            .setaffinity(client_placement_cores[i + 1].1.clone());
+
+                    if use_large_pages {
+                        cmdline_client = cmdline_client.large_pages().prealloc();
+                    }
+
+                    let mut output = String::new();
+                    let qemu_run = || -> Result<WaitStatus> {
+                        let mut p = spawn_nrk(&cmdline_client)?;
+                        output += p.exp_string("NRK booting on")?.as_str();
+                        client_boot_counter.fetch_add(1, Ordering::SeqCst);
+
+                        // User-supplied function to check output
+                        (state.client_match_fn)(
+                            &mut p,
+                            &mut output,
+                            state.cores_per_client,
+                            state.num_clients,
+                            &client_file_name,
+                            false,
+                            state.arg,
+                        )?;
+
+                        // Wait for controller to terminate
+                        if !state.wait_for_client {
+                            let rx = client_rx.lock().expect("Failed to get rx lock");
+                            let _ = wait_for_signal::<()>(&rx);
+                        }
+
+                        let ret = p.process.kill(SIGTERM);
+                        output += p.exp_eof()?.as_str();
+                        ret
+                    };
+
+                    // Could exit with 'success' or from sigterm, depending on number of clients.
+                    let ret = qemu_run();
+
+                    if ret.is_err() {
+                        let tx = client_tx_build_timer
+                            .lock()
+                            .expect("Failed to get build timer lock");
+                        send_signal(&tx);
+                    }
+
+                    if state.wait_for_client {
+                        let tx = client_tx.lock().expect("Failed to get rx lock");
+                        send_signal(&tx);
+                    }
+
+                    client_output_array
+                        .lock()
+                        .expect("Failed to get mutex to output array")
+                        .push((format!("Client{}", i + 1), output));
+                    wait_for_sigterm_or_successful_exit_no_log(
+                        &cmdline_client,
+                        ret,
+                        format!("Client{}", i + 1),
+                    );
+                })
+                .expect("Client thread failed to spawn");
+
+            while i == boot_counter.load(Ordering::Relaxed) {
+                thread::sleep(Duration::from_millis(500));
+            }
+
+            client_procs.push(client);
+        }
+
+        // Run controller in separate thread
+        let controller_output_array: Arc<Mutex<Vec<(String, String)>>> = all_outputs.clone();
+        let controller_kernel_test = self.kernel_test.clone();
+        let controller_rx = rx_mut.clone();
+        let controller_tx = tx_mut.clone();
+        let controller_file_name = self.file_name.clone();
+        let controller_placement_cores = placement_cores.clone();
+        let state = self.clone();
+        let controller_tx_build_timer = tx_build_timer_mut.clone();
+        let controller_run_fn = self.controller_run_fn.clone();
+        let use_large_pages = self.use_qemu_huge_pages;
+        let controller_arg = self.arg.clone();
+        let controller = std::thread::Builder::new()
+            .name("Controller".to_string())
+            .spawn(move || {
+                let mut output = String::new();
+                let ret = if let Some(run_fn) = controller_run_fn {
+                    let qemu_run = || -> Result<WaitStatus> {
+                        let mut p = run_fn(
+                            controller_arg.as_ref(),
+                            state.num_clients,
+                            state.cores_per_client,
+                            state.controller_timeout,
+                        )?;
+
+                        // User-supplied function to check output
+                        (state.controller_match_fn)(
+                            &mut p,
+                            &mut output,
+                            state.cores_per_client,
+                            state.num_clients,
+                            &controller_file_name,
+                            false,
+                            state.arg,
+                        )?;
+
+                        for _ in 0..state.num_clients {
+                            if state.wait_for_client {
+                                // Wait for signal from each client that it is done
+                                let rx = controller_rx.lock().expect("Failed to get rx lock");
+                                let _ = wait_for_signal::<()>(&rx);
+                            }
+                        }
+
+                        let ret = p.process.kill(SIGTERM)?;
+                        output += p.exp_eof()?.as_str();
+                        Ok(ret)
+                    };
+                    qemu_run()
+                } else {
+                    let mut cmdline_controller =
+                        RunnerArgs::new_with_build(&controller_kernel_test, &state.built)
+                            .timeout(state.controller_timeout)
+                            .transport(state.transport)
+                            .mode(RackscaleMode::Controller)
+                            .tap("tap0")
+                            .no_network_setup()
+                            .workers(state.num_clients + 1)
+                            .use_vmxnet3()
+                            .memory(state.memory)
+                            .nodes(1)
+                            .cores(controller_cores)
+                            .node_offset(controller_placement_cores[0].0)
+                            .setaffinity(controller_placement_cores[0].1.clone());
+
+                    if use_large_pages {
+                        cmdline_controller = cmdline_controller.large_pages().prealloc();
+                    }
+
+                    let qemu_run = || -> Result<WaitStatus> {
+                        let mut p = spawn_nrk(&cmdline_controller)?;
+
+                        output += p.exp_string("CONTROLLER READY")?.as_str();
+                        {
+                            let tx = controller_tx_build_timer
+                                .lock()
+                                .expect("Failed to get build timer lock");
+                            send_signal(&tx);
+                        }
+
+                        // User-supplied function to check output
+                        (state.controller_match_fn)(
+                            &mut p,
+                            &mut output,
+                            state.cores_per_client,
+                            state.num_clients,
+                            &controller_file_name,
+                            false,
+                            state.arg,
+                        )?;
+
+                        for _ in 0..state.num_clients {
+                            if state.wait_for_client {
+                                // Wait for signal from each client that it is done
+                                let rx = controller_rx.lock().expect("Failed to get rx lock");
+                                let _ = wait_for_signal::<()>(&rx);
+                            }
+                        }
+
+                        let ret = p.process.kill(SIGTERM)?;
+                        output += p.exp_eof()?.as_str();
+                        Ok(ret)
+                    };
+                    qemu_run()
+                };
+
+                if ret.is_err() {
+                    let tx = controller_tx_build_timer
+                        .lock()
+                        .expect("Failed to get build timer lock");
+                    send_signal(&tx);
+                }
+
+                if !state.wait_for_client {
+                    let tx = controller_tx.lock().expect("Failed to get tx lock");
+                    for _ in 0..state.num_clients {
+                        // Notify each client it's okay to shutdown
+                        send_signal(&tx);
+                    }
+                }
+
+                controller_output_array
+                    .lock()
+                    .expect("Failed to get mutex to output array")
+                    .push((String::from("Controller"), output));
+
+                // This will only find sigterm, that's okay
+                wait_for_sigterm_or_successful_exit_no_log(
+                    &RunnerArgs::new_with_build(&controller_kernel_test, &state.built),
+                    ret,
+                    String::from("Controller"),
+                );
+            })
+            .expect("Controller thread failed to spawn");
+
+        let mut client_rets = Vec::new();
+        for client in client_procs {
+            client_rets.push(client.join());
+        }
+        let controller_ret = controller.join();
+
+        dhcpd_server
+            .send_control('c')
+            .expect("could not terminate dhcp");
+
+        // If there's been an error, print everything
+        if controller_ret.is_err() || (&client_rets).into_iter().any(|ret| ret.is_err()) {
+            let outputs = all_outputs.lock().expect("Failed to get output lock");
+            for (name, output) in outputs.iter() {
+                log_qemu_out_with_name(None, name.to_string(), output.to_string());
+            }
+        }
+
+        for client_ret in client_rets {
+            client_ret.unwrap();
+        }
+        controller_ret.unwrap();
+    }
+
     pub fn run_baseline(&self) {
         // Here we assume run.num_clients == run.num_replicas (num nodes)
         // And the controller match function, timeout, memory will be used
@@ -483,14 +787,14 @@
 pub struct RackscaleBench<T> {
     // Test to run
     pub test: RackscaleRun<T>,
-    // Function to calculate the command. Takes as argument number of application cores
-    pub cmd_fn: fn(usize, Option<T>) -> String,
+    // Function to calculate the command. Takes as argument number of application cores and the number of clients
+    pub cmd_fn: fn(usize, usize, Option<T>) -> String,
     // Function to calculate the timeout. Takes as argument number of application cores
     pub rackscale_timeout_fn: fn(usize) -> u64,
     // Function to calculate the timeout. Takes as argument number of application cores
     pub baseline_timeout_fn: fn(usize) -> u64,
     // Function to calculate memory (excepting controller memory). Takes as argument number of application cores and is_smoke
-    pub mem_fn: fn(usize, bool) -> usize,
+    pub mem_fn: fn(usize, usize, bool) -> usize,
 }

 impl<T: Clone + Send + 'static> RackscaleBench<T> {
@@ -499,12 +803,13 @@ impl<T: Clone + Send + 'static> RackscaleBench<T> {
         // Set rackscale appropriately, rebuild if necessary.
         if !is_baseline != test_run.built.with_args.rackscale {
-            eprintln!("\tRebuilding with rackscale={}", !is_baseline,);
+            let is_rackscale = !is_baseline && !test_run.is_multi_node;
+            eprintln!("\tRebuilding with rackscale={}", is_rackscale);
             test_run.built = test_run
                 .built
                 .with_args
                 .clone()
-                .set_rackscale(!is_baseline)
+                .set_rackscale(is_rackscale)
                 .build();
         }

@@ -567,23 +872,26 @@
             test_run.cores_per_client = cores_per_client;
             test_run.num_clients = num_clients;

-            // Set controller timeout for this test
-            test_run.controller_timeout = test_run.client_timeout;
-
             // Calculate command based on the number of cores
-            test_run.cmd = (self.cmd_fn)(total_cores, test_run.arg.clone());
+            test_run.cmd = (self.cmd_fn)(total_cores, num_clients, test_run.arg.clone());

             // Calculate memory and timeouts, and then run the test
             if is_baseline {
                 test_run.client_timeout = (self.baseline_timeout_fn)(total_cores);

                 // Total client memory in the test is: (mem_based_on_cores) + shmem_size * num_clients
-                test_run.memory = (self.mem_fn)(total_cores, is_smoke)
+                test_run.memory = (self.mem_fn)(total_cores, cores_per_client, is_smoke)
                     + test_run.shmem_size * test_run.num_clients;

+                // Set controller timeout for this test
+                test_run.controller_timeout = test_run.client_timeout;
+
                 test_run.run_baseline();
             } else {
                 test_run.client_timeout = (self.rackscale_timeout_fn)(total_cores);
-                test_run.memory = (self.mem_fn)(total_cores, is_smoke) / test_run.num_clients;
+                test_run.memory = (self.mem_fn)(total_cores, cores_per_client, is_smoke);
+
+                // Set controller timeout for this test
+                test_run.controller_timeout = test_run.client_timeout;

                 test_run.run_rackscale();
             }
diff --git a/kernel/testutils/src/runner_args.rs b/kernel/testutils/src/runner_args.rs
index c9b3c5562..df86d3db7 100644
--- a/kernel/testutils/src/runner_args.rs
+++ b/kernel/testutils/src/runner_args.rs
@@ -9,6 +9,9 @@
 use rexpect::process::wait::WaitStatus;
 use crate::builder::{BuildArgs, Built, Machine};
 use crate::ExitStatus;

+/// Defines the threshold (in lines) at which output is truncated.
+const PRINT_NUM_LINES: usize = 100;
+
 /// Different build modes for rackscale
 #[derive(Eq, PartialEq, Debug, Clone)]
 pub enum RackscaleMode {
@@ -59,6 +62,8 @@ pub struct RunnerArgs<'a> {
     nobuild: bool,
     /// Parameters to add to the QEMU command line
     qemu_args: Vec<&'a str>,
+    /// The machine id to set for this instance
+    machine_id: Option<usize>,
     /// Timeout in ms
     pub timeout: Option<u64>,
     /// Default network interface for QEMU
@@ -119,6 +124,7 @@
             no_network_setup: false,
             mode: None,
             transport: None,
+            machine_id: None,
         };

         if cfg!(feature = "prealloc") {
@@ -156,6 +162,7 @@
             no_network_setup: false,
             mode: None,
             transport: None,
+            machine_id: None,
         };

         if cfg!(feature = "prealloc") {
@@ -283,6 +290,11 @@ impl<'a> RunnerArgs<'a> {
         self
     }

+    pub fn machine_id(mut self, id: usize) -> RunnerArgs<'a> {
+        self.machine_id = Some(id);
+        self
+    }
+
     pub fn shmem_size(mut self, sizes: Vec<usize>) -> RunnerArgs<'a> {
         self.shmem_sizes = sizes;
         self
@@ -456,6 +468,10 @@
             cmd.push(String::from("--kgdb"));
         }

+        if let Some(mid) = self.machine_id {
+            cmd.push(format!("--mid={mid}"));
+        }
+
         // Don't run qemu, just build?
         if self.norun {
             cmd.push(String::from("--norun"));
@@ -486,7 +502,21 @@ pub fn log_qemu_out(args: &RunnerArgs, output: String) {
 pub fn log_qemu_out_with_name(args: Option<&RunnerArgs>, name: String, output: String) {
     if !output.is_empty() {
         println!("\n===== QEMU LOG {} =====", name);
-        println!("{}", &output);
+        let num_lines = output.lines().count();
+
+        if num_lines > PRINT_NUM_LINES {
+            for l in output.lines().take(PRINT_NUM_LINES / 2) {
+                println!(" > {l}");
+            }
+            println!(" > ... skipping {} more lines ...\n", num_lines - PRINT_NUM_LINES);
+            for l in output.lines().skip(num_lines - PRINT_NUM_LINES / 2) {
+                println!(" > {l}");
+            }
+        } else {
+            for l in output.lines() {
+                println!(" > {l}");
+            }
+        }
         println!("===== END QEMU LOG {} =====", name);
     }
     if let Some(nrk_args) = args {
@@ -607,7 +637,32 @@
         }
         Err(e) => {
             log_qemu_args(args);
-            panic!("Qemu testing failed: {} {}", name, e);
+            println!("Qemu testing failed: {}", name);
+            use rexpect::errors::ErrorKind::Timeout;
+            match e {
+                Error(Timeout(expected, got, _timeout), _st) => {
+                    println!("Timeout");
+                    println!("Expected: `{expected}`\n");
+                    println!("Got:");
+                    let count = got.lines().count();
+                    if count > PRINT_NUM_LINES {
+                        for l in got.lines().take(PRINT_NUM_LINES / 2) {
+                            println!(" > {l}");
+                        }
+                        println!(" > ... skipping {} more lines ...", count - PRINT_NUM_LINES);
+                        for l in got.lines().skip(count - PRINT_NUM_LINES / 2) {
+                            println!(" > {l}");
+                        }
+                    } else {
+                        for l in got.lines() {
+                            println!(" > {l}");
+                        }
+                    }
+                }
+                _ => println!("{e}"),
+            }
+
+            panic!("Qemu testing failed");
         }
         e => {
             log_qemu_args(args);
diff --git a/lib/lineup/src/mutex.rs b/lib/lineup/src/mutex.rs
index 6e5bb0fb7..b2d9a86a7 100644
--- a/lib/lineup/src/mutex.rs
+++ b/lib/lineup/src/mutex.rs
@@ -109,11 +109,6 @@ struct MutexInner {
 impl MutexInner {
     fn try_enter(&self) -> bool {
         let tid = Environment::tid();
-        assert!(
-            self.owner.get() != Some(tid),
-            "Locking mutex against itself."
-        );
-
         let counter = self.counter.load(Ordering::Relaxed);
         loop {
             if counter != 0 {
diff --git a/lib/vibrio/src/rumprt/crt/mod.rs b/lib/vibrio/src/rumprt/crt/mod.rs
index bb004d9c1..ff7e46a2c 100644
--- a/lib/vibrio/src/rumprt/crt/mod.rs
+++ b/lib/vibrio/src/rumprt/crt/mod.rs
@@ -238,6 +238,21 @@ pub extern "C" fn main() {
         ta_root_mode: u32, // mode_t ta_root_mode;
     }

+    #[repr(C)]
+    struct sockaddr_in {
+        sin_len: u8,
+        sin_family: u8, // typedef __uint8_t __sa_family_t;
+        sin_port: u16,  // typedef __uint16_t __in_port_t; /* "Internet" port number */
+        sin_addr: u32,  // typedef __uint32_t __in_addr_t; /* IP(v4) address */
+        zero: [u8; 8],
+    }
+
+    #[repr(C)]
+    struct timespec_t {
+        tv_sec: i64,  // time_t
+        tv_nsec: i64, // long
+    }
+
     extern "C" {
         static __init_array_start: extern "C" fn();
         static __init_array_end: extern "C" fn();
@@ -248,6 +263,18 @@
         fn rump_pub_netconfig_dhcp_ipv4_oneshot(iface: *const i8) -> i64;
         fn _libc_init();
         fn mount(typ: *const i8, path: *const i8, n: u64, args: *const tmpfs_args, argsize: usize);
+
+        fn socket(domain: i64, typ: i64, protocol: i64) -> i64;
+        fn sendto(
+            fd: i64,
+            buf: *const i8,
+            len: usize,
+            flags: i64,
+            addr: *const sockaddr_in,
+            addr_len: usize,
+        ) -> i64;
+        fn close(sock: i64) -> i64;
+        fn nanosleep(rqtp: *const timespec_t, rmtp: *mut timespec_t) -> i64;
     }

     unsafe {
@@ -371,6 +398,58 @@
             "rump_pub_netconfig_dhcp_ipv4_oneshot done in {:?}",
             start.elapsed()
         );
+
+        // HACK: send a message so things get initialized, otherwise we don't have
+        // connectivity.
+
+        const AF_INET: i64 = 2;
+        const SOCK_DGRAM: i64 = 2;
+        const IPPROTO_UDP: i64 = 17;
+        const MSG_DONTWAIT: i64 = 0x0080;
+        let sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+        assert!(sockfd > 0);
+        info!("socket done in {:?}", start.elapsed());
+
+        for i in 0..10 {
+            info!("sending packet {} of 10 ({:?})", i, start.elapsed());
+            let addr = sockaddr_in {
+                sin_len: core::mem::size_of::<sockaddr_in>() as u8,
+                sin_family: AF_INET as u8,
+                sin_port: (8889 as u16).to_be(),
+                sin_addr: (2887712788 as u32).to_be(), // 172.31.0.20
+                zero: [0; 8],
+            };
+
+            // Yield to the scheduler; not sure whether this is strictly needed here.
+            let _r = lineup::tls2::Environment::thread().relinquish();
+
+            use alloc::string::String;
+            let buf = String::from("package content\n\0");
+            let cstr = CStr::from_bytes_with_nul(buf.as_str().as_bytes()).unwrap();
+
+            let r = sendto(
+                sockfd,
+                cstr.as_ptr() as *const i8,
+                buf.len(),
+                MSG_DONTWAIT,
+                &addr as *const sockaddr_in,
+                core::mem::size_of::<sockaddr_in>(),
+            );
+            assert_eq!(r, buf.len() as i64);
+            core::mem::forget(cstr);
+
+            // Add some sleep time here; otherwise we send the packets too fast and
+            // nothing appears on the other side. It seems that after 6s (packet 6)
+            // things start working. I suspect an ARP resolution issue, but it's unclear.
+            let sleep_dur = timespec_t {
+                tv_sec: 1,
+                tv_nsec: 0,
+            };
+            nanosleep(&sleep_dur as *const timespec_t, ptr::null_mut());
+        }
+        // keep the socket open here...
+        // close(sockfd);
     }

     // Set up a garbage environment
diff --git a/scripts/ci.bash b/scripts/ci.bash
index a8776d39f..0881edeb5 100644
--- a/scripts/ci.bash
+++ b/scripts/ci.bash
@@ -16,7 +16,8 @@
 rm -f leveldb_benchmark.csv
 rm -f rackscale_shmem_vmops_benchmark.csv
 rm -f rackscale_shmem_vmops_latency_benchmark.csv
 rm -f rackscale_shmem_fxmark_benchmark.csv
-rm -f rackscale_shmem_memcached_benchmark.csv
+rm -f rackscale_shmem_memcached_internal_benchmark.csv
+rm -f linux_memcached_sharded_benchmark.csv

 # For vmops: --features prealloc can improve performance further (at the expense of test duration)
 RUST_TEST_THREADS=1 cargo test --test s10* -- s10_vmops_benchmark --nocapture
@@ -29,7 +30,8 @@
 RUST_TEST_THREADS=1 cargo test --test s10* -- s10_fxmark_bench --nocapture
 RUST_TEST_THREADS=1 cargo test --test s11* -- s11_rackscale_shmem_vmops_maptput_benchmark --nocapture
 RUST_TEST_THREADS=1 cargo test --test s11* -- s11_rackscale_shmem_vmops_maplat_benchmark --nocapture
 RUST_TEST_THREADS=1 cargo test --test s11* -- s11_rackscale_shmem_fxmark_bench --nocapture
-RUST_TEST_THREADS=1 cargo test --test s11* -- s11_rackscale_memcached_benchmark_internal --nocapture
+RUST_TEST_THREADS=1 cargo test --features baseline --test s11* -- s11_rackscale_shmem_memcached_internal_benchmark --nocapture
+RUST_TEST_THREADS=1 cargo test --test s11* -- s11_linux_memcached_sharded_benchmark --nocapture

 # Clone repo
 rm -rf gh-pages
@@ -61,7 +63,9 @@
 fi
 mkdir -p ${DEPLOY_DIR}
 mv memcached_benchmark_internal.csv ${DEPLOY_DIR}
+mv linux_memcached_sharded_benchmark.csv ${DEPLOY_DIR}
 gzip ${DEPLOY_DIR}/memcached_benchmark_internal.csv
+gzip ${DEPLOY_DIR}/linux_memcached_sharded_benchmark.csv

 # Copy vmops results
 DEPLOY_DIR="gh-pages/vmops/${CI_MACHINE_TYPE}/${GIT_REV_CURRENT}/"
@@ -132,8 +136,8 @@
 if [ -d "${DEPLOY_DIR}" ]; then
     DEPLOY_DIR=${DEPLOY_DIR}${DATE_PREFIX}
 fi
 mkdir -p ${DEPLOY_DIR}
-mv rackscale_shmem_memcached_benchmark.csv ${DEPLOY_DIR}
-gzip ${DEPLOY_DIR}/rackscale_shmem_memcached_benchmark.csv
+mv rackscale_shmem_memcached_internal_benchmark.csv ${DEPLOY_DIR}
+gzip ${DEPLOY_DIR}/rackscale_shmem_memcached_internal_benchmark.csv

 # Update CI history plots
 python3 gh-pages/_scripts/ci_history.py --append --machine $CI_MACHINE_TYPE
diff --git a/usr/rkapps/Cargo.toml b/usr/rkapps/Cargo.toml
index 88da3ff81..9038f5c94 100644
--- a/usr/rkapps/Cargo.toml
+++ b/usr/rkapps/Cargo.toml
@@ -20,4 +20,5 @@
 nginx = []
 leveldb-bench = []
 memcached-bench = []
 monetdb = []
+virtio = ["vibrio/virtio"]
diff --git a/usr/rkapps/build.rs b/usr/rkapps/build.rs
index 534bbdd80..16d31f70b 100644
--- a/usr/rkapps/build.rs
+++ b/usr/rkapps/build.rs
@@ -133,16 +133,16 @@ fn main() {
     println!("CLONE {:?}", out_dir);
     let url = "https://github.com/gz/librettos-packages.git";
     Command::new("git")
-        .args(&["clone", "--depth=1", url, out_dir.as_str()])
+        .args(&["clone", url, out_dir.as_str()])
         .status()
         .unwrap();

     println!(
-        "CHECKOUT be303d8bfc2c40d63704848bb3acd9e075dd61e4 {:?}",
+        "CHECKOUT 161e05606915fb9a29c8387db8702e16f85b8806 {:?}",
         out_dir
     );
     Command::new("git")
-        .args(&["checkout", "be303d8bfc2c40d63704848bb3acd9e075dd61e4"])
+        .args(&["checkout", "161e05606915fb9a29c8387db8702e16f85b8806"])
         .current_dir(&Path::new(&out_dir))
         .status()
         .unwrap();
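
A note on the synchronization pattern in the run_rackscale() changes above: the controller and client threads coordinate shutdown purely via signals on shared channels. When wait_for_client is set, each client sends one signal as it finishes and the controller waits for num_clients signals before killing its QEMU instance; otherwise the controller signals each client that it is okay to shut down. Below is a minimal, self-contained sketch of the wait_for_client path; the send_signal/wait_for_signal helpers and the mutex-wrapped channel layout are assumptions mirroring the diff, not the actual testutils API:

    use std::sync::mpsc::{channel, Receiver, Sender};
    use std::sync::{Arc, Mutex};
    use std::thread;

    // Assumed equivalents of the helpers used in the diff.
    fn send_signal(tx: &Sender<()>) {
        // The peer may already have exited; ignore send errors.
        let _ = tx.send(());
    }

    fn wait_for_signal(rx: &Receiver<()>) {
        let _ = rx.recv();
    }

    fn main() {
        let num_clients = 3;
        let (tx, rx) = channel::<()>();
        let tx_mut = Arc::new(Mutex::new(tx));
        let rx_mut = Arc::new(Mutex::new(rx));

        let mut clients = Vec::new();
        for i in 0..num_clients {
            let client_tx = tx_mut.clone();
            clients.push(thread::spawn(move || {
                // ... client workload would run here ...
                // Tell the controller this client is done.
                let tx = client_tx.lock().expect("Failed to get tx lock");
                send_signal(&tx);
                println!("Client{} done", i + 1);
            }));
        }

        // Controller: collect one done-signal per client before shutting down.
        for _ in 0..num_clients {
            let rx = rx_mut.lock().expect("Failed to get rx lock");
            wait_for_signal(&rx);
        }
        println!("Controller: all clients done; QEMU teardown would happen here");

        for c in clients {
            c.join().expect("Client thread panicked");
        }
    }

The Arc<Mutex<...>> wrapping mirrors a constraint visible in the diff: mpsc::Receiver is not Clone, so the only way for the controller and several clients to share channel ends across threads is to lock them on each use.

Separately, the magic address constant in the rumprt sendto() warm-up hack can be sanity-checked as follows; this is purely illustrative arithmetic, not code from the patch:

    fn main() {
        // 172.31.0.20 as a host-order u32, as used for `sin_addr` above.
        let ip: u32 = (172 << 24) | (31 << 16) | (0 << 8) | 20;
        assert_eq!(ip, 2887712788);
        // BSD sockets expect network byte order (big-endian), hence `.to_be()`.
        assert_eq!(ip.to_be_bytes(), [172, 31, 0, 20]);
    }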