diff --git a/datadog_checks_base/changelog.d/21316.added b/datadog_checks_base/changelog.d/21316.added new file mode 100644 index 0000000000000..d5b7a0fb116e9 --- /dev/null +++ b/datadog_checks_base/changelog.d/21316.added @@ -0,0 +1 @@ +Add support for customizable cache keys to be used by the agent persistent cache. This allows integrations developers to define when the cache will be invalidated for each integration. \ No newline at end of file diff --git a/datadog_checks_base/datadog_checks/base/checks/base.py b/datadog_checks_base/datadog_checks/base/checks/base.py index 15c34f4db3830..06dcb7ee3f157 100644 --- a/datadog_checks_base/datadog_checks/base/checks/base.py +++ b/datadog_checks_base/datadog_checks/base/checks/base.py @@ -11,19 +11,10 @@ import re from collections import deque from os.path import basename -from typing import ( # noqa: F401 +from typing import ( TYPE_CHECKING, Any, - AnyStr, - Callable, - Deque, - Dict, - List, - Optional, - Sequence, - Set, - Tuple, - Union, + Deque, # noqa: F401 ) import lazy_loader @@ -294,10 +285,16 @@ def __init__(self, *args, **kwargs): # Functions that will be called exactly once (if successful) before the first check run self.check_initializations = deque() # type: Deque[Callable[[], None]] - self.check_initializations.append(self.load_configuration_models) + self.check_initializations.extend( + [ + self.load_configuration_models, + self.__initialize_persistent_cache_key_prefix, + ] + ) self.__formatted_tags = None self.__logs_enabled = None + self.__persistent_cache_key_prefix: str = "" if os.environ.get("GOFIPS", "0") == "1": enable_fips() @@ -491,6 +488,18 @@ def in_developer_mode(self): self._log_deprecation('in_developer_mode') return False + def persistent_cache_id(self) -> str: + """ + Returns the ID that identifies this check instance in the Agent persistent cache. + + Overriding this method modifies the default behavior of the AgentCheck and can + be used to customize when the persistent cache is invalidated. The default behavior + defines the persistent cache ID as the digest of the full check configuration. + + Some per-check isolation is still applied to avoid different checks with the same ID to share the same keys. + """ + return self.check_id.split(":")[-1] + def log_typos_in_options(self, user_config, models_config, level): # See Performance Optimizations in this package's README.md. from jellyfish import jaro_winkler_similarity @@ -1009,13 +1018,15 @@ def send_log(self, data, cursor=None, stream='default'): attributes['timestamp'] = int(timestamp * 1000) datadog_agent.send_log(json.encode(attributes), self.check_id) + if cursor is not None: - self.write_persistent_cache('log_cursor_{}'.format(stream), json.encode(cursor)) + self.write_persistent_cache(f'log_cursor_{stream}', json.encode(cursor)) def get_log_cursor(self, stream='default'): # type: (str) -> dict[str, Any] | None """Returns the most recent log cursor from disk.""" - data = self.read_persistent_cache('log_cursor_{}'.format(stream)) + data = self.read_persistent_cache(f'log_cursor_{stream}') + return json.decode(data) if data else None def _log_deprecation(self, deprecation_key, *args): @@ -1082,9 +1093,9 @@ def entrypoint(self, *args, **kwargs): return entrypoint - def _persistent_cache_id(self, key): - # type: (str) -> str - return '{}_{}'.format(self.check_id, key) + def __initialize_persistent_cache_key_prefix(self): + namespace = ':'.join(self.check_id.split(':')[:-1]) + self.__persistent_cache_key_prefix = f'{namespace}:{self.persistent_cache_id()}_' def read_persistent_cache(self, key): # type: (str) -> str @@ -1094,9 +1105,9 @@ def read_persistent_cache(self, key): key (str): the key to retrieve """ - return datadog_agent.read_persistent_cache(self._persistent_cache_id(key)) + return datadog_agent.read_persistent_cache(f"{self.__persistent_cache_key_prefix}{key}") - def write_persistent_cache(self, key, value): + def write_persistent_cache(self, key: str, value: str): # type: (str, str) -> None """Stores `value` in a persistent cache for this check instance. The cache is located in a path where the agent is guaranteed to have read & write permissions. Namely in @@ -1110,7 +1121,7 @@ def write_persistent_cache(self, key, value): value (str): the value to store """ - datadog_agent.write_persistent_cache(self._persistent_cache_id(key), value) + datadog_agent.write_persistent_cache(f"{self.__persistent_cache_key_prefix}{key}", value) def set_external_tags(self, external_tags): # type: (Sequence[ExternalTagType]) -> None @@ -1282,13 +1293,7 @@ def run(self): run_with_isolation(self, aggregator, datadog_agent) else: - while self.check_initializations: - initialization = self.check_initializations.popleft() - try: - initialization() - except Exception: - self.check_initializations.appendleft(initialization) - raise + self.run_check_initializations() instance = copy.deepcopy(self.instances[0]) @@ -1328,6 +1333,15 @@ def run(self): return error_report + def run_check_initializations(self): + while self.check_initializations: + initialization = self.check_initializations.popleft() + try: + initialization() + except Exception: + self.check_initializations.appendleft(initialization) + raise + def event(self, event): # type: (Event) -> None """Send an event. diff --git a/datadog_checks_base/datadog_checks/base/utils/persistent_cache.py b/datadog_checks_base/datadog_checks/base/utils/persistent_cache.py new file mode 100644 index 0000000000000..bbdb7a36c1398 --- /dev/null +++ b/datadog_checks_base/datadog_checks/base/utils/persistent_cache.py @@ -0,0 +1,33 @@ +from collections.abc import Collection + +from datadog_checks.base import AgentCheck +from datadog_checks.base.utils.containers import hash_mutable + + +def config_set_persistent_cache_id( + check: AgentCheck, + init_config_options: Collection[str] | None = None, + instance_config_options: Collection[str] | None = None, +): + """ + Returns an ID for the persisitent cache derives from a subset of the check's config options. + + If the value of any of the provided options changes, the generate cache ID will change. + + Parameters: + check: the check instance the key is going to be used for. + init_config_options: the subset of init_config options to use to generate the cache ID. + instance_config_options: the subset of config options to use to generate the cache ID. + """ + + if not init_config_options and not instance_config_options: + raise ValueError("At least one of init_config_options or instance_config_options must be provided") + + set_init_config_options = set(init_config_options) if init_config_options else set() + set_instance_config_options = set(instance_config_options) if instance_config_options else set() + + init_config_values = tuple(value for key, value in check.init_config.items() if key in set_init_config_options) + instance_config_values = tuple(value for key, value in check.instance.items() if key in set_instance_config_options) + + selected_values = init_config_values + instance_config_values + return str(hash_mutable(selected_values)).replace("-", "") diff --git a/datadog_checks_base/tests/base/checks/test_agent_check.py b/datadog_checks_base/tests/base/checks/test_agent_check.py index d8411507b6076..828ba9e49775c 100644 --- a/datadog_checks_base/tests/base/checks/test_agent_check.py +++ b/datadog_checks_base/tests/base/checks/test_agent_check.py @@ -42,12 +42,13 @@ def test_check_version(): def test_persistent_cache(datadog_agent): - check = AgentCheck() - check.check_id = 'test' + check = AgentCheck(init_config={}, instances=[{}]) + check.check_id = 'test:123' + check.run_check_initializations() check.write_persistent_cache('foo', 'bar') - assert datadog_agent.read_persistent_cache('test_foo') == 'bar' + assert datadog_agent.read_persistent_cache('test:123_foo') == 'bar' assert check.read_persistent_cache('foo') == 'bar' @@ -558,6 +559,45 @@ def test_cursor(self, datadog_agent): ) assert check.get_log_cursor() == {'data': '2'} + def custom_persistent_cache_id_check(self) -> AgentCheck: + class TestCheck(AgentCheck): + def persistent_cache_id(self) -> str: + return "always_the_same" + + return TestCheck(name="test", init_config={}, instances=[{}]) + + def test_cursor_with_custom_cache_invalidation_strategy_after_restart(self): + check = self.custom_persistent_cache_id_check() + check.check_id = 'test:bar:123' + check.send_log({'message': 'foo'}, cursor={'data': '1'}) + + assert check.get_log_cursor() == {'data': '1'} + + new_check = self.custom_persistent_cache_id_check() + new_check.check_id = 'test:bar:123456' + assert new_check.get_log_cursor() == {'data': '1'} + + check = self.custom_persistent_cache_id_check() + check.check_id = 'test:bar:123' + check.send_log({'message': 'foo'}, cursor={'data': '1'}) + + assert check.get_log_cursor() == {'data': '1'} + + new_check = self.custom_persistent_cache_id_check() + new_check.check_id = 'test:bar:123456' + assert new_check.get_log_cursor() == {'data': '1'} + + def test_cursor_invalidated_for_different_persistent_check_id_part(self): + check = self.custom_persistent_cache_id_check() + check.check_id = 'test:bar:123' + check.send_log({'message': 'foo'}, cursor={'data': '1'}) + + assert check.get_log_cursor() == {'data': '1'} + + new_check = self.custom_persistent_cache_id_check() + new_check.check_id = 'test:bar:123456' + assert new_check.get_log_cursor() == {'data': '1'} + def test_no_cursor(self, datadog_agent): check = AgentCheck('check_name', {}, [{}]) check.check_id = 'test' diff --git a/datadog_checks_base/tests/base/utils/test_persistent_cache.py b/datadog_checks_base/tests/base/utils/test_persistent_cache.py new file mode 100644 index 0000000000000..0fccc5f434059 --- /dev/null +++ b/datadog_checks_base/tests/base/utils/test_persistent_cache.py @@ -0,0 +1,134 @@ +from typing import Any + +import pytest + +from datadog_checks.base.checks.base import AgentCheck +from datadog_checks.base.utils.containers import hash_mutable +from datadog_checks.base.utils.persistent_cache import config_set_persistent_cache_id + + +@pytest.fixture(scope='module') +def config() -> dict[str, str]: + return { + 'init_option1': 'init_value1', + 'init_option2': 'init_value2', + 'global': 'init_global_value', + } + + +@pytest.fixture(scope='module') +def instance() -> dict[str, str]: + return { + 'instance_option1': 'instance_value1', + 'instance_option2': 'instance_value2', + 'global': 'instance_global_value', + } + + +def build_check(init_config: dict[str, Any], instance: dict[str, Any]) -> AgentCheck: + return TestCheck('test', init_config, [instance]) + + +@pytest.fixture(scope='module') +def check(config: dict[str, Any], instance: dict[str, Any]) -> AgentCheck: + return build_check(config, instance) + + +@pytest.fixture(scope='module') +def cache_id(check: AgentCheck) -> str: + return config_set_persistent_cache_id(check, init_config_options=['init_option1']) + + +class TestCheck(AgentCheck): + def check(self, instance): + pass + + +def normalized_hash(value: object) -> str: + return str(hash_mutable(value)).replace("-", "") + + +def test_config_set_caches(cache_id: str): + assert cache_id == normalized_hash(('init_value1',)) + + +def test_initialization_fails_without_any_options(check: AgentCheck): + with pytest.raises(ValueError): + config_set_persistent_cache_id(check) + + +def test_same_invalidation_token_on_changes_in_unlesected_other_options(config: dict[str, Any], check: AgentCheck): + cache_id = config_set_persistent_cache_id(check, init_config_options=['init_option1']) + expected_cache_id = normalized_hash(('init_value1',)) + assert cache_id == expected_cache_id + + config['init_option2'] = 'something elese' + cache_id = config_set_persistent_cache_id(check, init_config_options=['init_option1']) + assert cache_id == expected_cache_id + + +@pytest.mark.parametrize( + 'extra_option', + [ + ["item1", "item2"], + ("item1", "item3"), + {"key1": "item1", "key2": "item2"}, + {"key1": {"key2": "item2", "key3": "item3"}}, + ], + ids=["list", "tuple", "dict", "nested_dict"], +) +def test_support_for_complex_option_values( + check: AgentCheck, + instance: dict[str, Any], + extra_option: list[str] | tuple[str, str] | dict[str, str] | dict[str, dict[str, str]], +): + instance['extra_option'] = extra_option + cache_id = config_set_persistent_cache_id(check, instance_config_options=['extra_option']) + expected_cache_id = normalized_hash((extra_option,)) + assert cache_id == expected_cache_id + + +def deep_reverse(obj: Any) -> Any: + if isinstance(obj, dict): + return {k: deep_reverse(v) for k, v in reversed(list(obj.items()))} + if isinstance(obj, list): + return [deep_reverse(e) for e in reversed(obj)] + if isinstance(obj, tuple): + return tuple(deep_reverse(e) for e in reversed(obj)) + return obj + + +@pytest.mark.parametrize( + 'extra_option', + [ + ["item1", "item2"], + ("item1", "item2"), + {"key1": "item1", "key2": "item2"}, + { + "key1": {"key2": "item2", "key3": ["item3", "item4"]}, + }, + ], + ids=["list", "tuple", "dict", "nested_dict"], +) +def test_order_does_not_affect_key( + check: AgentCheck, + instance: dict[str, Any], + extra_option: list[str] | tuple[str, str] | dict[str, str] | dict[str, dict[str, str]], +): + instance['extra_option'] = extra_option + cache_id = config_set_persistent_cache_id(check, instance_config_options=['extra_option']) + expected_cache_id = normalized_hash((extra_option,)) + + instance['extra_option'] = deep_reverse(extra_option) + cache_id = config_set_persistent_cache_id(check, instance_config_options=['extra_option']) + assert cache_id == expected_cache_id + + +def test_same_option_names_in_init_config_and_instance_config(check: AgentCheck, instance: dict[str, Any]): + cache_id = config_set_persistent_cache_id(check, init_config_options=['global']) + expected_cache_id = normalized_hash(('init_global_value',)) + + # Modifying the same option name in instance has no effect on key + instance['global'] = 'something' + + assert cache_id == expected_cache_id