From 9debbecdb8a5806e6e908befbf3a48422f267428 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 3 Sep 2025 14:51:03 -0700 Subject: [PATCH 1/3] Update [ghstack-poisoned] --- .../.gitignore | 3 + .../common/benchmark_time_series_api_model.py | 61 +++ .../common/config.py | 81 ++++ .../common/config_model.py | 224 ++++++++++ .../lambda_function.py | 416 ++++++++++++++++++ .../requirements.txt | 5 + .../schema.sql | 28 ++ 7 files changed, 818 insertions(+) create mode 100644 aws/lambda/benchmark_regression_summary_report/.gitignore create mode 100644 aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py create mode 100644 aws/lambda/benchmark_regression_summary_report/common/config.py create mode 100644 aws/lambda/benchmark_regression_summary_report/common/config_model.py create mode 100644 aws/lambda/benchmark_regression_summary_report/lambda_function.py create mode 100644 aws/lambda/benchmark_regression_summary_report/requirements.txt create mode 100644 clickhouse_db_schema/benchmark_regression_summary_report/schema.sql diff --git a/aws/lambda/benchmark_regression_summary_report/.gitignore b/aws/lambda/benchmark_regression_summary_report/.gitignore new file mode 100644 index 0000000000..bd92f6376a --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/.gitignore @@ -0,0 +1,3 @@ +*.zip +deployment/ +venv/ diff --git a/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py new file mode 100644 index 0000000000..24d8a8ec62 --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py @@ -0,0 +1,61 @@ +from dataclasses import dataclass, field +from typing import Optional, List, Dict, Any +import requests + +# The data class to provide api response model from get_time_series api + +@dataclass +class TimeRange: + start: str + end: str + + +@dataclass +class BenchmarkTimeSeriesItem: + group_info: Dict[str, Any] + num_of_dp: int + data: List[Dict[str, Any]] = field(default_factory=list) + + +@dataclass +class BenchmarkTimeSeriesApiData: + time_series: List[BenchmarkTimeSeriesItem] + time_range: TimeRange + + +@dataclass +class BenchmarkTimeSeriesApiResponse: + data: BenchmarkTimeSeriesApiData + + @classmethod + def from_request( + cls, url: str, query: dict, timeout: int = 180 + ) -> "BenchmarkTimeSeriesApiResponse": + """ + Send a POST request and parse into BenchmarkTimeSeriesApiResponse. 
+ + Args: + url: API endpoint + query: JSON payload must + timeout: max seconds to wait for connect + response (default: 30) + Returns: + ApiResponse + Raises: + requests.exceptions.RequestException if network/timeout/HTTP error + RuntimeError if the API returns an "error" field or malformed data + """ + resp = requests.post(url, json=query, timeout=timeout) + resp.raise_for_status() + payload = resp.json() + + if "error" in payload: + raise RuntimeError(f"API error: {payload['error']}") + try: + tr = TimeRange(**payload["data"]["time_range"]) + ts = [ + BenchmarkTimeSeriesItem(**item) + for item in payload["data"]["time_series"] + ] + except Exception as e: + raise RuntimeError(f"Malformed API payload: {e}") + return cls(data=BenchmarkTimeSeriesApiData(time_series=ts, time_range=tr)) diff --git a/aws/lambda/benchmark_regression_summary_report/common/config.py b/aws/lambda/benchmark_regression_summary_report/common/config.py new file mode 100644 index 0000000000..eb3573f10d --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/common/config.py @@ -0,0 +1,81 @@ +from common.config_model import ( + BenchmarkApiSource, + BenchmarkConfig, + BenchmarkRegressionConfigBook, + DayRangeWindow, + Frequency, + RegressionPolicy, + Policy, + RangeConfig, +) + +# Compiler benchmark regression config +# todo(elainewy): eventually each team should configure their own benchmark regression config, currenlty place here for lambda + + +COMPILER_BENCHMARK_CONFIG = BenchmarkConfig( + name="Compiler Benchmark Regression", + id="compiler_regression", + source=BenchmarkApiSource( + api_query_url="http://localhost:3000/api/benchmark/get_time_series", + type="benchmark_time_series_api", + # currently we only detect the regression for h100 with dtype bfloat16, and mode inference + # we can extend this to other devices, dtypes and mode in the future + api_endpoint_params_template=""" + { + "name": "compiler_precompute", + "query_params": { + "commits": [], + "compilers": [], + "arch": "h100", + "device": "cuda", + "dtype": "bfloat16", + "granularity": "hour", + "mode": "inference", + "startTime": "{{ startTime }}", + "stopTime": "{{ stopTime }}", + "suites": ["torchbench", "huggingface", "timm_models"], + "workflowId": 0, + "branches": ["main"] + } + } + """, + ), + # set baseline from past 7 days using avg, and compare with the last 1 day + policy=Policy( + frequency=Frequency(value=1, unit="days"), + range=RangeConfig( + baseline=DayRangeWindow(value=7), + comparison=DayRangeWindow(value=2), + ), + metrics={ + "passrate": RegressionPolicy( + name="passrate", condition="greater_equal", threshold=0.9, baseline_aggregation="max", + ), + "geomean": RegressionPolicy( + name="geomean", condition="greater_equal", threshold=0.95,baseline_aggregation="max", + ), + "compression_ratio": RegressionPolicy( + name="compression_ratio", condition="greater_equal", threshold=0.9, baseline_aggregation="max", + ), + }, + notification_config={ + "type": "github", + "repo": "pytorch/test-infra", + "issue": "7081", + }, + ), +) + +BENCHMARK_REGRESSION_CONFIG = BenchmarkRegressionConfigBook( + configs={ + "compiler_regression": COMPILER_BENCHMARK_CONFIG, + } +) + +def get_benchmark_regression_config(config_id: str) -> BenchmarkConfig: + """Get benchmark regression config by config id""" + try: + return BENCHMARK_REGRESSION_CONFIG[config_id] + except KeyError: + raise ValueError(f"Invalid config id: {config_id}") diff --git a/aws/lambda/benchmark_regression_summary_report/common/config_model.py 
b/aws/lambda/benchmark_regression_summary_report/common/config_model.py new file mode 100644 index 0000000000..59c2f86d9a --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/common/config_model.py @@ -0,0 +1,224 @@ +from __future__ import annotations +from dataclasses import dataclass, field, fields +from typing import Any, ClassVar, Dict, Literal, Optional, Set, Type, Union +from datetime import datetime, timedelta +from jinja2 import Environment, Template, meta +import requests +import json + + +# -------- Frequency -------- +@dataclass(frozen=True) +class Frequency: + """ + The frequency of how often the report should be generated. + The minimum frequency we support is 1 day. + Attributes: + value: Number of units (e.g., 7 for 7 days). + unit: Unit of time, either "days" or "weeks". + + Methods: + to_timedelta: Convert frequency into a datetime.timedelta. + get_text: return the frequency in text format + """ + value: int + unit: Literal["days", "weeks"] + def to_timedelta(self) -> timedelta: + """Convert frequency N days or M weeks into a datetime.timedelta.""" + if self.unit == "days": + return timedelta(days=self.value) + elif self.unit == "weeks": + return timedelta(weeks=self.value) + else: + raise ValueError(f"Unsupported unit: {self.unit}") + + def get_text(self): + return f"{self.value} {self.unit}" + + +# -------- Source -------- +_JINJA_ENV = Environment(autoescape=False) + +@dataclass +class BenchmarkApiSource: + """ + Defines the source of the benchmark data we want to query + api_query_url: the url of the api to query + api_endpoint_params_template: the jinjia2 template of the api endpoint's query params + default_ctx: the default context to use when rendering the api_endpoint_params_template + """ + api_query_url: str + api_endpoint_params_template: str + type: Literal["benchmark_time_series_api", "other"] = "benchmark_time_series_api" + default_ctx: Dict[str, Any] = field(default_factory=dict) + + def required_template_vars(self) -> set[str]: + ast = _JINJA_ENV.parse(self.api_endpoint_params_template) + return set(meta.find_undeclared_variables(ast)) + + def render(self, ctx: Dict[str, Any], strict: bool = True) -> dict: + """Render with caller-supplied context (no special casing for start/end).""" + merged = {**self.default_ctx, **ctx} + + if strict: + required = self.required_template_vars() + missing = required - merged.keys() + if missing: + raise ValueError(f"Missing required vars: {missing}") + rendered = Template(self.api_endpoint_params_template).render(**merged) + return json.loads(rendered) + + +# -------- Policy: range windows -------- +@dataclass +class DayRangeWindow: + value: int + # raw indicates fetch from the source data + source: Literal["raw"] = "raw" + +@dataclass +class RangeConfig: + """ + Defines the range of baseline and comparison windows for a given policy. + - baseline: the baseline window that build the baseline value + - comparison: the comparison window that we fetch data from to compare against the baseline value + """ + baseline: DayRangeWindow + comparison: DayRangeWindow + + def total_timedelta(self) -> timedelta: + return timedelta(days=self.baseline.value + self.comparison.value) + def comparison_timedelta(self) -> timedelta: + return timedelta(days=self.comparison.value) + def baseline_timedelta(self) -> timedelta: + return timedelta(days=self.baseline.value) + +# -------- Policy: metrics -------- +@dataclass +class RegressionPolicy: + """ + Defines the policy for a given metric. 
+    - the new value must be {x} the baseline target (baseline * threshold):
+    - "greater_than": higher is better; the new value must be strictly greater than the target
+    - "less_than": lower is better; the new value must be strictly lower than the target
+    - "equal_to": the new value should be ~= baseline * threshold within rel_tol
+    - "greater_equal": higher is better; the new value must be greater than or equal to the target
+    - "less_equal": lower is better; the new value must be less than or equal to the target
+    """
+    name: str
+    condition: Literal["greater_than", "less_than", "equal_to","greater_equal","less_equal"]
+    threshold: float
+    baseline_aggregation: Literal["avg", "max", "min", "p50", "p90", "p95","latest","earliest"] = "max"
+    rel_tol: float = 1e-3  # used only for "equal_to"
+
+    def is_violation(self, value: float, baseline: float) -> bool:
+        target = baseline * self.threshold
+
+        if self.condition == "greater_than":
+            # value must be strictly greater than target
+            return value <= target
+
+        if self.condition == "greater_equal":
+            # value must be greater or equal to target
+            return value < target
+
+        if self.condition == "less_than":
+            # value must be strictly less than target
+            return value >= target
+
+        if self.condition == "less_equal":
+            # value must be less or equal to target
+            return value > target
+
+        if self.condition == "equal_to":
+            # |value - target| should be within rel_tol * max(1, |target|)
+            denom = max(1.0, abs(target))
+            return abs(value - target) > self.rel_tol * denom
+
+        raise ValueError(f"Unknown condition: {self.condition}")
+class BaseNotificationConfig:
+    # every subclass must override this
+    type_tag: ClassVar[str]
+
+    @classmethod
+    def from_dict(cls: Type[T], d: Dict[str, Any]) -> T:
+        # pick only known fields for this dataclass
+        kwargs = {f.name: d.get(f.name) for f in fields(cls)}
+        return cls(**kwargs)  # type: ignore
+
+    @classmethod
+    def matches(cls, d: Dict[str, Any]) -> bool:
+        return d.get("type") == cls.type_tag
+
+
+@dataclass
+class GitHubNotificationConfig(BaseNotificationConfig):
+    type: str = "github"
+    repo: str = ""
+    issue_number: str = ""
+    type_tag: ClassVar[str] = "github"
+
+    def create_github_comment(self, body: str, github_token: str) -> Dict[str, Any]:
+        """
+        Create a new comment on a GitHub issue.
+        The target comes from the config fields: repo ("owner/repo") and
+        issue_number (the issue number as a string or int).
+        Args:
+            body: text of the comment
+            github_token: GitHub personal access token or GitHub Actions token
+
+        Returns:
+            The GitHub API response as a dict (JSON).
+        """
+        url = f"https://api.github.com/repos/{self.repo}/issues/{self.issue_number}/comments"
+        headers = {
+            "Authorization": f"token {github_token}",
+            "Accept": "application/vnd.github+json",
+            "User-Agent": "bench-reporter/1.0",
+        }
+        resp = requests.post(url, headers=headers, json={"body": body})
+        resp.raise_for_status()
+        return resp.json()
+
+@dataclass
+class Policy:
+    frequency: Frequency
+    range: RangeConfig
+    metrics: Dict[str, RegressionPolicy]
+    notification_config: Optional[Dict[str, Any]] = None
+
+    def get_github_notification_config(self) -> Optional[GitHubNotificationConfig]:
+        if not self.notification_config:
+            return None
+        return GitHubNotificationConfig.from_dict(self.notification_config)  # type: ignore
+
+
+# -------- Top-level benchmark regression config --------
+@dataclass
+class BenchmarkConfig:
+    """
+    Represents a single benchmark regression configuration.
+
+    - BenchmarkConfig defines the benchmark regression config for a given benchmark.
+ - source: defines the source of the benchmark data we want to query + - policy: defines the policy for the benchmark regressions + - name: the name of the benchmark + - id: the id of the benchmark, this must be unique for each benchmark, and cannot be changed once set + """ + name: str + id: str + source: BenchmarkApiSource + policy: Policy + + +@dataclass +class BenchmarkRegressionConfigBook: + configs: Dict[str, BenchmarkConfig] = field(default_factory=dict) + + def __getitem__(self, key: str) -> BenchmarkConfig: + config = self.configs.get(key, None) + if not config: + raise KeyError(f"Config {key} not found") + return config diff --git a/aws/lambda/benchmark_regression_summary_report/lambda_function.py b/aws/lambda/benchmark_regression_summary_report/lambda_function.py new file mode 100644 index 0000000000..60dbe6899b --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/lambda_function.py @@ -0,0 +1,416 @@ +#!/usr/bin/env python +import argparse +from concurrent.futures import ThreadPoolExecutor, as_completed +import json +import logging +import os +import threading +import requests +import datetime as dt +from typing import Any, Optional +import clickhouse_connect +from common.benchmark_time_series_api_model import ( + BenchmarkTimeSeriesApiResponse, +) +from common.config_model import ( + BenchmarkApiSource, + BenchmarkConfig, + Frequency, +) +from common.config import get_benchmark_regression_config +from dateutil.parser import isoparse + +logging.basicConfig( + level=logging.INFO, +) +logger = logging.getLogger() +logger.setLevel("INFO") + +ENVS = { + "GITHUB_ACCESS_TOKEN": os.getenv("GITHUB_ACCESS_TOKEN", ""), + "CLICKHOUSE_ENDPOINT": os.getenv("CLICKHOUSE_ENDPOINT", ""), + "CLICKHOUSE_PASSWORD": os.getenv("CLICKHOUSE_PASSWORD", ""), + "CLICKHOUSE_USERNAME": os.getenv("CLICKHOUSE_USERNAME", ""), +} + +# TODO(elainewy): change this to benchmark.benchmark_regression_report once the table is created +BENCHMARK_REGRESSION_REPORT_TABLE = "fortesting.benchmark_regression_report" +BENCHMARK_REGRESSION_TRACKING_CONFIG_IDS = ["compiler_regression"] + + +def truncate_to_hour(ts: dt.datetime) -> dt.datetime: + return ts.replace(minute=0, second=0, microsecond=0) + +def get_clickhouse_client( + host: str, user: str, password: str +) -> clickhouse_connect.driver.client.Client: + # for local testing only, disable SSL verification + logger.info("trying to connect with clickhouse") + # return clickhouse_connect.get_client(host=host, user=user, password=password,secure=True, verify=False) + + return clickhouse_connect.get_client( + host=host, user=user, password=password, secure=True + ) + + +def get_clickhouse_client_environment() -> clickhouse_connect.driver.client.Client: + for name, env_val in ENVS.items(): + if not env_val: + raise ValueError(f"Missing environment variable {name}") + return get_clickhouse_client( + host=ENVS["CLICKHOUSE_ENDPOINT"], + user=ENVS["CLICKHOUSE_USERNAME"], + password=ENVS["CLICKHOUSE_PASSWORD"], + ) + +BENCHMARK_REGRESSION_SUMMARY_REPORT_TABLE = ( + "fortesting.benchmark_regression_summary_report" +) + + +class BenchmarkSummaryProcessor: + """ """ + + def __init__( + self, + is_dry_run: bool = False, + ) -> None: + self.is_dry_run = is_dry_run + + def process( + self, + config_id: str, + end_time: dt.datetime, + cc: Optional[clickhouse_connect.driver.client.Client] = None, + args: Optional[argparse.Namespace] = None, + ): + def log_info(msg: str): + logger.info("[%s] %s", config_id, msg) + def log_error(msg:str): + logger.error("[%s] %s", config_id, 
msg) + + # ensure each thread has its own clickhouse client. clickhouse client + # is not thread-safe. + if cc is None: + tlocal = threading.local() + if not hasattr(tlocal, "cc") or tlocal.cc is None: + if args: + tlocal.cc = get_clickhouse_client( + args.clickhouse_endpoint, + args.clickhouse_username, + args.clickhouse_password, + ) + else: + tlocal.cc = get_clickhouse_client_environment() + cc = tlocal.cc + try: + config = get_benchmark_regression_config(config_id) + log_info(f"found config for config_id {config_id}") + except ValueError as e: + log_error(f"Skip process, Invalid config: {e}") + return + except Exception as e: + log_error( + f"Unexpected error from get_benchmark_regression_config: {e}" + ) + return + + # check if the current time is > policy's time_delta + previous record_ts from summary_table + report_freq = config.policy.frequency + should_generate = self._should_generate_report( + cc, end_time, config_id, report_freq + ) + if not should_generate: + log_info( + "Skip generate report for time:{end_time} with frequency {report_freq.get_text()}, no data found", + ) + return + else: + log_info("Plan to generate report for time:{end_time} with frequency {report_freq.get_text()}...") + latest, ls, le = self.get_latest(config, end_time) + if not latest: + log_info(f"no latest data found for time range [{ls},{le}] with frequency {report_freq.get_text()}...") + return + + baseline,bs,be = self.get_basline(config, end_time) + if not baseline: + log_info(f"no baseline data found for time range [{bs},{be}] with frequency {report_freq.get_text()}...") + return + + def get_latest(self, config: BenchmarkConfig, end_time: dt.datetime): + data_range = config.policy.range + latest_s = end_time - data_range.comparison_timedelta() + latest_e = end_time + latest_data = self._fetch_from_benchmark_ts_api( + config_id=config.id, + start_time=latest_s, + end_time=latest_e, + source=config.source, + ) + if not latest_data.time_range or not latest_data.time_range.end: + return None,latest_s,latest_e + if not self.should_use_data(latest_data.time_range.end, end_time): + return None,latest_s,latest_e + return latest_data,latest_s,latest_e + + def get_basline(self, config: BenchmarkConfig, end_time: dt.datetime): + data_range = config.policy.range + baseline_s = end_time - data_range.total_timedelta() + baseline_e = end_time - data_range.comparison_timedelta() + # fetch baseline from api + raw_data = self._fetch_from_benchmark_ts_api( + config_id=config.id, + start_time=baseline_s, + end_time=baseline_e, + source=config.source, + ) + if not self.should_use_data(raw_data.time_range.end, end_time): + logger.info( + "[%s][get_basline] Skip generate report, no data found during [%s,%s]", + config.id, + baseline_s.isoformat(), + baseline_e.isoformat(), + ) + return None, baseline_s, baseline_e + return raw_data, baseline_s,baseline_e + + def should_use_data( + self, + latest_ts_str: str, + end_time: dt.datetime, + min_delta: dt.timedelta = dt.timedelta(days=2), + ) -> bool: + if not latest_ts_str: + return False + latest_dt = isoparse(latest_ts_str) + cutoff = end_time - min_delta + return latest_dt >= cutoff + + def _fetch_from_benchmark_ts_api( + self, + config_id: str, + end_time: dt.datetime, + start_time: dt.datetime, + source: BenchmarkApiSource, + ): + str_end_time = end_time.strftime("%Y-%m-%dT%H:%M:%S") + str_start_time = start_time.strftime("%Y-%m-%dT%H:%M:%S") + query = source.render( + ctx={ + "startTime": str_start_time, + "stopTime": str_end_time, + } + ) + url = source.api_query_url + + 
logger.info("[%s]trying to call %s, with query\n %s",config_id, url,query) + try: + resp: BenchmarkTimeSeriesApiResponse = ( + BenchmarkTimeSeriesApiResponse.from_request(url, query) + ) + return resp.data + except requests.exceptions.HTTPError as e: + logger.error("Server error message: %s", e.response.json().get("error")) + raise + except Exception as e: + raise RuntimeError(f"[{config_id}]Fetch failed: {e}") + + def _should_generate_report( + self, + cc: clickhouse_connect.driver.client.Client, + end_time: dt.datetime, + config_id: str, + f: Frequency, + ) -> bool: + def _get_latest_record_ts( + cc: clickhouse_connect.driver.Client, + config_id: str, + ) -> Optional[dt.datetime]: + table = BENCHMARK_REGRESSION_REPORT_TABLE + res = cc.query( + f""" + SELECT max(last_record_ts) + FROM {table} + WHERE report_id = {{config_id:String}} + """, + parameters={"config_id": config_id}, + ) + if not res.result_rows or res.result_rows[0][0] is None: + return None + latest: dt.datetime = res.result_rows[0][ + 0 + ] # typically tz-aware UTC from clickhouse_connect + # If not tz-aware, force UTC: + if latest.tzinfo is None: + latest = latest.replace(tzinfo=dt.timezone.utc) + return latest + + freq_delta = f.to_timedelta() + latest_record_ts = _get_latest_record_ts(cc, config_id) + + # No report exists yet, generate + if not latest_record_ts: + return True + + end_utc = ( + end_time if end_time.tzinfo else end_time.replace(tzinfo=dt.timezone.utc) + ) + end_utc = end_utc.astimezone(dt.timezone.utc) + cutoff = end_time - freq_delta + return latest_record_ts < cutoff + + +class WorkerPoolHandler: + """ + WorkerPoolHandler runs workers in parallel to generate benchmark regression report + and writes the results to the target destination. + + """ + + def __init__( + self, + benchmark_summary_processor: BenchmarkSummaryProcessor, + max_workers: int = 6, + ): + self.benchmark_summary_processor = benchmark_summary_processor + self.max_workers = max_workers + + def start( + self, + config_ids: list[str], + args: Optional[argparse.Namespace] = None, + ) -> None: + logger.info( + "[WorkerPoolHandler] start to process benchmark " + "summary data with config_ids %s", + config_ids, + ) + end_time = dt.datetime.now(dt.timezone.utc).replace( + minute=0, second=0, microsecond=0 + ) + logger.info("current time with hour granularity(utc) %s", end_time) + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + futures = [] + for config_id in config_ids: + future = executor.submit( + self.benchmark_summary_processor.process, + config_id, + end_time, + cc=None, + args=args, + ) + futures.append(future) + results = [] + errors = [] + + # handle results from parallel processing + for future in as_completed(futures): + try: + result = future.result() + # This will raise an exception if one occurred + results.append(result) + except Exception as e: + logger.warning(f"Error processing future: {e}") + errors.append({"error": str(e)}) + + +def main( + args: Optional[argparse.Namespace] = None, + github_access_token: str = "", + is_dry_run: bool = False, +): + """ + Main method to run in both local environment and lambda handler. + 1. generate intervals[start_time,end_time] using latest timestamp from source table and target table + 2. call WorkerPoolHandler to geneterate and write histogram data for each interval in parallel + """ + if not github_access_token: + raise ValueError("Missing environment variable GITHUB_ACCESS_TOKEN") + + # get time intervals. 
+ logger.info("[Main] start work ....") + + # get jobs in queue from clickhouse for list of time intervals, in parallel + handler = WorkerPoolHandler( + BenchmarkSummaryProcessor(is_dry_run=is_dry_run), + ) + handler.start(BENCHMARK_REGRESSION_TRACKING_CONFIG_IDS, args) + logger.info(" [Main] Done. work completed.") + + +def lambda_handler(event: Any, context: Any) -> None: + """ + Main method to run in aws lambda environment + """ + main( + None, + github_access_token=ENVS["GITHUB_ACCESS_TOKEN"], + ) + return + + +def parse_args() -> argparse.Namespace: + """ + Parse command line args, this is mainly used for local test environment. + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--clickhouse-endpoint", + default=ENVS["CLICKHOUSE_ENDPOINT"], + type=str, + help="the clickhouse endpoint, the clickhouse_endpoint " + + "name is https://{clickhouse_endpoint}:{port} for full url ", + ) + parser.add_argument( + "--clickhouse-username", + type=str, + default=ENVS["CLICKHOUSE_USERNAME"], + help="the clickhouse username", + ) + parser.add_argument( + "--clickhouse-password", + type=str, + default=ENVS["CLICKHOUSE_PASSWORD"], + help="the clickhouse password for the user name", + ) + parser.add_argument( + "--github-access-token", + type=str, + default=ENVS["GITHUB_ACCESS_TOKEN"], + help="the github access token to access github api", + ) + parser.add_argument( + "--not-dry-run", + action="store_true", + help="when set, writing results to destination from local " + + "environment. By default, we run in dry-run mode for local " + + "environment", + ) + args, _ = parser.parse_known_args() + return args + + +def local_run() -> None: + """ + method to run in local test environment + """ + + args = parse_args() + + + logger.info("args: %s",args) + + # update environment variables for input parameters + + # always run in dry-run mode in local environment, unless it's disabled. + is_dry_run = not args.not_dry_run + + main( + args, + args.github_access_token, + is_dry_run=is_dry_run, + ) + + +if __name__ == "__main__": + local_run() diff --git a/aws/lambda/benchmark_regression_summary_report/requirements.txt b/aws/lambda/benchmark_regression_summary_report/requirements.txt new file mode 100644 index 0000000000..87c33c2e7f --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/requirements.txt @@ -0,0 +1,5 @@ +clickhouse_connect==0.8.5 +boto3==1.35.33 +PyGithub==1.59.0 +python-dateutil==2.8.2 +PyYAML==6.0.1 diff --git a/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql b/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql new file mode 100644 index 0000000000..df85548710 --- /dev/null +++ b/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql @@ -0,0 +1,28 @@ +CREATE TABLE benchmark.benchmark_regression_report +( + `id` UUID DEFAULT generateUUIDv4(), + `report_id` String, -- unique id for the report config + `created_at` DateTime64(0, 'UTC') DEFAULT now(), + `last_record_ts` DateTime64(0, 'UTC'), + `last_record_commit` String, + `type` String, -- e.g. 'daily','weekly' + `status` String, -- e.g. 
'no_regression',"regression",'failure' + `regression_count` UInt32 DEFAULT 0, + `insufficient_data_count` UInt32 DEFAULT 0, + `total_count` UInt32 DEFAULT 0, + `report` String DEFAULT '{}' +) +ENGINE = SharedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') +PARTITION BY toYYYYMM(report_date) +ORDER BY +( + report_id, + type, + status, + last_record_ts, + last_record_commit, + created_at, + id +) +TTL created_at + toIntervalYear(10) +SETTINGS index_granularity = 8192; From 5be01aca2fe9d222e5b2ad3e93451c9a0297a0bd Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 3 Sep 2025 15:08:54 -0700 Subject: [PATCH 2/3] Update [ghstack-poisoned] --- .../common/benchmark_time_series_api_model.py | 1 + .../common/config.py | 18 ++++- .../common/config_model.py | 80 ++++++------------- .../lambda_function.py | 73 +++++++++++------ 4 files changed, 88 insertions(+), 84 deletions(-) diff --git a/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py index 24d8a8ec62..552b8cefbd 100644 --- a/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py +++ b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py @@ -4,6 +4,7 @@ # The data class to provide api response model from get_time_series api + @dataclass class TimeRange: start: str diff --git a/aws/lambda/benchmark_regression_summary_report/common/config.py b/aws/lambda/benchmark_regression_summary_report/common/config.py index eb3573f10d..a68dc7355a 100644 --- a/aws/lambda/benchmark_regression_summary_report/common/config.py +++ b/aws/lambda/benchmark_regression_summary_report/common/config.py @@ -17,7 +17,7 @@ name="Compiler Benchmark Regression", id="compiler_regression", source=BenchmarkApiSource( - api_query_url="http://localhost:3000/api/benchmark/get_time_series", + api_query_url="https://hud.pytorch.org/api/benchmark/get_time_series", type="benchmark_time_series_api", # currently we only detect the regression for h100 with dtype bfloat16, and mode inference # we can extend this to other devices, dtypes and mode in the future @@ -50,13 +50,22 @@ ), metrics={ "passrate": RegressionPolicy( - name="passrate", condition="greater_equal", threshold=0.9, baseline_aggregation="max", + name="passrate", + condition="greater_equal", + threshold=0.9, + baseline_aggregation="max", ), "geomean": RegressionPolicy( - name="geomean", condition="greater_equal", threshold=0.95,baseline_aggregation="max", + name="geomean", + condition="greater_equal", + threshold=0.95, + baseline_aggregation="max", ), "compression_ratio": RegressionPolicy( - name="compression_ratio", condition="greater_equal", threshold=0.9, baseline_aggregation="max", + name="compression_ratio", + condition="greater_equal", + threshold=0.9, + baseline_aggregation="max", ), }, notification_config={ @@ -73,6 +82,7 @@ } ) + def get_benchmark_regression_config(config_id: str) -> BenchmarkConfig: """Get benchmark regression config by config id""" try: diff --git a/aws/lambda/benchmark_regression_summary_report/common/config_model.py b/aws/lambda/benchmark_regression_summary_report/common/config_model.py index 59c2f86d9a..f452e84da2 100644 --- a/aws/lambda/benchmark_regression_summary_report/common/config_model.py +++ b/aws/lambda/benchmark_regression_summary_report/common/config_model.py @@ -1,9 +1,8 @@ from __future__ import annotations -from dataclasses import dataclass, field, fields -from typing import 
Any, ClassVar, Dict, Literal, Optional, Set, Type, Union -from datetime import datetime, timedelta +from dataclasses import dataclass, field +from typing import Any, Dict, Literal, Optional +from datetime import timedelta from jinja2 import Environment, Template, meta -import requests import json @@ -21,8 +20,10 @@ class Frequency: to_timedelta: Convert frequency into a datetime.timedelta. get_text: return the frequency in text format """ + value: int unit: Literal["days", "weeks"] + def to_timedelta(self) -> timedelta: """Convert frequency N days or M weeks into a datetime.timedelta.""" if self.unit == "days": @@ -39,6 +40,7 @@ def get_text(self): # -------- Source -------- _JINJA_ENV = Environment(autoescape=False) + @dataclass class BenchmarkApiSource: """ @@ -47,6 +49,7 @@ class BenchmarkApiSource: api_endpoint_params_template: the jinjia2 template of the api endpoint's query params default_ctx: the default context to use when rendering the api_endpoint_params_template """ + api_query_url: str api_endpoint_params_template: str type: Literal["benchmark_time_series_api", "other"] = "benchmark_time_series_api" @@ -76,6 +79,7 @@ class DayRangeWindow: # raw indicates fetch from the source data source: Literal["raw"] = "raw" + @dataclass class RangeConfig: """ @@ -83,16 +87,20 @@ class RangeConfig: - baseline: the baseline window that build the baseline value - comparison: the comparison window that we fetch data from to compare against the baseline value """ + baseline: DayRangeWindow comparison: DayRangeWindow def total_timedelta(self) -> timedelta: return timedelta(days=self.baseline.value + self.comparison.value) + def comparison_timedelta(self) -> timedelta: return timedelta(days=self.comparison.value) + def baseline_timedelta(self) -> timedelta: return timedelta(days=self.baseline.value) + # -------- Policy: metrics -------- @dataclass class RegressionPolicy: @@ -105,10 +113,15 @@ class RegressionPolicy: - "greater_equal": higher is better; new value must be greater or equal to baseline - "less_equal": lower is better; new value must be less or equal to baseline """ + name: str - condition: Literal["greater_than", "less_than", "equal_to","greater_equal","less_equal"] + condition: Literal[ + "greater_than", "less_than", "equal_to", "greater_equal", "less_equal" + ] threshold: float - baseline_aggregation: Literal["avg", "max", "min", "p50", "p90", "p95","latest","earliest"] = "max" + baseline_aggregation: Literal[ + "avg", "max", "min", "p50", "p90", "p95", "latest", "earliest" + ] = "max" rel_tol: float = 1e-3 # used only for "equal_to" def is_violation(self, value: float, baseline: float) -> bool: @@ -136,63 +149,16 @@ def is_violation(self, value: float, baseline: float) -> bool: return abs(value - target) > self.rel_tol * denom raise ValueError(f"Unknown condition: {self.condition}") -class BaseNotificationConfig: - # every subclass must override this - type_tag: ClassVar[str] - - @classmethod - def from_dict(cls: Type[T], d: Dict[str, Any]) -> T: - # pick only known fields for this dataclass - kwargs = {f.name: d.get(f.name) for f in fields(cls)} - return cls(**kwargs) # type: ignore - - @classmethod - def matches(cls, d: Dict[str, Any]) -> bool: - return d.get("type") == cls.type_tag -@dataclass -class GitHubNotificationConfig(BaseNotificationConfig): - type: str = "github" - repo: str = "" - issue_number: str = "" - type_tag: ClassVar[str] = "github" - - def create_github_comment(self, body: str, github_token: str) -> Dict[str, Any]: - """ - Create a new comment on a GitHub 
issue. - Args: - notification_config: dict with keys: - - type: must be "github" - - repo: "owner/repo" - - issue: issue number (string or int) - body: text of the comment - token: GitHub personal access token or GitHub Actions token - - Returns: - The GitHub API response as a dict (JSON). - """ - url = f"https://api.github.com/repos/{self.repo}/issues/{self.issue_number}/comments" - headers = { - "Authorization": f"token {github_token}", - "Accept": "application/vnd.github+json", - "User-Agent": "bench-reporter/1.0", - } - resp = requests.post(url, headers=headers, json={"body": body}) - resp.raise_for_status() - return resp.json() - @dataclass class Policy: frequency: Frequency range: RangeConfig metrics: Dict[str, RegressionPolicy] - notification_config: Optional[Dict[str, Any]] = None - def get_github_notification_config(self) -> Optional[GitHubNotificationConfig]: - if not self.notification_config: - return None - return notification_from_dict(self.notification_config) # type: ignore + # TODO(elainewy): add notification config + notification_config: Optional[Dict[str, Any]] = None # -------- Top-level benchmark regression config -------- @@ -200,13 +166,13 @@ def get_github_notification_config(self) -> Optional[GitHubNotificationConfig]: class BenchmarkConfig: """ Represents a single benchmark regression configuration. - - BenchmarkConfig defines the benchmark regression config for a given benchmark. - source: defines the source of the benchmark data we want to query - - policy: defines the policy for the benchmark regressions + - policy: defines the policy for the benchmark regressions, including frequency to generate the report, range of the baseline and new values, and regression thresholds for metrics - name: the name of the benchmark - id: the id of the benchmark, this must be unique for each benchmark, and cannot be changed once set """ + name: str id: str source: BenchmarkApiSource diff --git a/aws/lambda/benchmark_regression_summary_report/lambda_function.py b/aws/lambda/benchmark_regression_summary_report/lambda_function.py index 60dbe6899b..e13dbf749d 100644 --- a/aws/lambda/benchmark_regression_summary_report/lambda_function.py +++ b/aws/lambda/benchmark_regression_summary_report/lambda_function.py @@ -41,12 +41,14 @@ def truncate_to_hour(ts: dt.datetime) -> dt.datetime: return ts.replace(minute=0, second=0, microsecond=0) + def get_clickhouse_client( host: str, user: str, password: str ) -> clickhouse_connect.driver.client.Client: # for local testing only, disable SSL verification - logger.info("trying to connect with clickhouse") - # return clickhouse_connect.get_client(host=host, user=user, password=password,secure=True, verify=False) + return clickhouse_connect.get_client( + host=host, user=user, password=password, secure=True, verify=False + ) return clickhouse_connect.get_client( host=host, user=user, password=password, secure=True @@ -63,14 +65,13 @@ def get_clickhouse_client_environment() -> clickhouse_connect.driver.client.Clie password=ENVS["CLICKHOUSE_PASSWORD"], ) + BENCHMARK_REGRESSION_SUMMARY_REPORT_TABLE = ( "fortesting.benchmark_regression_summary_report" ) class BenchmarkSummaryProcessor: - """ """ - def __init__( self, is_dry_run: bool = False, @@ -86,7 +87,8 @@ def process( ): def log_info(msg: str): logger.info("[%s] %s", config_id, msg) - def log_error(msg:str): + + def log_error(msg: str): logger.error("[%s] %s", config_id, msg) # ensure each thread has its own clickhouse client. 
clickhouse client @@ -110,9 +112,7 @@ def log_error(msg:str): log_error(f"Skip process, Invalid config: {e}") return except Exception as e: - log_error( - f"Unexpected error from get_benchmark_regression_config: {e}" - ) + log_error(f"Unexpected error from get_benchmark_regression_config: {e}") return # check if the current time is > policy's time_delta + previous record_ts from summary_table @@ -126,15 +126,21 @@ def log_error(msg:str): ) return else: - log_info("Plan to generate report for time:{end_time} with frequency {report_freq.get_text()}...") + log_info( + "Plan to generate report for time:{end_time} with frequency {report_freq.get_text()}..." + ) latest, ls, le = self.get_latest(config, end_time) if not latest: - log_info(f"no latest data found for time range [{ls},{le}] with frequency {report_freq.get_text()}...") + log_info( + f"no latest data found for time range [{ls},{le}] with frequency {report_freq.get_text()}..." + ) return - baseline,bs,be = self.get_basline(config, end_time) + baseline, bs, be = self.get_basline(config, end_time) if not baseline: - log_info(f"no baseline data found for time range [{bs},{be}] with frequency {report_freq.get_text()}...") + log_info( + f"no baseline data found for time range [{bs},{be}] with frequency {report_freq.get_text()}..." + ) return def get_latest(self, config: BenchmarkConfig, end_time: dt.datetime): @@ -147,11 +153,17 @@ def get_latest(self, config: BenchmarkConfig, end_time: dt.datetime): end_time=latest_e, source=config.source, ) + logger.info( + "[%s] found %s # of data, with time range %s", + config.id, + len(latest_data.time_series), + latest_data.time_range, + ) if not latest_data.time_range or not latest_data.time_range.end: - return None,latest_s,latest_e - if not self.should_use_data(latest_data.time_range.end, end_time): - return None,latest_s,latest_e - return latest_data,latest_s,latest_e + return None, latest_s, latest_e + if not self.should_use_data(config.id, latest_data.time_range.end, end_time): + return None, latest_s, latest_e + return latest_data, latest_s, latest_e def get_basline(self, config: BenchmarkConfig, end_time: dt.datetime): data_range = config.policy.range @@ -164,7 +176,13 @@ def get_basline(self, config: BenchmarkConfig, end_time: dt.datetime): end_time=baseline_e, source=config.source, ) - if not self.should_use_data(raw_data.time_range.end, end_time): + + logger.info( + "found %s # of data, with time range %s", + len(raw_data.time_series), + raw_data.time_range, + ) + if not self.should_use_data(config.id, raw_data.time_range.end, baseline_e): logger.info( "[%s][get_basline] Skip generate report, no data found during [%s,%s]", config.id, @@ -172,10 +190,11 @@ def get_basline(self, config: BenchmarkConfig, end_time: dt.datetime): baseline_e.isoformat(), ) return None, baseline_s, baseline_e - return raw_data, baseline_s,baseline_e + return raw_data, baseline_s, baseline_e def should_use_data( self, + config_id: str, latest_ts_str: str, end_time: dt.datetime, min_delta: dt.timedelta = dt.timedelta(days=2), @@ -184,7 +203,16 @@ def should_use_data( return False latest_dt = isoparse(latest_ts_str) cutoff = end_time - min_delta - return latest_dt >= cutoff + + if latest_dt >= cutoff: + return True + logger.info( + "[%s] expect latest data to be after %s, but got %s", + config_id, + cutoff, + latest_dt, + ) + return False def _fetch_from_benchmark_ts_api( self, @@ -203,7 +231,7 @@ def _fetch_from_benchmark_ts_api( ) url = source.api_query_url - logger.info("[%s]trying to call %s, with query\n 
%s",config_id, url,query) + logger.info("[%s]trying to call %s, with query\n %s", config_id, url, query) try: resp: BenchmarkTimeSeriesApiResponse = ( BenchmarkTimeSeriesApiResponse.from_request(url, query) @@ -282,7 +310,7 @@ def start( ) -> None: logger.info( "[WorkerPoolHandler] start to process benchmark " - "summary data with config_ids %s", + "summary data with required config: %s", config_ids, ) end_time = dt.datetime.now(dt.timezone.utc).replace( @@ -397,8 +425,7 @@ def local_run() -> None: args = parse_args() - - logger.info("args: %s",args) + logger.info("args: %s", args) # update environment variables for input parameters From 07507f998aae9b14cf09caaf0245c423bbc7eabd Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 3 Sep 2025 15:54:49 -0700 Subject: [PATCH 3/3] Update [ghstack-poisoned] --- .../.gitignore | 3 - .../common/benchmark_time_series_api_model.py | 62 --- .../common/config.py | 91 ---- .../common/config_model.py | 190 -------- .../lambda_function.py | 443 ------------------ .../requirements.txt | 5 - 6 files changed, 794 deletions(-) delete mode 100644 aws/lambda/benchmark_regression_summary_report/.gitignore delete mode 100644 aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py delete mode 100644 aws/lambda/benchmark_regression_summary_report/common/config.py delete mode 100644 aws/lambda/benchmark_regression_summary_report/common/config_model.py delete mode 100644 aws/lambda/benchmark_regression_summary_report/lambda_function.py delete mode 100644 aws/lambda/benchmark_regression_summary_report/requirements.txt diff --git a/aws/lambda/benchmark_regression_summary_report/.gitignore b/aws/lambda/benchmark_regression_summary_report/.gitignore deleted file mode 100644 index bd92f6376a..0000000000 --- a/aws/lambda/benchmark_regression_summary_report/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -*.zip -deployment/ -venv/ diff --git a/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py deleted file mode 100644 index 552b8cefbd..0000000000 --- a/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py +++ /dev/null @@ -1,62 +0,0 @@ -from dataclasses import dataclass, field -from typing import Optional, List, Dict, Any -import requests - -# The data class to provide api response model from get_time_series api - - -@dataclass -class TimeRange: - start: str - end: str - - -@dataclass -class BenchmarkTimeSeriesItem: - group_info: Dict[str, Any] - num_of_dp: int - data: List[Dict[str, Any]] = field(default_factory=list) - - -@dataclass -class BenchmarkTimeSeriesApiData: - time_series: List[BenchmarkTimeSeriesItem] - time_range: TimeRange - - -@dataclass -class BenchmarkTimeSeriesApiResponse: - data: BenchmarkTimeSeriesApiData - - @classmethod - def from_request( - cls, url: str, query: dict, timeout: int = 180 - ) -> "BenchmarkTimeSeriesApiResponse": - """ - Send a POST request and parse into BenchmarkTimeSeriesApiResponse. 
- - Args: - url: API endpoint - query: JSON payload must - timeout: max seconds to wait for connect + response (default: 30) - Returns: - ApiResponse - Raises: - requests.exceptions.RequestException if network/timeout/HTTP error - RuntimeError if the API returns an "error" field or malformed data - """ - resp = requests.post(url, json=query, timeout=timeout) - resp.raise_for_status() - payload = resp.json() - - if "error" in payload: - raise RuntimeError(f"API error: {payload['error']}") - try: - tr = TimeRange(**payload["data"]["time_range"]) - ts = [ - BenchmarkTimeSeriesItem(**item) - for item in payload["data"]["time_series"] - ] - except Exception as e: - raise RuntimeError(f"Malformed API payload: {e}") - return cls(data=BenchmarkTimeSeriesApiData(time_series=ts, time_range=tr)) diff --git a/aws/lambda/benchmark_regression_summary_report/common/config.py b/aws/lambda/benchmark_regression_summary_report/common/config.py deleted file mode 100644 index a68dc7355a..0000000000 --- a/aws/lambda/benchmark_regression_summary_report/common/config.py +++ /dev/null @@ -1,91 +0,0 @@ -from common.config_model import ( - BenchmarkApiSource, - BenchmarkConfig, - BenchmarkRegressionConfigBook, - DayRangeWindow, - Frequency, - RegressionPolicy, - Policy, - RangeConfig, -) - -# Compiler benchmark regression config -# todo(elainewy): eventually each team should configure their own benchmark regression config, currenlty place here for lambda - - -COMPILER_BENCHMARK_CONFIG = BenchmarkConfig( - name="Compiler Benchmark Regression", - id="compiler_regression", - source=BenchmarkApiSource( - api_query_url="https://hud.pytorch.org/api/benchmark/get_time_series", - type="benchmark_time_series_api", - # currently we only detect the regression for h100 with dtype bfloat16, and mode inference - # we can extend this to other devices, dtypes and mode in the future - api_endpoint_params_template=""" - { - "name": "compiler_precompute", - "query_params": { - "commits": [], - "compilers": [], - "arch": "h100", - "device": "cuda", - "dtype": "bfloat16", - "granularity": "hour", - "mode": "inference", - "startTime": "{{ startTime }}", - "stopTime": "{{ stopTime }}", - "suites": ["torchbench", "huggingface", "timm_models"], - "workflowId": 0, - "branches": ["main"] - } - } - """, - ), - # set baseline from past 7 days using avg, and compare with the last 1 day - policy=Policy( - frequency=Frequency(value=1, unit="days"), - range=RangeConfig( - baseline=DayRangeWindow(value=7), - comparison=DayRangeWindow(value=2), - ), - metrics={ - "passrate": RegressionPolicy( - name="passrate", - condition="greater_equal", - threshold=0.9, - baseline_aggregation="max", - ), - "geomean": RegressionPolicy( - name="geomean", - condition="greater_equal", - threshold=0.95, - baseline_aggregation="max", - ), - "compression_ratio": RegressionPolicy( - name="compression_ratio", - condition="greater_equal", - threshold=0.9, - baseline_aggregation="max", - ), - }, - notification_config={ - "type": "github", - "repo": "pytorch/test-infra", - "issue": "7081", - }, - ), -) - -BENCHMARK_REGRESSION_CONFIG = BenchmarkRegressionConfigBook( - configs={ - "compiler_regression": COMPILER_BENCHMARK_CONFIG, - } -) - - -def get_benchmark_regression_config(config_id: str) -> BenchmarkConfig: - """Get benchmark regression config by config id""" - try: - return BENCHMARK_REGRESSION_CONFIG[config_id] - except KeyError: - raise ValueError(f"Invalid config id: {config_id}") diff --git a/aws/lambda/benchmark_regression_summary_report/common/config_model.py 
b/aws/lambda/benchmark_regression_summary_report/common/config_model.py deleted file mode 100644 index f452e84da2..0000000000 --- a/aws/lambda/benchmark_regression_summary_report/common/config_model.py +++ /dev/null @@ -1,190 +0,0 @@ -from __future__ import annotations -from dataclasses import dataclass, field -from typing import Any, Dict, Literal, Optional -from datetime import timedelta -from jinja2 import Environment, Template, meta -import json - - -# -------- Frequency -------- -@dataclass(frozen=True) -class Frequency: - """ - The frequency of how often the report should be generated. - The minimum frequency we support is 1 day. - Attributes: - value: Number of units (e.g., 7 for 7 days). - unit: Unit of time, either "days" or "weeks". - - Methods: - to_timedelta: Convert frequency into a datetime.timedelta. - get_text: return the frequency in text format - """ - - value: int - unit: Literal["days", "weeks"] - - def to_timedelta(self) -> timedelta: - """Convert frequency N days or M weeks into a datetime.timedelta.""" - if self.unit == "days": - return timedelta(days=self.value) - elif self.unit == "weeks": - return timedelta(weeks=self.value) - else: - raise ValueError(f"Unsupported unit: {self.unit}") - - def get_text(self): - return f"{self.value} {self.unit}" - - -# -------- Source -------- -_JINJA_ENV = Environment(autoescape=False) - - -@dataclass -class BenchmarkApiSource: - """ - Defines the source of the benchmark data we want to query - api_query_url: the url of the api to query - api_endpoint_params_template: the jinjia2 template of the api endpoint's query params - default_ctx: the default context to use when rendering the api_endpoint_params_template - """ - - api_query_url: str - api_endpoint_params_template: str - type: Literal["benchmark_time_series_api", "other"] = "benchmark_time_series_api" - default_ctx: Dict[str, Any] = field(default_factory=dict) - - def required_template_vars(self) -> set[str]: - ast = _JINJA_ENV.parse(self.api_endpoint_params_template) - return set(meta.find_undeclared_variables(ast)) - - def render(self, ctx: Dict[str, Any], strict: bool = True) -> dict: - """Render with caller-supplied context (no special casing for start/end).""" - merged = {**self.default_ctx, **ctx} - - if strict: - required = self.required_template_vars() - missing = required - merged.keys() - if missing: - raise ValueError(f"Missing required vars: {missing}") - rendered = Template(self.api_endpoint_params_template).render(**merged) - return json.loads(rendered) - - -# -------- Policy: range windows -------- -@dataclass -class DayRangeWindow: - value: int - # raw indicates fetch from the source data - source: Literal["raw"] = "raw" - - -@dataclass -class RangeConfig: - """ - Defines the range of baseline and comparison windows for a given policy. - - baseline: the baseline window that build the baseline value - - comparison: the comparison window that we fetch data from to compare against the baseline value - """ - - baseline: DayRangeWindow - comparison: DayRangeWindow - - def total_timedelta(self) -> timedelta: - return timedelta(days=self.baseline.value + self.comparison.value) - - def comparison_timedelta(self) -> timedelta: - return timedelta(days=self.comparison.value) - - def baseline_timedelta(self) -> timedelta: - return timedelta(days=self.baseline.value) - - -# -------- Policy: metrics -------- -@dataclass -class RegressionPolicy: - """ - Defines the policy for a given metric. 
- - new value muset be {x} baseline value: - - "greater_than": higher is better; new value must be strictly greater to baseline - - "less_than": lower is better; new value must be strictly lower to baseline - - "equal_to": new value should be ~= baseline * threshold within rel_tol - - "greater_equal": higher is better; new value must be greater or equal to baseline - - "less_equal": lower is better; new value must be less or equal to baseline - """ - - name: str - condition: Literal[ - "greater_than", "less_than", "equal_to", "greater_equal", "less_equal" - ] - threshold: float - baseline_aggregation: Literal[ - "avg", "max", "min", "p50", "p90", "p95", "latest", "earliest" - ] = "max" - rel_tol: float = 1e-3 # used only for "equal_to" - - def is_violation(self, value: float, baseline: float) -> bool: - target = baseline * self.threshold - - if self.condition == "greater_than": - # value must be strictly greater than target - return value <= target - - if self.condition == "greater_equal": - # value must be greater or equal to target - return value < target - - if self.condition == "less_than": - # value must be strictly less than target - return value >= target - - if self.condition == "less_equal": - # value must be less or equal to target - return value > target - - if self.condition == "equal_to": - # |value - target| should be within rel_tol * max(1, |target|) - denom = max(1.0, abs(target)) - return abs(value - target) > self.rel_tol * denom - - raise ValueError(f"Unknown condition: {self.condition}") - - -@dataclass -class Policy: - frequency: Frequency - range: RangeConfig - metrics: Dict[str, RegressionPolicy] - - # TODO(elainewy): add notification config - notification_config: Optional[Dict[str, Any]] = None - - -# -------- Top-level benchmark regression config -------- -@dataclass -class BenchmarkConfig: - """ - Represents a single benchmark regression configuration. - - BenchmarkConfig defines the benchmark regression config for a given benchmark. 
- - source: defines the source of the benchmark data we want to query - - policy: defines the policy for the benchmark regressions, including frequency to generate the report, range of the baseline and new values, and regression thresholds for metrics - - name: the name of the benchmark - - id: the id of the benchmark, this must be unique for each benchmark, and cannot be changed once set - """ - - name: str - id: str - source: BenchmarkApiSource - policy: Policy - - -@dataclass -class BenchmarkRegressionConfigBook: - configs: Dict[str, BenchmarkConfig] = field(default_factory=dict) - - def __getitem__(self, key: str) -> BenchmarkConfig: - config = self.configs.get(key, None) - if not config: - raise KeyError(f"Config {key} not found") - return config diff --git a/aws/lambda/benchmark_regression_summary_report/lambda_function.py b/aws/lambda/benchmark_regression_summary_report/lambda_function.py deleted file mode 100644 index e13dbf749d..0000000000 --- a/aws/lambda/benchmark_regression_summary_report/lambda_function.py +++ /dev/null @@ -1,443 +0,0 @@ -#!/usr/bin/env python -import argparse -from concurrent.futures import ThreadPoolExecutor, as_completed -import json -import logging -import os -import threading -import requests -import datetime as dt -from typing import Any, Optional -import clickhouse_connect -from common.benchmark_time_series_api_model import ( - BenchmarkTimeSeriesApiResponse, -) -from common.config_model import ( - BenchmarkApiSource, - BenchmarkConfig, - Frequency, -) -from common.config import get_benchmark_regression_config -from dateutil.parser import isoparse - -logging.basicConfig( - level=logging.INFO, -) -logger = logging.getLogger() -logger.setLevel("INFO") - -ENVS = { - "GITHUB_ACCESS_TOKEN": os.getenv("GITHUB_ACCESS_TOKEN", ""), - "CLICKHOUSE_ENDPOINT": os.getenv("CLICKHOUSE_ENDPOINT", ""), - "CLICKHOUSE_PASSWORD": os.getenv("CLICKHOUSE_PASSWORD", ""), - "CLICKHOUSE_USERNAME": os.getenv("CLICKHOUSE_USERNAME", ""), -} - -# TODO(elainewy): change this to benchmark.benchmark_regression_report once the table is created -BENCHMARK_REGRESSION_REPORT_TABLE = "fortesting.benchmark_regression_report" -BENCHMARK_REGRESSION_TRACKING_CONFIG_IDS = ["compiler_regression"] - - -def truncate_to_hour(ts: dt.datetime) -> dt.datetime: - return ts.replace(minute=0, second=0, microsecond=0) - - -def get_clickhouse_client( - host: str, user: str, password: str -) -> clickhouse_connect.driver.client.Client: - # for local testing only, disable SSL verification - return clickhouse_connect.get_client( - host=host, user=user, password=password, secure=True, verify=False - ) - - return clickhouse_connect.get_client( - host=host, user=user, password=password, secure=True - ) - - -def get_clickhouse_client_environment() -> clickhouse_connect.driver.client.Client: - for name, env_val in ENVS.items(): - if not env_val: - raise ValueError(f"Missing environment variable {name}") - return get_clickhouse_client( - host=ENVS["CLICKHOUSE_ENDPOINT"], - user=ENVS["CLICKHOUSE_USERNAME"], - password=ENVS["CLICKHOUSE_PASSWORD"], - ) - - -BENCHMARK_REGRESSION_SUMMARY_REPORT_TABLE = ( - "fortesting.benchmark_regression_summary_report" -) - - -class BenchmarkSummaryProcessor: - def __init__( - self, - is_dry_run: bool = False, - ) -> None: - self.is_dry_run = is_dry_run - - def process( - self, - config_id: str, - end_time: dt.datetime, - cc: Optional[clickhouse_connect.driver.client.Client] = None, - args: Optional[argparse.Namespace] = None, - ): - def log_info(msg: str): - logger.info("[%s] %s", 
config_id, msg) - - def log_error(msg: str): - logger.error("[%s] %s", config_id, msg) - - # ensure each thread has its own clickhouse client. clickhouse client - # is not thread-safe. - if cc is None: - tlocal = threading.local() - if not hasattr(tlocal, "cc") or tlocal.cc is None: - if args: - tlocal.cc = get_clickhouse_client( - args.clickhouse_endpoint, - args.clickhouse_username, - args.clickhouse_password, - ) - else: - tlocal.cc = get_clickhouse_client_environment() - cc = tlocal.cc - try: - config = get_benchmark_regression_config(config_id) - log_info(f"found config for config_id {config_id}") - except ValueError as e: - log_error(f"Skip process, Invalid config: {e}") - return - except Exception as e: - log_error(f"Unexpected error from get_benchmark_regression_config: {e}") - return - - # check if the current time is > policy's time_delta + previous record_ts from summary_table - report_freq = config.policy.frequency - should_generate = self._should_generate_report( - cc, end_time, config_id, report_freq - ) - if not should_generate: - log_info( - "Skip generate report for time:{end_time} with frequency {report_freq.get_text()}, no data found", - ) - return - else: - log_info( - "Plan to generate report for time:{end_time} with frequency {report_freq.get_text()}..." - ) - latest, ls, le = self.get_latest(config, end_time) - if not latest: - log_info( - f"no latest data found for time range [{ls},{le}] with frequency {report_freq.get_text()}..." - ) - return - - baseline, bs, be = self.get_basline(config, end_time) - if not baseline: - log_info( - f"no baseline data found for time range [{bs},{be}] with frequency {report_freq.get_text()}..." - ) - return - - def get_latest(self, config: BenchmarkConfig, end_time: dt.datetime): - data_range = config.policy.range - latest_s = end_time - data_range.comparison_timedelta() - latest_e = end_time - latest_data = self._fetch_from_benchmark_ts_api( - config_id=config.id, - start_time=latest_s, - end_time=latest_e, - source=config.source, - ) - logger.info( - "[%s] found %s # of data, with time range %s", - config.id, - len(latest_data.time_series), - latest_data.time_range, - ) - if not latest_data.time_range or not latest_data.time_range.end: - return None, latest_s, latest_e - if not self.should_use_data(config.id, latest_data.time_range.end, end_time): - return None, latest_s, latest_e - return latest_data, latest_s, latest_e - - def get_basline(self, config: BenchmarkConfig, end_time: dt.datetime): - data_range = config.policy.range - baseline_s = end_time - data_range.total_timedelta() - baseline_e = end_time - data_range.comparison_timedelta() - # fetch baseline from api - raw_data = self._fetch_from_benchmark_ts_api( - config_id=config.id, - start_time=baseline_s, - end_time=baseline_e, - source=config.source, - ) - - logger.info( - "found %s # of data, with time range %s", - len(raw_data.time_series), - raw_data.time_range, - ) - if not self.should_use_data(config.id, raw_data.time_range.end, baseline_e): - logger.info( - "[%s][get_basline] Skip generate report, no data found during [%s,%s]", - config.id, - baseline_s.isoformat(), - baseline_e.isoformat(), - ) - return None, baseline_s, baseline_e - return raw_data, baseline_s, baseline_e - - def should_use_data( - self, - config_id: str, - latest_ts_str: str, - end_time: dt.datetime, - min_delta: dt.timedelta = dt.timedelta(days=2), - ) -> bool: - if not latest_ts_str: - return False - latest_dt = isoparse(latest_ts_str) - cutoff = end_time - min_delta - - if latest_dt >= cutoff: 
-            return True
-        logger.info(
-            "[%s] expect latest data to be after %s, but got %s",
-            config_id,
-            cutoff,
-            latest_dt,
-        )
-        return False
-
-    def _fetch_from_benchmark_ts_api(
-        self,
-        config_id: str,
-        end_time: dt.datetime,
-        start_time: dt.datetime,
-        source: BenchmarkApiSource,
-    ):
-        str_end_time = end_time.strftime("%Y-%m-%dT%H:%M:%S")
-        str_start_time = start_time.strftime("%Y-%m-%dT%H:%M:%S")
-        query = source.render(
-            ctx={
-                "startTime": str_start_time,
-                "stopTime": str_end_time,
-            }
-        )
-        url = source.api_query_url
-
-        logger.info("[%s] trying to call %s, with query\n %s", config_id, url, query)
-        try:
-            resp: BenchmarkTimeSeriesApiResponse = (
-                BenchmarkTimeSeriesApiResponse.from_request(url, query)
-            )
-            return resp.data
-        except requests.exceptions.HTTPError as e:
-            logger.error("Server error message: %s", e.response.json().get("error"))
-            raise
-        except Exception as e:
-            raise RuntimeError(f"[{config_id}] Fetch failed: {e}")
-
-    def _should_generate_report(
-        self,
-        cc: clickhouse_connect.driver.client.Client,
-        end_time: dt.datetime,
-        config_id: str,
-        f: Frequency,
-    ) -> bool:
-        def _get_latest_record_ts(
-            cc: clickhouse_connect.driver.Client,
-            config_id: str,
-        ) -> Optional[dt.datetime]:
-            table = BENCHMARK_REGRESSION_REPORT_TABLE
-            res = cc.query(
-                f"""
-                SELECT max(last_record_ts)
-                FROM {table}
-                WHERE report_id = {{config_id:String}}
-                """,
-                parameters={"config_id": config_id},
-            )
-            if not res.result_rows or res.result_rows[0][0] is None:
-                return None
-            # typically tz-aware UTC from clickhouse_connect
-            latest: dt.datetime = res.result_rows[0][0]
-            # If not tz-aware, force UTC:
-            if latest.tzinfo is None:
-                latest = latest.replace(tzinfo=dt.timezone.utc)
-            return latest
-
-        freq_delta = f.to_timedelta()
-        latest_record_ts = _get_latest_record_ts(cc, config_id)
-
-        # No report exists yet, generate
-        if not latest_record_ts:
-            return True
-
-        end_utc = (
-            end_time if end_time.tzinfo else end_time.replace(tzinfo=dt.timezone.utc)
-        )
-        end_utc = end_utc.astimezone(dt.timezone.utc)
-        cutoff = end_utc - freq_delta
-        return latest_record_ts < cutoff
-
-
-class WorkerPoolHandler:
-    """
-    WorkerPoolHandler runs workers in parallel to generate benchmark regression reports
-    and writes the results to the target destination.
-
-    """
-
-    def __init__(
-        self,
-        benchmark_summary_processor: BenchmarkSummaryProcessor,
-        max_workers: int = 6,
-    ):
-        self.benchmark_summary_processor = benchmark_summary_processor
-        self.max_workers = max_workers
-
-    def start(
-        self,
-        config_ids: list[str],
-        args: Optional[argparse.Namespace] = None,
-    ) -> None:
-        logger.info(
-            "[WorkerPoolHandler] start to process benchmark "
-            "summary data with required config: %s",
-            config_ids,
-        )
-        end_time = dt.datetime.now(dt.timezone.utc).replace(
-            minute=0, second=0, microsecond=0
-        )
-        logger.info("current time with hour granularity (utc): %s", end_time)
-        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
-            futures = []
-            for config_id in config_ids:
-                future = executor.submit(
-                    self.benchmark_summary_processor.process,
-                    config_id,
-                    end_time,
-                    cc=None,
-                    args=args,
-                )
-                futures.append(future)
-            results = []
-            errors = []
-
-            # handle results from parallel processing
-            for future in as_completed(futures):
-                try:
-                    # this will re-raise any exception raised in the worker
-                    result = future.result()
-                    results.append(result)
-                except Exception as e:
-                    logger.warning(f"Error processing future: {e}")
-                    errors.append({"error": str(e)})
-
-
-def main(
-    args: Optional[argparse.Namespace] = None,
-    github_access_token: str = "",
-    is_dry_run: bool = False,
-):
-    """
-    Main method to run in both the local environment and the lambda handler.
-    1. resolve the benchmark regression configs to process
-    2. call WorkerPoolHandler to generate and write the benchmark regression
-       summary report for each config in parallel
-    """
-    if not github_access_token:
-        raise ValueError("Missing environment variable GITHUB_ACCESS_TOKEN")
-
-    logger.info("[Main] start work ....")
-
-    # generate the regression summary report for each tracked config, in parallel
-    handler = WorkerPoolHandler(
-        BenchmarkSummaryProcessor(is_dry_run=is_dry_run),
-    )
-    handler.start(BENCHMARK_REGRESSION_TRACKING_CONFIG_IDS, args)
-    logger.info("[Main] Done, work completed.")
-
-
-def lambda_handler(event: Any, context: Any) -> None:
-    """
-    Main method to run in the aws lambda environment
-    """
-    main(
-        None,
-        github_access_token=ENVS["GITHUB_ACCESS_TOKEN"],
-    )
-    return
-
-
-def parse_args() -> argparse.Namespace:
-    """
-    Parse command line args; this is mainly used for the local test environment.
-    """
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "--clickhouse-endpoint",
-        default=ENVS["CLICKHOUSE_ENDPOINT"],
-        type=str,
-        help="the clickhouse endpoint; the full url is "
-        + "https://{clickhouse_endpoint}:{port}",
-    )
-    parser.add_argument(
-        "--clickhouse-username",
-        type=str,
-        default=ENVS["CLICKHOUSE_USERNAME"],
-        help="the clickhouse username",
-    )
-    parser.add_argument(
-        "--clickhouse-password",
-        type=str,
-        default=ENVS["CLICKHOUSE_PASSWORD"],
-        help="the clickhouse password for the user name",
-    )
-    parser.add_argument(
-        "--github-access-token",
-        type=str,
-        default=ENVS["GITHUB_ACCESS_TOKEN"],
-        help="the github access token to access github api",
-    )
-    parser.add_argument(
-        "--not-dry-run",
-        action="store_true",
-        help="when set, write results to the destination from the local "
-        + "environment. By default, we run in dry-run mode in the local "
-        + "environment",
-    )
-    args, _ = parser.parse_known_args()
-    return args
-
-
-def local_run() -> None:
-    """
-    Method to run in the local test environment
-    """
-
-    args = parse_args()
-
-    logger.info("args: %s", args)
-
-    # always run in dry-run mode in the local environment, unless it's disabled.
-    is_dry_run = not args.not_dry_run
-
-    main(
-        args,
-        args.github_access_token,
-        is_dry_run=is_dry_run,
-    )
-
-
-if __name__ == "__main__":
-    local_run()
diff --git a/aws/lambda/benchmark_regression_summary_report/requirements.txt b/aws/lambda/benchmark_regression_summary_report/requirements.txt
deleted file mode 100644
index 87c33c2e7f..0000000000
--- a/aws/lambda/benchmark_regression_summary_report/requirements.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-clickhouse_connect==0.8.5
-boto3==1.35.33
-PyGithub==1.59.0
-python-dateutil==2.8.2
-PyYAML==6.0.1