-
Notifications
You must be signed in to change notification settings - Fork 1.8k
storage: config: engine: Introduce dead letter queue #11000
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughAdds Dead-Letter Queue (DLQ) support: new config keys/fields, public storage APIs to quarantine chunks, engine calls to invoke quarantine on specific output failure paths, filesystem-backed implementation to write DLQ files, and unit tests validating DLQ scenarios. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Engine as Engine (handle_output_event)
participant Task as Output Task / chunk
participant StorageAPI as flb_storage_quarantine_chunk
participant CIO as CIO (source chunk)
participant Rej as Rejected Stream (DLQ FS)
rect rgb(250,250,255)
note over Engine,Task: Output failure detected (no-retry / retry-create/schedule fail / error)
Engine->>Task: identify rejected chunk
alt DLQ enabled & chunk available
Engine->>StorageAPI: quarantine_chunk(ctx, ch, tag, status, out_name)
StorageAPI->>CIO: ensure source chunk is up/readable
StorageAPI->>Rej: get_or_create_rejected_stream()
StorageAPI->>Rej: open DLQ chunk (sanitized name) & write payload
StorageAPI->>Rej: fsync & close
StorageAPI-->>Engine: return success/failure
else DLQ disabled or unavailable
Engine-->>Engine: skip quarantine
end
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (1)
🧰 Additional context used🧬 Code graph analysis (1)tests/internal/storage_dlq.c (3)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (30)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
e91922d
to
4530987
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (3)
src/flb_storage.c (1)
750-785
: Guard rejected-stream lazy creation against races; consider pre-creating at initThis function lazily creates and caches ctx->storage_rejected_stream with no synchronization. Concurrent calls (from multiple output workers) can race, leading to duplicate create attempts or a torn write to the cached pointer.
- Prefer pre-creating the rejected stream during flb_storage_create when storage_keep_rejected is enabled.
- If you keep lazy creation, add a lock or other synchronization around get/create/cache.
Also, sanitize the configured name: storage_rejected_path may contain path separators; pass a single safe stream name to ChunkIO.
If ChunkIO stream creation is guaranteed to be thread-safe and idempotent, please confirm; otherwise, adopt one of the above.
Example minimal sanitization within this function:
- struct cio_stream *st; - const char *name; + struct cio_stream *st; + const char *raw; + char name[256]; + size_t i; @@ - name = ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected"; + raw = ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected"; + snprintf(name, sizeof(name), "%s", raw); + name[sizeof(name) - 1] = '\0'; + for (i = 0; name[i] != '\0'; i++) { + if (name[i] == '/' || name[i] == '\\') { + name[i] = '_'; + } + }tests/internal/storage_dlq.c (2)
262-274
: Use join_path for portability (Windows path separators)Hardcoding "/" may fail on Windows. Reuse join_path.
- snprintf(path, sizeof(path), "%s/%s", root, stream_name); - path[sizeof(path)-1] = '\0'; + join_path(path, sizeof(path), root, stream_name);
458-463
: Avoid non-portable basename; implement a portable helperbasename() isn’t available on Windows and requires libgen.h on Unix. Use a small portable helper.
Apply this helper near other utilities:
+static const char *base_name_portable(const char *p) +{ + const char *s1 = strrchr(p, '/'); +#ifdef _WIN32 + const char *s2 = strrchr(p, '\\'); + const char *b = (s1 && s2) ? (s1 > s2 ? s1 : s2) : (s1 ? s1 : s2); +#else + const char *b = s1; +#endif + return b ? b + 1 : p; +}Then replace usage:
- /* get just the filename (basename) */ - strncpy(latest_copy, latest, sizeof(latest_copy)-1); - latest_copy[sizeof(latest_copy)-1] = '\0'; - base = basename(latest_copy); + /* get just the filename (basename) */ + base = base_name_portable(latest);And remove the now-unused latest_copy declaration:
- char latest_copy[1024];
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
include/fluent-bit/flb_config.h
(2 hunks)include/fluent-bit/flb_storage.h
(2 hunks)src/flb_config.c
(3 hunks)src/flb_engine.c
(5 hunks)src/flb_storage.c
(1 hunks)tests/internal/CMakeLists.txt
(1 hunks)tests/internal/storage_dlq.c
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
src/flb_storage.c (1)
include/fluent-bit/flb_mem.h (1)
flb_free
(126-128)
include/fluent-bit/flb_storage.h (1)
src/flb_storage.c (1)
flb_storage_quarantine_chunk
(787-859)
src/flb_engine.c (5)
src/flb_input_chunk.c (1)
flb_input_chunk_get_tag
(2174-2200)src/flb_sds.c (2)
flb_sds_create_len
(58-76)flb_sds_destroy
(389-399)src/flb_input.c (1)
flb_input_name
(790-797)src/flb_output.c (1)
flb_output_name
(1087-1094)src/flb_storage.c (1)
flb_storage_quarantine_chunk
(787-859)
src/flb_config.c (1)
include/fluent-bit/flb_mem.h (1)
flb_free
(126-128)
tests/internal/storage_dlq.c (2)
src/flb_config.c (2)
flb_config_exit
(448-628)flb_config_init
(232-446)src/flb_storage.c (1)
flb_storage_quarantine_chunk
(787-859)
🔇 Additional comments (2)
src/flb_engine.c (1)
234-276
: Solid defensive gating on DLQ handling.Short-circuiting when DLQ is disabled or the task lacks a chunk keeps us off the hot path unless quarantine is actually configured, which is exactly what we need here.
tests/internal/storage_dlq.c (1)
386-397
: Confirm cio_qsort comparator usagePassing NULL as comparator assumes ChunkIO provides a sensible default. If not guaranteed, pass a comparator or drop the call.
8874e0d
to
de96d72
Compare
Signed-off-by: Hiroshi Hatake <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/flb_storage.c
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/flb_storage.c (1)
include/fluent-bit/flb_mem.h (1)
flb_free
(126-128)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (30)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
- GitHub Check: PR - fuzzing test
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
- GitHub Check: pr-compile-without-cxx (3.31.6)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
- GitHub Check: pr-compile-centos-7
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
🔇 Additional comments (2)
src/flb_storage.c (2)
750-767
: LGTM! Filename sanitization properly implemented.The sanitization logic correctly replaces filesystem-hostile characters ('/', '\', ':') with underscores and handles edge cases (NULL input, bounds checking, null termination).
Based on past review comments, this addresses the previous concern about sanitizing DLQ filename components.
769-803
: Ensure thread-safe lazy init of ctx->storage_rejected_streamget_or_create_rejected_stream caches ctx->storage_rejected_stream without any synchronization; repository search found no protection or callers guaranteeing single-threaded access — add a mutex or atomic compare‑and‑swap around the read/create/store, or verify and document that callers run single-threaded.
Location: src/flb_storage.c:769-803
Signed-off-by: Hiroshi Hatake <[email protected]>
62df481
to
249a59b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/flb_storage.c (1)
805-814
: Consider logging at higher severity when state restoration fails on success path.When
ret_val == 0
(quarantine succeeded) butcio_chunk_down
fails, the chunk remains up, increasing memory residency despite a successful quarantine. While the current debug log is appropriate for the error path (ret_val == -1
), a warning might be more suitable when the DLQ operation itself succeeded.Consider this adjustment:
static inline int flb_storage_chunk_restore_state(struct cio_chunk *src, int was_up, int ret_val) { if (!was_up) { if (cio_chunk_down(src) != CIO_OK) { - flb_debug("[storage] failed to bring chunk back down"); + if (ret_val == 0) { + flb_warn("[storage] DLQ succeeded but failed to restore chunk state"); + } else { + flb_debug("[storage] failed to bring chunk back down"); + } } } return ret_val; }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/flb_storage.c
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/flb_storage.c (1)
include/fluent-bit/flb_mem.h (1)
flb_free
(126-128)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (30)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
- GitHub Check: pr-compile-without-cxx (3.31.6)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
- GitHub Check: pr-compile-centos-7
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
- GitHub Check: PR - fuzzing test
🔇 Additional comments (3)
src/flb_storage.c (3)
750-767
: LGTM! Robust sanitization with proper boundary checks.The helper correctly sanitizes filename components by replacing filesystem-hostile characters. The implementation includes proper null termination, boundary checks, and safe handling of null input with a sensible default.
769-803
: LGTM! Clean stream initialization with proper caching.The function correctly initializes or retrieves the DLQ stream with appropriate guards for filesystem backend availability, proper null checks, and efficient caching to avoid repeated stream creation.
816-893
: Approve quarantine implementation
Verifiedcio_chunk_up_force
preserves original chunk state on failure; no additional restore call needed.
Signed-off-by: Hiroshi Hatake <[email protected]>
This is because the current Fluent Bit mechanism does not offer for preserving invalid chunks for requesting via network.
So, if users encounter such behavior, the chunks will be just deleted and there is no clues to solve it.
Related to #9363.
Enter
[N/A]
in the box, if an item is not applicable to your change.Testing
Before we can approve your change; please submit the following in a comment:
For example, this feature will be enabled with:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-test
label to test for all targets (requires maintainer to do).Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Tests