Skip to content

Commit 9392bcf

Browse files
committed
Implement the Drain::flush method
Sends a message to the logging thread, then blocks until the logger acknowledges it. Contrast this to the busy-loop approach in slog-rs#36 Pinned to the branch used in PR #349, so cannot be released until that PR is accepted. Fixes issue slog-rs#35
1 parent c2040ac commit 9392bcf

File tree

3 files changed

+174
-24
lines changed

3 files changed

+174
-24
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
66

77
## [Unreleased]
88

9+
* Implement the `Drain::flush` method added in [slog-rs/slog#349]
910
* Define minimum supported rust version.
1011
* Setup Github Actions.
1112
* Fixup some minor typos in CHANGELOG.md, including an incorrect release date for 2.8.0.
1213

14+
[slog-rs/slog#349]: https://github.com/slog-rs/slog/pull/349
15+
1316
## 2.8.0 - 2023-08-26
1417

1518
* Call of deprecated `err.description()` replaced with `err.to_string()`.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ default = []
2121
path = "lib.rs"
2222

2323
[dependencies]
24-
slog = "2.1"
24+
slog = { git = "https://github.com/Techcable/slog.git", branch = "feature/simple-flush-method" }
2525
thread_local = "1"
2626
take_mut = "0.2.0"
2727
crossbeam-channel = "0.5"

lib.rs

Lines changed: 170 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ use std::{io, thread};
6868

6969
use std::sync::atomic::AtomicUsize;
7070
use std::sync::atomic::Ordering;
71-
use std::sync::Mutex;
71+
use std::sync::{Arc, Condvar, Mutex};
7272
use take_mut::take;
7373

7474
use std::panic::{catch_unwind, AssertUnwindSafe};
@@ -245,6 +245,12 @@ where
245245
thread_name: Option<String>,
246246
}
247247

248+
struct SpawnedThreadInfo {
249+
sender: Sender<AsyncMsg>,
250+
flush_lock: Arc<Mutex<FlushStatus>>,
251+
flush_cond: Arc<Condvar>,
252+
}
253+
248254
impl<D> AsyncCoreBuilder<D>
249255
where
250256
D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
@@ -287,25 +293,71 @@ where
287293
self
288294
}
289295

290-
fn spawn_thread(self) -> (thread::JoinHandle<()>, Sender<AsyncMsg>) {
296+
fn spawn_thread(self) -> (thread::JoinHandle<()>, SpawnedThreadInfo) {
291297
let (tx, rx) = crossbeam_channel::bounded(self.chan_size);
292298
let mut builder = thread::Builder::new();
293299
if let Some(thread_name) = self.thread_name {
294300
builder = builder.name(thread_name);
295301
}
302+
let flush_lock = Arc::new(Mutex::new(FlushStatus::NotRequested));
303+
let flush_cond = Arc::new(Condvar::new());
304+
let state = SpawnedThreadInfo {
305+
sender: tx,
306+
flush_lock: Arc::clone(&flush_lock),
307+
flush_cond: Arc::clone(&flush_cond),
308+
};
296309
let drain = self.drain;
297310
let join = builder
298311
.spawn(move || {
299312
let drain = AssertUnwindSafe(&drain);
313+
let mut gave_flush_warning = false;
300314
// catching possible unwinding panics which can occur in used inner Drain implementation
301315
if let Err(panic_cause) = catch_unwind(move || loop {
316+
let mut give_flush_warning = |x: &str| {
317+
if !gave_flush_warning {
318+
eprintln!("slog-async: {}", x);
319+
}
320+
gave_flush_warning = true;
321+
};
302322
match rx.recv() {
303323
Ok(AsyncMsg::Record(r)) => {
304324
if r.log_to(&*drain).is_err() {
305325
eprintln!("slog-async failed while writing");
306326
return;
307327
}
308-
}
328+
},
329+
Ok(AsyncMsg::Flush) => {
330+
let status_lock = match flush_lock.lock() {
331+
Err(_e) => {
332+
give_flush_warning("fush lock poisoned");
333+
continue;
334+
},
335+
Ok(s) => s,
336+
};
337+
if !matches!(*status_lock, FlushStatus::Pending) {
338+
flush_cond.notify_all();
339+
drop(status_lock);
340+
continue;
341+
}
342+
drop(status_lock);
343+
let res_status = match drain.flush() {
344+
Ok(()) => FlushStatus::Success,
345+
Err(e) => FlushStatus::Failure(e),
346+
};
347+
let mut status_lock = match flush_lock.lock() {
348+
Err(_e) => {
349+
give_flush_warning("fush lock poisoned");
350+
continue;
351+
},
352+
Ok(s) => s,
353+
};
354+
if !matches!(*status_lock, FlushStatus::Pending) {
355+
give_flush_warning("fush status corrupted");
356+
}
357+
*status_lock = res_status;
358+
flush_cond.notify_all();
359+
drop(status_lock);
360+
},
309361
Ok(AsyncMsg::Finish) => return,
310362
Err(recv_error) => {
311363
eprintln!("slog-async failed while receiving: {recv_error}");
@@ -318,7 +370,7 @@ where
318370
})
319371
.unwrap();
320372

321-
(join, tx)
373+
(join, state)
322374
}
323375

324376
/// Build `AsyncCore`
@@ -329,12 +381,14 @@ where
329381
/// Build `AsyncCore`
330382
pub fn build_no_guard(self) -> AsyncCore {
331383
let blocking = self.blocking;
332-
let (join, tx) = self.spawn_thread();
384+
let (join, info) = self.spawn_thread();
333385

334386
AsyncCore {
335-
ref_sender: tx,
387+
ref_sender: info.sender,
336388
tl_sender: thread_local::ThreadLocal::new(),
337389
join: Mutex::new(Some(join)),
390+
flush_lock: info.flush_lock,
391+
flush_cond: info.flush_cond,
338392
blocking,
339393
}
340394
}
@@ -344,18 +398,20 @@ where
344398
/// See `AsyncGuard` for more information.
345399
pub fn build_with_guard(self) -> (AsyncCore, AsyncGuard) {
346400
let blocking = self.blocking;
347-
let (join, tx) = self.spawn_thread();
401+
let (join, info) = self.spawn_thread();
348402

349403
(
350404
AsyncCore {
351-
ref_sender: tx.clone(),
405+
ref_sender: info.sender.clone(),
352406
tl_sender: thread_local::ThreadLocal::new(),
353407
join: Mutex::new(None),
408+
flush_lock: info.flush_lock,
409+
flush_cond: info.flush_cond,
354410
blocking,
355411
},
356412
AsyncGuard {
357413
join: Some(join),
358-
tx,
414+
tx: info.sender,
359415
},
360416
)
361417
}
@@ -403,6 +459,14 @@ impl Drop for AsyncGuard {
403459
}
404460
}
405461

462+
#[derive(Debug)]
463+
enum FlushStatus {
464+
NotRequested,
465+
Pending,
466+
Failure(slog::FlushError),
467+
Success,
468+
}
469+
406470
/// Core of `Async` drain
407471
///
408472
/// See `Async` for documentation.
@@ -418,6 +482,8 @@ pub struct AsyncCore {
418482
tl_sender: thread_local::ThreadLocal<Sender<AsyncMsg>>,
419483
join: Mutex<Option<thread::JoinHandle<()>>>,
420484
blocking: bool,
485+
flush_lock: Arc<Mutex<FlushStatus>>,
486+
flush_cond: Arc<Condvar>,
421487
}
422488

423489
impl AsyncCore {
@@ -474,8 +540,52 @@ impl Drain for AsyncCore {
474540
) -> AsyncResult<()> {
475541
self.send(AsyncRecord::from(record, logger_values))
476542
}
477-
}
478543

544+
fn flush(&self) -> Result<(), slog::FlushError> {
545+
fn handle_poison(
546+
mut e: std::sync::PoisonError<
547+
std::sync::MutexGuard<'_, FlushStatus>,
548+
>,
549+
) -> std::io::Error {
550+
**e.get_mut() = FlushStatus::NotRequested; // cancel request, allowing retry
551+
std::io::Error::new(std::io::ErrorKind::Other, "mutex poisoned")
552+
}
553+
let sender = self.get_sender().map_err(|_e| {
554+
std::io::Error::new(std::io::ErrorKind::Other, "mutex poisoned")
555+
})?;
556+
let mut status_lock = self.flush_lock.lock().map_err(handle_poison)?;
557+
while !matches!(*status_lock, FlushStatus::NotRequested) {
558+
// another flush is in progress, block until that one succeeds
559+
status_lock =
560+
self.flush_cond.wait(status_lock).map_err(handle_poison)?;
561+
}
562+
assert!(
563+
matches!(*status_lock, FlushStatus::NotRequested),
564+
"{:?}",
565+
*status_lock
566+
);
567+
match sender.send(AsyncMsg::Flush) {
568+
Ok(()) => {}
569+
Err(_) => {
570+
return Err(std::io::Error::new(
571+
std::io::ErrorKind::Other,
572+
"channel disconnected",
573+
)
574+
.into());
575+
}
576+
}
577+
*status_lock = FlushStatus::Pending;
578+
while matches!(*status_lock, FlushStatus::Pending) {
579+
status_lock =
580+
self.flush_cond.wait(status_lock).map_err(handle_poison)?;
581+
}
582+
match std::mem::replace(&mut *status_lock, FlushStatus::NotRequested) {
583+
FlushStatus::NotRequested | FlushStatus::Pending => unreachable!(),
584+
FlushStatus::Failure(cause) => Err(cause),
585+
FlushStatus::Success => Ok(()),
586+
}
587+
}
588+
}
479589
/// Serialized record.
480590
pub struct AsyncRecord {
481591
msg: String,
@@ -545,6 +655,7 @@ impl AsyncRecord {
545655
enum AsyncMsg {
546656
Record(AsyncRecord),
547657
Finish,
658+
Flush,
548659
}
549660

550661
impl Drop for AsyncCore {
@@ -796,6 +907,10 @@ impl Drain for Async {
796907

797908
Ok(())
798909
}
910+
911+
fn flush(&self) -> Result<(), slog::FlushError> {
912+
self.core.flush()
913+
}
799914
}
800915

801916
impl Drop for Async {
@@ -806,7 +921,6 @@ impl Drop for Async {
806921

807922
// }}}
808923

809-
810924
#[cfg(test)]
811925
mod test {
812926
use super::*;
@@ -815,25 +929,45 @@ mod test {
815929
#[test]
816930
fn integration_test() {
817931
let (mock_drain, mock_drain_rx) = MockDrain::new();
818-
let async_drain = AsyncBuilder::new(mock_drain)
819-
.build();
820-
let slog = slog::Logger::root(async_drain.fuse(), o!("field1" => "value1"));
932+
let async_drain = AsyncBuilder::new(mock_drain).build();
933+
let slog =
934+
slog::Logger::root(async_drain.fuse(), o!("field1" => "value1"));
821935

822936
info!(slog, "Message 1"; "field2" => "value2");
823937
warn!(slog, "Message 2"; "field3" => "value3");
824-
assert_eq!(mock_drain_rx.recv().unwrap(), r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"#);
825-
assert_eq!(mock_drain_rx.recv().unwrap(), r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"#);
938+
assert_eq!(
939+
mock_drain_rx.recv().unwrap(),
940+
*r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"#
941+
);
942+
assert_eq!(
943+
mock_drain_rx.recv().unwrap(),
944+
*r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"#
945+
);
946+
slog.flush().unwrap();
947+
assert_eq!(mock_drain_rx.recv().unwrap(), MockMsg::Flush);
948+
}
949+
950+
#[derive(Debug, PartialEq)]
951+
enum MockMsg {
952+
Log(String),
953+
Flush,
954+
}
955+
impl PartialEq<str> for MockMsg {
956+
fn eq(&self, other: &str) -> bool {
957+
match self {
958+
MockMsg::Log(ref msg) => msg == other,
959+
_ => false,
960+
}
961+
}
826962
}
827-
828-
829963
/// Test-helper drain
830964
#[derive(Debug)]
831965
struct MockDrain {
832-
tx: mpsc::Sender<String>,
966+
tx: mpsc::Sender<MockMsg>,
833967
}
834968

835969
impl MockDrain {
836-
fn new() -> (Self, mpsc::Receiver<String>) {
970+
fn new() -> (Self, mpsc::Receiver<MockMsg>) {
837971
let (tx, rx) = mpsc::channel();
838972
(Self { tx }, rx)
839973
}
@@ -843,14 +977,23 @@ mod test {
843977
type Ok = ();
844978
type Err = slog::Never;
845979

846-
fn log(&self, record: &Record, logger_kv: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
980+
fn log(
981+
&self,
982+
record: &Record,
983+
logger_kv: &OwnedKVList,
984+
) -> Result<Self::Ok, Self::Err> {
847985
let mut serializer = MockSerializer::default();
848986
logger_kv.serialize(record, &mut serializer).unwrap();
849987
record.kv().serialize(record, &mut serializer).unwrap();
850988
let level = record.level().as_short_str();
851989
let msg = record.msg().to_string();
852990
let entry = format!("{} {}: {:?}", level, msg, serializer.kvs);
853-
self.tx.send(entry).unwrap();
991+
self.tx.send(MockMsg::Log(entry)).unwrap();
992+
Ok(())
993+
}
994+
995+
fn flush(&self) -> Result<(), slog::FlushError> {
996+
self.tx.send(MockMsg::Flush).unwrap();
854997
Ok(())
855998
}
856999
}
@@ -861,7 +1004,11 @@ mod test {
8611004
}
8621005

8631006
impl slog::Serializer for MockSerializer {
864-
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> Result<(), slog::Error> {
1007+
fn emit_arguments(
1008+
&mut self,
1009+
key: Key,
1010+
val: &fmt::Arguments,
1011+
) -> Result<(), slog::Error> {
8651012
self.kvs.push((key.to_string(), val.to_string()));
8661013
Ok(())
8671014
}

0 commit comments

Comments
 (0)