Skip to content

Commit 759de92

Browse files
Merge pull request #172 from ecmwf-projects/COPDS-1504-live-logs
Live logs
2 parents b83f2b8 + 7beb791 commit 759de92

File tree

4 files changed

+114
-56
lines changed

4 files changed

+114
-56
lines changed

cads_processing_api_service/clients.py

Lines changed: 65 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# See the License for the specific language governing permissions and
1717
# limitations under the License
1818

19+
import datetime
1920
import uuid
2021

2122
import attrs
@@ -242,8 +243,9 @@ def post_process_execution(
242243
def get_jobs(
243244
self,
244245
processID: list[str] | None = fastapi.Query(None),
245-
status: list[ogc_api_processes_fastapi.models.StatusCode]
246-
| None = fastapi.Query(None),
246+
status: (
247+
list[ogc_api_processes_fastapi.models.StatusCode] | None
248+
) = fastapi.Query(None),
247249
limit: int | None = fastapi.Query(10, ge=1, le=10000),
248250
sortby: utils.JobSortCriterion | None = fastapi.Query(
249251
utils.JobSortCriterion.created_at_desc
@@ -317,34 +319,38 @@ def get_jobs(
317319
**job_filters,
318320
)
319321
job_entries = compute_session.scalars(statement).all()
320-
if back:
321-
job_entries = reversed(job_entries)
322-
jobs = []
323-
catalogue_sessionmaker = db_utils.get_catalogue_sessionmaker(
324-
db_utils.ConnectionMode.read
325-
)
326-
for job in job_entries:
327-
with catalogue_sessionmaker() as catalogue_session:
328-
try:
329-
(dataset_title,) = utils.get_resource_properties(
330-
resource_id=job.process_id,
331-
properties="title",
332-
table=self.process_table,
333-
session=catalogue_session,
322+
if back:
323+
job_entries = reversed(job_entries)
324+
jobs = []
325+
catalogue_sessionmaker = db_utils.get_catalogue_sessionmaker(
326+
db_utils.ConnectionMode.read
327+
)
328+
for job in job_entries:
329+
with catalogue_sessionmaker() as catalogue_session:
330+
try:
331+
(dataset_title,) = utils.get_resource_properties(
332+
resource_id=job.process_id,
333+
properties="title",
334+
table=self.process_table,
335+
session=catalogue_session,
336+
)
337+
except ogc_api_processes_fastapi.exceptions.NoSuchProcess:
338+
dataset_title = config.ensure_settings().missing_dataset_title
339+
results = utils.parse_results_from_broker_db(
340+
job, session=compute_session
341+
)
342+
jobs.append(
343+
utils.make_status_info(
344+
job=job,
345+
results=results,
346+
dataset_metadata={"title": dataset_title},
347+
qos={
348+
"status": cads_broker.database.get_qos_status_from_request(
349+
job
350+
)
351+
},
334352
)
335-
except ogc_api_processes_fastapi.exceptions.NoSuchProcess:
336-
dataset_title = config.ensure_settings().missing_dataset_title
337-
results = utils.parse_results_from_broker_db(job)
338-
jobs.append(
339-
utils.make_status_info(
340-
job=job,
341-
results=results,
342-
dataset_metadata={"title": dataset_title},
343-
qos={
344-
"status": cads_broker.database.get_qos_status_from_request(job)
345-
},
346353
)
347-
)
348354
job_list = models.JobList(
349355
jobs=jobs,
350356
links=[ogc_api_processes_fastapi.models.Link(href="")],
@@ -367,6 +373,9 @@ def get_job(
367373
qos: bool = fastapi.Query(False),
368374
request: bool = fastapi.Query(False),
369375
log: bool = fastapi.Query(False),
376+
log_start_time: datetime.datetime | None = fastapi.Query(
377+
None, alias="logStartTime"
378+
),
370379
) -> models.StatusInfo:
371380
"""Implement OGC API - Processes `GET /jobs/{job_id}` endpoint.
372381
@@ -386,6 +395,8 @@ def get_job(
386395
Whether to include the request in the response
387396
log : bool, optional
388397
Whether to include the job's log in the response
398+
log_start_time: datetime.datetime, optional
399+
Datetime of the first log message to be returned
389400
390401
Returns
391402
-------
@@ -403,7 +414,16 @@ def get_job(
403414
job_id=job_id, session=compute_session
404415
)
405416
if qos:
406-
job_qos_info = utils.collect_job_qos_info(job, compute_session)
417+
job_qos_info = utils.get_job_qos_info(job, compute_session)
418+
# These lines are inside the session context because the related fields
419+
# are lazy loaded
420+
if log:
421+
job_log = utils.get_job_events(
422+
job,
423+
compute_session,
424+
"user_visible_log",
425+
log_start_time,
426+
)
407427
except ogc_api_processes_fastapi.exceptions.NoSuchJob:
408428
compute_sessionmaker = db_utils.get_compute_sessionmaker(
409429
mode=db_utils.ConnectionMode.write
@@ -413,7 +433,16 @@ def get_job(
413433
job_id=job_id, session=compute_session
414434
)
415435
if qos:
416-
job_qos_info = utils.collect_job_qos_info(job, compute_session)
436+
job_qos_info = utils.get_job_qos_info(job, compute_session)
437+
# These lines are inside the session context because the related fields
438+
# are lazy loaded
439+
if log:
440+
job_log = utils.get_job_events(
441+
job,
442+
compute_session,
443+
"user_visible_log",
444+
log_start_time,
445+
)
417446
if job.portal not in portals:
418447
raise ogc_api_processes_fastapi.exceptions.NoSuchJob(
419448
detail=f"job {job_id} not found"
@@ -438,13 +467,15 @@ def get_job(
438467
request_ids, form_data
439468
),
440469
}
470+
if log:
471+
kwargs["log"] = [
472+
(message[0].isoformat(), message[1]) for message in job_log
473+
]
441474
if qos:
442475
kwargs["qos"] = {
443476
**job_qos_info,
444477
"status": cads_broker.database.get_qos_status_from_request(job),
445478
}
446-
if log:
447-
kwargs["log"] = utils.extract_job_log(job)
448479
status_info = utils.make_status_info(job=job, **kwargs)
449480
return status_info
450481

@@ -483,8 +514,8 @@ def get_job_results(
483514
job = utils.get_job_from_broker_db(
484515
job_id=job_id, session=compute_session
485516
)
517+
results = utils.get_results_from_job(job=job, session=compute_session)
486518
auth.verify_permission(user_uid, job)
487-
results = utils.get_results_from_job(job=job)
488519
except (
489520
ogc_api_processes_fastapi.exceptions.NoSuchJob,
490521
ogc_api_processes_fastapi.exceptions.ResultsNotReady,
@@ -496,8 +527,8 @@ def get_job_results(
496527
job = utils.get_job_from_broker_db(
497528
job_id=job_id, session=compute_session
498529
)
530+
results = utils.get_results_from_job(job=job, session=compute_session)
499531
auth.verify_permission(user_uid, job)
500-
results = utils.get_results_from_job(job=job)
501532
handle_download_metrics(job.process_id, results)
502533
return results
503534

cads_processing_api_service/models.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class StatusInfoMetadata(pydantic.BaseModel):
2626
results: dict[str, Any] | None = None
2727
datasetMetadata: dict[str, Any] | None = None
2828
qos: dict[str, Any] | None = None
29-
log: list[str] | None = None
29+
log: list[tuple[str, str]] | None = None
3030

3131

3232
class StatusInfo(ogc_api_processes_fastapi.models.StatusInfo):
@@ -56,3 +56,4 @@ class JobList(ogc_api_processes_fastapi.models.JobList):
5656
class Exception(ogc_api_processes_fastapi.models.Exception):
5757
trace_id: str | None = None
5858
traceback: str | None = None
59+
messages: list[tuple[str, str]] | None = None

cads_processing_api_service/utils.py

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
# limitations under the License.
1616

1717
import base64
18+
import datetime
1819
import enum
19-
import json
2020
from typing import Any, Callable, Mapping
2121

2222
import cachetools
@@ -429,7 +429,8 @@ def dictify_job(request: cads_broker.database.SystemRequest) -> dict[str, Any]:
429429

430430

431431
def get_job_from_broker_db(
432-
job_id: str, session: sqlalchemy.orm.Session
432+
job_id: str,
433+
session: sqlalchemy.orm.Session,
433434
) -> cads_broker.SystemRequest:
434435
"""Get job description from the Broker database.
435436
@@ -467,7 +468,9 @@ def get_job_from_broker_db(
467468
return job
468469

469470

470-
def get_results_from_job(job: cads_broker.SystemRequest) -> dict[str, Any]:
471+
def get_results_from_job(
472+
job: cads_broker.SystemRequest, session: sqlalchemy.orm.Session
473+
) -> dict[str, Any]:
471474
"""Get job results description from SystemRequest instance.
472475
473476
Parameters
@@ -498,9 +501,13 @@ def get_results_from_job(job: cads_broker.SystemRequest) -> dict[str, Any]:
498501
detail=f"results of job {job_id} expired"
499502
)
500503
elif job_status == "failed":
504+
error_messages = get_job_events(
505+
job=job, session=session, event_type="user_visible_error"
506+
)
507+
traceback = "\n".join([message[1] for message in error_messages])
501508
raise ogc_api_processes_fastapi.exceptions.JobResultsFailed(
502509
status_code=fastapi.status.HTTP_400_BAD_REQUEST,
503-
traceback=str(job.response_error["message"]), # type: ignore
510+
traceback=traceback,
504511
)
505512
elif job_status in ("accepted", "running"):
506513
raise ogc_api_processes_fastapi.exceptions.ResultsNotReady(
@@ -509,15 +516,17 @@ def get_results_from_job(job: cads_broker.SystemRequest) -> dict[str, Any]:
509516
return results
510517

511518

512-
def parse_results_from_broker_db(job: cads_broker.SystemRequest) -> dict[str, Any]:
519+
def parse_results_from_broker_db(
520+
job: cads_broker.SystemRequest, session: sqlalchemy.orm.Session
521+
) -> dict[str, Any]:
513522
try:
514-
results = get_results_from_job(job=job)
523+
results = get_results_from_job(job=job, session=session)
515524
except ogc_api_processes_fastapi.exceptions.OGCAPIException as exc:
516525
results = exceptions.format_exception_content(exc=exc)
517526
return results
518527

519528

520-
def collect_job_qos_info(
529+
def get_job_qos_info(
521530
job: cads_broker.SystemRequest, session: sqlalchemy.orm.Session
522531
) -> dict[str, Any]:
523532
entry_point = str(job.entry_point)
@@ -560,13 +569,22 @@ def collect_job_qos_info(
560569
return qos
561570

562571

563-
def extract_job_log(job: cads_broker.SystemRequest) -> list[str]:
564-
log = []
565-
if job.response_user_visible_log:
566-
job_log = json.loads(str(job.response_user_visible_log))
567-
for log_timestamp, log_message in job_log:
568-
log.append(log_message)
569-
return log
572+
def get_job_events(
573+
job: cads_broker.SystemRequest,
574+
session: sqlalchemy.orm.Session,
575+
event_type: str | None = None,
576+
start_time: datetime.datetime | None = None,
577+
) -> list[tuple[datetime.datetime, str]]:
578+
events = []
579+
request_uid = str(job.request_uid)
580+
request_events: list[
581+
cads_broker.database.Events
582+
] = cads_broker.database.get_events_from_request(
583+
request_uid, session, event_type, start_time
584+
)
585+
for request_event in request_events:
586+
events.append((request_event.timestamp, request_event.message))
587+
return events # type: ignore
570588

571589

572590
def make_status_info(
@@ -575,7 +593,7 @@ def make_status_info(
575593
results: dict[str, Any] | None = None,
576594
dataset_metadata: dict[str, Any] | None = None,
577595
qos: dict[str, Any] | None = None,
578-
log: list[str] | None = None,
596+
log: list[tuple[str, str]] | None = None,
579597
) -> models.StatusInfo:
580598
"""Compose job's status information.
581599

tests/test_30_utils.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ def test_get_job_from_broker_db() -> None:
241241

242242

243243
def test_get_results_from_job() -> None:
244+
mock_session = unittest.mock.Mock(spec=sqlalchemy.orm.Session)
244245
job = cads_broker.SystemRequest(
245246
**{
246247
"status": "successful",
@@ -250,27 +251,34 @@ def test_get_results_from_job() -> None:
250251
),
251252
}
252253
)
253-
results = utils.get_results_from_job(job)
254+
results = utils.get_results_from_job(job, session=mock_session)
254255
exp_results = {"asset": {"value": {"key": "value"}}}
255256
assert results == exp_results
256257

257258
job = cads_broker.SystemRequest(
258259
**{
259260
"status": "failed",
260261
"request_uid": "1234",
261-
"response_error": {"message": "traceback"},
262262
}
263263
)
264-
with pytest.raises(ogc_api_processes_fastapi.exceptions.JobResultsFailed):
265-
results = utils.get_results_from_job(job)
264+
with pytest.raises(ogc_api_processes_fastapi.exceptions.JobResultsFailed) as exc:
265+
with unittest.mock.patch(
266+
"cads_processing_api_service.utils.get_job_events"
267+
) as mock_get_job_events:
268+
mock_get_job_events.return_value = [
269+
"2024-01-01T16:20:12.175021",
270+
"error message",
271+
]
272+
results = utils.get_results_from_job(job, session=mock_session)
273+
assert exc.value.traceback == "error message"
266274

267275
job = cads_broker.SystemRequest(**{"status": "accepted", "request_uid": "1234"})
268276
with pytest.raises(ogc_api_processes_fastapi.exceptions.ResultsNotReady):
269-
results = utils.get_results_from_job(job)
277+
results = utils.get_results_from_job(job, session=mock_session)
270278

271279
job = cads_broker.SystemRequest(**{"status": "running", "request_uid": "1234"})
272280
with pytest.raises(ogc_api_processes_fastapi.exceptions.ResultsNotReady):
273-
results = utils.get_results_from_job(job)
281+
results = utils.get_results_from_job(job, session=mock_session)
274282

275283

276284
def test_make_status_info() -> None:

0 commit comments

Comments
 (0)