diff --git a/.gitignore b/.gitignore index bc0f006..0ee1f85 100644 --- a/.gitignore +++ b/.gitignore @@ -162,3 +162,4 @@ cython_debug/ db-dump/ postgres-data/ +.vscode/ diff --git a/README.md b/README.md index ca15557..bf837bc 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@
Open-source Data integration platform diff --git a/docs/expectations.md b/docs/expectations.md new file mode 100644 index 0000000..7a49159 --- /dev/null +++ b/docs/expectations.md @@ -0,0 +1,242 @@ +# Expectations Module + +## Overview + +The `Expectations` class provides a structured way to **validate datasets** against defined data quality rules using [Great Expectations](https://greatexpectations.io/). + +It supports both **DataFrame-level** and **Column-level** checks, with validation rules defined in an external `expectations.yml` file. + +The class supports datasets in both **pandas** and **polars**, automatically normalizing to pandas for validation. + +### Features + +- **DataFrame-level checks** + - Validate row/column count + - Validate emptiness/non-emptiness +- **Column-level checks** + - Column existence + - Data type enforcement + - Numeric range validation + - Nullability checks + - Allowed categorical values + - String length validation (fixed length or range) + +--- + +## Installation Requirements + +Ensure the following dependencies are installed: + +```bash +pip install pandas polars pyyaml great-expectations +``` + +--- + +## Class: `Expectations` + +### Initialization + +```python +Expectations( + dataset: pd.DataFrame | pl.DataFrame, + expectations_yml_file: str | None = None +) +``` + +#### Parameters + +* **dataset** (`pd.DataFrame | pl.DataFrame`) + The dataset to validate. Both pandas and polars are supported. + If a `polars.DataFrame` is provided, it will be automatically converted to pandas for validation. + +* **expectations\_yml\_file** (`str | None`, optional) + Path to the expectations YAML file. + If not provided, defaults to `expectations.yml` located in the caller’s directory. 
+ +#### Raises + +* `ValueError` + + * If `dataset` is not a pandas or polars DataFrame + * If `expectations_yml_file` is not a string +* `FileNotFoundError` + If `expectations.yml` file is missing +* `yaml.YAMLError` or `ValueError` + If the YAML file cannot be parsed or is missing required sections + +--- + +## YAML Expectations Schema + +The `expectations.yml` file must define **two sections**: + +```yaml +dataframe: + size: not empty # or empty + no_columns: 5 + no_rows: 3 + +columns: + age: + type: int64 + minimum: 18 + maximum: 70 + not-null: true + + height: + type: int64 + minimum: 5 + maximum: 8 + not-null: false + + gender: + type: object + classes: + - male + - female + - other + not-null: false + + phone: + type: object + not-null: false + length-between: + - 10 + - 13 + + shirt_size: + type: object + classes: + - s + - m + - l + not-null: true + length-between: + - 1 # exact length of 1 +``` + +--- + +## Supported Expectations Mapping + +The YAML configuration is translated into the following **Great Expectations classes**: + +| YAML Key | Great Expectations Class | +| ---------------------------- | ----------------------------------------------------------------------------- | +| `type` | `ExpectColumnValuesToBeOfType` | +| `minimum` / `maximum` | `ExpectColumnValuesToBeBetween` (only for numeric types) | +| `not-null: true` | `ExpectColumnValuesToNotBeNull` | +| `classes` | `ExpectColumnDistinctValuesToBeInSet` | +| `length-between: [N]` | `ExpectColumnValueLengthsToEqual` (exact length `N`) | +| `length-between: [min, max]` | `ExpectColumnValueLengthsToBeBetween` (string length between `min` and `max`) | + +At the **DataFrame-level** (outside columns), the following checks are enforced internally: + +* `size: not empty` → raises `ValueError` if DataFrame is empty +* `size: empty` → raises `ValueError` if DataFrame is not empty +* `no_columns` → raises `ValueError` if column count mismatches +* `no_rows` → raises `ValueError` if row count mismatches 
+ +--- + +## Methods + +### `_read_definitions() -> dict` + +Loads and validates expectations from the YAML file. + +#### Returns + +* Dictionary containing expectations. + +#### Raises + +* `FileNotFoundError`: If YAML file not found +* `ValueError`: If YAML is invalid or missing required sections (`dataframe`, `columns`) + +--- + +### `validate_expectations()` + +Validates the dataset against defined expectations. + +#### Performs + +* **DataFrame-level checks** + + * Enforces `size` (empty / not empty) + * Enforces `no_rows` and `no_columns` + +* **Column-level checks** + + * Ensures required columns exist + * Validates data types + * Validates numeric ranges + * Enforces `not-null` + * Restricts categorical values + * Validates string length constraints + +#### Raises + +* `ValueError`: If dataset does not meet defined expectations +* `Exception`: If Great Expectations validation checkpoint fails + +--- + +## Example Usage + +```python +import polars as pl +from openhexa.toolbox.expectation.expectations import Expectations + +# Example dataset +df = pl.DataFrame( + { + "age": [19, 20, 30], + "height": [7, 5, 6], + "gender": ["male", "female", None], + "phone": ["0711222333", "0722111333", "+256744123432"], + "shirt_size": ["s", "m", "l"], + } +) + +# Initialize with default expectations.yml in caller's directory +validator = Expectations(df) + +# Run validation +validator.validate_expectations() +``` + +--- + +## Output + +On execution, a Great Expectations **checkpoint run report** is generated. +If validation fails, an exception is raised with a detailed message. + +Example success log: + +```text +INFO:root:Data passed validation check. +``` + +Example failure: + +```text +Exception: Data failed validation check! +{ + "success": false, + "results": [...] +} +``` + +--- + +## Best Practices + +* Store `expectations.yml` alongside your pipeline scripts for maintainability. +* Version control `expectations.yml` to track schema changes over time. 
class Expectations:
    """Validate a dataset against data-quality rules declared in a YAML file.

    Two kinds of rules are supported:

    - DataFrame-level checks (row count, column count, emptiness), enforced directly on the
      pandas DataFrame.
    - Column-level checks (type, numeric ranges, allowed values, nullability, string length),
      translated into a Great Expectations suite and run through a checkpoint.

    Attributes:
        dataset: The data to validate, always held as a pandas DataFrame.
        expectations_yml_file: Path of the YAML file holding the expectations.
        numeric_types: Declared ``type`` values for which ``minimum``/``maximum`` are applied.
        string_types: Declared ``type`` values for which ``classes``/``length-between`` are applied.
    """

    # The annotations below are quoted so they are never evaluated when the class is defined.
    def __init__(self, dataset: "pd.DataFrame | pl.DataFrame", expectations_yml_file: "str | None" = None):
        """Initialize the Expectations validator.

        Args:
            dataset: The dataset to validate. A polars DataFrame is converted to pandas.
            expectations_yml_file: Path to the expectations YAML file. If not provided,
                defaults to `expectations.yml` in the directory of the calling file.

        Raises:
            ValueError: If `expectations_yml_file` is neither a string nor None, or if
                `dataset` is neither a pandas nor a polars DataFrame.
        """
        if expectations_yml_file is not None and not isinstance(expectations_yml_file, str):
            raise ValueError("expectations_yml_file should be a string")

        if isinstance(dataset, pd.DataFrame):
            self.dataset = dataset
        elif isinstance(dataset, pl.DataFrame):
            logger.info("Converting polars dataframe to pandas for data validation")
            self.dataset = dataset.to_pandas()
        else:
            raise ValueError("dataset should be a pandas or polars dataframe")

        if expectations_yml_file is None:
            # Frame 1 is the code that instantiated the class; context=0 skips loading source lines.
            caller_file = inspect.stack(0)[1].filename
            caller_dir = pathlib.Path(caller_file).resolve().parent
            self.expectations_yml_file = str(caller_dir / "expectations.yml")
        else:
            self.expectations_yml_file = expectations_yml_file

        # Declared type names (pandas dtype spellings) that enable range / string expectations.
        self.numeric_types = {
            "int8",
            "int16",
            "int32",
            "int64",
            "uint8",
            "uint16",
            "uint32",
            "uint64",
            "float16",
            "float32",
            "float64",
        }
        self.string_types = {"object", "string"}

    def _read_definitions(self) -> dict:
        """Load expectations definitions from the YAML file.

        Sections that are declared without content (for example a bare `dataframe:` line)
        are normalized to empty mappings.

        Returns:
            dict: Parsed expectations with `dataframe` and `columns` mappings.

        Raises:
            FileNotFoundError: If the expectations file does not exist.
            ValueError: If parsing fails, a required section is missing, or the file or one
                of its sections is not a mapping.
        """
        try:
            with pathlib.Path(self.expectations_yml_file).open(encoding="utf-8") as file:
                expectations = yaml.safe_load(file) or {}
        except FileNotFoundError:
            raise FileNotFoundError(
                f"Error: expectations file '{self.expectations_yml_file}' not found."
            ) from None
        except yaml.YAMLError as e:
            raise ValueError(f"Error parsing 'expectations.yml' file: {e}") from e

        # basic schema guard
        if not isinstance(expectations, dict):
            raise ValueError("expectations.yml must be a mapping with 'dataframe' and 'columns' sections.")

        for section in ("dataframe", "columns"):
            if section not in expectations:
                raise ValueError(f"expectations.yml must contain '{section}' section.")
            if expectations[section] is None:
                # A key with no content parses to None; treat it as "no rules declared".
                expectations[section] = {}
            elif not isinstance(expectations[section], dict):
                raise ValueError(f"'{section}' section of expectations.yml must be a mapping.")

        return expectations

    def _check_dataframe_expectations(self, dataframe_expectation: dict) -> None:
        """Enforce the `size`, `no_columns` and `no_rows` rules on the dataset.

        Raises:
            ValueError: If the dataset does not satisfy one of the declared rules.
        """
        size_expect = dataframe_expectation.get("size")
        if size_expect == "not empty" and self.dataset.empty:
            raise ValueError("DataFrame is empty but expectations require non-empty.")
        if size_expect == "empty" and not self.dataset.empty:
            raise ValueError("DataFrame is not empty but expectations require empty.")

        n_rows, n_columns = self.dataset.shape

        expected_no_columns = dataframe_expectation.get("no_columns")
        if expected_no_columns is not None and n_columns != int(expected_no_columns):
            raise ValueError(f"Columns mismatch: expected {expected_no_columns}, got {n_columns}")

        expected_no_rows = dataframe_expectation.get("no_rows")
        if expected_no_rows is not None and n_rows != int(expected_no_rows):
            raise ValueError(f"Rows mismatch: expected {expected_no_rows}, got {n_rows}")

    def _check_columns_present(self, columns_expectation: dict) -> None:
        """Raise ValueError if a column named in the expectations is absent from the dataset."""
        for column in columns_expectation:
            if column not in self.dataset.columns:
                raise ValueError(f"Column '{column}' defined in expectations.yml but missing in dataset.")

    def _length_expectation(self, column: str, length_between):
        """Translate a `length-between` entry into the matching string-length expectation.

        One entry means an exact length, two entries mean an inclusive range.

        Raises:
            ValueError: If the entry is not a list/tuple holding 1 or 2 values.
        """
        if not isinstance(length_between, (list, tuple)):
            raise ValueError(f"'length-between' for column {column} must be a list or tuple.")
        if len(length_between) == 1:
            return gx.expectations.ExpectColumnValueLengthsToEqual(column=column, value=length_between[0])
        if len(length_between) == 2:
            return gx.expectations.ExpectColumnValueLengthsToBeBetween(
                column=column,
                min_value=min(length_between),
                max_value=max(length_between),
            )
        raise ValueError(f"Column {column}: 'length-between' should have 1 or 2 entries.")

    def _build_column_expectations(self, column: str, column_expectation) -> list:
        """Translate the YAML rules of one column into Great Expectations expectation objects.

        Rules that do not apply to the declared type are skipped with a warning.

        Raises:
            ValueError: If the column rules are not a mapping or `length-between` is malformed.
        """
        if column_expectation is None:
            # `column:` with no rules: only its presence is required.
            column_expectation = {}
        elif not isinstance(column_expectation, dict):
            raise ValueError(f"Expectations for column '{column}' must be a mapping.")

        col_type = column_expectation.get("type")
        built = []

        # datatype
        if col_type:
            built.append(gx.expectations.ExpectColumnValuesToBeOfType(column=column, type_=col_type))

        # numeric ranges
        min_val = column_expectation.get("minimum")
        max_val = column_expectation.get("maximum")
        if min_val is not None or max_val is not None:
            if col_type in self.numeric_types:
                built.append(
                    gx.expectations.ExpectColumnValuesToBeBetween(
                        column=column,
                        min_value=min_val,
                        max_value=max_val,
                    )
                )
            else:
                logger.warning(
                    "Column '%s': 'minimum'/'maximum' ignored because type %r is not a numeric type.",
                    column,
                    col_type,
                )

        # not-null
        if column_expectation.get("not-null", False):
            built.append(gx.expectations.ExpectColumnValuesToNotBeNull(column=column))

        # string expectations
        classes = column_expectation.get("classes")
        length_between = column_expectation.get("length-between")
        if col_type in self.string_types:
            if classes:
                built.append(gx.expectations.ExpectColumnDistinctValuesToBeInSet(column=column, value_set=classes))
            if length_between:
                built.append(self._length_expectation(column, length_between))
        elif classes or length_between:
            logger.warning(
                "Column '%s': 'classes'/'length-between' ignored because type %r is not a string type.",
                column,
                col_type,
            )

        return built

    def validate_expectations(self) -> None:
        """Validate the dataset against expectations defined in the YAML file.

        - DataFrame-level checks:
            * Size (empty or not empty)
            * Number of rows
            * Number of columns

        - Column-level checks:
            * Existence of expected columns
            * Data type enforcement
            * Numeric ranges (minimum, maximum)
            * Nullability
            * Allowed categorical values (classes)
            * String length constraints

        Raises:
            FileNotFoundError: If the expectations file does not exist.
            ValueError: If the expectations file is invalid or the dataset does not match the
                DataFrame-level rules or lacks an expected column.
            Exception: If the Great Expectations checkpoint reports a failed validation.
        """
        expectations = self._read_definitions()

        # Cheap structural checks run first so nothing is set up in Great Expectations when they fail.
        self._check_dataframe_expectations(expectations["dataframe"])
        self._check_columns_present(expectations["columns"])

        context = gx.get_context()
        data_source = context.data_sources.add_pandas(name="pandas")
        data_asset = data_source.add_dataframe_asset(name="pd_dataframe_asset")
        batch_definition = data_asset.add_batch_definition_whole_dataframe("batch-def")
        batch_parameters = {"dataframe": self.dataset}

        suite = context.suites.add(gx.core.expectation_suite.ExpectationSuite(name="expectations_suite"))
        for column, column_expectation in expectations["columns"].items():
            for expectation in self._build_column_expectations(column, column_expectation):
                suite.add_expectation(expectation)

        validation_definition = context.validation_definitions.add(
            gx.core.validation_definition.ValidationDefinition(
                name="validation definition", data=batch_definition, suite=suite
            )
        )
        checkpoint = context.checkpoints.add(
            gx.checkpoint.checkpoint.Checkpoint(name="context", validation_definitions=[validation_definition])
        )
        checkpoint_result = checkpoint.run(batch_parameters=batch_parameters)
        result = checkpoint_result.describe()

        if not json.loads(result)["success"]:
            # Plain Exception is kept: it is the documented failure type of this method.
            raise Exception(f"Data failed validation check!\n{result}")

        logger.info("Data passed validation check.")
# -----------------
# Fixtures
# -----------------


def _write_yaml(tmp_path: Path, content: dict, filename: str = "bad.yml") -> str:
    """Write `content` as YAML under `tmp_path` and return the file path as a string.

    The file is written as UTF-8, the encoding `Expectations._read_definitions` reads with.
    """
    file_path = tmp_path / filename
    with file_path.open("w", encoding="utf-8") as f:
        yaml.safe_dump(content, f)
    return str(file_path)


@pytest.fixture
def sample_df() -> pd.DataFrame:
    """A simple sample DataFrame for unit tests."""
    return pd.DataFrame(
        {
            "age": [25, 30, 40],
            "height": [5.5, 6.1, 5.9],
            "gender": ["male", "female", "male"],
        }
    )


@pytest.fixture
def tmp_expectations_file(tmp_path: Path) -> str:
    """A temporary expectations.yml file with a minimal valid schema."""
    expectations = {
        "dataframe": {"size": "not empty", "no_columns": 3, "no_rows": 3},
        "columns": {
            "age": {"type": "int64", "minimum": 20, "maximum": 60, "not-null": True},
            "height": {"type": "float64", "minimum": 5, "maximum": 7, "not-null": False},
            "gender": {
                "type": "object",
                "classes": ["male", "female"],
                "not-null": True,
            },
        },
    }
    return _write_yaml(tmp_path, expectations, filename="expectations.yml")


# -----------------
# Tests
# -----------------


def test_init_with_invalid_dataset_type():
    """Fails if dataset is neither a pandas nor a polars DataFrame."""
    with pytest.raises(ValueError, match="dataset should be a pandas or polars dataframe"):
        Expectations(dataset="not-a-df", expectations_yml_file=None)


def test_init_with_invalid_file_type(sample_df: pd.DataFrame):
    """Fails if expectations_yml_file is not a string."""
    with pytest.raises(ValueError, match="expectations_yml_file should be a string"):
        Expectations(dataset=sample_df, expectations_yml_file=123)


def test_missing_yaml_file(sample_df: pd.DataFrame):
    """Fails if expectations.yml file does not exist."""
    validator = Expectations(sample_df, "non_existent.yml")
    with pytest.raises(FileNotFoundError):
        validator._read_definitions()


def test_missing_dataframe_section(sample_df: pd.DataFrame, tmp_path: Path):
    """Fails if 'dataframe' section is missing from expectations.yml."""
    bad_file = _write_yaml(tmp_path, {"columns": {}})
    validator = Expectations(sample_df, bad_file)
    with pytest.raises(ValueError, match="must contain 'dataframe'"):
        validator._read_definitions()


def test_missing_column_section(sample_df: pd.DataFrame, tmp_path: Path):
    """Fails if 'columns' section is missing from expectations.yml."""
    bad_file = _write_yaml(tmp_path, {"dataframe": {}})
    validator = Expectations(sample_df, bad_file)
    with pytest.raises(ValueError, match="must contain 'columns'"):
        validator._read_definitions()


def test_read_definitions_with_valid_file(sample_df: pd.DataFrame, tmp_expectations_file: str):
    """Returns both sections when expectations.yml is well formed."""
    validator = Expectations(sample_df, tmp_expectations_file)
    definitions = validator._read_definitions()
    assert definitions["dataframe"] == {"size": "not empty", "no_columns": 3, "no_rows": 3}
    assert set(definitions["columns"]) == {"age", "height", "gender"}
outputs=["dataset2.csv"], ) - assert len(mock_responses.calls) == 3 - + event1 = json.loads(mock_responses.calls[0].request.body) event2 = json.loads(mock_responses.calls[1].request.body) event3 = json.loads(mock_responses.calls[2].request.body) - assert event1["run"]["runId"] == "381a6a74-ad3b-549c-967b-58585197f90a" assert event2["run"]["runId"] == "d8bb693b-23bb-54f0-8605-29032f10d5d5" - assert event3["run"]["runId"] == "381a6a74-ad3b-549c-967b-58585197f90a" # Same task, same run ID - + assert event3["run"]["runId"] == "381a6a74-ad3b-549c-967b-58585197f90a" # Same task, same run ID + assert event1["outputs"][0]["name"] == "dataset1.csv" assert event2["outputs"][0]["name"] == "dataset2.csv" assert event3["outputs"][0]["name"] == "dataset2.csv"