diff --git a/.changeset/soft-candles-grow.md b/.changeset/soft-candles-grow.md new file mode 100644 index 0000000000..f33720f2e6 --- /dev/null +++ b/.changeset/soft-candles-grow.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/sdk": patch +"trigger.dev": patch +--- + +Added the heartbeats.yield utility to allow tasks that do continuous CPU-heavy work to heartbeat and continue running diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 059195d543..2d8384ec19 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -519,8 +519,8 @@ const EnvironmentSchema = z RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(100), RUN_ENGINE_TIMEOUT_PENDING_EXECUTING: z.coerce.number().int().default(60_000), RUN_ENGINE_TIMEOUT_PENDING_CANCEL: z.coerce.number().int().default(60_000), - RUN_ENGINE_TIMEOUT_EXECUTING: z.coerce.number().int().default(60_000), - RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().default(60_000), + RUN_ENGINE_TIMEOUT_EXECUTING: z.coerce.number().int().default(300_000), // 5 minutes + RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().default(300_000), // 5 minutes RUN_ENGINE_TIMEOUT_SUSPENDED: z.coerce .number() .int() @@ -735,6 +735,7 @@ const EnvironmentSchema = z RUN_ENGINE_RUN_QUEUE_LOG_LEVEL: z .enum(["log", "error", "warn", "info", "debug"]) .default("info"), + RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM: z.string().default("0"), /** How long should the presence ttl last */ DEV_PRESENCE_SSE_TIMEOUT: z.coerce.number().int().default(30_000), diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index ce2b38d94c..da01bd44db 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -15,6 +15,8 @@ function createRunEngine() { prisma, readOnlyPrisma: $replica, logLevel: env.RUN_ENGINE_WORKER_LOG_LEVEL, + treatProductionExecutionStallsAsOOM: + env.RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM === "1", worker: { disabled: env.RUN_ENGINE_WORKER_ENABLED === "0", workers: env.RUN_ENGINE_WORKER_COUNT, diff --git a/docs/images/machines-resource-monitor-ffmpeg.png b/docs/images/machines-resource-monitor-ffmpeg.png new file mode 100644 index 0000000000..97bb290657 Binary files /dev/null and b/docs/images/machines-resource-monitor-ffmpeg.png differ diff --git a/docs/images/machines-resource-monitor-logs.png b/docs/images/machines-resource-monitor-logs.png new file mode 100644 index 0000000000..bc2d323f92 Binary files /dev/null and b/docs/images/machines-resource-monitor-logs.png differ diff --git a/docs/machines.mdx b/docs/machines.mdx index 0b60926eaf..b3c08c177e 100644 --- a/docs/machines.mdx +++ b/docs/machines.mdx @@ -6,7 +6,7 @@ description: "Configure the number of vCPUs and GBs of RAM you want the task to The `machine` configuration is optional. Using higher spec machines will increase the cost of running the task but can also improve the performance of the task if it is CPU or memory bound. 
```ts /trigger/heavy-task.ts -import { task } from "@trigger.dev/sdk"; +import { task } from "@trigger.dev/sdk"; export const heavyTask = task({ id: "heavy-task", @@ -30,15 +30,15 @@ export const config: TriggerConfig = { ## Machine configurations -| Preset | vCPU | Memory | Disk space | -| :------------------ | :--- | :----- | :--------- | -| micro | 0.25 | 0.25 | 10GB | -| small-1x (default) | 0.5 | 0.5 | 10GB | -| small-2x | 1 | 1 | 10GB | -| medium-1x | 1 | 2 | 10GB | -| medium-2x | 2 | 4 | 10GB | -| large-1x | 4 | 8 | 10GB | -| large-2x | 8 | 16 | 10GB | +| Preset | vCPU | Memory | Disk space | +| :----------------- | :--- | :----- | :--------- | +| micro | 0.25 | 0.25 | 10GB | +| small-1x (default) | 0.5 | 0.5 | 10GB | +| small-2x | 1 | 1 | 10GB | +| medium-1x | 1 | 2 | 10GB | +| medium-2x | 2 | 4 | 10GB | +| large-1x | 4 | 8 | 10GB | +| large-2x | 8 | 16 | 10GB | You can view the Trigger.dev cloud pricing for these machines [here](https://trigger.dev/pricing#computePricing). @@ -60,14 +60,855 @@ This is useful when you know that a certain payload will require more memory tha Sometimes you might see one of your runs fail with an "Out Of Memory" error. -> TASK_PROCESS_OOM_KILLED. Your task ran out of memory. Try increasing the machine specs. If this doesn't fix it there might be a memory leak. +> TASK_PROCESS_OOM_KILLED. Your run was terminated due to exceeding the machine's memory limit. Try increasing the machine preset in your task options or replay using a larger machine. -We automatically detect common Out Of Memory errors, including when ffmpeg throws an error because it ran out of memory. +We automatically detect common Out Of Memory errors: + +- When using Node.js, if the V8 heap limit is exceeded (this can happen when creating large long-lived objects) +- When the entire process exceeds the memory limit of the machine the run is executing on. +- When a child process, such as ffmpeg, causes the memory limit of the machine to be exceeded, and exits with a non-zero code. + +### Memory/Resource monitoring + +To better understand why an OOM error occurred, we've published a helper class that will log memory debug information at regular intervals. 
+ +First, add this `ResourceMonitor` class to your project: + + + +```ts /src/resourceMonitor.ts +import { promisify } from "node:util"; +import { exec } from "node:child_process"; +import os from "node:os"; +import { promises as fs } from "node:fs"; +import { type Context, logger } from "@trigger.dev/sdk"; +import { getHeapStatistics } from "node:v8"; +import { PerformanceObserver, constants } from "node:perf_hooks"; + +const execAsync = promisify(exec); + +export type DiskMetrics = { + total: number; + used: number; + free: number; + percentUsed: number; + warning?: string; +}; + +export type MemoryMetrics = { + total: number; + free: number; + used: number; + percentUsed: number; +}; + +export type NodeProcessMetrics = { + memoryUsage: number; + memoryUsagePercent: number; + heapUsed: number; + heapSizeLimit: number; + heapUsagePercent: number; + availableHeap: number; + isNearHeapLimit: boolean; +}; + +export type TargetProcessMetrics = { + method: string; + processName: string; + count: number; + processes: ProcessInfo[]; + averages: { + cpu: number; + memory: number; + rss: number; + vsz: number; + } | null; + totals: { + cpu: number; + memory: number; + rss: number; + vsz: number; + } | null; +}; + +export type ProcessMetrics = { + node: NodeProcessMetrics; + targetProcess: TargetProcessMetrics | null; +}; + +type GCSummary = { + count: number; + totalDuration: number; // ms + avgDuration: number; // ms + maxDuration: number; // ms + kinds: Record< + string, + { + // breakdown by kind + count: number; + totalDuration: number; + avgDuration: number; + maxDuration: number; + } + >; +}; + +type ProcessInfo = { + user: string; + pid: number; + cpu: number; + mem: number; + vsz: number; + rss: number; + command: string; +}; + +export type SystemMetrics = { + disk: DiskMetrics; + memory: MemoryMetrics; +}; + +export type ResourceMonitorConfig = { + dirName?: string; + processName?: string; + ctx: Context; + compactLogging?: boolean; +}; + +// Constants +const DISK_LIMIT_GB = 10; +const DISK_LIMIT_BYTES = DISK_LIMIT_GB * 1024 * 1024 * 1024; // 10Gi in bytes + +export class ResourceMonitor { + private logInterval: NodeJS.Timeout | null = null; + private logger: typeof logger; + private dirName: string; + private processName: string | undefined; + private ctx: Context; + private verbose: boolean; + private compactLogging: boolean; + private gcObserver: PerformanceObserver | null = null; + private bufferedGcEntries: PerformanceEntry[] = []; + + constructor(config: ResourceMonitorConfig) { + this.logger = logger; + this.dirName = config.dirName ?? "/tmp"; + this.processName = config.processName; + this.ctx = config.ctx; + this.verbose = true; + this.compactLogging = config.compactLogging ?? 
false; + } + + /** + * Start periodic resource monitoring + * @param intervalMs Monitoring interval in milliseconds + */ + startMonitoring(intervalMs = 10000): void { + if (intervalMs < 1000) { + intervalMs = 1000; + this.logger.warn("ResourceMonitor: intervalMs is less than 1000, setting to 1000"); + } + + if (this.logInterval) { + clearInterval(this.logInterval); + } + + this.logInterval = setInterval(this.logResources.bind(this), intervalMs); + + this.gcObserver = new PerformanceObserver((list) => { + this.bufferedGcEntries.push(...list.getEntries()); + }); + + this.gcObserver.observe({ entryTypes: ["gc"], buffered: true }); + } + + /** + * Stop resource monitoring + */ + stopMonitoring(): void { + if (this.logInterval) { + clearInterval(this.logInterval); + this.logInterval = null; + } + + if (this.gcObserver) { + this.gcObserver.disconnect(); + this.gcObserver = null; + } + } + + private async logResources() { + try { + await this.logResourceSnapshot("ResourceMonitor"); + } catch (error) { + this.logger.error( + `Resource monitoring error: ${error instanceof Error ? error.message : String(error)}` + ); + } + } + + /** + * Get combined system metrics (disk and memory) + */ + private async getSystemMetrics(): Promise { + const [disk, memory] = await Promise.all([this.getDiskMetrics(), this.getMemoryMetrics()]); + return { disk, memory }; + } + + /** + * Get disk space information + */ + private async getDiskMetrics(): Promise { + try { + // Even with permission errors, du will output a total + const { stdout, stderr } = await execAsync(`du -sb ${this.dirName} || true`); + + // Get the last line of stdout which contains the total + const lastLine = stdout.split("\n").filter(Boolean).pop() || ""; + const usedBytes = parseInt(lastLine.split("\t")[0], 10); + + const effectiveTotal = DISK_LIMIT_BYTES; + const effectiveUsed = Math.min(usedBytes, DISK_LIMIT_BYTES); + const effectiveFree = effectiveTotal - effectiveUsed; + const percentUsed = (effectiveUsed / effectiveTotal) * 100; + + const metrics: DiskMetrics = { + total: effectiveTotal, + used: effectiveUsed, + free: effectiveFree, + percentUsed, + }; + + // If we had permission errors, add a warning + if (stderr.includes("Permission denied") || stderr.includes("cannot access")) { + metrics.warning = "Some directories were not accessible"; + } else if (stderr.includes("No such file or directory")) { + metrics.warning = "The directory does not exist"; + } + + return metrics; + } catch (error) { + this.logger.error( + `Error getting disk metrics: ${error instanceof Error ? error.message : String(error)}` + ); + return { + free: DISK_LIMIT_BYTES, + total: DISK_LIMIT_BYTES, + used: 0, + percentUsed: 0, + warning: "Failed to measure disk usage", + }; + } + } + + /** + * Get memory metrics + */ + private getMemoryMetrics(): MemoryMetrics { + const total = os.totalmem(); + const free = os.freemem(); + const used = total - free; + const percentUsed = (used / total) * 100; + + return { total, free, used, percentUsed }; + } + + /** + * Get process-specific metrics using /proc filesystem + */ + private async getProcMetrics(pids: number[]): Promise { + return Promise.all( + pids.map(async (pid) => { + try { + // Read process status + const status = await fs.readFile(`/proc/${pid}/status`, "utf8"); + const cmdline = await fs.readFile(`/proc/${pid}/cmdline`, "utf8"); + const stat = await fs.readFile(`/proc/${pid}/stat`, "utf8"); + + // Parse VmRSS (resident set size) from status + const rss = parseInt(status.match(/VmRSS:\s+(\d+)/)?.[1] ?? 
"0", 10); + // Parse VmSize (virtual memory size) from status + const vsz = parseInt(status.match(/VmSize:\s+(\d+)/)?.[1] ?? "0", 10); + // Get process owner + const user = (await fs.stat(`/proc/${pid}`)).uid.toString(); + + // Parse CPU stats from /proc/[pid]/stat + const stats = stat.split(" "); + const utime = parseInt(stats[13], 10); + const stime = parseInt(stats[14], 10); + const starttime = parseInt(stats[21], 10); + + // Calculate CPU percentage + const totalTime = utime + stime; + const uptime = os.uptime(); + const hertz = 100; // Usually 100 on Linux + const elapsedTime = uptime - starttime / hertz; + const cpuUsage = 100 * (totalTime / hertz / elapsedTime); + + // Calculate memory percentage against total system memory + const totalMem = os.totalmem(); + const memoryPercent = (rss * 1024 * 100) / totalMem; + + return { + user, + pid, + cpu: cpuUsage, + mem: memoryPercent, + vsz, + rss, + command: cmdline.replace(/\0/g, " ").trim(), + }; + } catch (error) { + return null; + } + }) + ).then((results) => results.filter((r): r is ProcessInfo => r !== null)); + } + + /** + * Find PIDs for a process name using /proc filesystem + */ + private async findPidsByName(processName?: string): Promise { + if (!processName) { + return []; + } + + try { + const pids: number[] = []; + const procDirs = await fs.readdir("/proc"); + + for (const dir of procDirs) { + if (!/^\d+$/.test(dir)) continue; + + const processPid = parseInt(dir, 10); + + // Ignore processes that have a lower PID than our own PID + if (processPid <= process.pid) { + continue; + } + + try { + const cmdline = await fs.readFile(`/proc/${dir}/cmdline`, "utf8"); + if (cmdline.includes(processName)) { + pids.push(parseInt(dir, 10)); + } + } catch { + // Ignore errors reading individual process info + continue; + } + } + + return pids; + } catch { + return []; + } + } + + /** + * Get process-specific metrics + */ + private async getProcessMetrics(): Promise { + // Get Node.js process metrics + const totalMemory = os.totalmem(); + // Convert GB to bytes (machine.memory is in GB) + const machineMemoryBytes = this.ctx.machine + ? 
this.ctx.machine.memory * 1024 * 1024 * 1024 + : totalMemory; + const nodeMemoryUsage = process.memoryUsage(); + + // Node process percentage is based on machine memory if available, otherwise system memory + const nodeMemoryPercent = (nodeMemoryUsage.rss / machineMemoryBytes) * 100; + const heapStats = getHeapStatistics(); + + const nodeMetrics: NodeProcessMetrics = { + memoryUsage: nodeMemoryUsage.rss, + memoryUsagePercent: nodeMemoryPercent, + heapUsed: nodeMemoryUsage.heapUsed, + heapSizeLimit: heapStats.heap_size_limit, + heapUsagePercent: (heapStats.used_heap_size / heapStats.heap_size_limit) * 100, + availableHeap: heapStats.total_available_size, + isNearHeapLimit: heapStats.used_heap_size / heapStats.heap_size_limit > 0.8, + }; + + let method = "ps"; + + try { + let processes: ProcessInfo[] = []; + + // Try ps first, fall back to /proc if it fails + try { + const { stdout: psOutput } = await execAsync( + `ps aux | grep ${this.processName} | grep -v grep` + ); + + if (psOutput.trim()) { + processes = psOutput + .trim() + .split("\n") + .filter((line) => { + const parts = line.trim().split(/\s+/); + const pid = parseInt(parts[1], 10); + + // Ignore processes that have a lower PID than our own PID + return pid > process.pid; + }) + .map((line) => { + const parts = line.trim().split(/\s+/); + return { + user: parts[0], + pid: parseInt(parts[1], 10), + cpu: parseFloat(parts[2]), + mem: parseFloat(parts[3]), + vsz: parseInt(parts[4], 10), + rss: parseInt(parts[5], 10), + command: parts.slice(10).join(" "), + }; + }); + } + } catch { + // ps failed, try /proc instead + method = "proc"; + const pids = await this.findPidsByName(this.processName); + processes = await this.getProcMetrics(pids); + } + + if (processes.length === 0) { + return { + node: nodeMetrics, + targetProcess: this.processName + ? { + method, + processName: this.processName, + count: 0, + processes: [], + averages: null, + totals: null, + } + : null, + }; + } + + // For CPU: + // - ps shows CPU percentage per core (e.g., 100% = 1 core) + // - machine.cpu is in cores (e.g., 0.5 = half a core) + // - we want to show percentage of allocated CPU (e.g., 100% = using all allocated CPU) + const availableCpu = this.ctx.machine?.cpu ?? os.cpus().length; + const cpuNormalizer = availableCpu * 100; // Convert to basis points for better precision with fractional CPUs + + // For Memory: + // - ps 'mem' is already a percentage of system memory + // - we need to convert it to a percentage of machine memory + // - if machine memory is 0.5GB and system has 16GB, we multiply the percentage by 32 + const memoryScaleFactor = this.ctx.machine ? totalMemory / machineMemoryBytes : 1; + + const totals = processes.reduce( + (acc, proc) => ({ + cpu: acc.cpu + proc.cpu, + // Scale memory percentage to machine memory + // TODO: test this + memory: acc.memory + proc.mem * memoryScaleFactor, + rss: acc.rss + proc.rss, + vsz: acc.vsz + proc.vsz, + }), + { cpu: 0, memory: 0, rss: 0, vsz: 0 } + ); + + const count = processes.length; + + const averages = { + cpu: totals.cpu / (count * cpuNormalizer), + memory: totals.memory / count, + rss: totals.rss / count, + vsz: totals.vsz / count, + }; + + return { + node: nodeMetrics, + targetProcess: this.processName + ? 
{ + method, + processName: this.processName, + count, + processes, + averages, + totals: { + cpu: totals.cpu / cpuNormalizer, + memory: totals.memory, + rss: totals.rss, + vsz: totals.vsz, + }, + } + : null, + }; + } catch (error) { + return { + node: nodeMetrics, + targetProcess: this.processName + ? { + method, + processName: this.processName, + count: 0, + processes: [], + averages: null, + totals: null, + } + : null, + }; + } + } + + /** + * Log a snapshot of current resource usage + */ + async logResourceSnapshot(label = "Resource Snapshot"): Promise { + try { + const payload = await this.getResourceSnapshotPayload(); + const enhancedLabel = this.compactLogging + ? this.createCompactLabel(payload, label) + : this.createEnhancedLabel(payload, label); + + if (payload.process.node.isNearHeapLimit) { + this.logger.warn(`${enhancedLabel}: Node is near heap limit`, payload); + } else { + this.logger.info(enhancedLabel, payload); + } + } catch (error) { + this.logger.error( + `Error logging resource snapshot: ${error instanceof Error ? error.message : String(error)}` + ); + } + } + + /** + * Create an enhanced log label with key metrics for quick scanning + */ + private createEnhancedLabel(payload: any, baseLabel: string): string { + const parts: string[] = [baseLabel]; + + // System resources with text indicators + const diskPercent = parseFloat(payload.system.disk.percentUsed); + const memoryPercent = parseFloat(payload.system.memory.percentUsed); + const diskIndicator = this.getTextIndicator(diskPercent, 80, 90); + const memIndicator = this.getTextIndicator(memoryPercent, 80, 90); + parts.push(`Disk:${diskPercent.toFixed(1).padStart(5)}%${diskIndicator}`); + parts.push(`Mem:${memoryPercent.toFixed(1).padStart(5)}%${memIndicator}`); + + // Node process metrics with text indicators + const nodeMemPercent = parseFloat(payload.process.node.memoryUsagePercent); + const heapPercent = parseFloat(payload.process.node.heapUsagePercent); + const nodeIndicator = this.getTextIndicator(nodeMemPercent, 70, 85); + const heapIndicator = this.getTextIndicator(heapPercent, 70, 85); + parts.push(`Node:${nodeMemPercent.toFixed(1).padStart(4)}%${nodeIndicator}`); + parts.push(`Heap:${heapPercent.toFixed(1).padStart(4)}%${heapIndicator}`); + + // Target process metrics (if available) + if (payload.process.targetProcess && payload.process.targetProcess.count > 0) { + const targetCpu = payload.process.targetProcess.totals?.cpuPercent || "0"; + const targetMem = payload.process.targetProcess.totals?.memoryPercent || "0"; + const targetCpuNum = parseFloat(targetCpu); + const targetMemNum = parseFloat(targetMem); + const cpuIndicator = this.getTextIndicator(targetCpuNum, 80, 90); + const memIndicator = this.getTextIndicator(targetMemNum, 80, 90); + parts.push( + `${payload.process.targetProcess.processName}:${targetCpu.padStart( + 4 + )}%${cpuIndicator}/${targetMem.padStart(4)}%${memIndicator}` + ); + } + + // GC activity with performance indicators + if (payload.gc && payload.gc.count > 0) { + const avgDuration = payload.gc.avgDuration; + const gcIndicator = this.getTextIndicator(avgDuration, 5, 10, true); + parts.push( + `GC:${payload.gc.count.toString().padStart(2)}(${avgDuration + .toFixed(1) + .padStart(4)}ms)${gcIndicator}` + ); + } + + // Machine constraints + if (payload.constraints) { + parts.push(`[${payload.constraints.cpu}CPU/${payload.constraints.memoryGB}GB]`); + } + + // Warning indicators (only show critical ones in the main label) + const criticalWarnings: string[] = []; + if 
(payload.process.node.isNearHeapLimit) criticalWarnings.push("HEAP_LIMIT"); + if (diskPercent > 90) criticalWarnings.push("DISK_CRITICAL"); + if (memoryPercent > 95) criticalWarnings.push("MEM_CRITICAL"); + if (payload.system.disk.warning) criticalWarnings.push("DISK_WARN"); + + if (criticalWarnings.length > 0) { + parts.push(`[${criticalWarnings.join(",")}]`); + } + + return parts.join(" | "); + } + + /** + * Get text-based indicator for percentage values + */ + private getTextIndicator( + value: number, + warningThreshold: number, + criticalThreshold: number, + isDuration = false + ): string { + if (isDuration) { + // For duration values, higher is worse + if (value >= criticalThreshold) return " [CRIT]"; + if (value >= warningThreshold) return " [WARN]"; + return " [OK]"; + } else { + // For percentage values, higher is worse + if (value >= criticalThreshold) return " [CRIT]"; + if (value >= warningThreshold) return " [WARN]"; + return " [OK]"; + } + } + + /** + * Create a compact version of the enhanced label for high-frequency logging + */ + private createCompactLabel(payload: any, baseLabel: string): string { + const parts: string[] = [baseLabel]; + + // Only show critical metrics in compact mode + const diskPercent = parseFloat(payload.system.disk.percentUsed); + const memoryPercent = parseFloat(payload.system.memory.percentUsed); + const heapPercent = parseFloat(payload.process.node.heapUsagePercent); + + // Use single character indicators for compactness + const diskIndicator = diskPercent > 90 ? "!" : diskPercent > 80 ? "?" : "."; + const memIndicator = memoryPercent > 95 ? "!" : memoryPercent > 80 ? "?" : "."; + const heapIndicator = heapPercent > 85 ? "!" : heapPercent > 70 ? "?" : "."; + + parts.push(`D:${diskPercent.toFixed(0).padStart(2)}%${diskIndicator}`); + parts.push(`M:${memoryPercent.toFixed(0).padStart(2)}%${memIndicator}`); + parts.push(`H:${heapPercent.toFixed(0).padStart(2)}%${heapIndicator}`); + + // GC activity (only if significant) + if (payload.gc && payload.gc.count > 0 && payload.gc.avgDuration > 2) { + const gcIndicator = + payload.gc.avgDuration > 10 ? "!" : payload.gc.avgDuration > 5 ? "?" : "."; + parts.push(`GC:${payload.gc.count}${gcIndicator}`); + } + + return parts.join(" "); + } + + async getResourceSnapshotPayload() { + const [systemMetrics, processMetrics] = await Promise.all([ + this.getSystemMetrics(), + this.getProcessMetrics(), + ]); + + const gcSummary = summarizeGCEntries(this.bufferedGcEntries); + this.bufferedGcEntries = []; + + const formatBytes = (bytes: number) => (bytes / (1024 * 1024)).toFixed(2); + const formatPercent = (value: number) => value.toFixed(1); + + return { + system: { + disk: { + limitGiB: DISK_LIMIT_GB, + dirName: this.dirName, + usedGiB: (systemMetrics.disk.used / (1024 * 1024 * 1024)).toFixed(2), + freeGiB: (systemMetrics.disk.free / (1024 * 1024 * 1024)).toFixed(2), + percentUsed: formatPercent(systemMetrics.disk.percentUsed), + warning: systemMetrics.disk.warning, + }, + memory: { + freeGB: (systemMetrics.memory.free / (1024 * 1024 * 1024)).toFixed(2), + percentUsed: formatPercent(systemMetrics.memory.percentUsed), + }, + }, + gc: gcSummary, + constraints: this.ctx.machine + ? 
{ + cpu: this.ctx.machine.cpu, + memoryGB: this.ctx.machine.memory, + diskGB: DISK_LIMIT_BYTES / (1024 * 1024 * 1024), + } + : { + cpu: os.cpus().length, + memoryGB: Math.floor(os.totalmem() / (1024 * 1024 * 1024)), + note: "Using system resources (no machine constraints specified)", + }, + process: { + node: { + memoryUsageMB: formatBytes(processMetrics.node.memoryUsage), + memoryUsagePercent: formatPercent(processMetrics.node.memoryUsagePercent), + heapUsedMB: formatBytes(processMetrics.node.heapUsed), + heapSizeLimitMB: formatBytes(processMetrics.node.heapSizeLimit), + heapUsagePercent: formatPercent(processMetrics.node.heapUsagePercent), + availableHeapMB: formatBytes(processMetrics.node.availableHeap), + isNearHeapLimit: processMetrics.node.isNearHeapLimit, + ...(this.verbose + ? { + heapStats: getHeapStatistics(), + } + : {}), + }, + targetProcess: processMetrics.targetProcess + ? { + method: processMetrics.targetProcess.method, + processName: processMetrics.targetProcess.processName, + count: processMetrics.targetProcess.count, + averages: processMetrics.targetProcess.averages + ? { + cpuPercent: formatPercent(processMetrics.targetProcess.averages.cpu * 100), + memoryPercent: formatPercent(processMetrics.targetProcess.averages.memory), + rssMB: formatBytes(processMetrics.targetProcess.averages.rss * 1024), + vszMB: formatBytes(processMetrics.targetProcess.averages.vsz * 1024), + } + : null, + totals: processMetrics.targetProcess.totals + ? { + cpuPercent: formatPercent(processMetrics.targetProcess.totals.cpu * 100), + memoryPercent: formatPercent(processMetrics.targetProcess.totals.memory), + rssMB: formatBytes(processMetrics.targetProcess.totals.rss * 1024), + vszMB: formatBytes(processMetrics.targetProcess.totals.vsz * 1024), + } + : null, + } + : null, + }, + timestamp: new Date().toISOString(), + }; + } +} + +function summarizeGCEntries(entries: PerformanceEntry[]): GCSummary { + if (entries.length === 0) { + return { + count: 0, + totalDuration: 0, + avgDuration: 0, + maxDuration: 0, + kinds: {}, + }; + } + + let totalDuration = 0; + let maxDuration = 0; + const kinds: Record = {}; + + for (const e of entries) { + const duration = e.duration; + totalDuration += duration; + if (duration > maxDuration) maxDuration = duration; + + const kind = kindName((e as any)?.detail?.kind ?? "unknown"); + if (!kinds[kind]) { + kinds[kind] = { count: 0, totalDuration: 0, maxDuration: 0 }; + } + kinds[kind].count += 1; + kinds[kind].totalDuration += duration; + if (duration > kinds[kind].maxDuration) kinds[kind].maxDuration = duration; + } + + // finalize averages + const avgDuration = totalDuration / entries.length; + const kindsWithAvg: GCSummary["kinds"] = {}; + for (const [kind, stats] of Object.entries(kinds)) { + kindsWithAvg[kind] = { + count: stats.count, + totalDuration: stats.totalDuration, + avgDuration: stats.totalDuration / stats.count, + maxDuration: stats.maxDuration, + }; + } + + return { + count: entries.length, + totalDuration, + avgDuration, + maxDuration, + kinds: kindsWithAvg, + }; +} + +const kindName = (k: number | string) => { + if (typeof k === "number") { + return ( + { + [constants.NODE_PERFORMANCE_GC_MAJOR]: "major", + [constants.NODE_PERFORMANCE_GC_MINOR]: "minor", + [constants.NODE_PERFORMANCE_GC_INCREMENTAL]: "incremental", + [constants.NODE_PERFORMANCE_GC_WEAKCB]: "weak-cb", + }[k] ?? 
`kind:${k}` + ); + } + return k; +}; +``` + + + +Then, in your task, you can create an instance of the `ResourceMonitor` class and start monitoring memory, disk, and CPU usage: + +```ts /src/trigger/example.ts +import { task, tasks, logger } from "@trigger.dev/sdk"; +import { setTimeout } from "node:timers/promises"; +import { ResourceMonitor } from "../resourceMonitor.js"; + +// Middleware to enable the resource monitor +tasks.middleware("resource-monitor", async ({ ctx, next }) => { + const resourceMonitor = new ResourceMonitor({ + ctx, + }); + + // Only enable the resource monitor if the environment variable is set + if (process.env.RESOURCE_MONITOR_ENABLED === "1") { + resourceMonitor.startMonitoring(1_000); + } + + await next(); + + resourceMonitor.stopMonitoring(); +}); + +// Creates artificial memory pressure so the monitor has something to report (replace with your own workload) +function createMemoryPressure() { + const chunks: Buffer[] = []; + return setInterval(() => { + chunks.push(Buffer.alloc(50 * 1024 * 1024)); + }, 1_000); +} + +export const resourceMonitorTest = task({ + id: "resource-monitor-test", + run: async (payload: any, { ctx }) => { + const interval = createMemoryPressure(); + + await setTimeout(180_000); + + clearInterval(interval); + + return { + message: "Hello, resources!", + }; + }, +});
+``` + +This will produce logs that look like this: + +![Resource monitor logs](/images/machines-resource-monitor-logs.png) + +If you are spawning a child process and you want to monitor its memory usage, you can pass the `processName` option to the `ResourceMonitor` class: + +```ts /src/trigger/example.ts +const resourceMonitor = new ResourceMonitor({ + ctx, + processName: "ffmpeg", +}); +``` + +This will produce logs that include the memory and CPU usage of the `ffmpeg` process: + +![Resource monitor logs](/images/machines-resource-monitor-ffmpeg.png) + +### Explicit OOM errors You can explicitly throw an Out Of Memory error in your task. This can be useful if you use a native package that detects it's going to run out of memory and then stops before it runs out. If you can detect this, you can then throw this error. ```ts /trigger/heavy-task.ts -import { task } from "@trigger.dev/sdk"; +import { task } from "@trigger.dev/sdk"; import { OutOfMemoryError } from "@trigger.dev/sdk"; export const yourTask = task({ @@ -88,7 +929,7 @@ If OOM errors happen regularly you need to either optimize the memory-efficiency If you are seeing rare OOM errors, it might make sense to add a setting to your task to retry with a large machine when an OOM happens: ```ts /trigger/heavy-task.ts -import { task } from "@trigger.dev/sdk"; +import { task } from "@trigger.dev/sdk"; export const yourTask = task({ id: "your-task", @@ -105,5 +946,7 @@ export const yourTask = task({ ``` - This will only retry the task if you get an OOM error. It won't permanently change the machine that a new run starts on, so if you consistently see OOM errors you should change the machine in the `machine` property. + This will only retry the task if you get an OOM error. It won't permanently change the machine + that a new run starts on, so if you consistently see OOM errors you should change the machine in + the `machine` property.
diff --git a/docs/troubleshooting.mdx b/docs/troubleshooting.mdx index d8ed3d095a..8167b4320a 100644 --- a/docs/troubleshooting.mdx +++ b/docs/troubleshooting.mdx @@ -181,6 +181,38 @@ View the [rate limits](/limits) page for more information. This can happen in different situations, for example when using plain strings as idempotency keys. Support for `Crypto` without a special flag was added in Node `v19.0.0`. You will have to upgrade Node - we recommend even-numbered major releases, e.g. `v20` or `v22`. Alternatively, you can switch from plain strings to the `idempotencyKeys.create` SDK function.
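+For example, a minimal sketch of creating a key with `idempotencyKeys.create` and passing it when triggering a task (the task id and `userId` value here are illustrative placeholders):
+
+```ts
+import { idempotencyKeys, tasks } from "@trigger.dev/sdk";
+
+const userId = "user_123"; // placeholder value
+
+// Build a key from stable parts instead of passing a plain string
+const idempotencyKey = await idempotencyKeys.create(["my-task", userId]);
+
+await tasks.trigger("my-task", { userId }, { idempotencyKey });
+```
+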
[Read the guide](/idempotency). +### Task run stalled executing + +If you see a `TASK_RUN_STALLED_EXECUTING` error it means that we didn't receive a heartbeat from your task before the stall timeout. We automatically heartbeat runs every 30 seconds, and the heartbeat timeout is 10 minutes. + + + +If this was a dev run, then most likely the `trigger.dev dev` CLI was stopped, and it wasn't an issue with your code. + + + +These errors can happen when code inside your task is blocking the event loop for too long. The most likely cause would be an accidental infinite loop. It could also be a CPU-heavy operation that's blocking the event loop, like nested loops with very large arrays. We recommend reading the [Don't Block the Event Loop](https://nodejs.org/en/learn/asynchronous-work/dont-block-the-event-loop) guide from Node.js for common patterns that can cause this. + +If you are doing a continuous CPU-heavy task, then we recommend you try using our `heartbeats.yield` function to automatically yield to the event loop periodically: + +```ts +import { heartbeats } from "@trigger.dev/sdk"; + +// code inside your task +for (const row of bigDataset) { + await heartbeats.yield(); // safe to call every iteration, we will only actually yield when we need to + process(row); // this is a synchronous operation +} +``` + + + +You could also offload the CPU-heavy work to a Node.js worker thread, but this is more complex to setup currently. We are planning on adding support for this in the future. + + + +If the above doesn't work, then we recommend you try increasing the machine size of your task. See our [machines guide](/machines) for more information. + ## Framework specific issues ### NestJS swallows all errors/exceptions diff --git a/internal-packages/run-engine/src/engine/errors.ts b/internal-packages/run-engine/src/engine/errors.ts index e4ff702ee0..cfc12e1b95 100644 --- a/internal-packages/run-engine/src/engine/errors.ts +++ b/internal-packages/run-engine/src/engine/errors.ts @@ -1,8 +1,11 @@ import { assertExhaustive } from "@trigger.dev/core"; import { TaskRunError } from "@trigger.dev/core/v3"; -import { TaskRunStatus } from "@trigger.dev/database"; +import { RuntimeEnvironmentType, TaskRunStatus } from "@trigger.dev/database"; -export function runStatusFromError(error: TaskRunError): TaskRunStatus { +export function runStatusFromError( + error: TaskRunError, + environmentType: RuntimeEnvironmentType +): TaskRunStatus { if (error.type !== "INTERNAL_ERROR") { return "COMPLETED_WITH_ERRORS"; } @@ -21,6 +24,15 @@ export function runStatusFromError(error: TaskRunError): TaskRunStatus { return "CANCELED"; case "MAX_DURATION_EXCEEDED": return "TIMED_OUT"; + case "TASK_RUN_STALLED_EXECUTING": + case "TASK_RUN_STALLED_EXECUTING_WITH_WAITPOINTS": { + if (environmentType === "DEVELOPMENT") { + return "CANCELED"; + } + + return "COMPLETED_WITH_ERRORS"; + } + case "TASK_PROCESS_OOM_KILLED": case "TASK_PROCESS_MAYBE_OOM_KILLED": case "TASK_PROCESS_SIGSEGV": @@ -40,8 +52,6 @@ export function runStatusFromError(error: TaskRunError): TaskRunStatus { case "TASK_DEQUEUED_INVALID_STATE": case "TASK_DEQUEUED_QUEUE_NOT_FOUND": case "TASK_RUN_DEQUEUED_MAX_RETRIES": - case "TASK_RUN_STALLED_EXECUTING": - case "TASK_RUN_STALLED_EXECUTING_WITH_WAITPOINTS": case "TASK_HAS_N0_EXECUTION_SNAPSHOT": case "GRACEFUL_EXIT_TIMEOUT": case "POD_EVICTED": diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index e81aedfe8b..6e426aaa26 100644 --- 
a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -8,10 +8,12 @@ import { CreateCheckpointResult, DequeuedMessage, ExecutionResult, + formatDurationMilliseconds, RunExecutionData, StartRunAttemptResult, TaskRunContext, TaskRunExecutionResult, + TaskRunInternalError, } from "@trigger.dev/core/v3"; import { RunId, WaitpointId } from "@trigger.dev/core/v3/isomorphic"; import { @@ -212,6 +214,8 @@ export class RunEngine { }); if (!options.worker.disabled) { + console.log("✅ Starting run engine worker"); + this.worker.start(); } @@ -1190,23 +1194,6 @@ export class RunEngine { snapshot: latestSnapshot, }); - // For dev, we just cancel runs that are stuck - if (latestSnapshot.environmentType === "DEVELOPMENT") { - this.logger.log("RunEngine.#handleStalledSnapshot() cancelling DEV run", { - runId, - snapshot: latestSnapshot, - }); - - await this.cancelRun({ - runId: latestSnapshot.runId, - finalizeRun: true, - reason: - "Run was disconnected, check you're running the CLI dev command and your network connection is healthy.", - tx, - }); - return; - } - switch (latestSnapshot.executionStatus) { case "RUN_CREATED": { throw new NotImplementedError("There shouldn't be a heartbeat for RUN_CREATED"); @@ -1263,28 +1250,64 @@ export class RunEngine { } case "EXECUTING": case "EXECUTING_WITH_WAITPOINTS": { + // Stalls for production runs should start being treated as an OOM error. + // We should calculate the retry delay using the retry settings on the run/task instead of hardcoding it. + // Stalls for dev runs should keep being treated as a timeout error because the vast majority of the time these snapshots stall because + // they have quit the CLI + const retryDelay = 250; - //todo call attemptFailed and force requeuing + const timeoutDuration = + latestSnapshot.executionStatus === "EXECUTING" + ? formatDurationMilliseconds(this.heartbeatTimeouts.EXECUTING) + : formatDurationMilliseconds(this.heartbeatTimeouts.EXECUTING_WITH_WAITPOINTS); + + // Dev runs don't retry, because the vast majority of the time these snapshots stall because + // they have quit the CLI + const shouldRetry = latestSnapshot.environmentType !== "DEVELOPMENT"; + const errorMessage = + latestSnapshot.environmentType === "DEVELOPMENT" + ? `Run timed out after ${timeoutDuration} due to missing heartbeats (sent every 30s). Check if your \`trigger.dev dev\` CLI is still running, or if CPU-heavy work is blocking the main thread.` + : `Run timed out after ${timeoutDuration} due to missing heartbeats (sent every 30s). This typically happens when CPU-heavy work blocks the main thread.`; + + const taskStalledErrorCode = + latestSnapshot.executionStatus === "EXECUTING" + ? "TASK_RUN_STALLED_EXECUTING" + : "TASK_RUN_STALLED_EXECUTING_WITH_WAITPOINTS"; + + const error = + latestSnapshot.environmentType === "DEVELOPMENT" + ? ({ + type: "INTERNAL_ERROR", + code: taskStalledErrorCode, + message: errorMessage, + } satisfies TaskRunInternalError) + : this.options.treatProductionExecutionStallsAsOOM + ? 
({ + type: "INTERNAL_ERROR", + code: "TASK_PROCESS_OOM_KILLED", + message: "Run was terminated due to running out of memory", + } satisfies TaskRunInternalError) + : ({ + type: "INTERNAL_ERROR", + code: taskStalledErrorCode, + message: errorMessage, + } satisfies TaskRunInternalError); + await this.runAttemptSystem.attemptFailed({ runId, snapshotId: latestSnapshot.id, completion: { ok: false, id: runId, - error: { - type: "INTERNAL_ERROR", - code: - latestSnapshot.executionStatus === "EXECUTING" - ? "TASK_RUN_STALLED_EXECUTING" - : "TASK_RUN_STALLED_EXECUTING_WITH_WAITPOINTS", - message: `Run stalled while executing. This can happen when the run becomes unresponsive, for example because the CPU is overloaded.`, - }, - retry: { - //250ms in the future - timestamp: Date.now() + retryDelay, - delay: retryDelay, - }, + error, + retry: shouldRetry + ? { + //250ms in the future + timestamp: Date.now() + retryDelay, + delay: retryDelay, + } + : undefined, }, forceRequeue: true, tx: prisma, diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index 842eaefff1..0b4449f42f 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -51,6 +51,7 @@ import { RunEngineOptions } from "../types.js"; import { BatchSystem } from "./batchSystem.js"; import { DelayedRunSystem } from "./delayedRunSystem.js"; import { + EnhancedExecutionSnapshot, executionResultFromSnapshot, ExecutionSnapshotSystem, getLatestExecutionSnapshot, @@ -690,6 +691,10 @@ export class RunAttemptSystem { throw new ServiceValidationError("Snapshot ID doesn't match the latest snapshot", 400); } + if (latestSnapshot.executionStatus === "FINISHED") { + throw new ServiceValidationError("Run is already finished", 400); + } + span.setAttribute("completionStatus", completion.ok); const completedAt = new Date(); @@ -843,6 +848,10 @@ export class RunAttemptSystem { throw new ServiceValidationError("Snapshot ID doesn't match the latest snapshot", 400); } + if (latestSnapshot.executionStatus === "FINISHED") { + throw new ServiceValidationError("Run is already finished", 400); + } + span.setAttribute("completionStatus", completion.ok); //remove waitpoints blocking the run @@ -923,7 +932,7 @@ export class RunAttemptSystem { case "fail_run": { return await this.#permanentlyFailRun({ runId, - snapshotId, + latestSnapshot, failedAt, error: retryResult.sanitizedError, workerId, @@ -1443,14 +1452,14 @@ export class RunAttemptSystem { async #permanentlyFailRun({ runId, - snapshotId, + latestSnapshot, failedAt, error, workerId, runnerId, }: { runId: string; - snapshotId?: string; + latestSnapshot: EnhancedExecutionSnapshot; failedAt: Date; error: TaskRunError; workerId?: string; @@ -1459,7 +1468,7 @@ export class RunAttemptSystem { const prisma = this.$.prisma; return startSpan(this.$.tracer, "permanentlyFailRun", async (span) => { - const status = runStatusFromError(error); + const status = runStatusFromError(error, latestSnapshot.environmentType); //run permanently failed const run = await prisma.taskRun.update({ @@ -1512,7 +1521,7 @@ export class RunAttemptSystem { executionStatus: "FINISHED", description: "Run failed", }, - previousSnapshotId: snapshotId, + previousSnapshotId: latestSnapshot.id, environmentId: run.runtimeEnvironment.id, environmentType: run.runtimeEnvironment.type, projectId: run.runtimeEnvironment.project.id, diff --git 
a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 15a0322d39..5125be560e 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -71,6 +71,7 @@ export type RunEngineOptions = { /** If not set then checkpoints won't ever be used */ retryWarmStartThresholdMs?: number; heartbeatTimeoutsMs?: Partial; + treatProductionExecutionStallsAsOOM?: boolean; suspendedHeartbeatRetriesConfig?: { maxCount?: number; maxDelayMs?: number; diff --git a/packages/cli-v3/src/entryPoints/dev-run-controller.ts b/packages/cli-v3/src/entryPoints/dev-run-controller.ts index 50d9a61719..12876b3240 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-controller.ts @@ -4,6 +4,7 @@ import { IntervalService, LogLevel, RunExecutionData, + SuspendedProcessError, TaskRunExecution, TaskRunExecutionMetrics, TaskRunExecutionResult, @@ -26,7 +27,6 @@ type DevRunControllerOptions = { worker: BackgroundWorker; httpClient: CliApiClient; logLevel: LogLevel; - heartbeatIntervalSeconds?: number; taskRunProcessPool: TaskRunProcessPool; onSubscribeToRunNotifications: (run: Run, snapshot: Snapshot) => void; onUnsubscribeFromRunNotifications: (run: Run, snapshot: Snapshot) => void; @@ -47,11 +47,11 @@ export class DevRunController { private taskRunProcess?: TaskRunProcess; private readonly worker: BackgroundWorker; private readonly httpClient: CliApiClient; - private readonly runHeartbeat: IntervalService; - private readonly heartbeatIntervalSeconds: number; private readonly snapshotPoller: IntervalService; private readonly snapshotPollIntervalSeconds: number; private readonly cwd?: string; + private isCompletingRun = false; + private isShuttingDown = false; private state: | { @@ -67,7 +67,6 @@ export class DevRunController { this.onExitRunPhase(run); this.state = { phase: "RUN", run, snapshot }; - this.runHeartbeat.start(); this.snapshotPoller.start(); } @@ -77,7 +76,6 @@ export class DevRunController { }); this.worker = opts.worker; - this.heartbeatIntervalSeconds = opts.heartbeatIntervalSeconds || 20; this.snapshotPollIntervalSeconds = 5; this.cwd = opts.cwd; this.httpClient = opts.httpClient; @@ -125,36 +123,7 @@ export class DevRunController { }, }); - this.runHeartbeat = new IntervalService({ - onInterval: async () => { - if (!this.runFriendlyId || !this.snapshotFriendlyId) { - logger.debug("[DevRunController] Skipping heartbeat, no run ID or snapshot ID"); - return; - } - - logger.debug("[DevRunController] Sending heartbeat"); - - const response = await this.httpClient.dev.heartbeatRun( - this.runFriendlyId, - this.snapshotFriendlyId, - { - cpu: 0, - memory: 0, - } - ); - - if (!response.success) { - logger.debug("[DevRunController] Heartbeat failed", { error: response.error }); - } - }, - intervalMs: this.heartbeatIntervalSeconds * 1000, - leadingEdge: false, - onError: async (error) => { - logger.debug("[DevRunController] Failed to send heartbeat", { error }); - }, - }); - - process.on("SIGTERM", this.sigterm); + process.on("SIGTERM", this.sigterm.bind(this)); } private async sigterm() { @@ -245,7 +214,6 @@ export class DevRunController { logger.debug("onExitRunPhase: Exiting run phase", { newRun }); - this.runHeartbeat.stop(); this.snapshotPoller.stop(); const { run, snapshot } = this.state; @@ -389,7 +357,15 @@ export class DevRunController { return; } case "FINISHED": { - logger.debug("Run is finished, nothing to do"); + if (this.isCompletingRun) { 
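+          // `isCompletingRun` is set as soon as the task run process returns a completion,
+          // so a FINISHED snapshot at that point is the expected result of our own completion
+          // and we shouldn't suspend the process underneath the in-flight completion.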
+ logger.debug("Run is finished but we're completing it, skipping"); + return; + } + + await this.exitTaskRunProcessWithoutFailingRun({ + flush: true, + reason: "already-finished", + }); return; } case "EXECUTING_WITH_WAITPOINTS": { @@ -549,6 +525,12 @@ export class DevRunController { error, }); + if (error instanceof SuspendedProcessError) { + logger.debug("Attempt execution suspended", { error }); + this.runFinished(); + return; + } + logger.debug("Submitting attempt completion", { runId: run.friendlyId, snapshotId: snapshot.friendlyId, @@ -613,6 +595,8 @@ export class DevRunController { build: this.opts.worker.build, }); + this.isCompletingRun = false; + // Get process from pool instead of creating new one const { taskRunProcess, isReused } = await this.opts.taskRunProcessPool.getProcess( this.opts.worker.manifest, @@ -633,6 +617,30 @@ export class DevRunController { this.taskRunProcess = taskRunProcess; + taskRunProcess.unsafeDetachEvtHandlers(); + + taskRunProcess.onTaskRunHeartbeat.attach(async (runId) => { + if (!this.runFriendlyId || !this.snapshotFriendlyId) { + logger.debug("[DevRunController] Skipping heartbeat, no run ID or snapshot ID"); + return; + } + + logger.debug("[DevRunController] Sending heartbeat"); + + const response = await this.httpClient.dev.heartbeatRun( + this.runFriendlyId, + this.snapshotFriendlyId, + { + cpu: 0, + memory: 0, + } + ); + + if (!response.success) { + logger.debug("[DevRunController] Heartbeat failed", { error: response.error }); + } + }); + // Update the process environment for this specific run // Note: We may need to enhance TaskRunProcess to support updating env vars logger.debug("executing task run process from pool", { @@ -659,6 +667,8 @@ export class DevRunController { logger.debug("Completed run", completion); + this.isCompletingRun = true; + // Return process to pool instead of killing it try { const version = this.opts.worker.serverWorker?.version || "unknown"; @@ -779,6 +789,37 @@ export class DevRunController { assertExhaustive(attemptStatus); } + private async exitTaskRunProcessWithoutFailingRun({ + flush, + reason, + }: { + flush: boolean; + reason: string; + }) { + await this.taskRunProcess?.suspend({ flush }); + + // No services should be left running after this line - let's make sure of it + this.shutdownExecution(`exitTaskRunProcessWithoutFailingRun: ${reason}`); + } + + private shutdownExecution(reason: string) { + if (this.isShuttingDown) { + logger.debug(`[shutdown] ${reason} (already shutting down)`, { + newReason: reason, + }); + return; + } + + logger.debug(`[shutdown] ${reason}`); + + this.isShuttingDown = true; + + this.snapshotPoller.stop(); + + this.opts.onFinished(); + this.taskRunProcess?.unsafeDetachEvtHandlers(); + } + private async runFinished() { // Return the process to the pool instead of killing it directly if (this.taskRunProcess) { @@ -791,7 +832,6 @@ export class DevRunController { } } - this.runHeartbeat.stop(); this.snapshotPoller.stop(); this.opts.onFinished(); @@ -828,7 +868,6 @@ export class DevRunController { } } - this.runHeartbeat.stop(); this.snapshotPoller.stop(); } diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index 2f44466170..9239f2b2bd 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -31,6 +31,7 @@ import { WorkerManifest, WorkerToExecutorMessageCatalog, traceContext, + heartbeats, } from "@trigger.dev/core/v3"; import { TriggerTracer } from 
"@trigger.dev/core/v3/tracer"; import { @@ -55,6 +56,7 @@ import { usage, UsageTimeoutManager, StandardTraceContextManager, + StandardHeartbeatsManager, } from "@trigger.dev/core/v3/workers"; import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc"; import { readFile } from "node:fs/promises"; @@ -145,6 +147,11 @@ waitUntil.setGlobalManager(waitUntilManager); const triggerLogLevel = getEnvVar("TRIGGER_LOG_LEVEL"); const showInternalLogs = getEnvVar("RUN_WORKER_SHOW_LOGS") === "true"; +const standardHeartbeatsManager = new StandardHeartbeatsManager( + parseInt(heartbeatIntervalMs ?? "30000", 10) +); +heartbeats.setGlobalManager(standardHeartbeatsManager); + async function importConfig( configPath: string ): Promise<{ config: TriggerConfig; handleError?: HandleErrorFunction }> { @@ -303,6 +310,7 @@ function resetExecutionEnvironment() { durableClock.reset(); taskContext.disable(); standardTraceContextManager.reset(); + standardHeartbeatsManager.reset(); // Wait for all streams to finish before completing the run waitUntil.register({ @@ -518,6 +526,8 @@ const zodIpc = new ZodIpcConnection({ _execution = execution; _isRunning = true; + standardHeartbeatsManager.startHeartbeat(attemptKey(execution)); + runMetadataManager.startPeriodicFlush( getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000) ); @@ -550,6 +560,8 @@ const zodIpc = new ZodIpcConnection({ }); } } finally { + standardHeartbeatsManager.stopHeartbeat(); + _execution = undefined; _isRunning = false; log(`[${new Date().toISOString()}] Task run completed`); @@ -683,17 +695,9 @@ async function flushMetadata(timeoutInMs: number = 10_000) { _sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, showInternalLogs); runtime.setGlobalRuntimeManager(_sharedWorkerRuntime); -const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10); - -for await (const _ of setInterval(heartbeatInterval)) { - if (_isRunning && _execution) { - try { - await zodIpc.send("TASK_HEARTBEAT", { id: attemptKey(_execution) }); - } catch (err) { - logError("Failed to send HEARTBEAT message", err); - } - } -} +standardHeartbeatsManager.registerListener(async (id) => { + await zodIpc.send("TASK_HEARTBEAT", { id }); +}); function log(message: string, ...args: any[]) { if (!showInternalLogs) return; diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index a962b4fbba..a9c593d720 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -30,6 +30,7 @@ import { WorkerManifest, WorkerToExecutorMessageCatalog, traceContext, + heartbeats, } from "@trigger.dev/core/v3"; import { TriggerTracer } from "@trigger.dev/core/v3/tracer"; import { @@ -55,6 +56,7 @@ import { usage, UsageTimeoutManager, StandardTraceContextManager, + StandardHeartbeatsManager, } from "@trigger.dev/core/v3/workers"; import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc"; import { readFile } from "node:fs/promises"; @@ -134,6 +136,11 @@ runMetadata.setGlobalManager(runMetadataManager); const waitUntilManager = new StandardWaitUntilManager(); waitUntil.setGlobalManager(waitUntilManager); +const standardHeartbeatsManager = new StandardHeartbeatsManager( + parseInt(heartbeatIntervalMs ?? 
"30000", 10) +); +heartbeats.setGlobalManager(standardHeartbeatsManager); + const triggerLogLevel = getEnvVar("TRIGGER_LOG_LEVEL"); async function importConfig( @@ -289,6 +296,7 @@ function resetExecutionEnvironment() { durableClock.reset(); taskContext.disable(); standardTraceContextManager.reset(); + standardHeartbeatsManager.reset(); // Wait for all streams to finish before completing the run waitUntil.register({ @@ -522,6 +530,8 @@ const zodIpc = new ZodIpcConnection({ _execution = execution; _isRunning = true; + standardHeartbeatsManager.startHeartbeat(_execution.run.id); + runMetadataManager.startPeriodicFlush( getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000) ); @@ -549,6 +559,8 @@ const zodIpc = new ZodIpcConnection({ }); } } finally { + standardHeartbeatsManager.stopHeartbeat(); + _execution = undefined; _isRunning = false; @@ -722,16 +734,8 @@ runtime.setGlobalRuntimeManager(_sharedWorkerRuntime); process.title = "trigger-managed-worker"; -const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10); - -for await (const _ of setInterval(heartbeatInterval)) { - if (_isRunning && _execution) { - try { - await zodIpc.send("TASK_HEARTBEAT", { id: _execution.run.id }); - } catch (err) { - console.error("Failed to send HEARTBEAT message", err); - } - } -} +standardHeartbeatsManager.registerListener(async (id) => { + await zodIpc.send("TASK_HEARTBEAT", { id }); +}); console.log(`[${new Date().toISOString()}] Executor started`); diff --git a/packages/cli-v3/src/executions/taskRunProcess.ts b/packages/cli-v3/src/executions/taskRunProcess.ts index 7b2adb6eb6..be971294aa 100644 --- a/packages/cli-v3/src/executions/taskRunProcess.ts +++ b/packages/cli-v3/src/executions/taskRunProcess.ts @@ -111,9 +111,7 @@ export class TaskRunProcess { try { await this.#cancel(); - } catch (err) { - console.error("Error cancelling task run process", { err }); - } + } catch (err) {} await this.#gracefullyTerminate(this.options.gracefulTerminationTimeoutInMs); } @@ -125,11 +123,7 @@ export class TaskRunProcess { return; } - try { - await this.#flush(); - } catch (err) { - console.error("Error flushing task run process", { err }); - } + await tryCatch(this.#flush()); if (kill) { await this.#gracefullyTerminate(this.options.gracefulTerminationTimeoutInMs); @@ -445,11 +439,7 @@ export class TaskRunProcess { this._isBeingSuspended = true; if (flush) { - const [error] = await tryCatch(this.#flush()); - - if (error) { - console.error("Error flushing task run process", { error }); - } + await tryCatch(this.#flush()); } await this.kill("SIGKILL"); diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index 6cf900d7b9..f857cfc32d 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -569,14 +569,14 @@ const prettyInternalErrors: Partial< Record< TaskRunInternalError["code"], { - message: string; + message?: string; link?: ErrorLink; } > > = { TASK_PROCESS_OOM_KILLED: { message: - "Your task ran out of memory. Try increasing the machine specs. If this doesn't fix it there might be a memory leak.", + "Your run was terminated due to exceeding the machine's memory limit. Try increasing the machine preset in your task options or replay using a larger machine.", link: { name: "Machines", href: links.docs.machines.home, @@ -584,7 +584,7 @@ const prettyInternalErrors: Partial< }, TASK_PROCESS_MAYBE_OOM_KILLED: { message: - "We think your task ran out of memory, but we can't be certain. 
If this keeps happening, try increasing the machine specs.", + "Your run was terminated due to exceeding the machine's memory limit. Try increasing the machine preset in your task options or replay using a larger machine.", link: { name: "Machines", href: links.docs.machines.home, @@ -632,6 +632,12 @@ const prettyInternalErrors: Partial< href: links.docs.concurrency.recursiveDeadlock, }, }, + TASK_RUN_STALLED_EXECUTING: { + link: { + name: "Read our troubleshooting guide", + href: links.docs.troubleshooting.stalledExecution, + }, + }, }; const getPrettyTaskRunError = (code: TaskRunInternalError["code"]): TaskRunInternalError => { diff --git a/packages/core/src/v3/heartbeats-api.ts b/packages/core/src/v3/heartbeats-api.ts new file mode 100644 index 0000000000..f0e9a269e5 --- /dev/null +++ b/packages/core/src/v3/heartbeats-api.ts @@ -0,0 +1,5 @@ +// Split module-level variable definition into separate files to allow +// tree-shaking on each api instance. +import { HeartbeatsAPI } from "./heartbeats/api.js"; +/** Entrypoint for heartbeats API */ +export const heartbeats = HeartbeatsAPI.getInstance(); diff --git a/packages/core/src/v3/heartbeats/api.ts b/packages/core/src/v3/heartbeats/api.ts new file mode 100644 index 0000000000..27dfeac213 --- /dev/null +++ b/packages/core/src/v3/heartbeats/api.ts @@ -0,0 +1,73 @@ +import { getGlobal, registerGlobal, unregisterGlobal } from "../utils/globals.js"; +import { HeartbeatsManager } from "./types.js"; + +const API_NAME = "heartbeats"; + +class NoopHeartbeatsManager implements HeartbeatsManager { + startHeartbeat(id: string) { + return; + } + + stopHeartbeat() { + return; + } + + async yield() { + return; + } + + get lastHeartbeat(): Date | undefined { + return undefined; + } + + reset() {} +} + +const NOOP_HEARTBEATS_MANAGER = new NoopHeartbeatsManager(); + +export class HeartbeatsAPI implements HeartbeatsManager { + private static _instance?: HeartbeatsAPI; + + private constructor() {} + + public static getInstance(): HeartbeatsAPI { + if (!this._instance) { + this._instance = new HeartbeatsAPI(); + } + + return this._instance; + } + + public setGlobalManager(manager: HeartbeatsManager): boolean { + return registerGlobal(API_NAME, manager); + } + + public disable() { + unregisterGlobal(API_NAME); + } + + public reset() { + this.#getManager().reset(); + this.disable(); + } + + public get lastHeartbeat(): Date | undefined { + return this.#getManager().lastHeartbeat; + } + + public startHeartbeat(id: string) { + return this.#getManager().startHeartbeat(id); + } + + public stopHeartbeat() { + return this.#getManager().stopHeartbeat(); + } + + public yield() { + return this.#getManager().yield(); + } + + #getManager(): HeartbeatsManager { + return getGlobal(API_NAME) ?? 
NOOP_HEARTBEATS_MANAGER;
+  }
+}
diff --git a/packages/core/src/v3/heartbeats/manager.ts b/packages/core/src/v3/heartbeats/manager.ts
new file mode 100644
index 0000000000..2648d49256
--- /dev/null
+++ b/packages/core/src/v3/heartbeats/manager.ts
@@ -0,0 +1,78 @@
+import { tryCatch } from "../tryCatch.js";
+import { HeartbeatsManager } from "./types.js";
+import { setInterval, setImmediate, setTimeout } from "node:timers/promises";
+
+export class StandardHeartbeatsManager implements HeartbeatsManager {
+  private listener: ((id: string) => Promise<void>) | undefined = undefined;
+  private currentAbortController: AbortController | undefined = undefined;
+  private lastHeartbeatYieldTime: number | undefined = undefined;
+  private lastHeartbeatDate: Date | undefined = undefined;
+
+  constructor(private readonly intervalInMs: number) {}
+
+  registerListener(callback: (id: string) => Promise<void>) {
+    this.listener = callback;
+  }
+
+  async yield(): Promise<void> {
+    if (!this.lastHeartbeatYieldTime) {
+      return;
+    }
+
+    // Only call setImmediate if we haven't yielded in the last interval
+    if (Date.now() - this.lastHeartbeatYieldTime >= this.intervalInMs) {
+      // await setImmediate();
+      await setTimeout(24);
+
+      this.lastHeartbeatYieldTime = Date.now();
+    }
+  }
+
+  startHeartbeat(id: string) {
+    this.stopHeartbeat();
+    this.currentAbortController = new AbortController();
+    this.lastHeartbeatYieldTime = Date.now();
+
+    // Ignore errors as we expect them to be thrown when the heartbeat is stopped
+    this.startHeartbeatLoop(id, this.currentAbortController.signal).catch((error) => {});
+  }
+
+  private async startHeartbeatLoop(id: string, signal: AbortSignal) {
+    try {
+      for await (const _ of setInterval(this.intervalInMs, undefined, {
+        signal,
+      })) {
+        if (this.listener) {
+          const [error] = await tryCatch(this.listener(id));
+          this.lastHeartbeatDate = new Date();
+
+          if (error) {
+            console.error("Failed to send HEARTBEAT message", { error: String(error) });
+          }
+        }
+      }
+    } catch (error) {
+      // Ignore errors as we expect them to be thrown when the heartbeat is stopped
+      // And since we tryCatch inside the loop, we don't need to handle any other errors here
+    }
+  }
+
+  stopHeartbeat(): void {
+    this.currentAbortController?.abort();
+  }
+
+  get lastHeartbeat(): Date | undefined {
+    return this.lastHeartbeatDate;
+  }
+
+  reset() {
+    this.stopHeartbeat();
+    this.lastHeartbeatDate = undefined;
+    this.lastHeartbeatYieldTime = undefined;
+    this.currentAbortController = undefined;
+
+    // NOTE: Don't reset the listener, it's really just a single global callback,
+    // but because of the structure of the dev/managed-run-worker and the ZodIpc constructor,
+    // we have to create the StandardHeartbeatsManager instance before the ZodIpc instance is created.
+  }
+}
diff --git a/packages/core/src/v3/heartbeats/types.ts b/packages/core/src/v3/heartbeats/types.ts
new file mode 100644
index 0000000000..f93dfa514b
--- /dev/null
+++ b/packages/core/src/v3/heartbeats/types.ts
@@ -0,0 +1,8 @@
+export interface HeartbeatsManager {
+  startHeartbeat(id: string): void;
+  stopHeartbeat(): void;
+  yield(): Promise<void>;
+  reset(): void;
+
+  get lastHeartbeat(): Date | undefined;
+}
diff --git a/packages/core/src/v3/index.ts b/packages/core/src/v3/index.ts
index e1785e5049..58b095aaa5 100644
--- a/packages/core/src/v3/index.ts
+++ b/packages/core/src/v3/index.ts
@@ -18,6 +18,7 @@ export * from "./timeout-api.js";
 export * from "./run-timeline-metrics-api.js";
 export * from "./lifecycle-hooks-api.js";
 export * from "./locals-api.js";
+export * from "./heartbeats-api.js";
 export * from "./schemas/index.js";
 export { SemanticInternalAttributes } from "./semanticInternalAttributes.js";
 export * from "./resource-catalog-api.js";
diff --git a/packages/core/src/v3/links.ts b/packages/core/src/v3/links.ts
index f514bf62f8..d04284e73f 100644
--- a/packages/core/src/v3/links.ts
+++ b/packages/core/src/v3/links.ts
@@ -14,6 +14,7 @@ export const links = {
   },
   troubleshooting: {
     concurrentWaits: "https://trigger.dev/docs/troubleshooting#parallel-waits-are-not-supported",
+    stalledExecution: "https://trigger.dev/docs/troubleshooting#task-run-stalled-executing",
   },
   concurrency: {
     recursiveDeadlock:
diff --git a/packages/core/src/v3/utils/globals.ts b/packages/core/src/v3/utils/globals.ts
index 570bb34150..f2bdf8a936 100644
--- a/packages/core/src/v3/utils/globals.ts
+++ b/packages/core/src/v3/utils/globals.ts
@@ -1,5 +1,6 @@
 import { ApiClientConfiguration } from "../apiClientManager/types.js";
 import { Clock } from "../clock/clock.js";
+import { HeartbeatsManager } from "../heartbeats/types.js";
 import { LifecycleHooksManager } from "../lifecycleHooks/types.js";
 import { LocalsManager } from "../locals/types.js";
 import { ResourceCatalog } from "../resource-catalog/catalog.js";
@@ -68,4 +69,5 @@ type TriggerDotDevGlobalAPI = {
   ["lifecycle-hooks"]?: LifecycleHooksManager;
   ["locals"]?: LocalsManager;
   ["trace-context"]?: TraceContextManager;
+  ["heartbeats"]?: HeartbeatsManager;
 };
diff --git a/packages/core/src/v3/workers/index.ts b/packages/core/src/v3/workers/index.ts
index 613fe33025..83c4cc1d54 100644
--- a/packages/core/src/v3/workers/index.ts
+++ b/packages/core/src/v3/workers/index.ts
@@ -29,3 +29,4 @@ export { StandardLifecycleHooksManager } from "../lifecycleHooks/manager.js";
 export { StandardLocalsManager } from "../locals/manager.js";
 export { populateEnv } from "./populateEnv.js";
 export { StandardTraceContextManager } from "../traceContext/manager.js";
+export { StandardHeartbeatsManager } from "../heartbeats/manager.js";
diff --git a/packages/trigger-sdk/src/v3/heartbeats.ts b/packages/trigger-sdk/src/v3/heartbeats.ts
new file mode 100644
index 0000000000..7c84abede1
--- /dev/null
+++ b/packages/trigger-sdk/src/v3/heartbeats.ts
@@ -0,0 +1,44 @@
+import { heartbeats as coreHeartbeats } from "@trigger.dev/core/v3";
+
+/**
+ *
+ * Yields to the Trigger.dev runtime to keep the task alive.
+ *
+ * This is a cooperative "heartbeat" that you can call as often as you like
+ * inside long-running or CPU-heavy loops (e.g. parsing large files, processing
+ * many records, or handling big Textract results).
+ * + * You don’t need to worry about over-calling it: the underlying implementation + * automatically decides when to actually yield to the event loop and send a + * heartbeat to the Trigger.dev runtime. Extra calls are effectively free. + * + * ### Example + * ```ts + * import { heartbeats } from "@trigger.dev/sdk/v3"; + * + * for (const row of bigDataset) { + * process(row); + * await heartbeats.yield(); // safe to call every iteration + * } + * ``` + * + * Using this regularly prevents `TASK_RUN_STALLED_EXECUTING` errors by ensuring + * the run never appears idle, even during heavy synchronous work. + * + * This function is also safe to call from outside of a Trigger.dev task run, it will effectively be a no-op. + */ +async function heartbeatsYield() { + await coreHeartbeats.yield(); +} + +/** + * Returns the last heartbeat timestamp, for debugging purposes only. You probably don't need this. + */ +function heartbeatsGetLastHeartbeat() { + return coreHeartbeats.lastHeartbeat; +} + +export const heartbeats = { + yield: heartbeatsYield, + getLastHeartbeat: heartbeatsGetLastHeartbeat, +}; diff --git a/packages/trigger-sdk/src/v3/index.ts b/packages/trigger-sdk/src/v3/index.ts index a9a833fe52..77448ae432 100644 --- a/packages/trigger-sdk/src/v3/index.ts +++ b/packages/trigger-sdk/src/v3/index.ts @@ -15,6 +15,7 @@ export * from "./webhooks.js"; export * from "./locals.js"; export * from "./otel.js"; export * from "./schemas.js"; +export * from "./heartbeats.js"; export type { Context }; import type { Context } from "./shared.js"; diff --git a/references/hello-world/src/resourceMonitor.ts b/references/hello-world/src/resourceMonitor.ts index 9b4bf28501..27f8418d64 100644 --- a/references/hello-world/src/resourceMonitor.ts +++ b/references/hello-world/src/resourceMonitor.ts @@ -3,6 +3,8 @@ import { exec } from "node:child_process"; import os from "node:os"; import { promises as fs } from "node:fs"; import { type Context, logger } from "@trigger.dev/sdk"; +import { getHeapStatistics } from "node:v8"; +import { PerformanceObserver, constants } from "node:perf_hooks"; const execAsync = promisify(exec); @@ -24,6 +26,11 @@ export type MemoryMetrics = { export type NodeProcessMetrics = { memoryUsage: number; memoryUsagePercent: number; + heapUsed: number; + heapSizeLimit: number; + heapUsagePercent: number; + availableHeap: number; + isNearHeapLimit: boolean; }; export type TargetProcessMetrics = { @@ -47,7 +54,24 @@ export type TargetProcessMetrics = { export type ProcessMetrics = { node: NodeProcessMetrics; - target: TargetProcessMetrics | null; + targetProcess: TargetProcessMetrics | null; +}; + +type GCSummary = { + count: number; + totalDuration: number; // ms + avgDuration: number; // ms + maxDuration: number; // ms + kinds: Record< + string, + { + // breakdown by kind + count: number; + totalDuration: number; + avgDuration: number; + maxDuration: number; + } + >; }; type ProcessInfo = { @@ -69,27 +93,31 @@ export type ResourceMonitorConfig = { dirName?: string; processName?: string; ctx: Context; + compactLogging?: boolean; }; // Constants const DISK_LIMIT_GB = 10; const DISK_LIMIT_BYTES = DISK_LIMIT_GB * 1024 * 1024 * 1024; // 10Gi in bytes -/** - * Utility class for monitoring system resources and process metrics - */ export class ResourceMonitor { private logInterval: NodeJS.Timeout | null = null; private logger: typeof logger; private dirName: string; - private processName: string; + private processName: string | undefined; private ctx: Context; + private verbose: boolean; + private 
compactLogging: boolean;
+  private gcObserver: PerformanceObserver | null = null;
+  private bufferedGcEntries: PerformanceEntry[] = [];

   constructor(config: ResourceMonitorConfig) {
     this.logger = logger;
     this.dirName = config.dirName ?? "/tmp";
-    this.processName = config.processName ?? "node";
+    this.processName = config.processName;
     this.ctx = config.ctx;
+    this.verbose = true;
+    this.compactLogging = config.compactLogging ?? false;
   }

   /**
@@ -107,6 +135,12 @@ export class ResourceMonitor {
     }

     this.logInterval = setInterval(this.logResources.bind(this), intervalMs);
+
+    this.gcObserver = new PerformanceObserver((list) => {
+      this.bufferedGcEntries.push(...list.getEntries());
+    });
+
+    this.gcObserver.observe({ entryTypes: ["gc"], buffered: true });
   }

   /**
@@ -117,11 +151,16 @@ export class ResourceMonitor {
       clearInterval(this.logInterval);
       this.logInterval = null;
     }
+
+    if (this.gcObserver) {
+      this.gcObserver.disconnect();
+      this.gcObserver = null;
+    }
   }

   private async logResources() {
     try {
-      await this.logResourceSnapshot("RESOURCE_MONITOR");
+      await this.logResourceSnapshot("ResourceMonitor");
     } catch (error) {
       this.logger.error(
         `Resource monitoring error: ${error instanceof Error ? error.message : String(error)}`
       );
     }
   }
@@ -250,7 +289,11 @@ export class ResourceMonitor {
   /**
    * Find PIDs for a process name using /proc filesystem
    */
-  private async findPidsByName(processName: string): Promise<number[]> {
+  private async findPidsByName(processName?: string): Promise<number[]> {
+    if (!processName) {
+      return [];
+    }
+
     try {
       const pids: number[] = [];
       const procDirs = await fs.readdir("/proc");
@@ -258,6 +301,13 @@ export class ResourceMonitor {
       for (const dir of procDirs) {
         if (!/^\d+$/.test(dir)) continue;

+        const processPid = parseInt(dir, 10);
+
+        // Ignore processes that have a lower PID than our own PID
+        if (processPid <= process.pid) {
+          continue;
+        }
+
         try {
           const cmdline = await fs.readFile(`/proc/${dir}/cmdline`, "utf8");
           if (cmdline.includes(processName)) {
@@ -285,14 +335,20 @@ export class ResourceMonitor {
     const machineMemoryBytes = this.ctx.machine
       ? this.ctx.machine.memory * 1024 * 1024 * 1024
       : totalMemory;

-    const nodeMemoryUsage = process.memoryUsage().rss;
+    const nodeMemoryUsage = process.memoryUsage();

     // Node process percentage is based on machine memory if available, otherwise system memory
-    const nodeMemoryPercent = (nodeMemoryUsage / machineMemoryBytes) * 100;
+    const nodeMemoryPercent = (nodeMemoryUsage.rss / machineMemoryBytes) * 100;
+    const heapStats = getHeapStatistics();

     const nodeMetrics: NodeProcessMetrics = {
-      memoryUsage: nodeMemoryUsage,
+      memoryUsage: nodeMemoryUsage.rss,
       memoryUsagePercent: nodeMemoryPercent,
+      heapUsed: nodeMemoryUsage.heapUsed,
+      heapSizeLimit: heapStats.heap_size_limit,
+      heapUsagePercent: (heapStats.used_heap_size / heapStats.heap_size_limit) * 100,
+      availableHeap: heapStats.total_available_size,
+      isNearHeapLimit: heapStats.used_heap_size / heapStats.heap_size_limit > 0.8,
     };

     let method = "ps";
@@ -310,6 +366,13 @@ export class ResourceMonitor {
         processes = psOutput
           .trim()
           .split("\n")
+          .filter((line) => {
+            const parts = line.trim().split(/\s+/);
+            const pid = parseInt(parts[1], 10);
+
+            // Ignore processes that have a lower PID than our own PID
+            return pid > process.pid;
+          })
           .map((line) => {
             const parts = line.trim().split(/\s+/);
             return {
@@ -333,14 +396,16 @@ export class ResourceMonitor {
       if (processes.length === 0) {
         return {
           node: nodeMetrics,
-          target: {
-            method,
-            processName: this.processName,
-            count: 0,
-            processes: [],
-            averages: null,
-            totals: null,
-          },
+          targetProcess: this.processName
+            ? {
+                method,
+                processName: this.processName,
+                count: 0,
+                processes: [],
+                averages: null,
+                totals: null,
+              }
+            : null,
         };
       }
@@ -380,31 +445,35 @@ export class ResourceMonitor {
       return {
         node: nodeMetrics,
-        target: {
-          method,
-          processName: this.processName,
-          count,
-          processes,
-          averages,
-          totals: {
-            cpu: totals.cpu / cpuNormalizer,
-            memory: totals.memory,
-            rss: totals.rss,
-            vsz: totals.vsz,
-          },
-        },
+        targetProcess: this.processName
+          ? {
+              method,
+              processName: this.processName,
+              count,
+              processes,
+              averages,
+              totals: {
+                cpu: totals.cpu / cpuNormalizer,
+                memory: totals.memory,
+                rss: totals.rss,
+                vsz: totals.vsz,
+              },
+            }
+          : null,
       };
     } catch (error) {
       return {
         node: nodeMetrics,
-        target: {
-          method,
-          processName: this.processName,
-          count: 0,
-          processes: [],
-          averages: null,
-          totals: null,
-        },
+        targetProcess: this.processName
+          ? {
+              method,
+              processName: this.processName,
+              count: 0,
+              processes: [],
+              averages: null,
+              totals: null,
+            }
+          : null,
       };
     }
   }
@@ -414,75 +483,285 @@
    */
   async logResourceSnapshot(label = "Resource Snapshot"): Promise<void> {
     try {
-      const [systemMetrics, processMetrics] = await Promise.all([
-        this.getSystemMetrics(),
-        this.getProcessMetrics(),
-      ]);
-
-      const formatBytes = (bytes: number) => (bytes / (1024 * 1024)).toFixed(2);
-      const formatPercent = (value: number) => value.toFixed(1);
-
-      this.logger.info(label, {
-        system: {
-          disk: {
-            limitGiB: DISK_LIMIT_GB,
-            dirName: this.dirName,
-            usedGiB: (systemMetrics.disk.used / (1024 * 1024 * 1024)).toFixed(2),
-            freeGiB: (systemMetrics.disk.free / (1024 * 1024 * 1024)).toFixed(2),
-            percentUsed: formatPercent(systemMetrics.disk.percentUsed),
-            warning: systemMetrics.disk.warning,
-          },
-          memory: {
-            freeGB: (systemMetrics.memory.free / (1024 * 1024 * 1024)).toFixed(2),
-            percentUsed: formatPercent(systemMetrics.memory.percentUsed),
-          },
+      const payload = await this.getResourceSnapshotPayload();
+      const enhancedLabel = this.compactLogging ?
this.createCompactLabel(payload, label) + : this.createEnhancedLabel(payload, label); + + if (payload.process.node.isNearHeapLimit) { + this.logger.warn(`${enhancedLabel}: Node is near heap limit`, payload); + } else { + this.logger.info(enhancedLabel, payload); + } + } catch (error) { + this.logger.error( + `Error logging resource snapshot: ${error instanceof Error ? error.message : String(error)}` + ); + } + } + + /** + * Create an enhanced log label with key metrics for quick scanning + */ + private createEnhancedLabel(payload: any, baseLabel: string): string { + const parts: string[] = [baseLabel]; + + // System resources with text indicators + const diskPercent = parseFloat(payload.system.disk.percentUsed); + const memoryPercent = parseFloat(payload.system.memory.percentUsed); + const diskIndicator = this.getTextIndicator(diskPercent, 80, 90); + const memIndicator = this.getTextIndicator(memoryPercent, 80, 90); + parts.push(`Disk:${diskPercent.toFixed(1).padStart(5)}%${diskIndicator}`); + parts.push(`Mem:${memoryPercent.toFixed(1).padStart(5)}%${memIndicator}`); + + // Node process metrics with text indicators + const nodeMemPercent = parseFloat(payload.process.node.memoryUsagePercent); + const heapPercent = parseFloat(payload.process.node.heapUsagePercent); + const nodeIndicator = this.getTextIndicator(nodeMemPercent, 70, 85); + const heapIndicator = this.getTextIndicator(heapPercent, 70, 85); + parts.push(`Node:${nodeMemPercent.toFixed(1).padStart(4)}%${nodeIndicator}`); + parts.push(`Heap:${heapPercent.toFixed(1).padStart(4)}%${heapIndicator}`); + + // Target process metrics (if available) + if (payload.process.targetProcess && payload.process.targetProcess.count > 0) { + const targetCpu = payload.process.targetProcess.totals?.cpuPercent || "0"; + const targetMem = payload.process.targetProcess.totals?.memoryPercent || "0"; + const targetCpuNum = parseFloat(targetCpu); + const targetMemNum = parseFloat(targetMem); + const cpuIndicator = this.getTextIndicator(targetCpuNum, 80, 90); + const memIndicator = this.getTextIndicator(targetMemNum, 80, 90); + parts.push( + `${payload.process.targetProcess.processName}:${targetCpu.padStart( + 4 + )}%${cpuIndicator}/${targetMem.padStart(4)}%${memIndicator}` + ); + } + + // GC activity with performance indicators + if (payload.gc && payload.gc.count > 0) { + const avgDuration = payload.gc.avgDuration; + const gcIndicator = this.getTextIndicator(avgDuration, 5, 10, true); + parts.push( + `GC:${payload.gc.count.toString().padStart(2)}(${avgDuration + .toFixed(1) + .padStart(4)}ms)${gcIndicator}` + ); + } + + // Machine constraints + if (payload.constraints) { + parts.push(`[${payload.constraints.cpu}CPU/${payload.constraints.memoryGB}GB]`); + } + + // Warning indicators (only show critical ones in the main label) + const criticalWarnings: string[] = []; + if (payload.process.node.isNearHeapLimit) criticalWarnings.push("HEAP_LIMIT"); + if (diskPercent > 90) criticalWarnings.push("DISK_CRITICAL"); + if (memoryPercent > 95) criticalWarnings.push("MEM_CRITICAL"); + if (payload.system.disk.warning) criticalWarnings.push("DISK_WARN"); + + if (criticalWarnings.length > 0) { + parts.push(`[${criticalWarnings.join(",")}]`); + } + + return parts.join(" | "); + } + + /** + * Get text-based indicator for percentage values + */ + private getTextIndicator( + value: number, + warningThreshold: number, + criticalThreshold: number, + isDuration = false + ): string { + if (isDuration) { + // For duration values, higher is worse + if (value >= criticalThreshold) 
return " [CRIT]"; + if (value >= warningThreshold) return " [WARN]"; + return " [OK]"; + } else { + // For percentage values, higher is worse + if (value >= criticalThreshold) return " [CRIT]"; + if (value >= warningThreshold) return " [WARN]"; + return " [OK]"; + } + } + + /** + * Create a compact version of the enhanced label for high-frequency logging + */ + private createCompactLabel(payload: any, baseLabel: string): string { + const parts: string[] = [baseLabel]; + + // Only show critical metrics in compact mode + const diskPercent = parseFloat(payload.system.disk.percentUsed); + const memoryPercent = parseFloat(payload.system.memory.percentUsed); + const heapPercent = parseFloat(payload.process.node.heapUsagePercent); + + // Use single character indicators for compactness + const diskIndicator = diskPercent > 90 ? "!" : diskPercent > 80 ? "?" : "."; + const memIndicator = memoryPercent > 95 ? "!" : memoryPercent > 80 ? "?" : "."; + const heapIndicator = heapPercent > 85 ? "!" : heapPercent > 70 ? "?" : "."; + + parts.push(`D:${diskPercent.toFixed(0).padStart(2)}%${diskIndicator}`); + parts.push(`M:${memoryPercent.toFixed(0).padStart(2)}%${memIndicator}`); + parts.push(`H:${heapPercent.toFixed(0).padStart(2)}%${heapIndicator}`); + + // GC activity (only if significant) + if (payload.gc && payload.gc.count > 0 && payload.gc.avgDuration > 2) { + const gcIndicator = + payload.gc.avgDuration > 10 ? "!" : payload.gc.avgDuration > 5 ? "?" : "."; + parts.push(`GC:${payload.gc.count}${gcIndicator}`); + } + + return parts.join(" "); + } + + async getResourceSnapshotPayload() { + const [systemMetrics, processMetrics] = await Promise.all([ + this.getSystemMetrics(), + this.getProcessMetrics(), + ]); + + const gcSummary = summarizeGCEntries(this.bufferedGcEntries); + this.bufferedGcEntries = []; + + const formatBytes = (bytes: number) => (bytes / (1024 * 1024)).toFixed(2); + const formatPercent = (value: number) => value.toFixed(1); + + return { + system: { + disk: { + limitGiB: DISK_LIMIT_GB, + dirName: this.dirName, + usedGiB: (systemMetrics.disk.used / (1024 * 1024 * 1024)).toFixed(2), + freeGiB: (systemMetrics.disk.free / (1024 * 1024 * 1024)).toFixed(2), + percentUsed: formatPercent(systemMetrics.disk.percentUsed), + warning: systemMetrics.disk.warning, }, - constraints: this.ctx.machine - ? { - cpu: this.ctx.machine.cpu, - memoryGB: this.ctx.machine.memory, - diskGB: DISK_LIMIT_BYTES / (1024 * 1024 * 1024), - } - : { - cpu: os.cpus().length, - memoryGB: Math.floor(os.totalmem() / (1024 * 1024 * 1024)), - note: "Using system resources (no machine constraints specified)", - }, - process: { - node: { - memoryUsageMB: formatBytes(processMetrics.node.memoryUsage), - memoryUsagePercent: formatPercent(processMetrics.node.memoryUsagePercent), + memory: { + freeGB: (systemMetrics.memory.free / (1024 * 1024 * 1024)).toFixed(2), + percentUsed: formatPercent(systemMetrics.memory.percentUsed), + }, + }, + gc: gcSummary, + constraints: this.ctx.machine + ? 
{ + cpu: this.ctx.machine.cpu, + memoryGB: this.ctx.machine.memory, + diskGB: DISK_LIMIT_BYTES / (1024 * 1024 * 1024), + } + : { + cpu: os.cpus().length, + memoryGB: Math.floor(os.totalmem() / (1024 * 1024 * 1024)), + note: "Using system resources (no machine constraints specified)", }, - target: processMetrics.target + process: { + node: { + memoryUsageMB: formatBytes(processMetrics.node.memoryUsage), + memoryUsagePercent: formatPercent(processMetrics.node.memoryUsagePercent), + heapUsedMB: formatBytes(processMetrics.node.heapUsed), + heapSizeLimitMB: formatBytes(processMetrics.node.heapSizeLimit), + heapUsagePercent: formatPercent(processMetrics.node.heapUsagePercent), + availableHeapMB: formatBytes(processMetrics.node.availableHeap), + isNearHeapLimit: processMetrics.node.isNearHeapLimit, + ...(this.verbose ? { - method: processMetrics.target.method, - processName: processMetrics.target.processName, - count: processMetrics.target.count, - averages: processMetrics.target.averages - ? { - cpuPercent: formatPercent(processMetrics.target.averages.cpu * 100), - memoryPercent: formatPercent(processMetrics.target.averages.memory), - rssMB: formatBytes(processMetrics.target.averages.rss * 1024), - vszMB: formatBytes(processMetrics.target.averages.vsz * 1024), - } - : null, - totals: processMetrics.target.totals - ? { - cpuPercent: formatPercent(processMetrics.target.totals.cpu * 100), - memoryPercent: formatPercent(processMetrics.target.totals.memory), - rssMB: formatBytes(processMetrics.target.totals.rss * 1024), - vszMB: formatBytes(processMetrics.target.totals.vsz * 1024), - } - : null, + heapStats: getHeapStatistics(), } - : null, + : {}), }, - timestamp: new Date().toISOString(), - }); - } catch (error) { - this.logger.error( - `Error logging resource snapshot: ${error instanceof Error ? error.message : String(error)}` - ); + targetProcess: processMetrics.targetProcess + ? { + method: processMetrics.targetProcess.method, + processName: processMetrics.targetProcess.processName, + count: processMetrics.targetProcess.count, + averages: processMetrics.targetProcess.averages + ? { + cpuPercent: formatPercent(processMetrics.targetProcess.averages.cpu * 100), + memoryPercent: formatPercent(processMetrics.targetProcess.averages.memory), + rssMB: formatBytes(processMetrics.targetProcess.averages.rss * 1024), + vszMB: formatBytes(processMetrics.targetProcess.averages.vsz * 1024), + } + : null, + totals: processMetrics.targetProcess.totals + ? { + cpuPercent: formatPercent(processMetrics.targetProcess.totals.cpu * 100), + memoryPercent: formatPercent(processMetrics.targetProcess.totals.memory), + rssMB: formatBytes(processMetrics.targetProcess.totals.rss * 1024), + vszMB: formatBytes(processMetrics.targetProcess.totals.vsz * 1024), + } + : null, + } + : null, + }, + timestamp: new Date().toISOString(), + }; + } +} + +function summarizeGCEntries(entries: PerformanceEntry[]): GCSummary { + if (entries.length === 0) { + return { + count: 0, + totalDuration: 0, + avgDuration: 0, + maxDuration: 0, + kinds: {}, + }; + } + + let totalDuration = 0; + let maxDuration = 0; + const kinds: Record = {}; + + for (const e of entries) { + const duration = e.duration; + totalDuration += duration; + if (duration > maxDuration) maxDuration = duration; + + const kind = kindName((e as any)?.detail?.kind ?? 
"unknown"); + if (!kinds[kind]) { + kinds[kind] = { count: 0, totalDuration: 0, maxDuration: 0 }; } + kinds[kind].count += 1; + kinds[kind].totalDuration += duration; + if (duration > kinds[kind].maxDuration) kinds[kind].maxDuration = duration; } + + // finalize averages + const avgDuration = totalDuration / entries.length; + const kindsWithAvg: GCSummary["kinds"] = {}; + for (const [kind, stats] of Object.entries(kinds)) { + kindsWithAvg[kind] = { + count: stats.count, + totalDuration: stats.totalDuration, + avgDuration: stats.totalDuration / stats.count, + maxDuration: stats.maxDuration, + }; + } + + return { + count: entries.length, + totalDuration, + avgDuration, + maxDuration, + kinds: kindsWithAvg, + }; } + +const kindName = (k: number | string) => { + if (typeof k === "number") { + return ( + { + [constants.NODE_PERFORMANCE_GC_MAJOR]: "major", + [constants.NODE_PERFORMANCE_GC_MINOR]: "minor", + [constants.NODE_PERFORMANCE_GC_INCREMENTAL]: "incremental", + [constants.NODE_PERFORMANCE_GC_WEAKCB]: "weak-cb", + }[k] ?? `kind:${k}` + ); + } + return k; +}; diff --git a/references/hello-world/src/trigger/cpuHeavy.ts b/references/hello-world/src/trigger/cpuHeavy.ts new file mode 100644 index 0000000000..22b9bb547f --- /dev/null +++ b/references/hello-world/src/trigger/cpuHeavy.ts @@ -0,0 +1,32 @@ +import { task, heartbeats } from "@trigger.dev/sdk"; +import { setTimeout } from "timers/promises"; + +export const cpuHeavyTask = task({ + id: "cpu-heavy-task", + machine: "small-1x", + run: async ( + { + durationInMs = 1000, + yieldToHeartbeats = false, + }: { durationInMs: number; yieldToHeartbeats: boolean }, + { ctx } + ) => { + console.log("🧠 Starting CPU-heavy work"); + + // await setTimeout(durationInMs); + + await simulateCpuHeavyWork(durationInMs, yieldToHeartbeats); + + console.log("🧠 CPU-heavy work completed"); + }, +}); + +async function simulateCpuHeavyWork(durationInMs: number, yieldToHeartbeats: boolean) { + const start = Date.now(); + while (Date.now() - start < durationInMs) { + // Simulate 1 second of CPU-intensive work + if (yieldToHeartbeats) { + await heartbeats.yield(); + } + } +}