import logging
import math
import statistics
from typing import Any, Dict, List, Literal, Optional, Tuple, TypedDict

from dateutil.parser import isoparse

from common.benchmark_time_series_api_model import BenchmarkTimeSeriesApiData
from common.config_model import BenchmarkConfig, RegressionPolicy

logger = logging.getLogger(__name__)

RegressionClassifyLabel = Literal[
    "regression", "suspicious", "no_regression", "insufficient_data"
]


class BaselineItem(TypedDict):
    group_info: Dict[str, Any]
    value: float


class BenchmarkValueItem(TypedDict):
    group_info: Dict[str, Any]
    values: List[Dict[str, Any]]


class PerGroupResult(TypedDict, total=True):
    group_info: Dict[str, Any]
    baseline: Optional[float]
    points: List[Any]
    label: RegressionClassifyLabel
    policy: Optional[RegressionPolicy]


def percentile(values: List[float], q: float) -> float:
    """Linearly interpolated percentile; q is a fraction in [0, 1]."""
    v = sorted(values)
    k = (len(v) - 1) * q
    f = math.floor(k)
    c = math.ceil(k)
    if f == c:
        return v[int(k)]
    return v[f] + (v[c] - v[f]) * (k - f)
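# A worked example of the interpolation above (a sketch, not part of the
# module's API): for values [10.0, 20.0, 30.0, 40.0] and q = 0.9,
# k = (4 - 1) * 0.9 = 2.7, f = 2, c = 3, so the result is
# 30.0 + (40.0 - 30.0) * 0.7 = 37.0.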
class BenchmarkRegressionReportGenerator:
    def __init__(
        self,
        config: BenchmarkConfig,
        latest_ts: BenchmarkTimeSeriesApiData,
        baseline_ts: BenchmarkTimeSeriesApiData,
    ) -> None:
        self.metric_policies = config.policy.metrics
        self.latest_ts = self._to_data_map(latest_ts)
        self.baseline_raw = self._to_data_map(baseline_ts)

    def generate(self) -> Tuple[List[PerGroupResult], bool]:
        return self.detect_regressions_with_policies(
            self.baseline_raw,
            self.latest_ts,
            metric_policies=self.metric_policies,
        )

    def detect_regressions_with_policies(
        self,
        baseline_map: Dict[tuple, BenchmarkValueItem],
        dp_map: Dict[tuple, BenchmarkValueItem],
        *,
        metric_policies: Dict[str, RegressionPolicy],
        min_points: int = 2,
    ) -> Tuple[List[PerGroupResult], bool]:
        """
        For each group:
          - choose a policy by group_info['metric']
          - compute per-point violation flags via policy.is_violation(value, baseline)
          - classify the flag sequence with classify_flags
        Returns a list of PerGroupResult dicts
        ({group_info, baseline, points, label, policy}, where each point carries
        its violation flag) and a bool that is True if any group is labeled
        "regression".
        """
        results: List[PerGroupResult] = []

        is_any_regression = False

        for key in sorted(dp_map.keys()):
            cur_item = dp_map.get(key)
            gi = cur_item["group_info"] if cur_item else {}
            points: List[Any] = cur_item["values"] if cur_item else []

            base_item = baseline_map.get(key)
            if not base_item:
                logger.warning("Skip. No baseline item found for %s", gi)
                results.append(
                    PerGroupResult(
                        group_info=gi,
                        baseline=None,
                        points=[],
                        label="insufficient_data",
                        policy=None,
                    )
                )
                continue

            policy = self._resolve_policy(metric_policies, gi.get("metric", ""))
            if not policy:
                logger.warning("Skip. No policy found for %s", gi)
                results.append(
                    PerGroupResult(
                        group_info=gi,
                        baseline=None,
                        points=[],
                        label="insufficient_data",
                        policy=None,
                    )
                )
                continue

            baseline_aggre_mode = policy.baseline_aggregation
            baseline_value = self._get_baseline(base_item, baseline_aggre_mode)
            if baseline_value is None or len(points) == 0:
                logger.warning(
                    "Skip %s. baseline_value is %s, len(points) == %s",
                    gi,
                    baseline_value,
                    len(points),
                )
                results.append(
                    PerGroupResult(
                        group_info=gi,
                        baseline=None,
                        points=[],
                        label="insufficient_data",
                        policy=policy,
                    )
                )
                continue

            # Per-point violations (True = regression)
            flags: List[bool] = [
                policy.is_violation(p["value"], baseline_value["value"])
                for p in points
            ]
            label = self.classify_flags(flags, min_points=min_points)

            enriched_points = [{**p, "flag": f} for p, f in zip(points, flags)]
            results.append(
                PerGroupResult(
                    group_info=gi,
                    baseline=baseline_value["value"],
                    points=enriched_points,
                    label=label,
                    policy=policy,
                )
            )
            if label == "regression":
                is_any_regression = True
        return results, is_any_regression
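    # Shape of one successful PerGroupResult (values here are illustrative):
    #   {"group_info": {"metric": "latency_p50", ...},
    #    "baseline": 12.3,
    #    "points": [{"value": 13.1, "commit": ..., "branch": ...,
    #                "timestamp": ..., "flag": True}, ...],
    #    "label": "regression",
    #    "policy": <RegressionPolicy>}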
    def _to_data_map(
        self, data: BenchmarkTimeSeriesApiData, field: str = "value"
    ) -> Dict[tuple, BenchmarkValueItem]:
        """Index a time-series payload by its group_info, with points sorted by time."""
        result: Dict[tuple, BenchmarkValueItem] = {}
        for ts_group in data.time_series:
            group_keys = tuple(sorted(ts_group.group_info.items()))
            points: List[Dict[str, Any]] = []
            for d in sorted(
                ts_group.data, key=lambda d: isoparse(d["granularity_bucket"])
            ):
                if field not in d:
                    continue
                points.append(
                    {
                        "value": float(d[field]),
                        "commit": d.get("commit"),
                        "branch": d.get("branch"),
                        "timestamp": isoparse(d["granularity_bucket"]),
                    }
                )
            result[group_keys] = {
                "group_info": ts_group.group_info,
                "values": points,
            }
        return result
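    # A sketch of the mapping produced above: a series whose group_info is
    # {"metric": "latency_p50", "model": "resnet50"} (hypothetical names) is
    # keyed by (("metric", "latency_p50"), ("model", "resnet50")), so the same
    # group can be matched between the baseline and latest windows.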
    def _get_baseline(
        self,
        data: BenchmarkValueItem,
        mode: str = "mean",
        field: str = "value",
    ) -> Optional[BaselineItem]:
        """Aggregate a group's points into a single baseline value.

        Supported modes: mean, p50, p90, p95, max, min, latest, earliest.
        Returns None when there are no usable values or the mode is unknown.
        """
        values = [float(d[field]) for d in data["values"] if field in d]
        if not values:
            return None

        if mode == "mean":
            val = statistics.fmean(values)
        elif mode == "p50":
            val = percentile(values, 0.5)
        elif mode == "p90":
            val = percentile(values, 0.9)
        elif mode == "p95":
            val = percentile(values, 0.95)
        elif mode == "max":
            val = max(values)
        elif mode == "min":
            val = min(values)
        elif mode == "latest":
            val = values[-1]
        elif mode == "earliest":
            val = values[0]
        else:
            logger.warning("Unknown baseline aggregation mode: %s", mode)
            return None
        result: BaselineItem = {
            "group_info": data["group_info"],
            "value": val,
        }
        return result
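    # Illustration of the aggregation modes (a sketch): for values
    # [1.0, 2.0, 3.0, 4.0], mean -> 2.5, p50 -> 2.5, p90 -> 3.7 (interpolated),
    # max -> 4.0, and latest -> 4.0 (points were sorted by timestamp in
    # _to_data_map).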
    def classify_flags(
        self, flags: List[bool], min_points: int = 3
    ) -> RegressionClassifyLabel:
        """
        Classify a sequence of boolean violation flags.

        - regression: the sequence ends with a run of >= 2 consecutive True values
        - suspicious: there is a run of >= 3 consecutive True values, but not at the end
        - no_regression: all other cases
        - insufficient_data: not enough data points (< min_points)

        Special case:
        - If min_points == 1, only the last flag is considered:
          True -> regression, False -> no_regression.
        """
        n = len(flags)
        if n == 0:
            return "insufficient_data"

        if min_points == 1:
            return "regression" if flags[-1] else "no_regression"

        if n < min_points:
            return "insufficient_data"

        # Length of the trailing run of True flags.
        t = 0
        for v in reversed(flags):
            if v:
                t += 1
            else:
                break
        if t >= 2:
            return "regression"

        # Longest run of True flags anywhere in the sequence.
        longest = cur = 0
        for v in flags:
            cur = cur + 1 if v else 0
            longest = max(longest, cur)

        if longest >= 3:
            return "suspicious"

        return "no_regression"
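    # Illustrative classifications with the default min_points=3 (a sketch):
    #   [False, True, True]       -> "regression"        (trailing run of 2)
    #   [True, True, True, False] -> "suspicious"        (run of 3, not at the end)
    #   [True, False, True]       -> "no_regression"
    #   [True, False]             -> "insufficient_data" (fewer than 3 points)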
    def _resolve_policy(
        self,
        metric_policies: Dict[str, RegressionPolicy],
        metric: str,
    ) -> Optional[RegressionPolicy]:
        """Look up the policy for a metric name (matched case-insensitively)."""
        if not metric:
            return None
        m = metric.lower()
        return metric_policies.get(m)
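# Minimal usage sketch (hypothetical wiring; loading the BenchmarkConfig and
# the two time-series payloads is assumed to happen elsewhere):
#
#   generator = BenchmarkRegressionReportGenerator(config, latest_ts, baseline_ts)
#   results, has_regression = generator.generate()
#   if has_regression:
#       failing = [r for r in results if r["label"] == "regression"]
#       logger.warning("regression detected in %d group(s)", len(failing))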