Skip to content

Commit 6f69c5a

Browse files
pankit-eng and facebook-github-bot
authored and committed
Fix LogTailer's unbounded mem usage
Summary: **Problem**: In the current implementation, LogTailer uses a string that grows elastically with the log line size, because it uses a `String` buffer object in the tee operation. LogTailer, being the underlying implementation for piping user code's stdout/stderr, is currently **prone to bad user actor code leading to unbounded memory usage**. Moreover, once the string buffer has grown to a given size, it stays at that size, leading to inefficient usage or hogging of memory. Bad actor code that exposed the issue: ``` class LogBomber(Actor): def __init__(self) -> None: self.logger = logging.getLogger() self.logger.setLevel(logging.INFO) endpoint async def spam_logs(self, num_logs: int, delay_ms: int = 0) -> None: """Generate a massive number of logs in rapid succession""" for i in range(num_logs): # Generate both stdout and stderr logs to maximize channel pressure print(f"STDOUT_SPAM_{i}: " + "X" * 1000000000, flush=True) # Large log lines self.logger.error(f"STDERR_SPAM_{i}: " + "Y" * 100000000) # Large error logs if delay_ms > 0 and i % 100 == 0: await asyncio.sleep(delay_ms / 1000.0) ``` **Solution**: Limit the read to 256 KB for a single text line. The rest of the line is skipped, and the truncated line is marked with "<TRUNCATED>". Differential Revision: D82412752
1 parent f3467b2 commit 6f69c5a

File tree

1 file changed

+116
-15
lines changed

1 file changed

+116
-15
lines changed

hyperactor_mesh/src/alloc/logtailer.rs

Lines changed: 116 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@ use std::sync::Mutex;
1717
use tokio::io;
1818
use tokio::io::AsyncBufReadExt;
1919
use tokio::io::AsyncRead;
20+
use tokio::io::AsyncReadExt;
2021
use tokio::io::AsyncWrite;
2122
use tokio::io::AsyncWriteExt;
2223
use tokio::io::BufReader;
2324

25+
/// Maximum byte size of a single log line before truncation
26+
const MAX_BYTE_SIZE_LOG_LINE: usize = 256 * 1024;
27+
2428
/// A tailer (ring buffer) of (text) log lines.
2529
pub struct LogTailer {
2630
state: Arc<Mutex<State>>,
@@ -34,6 +38,21 @@ struct State {
3438
}
3539

3640
impl LogTailer {
41+
/// Helper method to push a line to the ring buffer
42+
fn push_line_to_buffer(state: &Arc<Mutex<State>>, buffer: &mut String, max: usize) {
43+
while buffer.ends_with('\n') {
44+
buffer.pop();
45+
}
46+
let mut locked = state.lock().unwrap();
47+
let next = locked.next;
48+
if next < locked.lines.len() {
49+
swap(&mut locked.lines[next], buffer);
50+
} else {
51+
locked.lines.push(buffer.clone());
52+
}
53+
locked.next = (next + 1) % max;
54+
}
55+
3756
/// Create a new tailer given a `stream`. The tailer tails the reader in the
3857
/// background, while keeping at most `max` log lines in its buffer. The tailer
3958
/// stops when the stream is ended (i.e., returns an EOF).
@@ -60,25 +79,52 @@ impl LogTailer {
6079
// and make this awaitable, etc
6180
let handle = tokio::spawn(async move {
6281
let mut reader = BufReader::new(stream);
82+
let mut skip_until_newline = false;
6383
let mut buffer = String::new();
6484
loop {
65-
buffer.clear(); // clear retains the buffer
66-
// TODO: we should probably limit line length
67-
if reader.read_line(&mut buffer).await? == 0 {
85+
// this gives at most a reference to 8KB of data in the internal buffer
86+
// based on internal implementation of BufReader's `DEFAULT_BUF_SIZE`
87+
let reader_buf = reader.fill_buf().await?;
88+
89+
if reader_buf.is_empty() {
90+
// EOF reached, write any remaining buffer content as a line
91+
if !buffer.is_empty() {
92+
tee.write_all(buffer.as_bytes()).await?;
93+
Self::push_line_to_buffer(&state, &mut buffer, max);
94+
buffer.clear()
95+
}
6896
break Ok(());
6997
}
70-
let _ = tee.write_all(buffer.as_bytes()).await;
71-
while buffer.ends_with('\n') {
72-
buffer.pop();
73-
}
74-
let mut locked = state.lock().unwrap();
75-
let next = locked.next;
76-
if next < locked.lines.len() {
77-
swap(&mut locked.lines[next], &mut buffer);
78-
} else {
79-
locked.lines.push(buffer.clone());
98+
99+
let mut _consumed = 0;
100+
for &b in reader_buf {
101+
_consumed += 1;
102+
if skip_until_newline {
103+
if b == b'\n' {
104+
skip_until_newline = false;
105+
}
106+
continue;
107+
}
108+
109+
buffer.push(char::from(b));
110+
if b == b'\n' {
111+
// End of line reached, write buffer to state
112+
if !buffer.is_empty() {
113+
tee.write_all(buffer.as_bytes()).await?;
114+
Self::push_line_to_buffer(&state, &mut buffer, max);
115+
buffer.clear();
116+
}
117+
} else if buffer.len() == MAX_BYTE_SIZE_LOG_LINE {
118+
buffer.push_str("<TRUNCATED>\n");
119+
skip_until_newline = true;
120+
tee.write_all(buffer.as_bytes()).await?;
121+
Self::push_line_to_buffer(&state, &mut buffer, max);
122+
123+
buffer.clear();
124+
}
80125
}
81-
locked.next = (next + 1) % max;
126+
// commit consumed bytes
127+
reader.consume(_consumed);
82128
}
83129
});
84130

@@ -94,7 +140,6 @@ impl LogTailer {
94140
lines.rotate_left(next);
95141
lines
96142
}
97-
98143
/// Abort the tailer. This will stop any ongoing reads, and drop the
99144
/// stream. Abort is complete after `join` returns.
100145
pub fn abort(&self) {
@@ -145,6 +190,62 @@ mod tests {
145190
assert_eq!(lines.next_line().await.unwrap().unwrap(), "world");
146191
}
147192

193+
#[tokio::test]
194+
async fn test_line_truncation() {
195+
// Create input with 3 MAX_BYTE_SIZE_LOG_LINE-byte lines
196+
let mut input_bytes = Vec::new();
197+
// first line is exactly `MAX_BYTE_SIZE_LOG_LINE` bytes including `\n`
198+
input_bytes.extend(vec![b'a'; MAX_BYTE_SIZE_LOG_LINE - 1]);
199+
input_bytes.extend([b'\n']);
200+
201+
// second line is more than `MAX_BYTE_SIZE_LOG_LINE` bytes including `\n`
202+
input_bytes.extend(vec![b'b'; MAX_BYTE_SIZE_LOG_LINE]);
203+
input_bytes.extend([b'\n']);
204+
205+
// last line of the input stream is < `MAX_BYTE_SIZE_LOG_LINE` bytes to ensure complete flush
206+
input_bytes.extend(vec![b'c'; MAX_BYTE_SIZE_LOG_LINE - 1]);
207+
208+
let reader = Cursor::new(input_bytes);
209+
210+
let (lines, result) = LogTailer::new(5, reader).join().await;
211+
assert!(result.is_ok());
212+
213+
// Should have 3 lines
214+
assert_eq!(lines.len(), 3);
215+
216+
// First line should be MAX_BYTE_SIZE_LOG_LINE-1 'a's
217+
assert_eq!(
218+
lines[0],
219+
format!("{}", "a".repeat(MAX_BYTE_SIZE_LOG_LINE - 1))
220+
);
221+
222+
// Second line should be `MAX_BYTE_SIZE_LOG_LINE` 'b's + "<TRUNCATED>"
223+
assert_eq!(
224+
lines[1],
225+
format!("{}<TRUNCATED>", "b".repeat(MAX_BYTE_SIZE_LOG_LINE))
226+
);
227+
228+
// last line before stream closes should be MAX_BYTE_SIZE_LOG_LINE-1 c's
229+
assert_eq!(lines[2], "c".repeat(MAX_BYTE_SIZE_LOG_LINE - 1));
230+
}
231+
232+
#[tokio::test]
233+
async fn test_ring_buffer_behavior() {
234+
let input = "line1\nline2\nline3\nline4\nline5\nline6\nline7\n";
235+
let reader = Cursor::new(input.as_bytes());
236+
let max_lines = 3; // Small ring buffer for easy testing
237+
238+
let (lines, result) = LogTailer::new(max_lines, reader).join().await;
239+
assert!(result.is_ok());
240+
241+
// Should only have the last 3 lines (ring buffer behavior)
242+
// Lines 1-4 should be overwritten (lost due to ring buffer)
243+
assert_eq!(lines.len(), 3);
244+
assert_eq!(lines[0], "line5"); // oldest in current buffer
245+
assert_eq!(lines[1], "line6"); // middle
246+
assert_eq!(lines[2], "line7"); // newest
247+
}
248+
148249
#[tokio::test]
149250
async fn test_streaming_logtailer() {
150251
let (reader, mut writer) = tokio::io::simplex(1);

0 commit comments

Comments
 (0)