# Signal Extraction Layer

This document specifies the extraction layer that converts raw CI data (workflows/jobs/tests) into `Signal` objects used by the Signal logic (`signal.py`).

## Overview

Two parallel tracks produce Signals:
- Test‑track Signals: per‑test identities (file + test name) across commits.
- Non‑test Signals: normalized job base names across commits (grouping shards).

Extraction runs in two phases:
- Phase A (jobs fetch): fetch recent commits and all workflow jobs (including restarts and retries) for the target workflows.
- Phase B (test details fetch): select the relevant test‑track jobs in Python and batch‑query test artifacts for just those jobs.

The output is a list of `Signal` instances, each with commits (newest → older) and time‑ordered `SignalEvent`s per commit.
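
The shapes below are an illustrative sketch only; the real types live in `signal.py` and may differ. Field and enum names here (`SignalStatus`, `SignalCommit`, `started_at`) are assumptions chosen to match the conventions described in this document:

```python
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List


class SignalStatus(Enum):  # hypothetical status enum for illustration
    SUCCESS = "success"
    FAILURE = "failure"
    PENDING = "pending"


@dataclass
class SignalEvent:  # one observation of the tracked entity
    name: str              # debug name, e.g. "wf=trunk kind=test id=... run=... attempt=1"
    status: SignalStatus
    started_at: datetime   # events within a commit are ordered by this


@dataclass
class SignalCommit:  # one commit's events, time-ordered
    head_sha: str
    events: List[SignalEvent] = field(default_factory=list)


@dataclass
class Signal:  # commits ordered newest → older
    workflow_name: str
    key: str  # test id (file::name) or normalized job base name
    commits: List[SignalCommit] = field(default_factory=list)
```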

## Principles

- Pure data mapping: no pattern logic; just construct Signals from source rows.
- Prefer simple, batched queries aligned to ClickHouse primary keys.
- Emit multiple events per commit when meaningful (different runs, retries, shards).
- Reuse existing types and helpers where possible (`CommitJobs`, `JobResult`, `normalize_job_name`).
- Keep the module self‑contained and easy to unit‑test.

## Inputs and Windowing

- Workflows of interest: e.g., `['trunk', 'pull', 'rocm-mi300', ...]` (configurable).
- Time window: 16–32h (configurable). The window is short enough to keep flakiness assumptions stable and long enough to include restarts.
- Commits considered: pushes to `refs/heads/main` within the window (deduplicated per `head_sha`).

## Phase A — Jobs Fetch (single query)

Fetch all workflow jobs (original runs, retries, and restarts) for the commits in the window.

- Include both original runs and restarts (re‑runs via the UI or API).
- Join to pushes to scope to recent main commits and order newest → older.
- Select only the fields the extractor uses:
  - commit/run/job identity: `head_sha, workflow_name, id AS job_id, run_id (aka wf_run_id), run_attempt`
  - names/time: `name, started_at, created_at, status, conclusion`
  - classification shortcut: `tupleElement(torchci_classification_kg, 'rule') AS rule`

Notes
- This preserves all runs (original + restarts) and per‑run attempts (`run_attempt`).
- Job retries typically show up as separate job rows; names may include `Attempt #2` and have a later `started_at`.

## Phase B — Test Details Fetch (batched, from `default.test_run_s3`)

Decide in Python which jobs belong to the test‑track (e.g., `rule IN ('pytest failure', 'Python unittest failure')`). For those (job_id, run_id[, run_attempt]) triples, fetch per‑test rows directly from `default.test_run_s3` — this table contains one row per testcase, including successful ones (`failure_count = 0`, `error_count = 0`).

Why `test_run_s3` only?
- We need per‑test identities to build per‑test Signals, and `default.test_run_s3` has them. The summary table is optional and redundant for this layer.
- Performance remains good by filtering on `job_id IN (...)` (the first PK column) and grouping; the time window is applied implicitly via the job set selected in Phase A.

Job selection for the test track:
- Step 1: find normalized job base names that exhibited a test‑related classification in any commit within the window.
- Step 2: include ALL jobs across ALL commits whose normalized base is in that set (original runs, restarts; any run_id/attempt) so we can observe successes or pendings for the same test on other commits.
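
The two steps above can be sketched in Python. `JobRow` and the `normalize_job_name` stand‑in are simplified stand‑ins for the real types and helper (the real normalization in `CommitJobs` may differ):

```python
import re
from typing import List, NamedTuple, Set, Tuple

TEST_RULES = {"pytest failure", "Python unittest failure"}


class JobRow(NamedTuple):  # stand-in for a Phase A result row
    job_id: int
    head_sha: str
    name: str
    rule: str  # torchci classification rule, '' if none


def normalize_job_name(name: str) -> str:
    # Simplified stand-in: drop a trailing "(shard ...)" style suffix.
    return re.sub(r"\s*\(.*\)$", "", name).strip()


def select_test_track_jobs(rows: List[JobRow]) -> Tuple[List[int], Set[str]]:
    # Step 1: bases that showed a test-related classification anywhere in the window.
    bases = {normalize_job_name(r.name) for r in rows if r.rule in TEST_RULES}
    # Step 2: ALL jobs (any commit, run, attempt) whose normalized base is in that set.
    job_ids = [r.job_id for r in rows if normalize_job_name(r.name) in bases]
    return job_ids, bases
```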

Optimized batched `test_run_s3` query (for N job_ids):

```
SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name,
       max(failure_count > 0) AS failing,
       max(error_count > 0) AS errored,
       max(rerun_count > 0) AS rerun_seen,
       count() AS rows
FROM default.test_run_s3
WHERE job_id IN {job_ids:Array(Int64)}
GROUP BY job_id, workflow_id, workflow_run_attempt, file, classname, name
```

Notes
- Use `job_id IN (...)` to leverage the PK prefix `(job_id, name, classname, invoking_file, file)`.
- We keep `workflow_run_attempt` to distinguish attempts within the same workflow run.


## Mapping to Signals

### Common conventions
- Commits are ordered newest → older using the push timestamp (`push_dedup.ts`).
- Each Signal carries `workflow_name` and a stable `key`:
  - Test‑track: `key = file + '::' + name` (optionally include `classname`).
  - Non‑test: `key = normalize_job_name(job_name)` (reuse `CommitJobs.normalize_job_name`).
- Each commit holds a list of `SignalEvent`s, time‑ordered by `started_at`.
- Commit ordering falls out of insertion order: dicts in Python 3.7+ preserve it, and Phase A inserts commit keys in push‑timestamp DESC order, so iterating the mapping yields newest → older commits without extra sorting.
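
A minimal demonstration of the insertion‑order property relied on here (the shas are made up):

```python
# Python dicts (3.7+) iterate keys in insertion order, so inserting commits
# newest-first makes plain iteration yield newest → older without sorting.
commits_by_sha = {}
for sha in ["sha_newest", "sha_middle", "sha_oldest"]:  # push-timestamp DESC
    commits_by_sha[sha] = []  # events are filled in later

newest_to_oldest = list(commits_by_sha)
```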

Event naming (for debuggability):
- Consistent key=value format: `wf=<workflow> kind=<test|job> id=<test_id|job_base> run=<wf_run_id> attempt=<run_attempt>`
- Examples:
  - Test event: `wf=trunk kind=test id=inductor/test_foo.py::test_bar run=1744 attempt=1`
  - Job event: `wf=trunk kind=job id=linux-jammy-cuda12.8-py3.10-gcc11 / test run=1744 attempt=2`
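
A sketch of the naming convention as a helper (the function name is illustrative, not part of the existing code):

```python
def format_event_name(wf: str, kind: str, ident: str, run_id: int, attempt: int) -> str:
    """Consistent key=value event name for debuggability."""
    return f"wf={wf} kind={kind} id={ident} run={run_id} attempt={attempt}"
```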

### Test‑track mapping
- Build a per‑commit map `test_id -> list[SignalEvent]` by combining all relevant jobs and shards:
  - For each (wf_run_id, run_attempt, job_base_name) group in the commit, consult the `test_run_s3` rows (if any) for each candidate `test_id`:
    - If `test_run_s3` rows exist for this `test_id` → the status reflects the recorded test verdict.
    - If no rows exist and the group is still running (some jobs pending) → status = PENDING.
    - Else (no rows and the group completed) → missing/unknown (no event emitted).
  - Event boundaries (arise naturally from grouping):
    - Separate events for distinct workflow runs (different `wf_run_id`) on the same commit (regardless of how they were triggered).
    - Within the same run, separate events for retries keyed by `run_attempt` (name hints like "Attempt #2" are not relied upon).

### Non‑test mapping
- Similar to the test‑track, but grouping is coarser (by normalized job base name).
- For each (run_id, run_attempt, job_base_name) group in the commit, compute one event status from the group's rows:
  - FAILURE if any row concluded failure: `conclusion_kg = 'failure'`, or `conclusion = 'failure'` with `status = 'completed'`.
  - SUCCESS if all rows concluded success: `conclusion = 'success'` with `status = 'completed'`.
  - PENDING otherwise: `status != 'completed'` or `conclusion = ''` (keep‑going/pending); some rows pending, none failed.
- Emit one event per group; the Signal model supports multiple events per commit.
- Results include Signals only for (wf, job_base_name) pairs with at least one FAILURE across commits.
- Event boundaries (arise naturally from grouping):
  - Separate events for distinct workflow runs on the same commit (different `run_id`; trigger type is irrelevant).
  - Separate events for retries within the same run, keyed by `run_attempt` (no string parsing of job names).
  - Separate events for different normalized job base names.
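
The status rule can be sketched as a pure function; `JobRow` mirrors the Phase A `status`/`conclusion` columns, and the string statuses are illustrative:

```python
from typing import List, NamedTuple


class JobRow(NamedTuple):  # minimal slice of a Phase A row
    status: str      # e.g. 'completed', 'in_progress'
    conclusion: str  # e.g. 'success', 'failure', '' while pending


def group_status(rows: List[JobRow]) -> str:
    # FAILURE if any row in the group concluded failure.
    if any(r.status == "completed" and r.conclusion == "failure" for r in rows):
        return "failure"
    # SUCCESS only if every row concluded success.
    if all(r.status == "completed" and r.conclusion == "success" for r in rows):
        return "success"
    # Otherwise some rows are pending and none failed.
    return "pending"
```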

Example (same commit & workflow):

- wf1 has: `jobX_(shard1, attempt1)`, `jobX_(shard1, attempt2)`, `jobX_(shard2, attempt1)`
- wf2 (retry) has: `jobX_(shard1, attempt1)`

Aggregation by normalized base `jobX`:
- event1: group (wf1, attempt1) → rows: `[wf1:jobX_(shard1, attempt1), wf1:jobX_(shard2, attempt1)]`
- event2: group (wf1, attempt2) → rows: `[wf1:jobX_(shard1, attempt2)]`
- event3: group (wf2, attempt1) → rows: `[wf2:jobX_(shard1, attempt1)]`
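
The bucketing that produces these three events can be sketched directly; rows sharing a normalized base are grouped by `(wf_run_id, run_attempt)` (types here are illustrative stand‑ins):

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple, Tuple


class ShardRow(NamedTuple):  # one sharded job row under the same normalized base
    wf_run_id: int
    run_attempt: int
    name: str


def group_into_events(rows: List[ShardRow]) -> Dict[Tuple[int, int], List[ShardRow]]:
    # One event per (workflow run, attempt); shards of that attempt share the event.
    groups: Dict[Tuple[int, int], List[ShardRow]] = defaultdict(list)
    for r in rows:
        groups[(r.wf_run_id, r.run_attempt)].append(r)
    return dict(groups)
```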

## Module Structure

Create `pytorch_auto_revert/signal_extraction.py` with:
- `class SignalExtractor` (entry point)
  - `extract(workflow_names: list[str], hours: int) -> list[Signal]`
  - Internals:
    - `_fetch_commits_and_jobs(...) -> list[Commit]` (see data structures below)
    - `_select_test_track_job_ids(commits) -> (job_ids: List[int], bases_to_track: Set[JobBaseNameKey])`
    - `_fetch_tests_for_jobs(job_ids) -> List[TestRow]` (s3 only)
    - `_build_test_signals(commits, test_rows, bases_to_track) -> list[Signal]`
    - `_build_job_signals(commits) -> list[Signal]`
- Keep the logic small and pure; avoid side effects.

Notes
- Reuse `normalize_job_name` from `CommitJobs` for non‑test keys.
- For minimal coupling, do not import the existing autorevert pattern logic here.
- Prefer dataclass‑like simple structures for intermediate maps (dicts/tuples).

### Indexing & Data Structures

- Strongly typed ids for clarity (type‑checker only), for example:
  - `WfRunId = NewType('WfRunId', int)`
  - `RunAttempt = NewType('RunAttempt', int)`

  These are used in the code for readability and to reduce keying mistakes.
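
A short sketch of how these aliases behave: `NewType` is erased at runtime (values are plain ints), but a static type checker will flag a `RunAttempt` passed where a `WfRunId` is expected. The `group_key` helper is illustrative:

```python
from typing import NewType, Tuple

WfRunId = NewType("WfRunId", int)
RunAttempt = NewType("RunAttempt", int)


def group_key(run: WfRunId, attempt: RunAttempt) -> Tuple[int, int]:
    # Using the aliases in signatures documents which int goes where;
    # mypy/pyright would reject group_key(attempt, run).
    return (run, attempt)
```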

## Implementation Plan

1) Add `signal_extraction.py` with the `SignalExtractor` shell and clear method stubs. Keep types simple.
2) Implement the Phase A query in a helper (reuse `CHCliFactory`). Unit test: the query builder emits the expected SQL filters.
3) Implement selectors for test‑track pairs (Python filter on `rule`).
4) Implement the batched Phase B query:
   - Filter on `job_id IN (...)` to leverage the PK prefix.
   - Call `test_run_s3` to enumerate failing tests.
5) Implement mapping to Signals for both tracks, emitting multiple events per commit as specified.
6) Add unit tests:
   - Test‑track: a) failure on one commit; b) success on another; c) unknown/gap.
   - Non‑test: separate events for main vs restart and for `Attempt #2` retries.
7) Wire optional extraction invocation from the CLI/tester layer (behind a flag) without touching Signal's pattern logic.

## Performance Notes

- Keep the window small (16–32h) and deduplicate commits via push timestamps.
- Limit the size of the batched job_id list; chunk when necessary.
- Align filters with primary keys: `job_id` for `test_run_s3`.
- Avoid scanning all of `workflow_job` by joining to recent pushes and filtering on repo/branches.
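
Chunking the batched job_id list can be as simple as the sketch below; the chunk size is an illustrative default, not a measured one:

```python
from typing import Iterator, List


def chunked(job_ids: List[int], size: int = 1000) -> Iterator[List[int]]:
    """Yield the job_id list in bounded-size chunks, one Phase B query each."""
    for i in range(0, len(job_ids), size):
        yield job_ids[i:i + size]
```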

## Open Questions

- Exact classification list for the "test failure" track (start with `pytest failure`, `Python unittest failure`).
- Whether to include tests that succeeded but were present (for stronger success evidence) vs only failing tests.
  - A: if a test failure is observed on any commit, that test's status is extracted from all commits (success/failure/pending).
- How to surface shard boundaries for test‑track Signals (usually we just OR across shards at the status level).
  - A: for the test track, shard boundaries are irrelevant:
    - when a test outcome was recorded, it is extracted as an event (regardless of shard);
    - when no outcome was recorded, all shards with the job base name are considered:
      - when any shard is still running → PENDING
      - when all shards completed → no event (unknown)
- Whether to treat known infra classifications as gaps vs ignored (policy TBD).