Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions cads_processing_api_service/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def get_auth_header(pat: str | None = None, jwt: str | None = None) -> tuple[str
Parameters
----------
pat : str | None, optional
Personal Access Token
API Token
jwt : str | None, optional
JSON Web Token

Expand Down Expand Up @@ -125,19 +125,24 @@ def authenticate_user(

def get_auth_info(
pat: str | None = fastapi.Header(
None, description="Personal Access Token", alias="PRIVATE-TOKEN"
None, description="API Token.", alias="PRIVATE-TOKEN"
),
jwt: str | None = fastapi.Header(
None, description="JSON Web Token", alias="Authorization"
None,
description="JSON Web Token",
alias="Authorization",
include_in_schema=False,
),
portal_header: str | None = fastapi.Header(
None, alias=SETTINGS.portal_header_name, include_in_schema=False
),
portal_header: str | None = fastapi.Header(None, alias=SETTINGS.portal_header_name),
) -> models.AuthInfo | None:
"""Get authentication information from the incoming HTTP request.

Parameters
----------
pat : str | None, optional
Personal Access Token
API Token
jwt : str | None, optional
JSON Web Token
portal_header : str | None, optional
Expand Down
54 changes: 39 additions & 15 deletions cads_processing_api_service/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ class DatabaseClient(ogc_api_processes_fastapi.clients.BaseClient):
@exceptions.exception_logger
def get_processes(
self,
limit: int | None = fastapi.Query(10, ge=1, le=10000),
limit: int | None = fastapi.Query(
10, ge=1, le=10000, description="Maximum number of results to return."
),
sortby: utils.ProcessSortCriterion | None = fastapi.Query(
utils.ProcessSortCriterion.resource_uid_asc
utils.ProcessSortCriterion.resource_uid_asc,
description="Sorting criterion.",
),
cursor: str | None = fastapi.Query(None, include_in_schema=False),
back: bool | None = fastapi.Query(None, include_in_schema=False),
Expand Down Expand Up @@ -148,7 +151,7 @@ def get_processes(
def get_process(
self,
response: fastapi.Response,
process_id: str = fastapi.Path(...),
process_id: str = fastapi.Path(..., description="Process identifier."),
portals: tuple[str] | None = fastapi.Depends(utils.get_portals),
) -> ogc_api_processes_fastapi.models.ProcessDescription:
"""Implement OGC API - Processes `GET /processes/{process_id}` endpoint.
Expand Down Expand Up @@ -201,7 +204,7 @@ def get_process(
def post_process_execution(
self,
request: fastapi.Request,
process_id: str = fastapi.Path(...),
process_id: str = fastapi.Path(..., description="Process identifier."),
execution_content: models.Execute = fastapi.Body(...),
auth_info: models.AuthInfo = fastapi.Depends(auth.get_auth_info),
) -> models.StatusInfo:
Expand Down Expand Up @@ -330,18 +333,30 @@ def post_process_execution(
@exceptions.exception_logger
def get_jobs(
self,
processID: list[str] | None = fastapi.Query(None),
processID: list[str] | None = fastapi.Query(
None,
description=(
"Processes identifiers. Only jobs associated to the specified "
"processes shall be included in the response."
),
),
status: list[models.StatusCode] | None = fastapi.Query(
[
ogc_api_processes_fastapi.models.StatusCode.accepted,
ogc_api_processes_fastapi.models.StatusCode.running,
ogc_api_processes_fastapi.models.StatusCode.successful,
ogc_api_processes_fastapi.models.StatusCode.failed,
]
],
description=(
"Job statuses. Only jobs with the specified statuses shall be included in "
"the response."
),
),
limit: int | None = fastapi.Query(
10, ge=1, le=10000, description="Maximum number of results to return."
),
limit: int | None = fastapi.Query(10, ge=1, le=10000),
sortby: utils.JobSortCriterion | None = fastapi.Query(
utils.JobSortCriterion.created_at_desc
utils.JobSortCriterion.created_at_desc, description="Sorting criterion."
),
cursor: str | None = fastapi.Query(None, include_in_schema=False),
back: bool | None = fastapi.Query(None, include_in_schema=False),
Expand Down Expand Up @@ -459,12 +474,21 @@ def get_jobs(
@exceptions.exception_logger
def get_job(
self,
job_id: str = fastapi.Path(...),
qos: bool = fastapi.Query(False),
request: bool = fastapi.Query(False),
log: bool = fastapi.Query(False),
job_id: str = fastapi.Path(..., description="Job identifier."),
qos: bool = fastapi.Query(
False, description="Whether to include job qos info in the response."
),
request: bool = fastapi.Query(
False,
description="Whether to include the sumbitted request in the response.",
),
log: bool = fastapi.Query(
False, description="Whether to include the job's log in the response."
),
log_start_time: datetime.datetime | None = fastapi.Query(
None, alias="logStartTime"
None,
alias="logStartTime",
description="Datetime of the first log message to be returned.",
),
auth_info: models.AuthInfo = fastapi.Depends(auth.get_auth_info),
) -> models.StatusInfo:
Expand Down Expand Up @@ -589,7 +613,7 @@ def get_job(
@exceptions.exception_logger
def get_job_results(
self,
job_id: str = fastapi.Path(...),
job_id: str = fastapi.Path(..., description="Job identifier."),
auth_info: models.AuthInfo = fastapi.Depends(auth.get_auth_info),
) -> ogc_api_processes_fastapi.models.Results:
"""Implement OGC API - Processes `GET /jobs/{job_id}/results` endpoint.
Expand Down Expand Up @@ -652,7 +676,7 @@ def get_job_results(
@exceptions.exception_logger
def delete_job(
self,
job_id: str = fastapi.Path(...),
job_id: str = fastapi.Path(..., description="Job identifier."),
auth_info: models.AuthInfo = fastapi.Depends(auth.get_auth_info),
) -> ogc_api_processes_fastapi.models.StatusInfo:
"""Implement OGC API - Processes `DELETE /jobs/{job_id}` endpoint.
Expand Down
15 changes: 15 additions & 0 deletions cads_processing_api_service/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@

logger: structlog.stdlib.BoundLogger = structlog.get_logger(__name__)

API_TITLE = "CADS Processing API"
API_DESCRIPTION = (
"This REST API service enables the submission of processing tasks (data retrieval) to the "
"CADS system, and their consequent monitoring and management. "
"The service is based on the [OGC API - Processes standard](https://ogcapi.ogc.org/processes/).\n\n"
"Being based on the OGC API - Processes standard, some terminology is inherited from it. "
"In the context of this specific API, each _process_ is associated with a specific dataset "
"and enables the retrieval of data from that dataset: as such, each _process_ identifier "
"corresponds to a specific dataset identifier.\n"
"A _job_, instead, is a specific data retrieval task that has been submitted for execution."
)

API_REQUEST_TEMPLATE = """import cdsapi

dataset = "{process_id}"
Expand Down Expand Up @@ -189,6 +201,9 @@ def profiles_api_url(self) -> str:
cache_resources_maxsize: int = 1000
cache_resources_ttl: int = 10

api_title: str = API_TITLE
api_description: str = API_DESCRIPTION

api_request_template: str = API_REQUEST_TEMPLATE
api_request_max_list_length: dict[str, int] = API_REQUEST_MAX_LIST_LENGTH
missing_dataset_title: str = "Dataset not available"
Expand Down
2 changes: 1 addition & 1 deletion cads_processing_api_service/constraints.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

@exceptions.exception_logger
def apply_constraints(
process_id: str = fastapi.Path(...),
process_id: str = fastapi.Path(..., description="Process identifier."),
execution_content: models.Execute = fastapi.Body(...),
portals: tuple[str] | None = fastapi.Depends(utils.get_portals),
) -> dict[str, Any]:
Expand Down
6 changes: 3 additions & 3 deletions cads_processing_api_service/costing.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ class RequestOrigin(str, enum.Enum):

@exceptions.exception_logger
def estimate_cost(
process_id: str = fastapi.Path(...),
request_origin: RequestOrigin = fastapi.Query("api"),
mandatory_inputs: bool = fastapi.Query(False),
process_id: str = fastapi.Path(..., description="Process identifier."),
request_origin: RequestOrigin = fastapi.Query("api", include_in_schema=False),
mandatory_inputs: bool = fastapi.Query(False, include_in_schema=False),
execution_content: models.Execute = fastapi.Body(...),
portals: tuple[str] | None = fastapi.Depends(utils.get_portals),
) -> models.RequestCost:
Expand Down
9 changes: 7 additions & 2 deletions cads_processing_api_service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,12 @@ async def lifespan(application: fastapi.FastAPI) -> AsyncGenerator[Any, None]:

logger = structlog.get_logger(__name__)


app = ogc_api_processes_fastapi.instantiate_app(
client=clients.DatabaseClient(), # type: ignore
exception_handler=exceptions.exception_handler,
title=SETTINGS.api_title,
description=SETTINGS.api_description,
)
app = exceptions.include_exception_handlers(app)
# FIXME : "app.router.lifespan_context" is not officially supported and would likely break
Expand All @@ -91,11 +94,13 @@ async def lifespan(application: fastapi.FastAPI) -> AsyncGenerator[Any, None]:
app.router.add_api_route(
"/processes/{process_id}/api-request",
translators.get_api_request,
description="Get API request equivalent to the submitted prrocess execution json.",
description="Get API request equivalent to the submitted process execution json.",
methods=["POST"],
)

app.router.add_api_route("/metrics", starlette_exporter.handle_metrics)
app.router.add_api_route(
"/metrics", starlette_exporter.handle_metrics, include_in_schema=False
)
app.add_middleware(middlewares.ProcessingPrometheusMiddleware, group_paths=True)


Expand Down
2 changes: 1 addition & 1 deletion cads_processing_api_service/translators.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ def format_api_request(

@exceptions.exception_logger
def get_api_request(
process_id: str = fastapi.Path(...),
process_id: str = fastapi.Path(..., description="Process identifier."),
request: dict[str, Any] = fastapi.Body(...),
) -> dict[str, str]:
"""Get CADS API request equivalent to the provided processing request.
Expand Down
4 changes: 3 additions & 1 deletion cads_processing_api_service/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,9 @@ def make_status_info(


def get_portals(
portal_header: str | None = fastapi.Header(None, alias=SETTINGS.portal_header_name),
portal_header: str | None = fastapi.Header(
None, alias=SETTINGS.portal_header_name, include_in_schema=False
),
) -> tuple[str, ...] | None:
"""Get the list of portals from the incoming HTTP request's header.

Expand Down