From dd1dc3983d05943705bddf363c1aa1194ae8d498 Mon Sep 17 00:00:00 2001
From: Yang Wang
Date: Wed, 3 Sep 2025 20:50:29 -0700
Subject: [PATCH 1/9] Update (base update)

[ghstack-poisoned]
---
 .../common/benchmark_time_series_api_model.py |  64 +++
 .../common/config.py                          |  94 ++++
 .../common/config_model.py                    | 194 +++++++
 .../common/regression_utils.py                | 296 +++++++++++
 .../lambda_function.py                        | 479 ++++++++++++++++++
 .../requirements.txt                          |   6 +
 .../schema.sql                                |  31 ++
 7 files changed, 1164 insertions(+)
 create mode 100644 aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py
 create mode 100644 aws/lambda/benchmark_regression_summary_report/common/config.py
 create mode 100644 aws/lambda/benchmark_regression_summary_report/common/config_model.py
 create mode 100644 aws/lambda/benchmark_regression_summary_report/common/regression_utils.py
 create mode 100644 aws/lambda/benchmark_regression_summary_report/lambda_function.py
 create mode 100644 aws/lambda/benchmark_regression_summary_report/requirements.txt
 create mode 100644 clickhouse_db_schema/benchmark_regression_summary_report/schema.sql

diff --git a/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py
new file mode 100644
index 0000000000..fe7705a6ea
--- /dev/null
+++ b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py
@@ -0,0 +1,64 @@
+from dataclasses import dataclass, field
+from typing import Any, Dict, List
+
+import requests
+
+
+# The data class to provide api response model from get_time_series api
+
+
+@dataclass
+class TimeRange:
+    start: str
+    end: str
+
+
+@dataclass
+class BenchmarkTimeSeriesItem:
+    group_info: Dict[str, Any]
+    num_of_dp: int
+    data: List[Dict[str, Any]] = field(default_factory=list)
+
+
+@dataclass
+class BenchmarkTimeSeriesApiData:
+    time_series: List[BenchmarkTimeSeriesItem]
+    time_range: TimeRange
+
+
+@dataclass
+class BenchmarkTimeSeriesApiResponse:
+    data: BenchmarkTimeSeriesApiData
+
+    @classmethod
+    def from_request(
+        cls, url: str, query: dict, timeout: int = 180
+    ) -> "BenchmarkTimeSeriesApiResponse":
+        """
+        Send a POST request and parse into BenchmarkTimeSeriesApiResponse.
+
+        Args:
+            url: API endpoint
+            query: JSON payload to send (must be JSON-serializable)
+            timeout: max seconds to wait for connect + response (default: 180)
+        Returns:
+            BenchmarkTimeSeriesApiResponse
+        Raises:
+            requests.exceptions.RequestException if network/timeout/HTTP error
+            RuntimeError if the API returns an "error" field or malformed data
+        """
+        resp = requests.post(url, json=query, timeout=timeout)
+        resp.raise_for_status()
+        payload = resp.json()
+
+        if "error" in payload:
+            raise RuntimeError(f"API error: {payload['error']}")
+        try:
+            tr = TimeRange(**payload["data"]["time_range"])
+            ts = [
+                BenchmarkTimeSeriesItem(**item)
+                for item in payload["data"]["time_series"]
+            ]
+        except Exception as e:
+            raise RuntimeError(f"Malformed API payload: {e}")
+        return cls(data=BenchmarkTimeSeriesApiData(time_series=ts, time_range=tr))
diff --git a/aws/lambda/benchmark_regression_summary_report/common/config.py b/aws/lambda/benchmark_regression_summary_report/common/config.py
new file mode 100644
index 0000000000..ef0586758f
--- /dev/null
+++ b/aws/lambda/benchmark_regression_summary_report/common/config.py
@@ -0,0 +1,94 @@
+from common.config_model import (
+    BenchmarkApiSource,
+    BenchmarkConfig,
+    BenchmarkRegressionConfigBook,
+    DayRangeWindow,
+    Frequency,
+    Policy,
+    RangeConfig,
+    RegressionPolicy,
+)
+
+
+# Compiler benchmark regression config
+# todo(elainewy): eventually each team should configure
+# their own benchmark regression config, currently placed
+# here for the lambda
+
+
+COMPILER_BENCHMARK_CONFIG = BenchmarkConfig(
+    name="Compiler Benchmark Regression",
+    id="compiler_regression",
+    source=BenchmarkApiSource(
+        api_query_url="https://hud.pytorch.org/api/benchmark/get_time_series",
+        type="benchmark_time_series_api",
+        # currently we only detect regressions for h100 with dtype bfloat16 and mode inference
+        # we can extend this to other devices, dtypes, and modes in the future
+        api_endpoint_params_template="""
+        {
+            "name": "compiler_precompute",
+            "query_params": {
+                "commits": [],
+                "compilers": [],
+                "arch": "h100",
+                "device": "cuda",
+                "dtype": "bfloat16",
+                "granularity": "hour",
+                "mode": "inference",
+                "startTime": "{{ startTime }}",
+                "stopTime": "{{ stopTime }}",
+                "suites": ["torchbench", "huggingface", "timm_models"],
+                "workflowId": 0,
+                "branches": ["main"]
+            }
+        }
+        """,
+    ),
+    # build the baseline from the past 7 days (aggregated with "max" per metric below),
+    # and compare it against the last 2 days
+    policy=Policy(
+        frequency=Frequency(value=1, unit="days"),
+        range=RangeConfig(
+            baseline=DayRangeWindow(value=7),
+            comparison=DayRangeWindow(value=2),
+        ),
+        metrics={
+            "passrate": RegressionPolicy(
+                name="passrate",
+                condition="greater_equal",
+                threshold=0.9,
+                baseline_aggregation="max",
+            ),
+            "geomean": RegressionPolicy(
+                name="geomean",
+                condition="greater_equal",
+                threshold=0.95,
+                baseline_aggregation="max",
+            ),
+            "compression_ratio": RegressionPolicy(
+                name="compression_ratio",
+                condition="greater_equal",
+                threshold=0.9,
+                baseline_aggregation="max",
+            ),
+        },
+        notification_config={
+            "type": "github",
+            "repo": "pytorch/test-infra",
+            "issue": "7081",
+        },
+    ),
+)
+
+BENCHMARK_REGRESSION_CONFIG = BenchmarkRegressionConfigBook(
+    configs={
+        "compiler_regression": COMPILER_BENCHMARK_CONFIG,
+    }
+)
+
+
+def get_benchmark_regression_config(config_id: str) -> BenchmarkConfig:
+    """Get benchmark regression config by config id"""
+    try:
+        return BENCHMARK_REGRESSION_CONFIG[config_id]
+    except KeyError:
+        raise ValueError(f"Invalid config id: {config_id}")
diff --git a/aws/lambda/benchmark_regression_summary_report/common/config_model.py
b/aws/lambda/benchmark_regression_summary_report/common/config_model.py new file mode 100644 index 0000000000..7779f17f2d --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/common/config_model.py @@ -0,0 +1,194 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from datetime import timedelta +from typing import Any, Dict, Literal, Optional + +from jinja2 import Environment, meta, Template + + +# -------- Frequency -------- +@dataclass(frozen=True) +class Frequency: + """ + The frequency of how often the report should be generated. + The minimum frequency we support is 1 day. + Attributes: + value: Number of units (e.g., 7 for 7 days). + unit: Unit of time, either "days" or "weeks". + + Methods: + to_timedelta: Convert frequency into a datetime.timedelta. + get_text: return the frequency in text format + """ + + value: int + unit: Literal["days", "weeks"] + + def to_timedelta(self) -> timedelta: + """Convert frequency N days or M weeks into a datetime.timedelta.""" + if self.unit == "days": + return timedelta(days=self.value) + elif self.unit == "weeks": + return timedelta(weeks=self.value) + else: + raise ValueError(f"Unsupported unit: {self.unit}") + + def get_text(self): + return f"{self.value} {self.unit}" + + +# -------- Source -------- +_JINJA_ENV = Environment(autoescape=False) + + +@dataclass +class BenchmarkApiSource: + """ + Defines the source of the benchmark data we want to query + api_query_url: the url of the api to query + api_endpoint_params_template: the jinjia2 template of the api endpoint's query params + default_ctx: the default context to use when rendering the api_endpoint_params_template + """ + + api_query_url: str + api_endpoint_params_template: str + type: Literal["benchmark_time_series_api", "other"] = "benchmark_time_series_api" + default_ctx: Dict[str, Any] = field(default_factory=dict) + + def required_template_vars(self) -> set[str]: + ast = _JINJA_ENV.parse(self.api_endpoint_params_template) + return set(meta.find_undeclared_variables(ast)) + + def render(self, ctx: Dict[str, Any], strict: bool = True) -> dict: + """Render with caller-supplied context (no special casing for start/end).""" + merged = {**self.default_ctx, **ctx} + + if strict: + required = self.required_template_vars() + missing = required - merged.keys() + if missing: + raise ValueError(f"Missing required vars: {missing}") + rendered = Template(self.api_endpoint_params_template).render(**merged) + return json.loads(rendered) + + +# -------- Policy: range windows -------- +@dataclass +class DayRangeWindow: + value: int + # raw indicates fetch from the source data + source: Literal["raw"] = "raw" + + +@dataclass +class RangeConfig: + """ + Defines the range of baseline and comparison windows for a given policy. + - baseline: the baseline window that build the baseline value + - comparison: the comparison window that we fetch data from to compare against the baseline value + """ + + baseline: DayRangeWindow + comparison: DayRangeWindow + + def total_timedelta(self) -> timedelta: + return timedelta(days=self.baseline.value + self.comparison.value) + + def comparison_timedelta(self) -> timedelta: + return timedelta(days=self.comparison.value) + + def baseline_timedelta(self) -> timedelta: + return timedelta(days=self.baseline.value) + + +# -------- Policy: metrics -------- +@dataclass +class RegressionPolicy: + """ + Defines the policy for a given metric. 
+    - the new value must be {x} the baseline value:
+        - "greater_than": higher is better; new value must be strictly greater than baseline
+        - "less_than": lower is better; new value must be strictly lower than baseline
+        - "equal_to": new value should be ~= baseline * threshold within rel_tol
+        - "greater_equal": higher is better; new value must be greater than or equal to baseline
+        - "less_equal": lower is better; new value must be less than or equal to baseline
+    """
+
+    name: str
+    condition: Literal[
+        "greater_than", "less_than", "equal_to", "greater_equal", "less_equal"
+    ]
+    threshold: float
+    baseline_aggregation: Literal[
+        "avg", "max", "min", "p50", "p90", "p95", "latest", "earliest"
+    ] = "max"
+    rel_tol: float = 1e-3  # used only for "equal_to"
+
+    def is_violation(self, value: float, baseline: float) -> bool:
+        target = baseline * self.threshold
+
+        if self.condition == "greater_than":
+            # value must be strictly greater than target
+            return value <= target
+
+        if self.condition == "greater_equal":
+            # value must be greater than or equal to target
+            return value < target
+
+        if self.condition == "less_than":
+            # value must be strictly less than target
+            return value >= target
+
+        if self.condition == "less_equal":
+            # value must be less than or equal to target
+            return value > target
+
+        if self.condition == "equal_to":
+            # |value - target| should be within rel_tol * max(1, |target|)
+            denom = max(1.0, abs(target))
+            return abs(value - target) > self.rel_tol * denom
+
+        raise ValueError(f"Unknown condition: {self.condition}")
+
+
+@dataclass
+class Policy:
+    frequency: Frequency
+    range: RangeConfig
+    metrics: Dict[str, RegressionPolicy]
+
+    # TODO(elainewy): add notification config
+    notification_config: Optional[Dict[str, Any]] = None
+
+
+# -------- Top-level benchmark regression config --------
+@dataclass
+class BenchmarkConfig:
+    """
+    Represents a single benchmark regression configuration.
+    - BenchmarkConfig defines the benchmark regression config for a given benchmark.
+ - source: defines the source of the benchmark data we want to query + - policy: defines the policy for the benchmark regressions, including frequency to + generate the report, range of the baseline and new values, and regression thresholds + for metrics + - name: the name of the benchmark + - id: the id of the benchmark, this must be unique for each benchmark, and cannot be changed once set + """ + + name: str + id: str + source: BenchmarkApiSource + policy: Policy + + +@dataclass +class BenchmarkRegressionConfigBook: + configs: Dict[str, BenchmarkConfig] = field(default_factory=dict) + + def __getitem__(self, key: str) -> BenchmarkConfig: + config = self.configs.get(key) + if not config: + raise KeyError(f"Config {key} not found") + return config diff --git a/aws/lambda/benchmark_regression_summary_report/common/regression_utils.py b/aws/lambda/benchmark_regression_summary_report/common/regression_utils.py new file mode 100644 index 0000000000..b1d01aafee --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/common/regression_utils.py @@ -0,0 +1,296 @@ +import logging +import math +import statistics +from typing import Any, Counter, Dict, List, Literal, Optional, Tuple, TypedDict + +from common.benchmark_time_series_api_model import BenchmarkTimeSeriesApiData +from common.config_model import BenchmarkConfig, RegressionPolicy +from dateutil.parser import isoparse + + +logger = logging.getLogger() + +RegressionClassifyLabel = Literal[ + "regression", "suspicious", "no_regression", "insufficient_data" +] + + +class BaselineItem(TypedDict): + group_info: Dict[str, Any] + value: float + + +class BenchmarkValueItem(TypedDict): + group_info: Dict[str, Any] + values: List[Dict[str, Any]] + + +class PerGroupResult(TypedDict, total=True): + group_info: Dict[str, Any] + baseline: Optional[float] + points: List[Any] + label: RegressionClassifyLabel + policy: Optional["RegressionPolicy"] + + +def percentile(values: list[float], q: float): + v = sorted(values) + k = (len(v) - 1) * q + f = math.floor(k) + c = math.ceil(k) + if f == c: + return v[int(k)] + return v[f] + (v[c] - v[f]) * (k - f) + + +class BenchmarkRegressionReportGenerator: + def __init__( + self, + config: BenchmarkConfig, + latest_ts: BenchmarkTimeSeriesApiData, + baseline_ts: BenchmarkTimeSeriesApiData, + ) -> None: + self.metric_policies = config.policy.metrics + self.latest_ts = self._to_data_map(latest_ts) + self.baseline_raw = self._to_data_map(baseline_ts) + + def generate(self) -> Tuple[List[PerGroupResult], Dict[str, Any]]: + return self.detect_regressions_with_policies( + self.baseline_raw, + self.latest_ts, + metric_policies=self.metric_policies, + ) + + def detect_regressions_with_policies( + self, + baseline_map: Dict[tuple, BenchmarkValueItem], + dp_map: Dict[tuple, BenchmarkValueItem], + *, + metric_policies: Dict[str, RegressionPolicy], + min_points: int = 2, + ) -> Tuple[List[PerGroupResult], Dict[str, Any]]: + """ + For each dp_map: + - choose policy based on targeting metric from group_info['metric'] (ex passrate, geomean ..) 
+ - calculate baseline value based on policy.baseline_aggregation (ex mean, p90, max, min, latest, p50, p95) + - use baseline value to generate violation flag list for each point, using policy.is_violation(value, baseline) + - classify with labels to detect regression, using self.classify_flags(flags, min_points) + Returns a list of Regression result {group_info, baseline, values, flags, label, policy} + """ + logger.info("Generating regression results ...") + results: List[PerGroupResult] = [] + + for key in sorted(dp_map.keys()): + cur_item = dp_map.get(key) + gi = cur_item["group_info"] if cur_item else {} + points: List[Any] = cur_item["values"] if cur_item else [] + + base_item = baseline_map.get(key) + if not base_item: + logger.warning("Skip. No baseline item found for %s", gi) + results.append( + PerGroupResult( + group_info=gi, + baseline=None, + points=[], + label="insufficient_data", + policy=None, + ) + ) + continue + policy = self._resolve_policy(metric_policies, gi.get("metric", "")) + if not policy: + logger.warning("No policy for %s", gi) + results.append( + PerGroupResult( + group_info=gi, + baseline=None, + points=[], + label="insufficient_data", + policy=None, + ) + ) + continue + + baseline_aggre_mode = policy.baseline_aggregation + baseline_value = self._get_baseline(base_item, baseline_aggre_mode) + if baseline_value is None or len(points) == 0: + logger.warning( + "baseline_value is %s, len(points) == %s", + baseline_value, + len(points), + ) + results.append( + PerGroupResult( + group_info=gi, + baseline=None, + points=[], + label="insufficient_data", + policy=policy, + ) + ) + continue + + # Per-point violations (True = regression) + flags: List[bool] = [ + policy.is_violation(p["value"], baseline_value["value"]) for p in points + ] + label = self.classify_flags(flags, min_points=min_points) + + enriched_points = [{**p, "flag": f} for p, f in zip(points, flags)] + results.append( + PerGroupResult( + group_info=gi, + baseline=baseline_value["value"], + points=enriched_points, + label=label, + policy=policy, + ) + ) + + logger.info("Done. 
Generated %s regression results", len(results)) + summary = self.summarize_label_counts(results) + return results, summary + + def summarize_label_counts(self, results: list[PerGroupResult]): + counts = Counter(self._label_str(r["label"]) for r in results) + total_count = len(results) + return { + "total_count": total_count, + "regression_count": counts.get("regression", 0), + "suspicious_count": counts.get("suspicious", 0), + "no_regression_count": counts.get("no_regression", 0), + "insufficient_data_count": counts.get("insufficient_data", 0), + "is_regression": int(counts.get("regression", 0) > 0), + } + + def _label_str(self, x) -> str: + # Robust: works for str or Enum-like labels + if isinstance(x, str): + return x.lower() + if hasattr(x, "value"): + v = x.value + return (v if isinstance(v, str) else str(v)).lower() + return str(x).lower() + + def _to_data_map( + self, data: "BenchmarkTimeSeriesApiData", field: str = "value" + ) -> Dict[tuple, BenchmarkValueItem]: + result: Dict[tuple, BenchmarkValueItem] = {} + for ts_group in data.time_series: + group_keys = tuple(sorted(ts_group.group_info.items())) + points: List[Dict[str, Any]] = [] + for d in sorted( + ts_group.data, key=lambda d: isoparse(d["granularity_bucket"]) + ): + if field not in d: + continue + points.append( + { + "value": float(d[field]), + "commit": d.get("commit"), + "branch": d.get("branch"), + "timestamp": isoparse(d["granularity_bucket"]), + } + ) + result[group_keys] = { + "group_info": ts_group.group_info, + "values": points, + } + return result + + def _get_baseline( + self, + data: BenchmarkValueItem, + mode: str = "mean", + field: str = "value", + ) -> Optional[BaselineItem]: + """ + calculate the baseline value based on the mode + mode: mean, p90, max, min, latest, p50, p95 + """ + values = [float(d[field]) for d in data["values"] if field in d] + if not values: + return None + + if mode == "mean": + val = statistics.fmean(values) + elif mode == "p90": + val = percentile(values, 0.9) + elif mode == "max": + val = max(values) + elif mode == "min": + val = min(values) + elif mode == "latest": + val = values[-1] + elif mode == "earliest": + val = values[0] + elif mode == "p50": + val = percentile(values, 0.5) + elif mode == "p95": + val = percentile(values, 0.95) + else: + logger.warning("Unknown mode: %s", mode) + return None + result: BaselineItem = { + "group_info": data["group_info"], + "value": val, + } + return result + + def classify_flags( + self, flags: list[bool], min_points: int = 3 + ) -> RegressionClassifyLabel: + """ + Classify a sequence of boolean flags to detect regression. 
+ + - regression: last run has >= 2 consecutive True values + - suspicious: there is a run of >= 3 consecutive True values, but not at the end + - no_regression: all other cases + - insufficient_data: not enough data points (< min_points) + + Special case: + - If min_points == 1, then just look at the last flag: + True -> regression + False -> no_regression + """ + n = len(flags) + if n == 0: + return "insufficient_data" + + if min_points == 1: + return "regression" if flags[-1] else "no_regression" + + if n < min_points: + return "insufficient_data" + + # trailing run length + t = 0 + for v in reversed(flags): + if v: + t += 1 + else: + break + if t >= 2: + return "regression" + + # longest run anywhere + longest = cur = 0 + for v in flags: + cur = cur + 1 if v else 0 + longest = max(longest, cur) + + if longest >= 3: + return "suspicious" + + return "no_regression" + + def _resolve_policy( + self, + metric_policies: Dict[str, RegressionPolicy], + metric: str, + ) -> Optional[RegressionPolicy]: + if not metric: + return None + m = metric.lower() + return metric_policies.get(m) diff --git a/aws/lambda/benchmark_regression_summary_report/lambda_function.py b/aws/lambda/benchmark_regression_summary_report/lambda_function.py new file mode 100644 index 0000000000..fb7f57bd9e --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/lambda_function.py @@ -0,0 +1,479 @@ +#!/usr/bin/env python +import argparse +import datetime as dt +import json +import logging +import os +import threading +import time +from concurrent.futures import as_completed, ThreadPoolExecutor +from typing import Any, Optional + +import clickhouse_connect +import requests +from common.benchmark_time_series_api_model import BenchmarkTimeSeriesApiResponse +from common.config import get_benchmark_regression_config +from common.config_model import BenchmarkApiSource, BenchmarkConfig, Frequency +from common.regression_utils import BenchmarkRegressionReportGenerator +from dateutil.parser import isoparse + + +logging.basicConfig( + level=logging.INFO, +) +logger = logging.getLogger() +logger.setLevel("INFO") + +ENVS = { + "GITHUB_ACCESS_TOKEN": os.getenv("GITHUB_ACCESS_TOKEN", ""), + "CLICKHOUSE_ENDPOINT": os.getenv("CLICKHOUSE_ENDPOINT", ""), + "CLICKHOUSE_PASSWORD": os.getenv("CLICKHOUSE_PASSWORD", ""), + "CLICKHOUSE_USERNAME": os.getenv("CLICKHOUSE_USERNAME", ""), +} + +# TODO(elainewy): change this to benchmark.benchmark_regression_report once the table is created +BENCHMARK_REGRESSION_REPORT_TABLE = "fortesting.benchmark_regression_report" +BENCHMARK_REGRESSION_TRACKING_CONFIG_IDS = ["compiler_regression"] + + +def truncate_to_hour(ts: dt.datetime) -> dt.datetime: + return ts.replace(minute=0, second=0, microsecond=0) + + +def get_clickhouse_client( + host: str, user: str, password: str +) -> clickhouse_connect.driver.client.Client: + # for local testing only, disable SSL verification + return clickhouse_connect.get_client( host=host, user=user, password=password, secure=True, verify=False) + + return clickhouse_connect.get_client( + host=host, user=user, password=password, secure=True + ) + + +def get_clickhouse_client_environment() -> clickhouse_connect.driver.client.Client: + for name, env_val in ENVS.items(): + if not env_val: + raise ValueError(f"Missing environment variable {name}") + return get_clickhouse_client( + host=ENVS["CLICKHOUSE_ENDPOINT"], + user=ENVS["CLICKHOUSE_USERNAME"], + password=ENVS["CLICKHOUSE_PASSWORD"], + ) + + +BENCHMARK_REGRESSION_SUMMARY_REPORT_TABLE = ( + 
"fortesting.benchmark_regression_summary_report" +) + + +class BenchmarkSummaryProcessor: + def __init__( + self, + is_dry_run: bool = False, + ) -> None: + self.is_dry_run = is_dry_run + + def process( + self, + config_id: str, + end_time: dt.datetime, + cc: Optional[clickhouse_connect.driver.client.Client] = None, + args: Optional[argparse.Namespace] = None, + ): + def log_info(msg: str): + logger.info("[%s] %s", config_id, msg) + + def log_error(msg: str): + logger.error("[%s] %s", config_id, msg) + + # ensure each thread has its own clickhouse client. clickhouse client + # is not thread-safe. + if cc is None: + tlocal = threading.local() + if not hasattr(tlocal, "cc") or tlocal.cc is None: + if args: + tlocal.cc = get_clickhouse_client( + args.clickhouse_endpoint, + args.clickhouse_username, + args.clickhouse_password, + ) + else: + tlocal.cc = get_clickhouse_client_environment() + cc = tlocal.cc + try: + config = get_benchmark_regression_config(config_id) + log_info(f"found config for config_id {config_id}") + except ValueError as e: + log_error(f"Skip process, Invalid config: {e}") + return + except Exception as e: + log_error(f"Unexpected error from get_benchmark_regression_config: {e}") + return + + # check if the current time is > policy's time_delta + previous record_ts from summary_table + report_freq = config.policy.frequency + should_generate = self._should_generate_report( + cc, end_time, config_id, report_freq + ) + if not should_generate: + log_info( + f"Skip generate report for time:{end_time} with frequency {report_freq.get_text()}, no data found", + ) + return + else: + log_info( + f"Plan to generate report for time:{end_time} with frequency {report_freq.get_text()}..." + ) + latest, ls, le = self.get_latest(config, end_time) + if not latest: + log_info( + f"no latest data found for time range [{ls},{le}] with frequency {report_freq.get_text()}..." + ) + return + + baseline, bs, be = self.get_baseline(config, end_time) + if not baseline: + log_info( + f"no baseline data found for time range [{bs},{be}] with frequency {report_freq.get_text()}..." 
+ ) + return + + generator = BenchmarkRegressionReportGenerator( + config=config, latest_ts=latest, baseline_ts=baseline + ) + + result, regression_summary = generator.generate() + if self.is_dry_run: + print("regression_detected: ", regression_summary) + print(json.dumps(result, indent=2, default=str)) + return + + def get_latest(self, config: BenchmarkConfig, end_time: dt.datetime): + data_range = config.policy.range + latest_s = end_time - data_range.comparison_timedelta() + latest_e = end_time + latest_data = self._fetch_from_benchmark_ts_api( + config_id=config.id, + start_time=latest_s, + end_time=latest_e, + source=config.source, + ) + logger.info( + "[%s] found %s # of data, with time range %s", + config.id, + len(latest_data.time_series), + latest_data.time_range, + ) + if not latest_data.time_range or not latest_data.time_range.end: + return None, latest_s, latest_e + if not self.should_use_data(config.id, latest_data.time_range.end, end_time): + return None, latest_s, latest_e + return latest_data, latest_s, latest_e + + def get_baseline(self, config: BenchmarkConfig, end_time: dt.datetime): + data_range = config.policy.range + baseline_s = end_time - data_range.total_timedelta() + baseline_e = end_time - data_range.comparison_timedelta() + # fetch baseline from api + raw_data = self._fetch_from_benchmark_ts_api( + config_id=config.id, + start_time=baseline_s, + end_time=baseline_e, + source=config.source, + ) + + logger.info( + "found %s # of data, with time range %s", + len(raw_data.time_series), + raw_data.time_range, + ) + if not self.should_use_data(config.id, raw_data.time_range.end, baseline_e): + logger.info( + "[%s][get_basline] Skip generate report, no data found during [%s,%s]", + config.id, + baseline_s.isoformat(), + baseline_e.isoformat(), + ) + return None, baseline_s, baseline_e + return raw_data, baseline_s, baseline_e + + def should_use_data( + self, + config_id: str, + latest_ts_str: str, + end_time: dt.datetime, + min_delta: Optional[dt.timedelta] = None, + ) -> bool: + # set default + if not min_delta: + min_delta = dt.timedelta(days=2) + + if not latest_ts_str: + return False + latest_dt = isoparse(latest_ts_str) + cutoff = end_time - min_delta + + if latest_dt >= cutoff: + return True + logger.info( + "[%s] expect latest data to be after %s, but got %s", + config_id, + cutoff, + latest_dt, + ) + return False + + def _fetch_from_benchmark_ts_api( + self, + config_id: str, + end_time: dt.datetime, + start_time: dt.datetime, + source: BenchmarkApiSource, + ): + str_end_time = end_time.strftime("%Y-%m-%dT%H:%M:%S") + str_start_time = start_time.strftime("%Y-%m-%dT%H:%M:%S") + query = source.render( + ctx={ + "startTime": str_start_time, + "stopTime": str_end_time, + } + ) + url = source.api_query_url + + logger.info("[%s]trying to call %s", config_id, url) + t0 = time.perf_counter() + try: + resp: BenchmarkTimeSeriesApiResponse = ( + BenchmarkTimeSeriesApiResponse.from_request(url, query) + ) + + elapsed_ms = (time.perf_counter() - t0) * 1000.0 + logger.info( + "[%s] call OK in %.1f ms (query_len=%d)", + config_id, + elapsed_ms, + len(query), + ) + return resp.data + except requests.exceptions.HTTPError as e: + elapsed_ms = (time.perf_counter() - t0) * 1000.0 + # Try to extract a useful server message safely + try: + err_msg = ( + e.response.json().get("error") if e.response is not None else str(e) + ) + except Exception: + err_msg = ( + e.response.text + if (e.response is not None and hasattr(e.response, "text")) + else str(e) + ) + logger.error( + "[%s] call 
FAILED in %.1f ms: %s", config_id, elapsed_ms, err_msg + ) + raise + + except Exception as e: + elapsed_ms = (time.perf_counter() - t0) * 1000.0 + logger.error("[%s] call CRASHED in %.1f ms", config_id, elapsed_ms) + raise RuntimeError(f"[{config_id}]Fetch failed: {e}") + + def _should_generate_report( + self, + cc: clickhouse_connect.driver.client.Client, + end_time: dt.datetime, + config_id: str, + f: Frequency, + ) -> bool: + def _get_latest_record_ts( + cc: clickhouse_connect.driver.Client, + config_id: str, + ) -> Optional[dt.datetime]: + table = BENCHMARK_REGRESSION_REPORT_TABLE + res = cc.query( + f""" + SELECT max(last_record_ts) + FROM {table} + WHERE report_id = {{config_id:String}} + """, + parameters={"config_id": config_id}, + ) + if not res.result_rows or res.result_rows[0][0] is None: + return None + latest: dt.datetime = res.result_rows[0][ + 0 + ] # typically tz-aware UTC from clickhouse_connect + # If not tz-aware, force UTC: + if latest.tzinfo is None: + latest = latest.replace(tzinfo=dt.timezone.utc) + return latest + + freq_delta = f.to_timedelta() + latest_record_ts = _get_latest_record_ts(cc, config_id) + + # No report exists yet, generate + if not latest_record_ts: + return True + + end_utc = ( + end_time if end_time.tzinfo else end_time.replace(tzinfo=dt.timezone.utc) + ) + end_utc = end_utc.astimezone(dt.timezone.utc) + cutoff = end_time - freq_delta + return latest_record_ts < cutoff + + +class WorkerPoolHandler: + """ + WorkerPoolHandler runs workers in parallel to generate benchmark regression report + and writes the results to the target destination. + + """ + + def __init__( + self, + benchmark_summary_processor: BenchmarkSummaryProcessor, + max_workers: int = 6, + ): + self.benchmark_summary_processor = benchmark_summary_processor + self.max_workers = max_workers + + def start( + self, + config_ids: list[str], + args: Optional[argparse.Namespace] = None, + ) -> None: + logger.info( + "[WorkerPoolHandler] start to process benchmark " + "summary data with required config: %s", + config_ids, + ) + end_time = dt.datetime.now(dt.timezone.utc).replace( + minute=0, second=0, microsecond=0 + ) + logger.info("current time with hour granularity(utc) %s", end_time) + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + futures = [] + for config_id in config_ids: + future = executor.submit( + self.benchmark_summary_processor.process, + config_id, + end_time, + cc=None, + args=args, + ) + futures.append(future) + results = [] + errors = [] + + # handle results from parallel processing + for future in as_completed(futures): + try: + result = future.result() + # This will raise an exception if one occurred + results.append(result) + except Exception as e: + logger.warning(f"Error processing future: {e}") + errors.append({"error": str(e)}) + + +def main( + args: Optional[argparse.Namespace] = None, + github_access_token: str = "", + is_dry_run: bool = False, +): + """ + Main method to run in both local environment and lambda handler. + 1. generate intervals[start_time,end_time] using latest timestamp from source table and target table + 2. call WorkerPoolHandler to geneterate and write histogram data for each interval in parallel + """ + if not github_access_token: + raise ValueError("Missing environment variable GITHUB_ACCESS_TOKEN") + + # get time intervals. 
+ logger.info("[Main] start work ....") + + # get jobs in queue from clickhouse for list of time intervals, in parallel + handler = WorkerPoolHandler( + BenchmarkSummaryProcessor(is_dry_run=is_dry_run), + ) + handler.start(BENCHMARK_REGRESSION_TRACKING_CONFIG_IDS, args) + logger.info(" [Main] Done. work completed.") + + +def lambda_handler(event: Any, context: Any) -> None: + """ + Main method to run in aws lambda environment + """ + main( + None, + github_access_token=ENVS["GITHUB_ACCESS_TOKEN"], + ) + return + + +def parse_args() -> argparse.Namespace: + """ + Parse command line args, this is mainly used for local test environment. + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--clickhouse-endpoint", + default=ENVS["CLICKHOUSE_ENDPOINT"], + type=str, + help="the clickhouse endpoint, the clickhouse_endpoint " + + "name is https://{clickhouse_endpoint}:{port} for full url ", + ) + parser.add_argument( + "--clickhouse-username", + type=str, + default=ENVS["CLICKHOUSE_USERNAME"], + help="the clickhouse username", + ) + parser.add_argument( + "--clickhouse-password", + type=str, + default=ENVS["CLICKHOUSE_PASSWORD"], + help="the clickhouse password for the user name", + ) + parser.add_argument( + "--github-access-token", + type=str, + default=ENVS["GITHUB_ACCESS_TOKEN"], + help="the github access token to access github api", + ) + parser.add_argument( + "--not-dry-run", + action="store_true", + help="when set, writing results to destination from local " + + "environment. By default, we run in dry-run mode for local " + + "environment", + ) + args, _ = parser.parse_known_args() + return args + + +def local_run() -> None: + """ + method to run in local test environment + """ + + args = parse_args() + + logger.info("args: %s", args) + + # update environment variables for input parameters + + # always run in dry-run mode in local environment, unless it's disabled. + is_dry_run = not args.not_dry_run + + main( + args, + args.github_access_token, + is_dry_run=is_dry_run, + ) + + +if __name__ == "__main__": + local_run() diff --git a/aws/lambda/benchmark_regression_summary_report/requirements.txt b/aws/lambda/benchmark_regression_summary_report/requirements.txt new file mode 100644 index 0000000000..2a715720c7 --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/requirements.txt @@ -0,0 +1,6 @@ +clickhouse_connect==0.8.5 +boto3==1.35.33 +PyGithub==1.59.0 +python-dateutil==2.8.2 +PyYAML==6.0.1 +Jinja2==3.1.2 diff --git a/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql b/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql new file mode 100644 index 0000000000..a1580672ea --- /dev/null +++ b/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql @@ -0,0 +1,31 @@ +CREATE TABLE fortesting.benchmark_regression_report +( + `id` UUID DEFAULT generateUUIDv4(), + `report_id` String, -- unique id for the report config + `created_at` DateTime64(0, 'UTC') DEFAULT now(), + `last_record_ts` DateTime64(0, 'UTC'), + `last_record_commit` String, + `type` String, -- e.g. 'daily','weekly' + `status` String, -- e.g. 
'no_regression',"regression",'failure' + `regression_count` UInt32 DEFAULT 0, + `insufficient_data_count` UInt32 DEFAULT 0, + `suspected_regression_count` UInt32 DEFAULT 0, + `total_count` UInt32 DEFAULT 0, + `repo` String, + `report` String DEFAULT '{}' +) +ENGINE = SharedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') +PARTITION BY toYYYYMM(created_at) +ORDER BY +( + report_id, + type, + status, + last_record_ts, + last_record_commit, + created_at, + repo, + id +) +TTL created_at + toIntervalYear(10) +SETTINGS index_granularity = 8192; From ed9db736615387d3d0a6e05a46ac759f5bac0e61 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 3 Sep 2025 20:50:29 -0700 Subject: [PATCH 2/9] Update [ghstack-poisoned] --- .../common/benchmark_time_series_api_model.py | 24 +- .../common/config_model.py | 2 +- .../common/report_manager.py | 248 ++++++++++++++++++ .../lambda_function.py | 52 +++- .../schema.sql | 2 + 5 files changed, 314 insertions(+), 14 deletions(-) create mode 100644 aws/lambda/benchmark_regression_summary_report/common/report_manager.py diff --git a/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py index fe7705a6ea..f5eba89275 100644 --- a/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py +++ b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py @@ -1,5 +1,6 @@ +import datetime as dt from dataclasses import dataclass, field -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional import requests @@ -62,3 +63,24 @@ def from_request( except Exception as e: raise RuntimeError(f"Malformed API payload: {e}") return cls(data=BenchmarkTimeSeriesApiData(time_series=ts, time_range=tr)) + + +def get_latest_meta_info( + time_series: List[BenchmarkTimeSeriesItem], +) -> Optional[dict[str, Any]]: + if not time_series: + return None + + pts = [p for s in time_series for p in s.data] + latest = max( + pts, + key=lambda p: dt.datetime.fromisoformat( + p["granularity_bucket"].replace("Z", "+00:00") + ), + ) + return { + "commit": latest.get("commit", ""), + "branch": latest.get("branch", ""), + "timestamp": latest.get("granularity_bucket", ""), + "workflow_id": latest.get("workflow_id", ""), + } diff --git a/aws/lambda/benchmark_regression_summary_report/common/config_model.py b/aws/lambda/benchmark_regression_summary_report/common/config_model.py index 7779f17f2d..db8fe09455 100644 --- a/aws/lambda/benchmark_regression_summary_report/common/config_model.py +++ b/aws/lambda/benchmark_regression_summary_report/common/config_model.py @@ -36,7 +36,7 @@ def to_timedelta(self) -> timedelta: raise ValueError(f"Unsupported unit: {self.unit}") def get_text(self): - return f"{self.value} {self.unit}" + return f"{self.value}_{self.unit}" # -------- Source -------- diff --git a/aws/lambda/benchmark_regression_summary_report/common/report_manager.py b/aws/lambda/benchmark_regression_summary_report/common/report_manager.py new file mode 100644 index 0000000000..c087284c4f --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/common/report_manager.py @@ -0,0 +1,248 @@ +import dataclasses +import datetime as dt +import json +import logging +import uuid +from typing import Any, Dict, List + +import clickhouse_connect +from common.config_model import BenchmarkConfig, Frequency +from common.regression_utils import PerGroupResult +from jinja2 import Template + + 
+logger = logging.getLogger()
+
+
+REPORT_MD_TEMPLATE = """# Benchmark Report {{id}}
+config_id: `{{ report_id }}`
+
+> **Status:** {{ status }} · **Frequency:** {{ frequency }}
+
+## Latest
+- **Timestamp:** `{{ latest.timestamp | default('') }}`
+- **Commit:** `{{ (latest.commit | default(''))[:12] }}`
+- **Branch:** `{{ latest.branch | default('') }}`
+- **Workflow ID:** `{{ latest.workflow_id | default('') }}`
+
+## Summary
+| Metric | Value |
+| :-- | --: |
+| Total | {{ summary.total_count | default(0) }} |
+| Regressions | {{ summary.regression_count | default(0) }} |
+| Suspicious | {{ summary.suspicious_count | default(0) }} |
+| No Regression | {{ summary.no_regression_count | default(0) }} |
+| Insufficient Data | {{ summary.insufficient_data_count | default(0) }} |
+"""
+
+
+class ReportManager:
+    """
+    Handles DB insertion and notification processing.
+    """
+
+    def __init__(
+        self,
+        db_table_name: str,
+        config_id: str,
+        config: BenchmarkConfig,
+        regression_summary: Dict[str, Any],
+        latest_meta_info: Dict[str, Any],
+        result: List[PerGroupResult],
+        type: str = "general",
+        repo: str = "pytorch/pytorch",
+    ):
+        self.regression_summary = regression_summary
+        self.regression_result = result
+        self.config_id = config_id
+        self.config = config
+        self.status = self._resolve_status(regression_summary)
+        self.latest_meta_info = self._validate_latest_meta_info(latest_meta_info)
+        self.report_data = self._to_report_data(
+            config_id=config_id,
+            summary=self.regression_summary,
+            report=self.regression_result,
+            latest=self.latest_meta_info,
+            status=self.status,
+            frequency=self.config.policy.frequency,
+        )
+        self.type = type
+        self.repo = repo
+        self.db_table_name = db_table_name
+        self.id = str(uuid.uuid4())
+
+    def run(self, cc: clickhouse_connect.driver.client.Client) -> None:
+        try:
+            self.insert_to_db(cc)
+        except Exception as e:
+            logger.error(f"failed to insert report to db, error: {e}")
+            raise
+
+    def _to_markdown(self):
+        md = Template(REPORT_MD_TEMPLATE, trim_blocks=True, lstrip_blocks=True).render(
+            id=self.id,
+            status=self.status,
+            report_id=self.config_id,
+            summary=self.regression_summary,
+            latest=self.latest_meta_info,
+            frequency=self.config.policy.frequency.get_text(),
+        )
+        return md
+
+    def insert_to_db(
+        self,
+        cc: clickhouse_connect.driver.client.Client,
+    ) -> None:
+        logger.info(
+            "[%s] prepare data for db insertion report (%s)...", self.config_id, self.id
+        )
+
+        table = self.db_table_name
+
+        latest_ts_str = self.latest_meta_info.get("timestamp")
+        if not latest_ts_str:
+            raise ValueError(
+                f"timestamp from latest is required, latest is {self.latest_meta_info}"
+            )
+
+        # ---- convert to UTC and format as a ClickHouse-friendly 'YYYY-MM-DD HH:MM:SS' ----
+        aware = dt.datetime.fromisoformat(latest_ts_str.replace("Z", "+00:00"))
+        utc_naive = aware.astimezone(dt.timezone.utc).replace(tzinfo=None)
+        last_record_ts = utc_naive.strftime(
+            "%Y-%m-%d %H:%M:%S"
+        )  # passed to the {DateTime64(0)} parameter
+
+        report_json = json.dumps(
+            self.report_data, ensure_ascii=False, separators=(",", ":"), default=str
+        )
+
+        params = {
+            "id": str(self.id),  # column is UUID, bound via {id:UUID}
+            "report_id": self.config_id,
+            "type": self.type,
+            "status": self.status,
+            "last_record_commit": self.latest_meta_info.get("commit", ""),
+            "last_record_ts": last_record_ts,  # already UTC, timezone-naive
+            "regression_count": int(self.regression_summary.get("regression_count", 0)),
+            "insufficient_data_count": int(
+                self.regression_summary.get("insufficient_data_count", 0)
+            ),
+            "suspected_regression_count": int(
self.regression_summary.get("suspicious_count", 0) + ), + "total_count": int(self.regression_summary.get("total_count", 0)), + "repo": self.repo, + "report_json": report_json, + } + + logger.info( + "[%s]inserting benchmark regression report(%s)", self.config_id, self.id + ) + + # 纯 INSERT ... SELECT ... FROM system.one + NOT EXISTS 保护 + cc.query( + f""" + INSERT INTO {table} ( + id, + report_id, + last_record_ts, + last_record_commit, + `type`, + status, + regression_count, + insufficient_data_count, + suspected_regression_count, + total_count, + repo, + report + ) + SELECT + {{id:UUID}}, + {{report_id:String}}, + {{last_record_ts:DateTime64(0)}}, + {{last_record_commit:String}}, + {{type:String}}, + {{status:String}}, + {{regression_count:UInt32}}, + {{insufficient_data_count:UInt32}}, + {{suspected_regression_count:UInt32}}, + {{total_count:UInt32}}, + {{repo:String}}, + {{report_json:String}} + FROM system.one + WHERE NOT EXISTS ( + SELECT 1 + FROM {table} + WHERE report_id = {{report_id:String}} + AND `type` = {{type:String}} + AND repo = {{repo:String}} + AND stamp = toDate({{last_record_ts:DateTime64(0)}}) + ); + """, + parameters=params, + ) + + logger.info( + "[%s] Done. inserted benchmark regression report(%s)", + self.config_id, + self.id, + ) + + def _resolve_status(self, regression_summary: Dict[str, Any]) -> str: + status = ( + "regression" + if regression_summary.get("regression_count", 0) > 0 + else "suspicious" + if regression_summary.get("suspicious_count", 0) > 0 + else "no_regression" + ) + return status + + def _validate_latest_meta_info( + self, latest_meta_info: Dict[str, Any] + ) -> Dict[str, Any]: + latest_commit = latest_meta_info.get("commit") + if not latest_commit: + raise ValueError( + f"missing commit from latest is required, latest is {latest_meta_info}" + ) + lastest_ts_str = latest_meta_info.get("timestamp") + if not lastest_ts_str: + raise ValueError( + f"timestamp from latest is required, latest is {latest_meta_info}" + ) + return latest_meta_info + + def _to_report_data( + self, + config_id: str, + summary: Dict[str, Any], + report: List[Any], # List[PerGroupResult] or dicts + latest: dict[str, Any], # {"commit","branch","timestamp","workflow_id"} + status: str, + frequency: Frequency, + ) -> dict[str, Any]: + latest_commit = latest.get("commit") + if not latest_commit: + raise ValueError( + f"missing commit from latest is required, latest is {latest}" + ) + lastest_ts_str = latest.get("timestamp") + if not lastest_ts_str: + raise ValueError(f"timestamp from latest is required, latest is {latest}") + + def to_dict(x): # handle dataclass or dict/object + if dataclasses.is_dataclass(x): + return dataclasses.asdict(x) + if isinstance(x, dict): + return x + return vars(x) if hasattr(x, "__dict__") else {"value": str(x)} + + return { + "status": status, + "report_id": config_id, + "summary": summary, + "latest": latest, + "details": [to_dict(x) for x in report], + "frequency": frequency.get_text(), + } diff --git a/aws/lambda/benchmark_regression_summary_report/lambda_function.py b/aws/lambda/benchmark_regression_summary_report/lambda_function.py index fb7f57bd9e..8f3c42aab1 100644 --- a/aws/lambda/benchmark_regression_summary_report/lambda_function.py +++ b/aws/lambda/benchmark_regression_summary_report/lambda_function.py @@ -11,13 +11,19 @@ import clickhouse_connect import requests -from common.benchmark_time_series_api_model import BenchmarkTimeSeriesApiResponse +from common.benchmark_time_series_api_model import ( + BenchmarkTimeSeriesApiResponse, + 
get_latest_meta_info, +) from common.config import get_benchmark_regression_config from common.config_model import BenchmarkApiSource, BenchmarkConfig, Frequency from common.regression_utils import BenchmarkRegressionReportGenerator +from common.report_manager import ReportManager from dateutil.parser import isoparse +BENCHMARK_REGRESSION_REPORT_TABLE = "fortesting.benchmark_regression_report" + logging.basicConfig( level=logging.INFO, ) @@ -32,7 +38,6 @@ } # TODO(elainewy): change this to benchmark.benchmark_regression_report once the table is created -BENCHMARK_REGRESSION_REPORT_TABLE = "fortesting.benchmark_regression_report" BENCHMARK_REGRESSION_TRACKING_CONFIG_IDS = ["compiler_regression"] @@ -44,7 +49,9 @@ def get_clickhouse_client( host: str, user: str, password: str ) -> clickhouse_connect.driver.client.Client: # for local testing only, disable SSL verification - return clickhouse_connect.get_client( host=host, user=user, password=password, secure=True, verify=False) + return clickhouse_connect.get_client( + host=host, user=user, password=password, secure=True, verify=False + ) return clickhouse_connect.get_client( host=host, user=user, password=password, secure=True @@ -62,11 +69,6 @@ def get_clickhouse_client_environment() -> clickhouse_connect.driver.client.Clie ) -BENCHMARK_REGRESSION_SUMMARY_REPORT_TABLE = ( - "fortesting.benchmark_regression_summary_report" -) - - class BenchmarkSummaryProcessor: def __init__( self, @@ -132,6 +134,12 @@ def log_error(msg: str): ) return + latest_meta_info = get_latest_meta_info(latest.time_series) + if not latest_meta_info: + log_error("no meta info found for latest data") + return + log_info(f"latest data info: {latest_meta_info}") + baseline, bs, be = self.get_baseline(config, end_time) if not baseline: log_info( @@ -142,11 +150,30 @@ def log_error(msg: str): generator = BenchmarkRegressionReportGenerator( config=config, latest_ts=latest, baseline_ts=baseline ) - result, regression_summary = generator.generate() if self.is_dry_run: print("regression_detected: ", regression_summary) print(json.dumps(result, indent=2, default=str)) + return + latest_commit = latest_meta_info.get("commit") + if not latest_commit: + raise ValueError( + f"missing commit from latest is required, latest is {latest}" + ) + lastest_ts_str = latest_meta_info.get("timestamp") + if not lastest_ts_str: + raise ValueError(f"timestamp from latest is required, latest is {latest}") + + reportManager = ReportManager( + config_id=config_id, + config=config, + regression_summary=regression_summary, + latest_meta_info=latest_meta_info, + result=result, + db_table_name=BENCHMARK_REGRESSION_REPORT_TABLE, + ) + reportManager.run(cc) + return def get_latest(self, config: BenchmarkConfig, end_time: dt.datetime): @@ -241,7 +268,7 @@ def _fetch_from_benchmark_ts_api( ) url = source.api_query_url - logger.info("[%s]trying to call %s", config_id, url) + logger.info("[%s] trying to call %s", config_id, url) t0 = time.perf_counter() try: resp: BenchmarkTimeSeriesApiResponse = ( @@ -315,7 +342,6 @@ def _get_latest_record_ts( # No report exists yet, generate if not latest_record_ts: return True - end_utc = ( end_time if end_time.tzinfo else end_time.replace(tzinfo=dt.timezone.utc) ) @@ -352,7 +378,9 @@ def start( end_time = dt.datetime.now(dt.timezone.utc).replace( minute=0, second=0, microsecond=0 ) - logger.info("current time with hour granularity(utc) %s", end_time) + logger.info( + "[WorkerPoolHandler] current time with hour granularity(utc) %s", end_time + ) with 
ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = [] for config_id in config_ids: diff --git a/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql b/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql index a1580672ea..1057b87ac3 100644 --- a/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql +++ b/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql @@ -4,6 +4,7 @@ CREATE TABLE fortesting.benchmark_regression_report `report_id` String, -- unique id for the report config `created_at` DateTime64(0, 'UTC') DEFAULT now(), `last_record_ts` DateTime64(0, 'UTC'), + `stamp` Date DEFAULT toDate(last_record_ts), `last_record_commit` String, `type` String, -- e.g. 'daily','weekly' `status` String, -- e.g. 'no_regression',"regression",'failure' @@ -20,6 +21,7 @@ ORDER BY ( report_id, type, + stamp, status, last_record_ts, last_record_commit, From a14217a62254aadda1539dc054403bd260fefae7 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 3 Sep 2025 21:10:23 -0700 Subject: [PATCH 3/9] Update (base update) [ghstack-poisoned] --- .../benchmark_regression_summary_report/lambda_function.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aws/lambda/benchmark_regression_summary_report/lambda_function.py b/aws/lambda/benchmark_regression_summary_report/lambda_function.py index fb7f57bd9e..ecd38082f7 100644 --- a/aws/lambda/benchmark_regression_summary_report/lambda_function.py +++ b/aws/lambda/benchmark_regression_summary_report/lambda_function.py @@ -7,14 +7,15 @@ import threading import time from concurrent.futures import as_completed, ThreadPoolExecutor +import time from typing import Any, Optional +from common.regression_utils import BenchmarkRegressionReportGenerator import clickhouse_connect import requests from common.benchmark_time_series_api_model import BenchmarkTimeSeriesApiResponse from common.config import get_benchmark_regression_config from common.config_model import BenchmarkApiSource, BenchmarkConfig, Frequency -from common.regression_utils import BenchmarkRegressionReportGenerator from dateutil.parser import isoparse @@ -142,12 +143,11 @@ def log_error(msg: str): generator = BenchmarkRegressionReportGenerator( config=config, latest_ts=latest, baseline_ts=baseline ) - result, regression_summary = generator.generate() if self.is_dry_run: print("regression_detected: ", regression_summary) print(json.dumps(result, indent=2, default=str)) - return + return def get_latest(self, config: BenchmarkConfig, end_time: dt.datetime): data_range = config.policy.range From 998edee46d57cdfdd24ec70675ac0894ba8d599b Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 3 Sep 2025 21:22:18 -0700 Subject: [PATCH 4/9] Update (base update) [ghstack-poisoned] --- .../lambda_function.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aws/lambda/benchmark_regression_summary_report/lambda_function.py b/aws/lambda/benchmark_regression_summary_report/lambda_function.py index ecd38082f7..2acdf4db65 100644 --- a/aws/lambda/benchmark_regression_summary_report/lambda_function.py +++ b/aws/lambda/benchmark_regression_summary_report/lambda_function.py @@ -7,15 +7,14 @@ import threading import time from concurrent.futures import as_completed, ThreadPoolExecutor -import time from typing import Any, Optional -from common.regression_utils import BenchmarkRegressionReportGenerator import clickhouse_connect import requests from common.benchmark_time_series_api_model import 
BenchmarkTimeSeriesApiResponse from common.config import get_benchmark_regression_config from common.config_model import BenchmarkApiSource, BenchmarkConfig, Frequency +from common.regression_utils import BenchmarkRegressionReportGenerator from dateutil.parser import isoparse @@ -139,15 +138,16 @@ def log_error(msg: str): f"no baseline data found for time range [{bs},{be}] with frequency {report_freq.get_text()}..." ) return - generator = BenchmarkRegressionReportGenerator( config=config, latest_ts=latest, baseline_ts=baseline ) + result, regression_summary = generator.generate() if self.is_dry_run: print("regression_detected: ", regression_summary) print(json.dumps(result, indent=2, default=str)) - return + return + def get_latest(self, config: BenchmarkConfig, end_time: dt.datetime): data_range = config.policy.range From 4e5a709e56f695315868c1069bbe6ef1e7986d66 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 3 Sep 2025 21:23:52 -0700 Subject: [PATCH 5/9] Update (base update) [ghstack-poisoned] --- .../benchmark_regression_summary_report/lambda_function.py | 1 - 1 file changed, 1 deletion(-) diff --git a/aws/lambda/benchmark_regression_summary_report/lambda_function.py b/aws/lambda/benchmark_regression_summary_report/lambda_function.py index 2acdf4db65..6f6345e097 100644 --- a/aws/lambda/benchmark_regression_summary_report/lambda_function.py +++ b/aws/lambda/benchmark_regression_summary_report/lambda_function.py @@ -1,7 +1,6 @@ #!/usr/bin/env python import argparse import datetime as dt -import json import logging import os import threading From e51eb0caee93f888889f89d0f8536b6474daef3a Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 3 Sep 2025 21:27:30 -0700 Subject: [PATCH 6/9] Update (base update) [ghstack-poisoned] --- .../lambda_function.py | 6 ++++-- .../schema.sql | 13 +++++++------ 2 files changed, 11 insertions(+), 8 deletions(-) rename clickhouse_db_schema/{benchmark_regression_summary_report => benchmark_regression_report}/schema.sql (76%) diff --git a/aws/lambda/benchmark_regression_summary_report/lambda_function.py b/aws/lambda/benchmark_regression_summary_report/lambda_function.py index 6f6345e097..bfc5ae4fa0 100644 --- a/aws/lambda/benchmark_regression_summary_report/lambda_function.py +++ b/aws/lambda/benchmark_regression_summary_report/lambda_function.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import argparse import datetime as dt +import json import logging import os import threading @@ -43,7 +44,9 @@ def get_clickhouse_client( host: str, user: str, password: str ) -> clickhouse_connect.driver.client.Client: # for local testing only, disable SSL verification - return clickhouse_connect.get_client( host=host, user=user, password=password, secure=True, verify=False) + return clickhouse_connect.get_client( + host=host, user=user, password=password, secure=True, verify=False + ) return clickhouse_connect.get_client( host=host, user=user, password=password, secure=True @@ -147,7 +150,6 @@ def log_error(msg: str): print(json.dumps(result, indent=2, default=str)) return - def get_latest(self, config: BenchmarkConfig, end_time: dt.datetime): data_range = config.policy.range latest_s = end_time - data_range.comparison_timedelta() diff --git a/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql b/clickhouse_db_schema/benchmark_regression_report/schema.sql similarity index 76% rename from clickhouse_db_schema/benchmark_regression_summary_report/schema.sql rename to clickhouse_db_schema/benchmark_regression_report/schema.sql index a1580672ea..ad25c5708f 100644 
--- a/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql +++ b/clickhouse_db_schema/benchmark_regression_report/schema.sql @@ -1,12 +1,13 @@ CREATE TABLE fortesting.benchmark_regression_report ( `id` UUID DEFAULT generateUUIDv4(), - `report_id` String, -- unique id for the report config + `report_id` String, `created_at` DateTime64(0, 'UTC') DEFAULT now(), `last_record_ts` DateTime64(0, 'UTC'), + `stamp` Date DEFAULT toDate(last_record_ts), `last_record_commit` String, - `type` String, -- e.g. 'daily','weekly' - `status` String, -- e.g. 'no_regression',"regression",'failure' + `type` String, + `status` String, `regression_count` UInt32 DEFAULT 0, `insufficient_data_count` UInt32 DEFAULT 0, `suspected_regression_count` UInt32 DEFAULT 0, @@ -16,10 +17,10 @@ CREATE TABLE fortesting.benchmark_regression_report ) ENGINE = SharedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') PARTITION BY toYYYYMM(created_at) -ORDER BY -( +ORDER BY ( report_id, type, + stamp, status, last_record_ts, last_record_commit, @@ -28,4 +29,4 @@ ORDER BY id ) TTL created_at + toIntervalYear(10) -SETTINGS index_granularity = 8192; +SETTINGS index_granularity = 8192 From 308616f30d4443f33c52cdf6576e0f91895d3b8d Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 4 Sep 2025 14:50:41 -0700 Subject: [PATCH 7/9] Update (base update) [ghstack-poisoned] --- .../common/benchmark_time_series_api_model.py | 23 +- .../common/config_model.py | 82 ++++- .../lambda_function.py | 348 +++++++++--------- 3 files changed, 264 insertions(+), 189 deletions(-) diff --git a/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py index fe7705a6ea..d8e54c7a12 100644 --- a/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py +++ b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py @@ -1,5 +1,5 @@ from dataclasses import dataclass, field -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional import requests @@ -62,3 +62,24 @@ def from_request( except Exception as e: raise RuntimeError(f"Malformed API payload: {e}") return cls(data=BenchmarkTimeSeriesApiData(time_series=ts, time_range=tr)) + + +def get_latest_meta_info( + time_series: List[BenchmarkTimeSeriesItem], +) -> Optional[dict[str, Any]]: + if not time_series: + return None + + pts = [p for s in time_series for p in s.data] + latest = max( + pts, + key=lambda p: dt.datetime.fromisoformat( + p["granularity_bucket"].replace("Z", "+00:00") + ), + ) + return { + "commit": latest.get("commit", ""), + "branch": latest.get("branch", ""), + "timestamp": latest.get("granularity_bucket", ""), + "workflow_id": latest.get("workflow_id", ""), + } diff --git a/aws/lambda/benchmark_regression_summary_report/common/config_model.py b/aws/lambda/benchmark_regression_summary_report/common/config_model.py index 7779f17f2d..c262b35939 100644 --- a/aws/lambda/benchmark_regression_summary_report/common/config_model.py +++ b/aws/lambda/benchmark_regression_summary_report/common/config_model.py @@ -3,8 +3,9 @@ import json from dataclasses import dataclass, field from datetime import timedelta -from typing import Any, Dict, Literal, Optional +from typing import Any, ClassVar, Dict, Literal, Optional +import requests from jinja2 import Environment, meta, Template @@ -35,8 +36,11 @@ def to_timedelta(self) -> timedelta: else: raise ValueError(f"Unsupported unit: 
{self.unit}") + def to_timedelta_s(self) -> int: + return int(self.to_timedelta().total_seconds()) + def get_text(self): - return f"{self.value} {self.unit}" + return f"{self.value}_{self.unit}" # -------- Source -------- @@ -96,12 +100,23 @@ class RangeConfig: def total_timedelta(self) -> timedelta: return timedelta(days=self.baseline.value + self.comparison.value) + def total_timedelta_s(self) -> int: + return int( + timedelta(days=self.baseline.value + self.comparison.value).total_seconds() + ) + def comparison_timedelta(self) -> timedelta: return timedelta(days=self.comparison.value) + def comparison_timedelta_s(self) -> int: + return int(self.comparison_timedelta().total_seconds()) + def baseline_timedelta(self) -> timedelta: return timedelta(days=self.baseline.value) + def baseline_timedelta_s(self) -> int: + return int(self.baseline_timedelta().total_seconds()) + # -------- Policy: metrics -------- @dataclass @@ -121,9 +136,7 @@ class RegressionPolicy: "greater_than", "less_than", "equal_to", "greater_equal", "less_equal" ] threshold: float - baseline_aggregation: Literal[ - "avg", "max", "min", "p50", "p90", "p95", "latest", "earliest" - ] = "max" + baseline_aggregation: Literal["max", "min", "latest", "earliest"] = "max" rel_tol: float = 1e-3 # used only for "equal_to" def is_violation(self, value: float, baseline: float) -> bool: @@ -154,13 +167,60 @@ def is_violation(self, value: float, baseline: float) -> bool: @dataclass -class Policy: - frequency: Frequency - range: RangeConfig - metrics: Dict[str, RegressionPolicy] +class BaseNotificationConfig: + # subclasses override this + type_tag: ClassVar[str] = "" + + @classmethod + def matches(cls, d: Dict[str, Any]) -> bool: + return d.get("type") == cls.type_tag - # TODO(elainewy): add notification config - notification_config: Optional[Dict[str, Any]] = None + +@dataclass +class GitHubNotificationConfig(BaseNotificationConfig): + type_tag: ClassVar[str] = "github" + + # actual fields + type: str = "github" + repo: str = "" # e.g. 
"owner/repo" + issue_number: str = "" # store as str for simplicity + + @classmethod + def from_dict(cls, d: Dict[str, Any]) -> "GitHubNotificationConfig": + # support 'issue' alias + issue = d.get("issue_number") or d.get("issue") or "" + return cls( + type="github", + repo=d.get("repo", ""), + issue_number=str(issue), + ) + + def create_github_comment(self, body: str, github_token: str) -> Dict[str, Any]: + url = f"https://api.github.com/repos/{self.repo}/issues/{self.issue_number}/comments" + headers = { + "Authorization": f"token {github_token}", + "Accept": "application/vnd.github+json", + "User-Agent": "bench-reporter/1.0", + } + resp = requests.post(url, headers=headers, json={"body": body}) + resp.raise_for_status() + return resp.json() + + +@dataclass +class Policy: + frequency: "Frequency" + range: "RangeConfig" + metrics: Dict[str, "RegressionPolicy"] + + notification_config: Optional[dict[str, Any]] = None + + def get_github_notification_config(self) -> Optional[GitHubNotificationConfig]: + if not self.notification_config: + return None + if self.notification_config.get("type") != "github": + return None + return GitHubNotificationConfig.from_dict(self.notification_config) # -------- Top-level benchmark regression config -------- diff --git a/aws/lambda/benchmark_regression_summary_report/lambda_function.py b/aws/lambda/benchmark_regression_summary_report/lambda_function.py index bfc5ae4fa0..e54cef3dae 100644 --- a/aws/lambda/benchmark_regression_summary_report/lambda_function.py +++ b/aws/lambda/benchmark_regression_summary_report/lambda_function.py @@ -6,18 +6,22 @@ import os import threading import time -from concurrent.futures import as_completed, ThreadPoolExecutor from typing import Any, Optional import clickhouse_connect import requests -from common.benchmark_time_series_api_model import BenchmarkTimeSeriesApiResponse +from common.benchmark_time_series_api_model import ( + BenchmarkTimeSeriesApiResponse, + get_latest_meta_info, +) from common.config import get_benchmark_regression_config from common.config_model import BenchmarkApiSource, BenchmarkConfig, Frequency from common.regression_utils import BenchmarkRegressionReportGenerator from dateutil.parser import isoparse +BENCHMARK_REGRESSION_REPORT_TABLE = "fortesting.benchmark_regression_report" + logging.basicConfig( level=logging.INFO, ) @@ -25,17 +29,22 @@ logger.setLevel("INFO") ENVS = { - "GITHUB_ACCESS_TOKEN": os.getenv("GITHUB_ACCESS_TOKEN", ""), + "GITHUB_TOKEN": os.getenv("GITHUB_TOKEN", ""), "CLICKHOUSE_ENDPOINT": os.getenv("CLICKHOUSE_ENDPOINT", ""), "CLICKHOUSE_PASSWORD": os.getenv("CLICKHOUSE_PASSWORD", ""), "CLICKHOUSE_USERNAME": os.getenv("CLICKHOUSE_USERNAME", ""), } # TODO(elainewy): change this to benchmark.benchmark_regression_report once the table is created -BENCHMARK_REGRESSION_REPORT_TABLE = "fortesting.benchmark_regression_report" BENCHMARK_REGRESSION_TRACKING_CONFIG_IDS = ["compiler_regression"] +def format_ts_with_t(ts: int) -> str: + return dt.datetime.fromtimestamp(ts, tz=dt.timezone.utc).strftime( + "%Y-%m-%dT%H:%M:%S" + ) + + def truncate_to_hour(ts: dt.datetime) -> dt.datetime: return ts.replace(minute=0, second=0, microsecond=0) @@ -64,33 +73,31 @@ def get_clickhouse_client_environment() -> clickhouse_connect.driver.client.Clie ) -BENCHMARK_REGRESSION_SUMMARY_REPORT_TABLE = ( - "fortesting.benchmark_regression_summary_report" -) - - class BenchmarkSummaryProcessor: def __init__( self, + config_id: str, + end_time: int, is_dry_run: bool = False, ) -> None: self.is_dry_run = is_dry_run + 
self.config_id = config_id + self.end_time = end_time + + def log_info(self, msg: str): + logger.info("[%s][%s] %s", self.end_time, self.config_id, msg) + + def log_error(self, msg: str): + logger.error("[%s][%s] %s", self.end_time, self.config_id, msg) def process( self, - config_id: str, - end_time: dt.datetime, cc: Optional[clickhouse_connect.driver.client.Client] = None, args: Optional[argparse.Namespace] = None, ): - def log_info(msg: str): - logger.info("[%s] %s", config_id, msg) - - def log_error(msg: str): - logger.error("[%s] %s", config_id, msg) - # ensure each thread has its own clickhouse client. clickhouse client # is not thread-safe. + self.log_info("start process, getting clickhouse client") if cc is None: tlocal = threading.local() if not hasattr(tlocal, "cc") or tlocal.cc is None: @@ -103,40 +110,53 @@ def log_error(msg: str): else: tlocal.cc = get_clickhouse_client_environment() cc = tlocal.cc + self.log_info("done. got clickhouse client") try: - config = get_benchmark_regression_config(config_id) - log_info(f"found config for config_id {config_id}") + config = get_benchmark_regression_config(self.config_id) + self.log_info(f"found config with config_id: `{self.config_id}`") except ValueError as e: - log_error(f"Skip process, Invalid config: {e}") + self.log_error(f"Skip process, Invalid config: {e}") return except Exception as e: - log_error(f"Unexpected error from get_benchmark_regression_config: {e}") + self.log_error( + f"Unexpected error from get_benchmark_regression_config: {e}" + ) return # check if the current time is > policy's time_delta + previous record_ts from summary_table report_freq = config.policy.frequency + should_generate = self._should_generate_report( - cc, end_time, config_id, report_freq + cc, self.end_time, self.config_id, report_freq ) if not should_generate: - log_info( - f"Skip generate report for time:{end_time} with frequency {report_freq.get_text()}, no data found", + self.log_info( + "Skip generate report", ) return else: - log_info( - f"Plan to generate report for time:{end_time} with frequency {report_freq.get_text()}..." + self.log_info( + f"Plan to generate report for time: {format_ts_with_t(self.end_time)} " + f"with frequency {report_freq.get_text()}..." ) - latest, ls, le = self.get_latest(config, end_time) + + self.log_info("get latest data") + latest, ls, le = self.get_latest(config, self.end_time) if not latest: - log_info( + self.log_info( f"no latest data found for time range [{ls},{le}] with frequency {report_freq.get_text()}..." ) return - baseline, bs, be = self.get_baseline(config, end_time) + latest_meta_info = get_latest_meta_info(latest.time_series) + if not latest_meta_info: + self.log_error("no meta info found for latest data") + return + self.log_info(f"latest data info: {latest_meta_info}") + + baseline, bs, be = self.get_baseline(config, self.end_time) if not baseline: - log_info( + self.log_info( f"no baseline data found for time range [{bs},{be}] with frequency {report_freq.get_text()}..." 
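                # bs/be delimit the baseline window:
                # [end_time - (baseline + comparison) days, end_time - comparison days]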
) return @@ -150,32 +170,37 @@ def log_error(msg: str): print(json.dumps(result, indent=2, default=str)) return - def get_latest(self, config: BenchmarkConfig, end_time: dt.datetime): + def get_latest(self, config: BenchmarkConfig, end_time: int): data_range = config.policy.range - latest_s = end_time - data_range.comparison_timedelta() + latest_s = end_time - data_range.comparison_timedelta_s() latest_e = end_time + self.log_info( + f"get baseline data for time range [{format_ts_with_t(latest_s)},{format_ts_with_t(latest_e)}]" + ) latest_data = self._fetch_from_benchmark_ts_api( config_id=config.id, start_time=latest_s, end_time=latest_e, source=config.source, ) - logger.info( - "[%s] found %s # of data, with time range %s", - config.id, - len(latest_data.time_series), - latest_data.time_range, + self.log_info( + f"found {len(latest_data.time_series)} # of data, with time range {latest_data.time_range}", ) if not latest_data.time_range or not latest_data.time_range.end: return None, latest_s, latest_e - if not self.should_use_data(config.id, latest_data.time_range.end, end_time): + + latest_ts = int(isoparse(latest_data.time_range.end).timestamp()) + if not self.should_use_data(config.id, latest_ts, end_time): return None, latest_s, latest_e return latest_data, latest_s, latest_e - def get_baseline(self, config: BenchmarkConfig, end_time: dt.datetime): + def get_baseline(self, config: BenchmarkConfig, end_time: int): data_range = config.policy.range - baseline_s = end_time - data_range.total_timedelta() - baseline_e = end_time - data_range.comparison_timedelta() + baseline_s = end_time - data_range.total_timedelta_s() + baseline_e = end_time - data_range.comparison_timedelta_s() + self.log_info( + f"get baseline data for time range [{format_ts_with_t(baseline_s)},{format_ts_with_t(baseline_e)}]" + ) # fetch baseline from api raw_data = self._fetch_from_benchmark_ts_api( config_id=config.id, @@ -184,17 +209,19 @@ def get_baseline(self, config: BenchmarkConfig, end_time: dt.datetime): source=config.source, ) - logger.info( - "found %s # of data, with time range %s", - len(raw_data.time_series), - raw_data.time_range, + self.log_info( + f"get baseline data for time range [{format_ts_with_t(baseline_s)},{format_ts_with_t(baseline_e)}]" ) - if not self.should_use_data(config.id, raw_data.time_range.end, baseline_e): - logger.info( - "[%s][get_basline] Skip generate report, no data found during [%s,%s]", - config.id, - baseline_s.isoformat(), - baseline_e.isoformat(), + + self.log_info( + f"found {len(raw_data.time_series)} # of data, with time range {raw_data.time_range}", + ) + + baseline_latest_ts = int(isoparse(raw_data.time_range.end).timestamp()) + + if not self.should_use_data(config.id, baseline_latest_ts, baseline_e): + self.log_info( + f"[get_basline] Skip generate report, no data found during [{format_ts_with_t(baseline_s)},{format_ts_with_t(baseline_e)}]" ) return None, baseline_s, baseline_e return raw_data, baseline_s, baseline_e @@ -202,38 +229,33 @@ def get_baseline(self, config: BenchmarkConfig, end_time: dt.datetime): def should_use_data( self, config_id: str, - latest_ts_str: str, - end_time: dt.datetime, + latest_ts: int, + end_time: int, min_delta: Optional[dt.timedelta] = None, ) -> bool: # set default if not min_delta: min_delta = dt.timedelta(days=2) - if not latest_ts_str: + if not latest_ts: return False - latest_dt = isoparse(latest_ts_str) - cutoff = end_time - min_delta - if latest_dt >= cutoff: + cutoff = end_time - min_delta.total_seconds() + + if latest_ts >= 
cutoff: return True - logger.info( - "[%s] expect latest data to be after %s, but got %s", - config_id, - cutoff, - latest_dt, - ) + self.log_info(f"expect latest data to be after {cutoff}, but got {latest_ts}") return False def _fetch_from_benchmark_ts_api( self, config_id: str, - end_time: dt.datetime, - start_time: dt.datetime, + end_time: int, + start_time: int, source: BenchmarkApiSource, ): - str_end_time = end_time.strftime("%Y-%m-%dT%H:%M:%S") - str_start_time = start_time.strftime("%Y-%m-%dT%H:%M:%S") + str_end_time = format_ts_with_t(end_time) + str_start_time = format_ts_with_t(start_time) query = source.render( ctx={ "startTime": str_start_time, @@ -242,7 +264,7 @@ def _fetch_from_benchmark_ts_api( ) url = source.api_query_url - logger.info("[%s]trying to call %s", config_id, url) + self.log_info(f"trying to call {url}") t0 = time.perf_counter() try: resp: BenchmarkTimeSeriesApiResponse = ( @@ -270,136 +292,99 @@ def _fetch_from_benchmark_ts_api( if (e.response is not None and hasattr(e.response, "text")) else str(e) ) - logger.error( - "[%s] call FAILED in %.1f ms: %s", config_id, elapsed_ms, err_msg + self.log_error( + f"[{config_id}] call FAILED in {elapsed_ms} ms: {err_msg}", ) raise except Exception as e: elapsed_ms = (time.perf_counter() - t0) * 1000.0 - logger.error("[%s] call CRASHED in %.1f ms", config_id, elapsed_ms) + self.log_error(f"call CRASHED in {elapsed_ms} ms: {e}") raise RuntimeError(f"[{config_id}]Fetch failed: {e}") def _should_generate_report( self, cc: clickhouse_connect.driver.client.Client, - end_time: dt.datetime, + end_time: int, config_id: str, f: Frequency, ) -> bool: def _get_latest_record_ts( cc: clickhouse_connect.driver.Client, config_id: str, - ) -> Optional[dt.datetime]: + ) -> Optional[int]: table = BENCHMARK_REGRESSION_REPORT_TABLE res = cc.query( f""" - SELECT max(last_record_ts) + SELECT toUnixTimestamp(max(last_record_ts)) FROM {table} WHERE report_id = {{config_id:String}} """, parameters={"config_id": config_id}, ) + if not res.result_rows or res.result_rows[0][0] is None: return None - latest: dt.datetime = res.result_rows[0][ - 0 - ] # typically tz-aware UTC from clickhouse_connect - # If not tz-aware, force UTC: - if latest.tzinfo is None: - latest = latest.replace(tzinfo=dt.timezone.utc) - return latest - - freq_delta = f.to_timedelta() - latest_record_ts = _get_latest_record_ts(cc, config_id) + return int(res.result_rows[0][0]) + freq_delta = f.to_timedelta_s() + latest_record_ts = _get_latest_record_ts(cc, config_id) # No report exists yet, generate if not latest_record_ts: + self.log_info( + f"no latest record ts from db for the config_id, got {latest_record_ts}" + ) return True + self.log_info(f"found latest record ts from db {latest_record_ts}") + time_boundary = latest_record_ts + freq_delta + should_generate = end_time > time_boundary - end_utc = ( - end_time if end_time.tzinfo else end_time.replace(tzinfo=dt.timezone.utc) - ) - end_utc = end_utc.astimezone(dt.timezone.utc) - cutoff = end_time - freq_delta - return latest_record_ts < cutoff - - -class WorkerPoolHandler: - """ - WorkerPoolHandler runs workers in parallel to generate benchmark regression report - and writes the results to the target destination. 
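The decision made in _should_generate_report above reduces to plain Unix-timestamp arithmetic; a minimal standalone sketch of the rule follows (hypothetical helper name and made-up timestamps; a one-day frequency is assumed):

    # Sketch of the frequency gate: generate a new report only once end_time
    # has moved past latest_record_ts + frequency (all values in Unix seconds, UTC).
    from typing import Optional

    DAY_S = 24 * 60 * 60

    def gate(end_time: int, latest_record_ts: Optional[int], freq_s: int = DAY_S) -> bool:
        if latest_record_ts is None:
            return True  # no previous report for this config, always generate
        return end_time > latest_record_ts + freq_s

    now = 1725494400  # 2024-09-05T00:00:00Z
    assert gate(now, None)                 # first report
    assert not gate(now, now - DAY_S)      # boundary not yet crossed
    assert gate(now, now - DAY_S - 3600)   # one day and one hour old: generate

This mirrors should_generate = end_time > time_boundary in the handler, with time_boundary = latest_record_ts + freq_delta.
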
- - """ - - def __init__( - self, - benchmark_summary_processor: BenchmarkSummaryProcessor, - max_workers: int = 6, - ): - self.benchmark_summary_processor = benchmark_summary_processor - self.max_workers = max_workers - - def start( - self, - config_ids: list[str], - args: Optional[argparse.Namespace] = None, - ) -> None: - logger.info( - "[WorkerPoolHandler] start to process benchmark " - "summary data with required config: %s", - config_ids, - ) - end_time = dt.datetime.now(dt.timezone.utc).replace( - minute=0, second=0, microsecond=0 - ) - logger.info("current time with hour granularity(utc) %s", end_time) - with ThreadPoolExecutor(max_workers=self.max_workers) as executor: - futures = [] - for config_id in config_ids: - future = executor.submit( - self.benchmark_summary_processor.process, - config_id, - end_time, - cc=None, - args=args, - ) - futures.append(future) - results = [] - errors = [] - - # handle results from parallel processing - for future in as_completed(futures): - try: - result = future.result() - # This will raise an exception if one occurred - results.append(result) - except Exception as e: - logger.warning(f"Error processing future: {e}") - errors.append({"error": str(e)}) + if not should_generate: + self.log_info( + f"[{f.get_text()}] skip generate report. end_time({format_ts_with_t(end_time)}) must greater than " + "time_boundary(format_ts_with_t(time_boundary)) based on latest_record_ts(format_ts_with_t(latest_record_ts))", + ) + else: + self.log_info( + f"[{f.get_text()}]plan to generate report. . end_time({format_ts_with_t(end_time)}) is greater than " + "time_boundary(format_ts_with_t(time_boundary)) based on latest_record_ts(format_ts_with_t(latest_record_ts))", + ) + return should_generate def main( - args: Optional[argparse.Namespace] = None, + config_id: str, github_access_token: str = "", + args: Optional[argparse.Namespace] = None, + *, is_dry_run: bool = False, ): - """ - Main method to run in both local environment and lambda handler. - 1. generate intervals[start_time,end_time] using latest timestamp from source table and target table - 2. call WorkerPoolHandler to geneterate and write histogram data for each interval in parallel - """ if not github_access_token: - raise ValueError("Missing environment variable GITHUB_ACCESS_TOKEN") + raise ValueError("Missing environment variable GITHUB_TOKEN") - # get time intervals. - logger.info("[Main] start work ....") + if not config_id: + raise ValueError("Missing required parameter: config_id") - # get jobs in queue from clickhouse for list of time intervals, in parallel - handler = WorkerPoolHandler( - BenchmarkSummaryProcessor(is_dry_run=is_dry_run), + end_time = dt.datetime.now(dt.timezone.utc).replace( + minute=0, second=0, microsecond=0 ) - handler.start(BENCHMARK_REGRESSION_TRACKING_CONFIG_IDS, args) + end_time_ts = int(end_time.timestamp()) + logger.info( + "[Main] current time with hour granularity(utc) %s with unix timestamp %s", + end_time, + end_time_ts, + ) + logger.info("[Main] start work ....") + + # caution, raise exception may lead lambda to retry + try: + processor = BenchmarkSummaryProcessor( + config_id=config_id, end_time=end_time_ts, is_dry_run=is_dry_run + ) + processor.process(args=args) + except Exception as e: + logger.error(f"[Main] failed to process config_id {config_id}, error: {e}") + raise logger.info(" [Main] Done. 
work completed.") @@ -407,9 +392,13 @@ def lambda_handler(event: Any, context: Any) -> None: """ Main method to run in aws lambda environment """ + config_id = event.get("config_id") + if not config_id: + raise ValueError("Missing required parameter: config_id") + main( - None, - github_access_token=ENVS["GITHUB_ACCESS_TOKEN"], + config_id=config_id, + github_access_token=ENVS["GITHUB_TOKEN"], ) return @@ -419,6 +408,23 @@ def parse_args() -> argparse.Namespace: Parse command line args, this is mainly used for local test environment. """ parser = argparse.ArgumentParser() + parser.add_argument( + "--dry-run", + dest="dry_run", + action="store_true", + help="Enable dry-run mode", + ) + parser.add_argument( + "--no-dry-run", + dest="dry_run", + action="store_false", + help="Disable dry-run mode", + ) + parser.add_argument( + "--config-id", + type=str, + help="the config id to run", + ) parser.add_argument( "--clickhouse-endpoint", default=ENVS["CLICKHOUSE_ENDPOINT"], @@ -441,16 +447,10 @@ def parse_args() -> argparse.Namespace: parser.add_argument( "--github-access-token", type=str, - default=ENVS["GITHUB_ACCESS_TOKEN"], + default=ENVS["GITHUB_TOKEN"], help="the github access token to access github api", ) - parser.add_argument( - "--not-dry-run", - action="store_true", - help="when set, writing results to destination from local " - + "environment. By default, we run in dry-run mode for local " - + "environment", - ) + parser.set_defaults(dry_run=True) # default is True args, _ = parser.parse_known_args() return args @@ -461,18 +461,12 @@ def local_run() -> None: """ args = parse_args() - - logger.info("args: %s", args) - # update environment variables for input parameters - - # always run in dry-run mode in local environment, unless it's disabled. - is_dry_run = not args.not_dry_run - main( - args, - args.github_access_token, - is_dry_run=is_dry_run, + config_id=args.config_id, + github_access_token=args.github_access_token, + args=args, + is_dry_run=args.dry_run, ) From d72bbab2db4526c6bf0aaecb82dc1bdbd1f689f8 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 4 Sep 2025 17:08:56 -0700 Subject: [PATCH 8/9] Update (base update) [ghstack-poisoned] --- .../common/regression_utils.py | 63 ++++++++++--------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/aws/lambda/benchmark_regression_summary_report/common/regression_utils.py b/aws/lambda/benchmark_regression_summary_report/common/regression_utils.py index 0a2e4c1108..b0dc76e9ce 100644 --- a/aws/lambda/benchmark_regression_summary_report/common/regression_utils.py +++ b/aws/lambda/benchmark_regression_summary_report/common/regression_utils.py @@ -1,6 +1,5 @@ import datetime as dt import logging -from dataclasses import dataclass from typing import Any, Counter, Dict, List, Literal, Optional, TypedDict from common.benchmark_time_series_api_model import ( @@ -30,7 +29,6 @@ class TimeSeriesMetaInfo(TypedDict): end: TimeSeriesDataMetaInfo -@dataclass class BenchmarkRegressionSummary(TypedDict): total_count: int regression_count: int @@ -40,20 +38,28 @@ class BenchmarkRegressionSummary(TypedDict): is_regression: int +class BenchmarkRegressionPoint(TypedDict): + value: float + commit: str + branch: str + workflow_id: str + timestamp: str + + class BaselineResult(TypedDict): group_info: Dict[str, Any] - orignal_item: Dict[str, Any] + original_point: BenchmarkRegressionPoint value: float -class BenchmarkValueItem(TypedDict): +class BenchmarkRegressionPointGroup(TypedDict): group_info: Dict[str, Any] - values: List[Dict[str, Any]] + 
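    # "values" holds this group's points sorted by granularity_bucket
    # (see _to_data_map below), each normalized into a BenchmarkRegressionPoint.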
values: List[BenchmarkRegressionPoint] class PerGroupResult(TypedDict, total=True): group_info: Dict[str, Any] - baseline_item: Optional[Dict[str, Any]] + baseline_point: Optional[BenchmarkRegressionPoint] points: List[Any] label: RegressionClassifyLabel policy: Optional["RegressionPolicy"] @@ -103,8 +109,8 @@ def generate(self) -> BenchmarkRegressionReport: def detect_regressions_with_policies( self, - baseline_map: Dict[tuple, BenchmarkValueItem], - dp_map: Dict[tuple, BenchmarkValueItem], + baseline_map: Dict[tuple, BenchmarkRegressionPointGroup], + dp_map: Dict[tuple, BenchmarkRegressionPointGroup], *, metric_policies: Dict[str, RegressionPolicy], min_points: int = 2, @@ -131,7 +137,7 @@ def detect_regressions_with_policies( results.append( PerGroupResult( group_info=gi, - baseline_item=None, + baseline_point=None, points=[], label="insufficient_data", policy=None, @@ -144,7 +150,7 @@ def detect_regressions_with_policies( results.append( PerGroupResult( group_info=gi, - baseline_item=None, + baseline_point=None, points=[], label="insufficient_data", policy=None, @@ -155,18 +161,18 @@ def detect_regressions_with_policies( baseline_result = self._get_baseline(base_item, baseline_aggre_mode) if ( not baseline_result - or not baseline_result["orignal_item"] + or not baseline_result["original_point"] or len(points) == 0 ): logger.warning( - "No valid baseline result found, baseline_item is %s, len(points) == %s", + "No valid baseline result found, baseline_point is %s, len(points) == %s", baseline_result, len(points), ) results.append( PerGroupResult( group_info=gi, - baseline_item=None, + baseline_point=None, points=[], label="insufficient_data", policy=policy, @@ -174,7 +180,7 @@ def detect_regressions_with_policies( ) continue - orignal_baseline_obj = baseline_result["orignal_item"] + orignal_baseline_obj = baseline_result["original_point"] # Per-point violations (True = regression) flags: List[bool] = [ @@ -187,7 +193,7 @@ def detect_regressions_with_policies( results.append( PerGroupResult( group_info=gi, - baseline_item=orignal_baseline_obj, + baseline_point=orignal_baseline_obj, points=enriched_points, label=label, policy=policy, @@ -229,24 +235,25 @@ def _label_str(self, x) -> str: def _to_data_map( self, data: "BenchmarkTimeSeriesApiData", field: str = "value" - ) -> Dict[tuple, BenchmarkValueItem]: - result: Dict[tuple, BenchmarkValueItem] = {} + ) -> Dict[tuple, BenchmarkRegressionPointGroup]: + result: Dict[tuple, BenchmarkRegressionPointGroup] = {} for ts_group in data.time_series: group_keys = tuple(sorted(ts_group.group_info.items())) - points: List[Dict[str, Any]] = [] + points: List[BenchmarkRegressionPoint] = [] for d in sorted( ts_group.data, key=lambda d: isoparse(d["granularity_bucket"]) ): if field not in d: continue - points.append( - { - "value": float(d[field]), - "commit": d.get("commit"), - "branch": d.get("branch"), - "timestamp": isoparse(d["granularity_bucket"]), - } - ) + + p: BenchmarkRegressionPoint = { + "value": float(d[field]), + "commit": d.get("commit", ""), + "branch": d.get("branch", ""), + "workflow_id": d.get("workflow_id", ""), + "timestamp": d.get("granularity_bucket", ""), + } + points.append(p) result[group_keys] = { "group_info": ts_group.group_info, "values": points, @@ -255,7 +262,7 @@ def _to_data_map( def _get_baseline( self, - data: BenchmarkValueItem, + data: BenchmarkRegressionPointGroup, mode: str = "max", field: str = "value", ) -> Optional[BaselineResult]: @@ -282,7 +289,7 @@ def _get_baseline( result: BaselineResult = { "group_info": 
data["group_info"], "value": float(baseline_obj[field]), - "orignal_item": baseline_obj, + "original_point": baseline_obj, } return result From 06223fd244734192e9429a71bd5ad6945f9a9736 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 5 Sep 2025 23:32:59 -0700 Subject: [PATCH 9/9] Update [ghstack-poisoned] --- .../common/report_manager.py | 126 ++++++++++++------ 1 file changed, 83 insertions(+), 43 deletions(-) diff --git a/aws/lambda/benchmark_regression_summary_report/common/report_manager.py b/aws/lambda/benchmark_regression_summary_report/common/report_manager.py index 88482e3985..ce43792926 100644 --- a/aws/lambda/benchmark_regression_summary_report/common/report_manager.py +++ b/aws/lambda/benchmark_regression_summary_report/common/report_manager.py @@ -120,11 +120,30 @@ def insert_to_db( "repo": self.repo, "report_json": report_json, } + + if self.is_dry_run: + logger.info( + "[%s]dry run, skip inserting report to db, report(%s)", + self.config_id, + self.id, + ) + logger.info("[dry run] printing db params data") + if self.is_dry_run: + print(json.dumps(params, indent=2, default=str)) + logger.info("[dry run] Done! Finish printing db params data") + return logger.info( "[%s]inserting benchmark regression report(%s)", self.config_id, self.id ) - self._db_insert(cc, self.db_table_name, params) - + try: + self._db_insert(cc, self.db_table_name, params) + except Exception: + logger.exception( + "[%s] failed to insert report to target table %s", + self.config_id, + self.db_table_name, + ) + raise logger.info( "[%s] Done. inserted benchmark regression report(%s)", self.config_id, @@ -136,14 +155,28 @@ def _db_insert( cc: clickhouse_connect.driver.Client, table: str, params: dict, - ) -> tuple[bool, int]: + ): + """ + Insert one row into ClickHouse using cc.insert(). + Returns (inserted, written_rows). + """ + if self._row_exists( + cc, + table, + params["report_id"], + params["type"], + params["repo"], + params["last_record_ts"], + ): + return False, 0 + sql = f""" INSERT INTO {table} ( id, report_id, last_record_ts, last_record_commit, - `type`, + type, status, regression_count, insufficient_data_count, @@ -152,49 +185,56 @@ def _db_insert( repo, report ) - SELECT - {{id:UUID}}, - {{report_id:String}}, - {{last_record_ts:DateTime64(0)}}, - {{last_record_commit:String}}, - {{type:String}}, - {{status:String}}, - {{regression_count:UInt32}}, - {{insufficient_data_count:UInt32}}, - {{suspected_regression_count:UInt32}}, - {{total_count:UInt32}}, - {{repo:String}}, - {{report_json:String}} - FROM system.one - WHERE NOT EXISTS ( - SELECT 1 - FROM {table} - WHERE report_id = {{report_id:String}} - AND `type` = {{type:String}} - AND repo = {{repo:String}} - AND stamp = toDate({{last_record_ts:DateTime64(0)}}) + VALUES + ( + %(id)s, + %(report_id)s, + %(last_record_ts)s, + %(last_record_commit)s, + %(type)s, + %(status)s, + %(regression_count)s, + %(insufficient_data_count)s, + %(suspected_regression_count)s, + %(total_count)s, + %(repo)s, + %(report_json)s ) + """ + cc.command(sql, parameters=params) + + def _row_exists( + self, + cc: clickhouse_connect.driver.Client, + table: str, + report_id: str, + type_str: str, + repo: str, + last_record_ts, + ) -> bool: + """ + Check if a row already exists with the same (report_id, type, repo, stamp). + Returns True if found, False otherwise. 
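        The dedupe key is (report_id, type, repo, stamp), where stamp is the Date
        produced by toDate(last_record_ts), matching the table's
        `stamp` Date DEFAULT toDate(last_record_ts) column, so _db_insert() skips
        the insert when a report for the same config, type, repo and day already exists.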
+ """ + sql = f""" + SELECT 1 + FROM {table} + WHERE report_id = %(report_id)s + AND type = %(type)s + AND repo = %(repo)s + AND stamp = toDate(%(last_record_ts)s) LIMIT 1 """ - - res = cc.query(sql, parameters=params) - summary = getattr(res, "summary", {}) or {} - - written_any = ( - summary.get("written_rows") - or summary.get("rows_written") - or summary.get("written", 0) - or 0 + res = cc.query( + sql, + parameters={ + "report_id": report_id, + "type": type_str, + "repo": repo, + "last_record_ts": last_record_ts, + }, ) - - logger.info("wrting to db summmary %s", summary) - try: - written = int(written_any) - except (TypeError, ValueError): - written = 0 - - inserted = written > 0 - return inserted, written + return bool(res.result_rows) def _validate_latest_meta_info( self, latest_meta_info: Dict[str, Any]