diff --git a/diracx-client/src/diracx/client/_generated/aio/operations/_operations.py b/diracx-client/src/diracx/client/_generated/aio/operations/_operations.py index a87316174..c0b04fa41 100644 --- a/diracx-client/src/diracx/client/_generated/aio/operations/_operations.py +++ b/diracx-client/src/diracx/client/_generated/aio/operations/_operations.py @@ -1,4 +1,4 @@ -# pylint: disable=too-many-lines +# pylint: disable=line-too-long,useless-suppression,too-many-lines # coding=utf-8 # -------------------------------------------------------------------------- # Code generated by Microsoft (R) AutoRest Code Generator (autorest: 3.10.4, generator: @autorest/python@6.34.2) @@ -40,6 +40,10 @@ build_config_serve_config_request, build_jobs_add_heartbeat_request, build_jobs_assign_sandbox_to_job_request, + build_jobs_get_input_data_request, + build_jobs_get_job_heartbeat_info_request, + build_jobs_get_job_jdl_request, + build_jobs_get_job_parameters_request, build_jobs_get_job_sandbox_request, build_jobs_get_job_sandboxes_request, build_jobs_get_sandbox_file_request, @@ -2260,6 +2264,232 @@ async def summary(self, body: Union[_models.SummaryParams, IO[bytes]], **kwargs: return deserialized # type: ignore + @distributed_trace_async + async def get_input_data(self, job_id: int, **kwargs: Any) -> List[Dict[str, Any]]: + """Get Input Data. + + Fetches a job's input data. + + :param job_id: Required. + :type job_id: int + :return: list of dict mapping str to any + :rtype: list[dict[str, any]] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[List[Dict[str, Any]]] = kwargs.pop("cls", None) + + _request = build_jobs_get_input_data_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("[{object}]", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace_async + async def get_job_parameters(self, job_id: int, **kwargs: Any) -> List[Dict[str, Any]]: + """Get Job Parameters. + + Get job parameters. + + :param job_id: Required. + :type job_id: int + :return: list of dict mapping str to any + :rtype: list[dict[str, any]] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[List[Dict[str, Any]]] = kwargs.pop("cls", None) + + _request = build_jobs_get_job_parameters_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("[{object}]", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace_async + async def get_job_jdl( + self, job_id: int, **kwargs: Any + ) -> Dict[str, _models.PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties]: + """Get Job Jdl. + + Get job JDLs. + + :param job_id: Required. + :type job_id: int + :return: dict mapping str to + PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties + :rtype: dict[str, + ~_generated.models.PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[ + Dict[str, _models.PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties] + ] = kwargs.pop("cls", None) + + _request = build_jobs_get_job_jdl_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize( + "{PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties}", + pipeline_response.http_response, + ) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace_async + async def get_job_heartbeat_info(self, job_id: int, **kwargs: Any) -> List[ + Dict[ + str, + _models.PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties, + ] + ]: + """Get Job Heartbeat Info. + + Get job heartbeat info. + + :param job_id: Required. + :type job_id: int + :return: list of dict mapping str to + PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties + :rtype: list[dict[str, + ~_generated.models.PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties]] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[ + List[ + Dict[ + str, + _models.PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties, + ] + ] + ] = kwargs.pop("cls", None) + + _request = build_jobs_get_job_heartbeat_info_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize( + "[{PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties}]", + pipeline_response.http_response, + ) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + @overload async def submit_jdl_jobs( self, body: List[str], *, content_type: str = "application/json", **kwargs: Any diff --git a/diracx-client/src/diracx/client/_generated/models/__init__.py b/diracx-client/src/diracx/client/_generated/models/__init__.py index 06de02aab..2e2443b7a 100644 --- a/diracx-client/src/diracx/client/_generated/models/__init__.py +++ b/diracx-client/src/diracx/client/_generated/models/__init__.py @@ -27,6 +27,8 @@ JobStatusUpdate, Metadata, OpenIDConfiguration, + PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties, + PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties, SandboxDownloadResponse, SandboxInfo, SandboxUploadResponse, @@ -78,6 +80,8 @@ "JobStatusUpdate", "Metadata", "OpenIDConfiguration", + "PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties", + "PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties", "SandboxDownloadResponse", "SandboxInfo", "SandboxUploadResponse", diff --git a/diracx-client/src/diracx/client/_generated/models/_models.py b/diracx-client/src/diracx/client/_generated/models/_models.py index fc909fe5a..b55139ed5 100644 --- a/diracx-client/src/diracx/client/_generated/models/_models.py +++ b/diracx-client/src/diracx/client/_generated/models/_models.py @@ -886,6 +886,18 @@ def __init__( self.code_challenge_methods_supported = code_challenge_methods_supported +class PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties( + _serialization.Model +): # pylint: disable=name-too-long + """PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties.""" + + +class PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties( + _serialization.Model +): # pylint: disable=name-too-long + """PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties.""" + + class SandboxDownloadResponse(_serialization.Model): """SandboxDownloadResponse. diff --git a/diracx-client/src/diracx/client/_generated/operations/_operations.py b/diracx-client/src/diracx/client/_generated/operations/_operations.py index b8800ca84..823ae5c8a 100644 --- a/diracx-client/src/diracx/client/_generated/operations/_operations.py +++ b/diracx-client/src/diracx/client/_generated/operations/_operations.py @@ -1,4 +1,4 @@ -# pylint: disable=too-many-lines +# pylint: disable=line-too-long,useless-suppression,too-many-lines # coding=utf-8 # -------------------------------------------------------------------------- # Code generated by Microsoft (R) AutoRest Code Generator (autorest: 3.10.4, generator: @autorest/python@6.34.2) @@ -569,6 +569,84 @@ def build_jobs_summary_request(**kwargs: Any) -> HttpRequest: return HttpRequest(method="POST", url=_url, headers=_headers, **kwargs) +def build_jobs_get_input_data_request(job_id: int, **kwargs: Any) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/jobs/{job_id}/input-data" + path_format_arguments = { + "job_id": _SERIALIZER.url("job_id", job_id, "int"), + } + + _url: str = _url.format(**path_format_arguments) # type: ignore + + # Construct headers + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + +def build_jobs_get_job_parameters_request(job_id: int, **kwargs: Any) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/jobs/{job_id}/parameters" + path_format_arguments = { + "job_id": _SERIALIZER.url("job_id", job_id, "int"), + } + + _url: str = _url.format(**path_format_arguments) # type: ignore + + # Construct headers + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + +def build_jobs_get_job_jdl_request(job_id: int, **kwargs: Any) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/jobs/{job_id}/jdl" + path_format_arguments = { + "job_id": _SERIALIZER.url("job_id", job_id, "int"), + } + + _url: str = _url.format(**path_format_arguments) # type: ignore + + # Construct headers + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + +def build_jobs_get_job_heartbeat_info_request( # pylint: disable=name-too-long + job_id: int, **kwargs: Any +) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/jobs/{job_id}/heartbeat" + path_format_arguments = { + "job_id": _SERIALIZER.url("job_id", job_id, "int"), + } + + _url: str = _url.format(**path_format_arguments) # type: ignore + + # Construct headers + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + def build_jobs_submit_jdl_jobs_request(**kwargs: Any) -> HttpRequest: _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) @@ -2779,6 +2857,232 @@ def summary(self, body: Union[_models.SummaryParams, IO[bytes]], **kwargs: Any) return deserialized # type: ignore + @distributed_trace + def get_input_data(self, job_id: int, **kwargs: Any) -> List[Dict[str, Any]]: + """Get Input Data. + + Fetches a job's input data. + + :param job_id: Required. + :type job_id: int + :return: list of dict mapping str to any + :rtype: list[dict[str, any]] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[List[Dict[str, Any]]] = kwargs.pop("cls", None) + + _request = build_jobs_get_input_data_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("[{object}]", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace + def get_job_parameters(self, job_id: int, **kwargs: Any) -> List[Dict[str, Any]]: + """Get Job Parameters. + + Get job parameters. + + :param job_id: Required. + :type job_id: int + :return: list of dict mapping str to any + :rtype: list[dict[str, any]] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[List[Dict[str, Any]]] = kwargs.pop("cls", None) + + _request = build_jobs_get_job_parameters_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("[{object}]", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace + def get_job_jdl( + self, job_id: int, **kwargs: Any + ) -> Dict[str, _models.PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties]: + """Get Job Jdl. + + Get job JDLs. + + :param job_id: Required. + :type job_id: int + :return: dict mapping str to + PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties + :rtype: dict[str, + ~_generated.models.PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[ + Dict[str, _models.PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties] + ] = kwargs.pop("cls", None) + + _request = build_jobs_get_job_jdl_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize( + "{PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties}", + pipeline_response.http_response, + ) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace + def get_job_heartbeat_info(self, job_id: int, **kwargs: Any) -> List[ + Dict[ + str, + _models.PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties, + ] + ]: + """Get Job Heartbeat Info. + + Get job heartbeat info. + + :param job_id: Required. + :type job_id: int + :return: list of dict mapping str to + PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties + :rtype: list[dict[str, + ~_generated.models.PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties]] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[ + List[ + Dict[ + str, + _models.PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties, + ] + ] + ] = kwargs.pop("cls", None) + + _request = build_jobs_get_job_heartbeat_info_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize( + "[{PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties}]", + pipeline_response.http_response, + ) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + @overload def submit_jdl_jobs( self, body: List[str], *, content_type: str = "application/json", **kwargs: Any diff --git a/diracx-db/src/diracx/db/os/utils.py b/diracx-db/src/diracx/db/os/utils.py index ea5d292e6..d810b1520 100644 --- a/diracx-db/src/diracx/db/os/utils.py +++ b/diracx-db/src/diracx/db/os/utils.py @@ -198,12 +198,24 @@ async def upsert(self, vo: str, doc_id: int, document: Any) -> None: ) async def search( - self, parameters, search, sorts, *, per_page: int = 100, page: int | None = None - ) -> list[dict[str, Any]]: + self, + parameters, + search, + sorts, + *, + per_page: int = 10000, + page: int | None = None, + ) -> tuple[int, list[dict[str, Any]]]: """Search the database for matching results. See the DiracX search API documentation for details. """ + if page: + if page < 1: + raise InvalidQueryError("Page must be a positive integer") + if per_page < 1: + raise InvalidQueryError("Per page must be a positive integer") + body = {} if parameters: body["_source"] = parameters @@ -213,7 +225,12 @@ async def search( for sort in sorts: field_name = sort["parameter"] field_type = self.fields.get(field_name, {}).get("type") - require_type("sort", field_name, field_type, {"keyword", "long", "date"}) + require_type( + "sort", + field_name, + field_type, + {"keyword", "long", "date", "date_nanos"}, + ) body["sort"].append({field_name: {"order": sort["direction"]}}) params = {} @@ -226,17 +243,19 @@ async def search( ) hits = [hit["_source"] for hit in response["hits"]["hits"]] + total_hits = response["hits"]["total"]["value"] + # Dates are returned as strings, convert them to Python datetimes for hit in hits: for field_name in hit: if field_name not in self.fields: continue - if self.fields[field_name]["type"] == "date": + if self.fields[field_name]["type"] in ["date", "date_nanos"]: hit[field_name] = datetime.strptime( hit[field_name], "%Y-%m-%dT%H:%M:%S.%f%z" ) - return hits + return total_hits, hits def require_type(operator, field_name, field_type, allowed_types): diff --git a/diracx-db/src/diracx/db/sql/job/db.py b/diracx-db/src/diracx/db/sql/job/db.py index 01cdb83a1..3296f6f00 100644 --- a/diracx-db/src/diracx/db/sql/job/db.py +++ b/diracx-db/src/diracx/db/sql/job/db.py @@ -70,6 +70,69 @@ async def search( page=page, ) + async def search_input_data( + self, + parameters: list[str] | None, + search: list[SearchSpec], + sorts: list[SortSpec], + *, + distinct: bool = False, + per_page: int = 100, + page: int | None = None, + ) -> tuple[int, list[dict[Any, Any]]]: + """Search for input data in the database.""" + return await self._search( + table=InputData, + parameters=parameters, + search=search, + sorts=sorts, + distinct=distinct, + per_page=per_page, + page=page, + ) + + async def search_jdl( + self, + parameters: list[str] | None, + search: list[SearchSpec], + sorts: list[SortSpec], + *, + distinct: bool = False, + per_page: int = 100, + page: int | None = None, + ) -> tuple[int, list[dict[Any, Any]]]: + """Search for job JDL the database.""" + return await self._search( + table=JobJDLs, + parameters=parameters, + search=search, + sorts=sorts, + distinct=distinct, + per_page=per_page, + page=page, + ) + + async def search_heartbeat_logging_info( + self, + parameters: list[str] | None, + search: list[SearchSpec], + sorts: list[SortSpec], + *, + distinct: bool = False, + per_page: int = 100, + page: int | None = None, + ) -> tuple[int, list[dict[Any, Any]]]: + """Search for heartbeat logging info in the database.""" + return await self._search( + table=HeartBeatLoggingInfo, + parameters=parameters, + search=search, + sorts=sorts, + distinct=distinct, + per_page=per_page, + page=page, + ) + async def create_job(self, compressed_original_jdl: str): """Used to insert a new job with original JDL. Returns inserted job id.""" result = await self.conn.execute( diff --git a/diracx-db/src/diracx/db/sql/utils/__init__.py b/diracx-db/src/diracx/db/sql/utils/__init__.py index 5cbb31b3f..53b3f3c96 100644 --- a/diracx-db/src/diracx/db/sql/utils/__init__.py +++ b/diracx-db/src/diracx/db/sql/utils/__init__.py @@ -7,21 +7,25 @@ apply_search_filters, apply_sort_constraints, ) -from .functions import hash, substract_date, utcnow +from .functions import ( + hash, + substract_date, + utcnow, +) from .types import Column, DateNowColumn, EnumBackedBool, EnumColumn, NullColumn __all__ = ( "_get_columns", - "utcnow", + "apply_search_filters", + "apply_sort_constraints", + "BaseSQLDB", "Column", - "NullColumn", "DateNowColumn", - "BaseSQLDB", "EnumBackedBool", "EnumColumn", - "apply_search_filters", - "apply_sort_constraints", - "substract_date", "hash", + "NullColumn", + "substract_date", "SQLDBUnavailableError", + "utcnow", ) diff --git a/diracx-db/tests/jobs/test_job_db.py b/diracx-db/tests/jobs/test_job_db.py index d95de7205..af4cbcaa3 100644 --- a/diracx-db/tests/jobs/test_job_db.py +++ b/diracx-db/tests/jobs/test_job_db.py @@ -336,3 +336,40 @@ async def test_set_job_commands_invalid_job_id(job_db: JobDB): async with job_db as job_db: with pytest.raises(IntegrityError): await job_db.set_job_commands([(123456, "test_command", "")]) + + +async def test_insert_input_data_with_unknown_job(job_db: JobDB): + async with job_db as job_db: + with pytest.raises(IntegrityError): + await job_db.insert_input_data( + {id: [f"lfn_{id}_{i}" for i in range(5)] for id in range(10)} + ) + + +async def test_insert_input_data_with_known_job(populated_job_db: JobDB): + async with populated_job_db as job_db: + _, jobs = await job_db.search(["JobID"], search=[], sorts=[]) + + job_ids = [job["JobID"] for job in jobs] + + await job_db.insert_input_data( + {id: [f"lfn_{id}_{i}" for i in range(5)] for id in job_ids} + ) + + for job_id in job_ids: + _, jobs = await job_db.search_input_data( + parameters=[], + search=[ + ScalarSearchSpec( + parameter="JobID", + operator=ScalarSearchOperator.EQUAL, + value=job_id, + ) + ], + sorts=[], + ) + + lfns = [job["LFN"] for job in jobs] + + assert len(lfns) == 5 + assert all(f"lfn_{job_id}_{i}" in lfns for i in range(5)) diff --git a/diracx-db/tests/opensearch/test_search.py b/diracx-db/tests/opensearch/test_search.py index 93998ac3e..1663ee2e1 100644 --- a/diracx-db/tests/opensearch/test_search.py +++ b/diracx-db/tests/opensearch/test_search.py @@ -120,15 +120,15 @@ async def prefilled_db(request): async def test_specified_parameters(prefilled_db: DummyOSDB): - results = await prefilled_db.search(None, [], []) - assert len(results) == 3 + total, results = await prefilled_db.search(None, [], []) + assert len(results) == total == 3 assert DOC1 in results and DOC2 in results and DOC3 in results - results = await prefilled_db.search([], [], []) - assert len(results) == 3 + total, results = await prefilled_db.search([], [], []) + assert len(results) == total == 3 assert DOC1 in results and DOC2 in results and DOC3 in results - results = await prefilled_db.search(["IntField"], [], []) + total, results = await prefilled_db.search(["IntField"], [], []) expected_results = [] for doc in [DOC1, DOC2, DOC3]: expected_doc = {key: doc[key] for key in {"IntField"}} @@ -136,58 +136,69 @@ async def test_specified_parameters(prefilled_db: DummyOSDB): # If it is the all() check below no longer makes sense assert expected_doc not in expected_results expected_results.append(expected_doc) - assert len(results) == len(expected_results) + assert len(results) == len(expected_results) == total assert all(result in expected_results for result in results) - results = await prefilled_db.search(["IntField", "UnknownField"], [], []) + total, results = await prefilled_db.search(["IntField", "UnknownField"], [], []) expected_results = [ {"IntField": DOC1["IntField"], "UnknownField": DOC1["UnknownField"]}, {"IntField": DOC2["IntField"], "UnknownField": DOC2["UnknownField"]}, {"IntField": DOC3["IntField"]}, ] - assert len(results) == len(expected_results) + assert len(results) == len(expected_results) == total assert all(result in expected_results for result in results) async def test_pagination_asc(prefilled_db: DummyOSDB): sort = [{"parameter": "IntField", "direction": "asc"}] - results = await prefilled_db.search(None, [], sort) + total, results = await prefilled_db.search(None, [], sort) + assert total == 3 assert results == [DOC3, DOC2, DOC1] # Pagination has no effect if a specific page isn't requested - results = await prefilled_db.search(None, [], sort, per_page=2) + total, results = await prefilled_db.search(None, [], sort, per_page=2) + assert total == 3 assert results == [DOC3, DOC2, DOC1] - results = await prefilled_db.search(None, [], sort, per_page=2, page=1) + total, results = await prefilled_db.search(None, [], sort, per_page=2, page=1) + assert total == 3 assert results == [DOC3, DOC2] - results = await prefilled_db.search(None, [], sort, per_page=2, page=2) + total, results = await prefilled_db.search(None, [], sort, per_page=2, page=2) + assert total == 3 assert results == [DOC1] - results = await prefilled_db.search(None, [], sort, per_page=2, page=3) + total, results = await prefilled_db.search(None, [], sort, per_page=2, page=3) + assert total == 3 assert results == [] - results = await prefilled_db.search(None, [], sort, per_page=1, page=1) + total, results = await prefilled_db.search(None, [], sort, per_page=1, page=1) + assert total == 3 assert results == [DOC3] - results = await prefilled_db.search(None, [], sort, per_page=1, page=2) + total, results = await prefilled_db.search(None, [], sort, per_page=1, page=2) + assert total == 3 assert results == [DOC2] - results = await prefilled_db.search(None, [], sort, per_page=1, page=3) + total, results = await prefilled_db.search(None, [], sort, per_page=1, page=3) + assert total == 3 assert results == [DOC1] - results = await prefilled_db.search(None, [], sort, per_page=1, page=4) + total, results = await prefilled_db.search(None, [], sort, per_page=1, page=4) + assert total == 3 assert results == [] async def test_pagination_desc(prefilled_db: DummyOSDB): sort = [{"parameter": "IntField", "direction": "desc"}] - results = await prefilled_db.search(None, [], sort, per_page=2, page=1) + total, results = await prefilled_db.search(None, [], sort, per_page=2, page=1) + assert total == 3 # 3 Because of pagination assert results == [DOC1, DOC2] - results = await prefilled_db.search(None, [], sort, per_page=2, page=2) + total, results = await prefilled_db.search(None, [], sort, per_page=2, page=2) + assert total == 3 # Still 3 because of pagination assert results == [DOC3] @@ -195,22 +206,26 @@ async def test_eq_filter_long(prefilled_db: DummyOSDB): part = {"parameter": "IntField", "operator": "eq"} # Search for an ID which doesn't exist - results = await prefilled_db.search(None, [part | {"value": "78"}], []) + total, results = await prefilled_db.search(None, [part | {"value": "78"}], []) + assert total == 0 assert results == [] # Check the DB contains what we expect when not filtering - results = await prefilled_db.search(None, [], []) - assert len(results) == 3 + total, results = await prefilled_db.search(None, [], []) + assert total == 3 assert DOC1 in results assert DOC2 in results assert DOC3 in results # Search separately for the two documents which do exist - results = await prefilled_db.search(None, [part | {"value": "1234"}], []) + total, results = await prefilled_db.search(None, [part | {"value": "1234"}], []) + assert total == 1 assert results == [DOC1] - results = await prefilled_db.search(None, [part | {"value": "679"}], []) + total, results = await prefilled_db.search(None, [part | {"value": "679"}], []) + assert total == 1 assert results == [DOC2] - results = await prefilled_db.search(None, [part | {"value": "42"}], []) + total, results = await prefilled_db.search(None, [part | {"value": "42"}], []) + assert total == 1 assert results == [DOC3] @@ -218,31 +233,38 @@ async def test_operators_long(prefilled_db: DummyOSDB): part = {"parameter": "IntField"} query = part | {"operator": "neq", "value": "1234"} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 2 assert {x["IntField"] for x in results} == {DOC2["IntField"], DOC3["IntField"]} query = part | {"operator": "in", "values": ["1234", "42"]} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 2 assert {x["IntField"] for x in results} == {DOC1["IntField"], DOC3["IntField"]} query = part | {"operator": "not in", "values": ["1234", "42"]} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 1 assert {x["IntField"] for x in results} == {DOC2["IntField"]} query = part | {"operator": "lt", "value": "1234"} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 2 assert {x["IntField"] for x in results} == {DOC2["IntField"], DOC3["IntField"]} query = part | {"operator": "lt", "value": "679"} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 1 assert {x["IntField"] for x in results} == {DOC3["IntField"]} query = part | {"operator": "gt", "value": "1234"} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 0 assert {x["IntField"] for x in results} == set() query = part | {"operator": "lt", "value": "42"} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 0 assert {x["IntField"] for x in results} == set() @@ -250,11 +272,13 @@ async def test_operators_date(prefilled_db: DummyOSDB): part = {"parameter": "DateField"} query = part | {"operator": "eq", "value": DOC3["DateField"]} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 1 assert {x["IntField"] for x in results} == {DOC3["IntField"]} query = part | {"operator": "neq", "value": DOC2["DateField"]} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 2 assert {x["IntField"] for x in results} == {DOC1["IntField"], DOC3["IntField"]} doc1_time = DOC1["DateField"].strftime("%Y-%m-%dT%H:%M") @@ -262,35 +286,43 @@ async def test_operators_date(prefilled_db: DummyOSDB): doc3_time = DOC3["DateField"].strftime("%Y-%m-%dT%H:%M") query = part | {"operator": "in", "values": [doc1_time, doc2_time]} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 2 assert {x["IntField"] for x in results} == {DOC1["IntField"], DOC2["IntField"]} query = part | {"operator": "not in", "values": [doc1_time, doc2_time]} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 1 assert {x["IntField"] for x in results} == {DOC3["IntField"]} query = part | {"operator": "lt", "value": doc1_time} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 2 assert {x["IntField"] for x in results} == {DOC2["IntField"], DOC3["IntField"]} query = part | {"operator": "lt", "value": doc3_time} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 1 assert {x["IntField"] for x in results} == {DOC2["IntField"]} query = part | {"operator": "lt", "value": doc2_time} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 0 assert {x["IntField"] for x in results} == set() query = part | {"operator": "gt", "value": doc1_time} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 0 assert {x["IntField"] for x in results} == set() query = part | {"operator": "gt", "value": doc3_time} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 1 assert {x["IntField"] for x in results} == {DOC1["IntField"]} query = part | {"operator": "gt", "value": doc2_time} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 2 assert {x["IntField"] for x in results} == {DOC1["IntField"], DOC3["IntField"]} @@ -312,11 +344,13 @@ async def test_operators_date_partial_doc1(prefilled_db: DummyOSDB, date_format: formatted_date = DOC1["DateField"].strftime(date_format) query = {"parameter": "DateField", "operator": "eq", "value": formatted_date} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 1 assert {x["IntField"] for x in results} == {DOC1["IntField"]} query = {"parameter": "DateField", "operator": "neq", "value": formatted_date} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 2 assert {x["IntField"] for x in results} == {DOC2["IntField"], DOC3["IntField"]} @@ -324,11 +358,13 @@ async def test_operators_keyword(prefilled_db: DummyOSDB): part = {"parameter": "KeywordField1"} query = part | {"operator": "eq", "value": DOC1["KeywordField1"]} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 2 assert {x["IntField"] for x in results} == {DOC1["IntField"], DOC2["IntField"]} query = part | {"operator": "neq", "value": DOC1["KeywordField1"]} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 1 assert {x["IntField"] for x in results} == {DOC3["IntField"]} part = {"parameter": "KeywordField0"} @@ -337,22 +373,26 @@ async def test_operators_keyword(prefilled_db: DummyOSDB): "operator": "in", "values": [DOC1["KeywordField0"], DOC3["KeywordField0"]], } - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 2 assert {x["IntField"] for x in results} == {DOC1["IntField"], DOC3["IntField"]} query = part | {"operator": "in", "values": ["missing"]} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 0 assert {x["IntField"] for x in results} == set() query = part | { "operator": "not in", "values": [DOC1["KeywordField0"], DOC3["KeywordField0"]], } - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 1 assert {x["IntField"] for x in results} == {DOC2["IntField"]} query = part | {"operator": "not in", "values": ["missing"]} - results = await prefilled_db.search(["IntField"], [query], []) + total, results = await prefilled_db.search(["IntField"], [query], []) + assert total == 3 assert {x["IntField"] for x in results} == { DOC1["IntField"], DOC2["IntField"], @@ -387,35 +427,41 @@ async def test_unindexed_field(prefilled_db: DummyOSDB): async def test_sort_long(prefilled_db: DummyOSDB): - results = await prefilled_db.search( + total, results = await prefilled_db.search( None, [], [{"parameter": "IntField", "direction": "asc"}] ) + assert total == 3 assert results == [DOC3, DOC2, DOC1] - results = await prefilled_db.search( + total, results = await prefilled_db.search( None, [], [{"parameter": "IntField", "direction": "desc"}] ) + assert total == 3 assert results == [DOC1, DOC2, DOC3] async def test_sort_date(prefilled_db: DummyOSDB): - results = await prefilled_db.search( + total, results = await prefilled_db.search( None, [], [{"parameter": "DateField", "direction": "asc"}] ) + assert total == 3 assert results == [DOC2, DOC3, DOC1] - results = await prefilled_db.search( + total, results = await prefilled_db.search( None, [], [{"parameter": "DateField", "direction": "desc"}] ) + assert total == 3 assert results == [DOC1, DOC3, DOC2] async def test_sort_keyword(prefilled_db: DummyOSDB): - results = await prefilled_db.search( + total, results = await prefilled_db.search( None, [], [{"parameter": "KeywordField0", "direction": "asc"}] ) + assert total == 3 assert results == [DOC1, DOC3, DOC2] - results = await prefilled_db.search( + total, results = await prefilled_db.search( None, [], [{"parameter": "KeywordField0", "direction": "desc"}] ) + assert total == 3 assert results == [DOC2, DOC3, DOC1] @@ -436,7 +482,7 @@ async def test_sort_unknown(prefilled_db: DummyOSDB): async def test_sort_multiple(prefilled_db: DummyOSDB): - results = await prefilled_db.search( + total, results = await prefilled_db.search( None, [], [ @@ -444,9 +490,10 @@ async def test_sort_multiple(prefilled_db: DummyOSDB): {"parameter": "IntField", "direction": "asc"}, ], ) + assert total == 3 assert results == [DOC2, DOC1, DOC3] - results = await prefilled_db.search( + total, results = await prefilled_db.search( None, [], [ @@ -454,9 +501,10 @@ async def test_sort_multiple(prefilled_db: DummyOSDB): {"parameter": "IntField", "direction": "desc"}, ], ) + assert total == 3 assert results == [DOC1, DOC2, DOC3] - results = await prefilled_db.search( + total, results = await prefilled_db.search( None, [], [ @@ -464,9 +512,10 @@ async def test_sort_multiple(prefilled_db: DummyOSDB): {"parameter": "IntField", "direction": "asc"}, ], ) + assert total == 3 assert results == [DOC3, DOC2, DOC1] - results = await prefilled_db.search( + total, results = await prefilled_db.search( None, [], [ @@ -474,4 +523,5 @@ async def test_sort_multiple(prefilled_db: DummyOSDB): {"parameter": "KeywordField1", "direction": "asc"}, ], ) + assert total == 3 assert results == [DOC3, DOC2, DOC1] diff --git a/diracx-logic/src/diracx/logic/jobs/query.py b/diracx-logic/src/diracx/logic/jobs/query.py index 0d5e50df4..87335c899 100644 --- a/diracx-logic/src/diracx/logic/jobs/query.py +++ b/diracx-logic/src/diracx/logic/jobs/query.py @@ -6,6 +6,7 @@ from diracx.core.config.schema import Config from diracx.core.models import ( ScalarSearchOperator, + ScalarSearchSpec, SearchParams, SummaryParams, ) @@ -81,6 +82,72 @@ async def search( return total, jobs +async def get_input_data(job_db: JobDB, job_id: int) -> list[dict[str, Any]]: + """Retrieve a job's input data.""" + _, input_data = await job_db.search_input_data( + [], + [ + ScalarSearchSpec( + parameter="JobID", operator=ScalarSearchOperator.EQUAL, value=job_id + ) + ], + [], + ) + + return input_data + + +async def get_job_parameters( + job_parameters_db: JobParametersDB, job_id: int +) -> list[dict[str, Any]]: + _, parameters = await job_parameters_db.search( + [], + [ + ScalarSearchSpec( + parameter="JobID", operator=ScalarSearchOperator.EQUAL, value=job_id + ) + ], + [], + ) + + return parameters + + +async def get_job_jdl(job_db: JobDB, job_id: int) -> dict[str, str | int]: + _, jdls = await job_db.search_jdl( + [], + [ + ScalarSearchSpec( + parameter="JobID", operator=ScalarSearchOperator.EQUAL, value=job_id + ) + ], + sorts=[], + ) + + assert len(jdls) <= 1 # If not, there's a problem + + if len(jdls) == 1: + return jdls[0] + + return {} + + +async def get_job_heartbeat_info( + job_db: JobDB, job_id: int +) -> list[dict[str, str | int]]: + _, jdls = await job_db.search_heartbeat_logging_info( + [], + [ + ScalarSearchSpec( + parameter="JobID", operator=ScalarSearchOperator.EQUAL, value=job_id + ) + ], + sorts=[], + ) + + return jdls + + async def summary( config: Config, job_db: JobDB, diff --git a/diracx-routers/src/diracx/routers/jobs/query.py b/diracx-routers/src/diracx/routers/jobs/query.py index 26bb3c053..40289216d 100644 --- a/diracx-routers/src/diracx/routers/jobs/query.py +++ b/diracx-routers/src/diracx/routers/jobs/query.py @@ -10,6 +10,10 @@ SummaryParams, ) from diracx.core.properties import JOB_ADMINISTRATOR +from diracx.logic.jobs.query import get_input_data as get_input_data_bl +from diracx.logic.jobs.query import get_job_heartbeat_info as get_job_heartbeat_info_bl +from diracx.logic.jobs.query import get_job_jdl as get_job_jdl_bl +from diracx.logic.jobs.query import get_job_parameters as get_job_parameters_bl from diracx.logic.jobs.query import search as search_bl from diracx.logic.jobs.query import summary as summary_bl @@ -310,3 +314,54 @@ async def summary( preferred_username=user_info.preferred_username, body=body, ) + + +@router.get("/{job_id}/input-data") +async def get_input_data( + job_id: int, + job_db: JobDB, + check_permissions: CheckWMSPolicyCallable, +) -> list[dict[str, Any]]: + """Fetches a job's input data.""" + await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=[job_id]) + + return await get_input_data_bl(job_db=job_db, job_id=job_id) + + +@router.get("/{job_id}/parameters") +async def get_job_parameters( + job_id: int, + job_db: JobDB, + job_parameters_db: JobParametersDB, + check_permissions: CheckWMSPolicyCallable, +) -> list[dict[str, Any]]: + """Get job parameters.""" + await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=[job_id]) + + return await get_job_parameters_bl( + job_parameters_db=job_parameters_db, job_id=job_id + ) + + +@router.get("/{job_id}/jdl") +async def get_job_jdl( + job_id: int, + job_db: JobDB, + check_permissions: CheckWMSPolicyCallable, +) -> dict[str, str | int]: + """Get job JDLs.""" + await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=[job_id]) + + return await get_job_jdl_bl(job_db=job_db, job_id=job_id) + + +@router.get("/{job_id}/heartbeat") +async def get_job_heartbeat_info( + job_id: int, + job_db: JobDB, + check_permissions: CheckWMSPolicyCallable, +) -> list[dict[str, str | int]]: + """Get job heartbeat info.""" + await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=[job_id]) + + return await get_job_heartbeat_info_bl(job_db=job_db, job_id=job_id) diff --git a/diracx-routers/tests/jobs/test_status.py b/diracx-routers/tests/jobs/test_status.py index b5612d1ad..a470270dd 100644 --- a/diracx-routers/tests/jobs/test_status.py +++ b/diracx-routers/tests/jobs/test_status.py @@ -400,6 +400,21 @@ def test_insert_and_reschedule(normal_user_client: TestClient): } +def test_insert_and_get_jdl(normal_user_client: TestClient): + job_definitions = [TEST_JDL] * 10 + r = normal_user_client.post("/api/jobs/jdl", json=job_definitions) + assert r.status_code == 200, r.json() + assert len(r.json()) == len(job_definitions) + + submitted_job_ids = set(job_dict["JobID"] for job_dict in r.json()) + + for id in submitted_job_ids: + # Get the JDL + r = normal_user_client.get(f"/api/jobs/{id}/jdl") + assert r.status_code == 200, r.json() + # Won't fetch it, submission logic alters the JDL + + # test edge case for rescheduling diff --git a/diracx-testing/src/diracx/testing/mock_osdb.py b/diracx-testing/src/diracx/testing/mock_osdb.py index 5f7fe7f93..6a87b0579 100644 --- a/diracx-testing/src/diracx/testing/mock_osdb.py +++ b/diracx-testing/src/diracx/testing/mock_osdb.py @@ -10,9 +10,10 @@ from functools import partial from typing import Any, AsyncIterator -from sqlalchemy import select +from sqlalchemy import func, select from sqlalchemy.dialects.sqlite import insert as sqlite_insert +from diracx.core.exceptions import InvalidQueryError from diracx.core.models import SearchSpec, SortSpec from diracx.db.sql import utils as sql_utils @@ -53,7 +54,11 @@ def __init__(self, connection_kwargs: dict[str, Any]) -> None: for field, field_type in self.fields.items(): match field_type["type"]: case "date": + # TODO: Warning, maybe this will crash? See date_nanos + # I needed to set Varchar because it is sent as 2022-06-15T10:12:52.382719622Z, and not datetime column_type = DateNowColumn + case "date_nanos": + column_type = partial(Column, type_=String(32)) case "long": column_type = partial(Column, type_=Integer) case "keyword": @@ -100,6 +105,21 @@ async def upsert(self, vo, doc_id, document) -> None: stmt = stmt.on_conflict_do_update(index_elements=["doc_id"], set_=values) await self._sql_db.conn.execute(stmt) + async def bulk_insert(self, index_name: str, docs: list[dict[str, Any]]) -> None: + async with self._sql_db: + rows = [] + for doc in docs: + # don't use doc_id column explicitly. This ensures that doc_id is unique. + values = {} + for key, value in doc.items(): + if key in self.fields: + values[key] = value + else: + values.setdefault("extra", {})[key] = value + rows.append(values) + stmt = sqlite_insert(self._table).values(rows) + await self._sql_db.conn.execute(stmt) + async def search( self, parameters: list[str] | None, @@ -135,8 +155,17 @@ async def search( self._table.columns.__getitem__, stmt, sorts ) + # Calculate total count before applying pagination + total_count_subquery = stmt.alias() + total_count_stmt = select(func.count()).select_from(total_count_subquery) + total = (await self._sql_db.conn.execute(total_count_stmt)).scalar_one() + # Apply pagination if page is not None: + if page < 1: + raise InvalidQueryError("Page must be a positive integer") + if per_page < 1: + raise InvalidQueryError("Per page must be a positive integer") stmt = stmt.offset((page - 1) * per_page).limit(per_page) results = [] @@ -151,7 +180,8 @@ async def search( if v is None: result.pop(k) results.append(result) - return results + + return total, results async def ping(self): async with self._sql_db: diff --git a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/operations/_operations.py b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/operations/_operations.py index 8927a2921..a3c06741c 100644 --- a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/operations/_operations.py +++ b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/operations/_operations.py @@ -1,4 +1,4 @@ -# pylint: disable=too-many-lines +# pylint: disable=line-too-long,useless-suppression,too-many-lines # coding=utf-8 # -------------------------------------------------------------------------- # Code generated by Microsoft (R) AutoRest Code Generator (autorest: 3.10.4, generator: @autorest/python@6.34.2) @@ -40,6 +40,10 @@ build_config_serve_config_request, build_jobs_add_heartbeat_request, build_jobs_assign_sandbox_to_job_request, + build_jobs_get_input_data_request, + build_jobs_get_job_heartbeat_info_request, + build_jobs_get_job_jdl_request, + build_jobs_get_job_parameters_request, build_jobs_get_job_sandbox_request, build_jobs_get_job_sandboxes_request, build_jobs_get_sandbox_file_request, @@ -2263,6 +2267,232 @@ async def summary(self, body: Union[_models.SummaryParams, IO[bytes]], **kwargs: return deserialized # type: ignore + @distributed_trace_async + async def get_input_data(self, job_id: int, **kwargs: Any) -> List[Dict[str, Any]]: + """Get Input Data. + + Fetches a job's input data. + + :param job_id: Required. + :type job_id: int + :return: list of dict mapping str to any + :rtype: list[dict[str, any]] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[List[Dict[str, Any]]] = kwargs.pop("cls", None) + + _request = build_jobs_get_input_data_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("[{object}]", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace_async + async def get_job_parameters(self, job_id: int, **kwargs: Any) -> List[Dict[str, Any]]: + """Get Job Parameters. + + Get job parameters. + + :param job_id: Required. + :type job_id: int + :return: list of dict mapping str to any + :rtype: list[dict[str, any]] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[List[Dict[str, Any]]] = kwargs.pop("cls", None) + + _request = build_jobs_get_job_parameters_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("[{object}]", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace_async + async def get_job_jdl( + self, job_id: int, **kwargs: Any + ) -> Dict[str, _models.PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties]: + """Get Job Jdl. + + Get job JDLs. + + :param job_id: Required. + :type job_id: int + :return: dict mapping str to + PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties + :rtype: dict[str, + ~_generated.models.PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[ + Dict[str, _models.PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties] + ] = kwargs.pop("cls", None) + + _request = build_jobs_get_job_jdl_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize( + "{PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties}", + pipeline_response.http_response, + ) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace_async + async def get_job_heartbeat_info(self, job_id: int, **kwargs: Any) -> List[ + Dict[ + str, + _models.PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties, + ] + ]: + """Get Job Heartbeat Info. + + Get job heartbeat info. + + :param job_id: Required. + :type job_id: int + :return: list of dict mapping str to + PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties + :rtype: list[dict[str, + ~_generated.models.PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties]] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[ + List[ + Dict[ + str, + _models.PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties, + ] + ] + ] = kwargs.pop("cls", None) + + _request = build_jobs_get_job_heartbeat_info_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize( + "[{PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties}]", + pipeline_response.http_response, + ) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + @overload async def submit_jdl_jobs( self, body: List[str], *, content_type: str = "application/json", **kwargs: Any diff --git a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/__init__.py b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/__init__.py index d8e29cfeb..bfad34655 100644 --- a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/__init__.py +++ b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/__init__.py @@ -27,6 +27,8 @@ JobMetaDataAccountedFlag, JobStatusUpdate, OpenIDConfiguration, + PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties, + PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties, SandboxDownloadResponse, SandboxInfo, SandboxUploadResponse, @@ -78,6 +80,8 @@ "JobMetaDataAccountedFlag", "JobStatusUpdate", "OpenIDConfiguration", + "PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties", + "PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties", "SandboxDownloadResponse", "SandboxInfo", "SandboxUploadResponse", diff --git a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/_models.py b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/_models.py index faaea49b4..c2cec1262 100644 --- a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/_models.py +++ b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/_models.py @@ -907,6 +907,18 @@ def __init__( self.code_challenge_methods_supported = code_challenge_methods_supported +class PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties( + _serialization.Model +): # pylint: disable=name-too-long + """PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties.""" + + +class PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties( + _serialization.Model +): # pylint: disable=name-too-long + """PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties.""" + + class SandboxDownloadResponse(_serialization.Model): """SandboxDownloadResponse. diff --git a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/operations/_operations.py b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/operations/_operations.py index fa5e665ce..a4dd6df2a 100644 --- a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/operations/_operations.py +++ b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/operations/_operations.py @@ -1,4 +1,4 @@ -# pylint: disable=too-many-lines +# pylint: disable=line-too-long,useless-suppression,too-many-lines # coding=utf-8 # -------------------------------------------------------------------------- # Code generated by Microsoft (R) AutoRest Code Generator (autorest: 3.10.4, generator: @autorest/python@6.34.2) @@ -569,6 +569,84 @@ def build_jobs_summary_request(**kwargs: Any) -> HttpRequest: return HttpRequest(method="POST", url=_url, headers=_headers, **kwargs) +def build_jobs_get_input_data_request(job_id: int, **kwargs: Any) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/jobs/{job_id}/input-data" + path_format_arguments = { + "job_id": _SERIALIZER.url("job_id", job_id, "int"), + } + + _url: str = _url.format(**path_format_arguments) # type: ignore + + # Construct headers + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + +def build_jobs_get_job_parameters_request(job_id: int, **kwargs: Any) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/jobs/{job_id}/parameters" + path_format_arguments = { + "job_id": _SERIALIZER.url("job_id", job_id, "int"), + } + + _url: str = _url.format(**path_format_arguments) # type: ignore + + # Construct headers + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + +def build_jobs_get_job_jdl_request(job_id: int, **kwargs: Any) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/jobs/{job_id}/jdl" + path_format_arguments = { + "job_id": _SERIALIZER.url("job_id", job_id, "int"), + } + + _url: str = _url.format(**path_format_arguments) # type: ignore + + # Construct headers + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + +def build_jobs_get_job_heartbeat_info_request( # pylint: disable=name-too-long + job_id: int, **kwargs: Any +) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/jobs/{job_id}/heartbeat" + path_format_arguments = { + "job_id": _SERIALIZER.url("job_id", job_id, "int"), + } + + _url: str = _url.format(**path_format_arguments) # type: ignore + + # Construct headers + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + def build_jobs_submit_jdl_jobs_request(**kwargs: Any) -> HttpRequest: _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) @@ -2828,6 +2906,232 @@ def summary(self, body: Union[_models.SummaryParams, IO[bytes]], **kwargs: Any) return deserialized # type: ignore + @distributed_trace + def get_input_data(self, job_id: int, **kwargs: Any) -> List[Dict[str, Any]]: + """Get Input Data. + + Fetches a job's input data. + + :param job_id: Required. + :type job_id: int + :return: list of dict mapping str to any + :rtype: list[dict[str, any]] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[List[Dict[str, Any]]] = kwargs.pop("cls", None) + + _request = build_jobs_get_input_data_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("[{object}]", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace + def get_job_parameters(self, job_id: int, **kwargs: Any) -> List[Dict[str, Any]]: + """Get Job Parameters. + + Get job parameters. + + :param job_id: Required. + :type job_id: int + :return: list of dict mapping str to any + :rtype: list[dict[str, any]] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[List[Dict[str, Any]]] = kwargs.pop("cls", None) + + _request = build_jobs_get_job_parameters_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("[{object}]", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace + def get_job_jdl( + self, job_id: int, **kwargs: Any + ) -> Dict[str, _models.PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties]: + """Get Job Jdl. + + Get job JDLs. + + :param job_id: Required. + :type job_id: int + :return: dict mapping str to + PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties + :rtype: dict[str, + ~_generated.models.PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[ + Dict[str, _models.PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties] + ] = kwargs.pop("cls", None) + + _request = build_jobs_get_job_jdl_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize( + "{PathsZ91EtjApiJobsJobIdJdlGetResponses200ContentApplicationJsonSchemaAdditionalproperties}", + pipeline_response.http_response, + ) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace + def get_job_heartbeat_info(self, job_id: int, **kwargs: Any) -> List[ + Dict[ + str, + _models.PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties, + ] + ]: + """Get Job Heartbeat Info. + + Get job heartbeat info. + + :param job_id: Required. + :type job_id: int + :return: list of dict mapping str to + PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties + :rtype: list[dict[str, + ~_generated.models.PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties]] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[ + List[ + Dict[ + str, + _models.PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties, + ] + ] + ] = kwargs.pop("cls", None) + + _request = build_jobs_get_job_heartbeat_info_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize( + "[{PathsLs6HcyApiJobsJobIdHeartbeatGetResponses200ContentApplicationJsonSchemaItemsAdditionalproperties}]", + pipeline_response.http_response, + ) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + @overload def submit_jdl_jobs( self, body: List[str], *, content_type: str = "application/json", **kwargs: Any