diff --git a/.github/workflows/drishti-darshan-3.4.0.yml b/.github/workflows/drishti-darshan-3.4.0.yml
index 6535241..3665a98 100644
--- a/.github/workflows/drishti-darshan-3.4.0.yml
+++ b/.github/workflows/drishti-darshan-3.4.0.yml
@@ -12,7 +12,7 @@ jobs:
timeout-minutes: 60
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v4
with:
submodules: true
@@ -93,7 +93,7 @@ jobs:
- name: Upload Artifact
if: always()
- uses: actions/upload-artifact@v2
+ uses: actions/upload-artifact@v4
with:
name: tests
path: sample/**
diff --git a/.github/workflows/drishti-darshan-3.4.1.yml b/.github/workflows/drishti-darshan-3.4.1.yml
index 29ed011..155f6e7 100644
--- a/.github/workflows/drishti-darshan-3.4.1.yml
+++ b/.github/workflows/drishti-darshan-3.4.1.yml
@@ -12,7 +12,7 @@ jobs:
timeout-minutes: 60
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v4
with:
submodules: true
@@ -93,7 +93,7 @@ jobs:
- name: Upload Artifact
if: always()
- uses: actions/upload-artifact@v2
+ uses: actions/upload-artifact@v4
with:
name: tests
path: sample/**
diff --git a/.github/workflows/drishti-darshan-3.4.2.yml b/.github/workflows/drishti-darshan-3.4.2.yml
index 0398f37..686aaa2 100644
--- a/.github/workflows/drishti-darshan-3.4.2.yml
+++ b/.github/workflows/drishti-darshan-3.4.2.yml
@@ -12,7 +12,7 @@ jobs:
timeout-minutes: 60
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v4
with:
submodules: true
@@ -93,7 +93,7 @@ jobs:
- name: Upload Artifact
if: always()
- uses: actions/upload-artifact@v2
+ uses: actions/upload-artifact@v4
with:
name: tests
path: sample/**
diff --git a/.gitignore b/.gitignore
index 2ed7bdc..29bd232 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,310 @@
+tensorflow_unet3d_darshan_per_rank_workload/
+
+# Created by https://www.toptal.com/developers/gitignore/api/python,visualstudiocode,pycharm
+# Edit at https://www.toptal.com/developers/gitignore?templates=python,visualstudiocode,pycharm
+
+### PyCharm ###
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
+# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
+
+# User-specific stuff
+.idea/**/workspace.xml
+.idea/**/tasks.xml
+.idea/**/usage.statistics.xml
+.idea/**/dictionaries
+.idea/**/shelf
+
+# AWS User-specific
+.idea/**/aws.xml
+
+# Generated files
+.idea/**/contentModel.xml
+
+# Sensitive or high-churn files
+.idea/**/dataSources/
+.idea/**/dataSources.ids
+.idea/**/dataSources.local.xml
+.idea/**/sqlDataSources.xml
+.idea/**/dynamic.xml
+.idea/**/uiDesigner.xml
+.idea/**/dbnavigator.xml
+
+# Gradle
+.idea/**/gradle.xml
+.idea/**/libraries
+
+# Gradle and Maven with auto-import
+# When using Gradle or Maven with auto-import, you should exclude module files,
+# since they will be recreated, and may cause churn. Uncomment if using
+# auto-import.
+# .idea/artifacts
+# .idea/compiler.xml
+# .idea/jarRepositories.xml
+# .idea/modules.xml
+# .idea/*.iml
+# .idea/modules
+# *.iml
+# *.ipr
+
+# CMake
+cmake-build-*/
+
+# Mongo Explorer plugin
+.idea/**/mongoSettings.xml
+
+# File-based project format
+*.iws
+
+# IntelliJ
+out/
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+# Cursive Clojure plugin
+.idea/replstate.xml
+
+# SonarLint plugin
+.idea/sonarlint/
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+fabric.properties
+
+# Editor-based Rest Client
+.idea/httpRequests
+
+# Android studio 3.1+ serialized cache file
+.idea/caches/build_file_checksums.ser
+
+### PyCharm Patch ###
+# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721
+
+# *.iml
+# modules.xml
+# .idea/misc.xml
+# *.ipr
+
+# Sonarlint plugin
+# https://plugins.jetbrains.com/plugin/7973-sonarlint
+.idea/**/sonarlint/
+
+# SonarQube Plugin
+# https://plugins.jetbrains.com/plugin/7238-sonarqube-community-plugin
+.idea/**/sonarIssues.xml
+
+# Markdown Navigator plugin
+# https://plugins.jetbrains.com/plugin/7896-markdown-navigator-enhanced
+.idea/**/markdown-navigator.xml
+.idea/**/markdown-navigator-enh.xml
+.idea/**/markdown-navigator/
+
+# Cache file creation bug
+# See https://youtrack.jetbrains.com/issue/JBR-2257
+.idea/$CACHE_FILE$
+
+# CodeStream plugin
+# https://plugins.jetbrains.com/plugin/12206-codestream
+.idea/codestream.xml
+
+# Azure Toolkit for IntelliJ plugin
+# https://plugins.jetbrains.com/plugin/8053-azure-toolkit-for-intellij
+.idea/**/azureSettings.xml
+
+### Python ###
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
build/
+develop-eggs/
dist/
-*.egg-info/
\ No newline at end of file
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+# For a library or package, you might want to ignore these files since the code is
+# intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# poetry
+# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
+# This is especially recommended for binary packages to ensure reproducibility, and is more
+# commonly ignored for libraries.
+# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
+#poetry.lock
+
+# pdm
+# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
+#pdm.lock
+# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
+# in version control.
+# https://pdm.fming.dev/#use-with-ide
+.pdm.toml
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+# PyCharm
+# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
+# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
+# and can be added to the global gitignore or merged into this file. For a more nuclear
+# option (not recommended) you can uncomment the following to ignore the entire idea folder.
+#.idea/
+
+### Python Patch ###
+# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
+poetry.toml
+
+# ruff
+.ruff_cache/
+
+# LSP config files
+pyrightconfig.json
+
+### VisualStudioCode ###
+.vscode/*
+!.vscode/settings.json
+!.vscode/tasks.json
+!.vscode/launch.json
+!.vscode/extensions.json
+!.vscode/*.code-snippets
+
+# Local History for Visual Studio Code
+.history/
+
+# Built Visual Studio Code Extensions
+*.vsix
+
+### VisualStudioCode Patch ###
+# Ignore all local history of files
+.history
+.ionide
+
+# End of https://www.toptal.com/developers/gitignore/api/python,visualstudiocode,pycharm
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..13566b8
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/feat-multiple-darshan-files.iml b/.idea/feat-multiple-darshan-files.iml
new file mode 100644
index 0000000..a319e2b
--- /dev/null
+++ b/.idea/feat-multiple-darshan-files.iml
@@ -0,0 +1,25 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..105ce2d
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..da694c4
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..2d7ac4e
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/ruff.xml b/.idea/ruff.xml
new file mode 100644
index 0000000..916a850
--- /dev/null
+++ b/.idea/ruff.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..35eb1dd
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/drishti/handlers/handle_darshan.py b/drishti/handlers/handle_darshan.py
index d47fbea..c1eb2b2 100644
--- a/drishti/handlers/handle_darshan.py
+++ b/drishti/handlers/handle_darshan.py
@@ -1,18 +1,33 @@
#!/usr/bin/env python3
-
+import abc
+import csv
+import dataclasses
+import datetime
import io
-import sys
-import time
+import os
import shlex
import shutil
import subprocess
-import pandas as pd
+import sys
+import time
+import typing
+from dataclasses import dataclass, field
+from typing import Dict, List, Optional
+
import darshan
import darshan.backend.cffi_backend as darshanll
-
-from rich import print
+import pandas as pd
from packaging import version
-from drishti.includes.module import *
+from rich import print
+from rich.padding import Padding
+
+import drishti.includes.config as config
+import drishti.includes.module as module
+from drishti.includes.module import HIGH, RECOMMENDATIONS, WARN, Panel, insights_total
+
+# from includes.module import *
+from drishti.includes.parser import args
def is_available(name):
@@ -70,648 +85,1019 @@ def check_log_version(console, file, log_version, library_version):
return use_file
-def handler():
- console = init_console()
-
- insights_start_time = time.time()
-
- log = darshanll.log_open(args.log_path)
-
- modules = darshanll.log_get_modules(log)
-
- information = darshanll.log_get_job(log)
-
- if 'log_ver' in information:
- log_version = information['log_ver']
- else:
- log_version = information['metadata']['lib_ver']
- library_version = darshanll.get_lib_version()
-
- # Make sure log format is of the same version
- filename = args.log_path
- # check_log_version(console, args.log_path, log_version, library_version)
-
- darshanll.log_close(log)
-
- darshan.enable_experimental()
-
- report = darshan.DarshanReport(filename)
-
- job = report.metadata
-
- #########################################################################################################################################################################
-
- # Check usage of STDIO, POSIX, and MPI-IO per file
-
- if 'STDIO' in report.records:
- df_stdio = report.records['STDIO'].to_df()
-
- if df_stdio:
- total_write_size_stdio = df_stdio['counters']['STDIO_BYTES_WRITTEN'].sum()
- total_read_size_stdio = df_stdio['counters']['STDIO_BYTES_READ'].sum()
-
- total_size_stdio = total_write_size_stdio + total_read_size_stdio
- else:
- total_size_stdio = 0
- else:
- df_stdio = None
-
- total_size_stdio = 0
-
- if 'POSIX' in report.records:
- df_posix = report.records['POSIX'].to_df()
+@dataclass
+class TimestampPair:
+ start: datetime.date
+ end: datetime.date
+
+@dataclass
+class AbstractDarshanTrace(abc.ABC):
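+    """Accumulated Darshan counters and derived metrics shared by concrete trace classes.
+
+    Rough usage sketch (illustrative only -- assumes a concrete subclass, e.g. ``DarshanTrace``,
+    that loads the report and populates ``stdio_df``/``posix_df``/``mpiio_df`` before analysis)::
+
+        trace = DarshanTrace(jobid, log_ver, time, exe, modules)
+        trace.calculate_insights()
+        trace.files_stuff()
+        trace.generate_insights()
+    """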
+ # Trace metadata
+ jobid: str
+ log_ver: str
+ time: TimestampPair
+ exe: str
+
+ # Report
+ modules: typing.Iterable[str]
+ name_records: Dict[str, str] = field(default_factory=dict)
+
+    max_read_offset: float = float('-inf')
+    max_write_offset: float = float('-inf')
+ ###
+
+ total_files_stdio: int = 0
+ total_files_posix: int = 0
+ total_files_mpiio: int = 0
+
+    files: Dict[str, Dict[str, int]] = field(default_factory=dict)
+
+ total_reads: int = 0
+ total_writes: int = 0
+ total_operations: int = 0
+ total_read_size: int = 0
+ total_written_size: int = 0
+ total_posix_size: int = 0
+ total_reads_small: int = 0
+ total_writes_small: int = 0
+
+ ###
+ total_write_size_stdio: int = 0
+    total_read_size_stdio: int = 0
+ total_size_stdio: int = 0
+
+ total_write_size_posix: int = 0
+ total_read_size_posix: int = 0
+ total_size_posix: int = 0
+
+ total_write_size_mpiio: int = 0
+ total_read_size_mpiio: int = 0
+ total_size_mpiio: int = 0
+
+ total_size: int = 0
+ total_files: int = 0
+ ###
+
+ total_mem_not_aligned: int = 0
+ total_file_not_aligned: int = 0
+
+ read_consecutive: int = 0
+ read_sequential: int = 0
+ read_random: int = 0
+ write_consecutive: int = 0
+ write_sequential: int = 0
+ write_random: int = 0
+
+ total_shared_reads: int = 0
+ total_shared_reads_small: int = 0
+ total_shared_writes: int = 0
+ total_shared_writes_small: int = 0
+
+ count_long_metadata: int = 0
+
+ posix_shared_data_imbalance_stragglers_count: int = 0
+
+ # 2 functions (unsure ones)
+
+ has_hdf5_extension: bool = False # TODO: OR this
+
+ mpiio_nb_reads: int = 0
+ mpiio_nb_writes: int = 0
+
+ # TODO: Should be a list of CB nodes for agg
+ cb_nodes: Optional[int] = None
+ number_of_compute_nodes: int = 0
+ hints: List[str] = dataclasses.field(default_factory=list)
+
+ job_start: Optional[datetime.datetime] = None
+ job_end: Optional[datetime.datetime] = None
+
+ aggregated: pd.DataFrame = None
+
+ ## EXTRA from module being split
+ mpiio_coll_reads: int = 0
+ mpiio_indep_reads: int = 0
+ total_mpiio_read_operations: int = 0
+ detected_files_mpi_coll_reads: pd.DataFrame = None
+ mpiio_coll_writes: int = 0
+ mpiio_indep_writes: int = 0
+ total_mpiio_write_operations: int = 0
+ detected_files_mpiio_coll_writes: pd.DataFrame = None
+ imbalance_count_posix_shared_time: int = 0
+ posix_shared_time_imbalance_detected_files1: pd.DataFrame = None
+ posix_shared_time_imbalance_detected_files2: pd.DataFrame = None
+ posix_shared_time_imbalance_detected_files3: pd.DataFrame = None
+ posix_total_size: int = 0
+ posix_total_read_size: int = 0
+ posix_total_written_size: int = 0
+
+ def generate_dxt_posix_rw_df(self) -> None:
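+        """Flatten per-rank DXT_POSIX read/write segments into two flat DataFrames
+        (``dxt_posix_read_data`` / ``dxt_posix_write_data``) with one row per segment and columns
+        id, rank, length, end_time, start_time, operation, offsets. A no-op unless ``args.backtrace``
+        is set and the trace provides ``address_line_mapping``."""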
+ if not args.backtrace:
+ return
+ if not self.dxt_posix:
+ return
+ if "address_line_mapping" not in self.dxt_posix:
+ args.backtrace = False
+ return
+
+ read_id = []
+ read_rank = []
+ read_length = []
+ read_offsets = []
+ read_end_time = []
+ read_start_time = []
+ read_operation = []
+
+ write_id = []
+ write_rank = []
+ write_length = []
+ write_offsets = []
+ write_end_time = []
+ write_start_time = []
+ write_operation = []
+
+ for r in zip(self.dxt_posix['rank'], self.dxt_posix['read_segments'], self.dxt_posix['write_segments'],
+ self.dxt_posix['id']):
+ if not r[1].empty:
+ read_id.append([r[3]] * len((r[1]["length"].to_list())))
+ read_rank.append([r[0]] * len((r[1]["length"].to_list())))
+ read_length.append(r[1]["length"].to_list())
+ read_end_time.append(r[1]["end_time"].to_list())
+ read_start_time.append(r[1]["start_time"].to_list())
+ read_operation.append(["read"] * len((r[1]["length"].to_list())))
+ read_offsets.append(r[1]["offset"].to_list())
+
+ if not r[2].empty:
+ write_id.append([r[3]] * len((r[2]['length'].to_list())))
+ write_rank.append([r[0]] * len((r[2]['length'].to_list())))
+ write_length.append(r[2]['length'].to_list())
+ write_end_time.append(r[2]['end_time'].to_list())
+ write_start_time.append(r[2]['start_time'].to_list())
+ write_operation.append(['write'] * len((r[2]['length'].to_list())))
+ write_offsets.append(r[2]['offset'].to_list())
+
+ read_id = [element for nestedlist in read_id for element in nestedlist]
+ read_rank = [element for nestedlist in read_rank for element in nestedlist]
+ read_length = [element for nestedlist in read_length for element in nestedlist]
+ read_offsets = [element for nestedlist in read_offsets for element in nestedlist]
+ read_end_time = [element for nestedlist in read_end_time for element in nestedlist]
+ read_operation = [element for nestedlist in read_operation for element in nestedlist]
+ read_start_time = [element for nestedlist in read_start_time for element in nestedlist]
+
+ write_id = [element for nestedlist in write_id for element in nestedlist]
+ write_rank = [element for nestedlist in write_rank for element in nestedlist]
+ write_length = [element for nestedlist in write_length for element in nestedlist]
+ write_offsets = [element for nestedlist in write_offsets for element in nestedlist]
+ write_end_time = [element for nestedlist in write_end_time for element in nestedlist]
+ write_operation = [element for nestedlist in write_operation for element in nestedlist]
+ write_start_time = [element for nestedlist in write_start_time for element in nestedlist]
+
+ self.dxt_posix_read_data = pd.DataFrame(
+ {
+ "id": read_id,
+ "rank": read_rank,
+ "length": read_length,
+ "end_time": read_end_time,
+ "start_time": read_start_time,
+ "operation": read_operation,
+ "offsets": read_offsets,
+ }
+ )
- if df_posix:
- total_write_size_posix = df_posix['counters']['POSIX_BYTES_WRITTEN'].sum()
- total_read_size_posix = df_posix['counters']['POSIX_BYTES_READ'].sum()
+ self.dxt_posix_write_data = pd.DataFrame(
+ {
+ "id": write_id,
+ "rank": write_rank,
+ "length": write_length,
+ "end_time": write_end_time,
+ "start_time": write_start_time,
+ "operation": write_operation,
+ "offsets": write_offsets,
+ }
+ )
- total_size_posix = total_write_size_posix + total_read_size_posix
- else:
- total_size_posix = 0
- else:
- df_posix = None
+ def calculate_insights(self) -> None:
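+        """Compute total transferred bytes per interface (STDIO, POSIX, MPI-IO). POSIX counters also
+        capture traffic issued through MPI-IO, so the MPI-IO volume is subtracted from POSIX before
+        totaling; e.g. 8 GB seen at the POSIX level with 6 GB at MPI-IO leaves 2 GB of pure POSIX."""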
+ self.total_write_size_stdio = self.stdio_df['counters']['STDIO_BYTES_WRITTEN'].sum() if self.stdio_df else 0
+ self.total_read_size_stdio = self.stdio_df['counters']['STDIO_BYTES_READ'].sum() if self.stdio_df else 0
+ self.total_size_stdio = self.total_write_size_stdio + self.total_read_size_stdio
- total_size_posix = 0
+ self.total_write_size_posix = self.posix_df['counters']['POSIX_BYTES_WRITTEN'].sum() if self.posix_df else 0
+ self.total_read_size_posix = self.posix_df['counters']['POSIX_BYTES_READ'].sum() if self.posix_df else 0
+ self.total_size_posix = self.total_write_size_posix + self.total_read_size_posix
- if 'MPI-IO' in report.records:
- df_mpiio = report.records['MPI-IO'].to_df()
+ self.total_write_size_mpiio = self.mpiio_df['counters']['MPIIO_BYTES_WRITTEN'].sum() if self.mpiio_df else 0
+ self.total_read_size_mpiio = self.mpiio_df['counters']['MPIIO_BYTES_READ'].sum() if self.mpiio_df else 0
+ self.total_size_mpiio = self.total_write_size_mpiio + self.total_read_size_mpiio
- if df_mpiio:
- total_write_size_mpiio = df_mpiio['counters']['MPIIO_BYTES_WRITTEN'].sum()
- total_read_size_mpiio = df_mpiio['counters']['MPIIO_BYTES_READ'].sum()
+ # POSIX will capture POSIX-only and MPI-IO
+ if self.total_size_posix > 0 and self.total_size_posix >= self.total_size_mpiio:
+ self.total_size_posix -= self.total_size_mpiio
- total_size_mpiio = total_write_size_mpiio + total_read_size_mpiio
- else:
- total_size_mpiio = 0
- else:
- df_mpiio = None
-
- total_size_mpiio = 0
-
- dxt_posix = None
- dxt_posix_read_data = None
- dxt_posix_write_data = None
- dxt_mpiio = None
-
- df_lustre = None
- if "LUSTRE" in report.records:
- df_lustre = report.records['LUSTRE'].to_df()
-
- if args.backtrace:
- if "DXT_POSIX" in report.records:
- dxt_posix = report.records["DXT_POSIX"].to_df()
- dxt_posix = pd.DataFrame(dxt_posix)
- if "address_line_mapping" not in dxt_posix:
- args.backtrace = False
- else:
- read_id = []
- read_rank = []
- read_length = []
- read_offsets = []
- read_end_time = []
- read_start_time = []
- read_operation = []
-
- write_id = []
- write_rank = []
- write_length = []
- write_offsets = []
- write_end_time = []
- write_start_time = []
- write_operation = []
-
- for r in zip(dxt_posix['rank'], dxt_posix['read_segments'], dxt_posix['write_segments'], dxt_posix['id']):
- if not r[1].empty:
- read_id.append([r[3]] * len((r[1]['length'].to_list())))
- read_rank.append([r[0]] * len((r[1]['length'].to_list())))
- read_length.append(r[1]['length'].to_list())
- read_end_time.append(r[1]['end_time'].to_list())
- read_start_time.append(r[1]['start_time'].to_list())
- read_operation.append(['read'] * len((r[1]['length'].to_list())))
- read_offsets.append(r[1]['offset'].to_list())
-
- if not r[2].empty:
- write_id.append([r[3]] * len((r[2]['length'].to_list())))
- write_rank.append([r[0]] * len((r[2]['length'].to_list())))
- write_length.append(r[2]['length'].to_list())
- write_end_time.append(r[2]['end_time'].to_list())
- write_start_time.append(r[2]['start_time'].to_list())
- write_operation.append(['write'] * len((r[2]['length'].to_list())))
- write_offsets.append(r[2]['offset'].to_list())
-
- read_id = [element for nestedlist in read_id for element in nestedlist]
- read_rank = [element for nestedlist in read_rank for element in nestedlist]
- read_length = [element for nestedlist in read_length for element in nestedlist]
- read_offsets = [element for nestedlist in read_offsets for element in nestedlist]
- read_end_time = [element for nestedlist in read_end_time for element in nestedlist]
- read_operation = [element for nestedlist in read_operation for element in nestedlist]
- read_start_time = [element for nestedlist in read_start_time for element in nestedlist]
-
- write_id = [element for nestedlist in write_id for element in nestedlist]
- write_rank = [element for nestedlist in write_rank for element in nestedlist]
- write_length = [element for nestedlist in write_length for element in nestedlist]
- write_offsets = [element for nestedlist in write_offsets for element in nestedlist]
- write_end_time = [element for nestedlist in write_end_time for element in nestedlist]
- write_operation = [element for nestedlist in write_operation for element in nestedlist]
- write_start_time = [element for nestedlist in write_start_time for element in nestedlist]
-
- dxt_posix_read_data = pd.DataFrame(
- {
- 'id': read_id,
- 'rank': read_rank,
- 'length': read_length,
- 'end_time': read_end_time,
- 'start_time': read_start_time,
- 'operation': read_operation,
- 'offsets': read_offsets,
- })
-
- dxt_posix_write_data = pd.DataFrame(
- {
- 'id': write_id,
- 'rank': write_rank,
- 'length': write_length,
- 'end_time': write_end_time,
- 'start_time': write_start_time,
- 'operation': write_operation,
- 'offsets': write_offsets,
- })
-
- if "DXT_MPIIO" in report.records:
- dxt_mpiio = report.records["DXT_MPIIO"].to_df()
- dxt_mpiio = pd.DataFrame(dxt_mpiio)
-
-
- # Since POSIX will capture both POSIX-only accesses and those comming from MPI-IO, we can subtract those
- if total_size_posix > 0 and total_size_posix >= total_size_mpiio:
- total_size_posix -= total_size_mpiio
-
- total_size = total_size_stdio + total_size_posix + total_size_mpiio
-
- assert(total_size_stdio >= 0)
- assert(total_size_posix >= 0)
- assert(total_size_mpiio >= 0)
-
- files = {}
-
- # Check interface usage for each file
- file_map = report.name_records
-
- total_files = len(file_map)
-
- total_files_stdio = 0
- total_files_posix = 0
- total_files_mpiio = 0
-
- for id, path in file_map.items():
- if df_stdio:
- uses_stdio = len(df_stdio['counters'][(df_stdio['counters']['id'] == id)]) > 0
- else:
- uses_stdio = 0
-
- if df_posix:
- uses_posix = len(df_posix['counters'][(df_posix['counters']['id'] == id)]) > 0
- else:
- uses_posix = 0
+ self.total_posix_size = self.total_size_stdio + self.total_size_posix + self.total_size_mpiio
- if df_mpiio:
- uses_mpiio = len(df_mpiio['counters'][(df_mpiio['counters']['id'] == id)]) > 0
- else:
- uses_mpiio = 0
+ assert (self.total_size_stdio >= 0)
+ assert (self.total_size_posix >= 0)
+ assert (self.total_size_mpiio >= 0)
- total_files_stdio += uses_stdio
- total_files_posix += uses_posix
- total_files_mpiio += uses_mpiio
+ def files_stuff(self) -> None:
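+        """For every file record, note which interfaces (STDIO, POSIX, MPI-IO) touched it and tally per-interface file counts."""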
+        self.name_records = self.report.name_records
- files[id] = {
- 'path': path,
- 'stdio': uses_stdio,
- 'posix': uses_posix,
- 'mpiio': uses_mpiio
- }
+ self.total_files = len(self.report.name_records)
- check_stdio(total_size, total_size_stdio)
- check_mpiio(modules)
+ # files = dict()
- #########################################################################################################################################################################
+ for id, path in self.report.name_records.items():
+ uses_stdio = len(
+ self.stdio_df['counters'][self.stdio_df['counters']['id'] == id]) > 0 if self.stdio_df else 0
+ uses_posix = len(
+ self.posix_df['counters'][self.posix_df['counters']['id'] == id]) > 0 if self.posix_df else 0
+ uses_mpiio = len(
+ self.mpiio_df['counters'][self.mpiio_df['counters']['id'] == id]) > 0 if self.mpiio_df else 0
- if 'POSIX' in report.records:
- df = report.records['POSIX'].to_df()
+ self.total_files_stdio += uses_stdio
+ self.total_files_posix += uses_posix
+ self.total_files_mpiio += uses_mpiio
- #########################################################################################################################################################################
+ self.files[id] = {
+ 'path': path,
+ 'stdio': uses_stdio,
+ 'posix': uses_posix,
+ 'mpiio': uses_mpiio
+ }
- # Get number of write/read operations
- total_reads = df['counters']['POSIX_READS'].sum()
- total_writes = df['counters']['POSIX_WRITES'].sum()
- # Get total number of I/O operations
- total_operations = total_writes + total_reads
+ def generate_insights(self):
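+        """Run the per-trace insight checks; most module.check_* calls are still commented out while the refactor is in progress."""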
+ # TODO: Check if module exists. Replicate from each function which calculates insights.
+ self._check_stdio()
+ self._check_mpiio()
+ self._do_something()
+ self._small_operation_insight()
- # To check whether the application is write-intersive or read-intensive we only look at the POSIX level and check if the difference between reads and writes is larger than 10% (for more or less), otherwise we assume a balance
- check_operation_intensive(total_operations, total_reads, total_writes)
- total_read_size = df['counters']['POSIX_BYTES_READ'].sum()
- total_written_size = df['counters']['POSIX_BYTES_WRITTEN'].sum()
- total_size = total_written_size + total_read_size
+ def _check_stdio(self) -> None:
+ module.check_stdio(self.total_posix_size, self.total_size_stdio)
- check_size_intensive(total_size, total_read_size, total_written_size)
+ def _check_mpiio(self) -> None:
+ module.check_mpiio(self.modules)
- #########################################################################################################################################################################
+ def _do_something(self):
+ module.check_operation_intensive(self.total_operations, self.total_reads, self.total_writes)
+ module.check_size_intensive(self.posix_total_size, self.posix_total_read_size, self.posix_total_written_size)
- # Get the number of small I/O operations (less than 1 MB)
- total_reads_small = (
- df['counters']['POSIX_SIZE_READ_0_100'].sum() +
- df['counters']['POSIX_SIZE_READ_100_1K'].sum() +
- df['counters']['POSIX_SIZE_READ_1K_10K'].sum() +
- df['counters']['POSIX_SIZE_READ_10K_100K'].sum() +
- df['counters']['POSIX_SIZE_READ_100K_1M'].sum()
+ # TODO: for trace in traces
+ for trace in self.traces:
+ pass
+ # module.check_misaligned(self.total_operations, self.total_mem_not_aligned, self.total_file_not_aligned,
+ # self.modules, self.name_records, self.lustre_df, self.dxt_posix,
+ # self.dxt_posix_read_data) # posix alignment
+
+ # module.check_traffic(self.max_read_offset, self.total_read_size, self.max_write_offset, self.total_written_size,
+ # self.dxt_posix, self.dxt_posix_read_data, self.dxt_posix_write_data) # redundant reads
+
+ # module.check_random_operation(self.read_consecutive, self.read_sequential, self.read_random, self.total_reads,
+ # self.write_consecutive, self.write_sequential, self.write_random,
+ # self.total_writes, self.dxt_posix,
+ # self.dxt_posix_read_data, self.dxt_posix_write_data) # random check
+
+ # module.check_shared_small_operation(self.total_shared_reads, self.total_shared_reads_small,
+ # self.total_shared_writes,
+ # self.total_shared_writes_small, self.shared_files, self.report.name_records)
+
+ module.check_long_metadata(self.count_long_metadata, self.modules)
+
+ # module.check_shared_data_imblance(self.posix_shared_data_imbalance_stragglers_count,
+ # self.posix_data_straggler_files,
+ # self.report.name_records, self.dxt_posix,
+ # self.dxt_posix_read_data,
+ # self.dxt_posix_write_data)
+
+ # module.check_shared_time_imbalance(self.posix_stragglers_shared_file_time_imbalance_count,
+ # self.posix_shared_time_imbalance_detected_files1, self.report.name_records)
+
+ # module.check_individual_write_imbalance(self.posix_data_imbalance_count,
+ # self.posix_shared_time_imbalance_detected_files2,
+ # self.report.name_records, self.dxt_posix, self.dxt_posix_write_data)
+
+ # module.check_mpi_collective_read_operation(self.mpiio_coll_reads, self.mpiio_indep_reads,
+ # self.total_mpiio_read_operations,
+ # self.detected_files_mpi_coll_reads, self.report.name_records,
+ # self.dxt_mpiio)
+
+ # module.check_mpi_collective_write_operation(self.mpiio_coll_writes, self.mpiio_indep_writes,
+ # self.total_mpiio_write_operations,
+ # self.detected_files_mpiio_coll_writes, self.report.name_records, self.dxt_mpiio)
+ #
+ # module.check_individual_read_imbalance(self.imbalance_count_posix_shared_time,
+ # self.posix_shared_time_imbalance_detected_files3,
+ # self.report.name_records, self.dxt_posix, self.dxt_posix_read_data)
+
+ module.check_mpi_none_block_operation(self.mpiio_nb_reads, self.mpiio_nb_writes, self.has_hdf5_extension,
+ self.modules)
+
+
+
+ def _small_operation_insight(self):
+ pass
+ # module.check_small_operation(self.total_reads, self.total_reads_small, self.total_writes,
+ # self.total_writes_small,
+ # self.small_operation_detected_files,
+ # self.modules, self.report.name_records, self.dxt_posix, self.dxt_posix_read_data,
+ # self.dxt_posix_write_data)
+
+
+
+ def something(self) -> None:
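+        """Compute POSIX totals: read/write operation counts, byte volumes, and small (< 1 MB) request counts per size bucket."""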
+ if not self.posix_df:
+ return
+
+ self.total_reads = self.posix_df['counters']['POSIX_READS'].sum()
+ self.total_writes = self.posix_df['counters']['POSIX_WRITES'].sum()
+ self.total_operations = self.total_writes + self.total_reads
+ # ----------------------------------------------------------------------------------------------------------------------
+ # module.check_operation_intensive(self.total_operations, self.total_reads, self.total_writes)
+
+ self.posix_total_read_size = self.posix_df['counters']['POSIX_BYTES_READ'].sum()
+ self.posix_total_written_size = self.posix_df['counters']['POSIX_BYTES_WRITTEN'].sum()
+ self.posix_total_size = self.posix_total_written_size + self.posix_total_read_size
+
+ # module.check_size_intensive(self.posix_total_size, self.posix_total_read_size, self.posix_total_written_size)
+ # -----
+ self.total_reads_small = (
+ self.posix_df['counters']['POSIX_SIZE_READ_0_100'].sum() +
+ self.posix_df['counters']['POSIX_SIZE_READ_100_1K'].sum() +
+ self.posix_df['counters']['POSIX_SIZE_READ_1K_10K'].sum() +
+ self.posix_df['counters']['POSIX_SIZE_READ_10K_100K'].sum() +
+ self.posix_df['counters']['POSIX_SIZE_READ_100K_1M'].sum()
)
-
- total_writes_small = (
- df['counters']['POSIX_SIZE_WRITE_0_100'].sum() +
- df['counters']['POSIX_SIZE_WRITE_100_1K'].sum() +
- df['counters']['POSIX_SIZE_WRITE_1K_10K'].sum() +
- df['counters']['POSIX_SIZE_WRITE_10K_100K'].sum() +
- df['counters']['POSIX_SIZE_WRITE_100K_1M'].sum()
+ self.total_writes_small = (
+ self.posix_df['counters']['POSIX_SIZE_WRITE_0_100'].sum() +
+ self.posix_df['counters']['POSIX_SIZE_WRITE_100_1K'].sum() +
+ self.posix_df['counters']['POSIX_SIZE_WRITE_1K_10K'].sum() +
+ self.posix_df['counters']['POSIX_SIZE_WRITE_10K_100K'].sum() +
+ self.posix_df['counters']['POSIX_SIZE_WRITE_100K_1M'].sum()
)
- # Get the files responsible for more than half of these accesses
+ def small_operation_calculation(self):
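+        """Aggregate small (< 1 MB) read/write requests per file using Darshan's size histogram buckets
+        (0-100 B up to 100 KB-1 MB); results land in ``small_operation_detected_files``."""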
+ if not self.posix_df:
+ return
+
files = []
- df['counters']['INSIGHTS_POSIX_SMALL_READ'] = (
- df['counters']['POSIX_SIZE_READ_0_100'] +
- df['counters']['POSIX_SIZE_READ_100_1K'] +
- df['counters']['POSIX_SIZE_READ_1K_10K'] +
- df['counters']['POSIX_SIZE_READ_10K_100K'] +
- df['counters']['POSIX_SIZE_READ_100K_1M']
+ self.posix_df['counters']['INSIGHTS_POSIX_SMALL_READ'] = (
+ self.posix_df['counters']['POSIX_SIZE_READ_0_100'] +
+ self.posix_df['counters']['POSIX_SIZE_READ_100_1K'] +
+ self.posix_df['counters']['POSIX_SIZE_READ_1K_10K'] +
+ self.posix_df['counters']['POSIX_SIZE_READ_10K_100K'] +
+ self.posix_df['counters']['POSIX_SIZE_READ_100K_1M']
)
- df['counters']['INSIGHTS_POSIX_SMALL_WRITE'] = (
- df['counters']['POSIX_SIZE_WRITE_0_100'] +
- df['counters']['POSIX_SIZE_WRITE_100_1K'] +
- df['counters']['POSIX_SIZE_WRITE_1K_10K'] +
- df['counters']['POSIX_SIZE_WRITE_10K_100K'] +
- df['counters']['POSIX_SIZE_WRITE_100K_1M']
+ self.posix_df['counters']['INSIGHTS_POSIX_SMALL_WRITE'] = (
+ self.posix_df['counters']['POSIX_SIZE_WRITE_0_100'] +
+ self.posix_df['counters']['POSIX_SIZE_WRITE_100_1K'] +
+ self.posix_df['counters']['POSIX_SIZE_WRITE_1K_10K'] +
+ self.posix_df['counters']['POSIX_SIZE_WRITE_10K_100K'] +
+ self.posix_df['counters']['POSIX_SIZE_WRITE_100K_1M']
)
- detected_files = pd.DataFrame(df['counters'].groupby('id')[['INSIGHTS_POSIX_SMALL_READ', 'INSIGHTS_POSIX_SMALL_WRITE']].sum()).reset_index()
- detected_files.columns = ['id', 'total_reads', 'total_writes']
- detected_files.loc[:, 'id'] = detected_files.loc[:, 'id'].astype(str)
-
- check_small_operation(total_reads, total_reads_small, total_writes, total_writes_small, detected_files, modules, file_map, dxt_posix, dxt_posix_read_data, dxt_posix_write_data)
-
- #########################################################################################################################################################################
-
- # How many requests are misaligned?
-
- total_mem_not_aligned = df['counters']['POSIX_MEM_NOT_ALIGNED'].sum()
- total_file_not_aligned = df['counters']['POSIX_FILE_NOT_ALIGNED'].sum()
+ self.small_operation_detected_files = pd.DataFrame(self.posix_df['counters'].groupby('id')[['INSIGHTS_POSIX_SMALL_READ',
+ 'INSIGHTS_POSIX_SMALL_WRITE']].sum()).reset_index()
+ self.small_operation_detected_files.columns = ['id', 'total_reads',
+ 'total_writes'] # !: Rename later. total_small_reads, total_small_writes
+ self.small_operation_detected_files.loc[:, 'id'] = self.small_operation_detected_files.loc[:, 'id'].astype(str)
- check_misaligned(total_operations, total_mem_not_aligned, total_file_not_aligned, modules, file_map, df_lustre, dxt_posix, dxt_posix_read_data)
+        self.name_records = self.report.name_records
+ # module.check_small_operation(self.total_reads, self.total_reads_small, self.total_writes,
+ # self.total_writes_small,
+ # self.small_operation_detected_files,
+ # self.modules, self.report.name_records, self.dxt_posix, self.dxt_posix_read_data,
+ # self.dxt_posix_write_data)
- #########################################################################################################################################################################
+ def posix_alignment(self):
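+        """Sum the POSIX alignment-mismatch counters (POSIX_MEM_NOT_ALIGNED, POSIX_FILE_NOT_ALIGNED)."""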
+ if not self.posix_df:
+ return
- # Redundant read-traffic (based on Phill)
- # POSIX_MAX_BYTE_READ (Highest offset in the file that was read)
- max_read_offset = df['counters']['POSIX_MAX_BYTE_READ'].max()
- max_write_offset = df['counters']['POSIX_MAX_BYTE_WRITTEN'].max()
+ self.total_mem_not_aligned = self.posix_df['counters']['POSIX_MEM_NOT_ALIGNED'].sum()
+ self.total_file_not_aligned = self.posix_df['counters']['POSIX_FILE_NOT_ALIGNED'].sum()
- check_traffic(max_read_offset, total_read_size, max_write_offset, total_written_size, dxt_posix, dxt_posix_read_data, dxt_posix_write_data)
+        self.name_records = self.report.name_records
+ # module.check_misaligned(self.total_operations, self.total_mem_not_aligned, self.total_file_not_aligned,
+ # self.modules, self.report.name_records, self.lustre_df, self.dxt_posix,
+ # self.dxt_posix_read_data)
- #########################################################################################################################################################################
+ def posix_redundant_reads(self):
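+        """Record the highest byte offsets read and written (POSIX_MAX_BYTE_READ/WRITTEN), the inputs
+        to the redundant-traffic check."""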
+ if not self.posix_df:
+ return
- # Check for a lot of random operations
+ self.max_read_offset = self.posix_df['counters']['POSIX_MAX_BYTE_READ'].max()
+ self.max_write_offset = self.posix_df['counters']['POSIX_MAX_BYTE_WRITTEN'].max()
- read_consecutive = df['counters']['POSIX_CONSEC_READS'].sum()
- #print('READ Consecutive: {} ({:.2f}%)'.format(read_consecutive, read_consecutive / total_reads * 100))
+ # module.check_traffic(self.max_read_offset, self.total_read_size, self.max_write_offset, self.total_written_size,
+ # self.dxt_posix, self.dxt_posix_read_data, self.dxt_posix_write_data)
- read_sequential = df['counters']['POSIX_SEQ_READS'].sum()
- read_sequential -= read_consecutive
- #print('READ Sequential: {} ({:.2f}%)'.format(read_sequential, read_sequential / total_reads * 100))
+ def posix_random_check(self):
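+        """Classify accesses as consecutive, sequential, or random. Darshan's sequential counters include
+        consecutive accesses, so those are subtracted; random = total - consecutive - sequential."""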
+ if not self.posix_df:
+ return
- read_random = total_reads - read_consecutive - read_sequential
- #print('READ Random: {} ({:.2f}%)'.format(read_random, read_random / total_reads * 100))
+ self.read_consecutive = self.posix_df['counters']['POSIX_CONSEC_READS'].sum()
+ self.read_sequential = self.posix_df['counters']['POSIX_SEQ_READS'].sum()
+ self.read_sequential -= self.read_consecutive
- write_consecutive = df['counters']['POSIX_CONSEC_WRITES'].sum()
+ self.read_random = self.total_reads - self.read_consecutive - self.read_sequential
- write_sequential = df['counters']['POSIX_SEQ_WRITES'].sum()
- write_sequential -= write_consecutive
+ self.write_consecutive = self.posix_df['counters']['POSIX_CONSEC_WRITES'].sum()
- write_random = total_writes - write_consecutive - write_sequential
- #print('WRITE Random: {} ({:.2f}%)'.format(write_random, write_random / total_writes * 100))
+ self.write_sequential = self.posix_df['counters']['POSIX_SEQ_WRITES'].sum()
+ self.write_sequential -= self.write_consecutive
- check_random_operation(read_consecutive, read_sequential, read_random, total_reads, write_consecutive, write_sequential, write_random, total_writes, dxt_posix, dxt_posix_read_data, dxt_posix_write_data)
+ self.write_random = self.total_writes - self.write_consecutive - self.write_sequential
- #########################################################################################################################################################################
+ # module.check_random_operation(self.read_consecutive, self.read_sequential, self.read_random, self.total_reads,
+ # self.write_consecutive, self.write_sequential, self.write_random,
+ # self.total_writes, self.dxt_posix,
+ # self.dxt_posix_read_data, self.dxt_posix_write_data)
- # Shared file with small operations
+ def posix_shared_file(self):
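+        """Collect shared-file records (Darshan reports rank == -1 for files accessed collectively) and
+        tally their small (< 1 MB) read/write requests."""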
+ if not self.posix_df:
+ return
- shared_files = df['counters'].loc[(df['counters']['rank'] == -1)]
+ self.shared_files = self.posix_df['counters'].loc[(self.posix_df['counters']['rank'] == -1)]
- shared_files = shared_files.assign(id=lambda d: d['id'].astype(str))
+ self.shared_files = self.shared_files.assign(id=lambda d: d['id'].astype(str))
- if not shared_files.empty:
- total_shared_reads = shared_files['POSIX_READS'].sum()
- total_shared_reads_small = (
- shared_files['POSIX_SIZE_READ_0_100'].sum() +
- shared_files['POSIX_SIZE_READ_100_1K'].sum() +
- shared_files['POSIX_SIZE_READ_1K_10K'].sum() +
- shared_files['POSIX_SIZE_READ_10K_100K'].sum() +
- shared_files['POSIX_SIZE_READ_100K_1M'].sum()
+ if not self.shared_files.empty:
+ self.total_shared_reads = self.shared_files['POSIX_READS'].sum()
+ self.total_shared_reads_small = (
+ self.shared_files['POSIX_SIZE_READ_0_100'].sum() +
+ self.shared_files['POSIX_SIZE_READ_100_1K'].sum() +
+ self.shared_files['POSIX_SIZE_READ_1K_10K'].sum() +
+ self.shared_files['POSIX_SIZE_READ_10K_100K'].sum() +
+ self.shared_files['POSIX_SIZE_READ_100K_1M'].sum()
)
- shared_files['INSIGHTS_POSIX_SMALL_READS'] = (
- shared_files['POSIX_SIZE_READ_0_100'] +
- shared_files['POSIX_SIZE_READ_100_1K'] +
- shared_files['POSIX_SIZE_READ_1K_10K'] +
- shared_files['POSIX_SIZE_READ_10K_100K'] +
- shared_files['POSIX_SIZE_READ_100K_1M']
+ self.shared_files['INSIGHTS_POSIX_SMALL_READS'] = (
+ self.shared_files['POSIX_SIZE_READ_0_100'] +
+ self.shared_files['POSIX_SIZE_READ_100_1K'] +
+ self.shared_files['POSIX_SIZE_READ_1K_10K'] +
+ self.shared_files['POSIX_SIZE_READ_10K_100K'] +
+ self.shared_files['POSIX_SIZE_READ_100K_1M']
)
-
- total_shared_writes = shared_files['POSIX_WRITES'].sum()
- total_shared_writes_small = (
- shared_files['POSIX_SIZE_WRITE_0_100'].sum() +
- shared_files['POSIX_SIZE_WRITE_100_1K'].sum() +
- shared_files['POSIX_SIZE_WRITE_1K_10K'].sum() +
- shared_files['POSIX_SIZE_WRITE_10K_100K'].sum() +
- shared_files['POSIX_SIZE_WRITE_100K_1M'].sum()
+ self.total_shared_writes = self.shared_files['POSIX_WRITES'].sum()
+ self.total_shared_writes_small = (
+ self.shared_files['POSIX_SIZE_WRITE_0_100'].sum() +
+ self.shared_files['POSIX_SIZE_WRITE_100_1K'].sum() +
+ self.shared_files['POSIX_SIZE_WRITE_1K_10K'].sum() +
+ self.shared_files['POSIX_SIZE_WRITE_10K_100K'].sum() +
+ self.shared_files['POSIX_SIZE_WRITE_100K_1M'].sum()
)
- shared_files['INSIGHTS_POSIX_SMALL_WRITES'] = (
- shared_files['POSIX_SIZE_WRITE_0_100'] +
- shared_files['POSIX_SIZE_WRITE_100_1K'] +
- shared_files['POSIX_SIZE_WRITE_1K_10K'] +
- shared_files['POSIX_SIZE_WRITE_10K_100K'] +
- shared_files['POSIX_SIZE_WRITE_100K_1M']
+ self.shared_files['INSIGHTS_POSIX_SMALL_WRITES'] = (
+ self.shared_files['POSIX_SIZE_WRITE_0_100'] +
+ self.shared_files['POSIX_SIZE_WRITE_100_1K'] +
+ self.shared_files['POSIX_SIZE_WRITE_1K_10K'] +
+ self.shared_files['POSIX_SIZE_WRITE_10K_100K'] +
+ self.shared_files['POSIX_SIZE_WRITE_100K_1M']
)
- check_shared_small_operation(total_shared_reads, total_shared_reads_small, total_shared_writes, total_shared_writes_small, shared_files, file_map)
+        self.name_records = self.report.name_records
+ # module.check_shared_small_operation(self.total_shared_reads, self.total_shared_reads_small,
+ # self.total_shared_writes,
+ # self.total_shared_writes_small, self.shared_files, self.report.name_records)
- #########################################################################################################################################################################
+ def posix_long_metadata(self):
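+        """Count POSIX fcounter records whose POSIX_F_META_TIME exceeds the metadata_time_rank threshold."""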
+ if not self.posix_df:
+ return
- count_long_metadata = len(df['fcounters'][(df['fcounters']['POSIX_F_META_TIME'] > thresholds['metadata_time_rank'][0])])
+ self.count_long_metadata = len(
+ self.posix_df['fcounters'][
+ (self.posix_df['fcounters']['POSIX_F_META_TIME'] > config.thresholds['metadata_time_rank'][0])])
- check_long_metadata(count_long_metadata, modules)
+ # module.check_long_metadata(self.count_long_metadata, self.modules)
+ def posix_stragglers(self):
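+        """Flag shared files whose byte-volume gap between the fastest and slowest rank
+        (POSIX_FASTEST_RANK_BYTES vs. POSIX_SLOWEST_RANK_BYTES) exceeds the imbalance_stragglers threshold."""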
+ if not self.posix_df:
+ return
# We already have a single line for each shared-file access
- # To check for stragglers, we can check the difference between the
+ # To check for stragglers, we can check the difference between the
# POSIX_FASTEST_RANK_BYTES
# POSIX_SLOWEST_RANK_BYTES
# POSIX_F_VARIANCE_RANK_BYTES
- stragglers_count = 0
-
- shared_files = shared_files.assign(id=lambda d: d['id'].astype(str))
+ self.shared_files = self.shared_files.assign(id=lambda d: d['id'].astype(str))
- # Get the files responsible
- detected_files = []
+ self.posix_data_straggler_files = []
- for index, row in shared_files.iterrows():
+ for index, row in self.shared_files.iterrows():
total_transfer_size = row['POSIX_BYTES_WRITTEN'] + row['POSIX_BYTES_READ']
- if total_transfer_size and abs(row['POSIX_SLOWEST_RANK_BYTES'] - row['POSIX_FASTEST_RANK_BYTES']) / total_transfer_size > thresholds['imbalance_stragglers'][0]:
- stragglers_count += 1
+ if total_transfer_size and abs(
+ row['POSIX_SLOWEST_RANK_BYTES'] - row['POSIX_FASTEST_RANK_BYTES']) / total_transfer_size > \
+ module.thresholds['imbalance_stragglers'][0]:
+ self.posix_shared_data_imbalance_stragglers_count += 1
- detected_files.append([
- row['id'], abs(row['POSIX_SLOWEST_RANK_BYTES'] - row['POSIX_FASTEST_RANK_BYTES']) / total_transfer_size * 100
+ self.posix_data_straggler_files.append([
+ row['id'],
+ abs(row['POSIX_SLOWEST_RANK_BYTES'] - row['POSIX_FASTEST_RANK_BYTES']) / total_transfer_size * 100
])
column_names = ['id', 'data_imbalance']
- detected_files = pd.DataFrame(detected_files, columns=column_names)
- check_shared_data_imblance(stragglers_count, detected_files, file_map, dxt_posix, dxt_posix_read_data, dxt_posix_write_data)
+ self.posix_data_straggler_files = pd.DataFrame(self.posix_data_straggler_files, columns=column_names)
+
+        self.name_records = self.report.name_records
+ # module.check_shared_data_imblance(self.posix_shared_data_imbalance_stragglers_count, self.posix_data_straggler_files,
+ # self.report.name_records, self.dxt_posix,
+ # self.dxt_posix_read_data,
+ # self.dxt_posix_write_data)
# POSIX_F_FASTEST_RANK_TIME
# POSIX_F_SLOWEST_RANK_TIME
# POSIX_F_VARIANCE_RANK_TIME
- shared_files_times = df['fcounters'].loc[(df['fcounters']['rank'] == -1)]
+ #################################################################################################################
+
+ def posix_stragglers2(self):
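+        """Time-based counterpart of posix_stragglers(): compares POSIX_F_FASTEST_RANK_TIME and
+        POSIX_F_SLOWEST_RANK_TIME against each shared file's total read + write + metadata time."""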
+ if not self.posix_df:
+ return
# Get the files responsible
- detected_files = []
+ shared_files_times = self.posix_df['fcounters'].loc[(self.posix_df['fcounters']['rank'] == -1)]
+
+ self.posix_shared_time_imbalance_detected_files1 = []
- stragglers_count = 0
- stragglers_imbalance = {}
+ self.posix_stragglers_shared_file_time_imbalance_count = 0
+ # posix_stragglers_shared_file_time_imbalance = {} # UNUSED?
shared_files_times = shared_files_times.assign(id=lambda d: d['id'].astype(str))
for index, row in shared_files_times.iterrows():
total_transfer_time = row['POSIX_F_WRITE_TIME'] + row['POSIX_F_READ_TIME'] + row['POSIX_F_META_TIME']
- if total_transfer_time and abs(row['POSIX_F_SLOWEST_RANK_TIME'] - row['POSIX_F_FASTEST_RANK_TIME']) / total_transfer_time > thresholds['imbalance_stragglers'][0]:
- stragglers_count += 1
+ if total_transfer_time and abs(
+ row['POSIX_F_SLOWEST_RANK_TIME'] - row['POSIX_F_FASTEST_RANK_TIME']) / total_transfer_time > \
+ config.thresholds['imbalance_stragglers'][0]:
+ self.posix_stragglers_shared_file_time_imbalance_count += 1
- detected_files.append([
- row['id'], abs(row['POSIX_F_SLOWEST_RANK_TIME'] - row['POSIX_F_FASTEST_RANK_TIME']) / total_transfer_time * 100
+ self.posix_shared_time_imbalance_detected_files1.append([
+ row['id'],
+ abs(row['POSIX_F_SLOWEST_RANK_TIME'] - row['POSIX_F_FASTEST_RANK_TIME']) / total_transfer_time * 100
])
column_names = ['id', 'time_imbalance']
- detected_files = pd.DataFrame(detected_files, columns=column_names)
- check_shared_time_imbalance(stragglers_count, detected_files, file_map)
+ self.posix_shared_time_imbalance_detected_files1 = pd.DataFrame(self.posix_shared_time_imbalance_detected_files1,
+ columns=column_names)
+ # module.check_shared_time_imbalance(self.posix_stragglers_shared_file_time_imbalance_count,
+ # self.posix_shared_time_imbalance_detected_files1, self.report.name_records)
+
+ def posix_imbalance(self):
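+        """Aggregate non-shared (rank != -1) accesses per file and flag files whose per-rank written
+        bytes are imbalanced, e.g. max 100 MB vs. min 10 MB gives a 90% write imbalance, which is
+        compared against the imbalance_size threshold."""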
+ if not self.posix_df:
+ return
- aggregated = df['counters'].loc[(df['counters']['rank'] != -1)][
+ aggregated = self.posix_df['counters'].loc[(self.posix_df['counters']['rank'] != -1)][
['rank', 'id', 'POSIX_BYTES_WRITTEN', 'POSIX_BYTES_READ']
].groupby('id', as_index=False).agg({
'rank': 'nunique',
'POSIX_BYTES_WRITTEN': ['sum', 'min', 'max'],
'POSIX_BYTES_READ': ['sum', 'min', 'max']
})
-
aggregated.columns = list(map('_'.join, aggregated.columns.values))
-
aggregated = aggregated.assign(id=lambda d: d['id_'].astype(str))
+ self.aggregated = aggregated
# Get the files responsible
- imbalance_count = 0
+ self.posix_data_imbalance_count = 0
- detected_files = []
+ self.posix_shared_time_imbalance_detected_files2 = []
- for index, row in aggregated.iterrows():
- if row['POSIX_BYTES_WRITTEN_max'] and abs(row['POSIX_BYTES_WRITTEN_max'] - row['POSIX_BYTES_WRITTEN_min']) / row['POSIX_BYTES_WRITTEN_max'] > thresholds['imbalance_size'][0]:
- imbalance_count += 1
+ for index, row in self.aggregated.iterrows():
+ if row['POSIX_BYTES_WRITTEN_max'] and abs(row['POSIX_BYTES_WRITTEN_max'] - row['POSIX_BYTES_WRITTEN_min']) / \
+ row['POSIX_BYTES_WRITTEN_max'] > config.thresholds['imbalance_size'][0]:
+ self.posix_data_imbalance_count += 1
- detected_files.append([
- row['id'], abs(row['POSIX_BYTES_WRITTEN_max'] - row['POSIX_BYTES_WRITTEN_min']) / row['POSIX_BYTES_WRITTEN_max'] * 100
+ self.posix_shared_time_imbalance_detected_files2.append([
+ row['id'], abs(row['POSIX_BYTES_WRITTEN_max'] - row['POSIX_BYTES_WRITTEN_min']) / row[
+ 'POSIX_BYTES_WRITTEN_max'] * 100
])
column_names = ['id', 'write_imbalance']
- detected_files = pd.DataFrame(detected_files, columns=column_names)
- check_individual_write_imbalance(imbalance_count, detected_files, file_map, dxt_posix, dxt_posix_write_data)
+ self.posix_shared_time_imbalance_detected_files2 = pd.DataFrame(self.posix_shared_time_imbalance_detected_files2,
+ columns=column_names)
+ # module.check_individual_write_imbalance(self.posix_data_imbalance_count, self.posix_shared_time_imbalance_detected_files2,
+ # self.report.name_records, self.dxt_posix, self.dxt_posix_write_data)
+
+ def mpiio_processing(self):
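+        """Tally MPI-IO collective vs. independent read/write operations; when no collectives were used
+        despite many operations, record the files dominated by independent accesses."""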
+ if not self.mpiio_df:
+ return
+
+ self.mpiio_df['counters'] = self.mpiio_df['counters'].assign(
+            id=lambda d: d['id'].astype(str))  # normalize record ids to strings for grouping and lookups
+
+
+ df_mpiio_collective_reads = self.mpiio_df['counters'] # .loc[(df_mpiio['counters']['MPIIO_COLL_READS'] > 0)]
+
+ self.total_mpiio_read_operations = self.mpiio_df['counters']['MPIIO_INDEP_READS'].sum() + self.mpiio_df['counters'][
+ 'MPIIO_COLL_READS'].sum()
+
+ self.mpiio_coll_reads = self.mpiio_df['counters']['MPIIO_COLL_READS'].sum()
+ self.mpiio_indep_reads = self.mpiio_df['counters']['MPIIO_INDEP_READS'].sum()
+
+ self.detected_files_mpi_coll_reads = []
+ if self.mpiio_coll_reads == 0 and self.total_mpiio_read_operations and self.total_mpiio_read_operations > \
+ module.thresholds['collective_operations_absolute'][0]:
+ files = pd.DataFrame(df_mpiio_collective_reads.groupby('id').sum()).reset_index()
+ for index, row in df_mpiio_collective_reads.iterrows():
+ if ((row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) and
+ row['MPIIO_INDEP_READS'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) >
+ module.thresholds['collective_operations'][0] and
+ (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) >
+ module.thresholds['collective_operations_absolute'][0]):
+ self.detected_files_mpi_coll_reads.append([
+ row['id'], row['MPIIO_INDEP_READS'],
+ row['MPIIO_INDEP_READS'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) * 100
+ ])
+
+ column_names = ['id', 'absolute_indep_reads', 'percent_indep_reads']
+ self.detected_files_mpi_coll_reads = pd.DataFrame(self.detected_files_mpi_coll_reads, columns=column_names)
- imbalance_count = 0
+ # module.check_mpi_collective_read_operation(self.mpiio_coll_reads, self.mpiio_indep_reads, self.total_mpiio_read_operations,
+ # self.detected_files_mpi_coll_reads, self.report.name_records, self.dxt_mpiio)
- detected_files = []
+ # TODO: Split this into 2 functions for each module insight
- for index, row in aggregated.iterrows():
- if row['POSIX_BYTES_READ_max'] and abs(row['POSIX_BYTES_READ_max'] - row['POSIX_BYTES_READ_min']) / row['POSIX_BYTES_READ_max'] > thresholds['imbalance_size'][0]:
- imbalance_count += 1
- detected_files.append([
- row['id'], abs(row['POSIX_BYTES_READ_max'] - row['POSIX_BYTES_READ_min']) / row['POSIX_BYTES_READ_max'] * 100
+ df_mpiio_collective_writes = self.mpiio_df['counters'] # .loc[(df_mpiio['counters']['MPIIO_COLL_WRITES'] > 0)]
+
+ self.total_mpiio_write_operations = self.mpiio_df['counters']['MPIIO_INDEP_WRITES'].sum() + \
+ self.mpiio_df['counters'][
+ 'MPIIO_COLL_WRITES'].sum()
+
+ self.mpiio_coll_writes = self.mpiio_df['counters']['MPIIO_COLL_WRITES'].sum()
+ self.mpiio_indep_writes = self.mpiio_df['counters']['MPIIO_INDEP_WRITES'].sum()
+
+ self.detected_files_mpiio_coll_writes = []
+ if self.mpiio_coll_writes == 0 and self.total_mpiio_write_operations and self.total_mpiio_write_operations > \
+ module.thresholds['collective_operations_absolute'][0]:
+ files = pd.DataFrame(df_mpiio_collective_writes.groupby('id').sum()).reset_index()
+
+ for index, row in df_mpiio_collective_writes.iterrows():
+ if ((row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) and
+ row['MPIIO_INDEP_WRITES'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) >
+ module.thresholds['collective_operations'][0] and
+ (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) >
+ module.thresholds['collective_operations_absolute'][0]):
+ self.detected_files_mpiio_coll_writes.append([
+ row['id'], row['MPIIO_INDEP_WRITES'],
+ row['MPIIO_INDEP_WRITES'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) * 100
+ ])
+
+ column_names = ['id', 'absolute_indep_writes', 'percent_indep_writes']
+ self.detected_files_mpiio_coll_writes = pd.DataFrame(self.detected_files_mpiio_coll_writes, columns=column_names)
+
+ # module.check_mpi_collective_write_operation(self.mpiio_coll_writes, self.mpiio_indep_writes, self.total_mpiio_write_operations,
+ # detected_files_mpiio_coll_writes, self.report.name_records, self.dxt_mpiio)
+
+ def posix_imbalance2(self):
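+        """Read-size counterpart of posix_imbalance(); reuses ``self.aggregated``, so posix_imbalance()
+        must run first."""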
+ if not self.posix_df:
+ return
+
+ self.imbalance_count_posix_shared_time = 0
+
+ self.posix_shared_time_imbalance_detected_files3 = []
+
+ for index, row in self.aggregated.iterrows():
+ if row['POSIX_BYTES_READ_max'] and abs(row['POSIX_BYTES_READ_max'] - row['POSIX_BYTES_READ_min']) / row[
+ 'POSIX_BYTES_READ_max'] > module.thresholds['imbalance_size'][0]:
+ self.imbalance_count_posix_shared_time += 1
+
+ self.posix_shared_time_imbalance_detected_files3.append([
+ row['id'],
+ abs(row['POSIX_BYTES_READ_max'] - row['POSIX_BYTES_READ_min']) / row['POSIX_BYTES_READ_max'] * 100
])
column_names = ['id', 'read_imbalance']
- detected_files = pd.DataFrame(detected_files, columns=column_names)
- check_individual_read_imbalance(imbalance_count, detected_files, file_map, dxt_posix, dxt_posix_read_data)
+ self.posix_shared_time_imbalance_detected_files3 = pd.DataFrame(self.posix_shared_time_imbalance_detected_files3,
+ columns=column_names)
+ # module.check_individual_read_imbalance(self.imbalance_count_posix_shared_time, self.posix_shared_time_imbalance_detected_files3,
+ # self.report.name_records, self.dxt_posix, self.dxt_posix_read_data)
- #########################################################################################################################################################################
+ def hdf5_check(self):
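+        """Set ``has_hdf5_extension`` if any MPI-IO record maps to a file ending in .h5 or .hdf5."""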
+ if not self.mpiio_df:
+ return
- if 'MPI-IO' in report.records:
- # Check if application uses MPI-IO and collective operations
- df_mpiio = report.records['MPI-IO'].to_df()
- df_mpiio['counters'] = df_mpiio['counters'].assign(id=lambda d: d['id'].astype(str))
+        self.name_records = self.report.name_records
+ for index, row in self.mpiio_df['counters'].iterrows():
+ if self.report.name_records[int(row['id'])].endswith('.h5') or self.report.name_records[
+ int(row['id'])].endswith('.hdf5'):
+ self.has_hdf5_extension = True
+ break # Early exit
- # Get the files responsible
- detected_files = []
+ def mpiio_non_blocking(self):
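+        """Sum the MPI-IO non-blocking operation counters (MPIIO_NB_READS / MPIIO_NB_WRITES)."""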
+ if not self.mpiio_df:
+ return
- df_mpiio_collective_reads = df_mpiio['counters'] #.loc[(df_mpiio['counters']['MPIIO_COLL_READS'] > 0)]
+ self.mpiio_nb_reads = self.mpiio_df['counters']['MPIIO_NB_READS'].sum()
+ self.mpiio_nb_writes = self.mpiio_df['counters']['MPIIO_NB_WRITES'].sum()
- total_mpiio_read_operations = df_mpiio['counters']['MPIIO_INDEP_READS'].sum() + df_mpiio['counters']['MPIIO_COLL_READS'].sum()
+ # module.check_mpi_none_block_operation(self.mpiio_nb_reads, self.mpiio_nb_writes, self.has_hdf5_extension,
+ # self.modules)
- mpiio_coll_reads = df_mpiio['counters']['MPIIO_COLL_READS'].sum()
- mpiio_indep_reads = df_mpiio['counters']['MPIIO_INDEP_READS'].sum()
+ def check_number_of_aggregators(self):
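+ # MPI-IO hints are recorded under the 'h' job-metadata key as ';'-separated 'key=value' pairs; 'cb_nodes' is the number of collective-buffering aggregators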
+ hints = ''
- detected_files = []
- if mpiio_coll_reads == 0 and total_mpiio_read_operations and total_mpiio_read_operations > thresholds['collective_operations_absolute'][0]:
- files = pd.DataFrame(df_mpiio_collective_reads.groupby('id').sum()).reset_index()
- for index, row in df_mpiio_collective_reads.iterrows():
- if ((row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) and
- row['MPIIO_INDEP_READS'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > thresholds['collective_operations'][0] and
- (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > thresholds['collective_operations_absolute'][0]):
+ if 'h' in self.report.metadata['job']['metadata']:
+ hints = self.report.metadata['job']['metadata']['h']
- detected_files.append([
- row['id'], row['MPIIO_INDEP_READS'], row['MPIIO_INDEP_READS'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) * 100
- ])
-
- column_names = ['id', 'absolute_indep_reads', 'percent_indep_reads']
- detected_files = pd.DataFrame(detected_files, columns=column_names)
+ if hints:
+ hints = hints.split(';')
- check_mpi_collective_read_operation(mpiio_coll_reads, mpiio_indep_reads, total_mpiio_read_operations, detected_files, file_map, dxt_mpiio)
+ self.hints = hints
- df_mpiio_collective_writes = df_mpiio['counters'] #.loc[(df_mpiio['counters']['MPIIO_COLL_WRITES'] > 0)]
+ if 'MPI-IO' in self.modules:
- total_mpiio_write_operations = df_mpiio['counters']['MPIIO_INDEP_WRITES'].sum() + df_mpiio['counters']['MPIIO_COLL_WRITES'].sum()
+ for hint in hints:
+ if hint != 'no':
+ (key, value) = hint.split('=')
- mpiio_coll_writes = df_mpiio['counters']['MPIIO_COLL_WRITES'].sum()
- mpiio_indep_writes = df_mpiio['counters']['MPIIO_INDEP_WRITES'].sum()
+ if key == 'cb_nodes':
+ self.cb_nodes = value
- detected_files = []
- if mpiio_coll_writes == 0 and total_mpiio_write_operations and total_mpiio_write_operations > thresholds['collective_operations_absolute'][0]:
- files = pd.DataFrame(df_mpiio_collective_writes.groupby('id').sum()).reset_index()
+ # Try to get the number of compute nodes from SLURM; if it cannot be determined, report it as informational only
+ command = f'sacct --job {self.report.metadata["job"]["jobid"]} --format=JobID,JobIDRaw,NNodes,NCPUs --parsable2 --delimiter ","'
- for index, row in df_mpiio_collective_writes.iterrows():
- if ((row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) and
- row['MPIIO_INDEP_WRITES'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > thresholds['collective_operations'][0] and
- (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > thresholds['collective_operations_absolute'][0]):
+ arguments = shlex.split(command)
- detected_files.append([
- row['id'], row['MPIIO_INDEP_WRITES'], row['MPIIO_INDEP_WRITES'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) * 100
- ])
+ try:
+ result = subprocess.run(arguments, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- column_names = ['id', 'absolute_indep_writes', 'percent_indep_writes']
- detected_files = pd.DataFrame(detected_files, columns=column_names)
+ if result.returncode == 0:
+ # We have successfully fetched the information from SLURM
+ db = csv.DictReader(io.StringIO(result.stdout.decode('utf-8')))
- check_mpi_collective_write_operation(mpiio_coll_writes, mpiio_indep_writes, total_mpiio_write_operations, detected_files, file_map, dxt_mpiio)
+ try:
+ first = next(db)
- #########################################################################################################################################################################
+ if 'NNodes' in first:
+ self.number_of_compute_nodes = first['NNodes']
- # Look for usage of non-block operations
+ # Do we have one MPI-IO aggregator per node?
+ module.check_mpi_aggregator(self.cb_nodes, self.number_of_compute_nodes)
+ except StopIteration:
+ pass
+ except FileNotFoundError:
+ pass
- # Look for HDF5 file extension
+ def something_else(self):
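+ # Version 3.4.1 of py-darshan changed the job time keys; support both 'start_time'/'end_time' and 'start_time_sec'/'end_time_sec'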
+ if 'start_time' in self.report.metadata['job']:
+ self.job_start = datetime.datetime.fromtimestamp(self.report.metadata['job']['start_time'],
+ datetime.timezone.utc)
+ self.job_end = datetime.datetime.fromtimestamp(self.report.metadata['job']['end_time'],
+ datetime.timezone.utc)
+ else:
+ self.job_start = datetime.datetime.fromtimestamp(self.report.metadata['job']['start_time_sec'],
+ datetime.timezone.utc)
+ self.job_end = datetime.datetime.fromtimestamp(self.report.metadata['job']['end_time_sec'],
+ datetime.timezone.utc)
- has_hdf5_extension = False
- for index, row in df_mpiio['counters'].iterrows():
- if file_map[int(row['id'])].endswith('.h5') or file_map[int(row['id'])].endswith('.hdf5'):
- has_hdf5_extension = True
- mpiio_nb_reads = df_mpiio['counters']['MPIIO_NB_READS'].sum()
- mpiio_nb_writes = df_mpiio['counters']['MPIIO_NB_WRITES'].sum()
+@dataclass
+class DarshanTrace(AbstractDarshanTrace):
+ path: Optional[str] = None
+ report: Optional[darshan.DarshanReport] = None
- check_mpi_none_block_operation(mpiio_nb_reads, mpiio_nb_writes, has_hdf5_extension, modules)
+ stdio_df: pd.DataFrame = None
+ posix_df: pd.DataFrame = None
+ mpiio_df: pd.DataFrame = None
+ lustre_df: pd.DataFrame = None
- #########################################################################################################################################################################
+ dxt_posix: pd.DataFrame = None
+ dxt_mpiio: pd.DataFrame = None
- # Nodes and MPI-IO aggregators
- # If the application uses collective reads or collective writes, look for the number of aggregators
- hints = ''
+ dxt_posix_read_data: pd.DataFrame = None
+ dxt_posix_write_data: pd.DataFrame = None
- if 'h' in job['job']['metadata']:
- hints = job['job']['metadata']['h']
+ shared_files: pd.DataFrame = None
- if hints:
- hints = hints.split(';')
+ def __init__(self, trace_path: str, job_information, report: darshan.DarshanReport):
+ self.path = trace_path
- # print('Hints: ', hints)
+ self.jobid = job_information['jobid']
+ self.log_ver = job_information['log_ver'] if 'log_ver' in job_information else job_information['metadata'][
+ 'lib_ver']
+ self.exe = report.metadata['exe']
- NUMBER_OF_COMPUTE_NODES = 0
+ _start_time = datetime.datetime.fromtimestamp(job_information['start_time_sec'], tz=datetime.timezone.utc)
+ _end_time = datetime.datetime.fromtimestamp(job_information['end_time_sec'], tz=datetime.timezone.utc)
+ self.time = TimestampPair(_start_time, _end_time)
- if 'MPI-IO' in modules:
- cb_nodes = None
+ self.modules = report.modules.keys()
- for hint in hints:
- if hint != 'no':
- (key, value) = hint.split('=')
-
- if key == 'cb_nodes':
- cb_nodes = value
+ # TODO: Should I search in self.modules or in report.records?
+ # ! All dfs are being materialised
+ self.report = report
+ self.posix_df = report.records['POSIX'].to_df() if 'POSIX' in self.modules else None
+ self.stdio_df = report.records['STDIO'].to_df() if 'STDIO' in self.modules else None
+ self.mpiio_df = report.records['MPI-IO'].to_df() if 'MPI-IO' in self.modules else None
- # Try to get the number of compute nodes from SLURM, if not found, set as information
- command = 'sacct --job {} --format=JobID,JobIDRaw,NNodes,NCPUs --parsable2 --delimiter ","'.format(
- job['job']['jobid']
- )
+ self.lustre_df = report.records['LUSTRE'].to_df() if 'LUSTRE' in self.modules else None
- arguments = shlex.split(command)
+ self.dxt_posix = report.records['DXT_POSIX'].to_df() if 'DXT_POSIX' in self.modules else None
+ self.dxt_mpiio = report.records['DXT_MPIIO'].to_df() if 'DXT_MPIIO' in self.modules else None
- try:
- result = subprocess.run(arguments, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ self.hints = []
+ self.files = {}
- if result.returncode == 0:
- # We have successfully fetched the information from SLURM
- db = csv.DictReader(io.StringIO(result.stdout.decode('utf-8')))
- try:
- first = next(db)
+@dataclass
+class AggregatedDarshanTraces(AbstractDarshanTrace):
+
+ traces: List[DarshanTrace] = field(default_factory=list)
+ # reports: List[darshan.DarshanReport] = field(default_factory=list)
+
+ def __init__(self, traces: List[DarshanTrace]):
+ assert len(traces) > 0
+ self.traces = traces
+
+ reports = [current_trace.report for current_trace in traces]
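+ # Merge the id -> file name mappings from every report so name lookups work across traces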
+ self.name_records = dict()
+ for report in reports:
+ self.name_records.update(report.name_records) # self.name_records |= report.name_records
+
+ def aggregate_traces(self):
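+ # Reduce per-trace metrics into job-level aggregates: counters are summed, offsets take the maximum, and the HDF5 flag is OR-ed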
+ self.modules = set()
+ self.files = dict()
+ for current_trace in self.traces:
+ self.modules.update(current_trace.modules)
+
+
+ self.total_write_size_stdio += current_trace.total_write_size_stdio
+ self.total_read_size_stdio += current_trace.total_read_size_stdio
+ self.total_size_stdio += current_trace.total_size_stdio
+
+ self.total_write_size_posix += current_trace.total_write_size_posix
+ self.total_read_size_posix += current_trace.total_read_size_posix
+ self.total_size_posix += current_trace.total_size_posix
+
+ self.total_write_size_mpiio += current_trace.total_write_size_mpiio
+ self.total_read_size_mpiio += current_trace.total_read_size_mpiio
+ self.total_size_mpiio += current_trace.total_size_mpiio
+
+ self.total_size += current_trace.total_size
+ self.total_files += current_trace.total_files
+ ###
+ self.max_read_offset = max(self.max_read_offset, current_trace.max_read_offset)
+ self.max_write_offset = max(self.max_write_offset, current_trace.max_write_offset)
+ ###
+
+ self.total_files_stdio += current_trace.total_files_stdio
+ self.total_files_posix += current_trace.total_files_posix
+ self.total_files_mpiio += current_trace.total_files_mpiio
+
+ self.files.update(current_trace.files) # self.files |= current_trace.files
+
+ self.total_reads += current_trace.total_reads
+ self.total_writes += current_trace.total_writes
+ self.total_operations += current_trace.total_operations
+ self.total_read_size += current_trace.total_read_size
+ self.total_written_size += current_trace.total_written_size
+ self.total_posix_size += current_trace.total_posix_size
+ self.total_reads_small += current_trace.total_reads_small
+ self.total_writes_small += current_trace.total_writes_small
+
+ self.total_mem_not_aligned += current_trace.total_mem_not_aligned
+ self.total_file_not_aligned += current_trace.total_file_not_aligned
+
+ self.read_consecutive += current_trace.read_consecutive
+ self.read_sequential += current_trace.read_sequential
+ self.read_random += current_trace.read_random
+ self.write_consecutive += current_trace.write_consecutive
+ self.write_sequential += current_trace.write_sequential
+ self.write_random += current_trace.write_random
+
+ self.total_shared_reads += current_trace.total_shared_reads
+ self.total_shared_reads_small += current_trace.total_shared_reads_small
+ self.total_shared_writes += current_trace.total_shared_writes
+ self.total_shared_writes_small += current_trace.total_shared_writes_small
+
+ self.count_long_metadata += current_trace.count_long_metadata
+
+ self.posix_shared_data_imbalance_stragglers_count += current_trace.posix_shared_data_imbalance_stragglers_count
+
+ self.has_hdf5_extension = self.has_hdf5_extension or current_trace.has_hdf5_extension
+
+ self.mpiio_nb_reads += current_trace.mpiio_nb_reads
+ self.mpiio_nb_writes += current_trace.mpiio_nb_writes
+
+def log_relation_check():
+ # TODO: Ensure that all logs are from a single job, generated at the same time, from the same executable and using the same library version
+ pass
+
+
+def handler():
+ console = config.init_console()
+
+ insights_start_time = time.time()
+
+ darshan.enable_experimental()
+ library_version = darshanll.get_lib_version()
+
+ # trace_path = args.log_paths[0] # TODO: A single file rn
+ darshan_traces = []
+
+
+ for trace_path in args.log_paths:
+ log = darshanll.log_open(trace_path)
+ information = darshanll.log_get_job(log)
+ darshanll.log_close(log)
+
+ report = darshan.DarshanReport(trace_path)
+ current_trace = DarshanTrace(trace_path, information, report)
+ darshan_traces.append(current_trace)
+ #
+
+ # Leave this as is for now
+ # # Make sure log format is of the same version
+ # filename = args.trace_path
+ # # check_log_version(console, args.trace_path, log_version, library_version)
+ #
+
+ # Compute values for each trace
+ for current_trace in darshan_traces:
+ current_trace.generate_dxt_posix_rw_df()
+ current_trace.calculate_insights()
+ current_trace.files_stuff()
+ # current_trace.check_stdio()
+ # current_trace.check_mpiio()
+ current_trace.something()
+ current_trace.small_operation_calculation()
+ current_trace.posix_alignment()
+ current_trace.posix_redundant_reads()
+ current_trace.posix_random_check()
+ current_trace.posix_shared_file()
+ current_trace.posix_long_metadata()
+ current_trace.posix_stragglers()
+ current_trace.posix_stragglers2()
+ current_trace.posix_imbalance()
+ current_trace.hdf5_check()
+ current_trace.mpiio_non_blocking()
+ current_trace.check_number_of_aggregators()
+ current_trace.something_else()
+ # current_trace.generate_insights()
+
+ # Create aggregated trace
+ aggregated_trace = AggregatedDarshanTraces(traces=darshan_traces)
+ aggregated_trace.aggregate_traces()
+ aggregated_trace.generate_insights()
- if 'NNodes' in first:
- NUMBER_OF_COMPUTE_NODES = first['NNodes']
- # Do we have one MPI-IO aggregator per node?
- check_mpi_aggregator(cb_nodes, NUMBER_OF_COMPUTE_NODES)
- except StopIteration:
- pass
- except FileNotFoundError:
- pass
-
- #########################################################################################################################################################################
insights_end_time = time.time()
# Version 3.4.1 of py-darshan changed the contents on what is reported in 'job'
- if 'start_time' in job['job']:
- job_start = datetime.datetime.fromtimestamp(job['job']['start_time'], datetime.timezone.utc)
- job_end = datetime.datetime.fromtimestamp(job['job']['end_time'], datetime.timezone.utc)
+ job_end, job_start = set_job_time(report)
+
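+ # Reporting and export below use the trace and report from the final loop iteration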
+ print_insights(console, current_trace, insights_end_time, insights_start_time, job_end, job_start, trace_path,
+ report)
+
+ export_results(console, trace_path, report)
+
+
+def set_job_time(report):
+ if 'start_time' in report.metadata['job']:
+ job_start = datetime.datetime.fromtimestamp(report.metadata['job']['start_time'], datetime.timezone.utc)
+ job_end = datetime.datetime.fromtimestamp(report.metadata['job']['end_time'], datetime.timezone.utc)
else:
- job_start = datetime.datetime.fromtimestamp(job['job']['start_time_sec'], datetime.timezone.utc)
- job_end = datetime.datetime.fromtimestamp(job['job']['end_time_sec'], datetime.timezone.utc)
+ job_start = datetime.datetime.fromtimestamp(report.metadata['job']['start_time_sec'], datetime.timezone.utc)
+ job_end = datetime.datetime.fromtimestamp(report.metadata['job']['end_time_sec'], datetime.timezone.utc)
+ return job_end, job_start
- console.print()
+def export_results(console, log_path, report):
+ # Export to HTML, SVG, and CSV
+ trace_name = os.path.basename(log_path).replace('.darshan', '')
+ out_dir = args.export_dir if args.export_dir != "" else os.getcwd()
+ module.export_html(console, out_dir, trace_name)
+ module.export_svg(console, out_dir, trace_name)
+ module.export_csv(out_dir, trace_name, report.metadata['job']['jobid'])
+
+
+def print_insights(console, current_trace, insights_end_time, insights_start_time, job_end, job_start, log_path,
+ report):
+ console.print()
console.print(
Panel(
'\n'.join([
' [b]JOB[/b]: [white]{}[/white]'.format(
- job['job']['jobid']
+ report.metadata['job']['jobid']
),
' [b]EXECUTABLE[/b]: [white]{}[/white]'.format(
- job['exe'].split()[0]
+ report.metadata['exe'].split()[0]
),
' [b]DARSHAN[/b]: [white]{}[/white]'.format(
- os.path.basename(args.log_path)
+ os.path.basename(log_path)
),
' [b]EXECUTION TIME[/b]: [white]{} to {} ({:.2f} hours)[/white]'.format(
job_start,
@@ -719,19 +1105,20 @@ def handler():
(job_end - job_start).total_seconds() / 3600
),
' [b]FILES[/b]: [white]{} files ({} use STDIO, {} use POSIX, {} use MPI-IO)[/white]'.format(
- total_files,
- total_files_stdio,
- total_files_posix - total_files_mpiio, # Since MPI-IO files will always use POSIX, we can decrement to get a unique count
- total_files_mpiio
+ current_trace.total_files,
+ current_trace.total_files_stdio,
+ current_trace.total_files_posix - current_trace.total_files_mpiio,
+ # Since MPI-IO files will always use POSIX, we can decrement to get a unique count
+ current_trace.total_files_mpiio
),
' [b]COMPUTE NODES[/b] [white]{}[/white]'.format(
- NUMBER_OF_COMPUTE_NODES
+ current_trace.number_of_compute_nodes
),
' [b]PROCESSES[/b] [white]{}[/white]'.format(
- job['job']['nprocs']
+ report.metadata['job']['nprocs']
),
' [b]HINTS[/b]: [white]{}[/white]'.format(
- ' '.join(hints)
+ ' '.join(current_trace.hints)
)
]),
title='[b][slate_blue3]DRISHTI[/slate_blue3] v.0.5[/b]',
@@ -745,22 +1132,7 @@ def handler():
padding=1
)
)
-
console.print()
-
- display_content(console)
- display_thresholds(console)
- display_footer(console, insights_start_time, insights_end_time)
-
- filename = '{}.html'.format(args.log_path)
- export_html(console, filename)
-
- filename = '{}.svg'.format(args.log_path)
- export_svg(console, filename)
-
- filename = '{}-summary.csv'.format(
- args.log_path.replace('.darshan', '')
- )
-
- export_csv(filename, job['job']['jobid'])
-
+ module.display_content(console)
+ module.display_thresholds(console)
+ module.display_footer(console, insights_start_time, insights_end_time)
diff --git a/drishti/handlers/handle_recorder.py b/drishti/handlers/handle_recorder.py
index 34c4790..1f30494 100644
--- a/drishti/handlers/handle_recorder.py
+++ b/drishti/handlers/handle_recorder.py
@@ -2,10 +2,11 @@
import os
import time
+
import pandas as pd
+from includes.module import *
from recorder_utils import RecorderReader
from recorder_utils.build_offset_intervals import build_offset_intervals
-from drishti.includes.module import *
def get_accessed_files(reader):
@@ -577,23 +578,12 @@ def process_helper(file_map, df_intervals, df_posix_records, fid=None):
display_thresholds(console)
display_footer(console, insights_start_time, insights_end_time)
+ # Export to HTML, SVG, and CSV
+ trace_name = os.path.basename(os.path.dirname(args.log_path))
if args.split_files:
- filename = '{}.{}.html'.format(args.log_path, fid)
- else:
- filename = '{}.html'.format(args.log_path)
-
- export_html(console, filename)
-
- if args.split_files:
- filename = '{}.{}.svg'.format(args.log_path, fid)
- else:
- filename = '{}.svg'.format(args.log_path)
-
- export_svg(console, filename)
-
- if args.split_files:
- filename = '{}.{}.summary.csv'.format(args.log_path, fid)
- else:
- filename = '{}-summary.csv'.format(args.log_path)
- export_csv(filename)
+ trace_name = f"{trace_name}.{fid}"
+ out_dir = args.export_dir if args.export_dir != "" else os.getcwd()
+ export_html(console, out_dir, trace_name)
+ export_svg(console, out_dir, trace_name)
+ export_csv(out_dir, trace_name)
diff --git a/drishti/includes/config.py b/drishti/includes/config.py
index 15097fd..edad899 100644
--- a/drishti/includes/config.py
+++ b/drishti/includes/config.py
@@ -1,16 +1,13 @@
#!/usr/bin/env python3
-import os
import json
+import os
+from drishti.includes.parser import *
from rich.console import Console, Group
from rich.padding import Padding
from rich.panel import Panel
-from rich.terminal_theme import TerminalTheme
-from rich.terminal_theme import MONOKAI
-
-from drishti.includes.parser import *
-
+from rich.terminal_theme import MONOKAI, TerminalTheme
RECOMMENDATIONS = 0
HIGH = 1
diff --git a/drishti/includes/module.py b/drishti/includes/module.py
index dedaa09..3e7bd94 100644
--- a/drishti/includes/module.py
+++ b/drishti/includes/module.py
@@ -1,12 +1,13 @@
#!/usr/bin/env python3
-import datetime
import csv
+import datetime
import time
+
import pandas as pd
+from drishti.includes.config import *
from rich import box
from rich.syntax import Syntax
-from drishti.includes.config import *
'''
Before calling the functions below
@@ -1823,76 +1824,90 @@ def display_footer(console, insights_start_time, insights_end_time):
)
)
-def export_html(console, filename):
- '''
- '''
- if args.export_html:
- console.save_html(
- filename,
- theme=set_export_theme(),
- clear=False
- )
+def export_html(console, export_dir, trace_name):
+ if not args.export_html:
+ return
+ os.makedirs(export_dir, exist_ok=True) # Ensure export directory exists
+ filepath = os.path.join(export_dir, f"{trace_name}.html")
-def export_svg(console, filename):
- if args.export_svg:
- console.save_svg(
- filename,
- title='Drishti',
- theme=set_export_theme(),
- clear=False
- )
+ console.save_html(
+ filepath,
+ theme=set_export_theme(),
+ clear=False
+ )
-def export_csv(filename, jobid=None):
- if args.export_csv:
- issues = [
- 'JOB',
- INSIGHTS_STDIO_HIGH_USAGE,
- INSIGHTS_POSIX_WRITE_COUNT_INTENSIVE,
- INSIGHTS_POSIX_READ_COUNT_INTENSIVE,
- INSIGHTS_POSIX_WRITE_SIZE_INTENSIVE,
- INSIGHTS_POSIX_READ_SIZE_INTENSIVE,
- INSIGHTS_POSIX_HIGH_SMALL_READ_REQUESTS_USAGE,
- INSIGHTS_POSIX_HIGH_SMALL_WRITE_REQUESTS_USAGE,
- INSIGHTS_POSIX_HIGH_MISALIGNED_MEMORY_USAGE,
- INSIGHTS_POSIX_HIGH_MISALIGNED_FILE_USAGE,
- INSIGHTS_POSIX_REDUNDANT_READ_USAGE,
- INSIGHTS_POSIX_REDUNDANT_WRITE_USAGE,
- INSIGHTS_POSIX_HIGH_RANDOM_READ_USAGE,
- INSIGHTS_POSIX_HIGH_SEQUENTIAL_READ_USAGE,
- INSIGHTS_POSIX_HIGH_RANDOM_WRITE_USAGE,
- INSIGHTS_POSIX_HIGH_SEQUENTIAL_WRITE_USAGE,
- INSIGHTS_POSIX_HIGH_SMALL_READ_REQUESTS_SHARED_FILE_USAGE,
- INSIGHTS_POSIX_HIGH_SMALL_WRITE_REQUESTS_SHARED_FILE_USAGE,
- INSIGHTS_POSIX_HIGH_METADATA_TIME,
- INSIGHTS_POSIX_SIZE_IMBALANCE,
- INSIGHTS_POSIX_TIME_IMBALANCE,
- INSIGHTS_POSIX_INDIVIDUAL_WRITE_SIZE_IMBALANCE,
- INSIGHTS_POSIX_INDIVIDUAL_READ_SIZE_IMBALANCE,
- INSIGHTS_MPI_IO_NO_USAGE,
- INSIGHTS_MPI_IO_NO_COLLECTIVE_READ_USAGE,
- INSIGHTS_MPI_IO_NO_COLLECTIVE_WRITE_USAGE,
- INSIGHTS_MPI_IO_COLLECTIVE_READ_USAGE,
- INSIGHTS_MPI_IO_COLLECTIVE_WRITE_USAGE,
- INSIGHTS_MPI_IO_BLOCKING_READ_USAGE,
- INSIGHTS_MPI_IO_BLOCKING_WRITE_USAGE,
- INSIGHTS_MPI_IO_AGGREGATORS_INTRA,
- INSIGHTS_MPI_IO_AGGREGATORS_INTER,
- INSIGHTS_MPI_IO_AGGREGATORS_OK
- ]
- if codes:
- issues.extend(codes)
+def export_svg(console, export_dir, trace_name):
+ if not args.export_svg:
+ return
+
+ os.makedirs(export_dir, exist_ok=True) # Ensure export directory exists
+ filepath = os.path.join(export_dir, f"{trace_name}.svg")
+
+ console.save_svg(
+ filepath,
+ title='Drishti',
+ theme=set_export_theme(),
+ clear=False
+ )
+
- detected_issues = dict.fromkeys(issues, False)
- detected_issues['JOB'] = jobid
+def export_csv(export_dir, trace_name, jobid=None):
+ if not args.export_csv:
+ return
+
+ issues = [
+ 'JOB',
+ INSIGHTS_STDIO_HIGH_USAGE,
+ INSIGHTS_POSIX_WRITE_COUNT_INTENSIVE,
+ INSIGHTS_POSIX_READ_COUNT_INTENSIVE,
+ INSIGHTS_POSIX_WRITE_SIZE_INTENSIVE,
+ INSIGHTS_POSIX_READ_SIZE_INTENSIVE,
+ INSIGHTS_POSIX_HIGH_SMALL_READ_REQUESTS_USAGE,
+ INSIGHTS_POSIX_HIGH_SMALL_WRITE_REQUESTS_USAGE,
+ INSIGHTS_POSIX_HIGH_MISALIGNED_MEMORY_USAGE,
+ INSIGHTS_POSIX_HIGH_MISALIGNED_FILE_USAGE,
+ INSIGHTS_POSIX_REDUNDANT_READ_USAGE,
+ INSIGHTS_POSIX_REDUNDANT_WRITE_USAGE,
+ INSIGHTS_POSIX_HIGH_RANDOM_READ_USAGE,
+ INSIGHTS_POSIX_HIGH_SEQUENTIAL_READ_USAGE,
+ INSIGHTS_POSIX_HIGH_RANDOM_WRITE_USAGE,
+ INSIGHTS_POSIX_HIGH_SEQUENTIAL_WRITE_USAGE,
+ INSIGHTS_POSIX_HIGH_SMALL_READ_REQUESTS_SHARED_FILE_USAGE,
+ INSIGHTS_POSIX_HIGH_SMALL_WRITE_REQUESTS_SHARED_FILE_USAGE,
+ INSIGHTS_POSIX_HIGH_METADATA_TIME,
+ INSIGHTS_POSIX_SIZE_IMBALANCE,
+ INSIGHTS_POSIX_TIME_IMBALANCE,
+ INSIGHTS_POSIX_INDIVIDUAL_WRITE_SIZE_IMBALANCE,
+ INSIGHTS_POSIX_INDIVIDUAL_READ_SIZE_IMBALANCE,
+ INSIGHTS_MPI_IO_NO_USAGE,
+ INSIGHTS_MPI_IO_NO_COLLECTIVE_READ_USAGE,
+ INSIGHTS_MPI_IO_NO_COLLECTIVE_WRITE_USAGE,
+ INSIGHTS_MPI_IO_COLLECTIVE_READ_USAGE,
+ INSIGHTS_MPI_IO_COLLECTIVE_WRITE_USAGE,
+ INSIGHTS_MPI_IO_BLOCKING_READ_USAGE,
+ INSIGHTS_MPI_IO_BLOCKING_WRITE_USAGE,
+ INSIGHTS_MPI_IO_AGGREGATORS_INTRA,
+ INSIGHTS_MPI_IO_AGGREGATORS_INTER,
+ INSIGHTS_MPI_IO_AGGREGATORS_OK
+ ]
+ if codes:
+ issues.extend(codes)
+
+ detected_issues = dict.fromkeys(issues, False)
+ detected_issues['JOB'] = jobid
+
+ for report in csv_report:
+ detected_issues[report] = True
- for report in csv_report:
- detected_issues[report] = True
+
+ os.makedirs(export_dir, exist_ok=True) # Ensure export directory exists
+ filepath = os.path.join(export_dir, f"{trace_name}.csv")
- with open(filename, 'w') as f:
- w = csv.writer(f)
- w.writerow(detected_issues.keys())
- w.writerow(detected_issues.values())
+ with open(filepath, 'w') as f:
+ w = csv.writer(f)
+ w.writerow(detected_issues.keys())
+ w.writerow(detected_issues.values())
diff --git a/drishti/includes/parser.py b/drishti/includes/parser.py
index 8659520..842874e 100644
--- a/drishti/includes/parser.py
+++ b/drishti/includes/parser.py
@@ -5,7 +5,8 @@
)
parser.add_argument(
- 'log_path',
+ 'log_paths',
+ nargs='+',
help='Input .darshan file or recorder folder'
)
@@ -96,6 +97,13 @@
help='Export a CSV with the code of all issues that were triggered'
)
+parser.add_argument(
+ '--export_dir',
+ default="",
+ dest='export_dir',
+ help='Directory where exported HTML/SVG/CSV reports are written (defaults to the current working directory)'
+)
+
parser.add_argument(
'--json',
default=False,
diff --git a/drishti/reporter.py b/drishti/reporter.py
index 8455040..c07be9a 100644
--- a/drishti/reporter.py
+++ b/drishti/reporter.py
@@ -3,8 +3,10 @@
import os
import sys
from subprocess import call
-from drishti.includes.parser import *
+from typing import List, Optional
+# from includes.parser import * # imports {'parser', 'args', 'argparse'} # TODO: is importing only 'args' (next line) sufficient?
+from drishti.includes.parser import args
'''
|- handler_darshan -|
@@ -17,7 +19,6 @@
|-----> /includes -> module -> config -> parser
'''
-
LOG_TYPE_DARSHAN = 0
LOG_TYPE_RECORDER = 1
@@ -29,27 +30,54 @@ def clear():
_ = call('clear' if os.name == 'posix' else 'cls')
-def check_log_type(path):
- if path.endswith('.darshan'):
- if not os.path.isfile(path):
- print('Unable to open .darshan file.')
+def check_log_type(paths: List[str]) -> Optional[int]:
+ is_darshan = True
+ is_recorder = True
+ multiple_logs = len(paths) > 1
+
+ for path in paths:
+ if path.endswith('.darshan'):
+ if not os.path.isfile(path):
+ print('Unable to open .darshan file.')
+ sys.exit(os.EX_NOINPUT)
+ else:
+ is_recorder = False
+ else: # check whether is a valid recorder log
+ if not os.path.isdir(path):
+ print('Unable to open recorder folder.')
+ sys.exit(os.EX_NOINPUT)
+ else:
+ is_darshan = False
+
+ if multiple_logs:
+ if is_darshan:
+ return LOG_TYPE_DARSHAN
+ else:
+ print('Only .darshan files are supported for multiple logs.') #TODO
sys.exit(os.EX_NOINPUT)
- else: return LOG_TYPE_DARSHAN
- else: # check whether is a valid recorder log
- if not os.path.isdir(path):
- print('Unable to open recorder folder.')
+ else:
+ if is_darshan and not is_recorder:
+ return LOG_TYPE_DARSHAN
+ elif is_recorder and not is_darshan:
+ return LOG_TYPE_RECORDER
+ else:
+ print('Unable to reliably determine the log type.')
sys.exit(os.EX_NOINPUT)
- else: return LOG_TYPE_RECORDER
def main():
- log_type = check_log_type(args.log_path)
-
+ log_type = check_log_type(args.log_paths)
+
if log_type == LOG_TYPE_DARSHAN:
from drishti.handlers.handle_darshan import handler
elif log_type == LOG_TYPE_RECORDER:
from drishti.handlers.handle_recorder import handler
-
+
handler()
+
+if __name__ == '__main__':
+ main()
diff --git a/requirements.txt b/requirements.txt
index 65461cb..8700c81 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
argparse
-darshan==3.4.4.0
+darshan>=3.4.3.0
pandas
rich==12.5.1
recorder-utils
diff --git a/setup.py b/setup.py
index 3e75113..a93a8ce 100644
--- a/setup.py
+++ b/setup.py
@@ -1,4 +1,4 @@
-from setuptools import setup, find_packages
+from setuptools import find_packages, setup
with open("README.md", "r") as f:
long_description = f.read()
@@ -19,7 +19,7 @@
install_requires=[
'argparse',
'pandas',
- 'darshan==3.4.4.0',
+ 'darshan>=3.4.4.0',
'rich==12.5.1',
'recorder-utils',
],