Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 65 additions & 11 deletions src/diskio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,24 @@
// loss or errors in this model.
// f) data gathering: record (name, bytes, start, duration)
// write to disk afterwards as a csv file?
pub(crate) mod immediate;
#[cfg(test)]
mod test;
pub(crate) mod threaded;
use threaded::PoolReference;

use std::io::{self, Write};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
use std::sync::mpsc::Receiver;
use std::time::{Duration, Instant};
use std::{fmt::Debug, fs::OpenOptions};

use anyhow::Result;
use tracing::{error, trace, warn};

use crate::process::Process;
use crate::utils::units::Size;

pub(crate) mod immediate;
#[cfg(test)]
mod test;
pub(crate) mod threaded;
use threaded::PoolReference;

/// Carries the implementation specific data for complete file transfers into the executor.
#[derive(Debug)]
Expand Down Expand Up @@ -443,11 +445,63 @@ pub(crate) fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
/// Get the executor for disk IO.
pub(crate) fn get_executor<'a>(
ram_budget: usize,
process: &Process,
) -> Result<Box<dyn Executor + 'a>> {
io_thread_count: usize,
) -> Box<dyn Executor + 'a> {
// If this gets lots of use, consider exposing via the config file.
Ok(match process.io_thread_count()? {
match io_thread_count {
0 | 1 => Box::new(immediate::ImmediateUnpacker::new()),
n => Box::new(threaded::Threaded::new(n, ram_budget)),
})
}
}

pub(crate) fn unpack_ram(io_chunk_size: usize, budget: Option<usize>) -> usize {
const RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS: usize = 200 * 1024 * 1024;
let minimum_ram = io_chunk_size * 2;

let default_max_unpack_ram = match effective_limits::memory_limit() {
Ok(effective)
if effective as usize > minimum_ram + RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS =>
{
effective as usize - RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS
}
Ok(_) => minimum_ram,
Err(error) => {
error!("can't determine memory limit: {error}");
minimum_ram
}
};

let unpack_ram = match budget {
Some(budget) => {
if budget < minimum_ram {
warn!(
"Ignoring RUSTUP_UNPACK_RAM ({}) less than minimum of {}.",
budget, minimum_ram
);
minimum_ram
} else if budget > default_max_unpack_ram {
warn!(
"Ignoring RUSTUP_UNPACK_RAM ({}) greater than detected available RAM of {}.",
budget, default_max_unpack_ram
);
default_max_unpack_ram
} else {
budget
}
}
None => {
if RAM_NOTICE_SHOWN.set(()).is_ok() {
trace!(size = %Size::new(default_max_unpack_ram), "unpacking components in memory");
}
default_max_unpack_ram
}
};

if minimum_ram > unpack_ram {
panic!("RUSTUP_UNPACK_RAM must be larger than {minimum_ram}");
} else {
unpack_ram
}
}

static RAM_NOTICE_SHOWN: OnceLock<()> = OnceLock::new();
6 changes: 4 additions & 2 deletions src/diskio/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ fn test_incremental_file(io_threads: &str) -> Result<()> {

let mut written = 0;
let mut file_finished = false;
let mut io_executor: Box<dyn Executor> = get_executor(32 * 1024 * 1024, &tp.process)?;
let mut io_executor: Box<dyn Executor> =
get_executor(32 * 1024 * 1024, tp.process.io_thread_count()?);
let (item, mut sender) = Item::write_file_segmented(
work_dir.path().join("scratch"),
0o666,
Expand Down Expand Up @@ -90,7 +91,8 @@ fn test_complete_file(io_threads: &str) -> Result<()> {
vars.insert("RUSTUP_IO_THREADS".to_string(), io_threads.to_string());
let tp = TestProcess::with_vars(vars);

let mut io_executor: Box<dyn Executor> = get_executor(32 * 1024 * 1024, &tp.process)?;
let mut io_executor: Box<dyn Executor> =
get_executor(32 * 1024 * 1024, tp.process.io_thread_count()?);
let mut chunk = io_executor.get_buffer(10);
chunk.extend(b"0123456789");
assert_eq!(chunk.len(), 10);
Expand Down
9 changes: 2 additions & 7 deletions src/dist/component/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::dist::component::package::{INSTALLER_VERSION, VERSION_FILE};
use crate::dist::component::transaction::Transaction;
use crate::dist::prefix::InstallPrefix;
use crate::errors::RustupError;
use crate::process::Process;
use crate::utils;

const COMPONENTS_FILE: &str = "components";
Expand Down Expand Up @@ -255,18 +254,14 @@ impl Component {
}
Ok(result)
}
pub fn uninstall<'a>(
&self,
mut tx: Transaction<'a>,
process: &Process,
) -> Result<Transaction<'a>> {
pub fn uninstall<'a>(&self, mut tx: Transaction<'a>) -> Result<Transaction<'a>> {
// Update components file
let path = self.components.rel_components_file();
let abs_path = self.components.prefix.abs_path(&path);
let temp = tx.temp().new_file()?;
utils::filter_file("components", &abs_path, &temp, |l| l != self.name)?;
tx.modify_file(path)?;
utils::rename("components", &temp, &abs_path, process)?;
utils::rename("components", &temp, &abs_path, tx.permit_copy_rename)?;

// TODO: If this is the last component remove the components file
// and the version file.
Expand Down
112 changes: 25 additions & 87 deletions src/dist/component/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,18 @@ use std::io::{self, ErrorKind as IOErrorKind, Read};
use std::mem;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::OnceLock;

use anyhow::{Context, Result, anyhow, bail};
use tar::EntryType;
use tracing::{error, trace, warn};
use tracing::warn;

use crate::diskio::{CompletedIo, Executor, FileBuffer, IO_CHUNK_SIZE, Item, Kind, get_executor};
use crate::diskio::{CompletedIo, Executor, FileBuffer, IO_CHUNK_SIZE, Item, Kind};
use crate::dist::component::components::{ComponentPart, ComponentPartKind, Components};
use crate::dist::component::transaction::Transaction;
use crate::dist::download::DownloadCfg;
use crate::dist::manifest::CompressionKind;
use crate::dist::temp;
use crate::errors::RustupError;
use crate::utils;
use crate::utils::units::Size;

/// The current metadata revision used by rust-installer
pub(crate) const INSTALLER_VERSION: &str = "3";
Expand All @@ -38,24 +35,35 @@ impl DirectoryPackage<temp::Dir> {
pub(crate) fn compressed<R: Read>(
stream: R,
kind: CompressionKind,
dl_cfg: &DownloadCfg<'_>,
temp_dir: temp::Dir,
io_executor: Box<dyn Executor>,
) -> Result<Self> {
match kind {
CompressionKind::GZip => Self::from_tar(flate2::read::GzDecoder::new(stream), dl_cfg),
CompressionKind::ZStd => {
Self::from_tar(zstd::stream::read::Decoder::new(stream)?, dl_cfg)
CompressionKind::GZip => {
Self::from_tar(flate2::read::GzDecoder::new(stream), temp_dir, io_executor)
}
CompressionKind::ZStd => Self::from_tar(
zstd::stream::read::Decoder::new(stream)?,
temp_dir,
io_executor,
),
CompressionKind::XZ => {
Self::from_tar(xz2::read::XzDecoder::new(stream), temp_dir, io_executor)
}
CompressionKind::XZ => Self::from_tar(xz2::read::XzDecoder::new(stream), dl_cfg),
}
}

fn from_tar(stream: impl Read, dl_cfg: &DownloadCfg<'_>) -> Result<Self> {
let temp_dir = dl_cfg.tmp_cx.new_directory()?;
fn from_tar(
stream: impl Read,
temp_dir: temp::Dir,
io_executor: Box<dyn Executor>,
) -> Result<Self> {
let mut archive = tar::Archive::new(stream);

// The rust-installer packages unpack to a directory called
// $pkgname-$version-$target. Skip that directory when
// unpacking.
unpack_without_first_dir(&mut archive, &temp_dir, dl_cfg)
unpack_without_first_dir(&mut archive, &temp_dir, io_executor)
.context("failed to extract package")?;

Self::new(temp_dir, false)
Expand Down Expand Up @@ -144,64 +152,6 @@ impl<P: Deref<Target = Path>> DirectoryPackage<P> {
}
}

// Probably this should live in diskio but ¯\_(ツ)_/¯
fn unpack_ram(
io_chunk_size: usize,
effective_max_ram: Option<usize>,
dl_cfg: &DownloadCfg<'_>,
) -> usize {
const RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS: usize = 200 * 1024 * 1024;
let minimum_ram = io_chunk_size * 2;
let default_max_unpack_ram = if let Some(effective_max_ram) = effective_max_ram {
if effective_max_ram > minimum_ram + RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS {
effective_max_ram - RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS
} else {
minimum_ram
}
} else {
// Rustup does not know how much RAM the machine has: use the minimum
minimum_ram
};
let unpack_ram = match dl_cfg
.process
.var("RUSTUP_UNPACK_RAM")
.ok()
.and_then(|budget_str| budget_str.parse::<usize>().ok())
{
Some(budget) => {
if budget < minimum_ram {
warn!(
"Ignoring RUSTUP_UNPACK_RAM ({}) less than minimum of {}.",
budget, minimum_ram
);
minimum_ram
} else if budget > default_max_unpack_ram {
warn!(
"Ignoring RUSTUP_UNPACK_RAM ({}) greater than detected available RAM of {}.",
budget, default_max_unpack_ram
);
default_max_unpack_ram
} else {
budget
}
}
None => {
if RAM_NOTICE_SHOWN.set(()).is_ok() {
trace!(size = %Size::new(default_max_unpack_ram), "unpacking components in memory");
}
default_max_unpack_ram
}
};

if minimum_ram > unpack_ram {
panic!("RUSTUP_UNPACK_RAM must be larger than {minimum_ram}");
} else {
unpack_ram
}
}

static RAM_NOTICE_SHOWN: OnceLock<()> = OnceLock::new();

/// Handle the async result of io operations
/// Replaces op.result with Ok(())
fn filter_result(op: &mut CompletedIo) -> io::Result<()> {
Expand Down Expand Up @@ -274,19 +224,9 @@ enum DirStatus {
fn unpack_without_first_dir<R: Read>(
archive: &mut tar::Archive<R>,
path: &Path,
dl_cfg: &DownloadCfg<'_>,
mut io_executor: Box<dyn Executor>,
) -> Result<()> {
let entries = archive.entries()?;
let effective_max_ram = match effective_limits::memory_limit() {
Ok(ram) => Some(ram as usize),
Err(error) => {
error!("can't determine memory limit: {error}");
None
}
};
let unpack_ram = unpack_ram(IO_CHUNK_SIZE, effective_max_ram, dl_cfg);
let mut io_executor: Box<dyn Executor> = get_executor(unpack_ram, dl_cfg.process)?;

let mut directories: HashMap<PathBuf, DirStatus> = HashMap::new();
// Path is presumed to exist. Call it a precondition.
directories.insert(path.to_owned(), DirStatus::Exists);
Expand Down Expand Up @@ -444,13 +384,11 @@ fn unpack_without_first_dir<R: Read>(
None => {
// Tar has item before containing directory
// Complain about this so we can see if these exist.
use std::io::Write as _;
writeln!(
dl_cfg.process.stderr().lock(),
"Unexpected: missing parent '{}' for '{}'",
warn!(
"unexpected: missing parent '{}' for '{}'",
parent.display(),
entry.path()?.display()
)?;
);
directories.insert(parent.to_owned(), DirStatus::Pending(vec![item]));
item = Item::make_dir(parent.to_owned(), 0o755);
// Check the parent's parent
Expand Down
Loading
Loading