Skip to content

Commit 5b464e0

Browse files
committed
add db writes
ghstack-source-id: d86cb06 Pull-Request: #7095
1 parent a7654d8 commit 5b464e0

File tree

2 files changed

+289
-0
lines changed

2 files changed

+289
-0
lines changed
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
import dataclasses
2+
import datetime as dt
3+
import json
4+
import logging
5+
import uuid
6+
from typing import Any, Dict
7+
8+
import clickhouse_connect
9+
from common.config_model import BenchmarkConfig, Frequency
10+
from common.regression_utils import (
11+
BenchmarkRegressionReport,
12+
get_regression_status,
13+
PerGroupResult,
14+
)
15+
16+
17+
# Module-level logger shared by everything in this file.
# NOTE: getLogger() called without a name returns the root logger.
logger = logging.getLogger()
18+
19+
20+
class ReportManager:
    """
    Handles db insertion and notification processing for a benchmark
    regression report.

    Currently, it only supports clickhouse as db. The github notification
    channel is not implemented in this class yet: `run` accepts a github
    token but only performs the db insertion.
    """

    def __init__(
        self,
        db_table_name: str,
        config: BenchmarkConfig,
        regression_report: BenchmarkRegressionReport,
        type: str = "general",
        repo: str = "pytorch/pytorch",
        is_dry_run: bool = False,
    ):
        """
        Capture the report and derive the values needed for persistence.

        Args:
            db_table_name: target table the report row is written to.
            config: benchmark config; its `id` becomes the row's `report_id`
                and `policy.frequency` is recorded in the report data.
            regression_report: mapping with (at least) the keys
                "baseline_meta_data", "new_meta_data", "summary" and "results".
            type: report type stored with the row.
            repo: repository name stored with the row.
            is_dry_run: when True, `insert_to_db` prints the row instead of
                writing it.

        Raises:
            KeyError: if the report lacks one of the expected keys.
            ValueError: if the newest commit or timestamp is empty.
        """
        self.is_dry_run = is_dry_run

        self.report = regression_report
        self.config_id = config.id
        self.config = config

        self.type = type
        self.repo = repo
        self.db_table_name = db_table_name

        # every manager instance gets its own row id
        self.id = str(uuid.uuid4())

        # extract latest meta data from report
        self.baseline = self.report["baseline_meta_data"]
        self.target = self.report["new_meta_data"]
        self.target_latest_commit = self.target["end"]["commit"]
        self.target_latest_ts_str = self.target["end"]["timestamp"]
        self.status = get_regression_status(self.report["summary"])

        self.report_data = self._to_report_data(
            config_id=config.id,
            regression_report=self.report,
            frequency=self.config.policy.frequency,
        )

    def run(
        self, cc: clickhouse_connect.driver.client.Client, github_token: str
    ) -> None:
        """
        Main entry point: insert the report into the db.

        Args:
            cc: clickhouse client used for the existence check and the insert.
            github_token: currently unused; kept so callers do not need to
                change once the github notification step is added.

        Raises:
            Exception: whatever `insert_to_db` raised, after logging it.
        """
        try:
            self.insert_to_db(cc)
        except Exception as e:
            logger.error("failed to insert report to db, error: %s", e)
            raise

    def _collect_regression_items(self) -> list[PerGroupResult]:
        """Return the entries of report["results"] labelled "regression"."""
        return [item for item in self.report["results"] if item["label"] == "regression"]

    @staticmethod
    def _format_utc_timestamp(ts_str: str) -> str:
        """
        Convert an ISO-8601 timestamp into a "YYYY-MM-DD HH:MM:SS" UTC string.

        A trailing "Z" is accepted. A timestamp that carries no UTC offset is
        treated as UTC; without this, `astimezone` would interpret it in the
        host's local timezone and the stored value would depend on the machine.
        """
        parsed = dt.datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=dt.timezone.utc)
        return parsed.astimezone(dt.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    def insert_to_db(
        self,
        cc: clickhouse_connect.driver.client.Client,
    ) -> None:
        """
        Build the row for this report and write it to `self.db_table_name`.

        In dry-run mode the row is printed as json and nothing is written.
        Otherwise the row is inserted unless `_db_insert` finds an existing
        row for the same report, in which case the insert is skipped.

        Raises:
            ValueError: if the newest timestamp of the report is empty.
            Exception: serialization or db errors are logged and re-raised.
        """
        logger.info(
            "[%s]prepare data for db insertion report (%s)...", self.config_id, self.id
        )
        latest_ts_str = self.target_latest_ts_str
        if not latest_ts_str:
            raise ValueError(
                f"timestamp from latest is required, latest is {self.target}"
            )
        last_record_ts = self._format_utc_timestamp(latest_ts_str)

        try:
            # default=str stringifies any value json cannot encode natively
            report_json = json.dumps(
                self.report, ensure_ascii=False, separators=(",", ":"), default=str
            )
        except Exception:
            logger.exception(
                "[%s] failed to serialize report data to json",
                self.config_id,
            )
            raise

        regression_summary = self.report["summary"]
        params = {
            "id": str(self.id),
            "report_id": self.config_id,
            "type": self.type,
            # computed once in __init__ from the same report summary
            "status": self.status,
            "last_record_commit": self.target_latest_commit,
            "last_record_ts": last_record_ts,
            "regression_count": regression_summary["regression_count"],
            "insufficient_data_count": int(
                regression_summary["insufficient_data_count"]
            ),
            "suspected_regression_count": regression_summary["suspicious_count"],
            "total_count": regression_summary["total_count"],
            "repo": self.repo,
            "report_json": report_json,
        }

        if self.is_dry_run:
            logger.info(
                "[%s]dry run, skip inserting report to db, report(%s)",
                self.config_id,
                self.id,
            )
            logger.info("[dry run] printing db params data")
            print(json.dumps(params, indent=2, default=str))
            logger.info("[dry run] Done! Finish printing db params data")
            return

        logger.info(
            "[%s]inserting benchmark regression report(%s)", self.config_id, self.id
        )
        try:
            inserted, _ = self._db_insert(cc, self.db_table_name, params)
        except Exception:
            logger.exception(
                "[%s] failed to insert report to target table %s",
                self.config_id,
                self.db_table_name,
            )
            raise

        if not inserted:
            # do not claim success when the duplicate check skipped the write
            logger.info(
                "[%s] skip insert, a report already exists in %s for the same "
                "report_id/type/repo/date, report(%s)",
                self.config_id,
                self.db_table_name,
                self.id,
            )
            return
        logger.info(
            "[%s] Done. inserted benchmark regression report(%s)",
            self.config_id,
            self.id,
        )

    def _db_insert(
        self,
        cc: clickhouse_connect.driver.Client,
        table: str,
        params: dict,
    ) -> tuple[bool, int]:
        """
        Insert one row into ClickHouse using cc.command(), unless a matching
        row already exists (see `_row_exists`).

        Args:
            cc: clickhouse client.
            table: target table name. It is interpolated into the statement
                as-is, so it must come from trusted configuration.
            params: row values, bound by name into the statement.

        Returns:
            (inserted, written_rows): (False, 0) when a matching row exists
            and nothing was written, (True, 1) after the insert statement for
            the single row was issued.
        """
        if self._row_exists(
            cc,
            table,
            params["report_id"],
            params["type"],
            params["repo"],
            params["last_record_ts"],
        ):
            return False, 0

        sql = f"""
        INSERT INTO {table} (
            id,
            report_id,
            last_record_ts,
            last_record_commit,
            type,
            status,
            regression_count,
            insufficient_data_count,
            suspected_regression_count,
            total_count,
            repo,
            report
        )
        VALUES
        (
            %(id)s,
            %(report_id)s,
            %(last_record_ts)s,
            %(last_record_commit)s,
            %(type)s,
            %(status)s,
            %(regression_count)s,
            %(insufficient_data_count)s,
            %(suspected_regression_count)s,
            %(total_count)s,
            %(repo)s,
            %(report_json)s
        )
        """
        cc.command(sql, parameters=params)
        return True, 1

    def _row_exists(
        self,
        cc: clickhouse_connect.driver.Client,
        table: str,
        report_id: str,
        type_str: str,
        repo: str,
        last_record_ts,
    ) -> bool:
        """
        Check if a row already exists with the same (report_id, type, repo, stamp).

        The `stamp` column is compared against toDate(last_record_ts), so the
        match is made on the date of the timestamp, not on the exact second.

        Returns True if found, False otherwise.
        """
        sql = f"""
        SELECT 1
        FROM {table}
        WHERE report_id = %(report_id)s
            AND type = %(type)s
            AND repo = %(repo)s
            AND stamp = toDate(%(last_record_ts)s)
        LIMIT 1
        """
        res = cc.query(
            sql,
            parameters={
                "report_id": report_id,
                "type": type_str,
                "repo": repo,
                "last_record_ts": last_record_ts,
            },
        )
        return bool(res.result_rows)

    def _validate_latest_meta_info(
        self, latest_meta_info: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Ensure the meta info carries a non-empty "commit" and "timestamp".

        Returns:
            The same mapping that was passed in, unchanged.

        Raises:
            ValueError: if either value is missing or empty.
        """
        latest_commit = latest_meta_info.get("commit")
        if not latest_commit:
            raise ValueError(
                f"missing commit from latest is required, latest is {latest_meta_info}"
            )
        latest_ts_str = latest_meta_info.get("timestamp")
        if not latest_ts_str:
            raise ValueError(
                f"timestamp from latest is required, latest is {latest_meta_info}"
            )
        return latest_meta_info

    @staticmethod
    def _as_plain_dict(x: Any) -> dict[str, Any]:
        """Turn a dataclass, dict or plain object into a dict-like mapping."""
        if dataclasses.is_dataclass(x):
            return dataclasses.asdict(x)
        if isinstance(x, dict):
            return x
        # last resort: wrap the string form so the caller still gets a mapping
        return vars(x) if hasattr(x, "__dict__") else {"value": str(x)}

    def _to_report_data(
        self,
        config_id: str,
        regression_report: BenchmarkRegressionReport,
        frequency: Frequency,
    ) -> dict[str, Any]:
        """
        Assemble the summary structure kept on `self.report_data`.

        Returns:
            A dict with the keys "status", "report_id", "report" and
            "frequency" (the text form of `frequency`).

        Raises:
            ValueError: if the newest commit or timestamp of the report is empty.
        """
        if not self.target_latest_commit:
            raise ValueError(
                f"missing commit from new is required, latest is {self.target}"
            )
        if not self.target_latest_ts_str:
            raise ValueError(f"timestamp from new is required, latest is {self.target}")

        return {
            "status": self.status,
            "report_id": config_id,
            "report": self._as_plain_dict(regression_report),
            "frequency": frequency.get_text(),
        }

aws/lambda/benchmark_regression_summary_report/lambda_function.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from common.config import get_benchmark_regression_config
1515
from common.config_model import BenchmarkApiSource, BenchmarkConfig, Frequency
1616
from common.regression_utils import BenchmarkRegressionReportGenerator
17+
from common.report_manager import ReportManager
1718
from dateutil.parser import isoparse
1819

1920

@@ -157,6 +158,13 @@ def process(
157158
if self.is_dry_run:
158159
print(json.dumps(regression_report, indent=2, default=str))
159160
return
161+
162+
reportManager = ReportManager(
163+
config=config,
164+
regression_report=regression_report,
165+
db_table_name=BENCHMARK_REGRESSION_REPORT_TABLE,
166+
)
167+
reportManager.run(cc, ENVS["GITHUB_TOKEN"])
160168
return
161169

162170
def get_target(self, config: BenchmarkConfig, end_time: int):

0 commit comments

Comments
 (0)