diff --git a/.github/workflows/drishti-darshan-3.4.0.yml b/.github/workflows/drishti-darshan-3.4.0.yml index 6535241..3665a98 100644 --- a/.github/workflows/drishti-darshan-3.4.0.yml +++ b/.github/workflows/drishti-darshan-3.4.0.yml @@ -12,7 +12,7 @@ jobs: timeout-minutes: 60 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 with: submodules: true @@ -93,7 +93,7 @@ jobs: - name: Upload Artifact if: always() - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: tests path: sample/** diff --git a/.github/workflows/drishti-darshan-3.4.1.yml b/.github/workflows/drishti-darshan-3.4.1.yml index 29ed011..155f6e7 100644 --- a/.github/workflows/drishti-darshan-3.4.1.yml +++ b/.github/workflows/drishti-darshan-3.4.1.yml @@ -12,7 +12,7 @@ jobs: timeout-minutes: 60 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 with: submodules: true @@ -93,7 +93,7 @@ jobs: - name: Upload Artifact if: always() - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: tests path: sample/** diff --git a/.github/workflows/drishti-darshan-3.4.2.yml b/.github/workflows/drishti-darshan-3.4.2.yml index 0398f37..686aaa2 100644 --- a/.github/workflows/drishti-darshan-3.4.2.yml +++ b/.github/workflows/drishti-darshan-3.4.2.yml @@ -12,7 +12,7 @@ jobs: timeout-minutes: 60 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 with: submodules: true @@ -93,7 +93,7 @@ jobs: - name: Upload Artifact if: always() - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: tests path: sample/** diff --git a/.gitignore b/.gitignore index 2ed7bdc..29bd232 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,310 @@ +tensorflow_unet3d_darshan_per_rank_workload/ + +# Created by https://www.toptal.com/developers/gitignore/api/python,visualstudiocode,pycharm +# Edit at https://www.toptal.com/developers/gitignore?templates=python,visualstudiocode,pycharm + +### PyCharm ### +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# AWS User-specific +.idea/**/aws.xml + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. 
+# .idea/artifacts +# .idea/compiler.xml +# .idea/jarRepositories.xml +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# SonarLint plugin +.idea/sonarlint/ + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser + +### PyCharm Patch ### +# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721 + +# *.iml +# modules.xml +# .idea/misc.xml +# *.ipr + +# Sonarlint plugin +# https://plugins.jetbrains.com/plugin/7973-sonarlint +.idea/**/sonarlint/ + +# SonarQube Plugin +# https://plugins.jetbrains.com/plugin/7238-sonarqube-community-plugin +.idea/**/sonarIssues.xml + +# Markdown Navigator plugin +# https://plugins.jetbrains.com/plugin/7896-markdown-navigator-enhanced +.idea/**/markdown-navigator.xml +.idea/**/markdown-navigator-enh.xml +.idea/**/markdown-navigator/ + +# Cache file creation bug +# See https://youtrack.jetbrains.com/issue/JBR-2257 +.idea/$CACHE_FILE$ + +# CodeStream plugin +# https://plugins.jetbrains.com/plugin/12206-codestream +.idea/codestream.xml + +# Azure Toolkit for IntelliJ plugin +# https://plugins.jetbrains.com/plugin/8053-azure-toolkit-for-intellij +.idea/**/azureSettings.xml + +### Python ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python build/ +develop-eggs/ dist/ -*.egg-info/ \ No newline at end of file +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. 
+# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +### Python Patch ### +# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration +poetry.toml + +# ruff +.ruff_cache/ + +# LSP config files +pyrightconfig.json + +### VisualStudioCode ### +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +!.vscode/*.code-snippets + +# Local History for Visual Studio Code +.history/ + +# Built Visual Studio Code Extensions +*.vsix + +### VisualStudioCode Patch ### +# Ignore all local history of files +.history +.ionide + +# End of https://www.toptal.com/developers/gitignore/api/python,visualstudiocode,pycharm diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/feat-multiple-darshan-files.iml b/.idea/feat-multiple-darshan-files.iml new file mode 100644 index 0000000..a319e2b --- /dev/null +++ b/.idea/feat-multiple-darshan-files.iml @@ -0,0 +1,25 @@ + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..da694c4 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..2d7ac4e --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/ruff.xml b/.idea/ruff.xml new file mode 100644 index 0000000..916a850 --- /dev/null +++ b/.idea/ruff.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file 
diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/drishti/handlers/handle_darshan.py b/drishti/handlers/handle_darshan.py index d47fbea..c1eb2b2 100644 --- a/drishti/handlers/handle_darshan.py +++ b/drishti/handlers/handle_darshan.py @@ -1,18 +1,33 @@ #!/usr/bin/env python3 - +import abc +import csv +import dataclasses +import datetime import io -import sys -import time +import os import shlex import shutil import subprocess -import pandas as pd +import sys +import time +import typing +from typing import Dict +from dataclasses import dataclass, field +from typing import List, Optional + import darshan import darshan.backend.cffi_backend as darshanll - -from rich import print +import pandas as pd from packaging import version -from drishti.includes.module import * +from rich import print +from rich.padding import Padding + +import drishti.includes.config as config +import drishti.includes.module as module +from drishti.includes.module import HIGH, RECOMMENDATIONS, WARN, Panel, insights_total + +# from includes.module import * +from drishti.includes.parser import args def is_available(name): @@ -70,648 +85,1019 @@ def check_log_version(console, file, log_version, library_version): return use_file -def handler(): - console = init_console() - - insights_start_time = time.time() - - log = darshanll.log_open(args.log_path) - - modules = darshanll.log_get_modules(log) - - information = darshanll.log_get_job(log) - - if 'log_ver' in information: - log_version = information['log_ver'] - else: - log_version = information['metadata']['lib_ver'] - library_version = darshanll.get_lib_version() - - # Make sure log format is of the same version - filename = args.log_path - # check_log_version(console, args.log_path, log_version, library_version) - - darshanll.log_close(log) - - darshan.enable_experimental() - - report = darshan.DarshanReport(filename) - - job = report.metadata - - ######################################################################################################################################################################### - - # Check usage of STDIO, POSIX, and MPI-IO per file - - if 'STDIO' in report.records: - df_stdio = report.records['STDIO'].to_df() - - if df_stdio: - total_write_size_stdio = df_stdio['counters']['STDIO_BYTES_WRITTEN'].sum() - total_read_size_stdio = df_stdio['counters']['STDIO_BYTES_READ'].sum() - - total_size_stdio = total_write_size_stdio + total_read_size_stdio - else: - total_size_stdio = 0 - else: - df_stdio = None - - total_size_stdio = 0 - - if 'POSIX' in report.records: - df_posix = report.records['POSIX'].to_df() +@dataclass +class TimestampPair: + start: datetime.date + end: datetime.date + +@dataclass +class AbstractDarshanTrace(abc.ABC): + # Trace metadata + jobid: str + log_ver: str + time: TimestampPair + exe: str + + # Report + modules: typing.Iterable[str] + name_records: Dict[str, str] = field(default_factory=dict) + + max_read_offset: int = float('-inf') + max_write_offset: int = float('-inf') + ### + + total_files_stdio: int = 0 + total_files_posix: int = 0 + total_files_mpiio: int = 0 + + files: Dict[str, Dict[str, int]] = None + + total_reads: int = 0 + total_writes: int = 0 + total_operations: int = 0 + total_read_size: int = 0 + total_written_size: int = 0 + total_posix_size: int = 0 + total_reads_small: int = 0 + total_writes_small: int = 0 + + ### + total_write_size_stdio: int = 0 + 
total_read_size_stdio: int = 0 + total_size_stdio: int = 0 + + total_write_size_posix: int = 0 + total_read_size_posix: int = 0 + total_size_posix: int = 0 + + total_write_size_mpiio: int = 0 + total_read_size_mpiio: int = 0 + total_size_mpiio: int = 0 + + total_size: int = 0 + total_files: int = 0 + ### + + total_mem_not_aligned: int = 0 + total_file_not_aligned: int = 0 + + read_consecutive: int = 0 + read_sequential: int = 0 + read_random: int = 0 + write_consecutive: int = 0 + write_sequential: int = 0 + write_random: int = 0 + + total_shared_reads: int = 0 + total_shared_reads_small: int = 0 + total_shared_writes: int = 0 + total_shared_writes_small: int = 0 + + count_long_metadata: int = 0 + + posix_shared_data_imbalance_stragglers_count: int = 0 + + # 2 functions (unsure ones) + + has_hdf5_extension: bool = False # TODO: OR this + + mpiio_nb_reads: int = 0 + mpiio_nb_writes: int = 0 + + # TODO: Should be a list of CB nodes for agg + cb_nodes: Optional[int] = None + number_of_compute_nodes: int = 0 + hints: List[str] = dataclasses.field(default_factory=list) + + job_start: Optional[datetime.datetime] = None + job_end: Optional[datetime.datetime] = None + + aggregated: pd.DataFrame = None + + ## EXTRA from module being split + mpiio_coll_reads: int = 0 + mpiio_indep_reads: int = 0 + total_mpiio_read_operations: int = 0 + detected_files_mpi_coll_reads: pd.DataFrame = None + mpiio_coll_writes: int = 0 + mpiio_indep_writes: int = 0 + total_mpiio_write_operations: int = 0 + detected_files_mpiio_coll_writes: pd.DataFrame = None + imbalance_count_posix_shared_time: int = 0 + posix_shared_time_imbalance_detected_files1: pd.DataFrame = None + posix_shared_time_imbalance_detected_files2: pd.DataFrame = None + posix_shared_time_imbalance_detected_files3: pd.DataFrame = None + posix_total_size: int = 0 + posix_total_read_size: int = 0 + posix_total_written_size: int = 0 + + def generate_dxt_posix_rw_df(self) -> None: + if not args.backtrace: + return + if not self.dxt_posix: + return + if "address_line_mapping" not in self.dxt_posix: + args.backtrace = False + return + + read_id = [] + read_rank = [] + read_length = [] + read_offsets = [] + read_end_time = [] + read_start_time = [] + read_operation = [] + + write_id = [] + write_rank = [] + write_length = [] + write_offsets = [] + write_end_time = [] + write_start_time = [] + write_operation = [] + + for r in zip(self.dxt_posix['rank'], self.dxt_posix['read_segments'], self.dxt_posix['write_segments'], + self.dxt_posix['id']): + if not r[1].empty: + read_id.append([r[3]] * len((r[1]["length"].to_list()))) + read_rank.append([r[0]] * len((r[1]["length"].to_list()))) + read_length.append(r[1]["length"].to_list()) + read_end_time.append(r[1]["end_time"].to_list()) + read_start_time.append(r[1]["start_time"].to_list()) + read_operation.append(["read"] * len((r[1]["length"].to_list()))) + read_offsets.append(r[1]["offset"].to_list()) + + if not r[2].empty: + write_id.append([r[3]] * len((r[2]['length'].to_list()))) + write_rank.append([r[0]] * len((r[2]['length'].to_list()))) + write_length.append(r[2]['length'].to_list()) + write_end_time.append(r[2]['end_time'].to_list()) + write_start_time.append(r[2]['start_time'].to_list()) + write_operation.append(['write'] * len((r[2]['length'].to_list()))) + write_offsets.append(r[2]['offset'].to_list()) + + read_id = [element for nestedlist in read_id for element in nestedlist] + read_rank = [element for nestedlist in read_rank for element in nestedlist] + read_length = [element for nestedlist in
read_length for element in nestedlist] + read_offsets = [element for nestedlist in read_offsets for element in nestedlist] + read_end_time = [element for nestedlist in read_end_time for element in nestedlist] + read_operation = [element for nestedlist in read_operation for element in nestedlist] + read_start_time = [element for nestedlist in read_start_time for element in nestedlist] + + write_id = [element for nestedlist in write_id for element in nestedlist] + write_rank = [element for nestedlist in write_rank for element in nestedlist] + write_length = [element for nestedlist in write_length for element in nestedlist] + write_offsets = [element for nestedlist in write_offsets for element in nestedlist] + write_end_time = [element for nestedlist in write_end_time for element in nestedlist] + write_operation = [element for nestedlist in write_operation for element in nestedlist] + write_start_time = [element for nestedlist in write_start_time for element in nestedlist] + + self.dxt_posix_read_data = pd.DataFrame( + { + "id": read_id, + "rank": read_rank, + "length": read_length, + "end_time": read_end_time, + "start_time": read_start_time, + "operation": read_operation, + "offsets": read_offsets, + } + ) - if df_posix: - total_write_size_posix = df_posix['counters']['POSIX_BYTES_WRITTEN'].sum() - total_read_size_posix = df_posix['counters']['POSIX_BYTES_READ'].sum() + self.dxt_posix_write_data = pd.DataFrame( + { + "id": write_id, + "rank": write_rank, + "length": write_length, + "end_time": write_end_time, + "start_time": write_start_time, + "operation": write_operation, + "offsets": write_offsets, + } + ) - total_size_posix = total_write_size_posix + total_read_size_posix - else: - total_size_posix = 0 - else: - df_posix = None + def calculate_insights(self) -> None: + self.total_write_size_stdio = self.stdio_df['counters']['STDIO_BYTES_WRITTEN'].sum() if self.stdio_df else 0 + self.total_read_size_stdio = self.stdio_df['counters']['STDIO_BYTES_READ'].sum() if self.stdio_df else 0 + self.total_size_stdio = self.total_write_size_stdio + self.total_read_size_stdio - total_size_posix = 0 + self.total_write_size_posix = self.posix_df['counters']['POSIX_BYTES_WRITTEN'].sum() if self.posix_df else 0 + self.total_read_size_posix = self.posix_df['counters']['POSIX_BYTES_READ'].sum() if self.posix_df else 0 + self.total_size_posix = self.total_write_size_posix + self.total_read_size_posix - if 'MPI-IO' in report.records: - df_mpiio = report.records['MPI-IO'].to_df() + self.total_write_size_mpiio = self.mpiio_df['counters']['MPIIO_BYTES_WRITTEN'].sum() if self.mpiio_df else 0 + self.total_read_size_mpiio = self.mpiio_df['counters']['MPIIO_BYTES_READ'].sum() if self.mpiio_df else 0 + self.total_size_mpiio = self.total_write_size_mpiio + self.total_read_size_mpiio - if df_mpiio: - total_write_size_mpiio = df_mpiio['counters']['MPIIO_BYTES_WRITTEN'].sum() - total_read_size_mpiio = df_mpiio['counters']['MPIIO_BYTES_READ'].sum() + # POSIX will capture POSIX-only and MPI-IO + if self.total_size_posix > 0 and self.total_size_posix >= self.total_size_mpiio: + self.total_size_posix -= self.total_size_mpiio - total_size_mpiio = total_write_size_mpiio + total_read_size_mpiio - else: - total_size_mpiio = 0 - else: - df_mpiio = None - - total_size_mpiio = 0 - - dxt_posix = None - dxt_posix_read_data = None - dxt_posix_write_data = None - dxt_mpiio = None - - df_lustre = None - if "LUSTRE" in report.records: - df_lustre = report.records['LUSTRE'].to_df() - - if args.backtrace: - if "DXT_POSIX" in 
report.records: - dxt_posix = report.records["DXT_POSIX"].to_df() - dxt_posix = pd.DataFrame(dxt_posix) - if "address_line_mapping" not in dxt_posix: - args.backtrace = False - else: - read_id = [] - read_rank = [] - read_length = [] - read_offsets = [] - read_end_time = [] - read_start_time = [] - read_operation = [] - - write_id = [] - write_rank = [] - write_length = [] - write_offsets = [] - write_end_time = [] - write_start_time = [] - write_operation = [] - - for r in zip(dxt_posix['rank'], dxt_posix['read_segments'], dxt_posix['write_segments'], dxt_posix['id']): - if not r[1].empty: - read_id.append([r[3]] * len((r[1]['length'].to_list()))) - read_rank.append([r[0]] * len((r[1]['length'].to_list()))) - read_length.append(r[1]['length'].to_list()) - read_end_time.append(r[1]['end_time'].to_list()) - read_start_time.append(r[1]['start_time'].to_list()) - read_operation.append(['read'] * len((r[1]['length'].to_list()))) - read_offsets.append(r[1]['offset'].to_list()) - - if not r[2].empty: - write_id.append([r[3]] * len((r[2]['length'].to_list()))) - write_rank.append([r[0]] * len((r[2]['length'].to_list()))) - write_length.append(r[2]['length'].to_list()) - write_end_time.append(r[2]['end_time'].to_list()) - write_start_time.append(r[2]['start_time'].to_list()) - write_operation.append(['write'] * len((r[2]['length'].to_list()))) - write_offsets.append(r[2]['offset'].to_list()) - - read_id = [element for nestedlist in read_id for element in nestedlist] - read_rank = [element for nestedlist in read_rank for element in nestedlist] - read_length = [element for nestedlist in read_length for element in nestedlist] - read_offsets = [element for nestedlist in read_offsets for element in nestedlist] - read_end_time = [element for nestedlist in read_end_time for element in nestedlist] - read_operation = [element for nestedlist in read_operation for element in nestedlist] - read_start_time = [element for nestedlist in read_start_time for element in nestedlist] - - write_id = [element for nestedlist in write_id for element in nestedlist] - write_rank = [element for nestedlist in write_rank for element in nestedlist] - write_length = [element for nestedlist in write_length for element in nestedlist] - write_offsets = [element for nestedlist in write_offsets for element in nestedlist] - write_end_time = [element for nestedlist in write_end_time for element in nestedlist] - write_operation = [element for nestedlist in write_operation for element in nestedlist] - write_start_time = [element for nestedlist in write_start_time for element in nestedlist] - - dxt_posix_read_data = pd.DataFrame( - { - 'id': read_id, - 'rank': read_rank, - 'length': read_length, - 'end_time': read_end_time, - 'start_time': read_start_time, - 'operation': read_operation, - 'offsets': read_offsets, - }) - - dxt_posix_write_data = pd.DataFrame( - { - 'id': write_id, - 'rank': write_rank, - 'length': write_length, - 'end_time': write_end_time, - 'start_time': write_start_time, - 'operation': write_operation, - 'offsets': write_offsets, - }) - - if "DXT_MPIIO" in report.records: - dxt_mpiio = report.records["DXT_MPIIO"].to_df() - dxt_mpiio = pd.DataFrame(dxt_mpiio) - - - # Since POSIX will capture both POSIX-only accesses and those comming from MPI-IO, we can subtract those - if total_size_posix > 0 and total_size_posix >= total_size_mpiio: - total_size_posix -= total_size_mpiio - - total_size = total_size_stdio + total_size_posix + total_size_mpiio - - assert(total_size_stdio >= 0) - assert(total_size_posix >= 0) - 
assert(total_size_mpiio >= 0) - - files = {} - - # Check interface usage for each file - file_map = report.name_records - - total_files = len(file_map) - - total_files_stdio = 0 - total_files_posix = 0 - total_files_mpiio = 0 - - for id, path in file_map.items(): - if df_stdio: - uses_stdio = len(df_stdio['counters'][(df_stdio['counters']['id'] == id)]) > 0 - else: - uses_stdio = 0 - - if df_posix: - uses_posix = len(df_posix['counters'][(df_posix['counters']['id'] == id)]) > 0 - else: - uses_posix = 0 + self.total_posix_size = self.total_size_stdio + self.total_size_posix + self.total_size_mpiio - if df_mpiio: - uses_mpiio = len(df_mpiio['counters'][(df_mpiio['counters']['id'] == id)]) > 0 - else: - uses_mpiio = 0 + assert (self.total_size_stdio >= 0) + assert (self.total_size_posix >= 0) + assert (self.total_size_mpiio >= 0) - total_files_stdio += uses_stdio - total_files_posix += uses_posix - total_files_mpiio += uses_mpiio + def files_stuff(self) -> None: + self.report.name_records = self.report.name_records - files[id] = { - 'path': path, - 'stdio': uses_stdio, - 'posix': uses_posix, - 'mpiio': uses_mpiio - } + self.total_files = len(self.report.name_records) - check_stdio(total_size, total_size_stdio) - check_mpiio(modules) + # files = dict() - ######################################################################################################################################################################### + for id, path in self.report.name_records.items(): + uses_stdio = len( + self.stdio_df['counters'][self.stdio_df['counters']['id'] == id]) > 0 if self.stdio_df else 0 + uses_posix = len( + self.posix_df['counters'][self.posix_df['counters']['id'] == id]) > 0 if self.posix_df else 0 + uses_mpiio = len( + self.mpiio_df['counters'][self.mpiio_df['counters']['id'] == id]) > 0 if self.mpiio_df else 0 - if 'POSIX' in report.records: - df = report.records['POSIX'].to_df() + self.total_files_stdio += uses_stdio + self.total_files_posix += uses_posix + self.total_files_mpiio += uses_mpiio - ######################################################################################################################################################################### + self.files[id] = { + 'path': path, + 'stdio': uses_stdio, + 'posix': uses_posix, + 'mpiio': uses_mpiio + } - # Get number of write/read operations - total_reads = df['counters']['POSIX_READS'].sum() - total_writes = df['counters']['POSIX_WRITES'].sum() - # Get total number of I/O operations - total_operations = total_writes + total_reads + def generate_insights(self): + # TODO: Check if module exists. Replicate from each function which calculates insights. 
+ self._check_stdio() + self._check_mpiio() + self._do_something() + self._small_operation_insight() - # To check whether the application is write-intersive or read-intensive we only look at the POSIX level and check if the difference between reads and writes is larger than 10% (for more or less), otherwise we assume a balance - check_operation_intensive(total_operations, total_reads, total_writes) - total_read_size = df['counters']['POSIX_BYTES_READ'].sum() - total_written_size = df['counters']['POSIX_BYTES_WRITTEN'].sum() - total_size = total_written_size + total_read_size + def _check_stdio(self) -> None: + module.check_stdio(self.total_posix_size, self.total_size_stdio) - check_size_intensive(total_size, total_read_size, total_written_size) + def _check_mpiio(self) -> None: + module.check_mpiio(self.modules) - ######################################################################################################################################################################### + def _do_something(self): + module.check_operation_intensive(self.total_operations, self.total_reads, self.total_writes) + module.check_size_intensive(self.posix_total_size, self.posix_total_read_size, self.posix_total_written_size) - # Get the number of small I/O operations (less than 1 MB) - total_reads_small = ( - df['counters']['POSIX_SIZE_READ_0_100'].sum() + - df['counters']['POSIX_SIZE_READ_100_1K'].sum() + - df['counters']['POSIX_SIZE_READ_1K_10K'].sum() + - df['counters']['POSIX_SIZE_READ_10K_100K'].sum() + - df['counters']['POSIX_SIZE_READ_100K_1M'].sum() + # TODO: for trace in traces + for trace in self.traces: + pass + # module.check_misaligned(self.total_operations, self.total_mem_not_aligned, self.total_file_not_aligned, + # self.modules, self.name_records, self.lustre_df, self.dxt_posix, + # self.dxt_posix_read_data) # posix alignment + + # module.check_traffic(self.max_read_offset, self.total_read_size, self.max_write_offset, self.total_written_size, + # self.dxt_posix, self.dxt_posix_read_data, self.dxt_posix_write_data) # redundant reads + + # module.check_random_operation(self.read_consecutive, self.read_sequential, self.read_random, self.total_reads, + # self.write_consecutive, self.write_sequential, self.write_random, + # self.total_writes, self.dxt_posix, + # self.dxt_posix_read_data, self.dxt_posix_write_data) # random check + + # module.check_shared_small_operation(self.total_shared_reads, self.total_shared_reads_small, + # self.total_shared_writes, + # self.total_shared_writes_small, self.shared_files, self.report.name_records) + + module.check_long_metadata(self.count_long_metadata, self.modules) + + # module.check_shared_data_imblance(self.posix_shared_data_imbalance_stragglers_count, + # self.posix_data_straggler_files, + # self.report.name_records, self.dxt_posix, + # self.dxt_posix_read_data, + # self.dxt_posix_write_data) + + # module.check_shared_time_imbalance(self.posix_stragglers_shared_file_time_imbalance_count, + # self.posix_shared_time_imbalance_detected_files1, self.report.name_records) + + # module.check_individual_write_imbalance(self.posix_data_imbalance_count, + # self.posix_shared_time_imbalance_detected_files2, + # self.report.name_records, self.dxt_posix, self.dxt_posix_write_data) + + # module.check_mpi_collective_read_operation(self.mpiio_coll_reads, self.mpiio_indep_reads, + # self.total_mpiio_read_operations, + # self.detected_files_mpi_coll_reads, self.report.name_records, + # self.dxt_mpiio) + + # 
module.check_mpi_collective_write_operation(self.mpiio_coll_writes, self.mpiio_indep_writes, + # self.total_mpiio_write_operations, + # self.detected_files_mpiio_coll_writes, self.report.name_records, self.dxt_mpiio) + # + # module.check_individual_read_imbalance(self.imbalance_count_posix_shared_time, + # self.posix_shared_time_imbalance_detected_files3, + # self.report.name_records, self.dxt_posix, self.dxt_posix_read_data) + + module.check_mpi_none_block_operation(self.mpiio_nb_reads, self.mpiio_nb_writes, self.has_hdf5_extension, + self.modules) + + + + def _small_operation_insight(self): + pass + # module.check_small_operation(self.total_reads, self.total_reads_small, self.total_writes, + # self.total_writes_small, + # self.small_operation_detected_files, + # self.modules, self.report.name_records, self.dxt_posix, self.dxt_posix_read_data, + # self.dxt_posix_write_data) + + + + def something(self) -> None: + if not self.posix_df: + return + + self.total_reads = self.posix_df['counters']['POSIX_READS'].sum() + self.total_writes = self.posix_df['counters']['POSIX_WRITES'].sum() + self.total_operations = self.total_writes + self.total_reads + # ---------------------------------------------------------------------------------------------------------------------- + # module.check_operation_intensive(self.total_operations, self.total_reads, self.total_writes) + + self.posix_total_read_size = self.posix_df['counters']['POSIX_BYTES_READ'].sum() + self.posix_total_written_size = self.posix_df['counters']['POSIX_BYTES_WRITTEN'].sum() + self.posix_total_size = self.posix_total_written_size + self.posix_total_read_size + + # module.check_size_intensive(self.posix_total_size, self.posix_total_read_size, self.posix_total_written_size) + # ----- + self.total_reads_small = ( + self.posix_df['counters']['POSIX_SIZE_READ_0_100'].sum() + + self.posix_df['counters']['POSIX_SIZE_READ_100_1K'].sum() + + self.posix_df['counters']['POSIX_SIZE_READ_1K_10K'].sum() + + self.posix_df['counters']['POSIX_SIZE_READ_10K_100K'].sum() + + self.posix_df['counters']['POSIX_SIZE_READ_100K_1M'].sum() ) - - total_writes_small = ( - df['counters']['POSIX_SIZE_WRITE_0_100'].sum() + - df['counters']['POSIX_SIZE_WRITE_100_1K'].sum() + - df['counters']['POSIX_SIZE_WRITE_1K_10K'].sum() + - df['counters']['POSIX_SIZE_WRITE_10K_100K'].sum() + - df['counters']['POSIX_SIZE_WRITE_100K_1M'].sum() + self.total_writes_small = ( + self.posix_df['counters']['POSIX_SIZE_WRITE_0_100'].sum() + + self.posix_df['counters']['POSIX_SIZE_WRITE_100_1K'].sum() + + self.posix_df['counters']['POSIX_SIZE_WRITE_1K_10K'].sum() + + self.posix_df['counters']['POSIX_SIZE_WRITE_10K_100K'].sum() + + self.posix_df['counters']['POSIX_SIZE_WRITE_100K_1M'].sum() ) - # Get the files responsible for more than half of these accesses + def small_operation_calculation(self): + if not self.posix_df: + return + files = [] - df['counters']['INSIGHTS_POSIX_SMALL_READ'] = ( - df['counters']['POSIX_SIZE_READ_0_100'] + - df['counters']['POSIX_SIZE_READ_100_1K'] + - df['counters']['POSIX_SIZE_READ_1K_10K'] + - df['counters']['POSIX_SIZE_READ_10K_100K'] + - df['counters']['POSIX_SIZE_READ_100K_1M'] + self.posix_df['counters']['INSIGHTS_POSIX_SMALL_READ'] = ( + self.posix_df['counters']['POSIX_SIZE_READ_0_100'] + + self.posix_df['counters']['POSIX_SIZE_READ_100_1K'] + + self.posix_df['counters']['POSIX_SIZE_READ_1K_10K'] + + self.posix_df['counters']['POSIX_SIZE_READ_10K_100K'] + + self.posix_df['counters']['POSIX_SIZE_READ_100K_1M'] ) - 
df['counters']['INSIGHTS_POSIX_SMALL_WRITE'] = ( - df['counters']['POSIX_SIZE_WRITE_0_100'] + - df['counters']['POSIX_SIZE_WRITE_100_1K'] + - df['counters']['POSIX_SIZE_WRITE_1K_10K'] + - df['counters']['POSIX_SIZE_WRITE_10K_100K'] + - df['counters']['POSIX_SIZE_WRITE_100K_1M'] + self.posix_df['counters']['INSIGHTS_POSIX_SMALL_WRITE'] = ( + self.posix_df['counters']['POSIX_SIZE_WRITE_0_100'] + + self.posix_df['counters']['POSIX_SIZE_WRITE_100_1K'] + + self.posix_df['counters']['POSIX_SIZE_WRITE_1K_10K'] + + self.posix_df['counters']['POSIX_SIZE_WRITE_10K_100K'] + + self.posix_df['counters']['POSIX_SIZE_WRITE_100K_1M'] ) - detected_files = pd.DataFrame(df['counters'].groupby('id')[['INSIGHTS_POSIX_SMALL_READ', 'INSIGHTS_POSIX_SMALL_WRITE']].sum()).reset_index() - detected_files.columns = ['id', 'total_reads', 'total_writes'] - detected_files.loc[:, 'id'] = detected_files.loc[:, 'id'].astype(str) - - check_small_operation(total_reads, total_reads_small, total_writes, total_writes_small, detected_files, modules, file_map, dxt_posix, dxt_posix_read_data, dxt_posix_write_data) - - ######################################################################################################################################################################### - - # How many requests are misaligned? - - total_mem_not_aligned = df['counters']['POSIX_MEM_NOT_ALIGNED'].sum() - total_file_not_aligned = df['counters']['POSIX_FILE_NOT_ALIGNED'].sum() + self.small_operation_detected_files = pd.DataFrame(self.posix_df['counters'].groupby('id')[['INSIGHTS_POSIX_SMALL_READ', + 'INSIGHTS_POSIX_SMALL_WRITE']].sum()).reset_index() + self.small_operation_detected_files.columns = ['id', 'total_reads', + 'total_writes'] # !: Rename later. total_small_reads, total_small_writes + self.small_operation_detected_files.loc[:, 'id'] = self.small_operation_detected_files.loc[:, 'id'].astype(str) - check_misaligned(total_operations, total_mem_not_aligned, total_file_not_aligned, modules, file_map, df_lustre, dxt_posix, dxt_posix_read_data) + self.report.name_records = self.report.name_records + # module.check_small_operation(self.total_reads, self.total_reads_small, self.total_writes, + # self.total_writes_small, + # self.small_operation_detected_files, + # self.modules, self.report.name_records, self.dxt_posix, self.dxt_posix_read_data, + # self.dxt_posix_write_data) - ######################################################################################################################################################################### + def posix_alignment(self): + if not self.posix_df: + return - # Redundant read-traffic (based on Phill) - # POSIX_MAX_BYTE_READ (Highest offset in the file that was read) - max_read_offset = df['counters']['POSIX_MAX_BYTE_READ'].max() - max_write_offset = df['counters']['POSIX_MAX_BYTE_WRITTEN'].max() + self.total_mem_not_aligned = self.posix_df['counters']['POSIX_MEM_NOT_ALIGNED'].sum() + self.total_file_not_aligned = self.posix_df['counters']['POSIX_FILE_NOT_ALIGNED'].sum() - check_traffic(max_read_offset, total_read_size, max_write_offset, total_written_size, dxt_posix, dxt_posix_read_data, dxt_posix_write_data) + self.report.name_records = self.report.name_records + # module.check_misaligned(self.total_operations, self.total_mem_not_aligned, self.total_file_not_aligned, + # self.modules, self.report.name_records, self.lustre_df, self.dxt_posix, + # self.dxt_posix_read_data) - 
######################################################################################################################################################################### + def posix_redundant_reads(self): + if not self.posix_df: + return - # Check for a lot of random operations + self.max_read_offset = self.posix_df['counters']['POSIX_MAX_BYTE_READ'].max() + self.max_write_offset = self.posix_df['counters']['POSIX_MAX_BYTE_WRITTEN'].max() - read_consecutive = df['counters']['POSIX_CONSEC_READS'].sum() - #print('READ Consecutive: {} ({:.2f}%)'.format(read_consecutive, read_consecutive / total_reads * 100)) + # module.check_traffic(self.max_read_offset, self.total_read_size, self.max_write_offset, self.total_written_size, + # self.dxt_posix, self.dxt_posix_read_data, self.dxt_posix_write_data) - read_sequential = df['counters']['POSIX_SEQ_READS'].sum() - read_sequential -= read_consecutive - #print('READ Sequential: {} ({:.2f}%)'.format(read_sequential, read_sequential / total_reads * 100)) + def posix_random_check(self): + if not self.posix_df: + return - read_random = total_reads - read_consecutive - read_sequential - #print('READ Random: {} ({:.2f}%)'.format(read_random, read_random / total_reads * 100)) + self.read_consecutive = self.posix_df['counters']['POSIX_CONSEC_READS'].sum() + self.read_sequential = self.posix_df['counters']['POSIX_SEQ_READS'].sum() + self.read_sequential -= self.read_consecutive - write_consecutive = df['counters']['POSIX_CONSEC_WRITES'].sum() + self.read_random = self.total_reads - self.read_consecutive - self.read_sequential - write_sequential = df['counters']['POSIX_SEQ_WRITES'].sum() - write_sequential -= write_consecutive + self.write_consecutive = self.posix_df['counters']['POSIX_CONSEC_WRITES'].sum() - write_random = total_writes - write_consecutive - write_sequential - #print('WRITE Random: {} ({:.2f}%)'.format(write_random, write_random / total_writes * 100)) + self.write_sequential = self.posix_df['counters']['POSIX_SEQ_WRITES'].sum() + self.write_sequential -= self.write_consecutive - check_random_operation(read_consecutive, read_sequential, read_random, total_reads, write_consecutive, write_sequential, write_random, total_writes, dxt_posix, dxt_posix_read_data, dxt_posix_write_data) + self.write_random = self.total_writes - self.write_consecutive - self.write_sequential - ######################################################################################################################################################################### + # module.check_random_operation(self.read_consecutive, self.read_sequential, self.read_random, self.total_reads, + # self.write_consecutive, self.write_sequential, self.write_random, + # self.total_writes, self.dxt_posix, + # self.dxt_posix_read_data, self.dxt_posix_write_data) - # Shared file with small operations + def posix_shared_file(self): + if not self.posix_df: + return - shared_files = df['counters'].loc[(df['counters']['rank'] == -1)] + self.shared_files = self.posix_df['counters'].loc[(self.posix_df['counters']['rank'] == -1)] - shared_files = shared_files.assign(id=lambda d: d['id'].astype(str)) + self.shared_files = self.shared_files.assign(id=lambda d: d['id'].astype(str)) - if not shared_files.empty: - total_shared_reads = shared_files['POSIX_READS'].sum() - total_shared_reads_small = ( - shared_files['POSIX_SIZE_READ_0_100'].sum() + - shared_files['POSIX_SIZE_READ_100_1K'].sum() + - shared_files['POSIX_SIZE_READ_1K_10K'].sum() + - shared_files['POSIX_SIZE_READ_10K_100K'].sum() + - 
shared_files['POSIX_SIZE_READ_100K_1M'].sum() + if not self.shared_files.empty: + self.total_shared_reads = self.shared_files['POSIX_READS'].sum() + self.total_shared_reads_small = ( + self.shared_files['POSIX_SIZE_READ_0_100'].sum() + + self.shared_files['POSIX_SIZE_READ_100_1K'].sum() + + self.shared_files['POSIX_SIZE_READ_1K_10K'].sum() + + self.shared_files['POSIX_SIZE_READ_10K_100K'].sum() + + self.shared_files['POSIX_SIZE_READ_100K_1M'].sum() ) - shared_files['INSIGHTS_POSIX_SMALL_READS'] = ( - shared_files['POSIX_SIZE_READ_0_100'] + - shared_files['POSIX_SIZE_READ_100_1K'] + - shared_files['POSIX_SIZE_READ_1K_10K'] + - shared_files['POSIX_SIZE_READ_10K_100K'] + - shared_files['POSIX_SIZE_READ_100K_1M'] + self.shared_files['INSIGHTS_POSIX_SMALL_READS'] = ( + self.shared_files['POSIX_SIZE_READ_0_100'] + + self.shared_files['POSIX_SIZE_READ_100_1K'] + + self.shared_files['POSIX_SIZE_READ_1K_10K'] + + self.shared_files['POSIX_SIZE_READ_10K_100K'] + + self.shared_files['POSIX_SIZE_READ_100K_1M'] ) - - total_shared_writes = shared_files['POSIX_WRITES'].sum() - total_shared_writes_small = ( - shared_files['POSIX_SIZE_WRITE_0_100'].sum() + - shared_files['POSIX_SIZE_WRITE_100_1K'].sum() + - shared_files['POSIX_SIZE_WRITE_1K_10K'].sum() + - shared_files['POSIX_SIZE_WRITE_10K_100K'].sum() + - shared_files['POSIX_SIZE_WRITE_100K_1M'].sum() + self.total_shared_writes = self.shared_files['POSIX_WRITES'].sum() + self.total_shared_writes_small = ( + self.shared_files['POSIX_SIZE_WRITE_0_100'].sum() + + self.shared_files['POSIX_SIZE_WRITE_100_1K'].sum() + + self.shared_files['POSIX_SIZE_WRITE_1K_10K'].sum() + + self.shared_files['POSIX_SIZE_WRITE_10K_100K'].sum() + + self.shared_files['POSIX_SIZE_WRITE_100K_1M'].sum() ) - shared_files['INSIGHTS_POSIX_SMALL_WRITES'] = ( - shared_files['POSIX_SIZE_WRITE_0_100'] + - shared_files['POSIX_SIZE_WRITE_100_1K'] + - shared_files['POSIX_SIZE_WRITE_1K_10K'] + - shared_files['POSIX_SIZE_WRITE_10K_100K'] + - shared_files['POSIX_SIZE_WRITE_100K_1M'] + self.shared_files['INSIGHTS_POSIX_SMALL_WRITES'] = ( + self.shared_files['POSIX_SIZE_WRITE_0_100'] + + self.shared_files['POSIX_SIZE_WRITE_100_1K'] + + self.shared_files['POSIX_SIZE_WRITE_1K_10K'] + + self.shared_files['POSIX_SIZE_WRITE_10K_100K'] + + self.shared_files['POSIX_SIZE_WRITE_100K_1M'] ) - check_shared_small_operation(total_shared_reads, total_shared_reads_small, total_shared_writes, total_shared_writes_small, shared_files, file_map) + self.report.name_records = self.report.name_records + # module.check_shared_small_operation(self.total_shared_reads, self.total_shared_reads_small, + # self.total_shared_writes, + # self.total_shared_writes_small, self.shared_files, self.report.name_records) - ######################################################################################################################################################################### + def posix_long_metadata(self): + if not self.posix_df: + return - count_long_metadata = len(df['fcounters'][(df['fcounters']['POSIX_F_META_TIME'] > thresholds['metadata_time_rank'][0])]) + self.count_long_metadata = len( + self.posix_df['fcounters'][ + (self.posix_df['fcounters']['POSIX_F_META_TIME'] > config.thresholds['metadata_time_rank'][0])]) - check_long_metadata(count_long_metadata, modules) + # module.check_long_metadata(self.count_long_metadata, self.modules) + def posix_stragglers(self): + if not self.posix_df: + return # We already have a single line for each shared-file access - # To check for stragglers, we can check the difference 
between the + # To check for stragglers, we can check the difference between the # POSIX_FASTEST_RANK_BYTES # POSIX_SLOWEST_RANK_BYTES # POSIX_F_VARIANCE_RANK_BYTES - stragglers_count = 0 - - shared_files = shared_files.assign(id=lambda d: d['id'].astype(str)) + self.shared_files = self.shared_files.assign(id=lambda d: d['id'].astype(str)) - # Get the files responsible - detected_files = [] + self.posix_data_straggler_files = [] - for index, row in shared_files.iterrows(): + for index, row in self.shared_files.iterrows(): total_transfer_size = row['POSIX_BYTES_WRITTEN'] + row['POSIX_BYTES_READ'] - if total_transfer_size and abs(row['POSIX_SLOWEST_RANK_BYTES'] - row['POSIX_FASTEST_RANK_BYTES']) / total_transfer_size > thresholds['imbalance_stragglers'][0]: - stragglers_count += 1 + if total_transfer_size and abs( + row['POSIX_SLOWEST_RANK_BYTES'] - row['POSIX_FASTEST_RANK_BYTES']) / total_transfer_size > \ + module.thresholds['imbalance_stragglers'][0]: + self.posix_shared_data_imbalance_stragglers_count += 1 - detected_files.append([ - row['id'], abs(row['POSIX_SLOWEST_RANK_BYTES'] - row['POSIX_FASTEST_RANK_BYTES']) / total_transfer_size * 100 + self.posix_data_straggler_files.append([ + row['id'], + abs(row['POSIX_SLOWEST_RANK_BYTES'] - row['POSIX_FASTEST_RANK_BYTES']) / total_transfer_size * 100 ]) column_names = ['id', 'data_imbalance'] - detected_files = pd.DataFrame(detected_files, columns=column_names) - check_shared_data_imblance(stragglers_count, detected_files, file_map, dxt_posix, dxt_posix_read_data, dxt_posix_write_data) + self.posix_data_straggler_files = pd.DataFrame(self.posix_data_straggler_files, columns=column_names) + + self.report.name_records = self.report.name_records + # module.check_shared_data_imblance(self.posix_shared_data_imbalance_stragglers_count, self.posix_data_straggler_files, + # self.report.name_records, self.dxt_posix, + # self.dxt_posix_read_data, + # self.dxt_posix_write_data) # POSIX_F_FASTEST_RANK_TIME # POSIX_F_SLOWEST_RANK_TIME # POSIX_F_VARIANCE_RANK_TIME - shared_files_times = df['fcounters'].loc[(df['fcounters']['rank'] == -1)] + ################################################################################################################# + + def posix_stragglers2(self): + if not self.posix_df: + return # Get the files responsible - detected_files = [] + shared_files_times = self.posix_df['fcounters'].loc[(self.posix_df['fcounters']['rank'] == -1)] + + self.posix_shared_time_imbalance_detected_files1 = [] - stragglers_count = 0 - stragglers_imbalance = {} + self.posix_stragglers_shared_file_time_imbalance_count = 0 + # posix_stragglers_shared_file_time_imbalance = {} # UNUSED? 
shared_files_times = shared_files_times.assign(id=lambda d: d['id'].astype(str)) for index, row in shared_files_times.iterrows(): total_transfer_time = row['POSIX_F_WRITE_TIME'] + row['POSIX_F_READ_TIME'] + row['POSIX_F_META_TIME'] - if total_transfer_time and abs(row['POSIX_F_SLOWEST_RANK_TIME'] - row['POSIX_F_FASTEST_RANK_TIME']) / total_transfer_time > thresholds['imbalance_stragglers'][0]: - stragglers_count += 1 + if total_transfer_time and abs( + row['POSIX_F_SLOWEST_RANK_TIME'] - row['POSIX_F_FASTEST_RANK_TIME']) / total_transfer_time > \ + config.thresholds['imbalance_stragglers'][0]: + self.posix_stragglers_shared_file_time_imbalance_count += 1 - detected_files.append([ - row['id'], abs(row['POSIX_F_SLOWEST_RANK_TIME'] - row['POSIX_F_FASTEST_RANK_TIME']) / total_transfer_time * 100 + self.posix_shared_time_imbalance_detected_files1.append([ + row['id'], + abs(row['POSIX_F_SLOWEST_RANK_TIME'] - row['POSIX_F_FASTEST_RANK_TIME']) / total_transfer_time * 100 ]) column_names = ['id', 'time_imbalance'] - detected_files = pd.DataFrame(detected_files, columns=column_names) - check_shared_time_imbalance(stragglers_count, detected_files, file_map) + self.posix_shared_time_imbalance_detected_files1 = pd.DataFrame(self.posix_shared_time_imbalance_detected_files1, + columns=column_names) + # module.check_shared_time_imbalance(self.posix_stragglers_shared_file_time_imbalance_count, + # self.posix_shared_time_imbalance_detected_files1, self.report.name_records) + + def posix_imbalance(self): + if not self.posix_df: + return - aggregated = df['counters'].loc[(df['counters']['rank'] != -1)][ + aggregated = self.posix_df['counters'].loc[(self.posix_df['counters']['rank'] != -1)][ ['rank', 'id', 'POSIX_BYTES_WRITTEN', 'POSIX_BYTES_READ'] ].groupby('id', as_index=False).agg({ 'rank': 'nunique', 'POSIX_BYTES_WRITTEN': ['sum', 'min', 'max'], 'POSIX_BYTES_READ': ['sum', 'min', 'max'] }) - aggregated.columns = list(map('_'.join, aggregated.columns.values)) - aggregated = aggregated.assign(id=lambda d: d['id_'].astype(str)) + self.aggregated = aggregated # Get the files responsible - imbalance_count = 0 + self.posix_data_imbalance_count = 0 - detected_files = [] + self.posix_shared_time_imbalance_detected_files2 = [] - for index, row in aggregated.iterrows(): - if row['POSIX_BYTES_WRITTEN_max'] and abs(row['POSIX_BYTES_WRITTEN_max'] - row['POSIX_BYTES_WRITTEN_min']) / row['POSIX_BYTES_WRITTEN_max'] > thresholds['imbalance_size'][0]: - imbalance_count += 1 + for index, row in self.aggregated.iterrows(): + if row['POSIX_BYTES_WRITTEN_max'] and abs(row['POSIX_BYTES_WRITTEN_max'] - row['POSIX_BYTES_WRITTEN_min']) / \ + row['POSIX_BYTES_WRITTEN_max'] > config.thresholds['imbalance_size'][0]: + self.posix_data_imbalance_count += 1 - detected_files.append([ - row['id'], abs(row['POSIX_BYTES_WRITTEN_max'] - row['POSIX_BYTES_WRITTEN_min']) / row['POSIX_BYTES_WRITTEN_max'] * 100 + self.posix_shared_time_imbalance_detected_files2.append([ + row['id'], abs(row['POSIX_BYTES_WRITTEN_max'] - row['POSIX_BYTES_WRITTEN_min']) / row[ + 'POSIX_BYTES_WRITTEN_max'] * 100 ]) column_names = ['id', 'write_imbalance'] - detected_files = pd.DataFrame(detected_files, columns=column_names) - check_individual_write_imbalance(imbalance_count, detected_files, file_map, dxt_posix, dxt_posix_write_data) + self.posix_shared_time_imbalance_detected_files2 = pd.DataFrame(self.posix_shared_time_imbalance_detected_files2, + columns=column_names) + # module.check_individual_write_imbalance(self.posix_data_imbalance_count, 
self.posix_shared_time_imbalance_detected_files2, + # self.report.name_records, self.dxt_posix, self.dxt_posix_write_data) + + def mpiio_processing(self): + if not self.mpiio_df: + return + + self.mpiio_df['counters'] = self.mpiio_df['counters'].assign( + id=lambda d: d['id'].astype(str)) # What does this do? + + + df_mpiio_collective_reads = self.mpiio_df['counters'] # .loc[(df_mpiio['counters']['MPIIO_COLL_READS'] > 0)] + + self.total_mpiio_read_operations = self.mpiio_df['counters']['MPIIO_INDEP_READS'].sum() + self.mpiio_df['counters'][ + 'MPIIO_COLL_READS'].sum() + + self.mpiio_coll_reads = self.mpiio_df['counters']['MPIIO_COLL_READS'].sum() + self.mpiio_indep_reads = self.mpiio_df['counters']['MPIIO_INDEP_READS'].sum() + + self.detected_files_mpi_coll_reads = [] + if self.mpiio_coll_reads == 0 and self.total_mpiio_read_operations and self.total_mpiio_read_operations > \ + module.thresholds['collective_operations_absolute'][0]: + files = pd.DataFrame(df_mpiio_collective_reads.groupby('id').sum()).reset_index() + for index, row in df_mpiio_collective_reads.iterrows(): + if ((row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) and + row['MPIIO_INDEP_READS'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > + module.thresholds['collective_operations'][0] and + (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > + module.thresholds['collective_operations_absolute'][0]): + self.detected_files_mpi_coll_reads.append([ + row['id'], row['MPIIO_INDEP_READS'], + row['MPIIO_INDEP_READS'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) * 100 + ]) + + column_names = ['id', 'absolute_indep_reads', 'percent_indep_reads'] + self.detected_files_mpi_coll_reads = pd.DataFrame(self.detected_files_mpi_coll_reads, columns=column_names) - imbalance_count = 0 + # module.check_mpi_collective_read_operation(self.mpiio_coll_reads, self.mpiio_indep_reads, self.total_mpiio_read_operations, + # self.detected_files_mpi_coll_reads, self.report.name_records, self.dxt_mpiio) - detected_files = [] + # TODO: Split this into 2 functions for each module insight - for index, row in aggregated.iterrows(): - if row['POSIX_BYTES_READ_max'] and abs(row['POSIX_BYTES_READ_max'] - row['POSIX_BYTES_READ_min']) / row['POSIX_BYTES_READ_max'] > thresholds['imbalance_size'][0]: - imbalance_count += 1 - detected_files.append([ - row['id'], abs(row['POSIX_BYTES_READ_max'] - row['POSIX_BYTES_READ_min']) / row['POSIX_BYTES_READ_max'] * 100 + df_mpiio_collective_writes = self.mpiio_df['counters'] # .loc[(df_mpiio['counters']['MPIIO_COLL_WRITES'] > 0)] + + self.total_mpiio_write_operations = self.mpiio_df['counters']['MPIIO_INDEP_WRITES'].sum() + \ + self.mpiio_df['counters'][ + 'MPIIO_COLL_WRITES'].sum() + + self.mpiio_coll_writes = self.mpiio_df['counters']['MPIIO_COLL_WRITES'].sum() + self.mpiio_indep_writes = self.mpiio_df['counters']['MPIIO_INDEP_WRITES'].sum() + + self.detected_files_mpiio_coll_writes = [] + if self.mpiio_coll_writes == 0 and self.total_mpiio_write_operations and self.total_mpiio_write_operations > \ + module.thresholds['collective_operations_absolute'][0]: + files = pd.DataFrame(df_mpiio_collective_writes.groupby('id').sum()).reset_index() + + for index, row in df_mpiio_collective_writes.iterrows(): + if ((row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) and + row['MPIIO_INDEP_WRITES'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > + module.thresholds['collective_operations'][0] and + (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > + 
module.thresholds['collective_operations_absolute'][0]): + self.detected_files_mpiio_coll_writes.append([ + row['id'], row['MPIIO_INDEP_WRITES'], + row['MPIIO_INDEP_WRITES'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) * 100 + ]) + + column_names = ['id', 'absolute_indep_writes', 'percent_indep_writes'] + self.detected_files_mpiio_coll_writes = pd.DataFrame(self.detected_files_mpiio_coll_writes, columns=column_names) + + # module.check_mpi_collective_write_operation(self.mpiio_coll_writes, self.mpiio_indep_writes, self.total_mpiio_write_operations, + # detected_files_mpiio_coll_writes, self.report.name_records, self.dxt_mpiio) + + def posix_imbalance2(self): + if not self.posix_df: + return + + self.imbalance_count_posix_shared_time = 0 + + self.posix_shared_time_imbalance_detected_files3 = [] + + for index, row in self.aggregated.iterrows(): + if row['POSIX_BYTES_READ_max'] and abs(row['POSIX_BYTES_READ_max'] - row['POSIX_BYTES_READ_min']) / row[ + 'POSIX_BYTES_READ_max'] > module.thresholds['imbalance_size'][0]: + self.imbalance_count_posix_shared_time += 1 + + self.posix_shared_time_imbalance_detected_files3.append([ + row['id'], + abs(row['POSIX_BYTES_READ_max'] - row['POSIX_BYTES_READ_min']) / row['POSIX_BYTES_READ_max'] * 100 ]) column_names = ['id', 'read_imbalance'] - detected_files = pd.DataFrame(detected_files, columns=column_names) - check_individual_read_imbalance(imbalance_count, detected_files, file_map, dxt_posix, dxt_posix_read_data) + self.posix_shared_time_imbalance_detected_files3 = pd.DataFrame(self.posix_shared_time_imbalance_detected_files3, + columns=column_names) + # module.check_individual_read_imbalance(self.imbalance_count_posix_shared_time, self.posix_shared_time_imbalance_detected_files3, + # self.report.name_records, self.dxt_posix, self.dxt_posix_read_data) - ######################################################################################################################################################################### + def hdf5_check(self): + if not self.mpiio_df: + return - if 'MPI-IO' in report.records: - # Check if application uses MPI-IO and collective operations - df_mpiio = report.records['MPI-IO'].to_df() - df_mpiio['counters'] = df_mpiio['counters'].assign(id=lambda d: d['id'].astype(str)) + self.report.name_records = self.report.name_records # Will this be optimised via JIT? 
Nvm CPython doesn't have JIT lol + for index, row in self.mpiio_df['counters'].iterrows(): + if self.report.name_records[int(row['id'])].endswith('.h5') or self.report.name_records[ + int(row['id'])].endswith('.hdf5'): + self.has_hdf5_extension = True + break # Early exit - # Get the files responsible - detected_files = [] + def mpiio_non_blocking(self): + if not self.mpiio_df: + return - df_mpiio_collective_reads = df_mpiio['counters'] #.loc[(df_mpiio['counters']['MPIIO_COLL_READS'] > 0)] + self.mpiio_nb_reads = self.mpiio_df['counters']['MPIIO_NB_READS'].sum() + self.mpiio_nb_writes = self.mpiio_df['counters']['MPIIO_NB_WRITES'].sum() - total_mpiio_read_operations = df_mpiio['counters']['MPIIO_INDEP_READS'].sum() + df_mpiio['counters']['MPIIO_COLL_READS'].sum() + # module.check_mpi_none_block_operation(self.mpiio_nb_reads, self.mpiio_nb_writes, self.has_hdf5_extension, + # self.modules) - mpiio_coll_reads = df_mpiio['counters']['MPIIO_COLL_READS'].sum() - mpiio_indep_reads = df_mpiio['counters']['MPIIO_INDEP_READS'].sum() + def CHECKnumber_of_aggregators(self): + hints = '' - detected_files = [] - if mpiio_coll_reads == 0 and total_mpiio_read_operations and total_mpiio_read_operations > thresholds['collective_operations_absolute'][0]: - files = pd.DataFrame(df_mpiio_collective_reads.groupby('id').sum()).reset_index() - for index, row in df_mpiio_collective_reads.iterrows(): - if ((row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) and - row['MPIIO_INDEP_READS'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > thresholds['collective_operations'][0] and - (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > thresholds['collective_operations_absolute'][0]): + if 'h' in self.report.metadata['job']['metadata']: + hints = self.report.metadata['job']['metadata']['h'] - detected_files.append([ - row['id'], row['MPIIO_INDEP_READS'], row['MPIIO_INDEP_READS'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) * 100 - ]) - - column_names = ['id', 'absolute_indep_reads', 'percent_indep_reads'] - detected_files = pd.DataFrame(detected_files, columns=column_names) + if hints: + hints = hints.split(';') - check_mpi_collective_read_operation(mpiio_coll_reads, mpiio_indep_reads, total_mpiio_read_operations, detected_files, file_map, dxt_mpiio) + self.hints = hints - df_mpiio_collective_writes = df_mpiio['counters'] #.loc[(df_mpiio['counters']['MPIIO_COLL_WRITES'] > 0)] + if 'MPI-IO' in self.modules: - total_mpiio_write_operations = df_mpiio['counters']['MPIIO_INDEP_WRITES'].sum() + df_mpiio['counters']['MPIIO_COLL_WRITES'].sum() + for hint in hints: + if hint != 'no': + (key, value) = hint.split('=') - mpiio_coll_writes = df_mpiio['counters']['MPIIO_COLL_WRITES'].sum() - mpiio_indep_writes = df_mpiio['counters']['MPIIO_INDEP_WRITES'].sum() + if key == 'cb_nodes': + self.cb_nodes = value - detected_files = [] - if mpiio_coll_writes == 0 and total_mpiio_write_operations and total_mpiio_write_operations > thresholds['collective_operations_absolute'][0]: - files = pd.DataFrame(df_mpiio_collective_writes.groupby('id').sum()).reset_index() + # Try to get the number of compute nodes from SLURM, if not found, set as information + command = f'sacct --job {self.report.metadata["job"]["jobid"]} --format=JobID,JobIDRaw,NNodes,NCPUs --parsable2 --delimiter ","' - for index, row in df_mpiio_collective_writes.iterrows(): - if ((row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) and - row['MPIIO_INDEP_WRITES'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > 
thresholds['collective_operations'][0] and - (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > thresholds['collective_operations_absolute'][0]): + arguments = shlex.split(command) - detected_files.append([ - row['id'], row['MPIIO_INDEP_WRITES'], row['MPIIO_INDEP_WRITES'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) * 100 - ]) + try: + result = subprocess.run(arguments, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - column_names = ['id', 'absolute_indep_writes', 'percent_indep_writes'] - detected_files = pd.DataFrame(detected_files, columns=column_names) + if result.returncode == 0: + # We have successfully fetched the information from SLURM + db = csv.DictReader(io.StringIO(result.stdout.decode('utf-8'))) - check_mpi_collective_write_operation(mpiio_coll_writes, mpiio_indep_writes, total_mpiio_write_operations, detected_files, file_map, dxt_mpiio) + try: + first = next(db) - ######################################################################################################################################################################### + if 'NNodes' in first: + self.number_of_compute_nodes = first['NNodes'] - # Look for usage of non-block operations + # Do we have one MPI-IO aggregator per node? + module.check_mpi_aggregator(self.cb_nodes, self.number_of_compute_nodes) + except StopIteration: + pass + except FileNotFoundError: + pass - # Look for HDF5 file extension + def something_else(self): + if 'start_time' in self.report.metadata['job']: + self.job_start = datetime.datetime.fromtimestamp(self.report.metadata['job']['start_time'], + datetime.timezone.utc) + self.job_end = datetime.datetime.fromtimestamp(self.report.metadata['job']['end_time'], + datetime.timezone.utc) + else: + self.job_start = datetime.datetime.fromtimestamp(self.report.metadata['job']['start_time_sec'], + datetime.timezone.utc) + self.job_end = datetime.datetime.fromtimestamp(self.report.metadata['job']['end_time_sec'], + datetime.timezone.utc) - has_hdf5_extension = False - for index, row in df_mpiio['counters'].iterrows(): - if file_map[int(row['id'])].endswith('.h5') or file_map[int(row['id'])].endswith('.hdf5'): - has_hdf5_extension = True - mpiio_nb_reads = df_mpiio['counters']['MPIIO_NB_READS'].sum() - mpiio_nb_writes = df_mpiio['counters']['MPIIO_NB_WRITES'].sum() +@dataclass +class DarshanTrace(AbstractDarshanTrace): + path: Optional[str] = None + report: Optional[darshan.DarshanReport] = None - check_mpi_none_block_operation(mpiio_nb_reads, mpiio_nb_writes, has_hdf5_extension, modules) + stdio_df: pd.DataFrame = None + posix_df: pd.DataFrame = None + mpiio_df: pd.DataFrame = None + lustre_df: pd.DataFrame = None - ######################################################################################################################################################################### + dxt_posix: pd.DataFrame = None + dxt_mpiio: pd.DataFrame = None - # Nodes and MPI-IO aggregators - # If the application uses collective reads or collective writes, look for the number of aggregators - hints = '' + dxt_posix_read_data: pd.DataFrame = None + dxt_posix_write_data: pd.DataFrame = None - if 'h' in job['job']['metadata']: - hints = job['job']['metadata']['h'] + shared_files: pd.DataFrame = None - if hints: - hints = hints.split(';') + def __init__(self, trace_path: str, job_information, report: darshan.DarshanReport): + self.path = trace_path - # print('Hints: ', hints) + self.jobid = job_information['jobid'] + self.log_ver = job_information['log_ver'] if 'log_ver' in job_information else 
job_information['metadata'][ + 'lib_ver'] + self.exe = report.metadata['exe'] - NUMBER_OF_COMPUTE_NODES = 0 + _start_time = datetime.datetime.fromtimestamp(job_information['start_time_sec'], tz=datetime.timezone.utc) + _end_time = datetime.datetime.fromtimestamp(job_information['end_time_sec'], tz=datetime.timezone.utc) + self.time = TimestampPair(_start_time, _end_time) - if 'MPI-IO' in modules: - cb_nodes = None + self.modules = report.modules.keys() - for hint in hints: - if hint != 'no': - (key, value) = hint.split('=') - - if key == 'cb_nodes': - cb_nodes = value + # TODO: Should I search in self.modules or in report.records? + # ! All dfs are being materialised + self.report = report + self.posix_df = report.records['POSIX'].to_df() if 'POSIX' in self.modules else None + self.stdio_df = report.records['STDIO'].to_df() if 'STDIO' in self.modules else None + self.mpiio_df = report.records['MPI-IO'].to_df() if 'MPI-IO' in self.modules else None - # Try to get the number of compute nodes from SLURM, if not found, set as information - command = 'sacct --job {} --format=JobID,JobIDRaw,NNodes,NCPUs --parsable2 --delimiter ","'.format( - job['job']['jobid'] - ) + self.lustre_df = report.records['LUSTRE'].to_df() if 'LUSTRE' in self.modules else None - arguments = shlex.split(command) + self.dxt_posix = report.records['DXT_POSIX'].to_df() if 'DXT_POSIX' in self.modules else None + self.dxt_mpiio = report.records['DXT_MPIIO'].to_df() if 'DXT_MPIIO' in self.modules else None - try: - result = subprocess.run(arguments, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self.hints = [] + self.files = {} - if result.returncode == 0: - # We have successfully fetched the information from SLURM - db = csv.DictReader(io.StringIO(result.stdout.decode('utf-8'))) - try: - first = next(db) + +@dataclass +class AggregatedDarshanTraces(AbstractDarshanTrace): + + traces: List[DarshanTrace] = field(default_factory=list) + # reports: List[darshan.DarshanReport] = field(default_factory=list) + + def __init__(self, traces: List[DarshanTrace]): + assert len(traces) > 0 + self.traces = traces + + reports = [current_trace.report for current_trace in traces] + self.name_records = dict() + for report in reports: + self.name_records.update(report.name_records) # self.name_records |= report.name_records + + def aggregate_traces(self): + self.modules = set() + self.files = dict() + for current_trace in self.traces: + self.modules.update(current_trace.modules) + + + self.total_write_size_stdio += current_trace.total_write_size_stdio + self.total_read_size_stdio += current_trace.total_read_size_stdio + self.total_size_stdio += current_trace.total_size_stdio + + self.total_write_size_posix += current_trace.total_write_size_posix + self.total_read_size_posix += current_trace.total_read_size_posix + self.total_size_posix += current_trace.total_size_posix + + self.total_write_size_mpiio += current_trace.total_write_size_mpiio + self.total_read_size_mpiio += current_trace.total_read_size_mpiio + self.total_size_mpiio += current_trace.total_size_mpiio + + self.total_size += current_trace.total_size + self.total_files += current_trace.total_files + ### + self.max_read_offset = max(self.max_read_offset, current_trace.max_read_offset) + self.max_write_offset = max(self.max_write_offset, current_trace.max_write_offset) + ### + + self.total_files_stdio += current_trace.total_files_stdio + self.total_files_posix += current_trace.total_files_posix + self.total_files_mpiio += current_trace.total_files_mpiio + + 
self.files.update(current_trace.files) # self.files |= current_trace.files + + self.total_reads += current_trace.total_reads + self.total_writes += current_trace.total_writes + self.total_operations += current_trace.total_operations + self.total_read_size += current_trace.total_read_size + self.total_written_size += current_trace.total_written_size + self.total_posix_size += current_trace.total_posix_size + self.total_reads_small += current_trace.total_reads_small + self.total_writes_small += current_trace.total_writes_small + + self.total_mem_not_aligned += current_trace.total_mem_not_aligned + self.total_file_not_aligned += current_trace.total_file_not_aligned + + self.read_consecutive += current_trace.read_consecutive + self.read_sequential += current_trace.read_sequential + self.read_random += current_trace.read_random + self.write_consecutive += current_trace.write_consecutive + self.write_sequential += current_trace.write_sequential + self.write_random += current_trace.write_random + + self.total_shared_reads += current_trace.total_shared_reads + self.total_shared_reads_small += current_trace.total_shared_reads_small + self.total_shared_writes += current_trace.total_shared_writes + self.total_shared_writes_small += current_trace.total_shared_writes_small + + self.count_long_metadata += current_trace.count_long_metadata + + self.posix_shared_data_imbalance_stragglers_count += current_trace.posix_shared_data_imbalance_stragglers_count + + self.has_hdf5_extension = self.has_hdf5_extension or current_trace.has_hdf5_extension + + self.mpiio_nb_reads += current_trace.mpiio_nb_reads + self.mpiio_nb_writes += current_trace.mpiio_nb_writes + +def log_relation_check(): + # TODO: Ensure that all logs are from a single job, generated at the same time, from the same executable and using the same library version + pass + + +def handler(): + console = config.init_console() + + insights_start_time = time.time() + + darshan.enable_experimental() + library_version = darshanll.get_lib_version() + + # trace_path = args.log_paths[0] # TODO: A single file rn + darshan_traces = [] + + + for trace_path in args.log_paths: + log = darshanll.log_open(trace_path) + information = darshanll.log_get_job(log) + darshanll.log_close(log) + + report = darshan.DarshanReport(trace_path) + current_trace = DarshanTrace(trace_path, information, report) + darshan_traces.append(current_trace) + # + + # Leave this as is for now + # # Make sure log format is of the same version + # filename = args.trace_path + # # check_log_version(console, args.trace_path, log_version, library_version) + # + + # Compute values for each trace + for current_trace in darshan_traces: + current_trace.generate_dxt_posix_rw_df() + current_trace.calculate_insights() + current_trace.files_stuff() + # current_trace.check_stdio() + # current_trace.check_mpiio() + current_trace.something() + current_trace.small_operation_calculation() + current_trace.posix_alignment() + current_trace.posix_redundant_reads() + current_trace.posix_random_check() + current_trace.posix_shared_file() + current_trace.posix_long_metadata() + current_trace.posix_stragglers() + current_trace.posix_stragglers2() + current_trace.posix_imbalance() + current_trace.hdf5_check() + current_trace.mpiio_non_blocking() + current_trace.CHECKnumber_of_aggregators() + current_trace.something_else() + # current_trace.generate_insights() + + # Create aggregated trace + aggregated_trace = AggregatedDarshanTraces(traces=darshan_traces) + aggregated_trace.aggregate_traces() + 
aggregated_trace.generate_insights() - if 'NNodes' in first: - NUMBER_OF_COMPUTE_NODES = first['NNodes'] - # Do we have one MPI-IO aggregator per node? - check_mpi_aggregator(cb_nodes, NUMBER_OF_COMPUTE_NODES) - except StopIteration: - pass - except FileNotFoundError: - pass - - ######################################################################################################################################################################### insights_end_time = time.time() # Version 3.4.1 of py-darshan changed the contents on what is reported in 'job' - if 'start_time' in job['job']: - job_start = datetime.datetime.fromtimestamp(job['job']['start_time'], datetime.timezone.utc) - job_end = datetime.datetime.fromtimestamp(job['job']['end_time'], datetime.timezone.utc) + job_end, job_start = set_job_time(report) + + print_insights(console, current_trace, insights_end_time, insights_start_time, job_end, job_start, trace_path, + report) + + export_results(console, trace_path, report) + + +def set_job_time(report): + if 'start_time' in report.metadata['job']: + job_start = datetime.datetime.fromtimestamp(report.metadata['job']['start_time'], datetime.timezone.utc) + job_end = datetime.datetime.fromtimestamp(report.metadata['job']['end_time'], datetime.timezone.utc) else: - job_start = datetime.datetime.fromtimestamp(job['job']['start_time_sec'], datetime.timezone.utc) - job_end = datetime.datetime.fromtimestamp(job['job']['end_time_sec'], datetime.timezone.utc) + job_start = datetime.datetime.fromtimestamp(report.metadata['job']['start_time_sec'], datetime.timezone.utc) + job_end = datetime.datetime.fromtimestamp(report.metadata['job']['end_time_sec'], datetime.timezone.utc) + return job_end, job_start - console.print() +def export_results(console, log_path, report): + # Export to HTML, SVG, and CSV + trace_name = os.path.basename(log_path).replace('.darshan', '') + out_dir = args.export_dir if args.export_dir != "" else os.getcwd() + module.export_html(console, out_dir, trace_name) + module.export_svg(console, out_dir, trace_name) + module.export_csv(out_dir, trace_name, report.metadata['job']['jobid']) + + +def print_insights(console, current_trace, insights_end_time, insights_start_time, job_end, job_start, log_path, + report): + console.print() console.print( Panel( '\n'.join([ ' [b]JOB[/b]: [white]{}[/white]'.format( - job['job']['jobid'] + report.metadata['job']['jobid'] ), ' [b]EXECUTABLE[/b]: [white]{}[/white]'.format( - job['exe'].split()[0] + report.metadata['exe'].split()[0] ), ' [b]DARSHAN[/b]: [white]{}[/white]'.format( - os.path.basename(args.log_path) + os.path.basename(log_path) ), ' [b]EXECUTION TIME[/b]: [white]{} to {} ({:.2f} hours)[/white]'.format( job_start, @@ -719,19 +1105,20 @@ def handler(): (job_end - job_start).total_seconds() / 3600 ), ' [b]FILES[/b]: [white]{} files ({} use STDIO, {} use POSIX, {} use MPI-IO)[/white]'.format( - total_files, - total_files_stdio, - total_files_posix - total_files_mpiio, # Since MPI-IO files will always use POSIX, we can decrement to get a unique count - total_files_mpiio + current_trace.total_files_posix, + current_trace.total_files_stdio, + current_trace.total_files_posix - current_trace.total_files_mpiio, + # Since MPI-IO files will always use POSIX, we can decrement to get a unique count + current_trace.total_files_mpiio ), ' [b]COMPUTE NODES[/b] [white]{}[/white]'.format( - NUMBER_OF_COMPUTE_NODES + current_trace.number_of_compute_nodes ), ' [b]PROCESSES[/b] [white]{}[/white]'.format( - job['job']['nprocs'] + 
report.metadata['job']['nprocs'] ), ' [b]HINTS[/b]: [white]{}[/white]'.format( - ' '.join(hints) + ' '.join(current_trace.hints) ) ]), title='[b][slate_blue3]DRISHTI[/slate_blue3] v.0.5[/b]', @@ -745,22 +1132,7 @@ def handler(): padding=1 ) ) - console.print() - - display_content(console) - display_thresholds(console) - display_footer(console, insights_start_time, insights_end_time) - - filename = '{}.html'.format(args.log_path) - export_html(console, filename) - - filename = '{}.svg'.format(args.log_path) - export_svg(console, filename) - - filename = '{}-summary.csv'.format( - args.log_path.replace('.darshan', '') - ) - - export_csv(filename, job['job']['jobid']) - + module.display_content(console) + module.display_thresholds(console) + module.display_footer(console, insights_start_time, insights_end_time) diff --git a/drishti/handlers/handle_recorder.py b/drishti/handlers/handle_recorder.py index 34c4790..1f30494 100644 --- a/drishti/handlers/handle_recorder.py +++ b/drishti/handlers/handle_recorder.py @@ -2,10 +2,11 @@ import os import time + import pandas as pd +from includes.module import * from recorder_utils import RecorderReader from recorder_utils.build_offset_intervals import build_offset_intervals -from drishti.includes.module import * def get_accessed_files(reader): @@ -577,23 +578,12 @@ def process_helper(file_map, df_intervals, df_posix_records, fid=None): display_thresholds(console) display_footer(console, insights_start_time, insights_end_time) + # Export to HTML, SVG, and CSV + trace_name = os.path.basename(os.path.dirname(args.log_path)) if args.split_files: - filename = '{}.{}.html'.format(args.log_path, fid) - else: - filename = '{}.html'.format(args.log_path) - - export_html(console, filename) - - if args.split_files: - filename = '{}.{}.svg'.format(args.log_path, fid) - else: - filename = '{}.svg'.format(args.log_path) - - export_svg(console, filename) - - if args.split_files: - filename = '{}.{}.summary.csv'.format(args.log_path, fid) - else: - filename = '{}-summary.csv'.format(args.log_path) - export_csv(filename) + trace_name = f"{trace_name}.{fid}" + out_dir = args.export_dir if args.export_dir != "" else os.getcwd() + export_html(console, out_dir, trace_name) + export_svg(console, out_dir, trace_name) + export_csv(out_dir, trace_name) diff --git a/drishti/includes/config.py b/drishti/includes/config.py index 15097fd..edad899 100644 --- a/drishti/includes/config.py +++ b/drishti/includes/config.py @@ -1,16 +1,13 @@ #!/usr/bin/env python3 -import os import json +import os +from drishti.includes.parser import * from rich.console import Console, Group from rich.padding import Padding from rich.panel import Panel -from rich.terminal_theme import TerminalTheme -from rich.terminal_theme import MONOKAI - -from drishti.includes.parser import * - +from rich.terminal_theme import MONOKAI, TerminalTheme RECOMMENDATIONS = 0 HIGH = 1 diff --git a/drishti/includes/module.py b/drishti/includes/module.py index dedaa09..3e7bd94 100644 --- a/drishti/includes/module.py +++ b/drishti/includes/module.py @@ -1,12 +1,13 @@ #!/usr/bin/env python3 -import datetime import csv +import datetime import time + import pandas as pd +from drishti.includes.config import * from rich import box from rich.syntax import Syntax -from drishti.includes.config import * ''' Before calling the functions below @@ -1823,76 +1824,90 @@ def display_footer(console, insights_start_time, insights_end_time): ) ) -def export_html(console, filename): - ''' - ''' - if args.export_html: - console.save_html( - 
filename, - theme=set_export_theme(), - clear=False - ) +def export_html(console, export_dir, trace_name): + if not args.export_html: + return + os.makedirs(export_dir, exist_ok=True) # Ensure export directory exists + filepath = os.path.join(export_dir, f"{trace_name}.html") -def export_svg(console, filename): - if args.export_svg: - console.save_svg( - filename, - title='Drishti', - theme=set_export_theme(), - clear=False - ) + console.save_html( + filepath, + theme=set_export_theme(), + clear=False + ) -def export_csv(filename, jobid=None): - if args.export_csv: - issues = [ - 'JOB', - INSIGHTS_STDIO_HIGH_USAGE, - INSIGHTS_POSIX_WRITE_COUNT_INTENSIVE, - INSIGHTS_POSIX_READ_COUNT_INTENSIVE, - INSIGHTS_POSIX_WRITE_SIZE_INTENSIVE, - INSIGHTS_POSIX_READ_SIZE_INTENSIVE, - INSIGHTS_POSIX_HIGH_SMALL_READ_REQUESTS_USAGE, - INSIGHTS_POSIX_HIGH_SMALL_WRITE_REQUESTS_USAGE, - INSIGHTS_POSIX_HIGH_MISALIGNED_MEMORY_USAGE, - INSIGHTS_POSIX_HIGH_MISALIGNED_FILE_USAGE, - INSIGHTS_POSIX_REDUNDANT_READ_USAGE, - INSIGHTS_POSIX_REDUNDANT_WRITE_USAGE, - INSIGHTS_POSIX_HIGH_RANDOM_READ_USAGE, - INSIGHTS_POSIX_HIGH_SEQUENTIAL_READ_USAGE, - INSIGHTS_POSIX_HIGH_RANDOM_WRITE_USAGE, - INSIGHTS_POSIX_HIGH_SEQUENTIAL_WRITE_USAGE, - INSIGHTS_POSIX_HIGH_SMALL_READ_REQUESTS_SHARED_FILE_USAGE, - INSIGHTS_POSIX_HIGH_SMALL_WRITE_REQUESTS_SHARED_FILE_USAGE, - INSIGHTS_POSIX_HIGH_METADATA_TIME, - INSIGHTS_POSIX_SIZE_IMBALANCE, - INSIGHTS_POSIX_TIME_IMBALANCE, - INSIGHTS_POSIX_INDIVIDUAL_WRITE_SIZE_IMBALANCE, - INSIGHTS_POSIX_INDIVIDUAL_READ_SIZE_IMBALANCE, - INSIGHTS_MPI_IO_NO_USAGE, - INSIGHTS_MPI_IO_NO_COLLECTIVE_READ_USAGE, - INSIGHTS_MPI_IO_NO_COLLECTIVE_WRITE_USAGE, - INSIGHTS_MPI_IO_COLLECTIVE_READ_USAGE, - INSIGHTS_MPI_IO_COLLECTIVE_WRITE_USAGE, - INSIGHTS_MPI_IO_BLOCKING_READ_USAGE, - INSIGHTS_MPI_IO_BLOCKING_WRITE_USAGE, - INSIGHTS_MPI_IO_AGGREGATORS_INTRA, - INSIGHTS_MPI_IO_AGGREGATORS_INTER, - INSIGHTS_MPI_IO_AGGREGATORS_OK - ] - if codes: - issues.extend(codes) +def export_svg(console, export_dir, trace_name): + if not args.export_svg: + return + + os.makedirs(export_dir, exist_ok=True) # Ensure export directory exists + filepath = os.path.join(export_dir, f"{trace_name}.svg") + + console.save_svg( + filepath, + title='Drishti', + theme=set_export_theme(), + clear=False + ) + - detected_issues = dict.fromkeys(issues, False) - detected_issues['JOB'] = jobid +def export_csv(export_dir, trace_name, jobid=None): + if not args.export_csv: + return + + issues = [ + 'JOB', + INSIGHTS_STDIO_HIGH_USAGE, + INSIGHTS_POSIX_WRITE_COUNT_INTENSIVE, + INSIGHTS_POSIX_READ_COUNT_INTENSIVE, + INSIGHTS_POSIX_WRITE_SIZE_INTENSIVE, + INSIGHTS_POSIX_READ_SIZE_INTENSIVE, + INSIGHTS_POSIX_HIGH_SMALL_READ_REQUESTS_USAGE, + INSIGHTS_POSIX_HIGH_SMALL_WRITE_REQUESTS_USAGE, + INSIGHTS_POSIX_HIGH_MISALIGNED_MEMORY_USAGE, + INSIGHTS_POSIX_HIGH_MISALIGNED_FILE_USAGE, + INSIGHTS_POSIX_REDUNDANT_READ_USAGE, + INSIGHTS_POSIX_REDUNDANT_WRITE_USAGE, + INSIGHTS_POSIX_HIGH_RANDOM_READ_USAGE, + INSIGHTS_POSIX_HIGH_SEQUENTIAL_READ_USAGE, + INSIGHTS_POSIX_HIGH_RANDOM_WRITE_USAGE, + INSIGHTS_POSIX_HIGH_SEQUENTIAL_WRITE_USAGE, + INSIGHTS_POSIX_HIGH_SMALL_READ_REQUESTS_SHARED_FILE_USAGE, + INSIGHTS_POSIX_HIGH_SMALL_WRITE_REQUESTS_SHARED_FILE_USAGE, + INSIGHTS_POSIX_HIGH_METADATA_TIME, + INSIGHTS_POSIX_SIZE_IMBALANCE, + INSIGHTS_POSIX_TIME_IMBALANCE, + INSIGHTS_POSIX_INDIVIDUAL_WRITE_SIZE_IMBALANCE, + INSIGHTS_POSIX_INDIVIDUAL_READ_SIZE_IMBALANCE, + INSIGHTS_MPI_IO_NO_USAGE, + INSIGHTS_MPI_IO_NO_COLLECTIVE_READ_USAGE, + 
INSIGHTS_MPI_IO_NO_COLLECTIVE_WRITE_USAGE, + INSIGHTS_MPI_IO_COLLECTIVE_READ_USAGE, + INSIGHTS_MPI_IO_COLLECTIVE_WRITE_USAGE, + INSIGHTS_MPI_IO_BLOCKING_READ_USAGE, + INSIGHTS_MPI_IO_BLOCKING_WRITE_USAGE, + INSIGHTS_MPI_IO_AGGREGATORS_INTRA, + INSIGHTS_MPI_IO_AGGREGATORS_INTER, + INSIGHTS_MPI_IO_AGGREGATORS_OK + ] + if codes: + issues.extend(codes) + + detected_issues = dict.fromkeys(issues, False) + detected_issues['JOB'] = jobid + + for report in csv_report: + detected_issues[report] = True - for report in csv_report: - detected_issues[report] = True + + os.makedirs(export_dir, exist_ok=True) # Ensure export directory exists + filepath = os.path.join(export_dir, f"{trace_name}.csv") - with open(filename, 'w') as f: - w = csv.writer(f) - w.writerow(detected_issues.keys()) - w.writerow(detected_issues.values()) + with open(filepath, 'w') as f: + w = csv.writer(f) + w.writerow(detected_issues.keys()) + w.writerow(detected_issues.values()) diff --git a/drishti/includes/parser.py b/drishti/includes/parser.py index 8659520..842874e 100644 --- a/drishti/includes/parser.py +++ b/drishti/includes/parser.py @@ -5,7 +5,8 @@ ) parser.add_argument( - 'log_path', + 'log_paths', + nargs='+', help='Input .darshan file or recorder folder' ) @@ -96,6 +97,13 @@ help='Export a CSV with the code of all issues that were triggered' ) +parser.add_argument( + '--export_dir', + default="", + dest='export_dir', + help='Specify the directory prefix for the output files (if any)' +) + parser.add_argument( '--json', default=False, diff --git a/drishti/reporter.py b/drishti/reporter.py index 8455040..c07be9a 100644 --- a/drishti/reporter.py +++ b/drishti/reporter.py @@ -3,8 +3,10 @@ import os import sys from subprocess import call -from drishti.includes.parser import * +from typing import List, Optional +# from includes.parser import * # imports {'parser', 'args', 'argparse'} # TODO: Is next line enuf +from drishti.includes.parser import args ''' |- handler_darshan -| @@ -17,7 +19,6 @@ |-----> /includes -> module -> config -> parser ''' - LOG_TYPE_DARSHAN = 0 LOG_TYPE_RECORDER = 1 @@ -29,27 +30,54 @@ def clear(): _ = call('clear' if os.name == 'posix' else 'cls') -def check_log_type(path): - if path.endswith('.darshan'): - if not os.path.isfile(path): - print('Unable to open .darshan file.') +def check_log_type(paths: List[str]) -> Optional[int]: + is_darshan = True + is_recorder = True + multiple_logs = len(paths) > 1 + + for path in paths: + if path.endswith('.darshan'): + if not os.path.isfile(path): + print('Unable to open .darshan file.') + sys.exit(os.EX_NOINPUT) + else: + is_darshan = True and is_darshan + is_recorder = False and is_recorder + else: # check whether is a valid recorder log + if not os.path.isdir(path): + print('Unable to open recorder folder.') + sys.exit(os.EX_NOINPUT) + else: + is_recorder = True and is_recorder + is_darshan = False and is_darshan + + if multiple_logs: + if is_darshan: + return LOG_TYPE_DARSHAN + else: + print('Only .darshan files are supported for multiple logs.') #TODO sys.exit(os.EX_NOINPUT) - else: return LOG_TYPE_DARSHAN - else: # check whether is a valid recorder log - if not os.path.isdir(path): - print('Unable to open recorder folder.') + else: + if is_darshan and not is_recorder: + return LOG_TYPE_DARSHAN + elif is_recorder and not is_darshan: + return LOG_TYPE_RECORDER + else: + print('Unable to reliably determine the log type.') sys.exit(os.EX_NOINPUT) - else: return LOG_TYPE_RECORDER def main(): - log_type = check_log_type(args.log_path) - + log_type = 
check_log_type(args.log_paths) + if log_type == LOG_TYPE_DARSHAN: from drishti.handlers.handle_darshan import handler elif log_type == LOG_TYPE_RECORDER: from drishti.handlers.handle_recorder import handler - + handler() + +if __name__ == '__main__': + main() diff --git a/requirements.txt b/requirements.txt index 65461cb..8700c81 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ argparse -darshan==3.4.4.0 +darshan>=3.4.3.0 pandas rich==12.5.1 recorder-utils diff --git a/setup.py b/setup.py index 3e75113..a93a8ce 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -from setuptools import setup, find_packages +from setuptools import find_packages, setup with open("README.md", "r") as f: long_description = f.read() @@ -19,7 +19,7 @@ install_requires=[ 'argparse', 'pandas', - 'darshan==3.4.4.0', + 'darshan>=3.4.4.0', 'rich==12.5.1', 'recorder-utils', ],
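Usage sketch (illustrative only; file names and output directory are hypothetical): with the parser now accepting one or more .darshan paths (nargs='+') and the new --export_dir option, and with reporter.py gaining a __main__ guard, a multi-trace analysis can be invoked roughly as

    python -m drishti.reporter --export_dir ./reports run1.darshan run2.darshan

HTML, SVG, and CSV exports (when their respective flags are enabled) are then written into ./reports, or into the current working directory when --export_dir is omitted.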