Skip to content

Commit 6090d10

Browse files
authored
Merge pull request #257 from ecmwf-projects/COPDS-2811-per-dataset-rate-limit
Implement per-dataset rate limits
2 parents 44a6603 + 157390b commit 6090d10

File tree

6 files changed

+309
-80
lines changed

6 files changed

+309
-80
lines changed

cads_processing_api_service/clients.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,11 @@ def post_process_execution(
230230
"""
231231
structlog.contextvars.bind_contextvars(user_uid=auth_info.user_uid)
232232
_ = limits.check_rate_limits(
233-
SETTINGS.rate_limits.process_execution.post,
233+
SETTINGS.rate_limits,
234+
"processes_processid_execution",
235+
"post",
234236
auth_info,
237+
process_id,
235238
)
236239
request_body = execution_content.model_dump()
237240
catalogue_sessionmaker = db_utils.get_catalogue_sessionmaker(
@@ -396,7 +399,9 @@ def get_jobs(
396399
"""
397400
structlog.contextvars.bind_contextvars(user_uid=auth_info.user_uid)
398401
_ = limits.check_rate_limits(
399-
SETTINGS.rate_limits.jobs.get,
402+
SETTINGS.rate_limits,
403+
"jobs",
404+
"get",
400405
auth_info,
401406
)
402407
job_filters = {
@@ -526,7 +531,9 @@ def get_job(
526531
"""
527532
structlog.contextvars.bind_contextvars(user_uid=auth_info.user_uid)
528533
_ = limits.check_rate_limits(
529-
SETTINGS.rate_limits.job.get,
534+
SETTINGS.rate_limits,
535+
"jobs_jobid",
536+
"get",
530537
auth_info,
531538
)
532539
compute_connection_mode = (
@@ -646,7 +653,9 @@ def get_job_results(
646653
"""
647654
structlog.contextvars.bind_contextvars(user_uid=auth_info.user_uid)
648655
_ = limits.check_rate_limits(
649-
SETTINGS.rate_limits.job_results.get,
656+
SETTINGS.rate_limits,
657+
"jobs_jobsid_results",
658+
"get",
650659
auth_info,
651660
)
652661
compute_connection_mode = (
@@ -711,7 +720,9 @@ def delete_job(
711720
"""
712721
structlog.contextvars.bind_contextvars(user_uid=auth_info.user_uid)
713722
_ = limits.check_rate_limits(
714-
SETTINGS.rate_limits.job.delete,
723+
SETTINGS.rate_limits,
724+
"jobs_jobsid",
725+
"delete",
715726
auth_info,
716727
)
717728
compute_sessionmaker = db_utils.get_compute_sessionmaker(

cads_processing_api_service/config.py

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -108,45 +108,47 @@ class RateLimitsRouteConfig(pydantic.BaseModel):
108108
delete: RateLimitsMethodConfig = pydantic.Field(default=RateLimitsMethodConfig())
109109

110110

111+
class RateLimitsRouteParamConfig(pydantic.BaseModel):
112+
__pydantic_extra__: dict[str, RateLimitsRouteConfig] = pydantic.Field(init=False)
113+
114+
default: RateLimitsRouteConfig = pydantic.Field(default=RateLimitsRouteConfig())
115+
116+
model_config = pydantic.ConfigDict(extra="allow")
117+
118+
111119
class RateLimitsConfig(pydantic.BaseModel):
112120
default: RateLimitsRouteConfig = pydantic.Field(
113121
default=RateLimitsRouteConfig(), validate_default=True
114122
)
115-
process_execution: RateLimitsRouteConfig = pydantic.Field(
116-
default=RateLimitsRouteConfig(),
123+
processes_processid_execution: RateLimitsRouteParamConfig = pydantic.Field(
117124
alias="/processes/{process_id}/execution",
125+
default=RateLimitsRouteParamConfig(),
126+
validate_default=True,
127+
)
128+
processes_processid_constraints: RateLimitsRouteParamConfig = pydantic.Field(
129+
alias="/processes/{process_id}/constraints",
130+
default=RateLimitsRouteParamConfig(),
131+
validate_default=True,
132+
)
133+
processes_processid_costing: RateLimitsRouteParamConfig = pydantic.Field(
134+
alias="/processes/{process_id}/costing",
135+
default=RateLimitsRouteParamConfig(),
118136
validate_default=True,
119137
)
120138
jobs: RateLimitsRouteConfig = pydantic.Field(
121139
default=RateLimitsRouteConfig(), alias="/jobs", validate_default=True
122140
)
123-
job: RateLimitsRouteConfig = pydantic.Field(
141+
jobs_jobsid: RateLimitsRouteConfig = pydantic.Field(
124142
default=RateLimitsRouteConfig(), alias="/jobs/{job_id}", validate_default=True
125143
)
126-
job_results: RateLimitsRouteConfig = pydantic.Field(
144+
jobs_jobsid_results: RateLimitsRouteConfig = pydantic.Field(
127145
default=RateLimitsRouteConfig(),
128146
alias="/jobs/{job_id}/results",
129147
validate_default=True,
130148
)
131-
132-
@pydantic.model_validator(mode="after") # type: ignore
133-
def populate_fields_with_default(self) -> pydantic.BaseModel:
134-
default = self.default
135-
if default is RateLimitsRouteConfig():
136-
return self
137-
routes = self.model_fields
138-
for route in routes:
139-
if route == "default":
140-
continue
141-
route_config: RateLimitsRouteConfig = getattr(self, route)
142-
for method in route_config.model_fields:
143-
method_config: RateLimitsMethodConfig = getattr(route_config, method)
144-
for origin in method_config.model_fields:
145-
set_value = getattr(getattr(getattr(self, route), method), origin)
146-
if not set_value:
147-
default_value = getattr(getattr(default, method), origin)
148-
setattr(getattr(route_config, method), origin, default_value)
149-
return self
149+
jobs_delete: RateLimitsRouteConfig = pydantic.Field(
150+
default=RateLimitsRouteConfig(), alias="/jobs/delete", validate_default=True
151+
)
150152

151153

152154
def load_rate_limits(rate_limits_file: str | None) -> RateLimitsConfig:

cads_processing_api_service/endpoints.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626

2727
SETTINGS = config.settings
2828

29-
logger: structlog.stdlib.BoundLogger = structlog.get_logger(__name__)
30-
3129

3230
@exceptions.exception_logger
3331
def apply_constraints(
@@ -39,6 +37,13 @@ def apply_constraints(
3937
)
4038
),
4139
) -> dict[str, Any]:
40+
limits.check_rate_limits(
41+
SETTINGS.rate_limits,
42+
"processes_processid_constraints",
43+
"post",
44+
auth_info,
45+
process_id,
46+
)
4247
request = execution_content.model_dump()
4348
table = cads_catalogue.database.Resource
4449
catalogue_sessionmaker = db_utils.get_catalogue_sessionmaker(
@@ -93,6 +98,13 @@ def estimate_cost(
9398
models.RequestCost
9499
Info on the cost with the highest cost/limit ratio.
95100
"""
101+
limits.check_rate_limits(
102+
SETTINGS.rate_limits,
103+
"processes_processid_costing",
104+
"post",
105+
auth_info,
106+
process_id,
107+
)
96108
request = execution_content.model_dump()
97109
table = cads_catalogue.database.Resource
98110
catalogue_sessionmaker = db_utils.get_catalogue_sessionmaker(
@@ -172,7 +184,9 @@ def delete_jobs(
172184
"""
173185
structlog.contextvars.bind_contextvars(user_uid=auth_info.user_uid)
174186
limits.check_rate_limits(
175-
SETTINGS.rate_limits.jobs.delete,
187+
SETTINGS.rate_limits,
188+
"jobs_delete",
189+
"post",
176190
auth_info,
177191
)
178192
job_ids = request.job_ids

cads_processing_api_service/limits.py

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License
1616

17+
from typing import Any
18+
1719
import limits
1820
import structlog
1921

@@ -27,6 +29,47 @@
2729
limiter = config.RATE_LIMITS_LIMITER
2830

2931

32+
def get_rate_limits(
33+
rate_limits_config: config.RateLimitsConfig,
34+
route: str,
35+
method: str,
36+
request_origin: str,
37+
route_param: str | None = None,
38+
) -> list[str]:
39+
"""Get the rate limits for a specific route and method."""
40+
rate_limits = rate_limits_config.model_dump()
41+
route_rate_limits: dict[str, Any] = rate_limits.get(route, {})
42+
if route_param is not None:
43+
route_param_rate_limits: dict[str, Any] = route_rate_limits.get(route_param, {})
44+
else:
45+
route_param_rate_limits = route_rate_limits
46+
method_rate_limits: dict[str, Any] = route_param_rate_limits.get(method, {})
47+
rate_limit_ids: list[str] = method_rate_limits.get(request_origin, [])
48+
return rate_limit_ids
49+
50+
51+
def get_rate_limits_defaulted(
52+
rate_limits_config: config.RateLimitsConfig,
53+
route: str,
54+
method: str,
55+
request_origin: str,
56+
route_param: str | None = None,
57+
) -> list[str]:
58+
"""Get the rate limits for a specific route and method, with defaults."""
59+
rate_limits = get_rate_limits(
60+
rate_limits_config, route, method, request_origin, route_param
61+
)
62+
if not rate_limits:
63+
rate_limits = get_rate_limits(
64+
rate_limits_config, route, method, request_origin, "default"
65+
)
66+
if not rate_limits:
67+
rate_limits = get_rate_limits(
68+
rate_limits_config, "default", method, request_origin
69+
)
70+
return rate_limits
71+
72+
3073
def check_rate_limits_for_user(
3174
user_uid: str, rate_limits: list[limits.RateLimitItem]
3275
) -> None:
@@ -52,13 +95,18 @@ def check_rate_limits_for_user(
5295

5396

5497
def check_rate_limits(
55-
method_rate_limits: config.RateLimitsMethodConfig,
98+
rate_limits_config: config.RateLimitsConfig,
99+
route: str,
100+
method: str,
56101
auth_info: models.AuthInfo,
102+
route_param: str | None = None,
57103
) -> None:
58104
"""Check if the rate limits are exceeded."""
59-
user_uid = auth_info.user_uid
60105
request_origin = auth_info.request_origin
61-
rate_limit_ids = getattr(method_rate_limits, request_origin)
62-
rate_limits = [limits.parse(rate_limit_id) for rate_limit_id in rate_limit_ids]
63-
check_rate_limits_for_user(user_uid, rate_limits)
106+
user_uid = auth_info.user_uid
107+
rate_limits = get_rate_limits_defaulted(
108+
rate_limits_config, route, method, request_origin, route_param
109+
)
110+
rate_limits_parsed = [limits.parse(rate_limit) for rate_limit in rate_limits]
111+
check_rate_limits_for_user(user_uid, rate_limits_parsed)
64112
return None

tests/test_10_config.py

Lines changed: 28 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,41 @@ def test_load_rate_limits(tmp_path: pathlib.Path, caplog) -> None:
7373

7474
rate_limits_file = str(tmp_path / "rate-limits.yaml")
7575
rate_limits = {
76-
"/processes/{process_id}/execution": {
77-
"post": {"api": ["1/second"], "ui": ["2/second"]}
76+
"/jobs/{job_id}": {"get": {"api": ["1/second"], "ui": ["2/second"]}},
77+
"/processes/{process_id}/constraints": {
78+
"default": {"get": {"api": ["1/second"], "ui": ["2/second"]}},
79+
"process-id": {"post": {"api": ["1/second"], "ui": ["2/second"]}},
7880
},
7981
}
8082
with open(rate_limits_file, "w") as file:
8183
yaml.dump(rate_limits, file)
82-
loaded_rate_limits = config.load_rate_limits(rate_limits_file)
83-
assert loaded_rate_limits == config.RateLimitsConfig(**rate_limits)
84+
loaded_rate_limits = config.load_rate_limits(rate_limits_file).model_dump()
85+
expected_jobs_limits = {
86+
"get": {"api": ["1/second"], "ui": ["2/second"]},
87+
"post": {"api": [], "ui": []},
88+
"delete": {"api": [], "ui": []},
89+
}
90+
assert loaded_rate_limits["jobs_jobsid"] == expected_jobs_limits
91+
expected_process_constraints_limits = {
92+
"default": {
93+
"get": {"api": ["1/second"], "ui": ["2/second"]},
94+
"post": {"api": [], "ui": []},
95+
"delete": {"api": [], "ui": []},
96+
},
97+
"process-id": {
98+
"get": {"api": [], "ui": []},
99+
"post": {"api": ["1/second"], "ui": ["2/second"]},
100+
"delete": {"api": [], "ui": []},
101+
},
102+
}
103+
assert (
104+
loaded_rate_limits["processes_processid_constraints"]
105+
== expected_process_constraints_limits
106+
)
84107

85108
rate_limits_file = str(tmp_path / "invalid-rate-limits.yaml")
86109
rate_limits = {
87-
"/processes/{process_id}/execution": {"post": {"api": ["invalid_limit"]}},
110+
"/jobs/{job_id}": {"get": {"api": ["invalid_limit"]}},
88111
}
89112
with open(rate_limits_file, "w") as file:
90113
yaml.dump(rate_limits, file)
@@ -94,41 +117,3 @@ def test_load_rate_limits(tmp_path: pathlib.Path, caplog) -> None:
94117
rate_limits_file = str(tmp_path / "not-found-rate-limits.yaml")
95118
loaded_rate_limits = config.load_rate_limits(rate_limits_file)
96119
assert loaded_rate_limits == config.RateLimitsConfig()
97-
98-
99-
def test_rate_limits_config_populate_with_default() -> None:
100-
rate_limits_config = config.RateLimitsConfig(
101-
**{
102-
"default": {
103-
"post": {"api": ["1/second"], "ui": ["2/second"]},
104-
"get": {"api": ["2/second"]},
105-
},
106-
"/processes/{process_id}/execution": {"post": {"api": ["1/minute"]}},
107-
}
108-
)
109-
exp_populated_rate_limits_config = {
110-
"default": {
111-
"post": {"api": ["1/second"], "ui": ["2/second"]},
112-
"get": {"api": ["2/second"]},
113-
},
114-
"process_execution": {
115-
"post": {"api": ["1/minute"], "ui": ["2/second"]},
116-
"get": {"api": ["2/second"]},
117-
},
118-
"jobs": {
119-
"post": {"api": ["1/second"], "ui": ["2/second"]},
120-
"get": {"api": ["2/second"]},
121-
},
122-
"job": {
123-
"post": {"api": ["1/second"], "ui": ["2/second"]},
124-
"get": {"api": ["2/second"]},
125-
},
126-
"job_results": {
127-
"post": {"api": ["1/second"], "ui": ["2/second"]},
128-
"get": {"api": ["2/second"]},
129-
},
130-
}
131-
assert (
132-
rate_limits_config.model_dump(exclude_defaults=True)
133-
== exp_populated_rate_limits_config
134-
)

0 commit comments

Comments
 (0)