Skip to content
Open
1 change: 1 addition & 0 deletions datadog_checks_base/changelog.d/21316.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for customizable cache keys for the Agent persistent cache. This allows integration developers to define when the cache is invalidated for each integration.
46 changes: 27 additions & 19 deletions datadog_checks_base/datadog_checks/base/checks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,10 @@
import re
from collections import deque
from os.path import basename
from typing import ( # noqa: F401
from typing import (
TYPE_CHECKING,
Any,
AnyStr,
Callable,
Deque,
Dict,
List,
Optional,
Sequence,
Set,
Tuple,
Union,
Deque, # noqa: F401
)

import lazy_loader
Expand Down Expand Up @@ -71,6 +62,7 @@
import traceback as _module_traceback
import unicodedata as _module_unicodedata

from datadog_checks.base.utils.cache_key.base import CacheKey
from datadog_checks.base.utils.diagnose import Diagnosis
from datadog_checks.base.utils.http import RequestsWrapper
from datadog_checks.base.utils.metadata import MetadataManager
Expand Down Expand Up @@ -298,6 +290,7 @@ def __init__(self, *args, **kwargs):

self.__formatted_tags = None
self.__logs_enabled = None
self.__persistent_cache_key: CacheKey | None = None

if os.environ.get("GOFIPS", "0") == "1":
enable_fips()
Expand Down Expand Up @@ -491,6 +484,16 @@ def in_developer_mode(self):
self._log_deprecation('in_developer_mode')
return False

def persistent_cache_key(self) -> CacheKey:
    """
    Build the cache key object used for this check's persistent cache entries.

    The default implementation returns a `FullConfigCacheKey` bound to this check.
    Override this method to change how cached values, such as the log cursor, are
    persisted between agent restarts.
    """
    # Resolved at call time rather than at module import.
    from datadog_checks.base.utils.cache_key.full_config import FullConfigCacheKey as _DefaultCacheKey

    default_key = _DefaultCacheKey(self)
    return default_key

def log_typos_in_options(self, user_config, models_config, level):
# See Performance Optimizations in this package's README.md.
from jellyfish import jaro_winkler_similarity
Expand Down Expand Up @@ -1009,13 +1012,15 @@ def send_log(self, data, cursor=None, stream='default'):
attributes['timestamp'] = int(timestamp * 1000)

datadog_agent.send_log(json.encode(attributes), self.check_id)

if cursor is not None:
self.write_persistent_cache('log_cursor_{}'.format(stream), json.encode(cursor))
self.write_persistent_cache(f'log_cursor_{stream}', json.encode(cursor))

def get_log_cursor(self, stream='default'):
    # type: (str) -> dict[str, Any] | None
    """Returns the most recent log cursor stored in the persistent cache.

    Parameters:
        stream (str):
            the log stream whose cursor should be returned; the cursor is looked up
            under the `log_cursor_<stream>` key

    Returns:
        The decoded cursor, or `None` when nothing (or an empty value) is stored.
    """
    # Read the persistent cache exactly once; a second identical lookup would only
    # repeat the agent call and discard the first result.
    data = self.read_persistent_cache(f'log_cursor_{stream}')

    return json.decode(data) if data else None

def _log_deprecation(self, deprecation_key, *args):
Expand Down Expand Up @@ -1082,9 +1087,10 @@ def entrypoint(self, *args, **kwargs):

return entrypoint

def _persistent_cache_id(self, key):
    # type: (str) -> str
    """Return `key` namespaced with this check's `check_id`, joined by an underscore."""
    prefix = self.check_id
    return f'{prefix}_{key}'
def __initialize_persistent_cache_key(self) -> CacheKey:
    """
    Return this check's persistent cache key, building it on first use.

    The key is obtained from `persistent_cache_key()` and memoized on the instance so
    that later cache reads and writes reuse the same `CacheKey` object.
    """
    if self.__persistent_cache_key is None:
        # Store the key under the same private attribute that is tested above (and
        # that the read/write helpers consult); writing to a differently named
        # attribute would leave this one None and rebuild the key on every call.
        self.__persistent_cache_key = self.persistent_cache_key()
    return self.__persistent_cache_key

def read_persistent_cache(self, key):
# type: (str) -> str
Expand All @@ -1094,9 +1100,10 @@ def read_persistent_cache(self, key):
key (str):
the key to retrieve
"""
return datadog_agent.read_persistent_cache(self._persistent_cache_id(key))
cache_key = self.__persistent_cache_key or self.__initialize_persistent_cache_key()
return datadog_agent.read_persistent_cache(cache_key.key_for(key))

def write_persistent_cache(self, key, value):
def write_persistent_cache(self, key: str, value: str, cache_key: CacheKey | None = None):
# type: (str, str) -> None
"""Stores `value` in a persistent cache for this check instance.
The cache is located in a path where the agent is guaranteed to have read & write permissions. Namely in
Expand All @@ -1110,7 +1117,8 @@ def write_persistent_cache(self, key, value):
value (str):
the value to store
"""
datadog_agent.write_persistent_cache(self._persistent_cache_id(key), value)
cache_key = self.__persistent_cache_key or self.__initialize_persistent_cache_key()
datadog_agent.write_persistent_cache(cache_key.key_for(key), value)

def set_external_tags(self, external_tags):
# type: (Sequence[ExternalTagType]) -> None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# (C) Datadog, Inc. 2025-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
49 changes: 49 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/cache_key/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# (C) Datadog, Inc. 2025-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from datadog_checks.base import AgentCheck


class CacheKey(ABC):
"""
Abstract base class for cache keys management.

Any implementation of this class provides the logic to generate cache keys to be used in the Agent persistent
cache.
"""

def __init__(self, check: AgentCheck):
    """
    Bind the cache key to the check it is generated for.

    Parameters:
        check: the check instance this key belongs to.
    """
    # Memo slot for the composed key string; starts out empty.
    self.__cache_key: str | None = None
    self.check = check

def key(self) -> str:
Comment on lines +24 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def key(self) -> str:
@final
def key(self) -> str:

"""
Returns the cache key for the particular implementation.
"""
if self.__cache_key is not None:
return self.__cache_key

check_id_prefix = ":".join(self.check.check_id.split(":")[:-1])
Copy link
Contributor

@ofek ofek Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no purpose to the key/base key dichotomy, it only adds unnecessary length to the filesystem path in the default case.

Given integration foo, instance name bar, instance hash 123 and context custom_key:

  • key + base key
    • Default: <run_path>/foo/barfoobar123_custom_key
    • Config set: <run_path>/foo/bar456_custom_key
  • key
    • Default: <run_path>/foo/bar123_custom_key
    • Config set: <run_path>/foo/bar456_custom_key

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why I have split those 2 is to enforce some check related isolation between different checks using the same custom key. My intention here is to avoid a developer setting a base_key that is always constant, with no check_id related prefix and unintentionally creating a poor customer experience for all checks they develop following the same practice.

For example, imagine a partner that decides that their base key is going to be

class PartnerKey(CheckKey):
    def base_key(self) -> str:
        return "partner_static_key"

And they start using this kind of key for all integrations they develop that require the same behavior.

I can easily see a scenario in which they would naively expect this key to only affect this check, but without the enforcement on our side of a check-scoped prefix, this would not be true. Without the separation between key and base_key there is only one single method providing a key, over which the developer has full control.

With that separation they can still create this scenario but they would need to intentionally override the key method as well. Like what we are doing in the FullConfigCacheKey, where we override both but once this is in the hand of the developers I feel there needs to be some protection against it.

self.__cache_key = f"{check_id_prefix}:{self.base_key()}"

return self.__cache_key

@abstractmethod
def base_key(self) -> str:
    """
    Abstract method that derives the cache key for the particular implementation.

    This method must return a stable key that only differs between instances based on the
    specific implementation of the invalidation logic.
    """

def key_for(self, context: str) -> str:
    """
    Build the cache entry name for `context` under this cache key.

    The result is the value of `key()` followed by an underscore and `context`.
    """
    prefix = self.key()
    return f"{prefix}_{context}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# (C) Datadog, Inc. 2025-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from __future__ import annotations

from collections.abc import Collection
from typing import TYPE_CHECKING

from datadog_checks.base.utils.containers import hash_mutable

from .base import CacheKey

if TYPE_CHECKING:
from datadog_checks.base import AgentCheck


class ConfigSetCacheKey(CacheKey):
    """
    Cache key derived from a chosen subset of the check's configuration.

    The cache is invalidated when the value of any selected option changes.

    Parameters:
        check: the check instance the key is going to be used for.
        init_config_options: names of the init_config options that feed the key.
        instance_config_options: names of the instance options that feed the key.

    Raises:
        ValueError: if neither option collection names at least one option.
    """

    def __init__(
        self,
        check: AgentCheck,
        *,
        init_config_options: Collection[str] | None = None,
        instance_config_options: Collection[str] | None = None,
    ):
        super().__init__(check)
        # None and empty collections both normalize to an empty set.
        self.init_config_options = set(init_config_options or ())
        self.instance_config_options = set(instance_config_options or ())

        if not (self.init_config_options or self.instance_config_options):
            raise ValueError("At least one of init_config_options or instance_config_options must be provided")

        # Config cannot change on the fly, so the computed key is memoized here.
        self.__key: str | None = None

    def base_key(self) -> str:
        """Return the digest of the selected option values, computing it on first call."""
        if self.__key is None:
            # Only the values of the selected options are hashed: init_config first,
            # then instance.
            selected = [
                option_value
                for option_name, option_value in self.check.init_config.items()
                if option_name in self.init_config_options
            ]
            selected.extend(
                option_value
                for option_name, option_value in self.check.instance.items()
                if option_name in self.instance_config_options
            )
            # NOTE(review): assumes hash_mutable yields the same value across agent
            # restarts (i.e. is not affected by per-process hash randomization) - confirm.
            digest = hash_mutable(tuple(selected))
            # Drop the sign so the key contains no "-" character.
            self.__key = str(digest).replace("-", "")
        return self.__key
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# (C) Datadog, Inc. 2025-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from __future__ import annotations

from .base import CacheKey


class FullConfigCacheKey(CacheKey):
    """
    Cache key that is exactly the `check_id` of the check using it.

    The check_id includes a digest of the full configuration of the check, so the cache
    is invalidated whenever the configuration of the check changes.
    """

    def key(self) -> str:
        """Return the check_id unchanged, without the prefix the base class would add."""
        owner = self.check
        return owner.check_id

    def base_key(self) -> str:
        """Return the check_id; provided to satisfy the abstract interface."""
        owner = self.check
        return owner.check_id
Comment on lines +17 to +21
Copy link
Contributor

@dkirov-dd dkirov-dd Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def key(self) -> str:
return self.check.check_id
def base_key(self) -> str:
return self.check.check_id
self._CacheKey__cache_key = self.check.check_id
def base_key(self) -> str:
return "" # This method is not used for this CacheKey implementation

39 changes: 39 additions & 0 deletions datadog_checks_base/tests/base/checks/test_agent_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from datadog_checks.base import AgentCheck, to_native_string
from datadog_checks.base import __version__ as base_package_version
from datadog_checks.base.utils.cache_key.base import CacheKey

from .utils import BaseModelTest

Expand Down Expand Up @@ -558,6 +559,44 @@ def test_cursor(self, datadog_agent):
)
assert check.get_log_cursor() == {'data': '2'}

def test_cursor_with_custom_cache_key_after_restart(self):
class ConstantCacheKey(CacheKey):
def base_key(self) -> str:
return "always_the_same"

class TestCheck(AgentCheck):
def persistent_cache_key(self) -> CacheKey:
return ConstantCacheKey(self)
Comment on lines +563 to +569
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can move this out of the two tests.


check = TestCheck(name="test", init_config={}, instances=[{}])
check.check_id = 'test:bar:123'
check.send_log({'message': 'foo'}, cursor={'data': '1'})

assert check.get_log_cursor() == {'data': '1'}

new_check = TestCheck(name="test", init_config={}, instances=[{}])
new_check.check_id = 'test:bar:123456'
assert new_check.get_log_cursor() == {'data': '1'}

def test_cursor_invalidated_for_different_persistent_check_id_part(self):
class ConstantCacheKey(CacheKey):
def base_key(self) -> str:
return "always_the_same"

class TestCheck(AgentCheck):
def persistent_cache_key(self) -> CacheKey:
return ConstantCacheKey(self)

check = TestCheck(name="test", init_config={}, instances=[{}])
check.check_id = 'test:bar:123'
check.send_log({'message': 'foo'}, cursor={'data': '1'})

assert check.get_log_cursor() == {'data': '1'}

new_check = TestCheck(name="another_test", init_config={}, instances=[{}])
new_check.check_id = 'test2:bar:456'
Comment on lines +596 to +597
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me whether it is the new_check.check_id or the TestCheck.name that invalidated the cache.
Could you only modify one of the two and keep the other one identical to the first TestCheck?

assert new_check.get_log_cursor() is None

def test_no_cursor(self, datadog_agent):
check = AgentCheck('check_name', {}, [{}])
check.check_id = 'test'
Expand Down
Empty file.
Loading
Loading