Skip to content

Commit b02e16b

Browse files
authored
add db writes (#7095)
Stack from [ghstack](https://github.com/ezyang/ghstack) (oldest at bottom): * #7112 * #7096 * __->__ #7095 adds logic to write the report to a db table
1 parent 88d9d6e commit b02e16b

File tree

2 files changed

+306
-0
lines changed

2 files changed

+306
-0
lines changed
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
import dataclasses
2+
import datetime as dt
3+
import json
4+
import logging
5+
import uuid
6+
from typing import Any, Dict
7+
8+
import clickhouse_connect
9+
from common.config_model import BenchmarkConfig, Frequency
10+
from common.regression_utils import (
11+
BenchmarkRegressionReport,
12+
get_regression_status,
13+
PerGroupResult,
14+
)
15+
16+
17+
# Module-level logger. NOTE(review): the original used the bare root logger
# (getLogger() with no name); getLogger(__name__) scopes records to this
# module while still propagating to any root handlers.
logger = logging.getLogger(__name__)
18+
19+
20+
class ReportManager:
    """
    Handles db insertion and notification processing for a benchmark
    regression report.

    Currently it only supports ClickHouse as the db and GitHub as the
    notification channel (via the GitHub API).
    """

    def __init__(
        self,
        db_table_name: str,
        config: BenchmarkConfig,
        regression_report: BenchmarkRegressionReport,
        type: str = "general",  # shadows builtin, kept for caller compatibility
        repo: str = "pytorch/pytorch",
        is_dry_run: bool = False,
    ):
        """
        Args:
            db_table_name: target ClickHouse table for the report row.
            config: benchmark config the report was generated from.
            regression_report: report produced by the regression utils.
            type: report category stored in the db row.
            repo: repository the report refers to.
            is_dry_run: when True, db writes are skipped and the row data is
                printed instead.
        """
        self.is_dry_run = is_dry_run

        self.report = regression_report
        self.config_id = config.id
        self.config = config

        self.type = type
        self.repo = repo
        self.db_table_name = db_table_name

        # unique id for this report row
        self.id = str(uuid.uuid4())

        # extract latest meta data from report
        self.baseline = self.report["baseline_meta_data"]
        self.target = self.report["new_meta_data"]
        self.target_latest_commit = self.target["end"]["commit"]
        self.target_latest_ts_str = self.target["end"]["timestamp"]
        self.status = get_regression_status(self.report["summary"])

        self.report_data = self._to_report_data(
            config_id=config.id,
            regression_report=self.report,
            frequency=self.config.policy.frequency,
        )

    def run(
        self, cc: clickhouse_connect.driver.Client, github_token: str
    ) -> None:
        """
        main method used to insert the report to db and create github comment in targeted issue

        NOTE(review): github_token is currently unused here; presumably the
        notification step lands in a follow-up change — confirm before
        removing the parameter.
        """
        try:
            self.insert_to_db(cc)
        except Exception as e:
            # lazy %-formatting so the message is only built when emitted
            logger.error("failed to insert report to db, error: %s", e)
            raise

    def _collect_regression_items(self) -> list[PerGroupResult]:
        """Return only the per-group results labeled as regressions."""
        return [item for item in self.report["results"] if item["label"] == "regression"]

    def insert_to_db(
        self,
        cc: clickhouse_connect.driver.Client,
    ) -> bool:
        """
        Insert the report row into the db table.

        Returns:
            True when a row was inserted; False when skipped (dry run, or a
            row for the same (report_id, type, repo, day) already exists).

        Raises:
            ValueError: when the target end timestamp is missing.
        """
        logger.info(
            "[%s]prepare data for db insertion report (%s)...", self.config_id, self.id
        )
        latest_ts_str = self.target_latest_ts_str
        if not latest_ts_str:
            raise ValueError(
                f"timestamp from latest is required, latest is {self.target}"
            )
        # normalize the ISO timestamp (possibly "Z"-suffixed) to a naive UTC
        # "YYYY-mm-dd HH:MM:SS" string for the ClickHouse DateTime column
        aware = dt.datetime.fromisoformat(latest_ts_str.replace("Z", "+00:00"))
        utc_naive = aware.astimezone(dt.timezone.utc).replace(tzinfo=None)
        last_record_ts = utc_naive.strftime("%Y-%m-%d %H:%M:%S")

        try:
            report_json = json.dumps(
                self.report, ensure_ascii=False, separators=(",", ":"), default=str
            )
        except Exception:
            logger.exception(
                "[%s] failed to serialize report data to json",
                self.config_id,
            )
            raise

        regression_summary = self.report["summary"]
        params = {
            "id": str(self.id),
            "report_id": self.config_id,
            "type": self.type,
            # fix: reuse the status computed in __init__ instead of calling
            # get_regression_status a second time
            "status": self.status,
            "last_record_commit": self.target_latest_commit,
            "last_record_ts": last_record_ts,
            "regression_count": regression_summary["regression_count"],
            "insufficient_data_count": int(
                regression_summary["insufficient_data_count"]
            ),
            "suspected_regression_count": regression_summary["suspicious_count"],
            "total_count": regression_summary["total_count"],
            "repo": self.repo,
            "report_json": report_json,
        }

        if self.is_dry_run:
            # fix: the original re-tested is_dry_run inside this branch
            logger.info(
                "[%s]dry run, skip inserting report to db, report(%s)",
                self.config_id,
                self.id,
            )
            logger.info("[dry run] printing db params data")
            print(json.dumps(params, indent=2, default=str))
            logger.info("[dry run] Done! Finish printing db params data")
            return False

        logger.info(
            "[%s]inserting benchmark regression report(%s)", self.config_id, self.id
        )
        try:
            # dedupe: at most one report per (report_id, type, repo) per day
            if self._row_exists(
                cc,
                self.db_table_name,
                params["report_id"],
                params["type"],
                params["repo"],
                params["last_record_ts"],
            ):
                logger.info(
                    "[%s] report already exists, skip inserting report to db, report(%s)",
                    self.config_id,
                    self.id,
                )
                return False
            self._db_insert(cc, self.db_table_name, params)
            logger.info(
                "[%s] Done. inserted benchmark regression report(%s)",
                self.config_id,
                self.id,
            )
            return True
        except Exception:
            logger.exception(
                "[%s] failed to insert report to target table %s",
                self.config_id,
                self.db_table_name,
            )
            raise

    def _db_insert(
        self,
        cc: clickhouse_connect.driver.Client,
        table: str,
        params: dict,
    ) -> None:
        """
        Insert one row into ClickHouse via a parameterized INSERT.

        Fix: the original ran the _row_exists dedupe query a second time here
        (the caller already checks), and its docstring claimed a
        (inserted, written_rows) return that was never produced. This is now
        a plain insert; deduplication is the caller's responsibility.
        """
        sql = f"""
            INSERT INTO {table} (
                id,
                report_id,
                last_record_ts,
                last_record_commit,
                type,
                status,
                regression_count,
                insufficient_data_count,
                suspected_regression_count,
                total_count,
                repo,
                report
            )
            VALUES
            (
                %(id)s,
                %(report_id)s,
                %(last_record_ts)s,
                %(last_record_commit)s,
                %(type)s,
                %(status)s,
                %(regression_count)s,
                %(insufficient_data_count)s,
                %(suspected_regression_count)s,
                %(total_count)s,
                %(repo)s,
                %(report_json)s
            )
        """
        cc.command(sql, parameters=params)

    def _row_exists(
        self,
        cc: clickhouse_connect.driver.Client,
        table: str,
        report_id: str,
        type_str: str,
        repo: str,
        last_record_ts: str,
    ) -> bool:
        """
        Check if a row already exists with the same (report_id, type, repo, stamp).
        Returns True if found, False otherwise.
        Stamp is the date of the last record ts; this makes sure we only insert
        one report for a (config, type, repo) per day.
        """
        sql = f"""
            SELECT 1
            FROM {table}
            WHERE report_id = %(report_id)s
              AND type = %(type)s
              AND repo = %(repo)s
              AND stamp = toDate(%(last_record_ts)s)
            LIMIT 1
        """
        res = cc.query(
            sql,
            parameters={
                "report_id": report_id,
                "type": type_str,
                "repo": repo,
                "last_record_ts": last_record_ts,
            },
        )
        return bool(res.result_rows)

    def _validate_latest_meta_info(
        self, latest_meta_info: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Validate that the latest meta info carries a commit and a timestamp.
        Returns the input unchanged; raises ValueError when a field is missing.
        """
        latest_commit = latest_meta_info.get("commit")
        if not latest_commit:
            raise ValueError(
                f"missing commit from latest is required, latest is {latest_meta_info}"
            )
        latest_ts_str = latest_meta_info.get("timestamp")  # fix: local typo "lastest"
        if not latest_ts_str:
            raise ValueError(
                f"timestamp from latest is required, latest is {latest_meta_info}"
            )
        return latest_meta_info

    def _to_report_data(
        self,
        config_id: str,
        regression_report: BenchmarkRegressionReport,
        frequency: Frequency,
    ) -> dict[str, Any]:
        """
        Build the in-memory report payload (status, report_id, report dict,
        frequency text). Raises ValueError when the target end commit or
        timestamp is missing.
        """
        if not self.target_latest_commit:
            raise ValueError(
                f"missing commit from new is required, latest is {self.target}"
            )
        latest_ts_str = self.target_latest_ts_str  # fix: local typo "lastest"
        if not latest_ts_str:
            raise ValueError(f"timestamp from new is required, latest is {self.target}")

        def to_dict(x):  # handle dataclass or dict/object
            if dataclasses.is_dataclass(x):
                return dataclasses.asdict(x)
            if isinstance(x, dict):
                return x
            return vars(x) if hasattr(x, "__dict__") else {"value": str(x)}

        report = to_dict(regression_report)
        return {
            "status": self.status,
            "report_id": config_id,
            "report": report,
            "frequency": frequency.get_text(),
        }

aws/lambda/benchmark_regression_summary_report/lambda_function.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from common.config import get_benchmark_regression_config
1515
from common.config_model import BenchmarkApiSource, BenchmarkConfig, Frequency
1616
from common.regression_utils import BenchmarkRegressionReportGenerator
17+
from common.report_manager import ReportManager
1718
from dateutil.parser import isoparse
1819

1920

@@ -157,6 +158,13 @@ def process(
157158
if self.is_dry_run:
158159
print(json.dumps(regression_report, indent=2, default=str))
159160
return
161+
162+
reportManager = ReportManager(
163+
config=config,
164+
regression_report=regression_report,
165+
db_table_name=BENCHMARK_REGRESSION_REPORT_TABLE,
166+
)
167+
reportManager.run(cc, ENVS["GITHUB_TOKEN"])
160168
return
161169

162170
def get_target(self, config: BenchmarkConfig, end_time: int):

0 commit comments

Comments
 (0)