Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions internal-packages/run-engine/src/run-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1862,10 +1862,12 @@ export class RunQueue {
// Call this every 10 minutes
private async scanConcurrencySets() {
if (this.abortController.signal.aborted) {
this.logger.info("Abort signal received, skipping concurrency scan");

return;
}

this.logger.debug("Scanning concurrency sets for completed runs");
this.logger.info("Scanning concurrency sets for completed runs");

const stats = {
streamCallbacks: 0,
Expand Down Expand Up @@ -1918,7 +1920,7 @@ export class RunQueue {
return;
}

this.logger.debug("Processing concurrency keys from stream", {
this.logger.info("Processing concurrency keys from stream", {
keys: uniqueKeys,
});

Expand Down Expand Up @@ -1988,27 +1990,29 @@ export class RunQueue {
}

private async processCurrentConcurrencyRunIds(concurrencyKey: string, runIds: string[]) {
this.logger.debug(`Processing concurrency set with ${runIds.length} runs`, {
this.logger.info("Processing concurrency set with runs", {
concurrencyKey,
runIds: runIds.slice(0, 5), // Log first 5 for debugging
runIds: runIds.slice(0, 5), // Log first 5 for debugging,
runIdsLength: runIds.length,
});

// Call the callback to determine which runs are completed
const completedRuns = await this.options.concurrencySweeper?.callback(runIds);

if (!completedRuns) {
this.logger.debug("No completed runs found in concurrency set", { concurrencyKey });
this.logger.info("No completed runs found in concurrency set", { concurrencyKey });
return;
}

if (completedRuns.length === 0) {
this.logger.debug("No completed runs found in concurrency set", { concurrencyKey });
this.logger.info("No completed runs found in concurrency set", { concurrencyKey });
return;
}

this.logger.debug(`Found ${completedRuns.length} completed runs to mark for ack`, {
this.logger.info("Found completed runs to mark for ack", {
concurrencyKey,
completedRunIds: completedRuns.map((r) => r.id).slice(0, 5),
completedRunIdsLength: completedRuns.length,
});

// Mark the completed runs for acknowledgment
Expand All @@ -2032,7 +2036,7 @@ export class RunQueue {

const count = await this.redis.zadd(markedForAckKey, ...args);

this.logger.debug(`Marked ${count} runs for acknowledgment`, {
this.logger.info("Marked runs for acknowledgment", {
markedForAckKey,
count,
});
Expand Down
Loading