Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lustre/changelog.d/21270.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `filesystem`, `jobid_var` and `jobid_name` tags
88 changes: 55 additions & 33 deletions lustre/datadog_checks/lustre/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
FILESYSTEM_DISCOVERY_PARAM_MAPPING,
IGNORED_LNET_GROUPS,
IGNORED_STATS,
JOBID_TAG_PARAMS,
JOBSTATS_PARAMS,
TAGS_WITH_FILESYSTEM,
LustreParam,
)

RATE_UNITS: Set[str] = {'locks/s'}


class IgnoredFilesystemName(Exception):
    """Signal that a parameter belongs to a filesystem the check does not monitor.

    Raised while extracting tags from a parameter name when the filesystem
    derived from it is not in the check's configured filesystems. Callers catch
    it and skip that parameter instead of submitting metrics for it.
    """


def _get_stat_type(suffix: str, unit: str) -> str:
"""
Returns the metric type for a given stat suffix and unit.
Expand Down Expand Up @@ -115,13 +121,13 @@ def check(self, _: Any) -> None:
self.submit_changelogs(self.changelog_lines_per_check)

self.submit_device_health(self.devices)
self.submit_param_data(self.params, self.filesystems)
self.submit_param_data(self.params)
self.submit_lnet_stats_metrics()
self.submit_lnet_local_ni_metrics()
self.submit_lnet_peer_ni_metrics()

if self.node_type in ('mds', 'oss'):
self.submit_jobstats_metrics(self.filesystems)
self.submit_jobstats_metrics()

def update(self) -> None:
'''
Expand Down Expand Up @@ -173,7 +179,7 @@ def _update_filesystems(self) -> None:

def _update_changelog_targets(self, devices: List[Dict[str, Any]], filesystems: List[str]) -> None:
self.log.debug('Determining changelog targets...')
target_regex = [filesystem + r'-MDT\d\d\d\d' for filesystem in filesystems]
target_regex = [re.escape(filesystem) + r'-MDT\d\d\d\d' for filesystem in filesystems]
targets = []
for device in devices:
for regex in target_regex:
Expand Down Expand Up @@ -218,24 +224,41 @@ def _run_command(self, bin: str, *args: str, sudo: bool = False) -> str:
self.log.error('Failed to run command %s: %s', cmd, e)
return ''

def submit_jobstats_metrics(self, filesystems: List[str]) -> None:
def submit_jobstats_metrics(self) -> None:
'''
Submit the jobstats metrics to Datadog.

For more information, see: https://doc.lustre.org/lustre_manual.xhtml#jobstats
'''
jobstats_params = self._get_jobstats_params_list()
for jobstats_param in jobstats_params:
device_name = jobstats_param.split('.')[1] # For example: lustre-MDT0000
if not any(device_name.startswith(fs) for fs in filesystems):
jobstats_param: LustreParam | None = None
for param in JOBSTATS_PARAMS:
if self.node_type in param.node_types:
jobstats_param = param
break
if jobstats_param is None:
self.log.debug('Invalid jobstats device_type: %s', self.node_type)
return
param_names = self._get_jobstats_params_list(jobstats_param)
jobid_config_tags = [
f'{param.regex}:{self._run_command("lctl", "get_param", "-ny", param.regex, sudo=True).strip()}'
for param in JOBID_TAG_PARAMS
]
for param_name in param_names:
try:
tags = (
self.tags
+ self._extract_tags_from_param(jobstats_param.regex, param_name, jobstats_param.wildcards)
+ jobid_config_tags
)
except IgnoredFilesystemName:
continue
jobstats_metrics = self._get_jobstats_metrics(jobstats_param).get('job_stats')
jobstats_metrics = self._get_jobstats_metrics(param_name).get('job_stats')
if jobstats_metrics is None:
self.log.debug('No jobstats metrics found for %s', jobstats_param)
self.log.debug('No jobstats metrics found for %s', param_name)
continue
for job in jobstats_metrics:
job_id = job.get('job_id', "unknown")
tags = self.tags + [f'device_name:{device_name}', f'job_id:{job_id}']
tags.append(f'job_id:{job_id}')
for metric_name, metric_values in job.items():
if not isinstance(metric_values, dict):
continue
Expand All @@ -254,18 +277,10 @@ def _submit_jobstat(self, name: str, values: Dict[str, Any], tags: List[str]) ->
metric_type = _get_stat_type(suffix, values['unit'])
self._submit(f'job_stats.{name}.{suffix}', value, metric_type, tags=tags)

def _get_jobstats_params_list(self) -> List[str]:
def _get_jobstats_params_list(self, param) -> List[str]:
'''
Get the jobstats params from the command line.
'''
param = None
for jobstat_param in JOBSTATS_PARAMS:
if self.node_type in jobstat_param.node_types:
param = jobstat_param
break
if param is None:
self.log.debug('Invalid jobstats device_type: %s', self.node_type)
return []
raw_params = self._run_command('lctl', 'list_param', param.regex, sudo=True)
return [line.strip() for line in raw_params.splitlines() if line.strip()]

Expand Down Expand Up @@ -374,7 +389,7 @@ def _get_lnet_metrics(self, stats_type: str = 'stats') -> Dict[str, Any]:
self.log.debug('Could not get lnet %s, caught exception: %s', stats_type, e)
return {}

def submit_param_data(self, params: Set[LustreParam], filesystems: List[str]) -> None:
def submit_param_data(self, params: Set[LustreParam]) -> None:
'''
Submit general stats and metrics from Lustre parameters.
'''
Expand All @@ -384,11 +399,10 @@ def submit_param_data(self, params: Set[LustreParam], filesystems: List[str]) ->
continue
matched_params = self._run_command('lctl', 'list_param', param.regex, sudo=True)
for param_name in matched_params.splitlines():
tags = self.tags + self._extract_tags_from_param(param.regex, param_name, param.wildcards)
if any(fs_tag in param.wildcards for fs_tag in ('device_name', 'device_uuid')):
if not any(fs in param_name for fs in filesystems):
self.log.debug('Skipping param %s as it did not match any filesystem', param_name)
continue
try:
tags = self.tags + self._extract_tags_from_param(param.regex, param_name, param.wildcards)
except IgnoredFilesystemName:
continue
raw_stats = self._run_command('lctl', 'get_param', '-ny', param_name, sudo=True)
if not param.regex.endswith('.stats'):
self._submit_param(param.prefix, param_name, tags)
Expand Down Expand Up @@ -432,7 +446,8 @@ def _extract_tags_from_param(self, param_regex: str, param_name: str, wildcards:
tags = []
regex_parts = param_regex.split('.')
param_parts = param_name.split('.')
wildcard_number = 0
wildcard_generator = (wildcard for wildcard in wildcards)
filesystem = None
if not len(regex_parts) == len(param_parts):
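# Edge case: the parameter name embeds a dotted NID (presumably an IPv4 NID of the form 10.0.0.1@tcp; the original example was lost to e-mail obfuscation), which adds three extra '.'-separated parts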
if len(regex_parts) + 3 == len(param_parts):
Expand All @@ -446,13 +461,20 @@ def _extract_tags_from_param(self, param_regex: str, param_name: str, wildcards:
return tags
for part_number, part in enumerate(regex_parts):
if part == '*':
if wildcard_number >= len(wildcards):
self.log.debug(
'Found %s wildcards, which exceeds available wildcard tags %s', wildcard_number, wildcards
)
try:
current_wildcard = next(wildcard_generator)
current_part = param_parts[part_number]
tags.append(f'{current_wildcard}:{current_part}')
if current_wildcard in TAGS_WITH_FILESYSTEM and filesystem is None:
filesystem = current_part.split('-')[0]
tags.append(f'filesystem:{filesystem}')
self.log.debug('Determined filesystem as %s from parameter %s', filesystem, param_name)
if filesystem not in self.filesystems:
self.log.debug('Skipping param %s as it did not match any filesystem', param_name)
raise IgnoredFilesystemName
except StopIteration:
self.log.debug('Number of found wildcards exceeds available wildcard tags %s', wildcards)
return tags
tags.append(f'{wildcards[wildcard_number]}:{param_parts[part_number]}')
wildcard_number += 1
return tags

def _parse_stats(self, raw_stats: str) -> Dict[str, Dict[str, Union[int, str]]]:
Expand Down
44 changes: 40 additions & 4 deletions lustre/datadog_checks/lustre/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
'interfaces',
}

# Wildcard tag names whose matched value begins with the filesystem name followed
# by '-': the check takes the text before the first '-' as the `filesystem` tag.
TAGS_WITH_FILESYSTEM = {'device_name', 'device_uuid'}


@dataclass(frozen=True)
class LustreParam:
Expand Down Expand Up @@ -47,6 +52,27 @@ class LustreParam:
),
]

# Parameters whose current values are read once per jobstats submission and
# attached to every job stats metric as a `<param>:<value>` tag.
# Each pair is (parameter name, fixture value).
JOBID_TAG_PARAMS = [
    LustreParam(
        regex=regex,
        node_types=('client', 'mds', 'oss'),
        fixture=fixture,
    )
    for regex, fixture in (
        (r'jobid_var', 'disable'),
        (r'jobid_name', '%e.%u'),
    )
]

CURATED_PARAMS = [
LustreParam(
regex=r'osd-*.*.blocksize',
Expand Down Expand Up @@ -280,7 +306,12 @@ class LustreParam:

EXTRA_STATS = [
# MDS (Metadata Server) params
LustreParam(regex='mds.MDS.mdt.stats', node_types=('mds',), prefix='mds.mdt', fixture='mds_mdt_stats.txt'),
LustreParam(
regex='mds.MDS.mdt.stats',
node_types=('mds',),
prefix='mds.mdt',
fixture='mds_mdt_stats.txt',
),
LustreParam(
regex='mdt.*.exports.*.stats',
node_types=('mds',),
Expand All @@ -307,20 +338,25 @@ class LustreParam:
LustreParam(
regex='ldlm.namespaces.*.pool.stats',
node_types=('client', 'mds', 'oss'),
wildcards=('device_nid',),
wildcards=('nid',),
prefix='ldlm.namespaces.pool',
fixture='all_ldlm_namespace_stats.txt',
),
# MGS (Management Server) params
LustreParam(
regex='mgs.MGS.exports.*.stats',
node_types=('mds',),
wildcards=('device_name', 'nid'),
wildcards=('nid',),
prefix='mgs.exports',
fixture='mds_mgs_export_stats.txt',
),
# OSS (Object Storage Server) params
LustreParam(regex='ost.OSS.ost.stats', node_types=('oss',), prefix='oss', fixture='oss_ost_stats.txt'),
LustreParam(
regex='ost.OSS.ost.stats',
node_types=('oss',),
prefix='oss',
fixture='oss_ost_stats.txt',
),
LustreParam(
regex='osc.*.stats',
node_types=('client',),
Expand Down
2 changes: 1 addition & 1 deletion lustre/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def dd_environment():

@pytest.fixture
def instance():
return {'node_type': 'client'}
return {'node_type': 'client', "filesystems": ["lustre", "*"]}


@pytest.fixture
Expand Down
Loading
Loading