Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion optimizely/cmab/cmab_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from optimizely.exceptions import CmabFetchError, CmabInvalidResponseError

# Default constants for CMAB requests
DEFAULT_MAX_RETRIES = 3
DEFAULT_MAX_RETRIES = 1
DEFAULT_INITIAL_BACKOFF = 0.1 # in seconds (100 ms)
DEFAULT_MAX_BACKOFF = 10 # in seconds
DEFAULT_BACKOFF_MULTIPLIER = 2.0
Expand Down
51 changes: 45 additions & 6 deletions optimizely/cmab/cmab_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@
import hashlib
import threading

from typing import Optional, List, TypedDict
from typing import Optional, List, TypedDict, Tuple
from optimizely.cmab.cmab_client import DefaultCmabClient
from optimizely.odp.lru_cache import LRUCache
from optimizely.optimizely_user_context import OptimizelyUserContext, UserAttributes
from optimizely.project_config import ProjectConfig
from optimizely.decision.optimizely_decide_option import OptimizelyDecideOption
from optimizely import logger as _logging
from optimizely.lib import pymmh3 as mmh3

NUM_LOCK_STRIPES = 1000
DEFAULT_CMAB_CACHE_TIMEOUT = 30 * 60 # 30 minutes
DEFAULT_CMAB_CACHE_SIZE = 10000


class CmabDecision(TypedDict):
Expand Down Expand Up @@ -65,26 +68,40 @@ def _get_lock_index(self, user_id: str, rule_id: str) -> int:
return hash_value % NUM_LOCK_STRIPES

def get_decision(self, project_config: ProjectConfig, user_context: OptimizelyUserContext,
rule_id: str, options: List[str]) -> CmabDecision:
rule_id: str, options: List[str]) -> Tuple[CmabDecision, List[str]]:

lock_index = self._get_lock_index(user_context.user_id, rule_id)
with self.locks[lock_index]:
return self._get_decision(project_config, user_context, rule_id, options)

def _get_decision(self, project_config: ProjectConfig, user_context: OptimizelyUserContext,
rule_id: str, options: List[str]) -> CmabDecision:
rule_id: str, options: List[str]) -> Tuple[CmabDecision, List[str]]:

filtered_attributes = self._filter_attributes(project_config, user_context, rule_id)
reasons = []

if OptimizelyDecideOption.IGNORE_CMAB_CACHE in options:
return self._fetch_decision(rule_id, user_context.user_id, filtered_attributes)
reason = f"Ignoring CMAB cache for user '{user_context.user_id}' and rule '{rule_id}'"
if self.logger:
self.logger.debug(reason)
reasons.append(reason)
cmab_decision = self._fetch_decision(rule_id, user_context.user_id, filtered_attributes)
return cmab_decision, reasons

if OptimizelyDecideOption.RESET_CMAB_CACHE in options:
reason = f"Resetting CMAB cache for user '{user_context.user_id}' and rule '{rule_id}'"
if self.logger:
self.logger.debug(reason)
reasons.append(reason)
self.cmab_cache.reset()

cache_key = self._get_cache_key(user_context.user_id, rule_id)

if OptimizelyDecideOption.INVALIDATE_USER_CMAB_CACHE in options:
reason = f"Invalidating CMAB cache for user '{user_context.user_id}' and rule '{rule_id}'"
if self.logger:
self.logger.debug(reason)
reasons.append(reason)
self.cmab_cache.remove(cache_key)

cached_value = self.cmab_cache.lookup(cache_key)
Expand All @@ -93,17 +110,39 @@ def _get_decision(self, project_config: ProjectConfig, user_context: OptimizelyU

if cached_value:
if cached_value['attributes_hash'] == attributes_hash:
return CmabDecision(variation_id=cached_value['variation_id'], cmab_uuid=cached_value['cmab_uuid'])
reason = f"CMAB cache hit for user '{user_context.user_id}' and rule '{rule_id}'"
if self.logger:
self.logger.debug(reason)
reasons.append(reason)
return CmabDecision(variation_id=cached_value['variation_id'],
cmab_uuid=cached_value['cmab_uuid']), reasons
else:
reason = (
f"CMAB cache attributes mismatch for user '{user_context.user_id}' "
f"and rule '{rule_id}', fetching new decision."
)
if self.logger:
self.logger.debug(reason)
reasons.append(reason)
self.cmab_cache.remove(cache_key)
else:
reason = f"CMAB cache miss for user '{user_context.user_id}' and rule '{rule_id}'"
if self.logger:
self.logger.debug(reason)
reasons.append(reason)

cmab_decision = self._fetch_decision(rule_id, user_context.user_id, filtered_attributes)
reason = f"CMAB decision is {cmab_decision}"
if self.logger:
self.logger.debug(reason)
reasons.append(reason)

self.cmab_cache.save(cache_key, {
'attributes_hash': attributes_hash,
'variation_id': cmab_decision['variation_id'],
'cmab_uuid': cmab_decision['cmab_uuid'],
})
return cmab_decision
return cmab_decision, reasons

def _fetch_decision(self, rule_id: str, user_id: str, attributes: UserAttributes) -> CmabDecision:
cmab_uuid = str(uuid.uuid4())
Expand Down
3 changes: 2 additions & 1 deletion optimizely/decision_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,10 @@ def _get_decision_for_cmab_experiment(
# User is in CMAB allocation, proceed to CMAB decision
try:
options_list = list(options) if options is not None else []
cmab_decision = self.cmab_service.get_decision(
cmab_decision, cmab_reasons = self.cmab_service.get_decision(
project_config, user_context, experiment.id, options_list
)
decide_reasons.extend(cmab_reasons)
return {
"error": False,
"result": cmab_decision,
Expand Down
2 changes: 1 addition & 1 deletion optimizely/event/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class Signal:

def __init__(
self,
event_dispatcher: Optional[type[EventDispatcher] | CustomEventDispatcher] = None,
event_dispatcher: Optional[EventDispatcher | CustomEventDispatcher] = None,
logger: Optional[_logging.Logger] = None,
start_on_init: bool = False,
event_queue: Optional[queue.Queue[UserEvent | Signal]] = None,
Expand Down
31 changes: 16 additions & 15 deletions optimizely/optimizely.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,13 @@
from .optimizely_user_context import OptimizelyUserContext, UserAttributes
from .project_config import ProjectConfig
from .cmab.cmab_client import DefaultCmabClient, CmabRetryConfig
from .cmab.cmab_service import DefaultCmabService, CmabCacheValue
from .cmab.cmab_service import DefaultCmabService, CmabCacheValue, DEFAULT_CMAB_CACHE_SIZE, DEFAULT_CMAB_CACHE_TIMEOUT

if TYPE_CHECKING:
# prevent circular dependency by skipping import at runtime
from .user_profile import UserProfileService
from .helpers.event_tag_utils import EventTags

# Default constants for CMAB cache
DEFAULT_CMAB_CACHE_TIMEOUT = 30 * 60 * 1000 # 30 minutes in milliseconds
DEFAULT_CMAB_CACHE_SIZE = 1000


class Optimizely:
""" Class encapsulating all SDK functionality. """
Expand All @@ -77,6 +73,7 @@ def __init__(
default_decide_options: Optional[list[str]] = None,
event_processor_options: Optional[dict[str, Any]] = None,
settings: Optional[OptimizelySdkSettings] = None,
cmab_service: Optional[DefaultCmabService] = None,
) -> None:
""" Optimizely init method for managing Custom projects.

Expand Down Expand Up @@ -178,16 +175,20 @@ def __init__(
self.event_builder = event_builder.EventBuilder()

# Initialize CMAB components
self.cmab_client = DefaultCmabClient(
retry_config=CmabRetryConfig(),
logger=self.logger
)
self.cmab_cache: LRUCache[str, CmabCacheValue] = LRUCache(DEFAULT_CMAB_CACHE_SIZE, DEFAULT_CMAB_CACHE_TIMEOUT)
self.cmab_service = DefaultCmabService(
cmab_cache=self.cmab_cache,
cmab_client=self.cmab_client,
logger=self.logger
)
if cmab_service:
self.cmab_service = cmab_service
else:
self.cmab_client = DefaultCmabClient(
retry_config=CmabRetryConfig(),
logger=self.logger
)
self.cmab_cache: LRUCache[str, CmabCacheValue] = LRUCache(DEFAULT_CMAB_CACHE_SIZE,
DEFAULT_CMAB_CACHE_TIMEOUT)
self.cmab_service = DefaultCmabService(
cmab_cache=self.cmab_cache,
cmab_client=self.cmab_client,
logger=self.logger
)
self.decision_service = decision_service.DecisionService(self.logger, user_profile_service, self.cmab_service)
self.user_profile_service = user_profile_service

Expand Down
72 changes: 70 additions & 2 deletions optimizely/optimizely_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
from .event_dispatcher import EventDispatcher, CustomEventDispatcher
from .notification_center import NotificationCenter
from .optimizely import Optimizely
from .odp.lru_cache import LRUCache
from .cmab.cmab_client import DefaultCmabClient, CmabRetryConfig
from .cmab.cmab_service import DefaultCmabService, CmabCacheValue, DEFAULT_CMAB_CACHE_TIMEOUT, DEFAULT_CMAB_CACHE_SIZE

if TYPE_CHECKING:
# prevent circular dependenacy by skipping import at runtime
Expand All @@ -36,6 +39,9 @@ class OptimizelyFactory:
max_event_flush_interval: Optional[int] = None
polling_interval: Optional[float] = None
blocking_timeout: Optional[int] = None
cmab_cache_size: Optional[int] = None
cmab_cache_ttl: Optional[int] = None
cmab_custom_cache: Optional[LRUCache[str, CmabCacheValue]] = None

@staticmethod
def set_batch_size(batch_size: int) -> int:
Expand Down Expand Up @@ -75,6 +81,51 @@ def set_blocking_timeout(blocking_timeout: int) -> int:
OptimizelyFactory.blocking_timeout = blocking_timeout
return OptimizelyFactory.blocking_timeout

@staticmethod
def set_cmab_cache_size(cache_size: int, logger: Optional[optimizely_logger.Logger] = None) -> Optional[int]:
""" Convenience method for setting the maximum number of items in CMAB cache.
Args:
cache_size: Maximum number of items in CMAB cache.
logger: Optional logger for logging messages.
"""
logger = logger or optimizely_logger.NoOpLogger()

if not isinstance(cache_size, int) or cache_size <= 0:
logger.error(
f"CMAB cache size is invalid, setting to default size {DEFAULT_CMAB_CACHE_SIZE}."
)
return None

OptimizelyFactory.cmab_cache_size = cache_size
return OptimizelyFactory.cmab_cache_size

@staticmethod
def set_cmab_cache_ttl(cache_ttl: int, logger: Optional[optimizely_logger.Logger] = None) -> Optional[int]:
""" Convenience method for setting CMAB cache TTL.
Args:
cache_ttl: Time in seconds for cache entries to live.
logger: Optional logger for logging messages.
"""
logger = logger or optimizely_logger.NoOpLogger()

if not isinstance(cache_ttl, (int, float)) or cache_ttl <= 0:
logger.error(
f"CMAB cache TTL is invalid, setting to default TTL {DEFAULT_CMAB_CACHE_TIMEOUT}."
)
return None

OptimizelyFactory.cmab_cache_ttl = int(cache_ttl)
return OptimizelyFactory.cmab_cache_ttl

@staticmethod
def set_cmab_custom_cache(custom_cache: LRUCache[str, CmabCacheValue]) -> LRUCache[str, CmabCacheValue]:
""" Convenience method for setting custom CMAB cache.
Args:
custom_cache: Cache implementation with lookup, save, remove, and reset methods.
"""
OptimizelyFactory.cmab_custom_cache = custom_cache
return OptimizelyFactory.cmab_custom_cache

@staticmethod
def default_instance(sdk_key: str, datafile: Optional[str] = None) -> Optimizely:
""" Returns a new optimizely instance..
Expand Down Expand Up @@ -104,9 +155,17 @@ def default_instance(sdk_key: str, datafile: Optional[str] = None) -> Optimizely
notification_center=notification_center,
)

# Initialize CMAB components
cmab_client = DefaultCmabClient(retry_config=CmabRetryConfig(), logger=logger)
cmab_cache = OptimizelyFactory.cmab_custom_cache or LRUCache(
OptimizelyFactory.cmab_cache_size or DEFAULT_CMAB_CACHE_SIZE,
OptimizelyFactory.cmab_cache_ttl or DEFAULT_CMAB_CACHE_TIMEOUT
)
cmab_service = DefaultCmabService(cmab_cache, cmab_client, logger)

optimizely = Optimizely(
datafile, None, logger, error_handler, None, None, sdk_key, config_manager, notification_center,
event_processor
event_processor, cmab_service=cmab_service
)
return optimizely

Expand Down Expand Up @@ -174,7 +233,16 @@ def custom_instance(
notification_center=notification_center,
)

# Initialize CMAB components
cmab_client = DefaultCmabClient(retry_config=CmabRetryConfig(), logger=logger)
cmab_cache = OptimizelyFactory.cmab_custom_cache or LRUCache(
OptimizelyFactory.cmab_cache_size or DEFAULT_CMAB_CACHE_SIZE,
OptimizelyFactory.cmab_cache_ttl or DEFAULT_CMAB_CACHE_TIMEOUT
)
cmab_service = DefaultCmabService(cmab_cache, cmab_client, logger)

return Optimizely(
datafile, event_dispatcher, logger, error_handler, skip_json_validation, user_profile_service,
sdk_key, config_manager, notification_center, event_processor, settings=settings
sdk_key, config_manager, notification_center, event_processor, settings=settings,
cmab_service=cmab_service
)
10 changes: 5 additions & 5 deletions tests/test_cmab_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test_returns_decision_from_cache_when_valid(self):
"cmab_uuid": "uuid-123"
}

decision = self.cmab_service.get_decision(
decision, _ = self.cmab_service.get_decision(
self.mock_project_config, self.mock_user_context, "exp1", []
)

Expand All @@ -72,7 +72,7 @@ def test_ignores_cache_when_option_given(self):
self.mock_cmab_client.fetch_decision.return_value = "varB"
expected_attributes = {"age": 25, "location": "USA"}

decision = self.cmab_service.get_decision(
decision, _ = self.cmab_service.get_decision(
self.mock_project_config,
self.mock_user_context,
"exp1",
Expand Down Expand Up @@ -105,7 +105,7 @@ def test_invalidates_user_cache_when_option_given(self):
def test_resets_cache_when_option_given(self):
self.mock_cmab_client.fetch_decision.return_value = "varD"

decision = self.cmab_service.get_decision(
decision, _ = self.cmab_service.get_decision(
self.mock_project_config,
self.mock_user_context,
"exp1",
Expand All @@ -128,7 +128,7 @@ def test_new_decision_when_hash_changes(self):
expected_hash = self.cmab_service._hash_attributes(expected_attribute)
expected_key = self.cmab_service._get_cache_key("user123", "exp1")

decision = self.cmab_service.get_decision(self.mock_project_config, self.mock_user_context, "exp1", [])
decision, _ = self.cmab_service.get_decision(self.mock_project_config, self.mock_user_context, "exp1", [])
self.mock_cmab_cache.remove.assert_called_once_with(expected_key)
self.mock_cmab_cache.save.assert_called_once_with(
expected_key,
Expand Down Expand Up @@ -171,7 +171,7 @@ def test_only_cmab_attributes_passed_to_client(self):
}
self.mock_cmab_client.fetch_decision.return_value = "varF"

decision = self.cmab_service.get_decision(
decision, _ = self.cmab_service.get_decision(
self.mock_project_config,
self.mock_user_context,
"exp1",
Expand Down
11 changes: 7 additions & 4 deletions tests/test_decision_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,10 +792,13 @@ def test_get_variation_cmab_experiment_user_in_traffic_allocation(self):
'logger') as mock_logger:

# Configure CMAB service to return a decision
mock_cmab_service.get_decision.return_value = {
'variation_id': '111151',
'cmab_uuid': 'test-cmab-uuid-123'
}
mock_cmab_service.get_decision.return_value = (
{
'variation_id': '111151',
'cmab_uuid': 'test-cmab-uuid-123'
},
[] # reasons list
)

# Call get_variation with the CMAB experiment
variation_result = self.decision_service.get_variation(
Expand Down