Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 83 additions & 31 deletions newrelic/api/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def __init__(self, application, enabled=None, source=None):
self.tracestate = ""
self._priority = None
self._sampled = None
self._traceparent_sampled = None
self._remote_parent_sampled = None

self._distributed_trace_state = 0

Expand Down Expand Up @@ -569,7 +569,7 @@ def __exit__(self, exc, value, tb):
if self._settings.distributed_tracing.enabled:
# Sampled and priority need to be computed at the end of the
# transaction when distributed tracing or span events are enabled.
self._compute_sampled_and_priority()
self._make_sampling_decision()

self._cached_path._name = self.path
agent_attributes = self.agent_attributes
Expand Down Expand Up @@ -636,6 +636,7 @@ def __exit__(self, exc, value, tb):
trace_id=self.trace_id,
loop_time=self._loop_time,
root=root_node,
partial_granularity_sampled=hasattr(self, "partial_granularity_sampled"),
)

# Clear settings as we are all done and don't need it
Expand Down Expand Up @@ -1004,35 +1005,87 @@ def _update_agent_attributes(self):
def user_attributes(self):
return create_attributes(self._custom_params, DST_ALL, self.attribute_filter)

def sampling_algo_compute_sampled_and_priority(self):
if self._priority is None:
def sampling_algo_compute_sampled_and_priority(self, priority, sampled):
# self._priority and self._sampled are set when parsing the W3C tracestate
# or newrelic DT headers and may be overridden in _make_sampling_decision
# based on the configuration. The only time they are set in here is when the
# sampling decision must be made by the adaptive sampling algorithm.
if priority is None:
# Truncate priority field to 6 digits past the decimal.
self._priority = float(f"{random.random():.6f}") # noqa: S311
if self._sampled is None:
self._sampled = self._application.compute_sampled()
if self._sampled:
self._priority += 1

def _compute_sampled_and_priority(self):
if self._traceparent_sampled is None:
priority = float(f"{random.random():.6f}") # noqa: S311
if sampled is None:
_logger.debug("No trusted account id found. Sampling decision will be made by adaptive sampling algorithm.")
sampled = self._application.compute_sampled()
if sampled:
priority += 1
return priority, sampled

def _compute_sampled_and_priority(self, priority, sampled, remote_parent_sampled_path, remote_parent_sampled_setting, remote_parent_not_sampled_path, remote_parent_not_sampled_setting):
if self._remote_parent_sampled is None:
config = "default" # Use sampling algo.
elif self._traceparent_sampled:
setting_path = "distributed_tracing.sampler.remote_parent_sampled"
config = self.settings.distributed_tracing.sampler.remote_parent_sampled
else: # self._traceparent_sampled is False.
setting_path = "distributed_tracing.sampler.remote_parent_not_sampled"
config = self.settings.distributed_tracing.sampler.remote_parent_not_sampled

_logger.debug("Sampling decision made based on no remote parent sampling decision present.")
elif self._remote_parent_sampled:
setting_path = remote_parent_sampled_path
config = remote_parent_sampled_setting
_logger.debug("Sampling decision made based on remote_parent_sampled=%s and %s=%s.", self._remote_parent_sampled, setting_path, config)
else: # self._remote_parent_sampled is False.
setting_path = remote_parent_not_sampled_path
config = remote_parent_not_sampled_setting
_logger.debug("Sampling decision made based on remote_parent_sampled=%s and %s=%s.", self._remote_parent_sampled, setting_path, config)
if config == "always_on":
self._sampled = True
self._priority = 2.0
sampled = True
priority = 2.0
elif config == "always_off":
self._sampled = False
self._priority = 0
sampled = False
priority = 0
else:
if config != "default":
if config not in ("default", "adaptive"):
_logger.warning("%s=%s is not a recognized value. Using 'default' instead.", setting_path, config)
self.sampling_algo_compute_sampled_and_priority()

_logger.debug("Let adaptive sampler algorithm decide based on sampled=%s and priority=%s.", sampled, priority)
priority, sampled = self.sampling_algo_compute_sampled_and_priority(priority, sampled)
return priority, sampled

def _make_sampling_decision(self):
    """Compute and cache the sampling decision for this transaction.

    The sampling decision is computed each time a DT header is generated for
    exit spans as it is needed to send the DT headers. The result is cached
    via the ``_sampling_decision_made`` flag so the expensive decision is not
    recomputed multiple times.

    Full granularity sampling gets the first chance to sample; only when it
    declines (or is disabled) is partial granularity consulted.
    """
    # Don't recompute the sampling decision multiple times as it is expensive.
    if hasattr(self, "_sampling_decision_made"):
        return
    # Seed the computation with any priority/sampled values already present
    # (may be None when no inbound DT headers were accepted).
    priority = self._priority
    sampled = self._sampled
    # Compute sampling decision for full granularity.
    if self.settings.distributed_tracing.sampler.full_granularity.enabled:
        _logger.debug("Full granularity tracing is enabled. Asking if full granularity wants to sample. priority=%s, sampled=%s", priority, sampled)
        computed_priority, computed_sampled = self._compute_sampled_and_priority(
            priority,
            sampled,
            remote_parent_sampled_path="distributed_tracing.sampler.full_granularity.remote_parent_sampled",
            remote_parent_sampled_setting=self.settings.distributed_tracing.sampler.full_granularity.remote_parent_sampled,
            remote_parent_not_sampled_path="distributed_tracing.sampler.full_granularity.remote_parent_not_sampled",
            remote_parent_not_sampled_setting=self.settings.distributed_tracing.sampler.full_granularity.remote_parent_not_sampled,
        )
        # Log the values actually computed for this decision (previously the
        # stale pre-computation priority/sampled values were logged here).
        _logger.debug("Full granularity sampling decision was %s with priority=%s.", computed_sampled, computed_priority)
        if computed_sampled:
            self._priority = computed_priority
            self._sampled = computed_sampled
            self._sampling_decision_made = True
            return

    # If full granularity is not going to sample, let partial granularity decide.
    if self.settings.distributed_tracing.sampler.partial_granularity.enabled:
        _logger.debug("Partial granularity tracing is enabled. Asking if partial granularity wants to sample.")
        self._priority, self._sampled = self._compute_sampled_and_priority(
            priority,
            sampled,
            remote_parent_sampled_path="distributed_tracing.sampler.partial_granularity.remote_parent_sampled",
            remote_parent_sampled_setting=self.settings.distributed_tracing.sampler.partial_granularity.remote_parent_sampled,
            remote_parent_not_sampled_path="distributed_tracing.sampler.partial_granularity.remote_parent_not_sampled",
            remote_parent_not_sampled_setting=self.settings.distributed_tracing.sampler.partial_granularity.remote_parent_not_sampled,
        )
        _logger.debug("Partial granularity sampling decision was %s with priority=%s.", self._sampled, self._priority)
    self._sampling_decision_made = True
    # NOTE(review): this flag is set whenever ``_sampled`` is truthy here —
    # including when partial granularity is disabled and ``_sampled`` was
    # inherited from an inbound payload. Confirm that is intended.
    if self._sampled:
        self.partial_granularity_sampled = True

def _freeze_path(self):
if self._frozen_path is None:
Expand Down Expand Up @@ -1101,7 +1154,7 @@ def _create_distributed_trace_data(self):
if not (account_id and application_id and trusted_account_key and settings.distributed_tracing.enabled):
return

self._compute_sampled_and_priority()
self._make_sampling_decision()
data = {
"ty": "App",
"ac": account_id,
Expand Down Expand Up @@ -1184,6 +1237,7 @@ def _accept_distributed_trace_payload(self, payload, transport_type="HTTP"):
return False

try:
self._remote_parent_sampled = payload.get("sa")
version = payload.get("v")
major_version = version and int(version[0])

Expand Down Expand Up @@ -1254,10 +1308,8 @@ def _accept_distributed_trace_data(self, data, transport_type):

self._trace_id = data.get("tr")

priority = data.get("pr")
if priority is not None:
self._priority = priority
self._sampled = data.get("sa")
self._priority = data.get("pr")
self._sampled = data.get("sa")

if "ti" in data:
transport_start = data["ti"] / 1000.0
Expand Down Expand Up @@ -1297,6 +1349,7 @@ def accept_distributed_trace_headers(self, headers, transport_type="HTTP"):
try:
traceparent = ensure_str(traceparent).strip()
data = W3CTraceParent.decode(traceparent)
self._remote_parent_sampled = data.get("sa")
except:
data = None

Expand Down Expand Up @@ -1332,7 +1385,6 @@ def accept_distributed_trace_headers(self, headers, transport_type="HTTP"):
else:
self._record_supportability("Supportability/TraceContext/TraceState/NoNrEntry")

self._traceparent_sampled = data.get("sa")
self._accept_distributed_trace_data(data, transport_type)
self._record_supportability("Supportability/TraceContext/Accept/Success")
return True
Expand Down
8 changes: 8 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,16 @@ def _process_configuration(section):
_process_setting(section, "ml_insights_events.enabled", "getboolean", None)
_process_setting(section, "distributed_tracing.enabled", "getboolean", None)
_process_setting(section, "distributed_tracing.exclude_newrelic_header", "getboolean", None)
_process_setting(section, "distributed_tracing.sampler.adaptive_sampling_target", "getint", None)
_process_setting(section, "distributed_tracing.sampler.remote_parent_sampled", "get", None)
_process_setting(section, "distributed_tracing.sampler.remote_parent_not_sampled", "get", None)
_process_setting(section, "distributed_tracing.sampler.full_granularity.enabled", "getboolean", None)
_process_setting(section, "distributed_tracing.sampler.full_granularity.remote_parent_sampled", "get", None)
_process_setting(section, "distributed_tracing.sampler.full_granularity.remote_parent_not_sampled", "get", None)
_process_setting(section, "distributed_tracing.sampler.partial_granularity.enabled", "getboolean", None)
_process_setting(section, "distributed_tracing.sampler.partial_granularity.type", "get", None)
_process_setting(section, "distributed_tracing.sampler.partial_granularity.remote_parent_sampled", "get", None)
_process_setting(section, "distributed_tracing.sampler.partial_granularity.remote_parent_not_sampled", "get", None)
_process_setting(section, "span_events.enabled", "getboolean", None)
_process_setting(section, "span_events.max_samples_stored", "getint", None)
_process_setting(section, "span_events.attributes.enabled", "getboolean", None)
Expand Down
1 change: 1 addition & 0 deletions newrelic/core/agent_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ def _connect_payload(app_name, linked_applications, environment, settings):
connect_settings["browser_monitoring.loader"] = settings["browser_monitoring.loader"]
connect_settings["browser_monitoring.debug"] = settings["browser_monitoring.debug"]
connect_settings["ai_monitoring.enabled"] = settings["ai_monitoring.enabled"]
connect_settings["distributed_tracing.sampler.adaptive_sampling_target"] = settings["distributed_tracing.sampler.adaptive_sampling_target"]

security_settings = {}
security_settings["capture_params"] = settings["capture_params"]
Expand Down
17 changes: 17 additions & 0 deletions newrelic/core/attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,23 @@
"zeebe.client.resourceFile",
}

SPAN_ENTITY_RELATIONSHIP_ATTRIBUTES = {
"cloud.account.id",
"cloud.platform",
"cloud.region",
"cloud.resource_id",
"db.instance",
"db.system",
"http.url",
"messaging.destination.name",
"messaging.system",
"peer.hostname",
"server.address",
"server.port",
"span.kind",
}


MAX_NUM_USER_ATTRIBUTES = 128
MAX_ATTRIBUTE_LENGTH = 255
MAX_NUM_ML_USER_ATTRIBUTES = 64
Expand Down
45 changes: 45 additions & 0 deletions newrelic/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,14 @@ class DistributedTracingSamplerSettings(Settings):
pass


class DistributedTracingSamplerFullGranularitySettings(Settings):
pass


class DistributedTracingSamplerPartialGranularitySettings(Settings):
pass


class ServerlessModeSettings(Settings):
pass

Expand Down Expand Up @@ -507,6 +515,8 @@ class EventHarvestConfigHarvestLimitSettings(Settings):
_settings.debug = DebugSettings()
_settings.distributed_tracing = DistributedTracingSettings()
_settings.distributed_tracing.sampler = DistributedTracingSamplerSettings()
_settings.distributed_tracing.sampler.full_granularity = DistributedTracingSamplerFullGranularitySettings()
_settings.distributed_tracing.sampler.partial_granularity = DistributedTracingSamplerPartialGranularitySettings()
_settings.error_collector = ErrorCollectorSettings()
_settings.error_collector.attributes = ErrorCollectorAttributesSettings()
_settings.event_harvest_config = EventHarvestConfigSettings()
Expand Down Expand Up @@ -837,12 +847,32 @@ def default_otlp_host(host):
_settings.ml_insights_events.enabled = False

_settings.distributed_tracing.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_ENABLED", default=True)
_settings.distributed_tracing.sampler.adaptive_sampling_target = _environ_as_int(
"NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_ADAPTIVE_SAMPLING_TARGET", default=10
)
_settings.distributed_tracing.sampler.remote_parent_sampled = os.environ.get(
"NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_REMOTE_PARENT_SAMPLED", "default"
)
_settings.distributed_tracing.sampler.remote_parent_not_sampled = os.environ.get(
"NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_REMOTE_PARENT_NOT_SAMPLED", "default"
)
_settings.distributed_tracing.sampler.full_granularity.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_FULL_GRANULARITY_ENABLED", default=True)
_settings.distributed_tracing.sampler.full_granularity.remote_parent_sampled = os.environ.get(
"NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_FULL_GRANULARITY_REMOTE_PARENT_SAMPLED", None
)
_settings.distributed_tracing.sampler.full_granularity.remote_parent_not_sampled = os.environ.get(
"NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_FULL_GRANULARITY_REMOTE_PARENT_NOT_SAMPLED", None
)
_settings.distributed_tracing.sampler.partial_granularity.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_PARTIAL_GRANULARITY_ENABLED", default=False)
_settings.distributed_tracing.sampler.partial_granularity.type = os.environ.get(
"NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_PARTIAL_GRANULARITY_TYPE", "essential"
)
_settings.distributed_tracing.sampler.partial_granularity.remote_parent_sampled = os.environ.get(
"NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_PARTIAL_GRANULARITY_REMOTE_PARENT_SAMPLED", "default"
)
_settings.distributed_tracing.sampler.partial_granularity.remote_parent_not_sampled = os.environ.get(
"NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_PARTIAL_GRANULARITY_REMOTE_PARENT_NOT_SAMPLED", "default"
)
_settings.distributed_tracing.exclude_newrelic_header = False
_settings.span_events.enabled = _environ_as_bool("NEW_RELIC_SPAN_EVENTS_ENABLED", default=True)
_settings.event_harvest_config.harvest_limits.span_event_data = _environ_as_int(
Expand Down Expand Up @@ -1369,9 +1399,24 @@ def finalize_application_settings(server_side_config=None, settings=_settings):

application_settings.attribute_filter = AttributeFilter(flatten_settings(application_settings))

simplify_distributed_tracing_sampler_granularity_settings(application_settings)

return application_settings


def simplify_distributed_tracing_sampler_granularity_settings(settings):
    """Consolidate full granularity sampler settings onto one path.

    Full granularity settings may appear under either:
    * ``distributed_tracing.sampler``
    * ``distributed_tracing.sampler.full_granularity``
    The ``distributed_tracing.sampler.full_granularity`` path takes
    precedence. To simplify logic in the code that uses these settings, the
    effective values are stored at the
    ``distributed_tracing.sampler.full_granularity`` path.
    """
    sampler = settings.distributed_tracing.sampler
    full_granularity = sampler.full_granularity
    # Fall back to the legacy top-level sampler value only when the
    # full_granularity value is unset/falsy.
    for attr in ("remote_parent_sampled", "remote_parent_not_sampled"):
        if not getattr(full_granularity, attr):
            setattr(full_granularity, attr, getattr(sampler, attr))


def _remove_ignored_configs(server_settings):
if not server_settings.get("agent_config"):
return server_settings
Expand Down
6 changes: 5 additions & 1 deletion newrelic/core/data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ def send_ml_events(self, sampling_info, custom_event_data):

def send_span_events(self, sampling_info, span_event_data):
"""Called to submit sample set for span events."""

# TODO: remove this later after list types are supported.
for span_event in span_event_data:
ids = span_event[1].get("nr.ids")
if ids:
span_event[1]["nr.ids"] = ",".join(ids)
payload = (self.agent_run_id, sampling_info, span_event_data)
return self._protocol.send("span_event_data", payload)

Expand Down
4 changes: 2 additions & 2 deletions newrelic/core/database_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def trace_node(self, stats, root, connections):
start_time=start_time, end_time=end_time, name=name, params=params, children=children, label=None
)

def span_event(self, *args, **kwargs):
def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict, partial_granularity_sampled=False, ct_exit_spans=None):
sql = self.formatted

if sql:
Expand All @@ -288,4 +288,4 @@ def span_event(self, *args, **kwargs):

self.agent_attributes["db.statement"] = sql

return super().span_event(*args, **kwargs)
return super().span_event(settings, base_attrs=base_attrs, parent_guid=parent_guid, attr_class=attr_class, partial_granularity_sampled=partial_granularity_sampled, ct_exit_spans=ct_exit_spans)
11 changes: 5 additions & 6 deletions newrelic/core/external_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,15 @@ def trace_node(self, stats, root, connections):
start_time=start_time, end_time=end_time, name=name, params=params, children=children, label=None
)

def span_event(self, *args, **kwargs):
def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict, partial_granularity_sampled=False, ct_exit_spans=None):
self.agent_attributes["http.url"] = self.http_url
attrs = super().span_event(*args, **kwargs)
i_attrs = attrs[0]

i_attrs = (base_attrs and base_attrs.copy()) or attr_class()
i_attrs["category"] = "http"
i_attrs["span.kind"] = "client"
_, i_attrs["component"] = attribute.process_user_attribute("component", self.library)
i_attrs["component"] = self.library

if self.method:
_, i_attrs["http.method"] = attribute.process_user_attribute("http.method", self.method)
i_attrs["http.method"] = self.method

return attrs
return super().span_event(settings, base_attrs=i_attrs, parent_guid=parent_guid, attr_class=attr_class, partial_granularity_sampled=partial_granularity_sampled, ct_exit_spans=ct_exit_spans)
Loading
Loading