import dataclasses
import itertools
from typing import Callable, Optional


@dataclasses.dataclass
class DataExtractors:
    """Data extractors.

    A set of functions that, given a piece of input, return the privacy id,
    partition key, and value respectively.

    NOTE(review): this has the same three fields as
    `pipeline_dp.DataExtractors`; presumably one should alias the other --
    confirm which class callers are expected to construct.
    """
    # Each extractor is optional; None means "not provided".
    privacy_id_extractor: Optional[Callable] = None
    partition_extractor: Optional[Callable] = None
    value_extractor: Optional[Callable] = None


class Dimensions:
    """A helper class to specify dimensions for public partitions.

    Example:
        Dimensions({
            'country': ['US', 'UK', 'CA'],
            'date': ['2025-01-01', '2025-01-02']
        })

    Attributes:
        dimensions_dict: maps a dimension name to the iterable of values that
          dimension can take.
    """

    def __init__(self, dimensions_dict):
        self.dimensions_dict = dimensions_dict

    def get_cartesian_product(self):
        """Returns every combination of dimension values.

        Returns:
            A list of dicts, one per combination, each mapping every dimension
            name to one of its values. Combinations are ordered as
            `itertools.product` yields them (the last dimension varies
            fastest). With no dimensions the result is `[{}]` (the single
            empty combination); if any dimension has no values the result is
            `[]`.
        """
        names = list(self.dimensions_dict)
        # Iterating keys and values separately (instead of unpacking
        # `zip(*items)`) keeps this well-defined for an empty dict.
        value_lists = [self.dimensions_dict[name] for name in names]
        return [
            dict(zip(names, combination))
            for combination in itertools.product(*value_lists)
        ]


class DPEngine:
    """Performs DP aggregations."""

    def __init__(self, budget_accountant: 'budget_accounting.BudgetAccountant',
                 backend: 'PipelineBackend'):
        self._budget_accountant = budget_accountant
        self._backend = backend
        # One report generator is appended per top-level DP operation.
        self._report_generators = []

    @property
    def _current_report_generator(self):
        """The report generator of the most recently started operation."""
        return self._report_generators[-1]

    def _add_report_generator(self,
                              params,
                              method_name: str,
                              is_public_partition: Optional[bool] = None):
        """Starts a new report for the operation `method_name`."""
        # Local import keeps this block free of import-time dependencies.
        from pipeline_dp import report_generator
        self._report_generators.append(
            report_generator.ReportGenerator(params, method_name,
                                             is_public_partition))

    def _add_report_stage(self, text):
        """Appends one stage description to the current report.

        Requires that a report generator has been registered (see
        `_add_report_generator`); otherwise `IndexError` is raised.
        """
        self._current_report_generator.add_stage(text)

    def _add_report_stages(self, stages_description):
        """Appends every stage description in `stages_description`."""
        for stage_description in stages_description:
            self._add_report_stage(stage_description)

    def explain_computations_report(self):
        """Returns the reports of all operations started on this engine."""
        return [generator.report() for generator in self._report_generators]

    def aggregate(self,
                  col,
                  params: 'pipeline_dp.AggregateParams',
                  data_extractors: 'DataExtractors',
                  public_partitions=None,
                  out_explain_computation_report=None):
        """Computes DP aggregate metrics.

        Args:
            col: Collection where all elements are of the same type.
            params: Specifies which metrics to compute and computation
              parameters.
            data_extractors: Functions that extract needed pieces of
              information from elements of 'col'.
            public_partitions: A collection of partition keys or a Dimensions
              object specifying public partitions. Optional. If not provided,
              partitions will be selected in a DP manner. A Dimensions object
              is expanded into tuples of values, one tuple per combination.
            out_explain_computation_report: an output argument, if specified,
              it will contain the Explain Computation report for this
              aggregation.

        Returns:
            Collection of (partition_key, result_dictionary), where
            'result_dictionary' contains computed metrics per partition_key,
            annotated by the backend with `params` and the computed budget.
        """
        _check_aggregate_params(col, params, data_extractors)
        # NOTE(review): the previous revision also called
        # self._check_budget_accountant_compatibility(...) here; its
        # definition is not visible in this hunk -- confirm whether that
        # validation must be kept.

        # Expand a Dimensions spec into explicit partition keys.
        if isinstance(public_partitions, Dimensions):
            public_partitions = self._generate_cartesian_product(
                public_partitions)

        with self._budget_accountant.scope(weight=params.budget_weight):
            # A report generator must exist before any stage is reported.
            self._add_report_generator(params, "aggregate", public_partitions
                                       is not None)
            if out_explain_computation_report is not None:
                out_explain_computation_report._set_report_generator(
                    self._current_report_generator)

            # Public-partition filtering and empty-partition filling are left
            # to _aggregate, which receives public_partitions. Repeating them
            # here used an undefined `combiner` and raised NameError.
            col = self._aggregate(col, params, data_extractors,
                                  public_partitions)

            budget = self._budget_accountant._compute_budget_for_aggregation(
                params.budget_weight)
            return self._backend.annotate(col,
                                          "annotation",
                                          params=params,
                                          budget=budget)

    def _generate_cartesian_product(self, dimensions: Dimensions):
        """Generates the Cartesian product of dimensions specified by the user.

        Returns:
            A list of tuples; each tuple holds one value per dimension, in the
            insertion order of `dimensions.dimensions_dict`.
            NOTE(review): presumably `partition_extractor` must return tuples
            of the same shape for keys to match -- verify against callers.
        """
        return [
            tuple(partition.values())
            for partition in dimensions.get_cartesian_product()
        ]

    # The remaining DPEngine methods (starting with _aggregate) lie outside
    # the visible hunk and are unchanged.
def _check_aggregate_params(col, params: 'pipeline_dp.AggregateParams',
                            data_extractors: 'DataExtractors'):
    """Validates the arguments of an aggregation before any work is done.

    Checks run in order: `col`, then `params`, then `data_extractors`; the
    first failing check raises.

    Args:
        col: the input collection; must be neither None nor falsy.
        params: must be a `pipeline_dp.AggregateParams`.
        data_extractors: must be this module's `DataExtractors` or a
          `pipeline_dp.DataExtractors`.

    Raises:
        ValueError: if `col` is None or empty, or if `params` or
          `data_extractors` is None.
        TypeError: if `params` or `data_extractors` has the wrong type.
    """
    if col is None or not col:
        raise ValueError("col must be non-empty")

    if params is None:
        raise ValueError("params must be set to a valid AggregateParams")
    if not isinstance(params, pipeline_dp.AggregateParams):
        raise TypeError("params must be set to a valid AggregateParams")

    if data_extractors is None:
        raise ValueError("data_extractors must be set to a DataExtractors")
    # Accept both the DataExtractors declared in this module (the type
    # `aggregate` is annotated with) and the package-level one; checking only
    # the latter rejected instances of the former.
    accepted_extractor_types = (DataExtractors, pipeline_dp.DataExtractors)
    if not isinstance(data_extractors, accepted_extractor_types):
        raise TypeError("data_extractors must be set to a DataExtractors")