Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 116 additions & 15 deletions hyperactor_mesh/src/alloc/logtailer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ use std::sync::Mutex;
use tokio::io;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;

/// Maximum byte size of a single log line before truncation
const MAX_BYTE_SIZE_LOG_LINE: usize = 256 * 1024;

/// A tailer (ring buffer) of (text) log lines.
pub struct LogTailer {
state: Arc<Mutex<State>>,
Expand All @@ -34,6 +38,21 @@ struct State {
}

impl LogTailer {
/// Push the completed line in `buffer` into the ring buffer held by `state`.
///
/// Trailing newline characters are stripped before storage; lines are kept
/// without terminators. While the ring is still filling
/// (`lines.len() < max`) the line is appended; once full, it overwrites the
/// slot at `next`, which always points at the oldest stored line.
///
/// NOTE: `max` must be non-zero, otherwise the `% max` below panics.
///
/// After this call the contents of `buffer` are unspecified (empty or the
/// evicted line); callers clear it before reuse.
fn push_line_to_buffer(state: &Arc<Mutex<State>>, buffer: &mut String, max: usize) {
    while buffer.ends_with('\n') {
        buffer.pop();
    }
    let mut locked = state.lock().unwrap();
    let next = locked.next;
    if next < locked.lines.len() {
        // Ring is full: recycle the slot. `swap` hands the evicted String's
        // allocation back to `buffer` so its capacity can be reused.
        swap(&mut locked.lines[next], buffer);
    } else {
        // Ring still filling: move the line in instead of cloning it.
        // `take` leaves `buffer` empty, which callers expect anyway.
        locked.lines.push(std::mem::take(buffer));
    }
    locked.next = (next + 1) % max;
}

/// Create a new tailer given a `stream`. The tailer tails the reader in the
/// background, while keeping at most `max` log lines in its buffer. The tailer
/// stops when the stream is ended (i.e., returns an EOF).
Expand All @@ -60,25 +79,52 @@ impl LogTailer {
// and make this awaitable, etc
let handle = tokio::spawn(async move {
let mut reader = BufReader::new(stream);
let mut skip_until_newline = false;
let mut buffer = String::new();
loop {
buffer.clear(); // clear retains the buffer
// TODO: we should probably limit line length
if reader.read_line(&mut buffer).await? == 0 {
// this gives at most a reference to 8KB of data in the internal buffer
// based on internal implementation of BufReader's `DEFAULT_BUF_SIZE`
let reader_buf = reader.fill_buf().await?;

if reader_buf.is_empty() {
// EOF reached, write any remaining buffer content as a line
if !buffer.is_empty() {
tee.write_all(buffer.as_bytes()).await?;
Self::push_line_to_buffer(&state, &mut buffer, max);
buffer.clear()
}
break Ok(());
}
let _ = tee.write_all(buffer.as_bytes()).await;
while buffer.ends_with('\n') {
buffer.pop();
}
let mut locked = state.lock().unwrap();
let next = locked.next;
if next < locked.lines.len() {
swap(&mut locked.lines[next], &mut buffer);
} else {
locked.lines.push(buffer.clone());

let mut _consumed = 0;
for &b in reader_buf {
_consumed += 1;
if skip_until_newline {
if b == b'\n' {
skip_until_newline = false;
}
continue;
}

buffer.push(char::from(b));
if b == b'\n' {
// End of line reached, write buffer to state
if !buffer.is_empty() {
tee.write_all(buffer.as_bytes()).await?;
Self::push_line_to_buffer(&state, &mut buffer, max);
buffer.clear();
}
} else if buffer.len() == MAX_BYTE_SIZE_LOG_LINE {
buffer.push_str("<TRUNCATED>\n");
skip_until_newline = true;
tee.write_all(buffer.as_bytes()).await?;
Self::push_line_to_buffer(&state, &mut buffer, max);

buffer.clear();
}
}
locked.next = (next + 1) % max;
// commit consumed bytes
reader.consume(_consumed);
}
});

Expand All @@ -94,7 +140,6 @@ impl LogTailer {
lines.rotate_left(next);
lines
}

/// Abort the tailer. This will stop any ongoing reads, and drop the
/// stream. Abort is complete after `join` returns.
pub fn abort(&self) {
Expand Down Expand Up @@ -145,6 +190,62 @@ mod tests {
assert_eq!(lines.next_line().await.unwrap().unwrap(), "world");
}

#[tokio::test]
async fn test_line_truncation() {
    // Build an input stream with three lines around the truncation limit.
    let mut input_bytes = Vec::new();

    // First line is exactly `MAX_BYTE_SIZE_LOG_LINE` bytes including `\n`,
    // so it fits without truncation.
    input_bytes.extend(vec![b'a'; MAX_BYTE_SIZE_LOG_LINE - 1]);
    input_bytes.push(b'\n');

    // Second line exceeds the limit (`MAX_BYTE_SIZE_LOG_LINE` bytes plus
    // `\n`) and must be truncated.
    input_bytes.extend(vec![b'b'; MAX_BYTE_SIZE_LOG_LINE]);
    input_bytes.push(b'\n');

    // Final line is below the limit and has no trailing newline,
    // exercising the EOF flush path.
    input_bytes.extend(vec![b'c'; MAX_BYTE_SIZE_LOG_LINE - 1]);

    let reader = Cursor::new(input_bytes);

    let (lines, result) = LogTailer::new(5, reader).join().await;
    assert!(result.is_ok());

    // Should have 3 lines.
    assert_eq!(lines.len(), 3);

    // Untruncated first line: the newline is stripped on storage.
    assert_eq!(lines[0], "a".repeat(MAX_BYTE_SIZE_LOG_LINE - 1));

    // Truncated second line carries the marker appended at the cut point.
    assert_eq!(
        lines[1],
        format!("{}<TRUNCATED>", "b".repeat(MAX_BYTE_SIZE_LOG_LINE))
    );

    // Trailing partial line is flushed when the stream ends.
    assert_eq!(lines[2], "c".repeat(MAX_BYTE_SIZE_LOG_LINE - 1));
}

#[tokio::test]
async fn test_ring_buffer_behavior() {
    // Feed seven lines through a capacity-3 tailer: only the newest three
    // survive, returned oldest-first.
    let input = "line1\nline2\nline3\nline4\nline5\nline6\nline7\n";
    let capacity = 3; // small ring so eviction is easy to observe

    let (lines, result) = LogTailer::new(capacity, Cursor::new(input.as_bytes()))
        .join()
        .await;
    assert!(result.is_ok());

    // line1 through line4 were evicted by the ring buffer.
    assert_eq!(lines, vec!["line5", "line6", "line7"]);
}

#[tokio::test]
async fn test_streaming_logtailer() {
let (reader, mut writer) = tokio::io::simplex(1);
Expand Down