Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 93 additions & 61 deletions pipeline_dp/dp_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,86 +25,104 @@
from pipeline_dp import report_generator
from pipeline_dp import sampling_utils
from pipeline_dp.dataset_histograms import computing_histograms
from pipeline_dp.pipeline_backend import PipelineBackend
from pipeline_dp.private_contribution_bounds import PrivateL0Calculator

import itertools
import dataclasses
from typing import Any, Callable, Tuple
import numpy as np

import pipeline_dp
from pipeline_dp import combiners
import pipeline_dp.report_generator as report_generator

import pydp.algorithms.partition_selection as partition_selection # type: ignore



@dataclasses.dataclass
class DataExtractors:
    """Bundle of callables that pull the DP-relevant pieces out of an input.

    Each extractor takes one piece of input and returns a single component
    of it.

    Attributes:
        privacy_id_extractor: returns the privacy id of the input.
        partition_extractor: returns the partition key of the input.
        value_extractor: returns the value of the input.

    Every extractor defaults to None when not supplied.
    """
    # Field order is part of the positional constructor signature; keep it.
    privacy_id_extractor: Callable = None
    partition_extractor: Callable = None
    value_extractor: Callable = None



class Dimensions:
    """Specifies public partitions as named dimensions.

    Each dimension maps a name to the iterable of values it can take; the
    public partitions are every combination of one value per dimension.

    Example:
        Dimensions({
            'country': ['US', 'UK', 'CA'],
            'date': ['2025-01-01', '2025-01-02']
        })
    """

    def __init__(self, dimensions_dict):
        """Stores the dimension specification.

        Args:
            dimensions_dict: mapping from dimension name to an iterable of
              the values of that dimension.
        """
        self.dimensions_dict = dimensions_dict

    def get_cartesian_product(self):
        """Returns every combination of one value per dimension.

        Returns:
            A list of dicts, each mapping every dimension name to one of its
            values. Combinations are produced in itertools.product order
            (the last dimension varies fastest). The list is empty when any
            dimension has no values.

        Raises:
            ValueError: if no dimensions were specified.
        """
        # Fail with an explicit message instead of the opaque
        # "not enough values to unpack" that zip(*items) gives on an empty
        # mapping. The exception type is unchanged.
        if not self.dimensions_dict:
            raise ValueError(
                "dimensions_dict must contain at least one dimension")
        names = list(self.dimensions_dict)
        value_lists = self.dimensions_dict.values()
        return [
            dict(zip(names, combination))
            for combination in itertools.product(*value_lists)
        ]


class DPEngine:
"""Performs DP aggregations."""

def __init__(self, budget_accountant: 'BudgetAccountant',
def __init__(self, budget_accountant: 'budget_accounting.BudgetAccountant',
backend: 'PipelineBackend'):
self._budget_accountant = budget_accountant
self._backend = backend
self._report_generators = []

@property
def _current_report_generator(self):
return self._report_generators[-1]

def _add_report_generator(self,
                          params,
                          method_name: str,
                          is_public_partition: Optional[bool] = None):
    """Creates a report generator and appends it to self._report_generators.

    Args:
        params: forwarded to report_generator.ReportGenerator.
        method_name: forwarded to report_generator.ReportGenerator.
        is_public_partition: forwarded to report_generator.ReportGenerator;
          defaults to None.
    """
    new_generator = report_generator.ReportGenerator(params, method_name,
                                                     is_public_partition)
    self._report_generators.append(new_generator)

def _add_report_stage(self, stage_description):
self._current_report_generator.add_stage(stage_description)
def _add_report_stage(self, text):
self._report_generators[-1].add_stage(text)

def _add_report_stages(self, stages_description):
for stage_description in stages_description:
self._add_report_stage(stage_description)

def explain_computations_report(self):
    """Returns the reports of all report generators added so far.

    Returns:
        A list with the result of report() for each generator, in the order
        the generators were added.
    """
    # The loop variable deliberately does not reuse the name of the
    # report_generator module.
    return [generator.report() for generator in self._report_generators]


def aggregate(self,
col,
params: pipeline_dp.AggregateParams,
data_extractors: pipeline_dp.DataExtractors,
public_partitions=None,
out_explain_computation_report: Optional[
pipeline_dp.ExplainComputationReport] = None):
"""Computes DP aggregate metrics.
def aggregate(self, col, params: pipeline_dp.AggregateParams,
data_extractors: DataExtractors, public_partitions=None):
"""
Computes DP aggregate metrics.

Args:
col: collection where all elements are of the same type.
params: specifies which metrics to compute and computation parameters.
data_extractors: functions that extract needed pieces of information
from elements of 'col'.
public_partitions: A collection of partition keys that will be present
in the result. If not provided, partitions will be selected in a DP
manner.
out_explain_computation_report: an output argument, if specified,
it will contain the Explain Computation report for this aggregation.
For more details see the docstring to report_generator.py.

Returns:
Collection of (partition_key, result_dictionary), where
'result_dictionary' contains computed metrics per partition_key.
Keys of 'result_dictionary' correspond to computed metrics, e.g.
'count' for COUNT metrics etc.
col: Collection where all elements are of the same type.
params: Specifies which metrics to compute and computation parameters.
data_extractors: Functions that extract needed pieces of information
from elements of 'col'.
public_partitions: A collection of partition keys or a Dimensions object
specifying public partitions. Optional. If not provided, partitions
will be selected in a DP manner.
"""
self._check_aggregate_params(col, params, data_extractors)
self._check_budget_accountant_compatibility(
public_partitions is not None, params.metrics,
params.custom_combiners is not None)
_check_aggregate_params(col, params, data_extractors)

with self._budget_accountant.scope(weight=params.budget_weight):
self._add_report_generator(params, "aggregate", public_partitions
is not None)
if out_explain_computation_report is not None:
out_explain_computation_report._set_report_generator(
self._current_report_generator)
col = self._aggregate(col, params, data_extractors,
public_partitions)
budget = self._budget_accountant._compute_budget_for_aggregation(
params.budget_weight)
return self._annotate(col, params=params, budget=budget)
# Step 1: Handle public_partitions as a Cartesian product of dimensions
if isinstance(public_partitions, Dimensions):
public_partitions = self._generate_cartesian_product(public_partitions)

# Step 2: Filter out non-public partitions if public_partitions is specified
if public_partitions is not None:
col = self._drop_not_public_partitions(col, public_partitions, data_extractors)
col = self._add_empty_public_partitions(col, public_partitions, combiner.create_accumulator)

# Step 3: Perform aggregation
col = self._aggregate(col, params, data_extractors, public_partitions)

# Step 4: Compute and annotate budget
budget = self._budget_accountant._compute_budget_for_aggregation(params.budget_weight)
return self._backend.annotate(col, "annotation", params=params, budget=budget)

def _generate_cartesian_product(self, dimensions: Dimensions):
    """Turns a Dimensions specification into a list of partition keys.

    Args:
        dimensions: object whose get_cartesian_product() returns the
          combinations as dicts.

    Returns:
        A list of tuples, one per combination, each holding that
        combination's values in the order the dict yields them.
    """
    combinations = dimensions.get_cartesian_product()
    return [tuple(combination.values()) for combination in combinations]

def _aggregate(self, col, params: pipeline_dp.AggregateParams,
data_extractors: pipeline_dp.DataExtractors,
Expand Down Expand Up @@ -628,3 +646,17 @@ def _check_data_extractors(data_extractors: pipeline_dp.DataExtractors):
raise ValueError("data_extractors must be set to a DataExtractors")
if not isinstance(data_extractors, pipeline_dp.DataExtractors):
raise TypeError("data_extractors must be set to a DataExtractors")


def _check_aggregate_params(col, params: pipeline_dp.AggregateParams,
                            data_extractors: DataExtractors):
    """Validates the arguments of an aggregation.

    Checks are performed in order: col, params, data_extractors.

    Args:
        col: the input collection; must be truthy.
        params: must be an instance of pipeline_dp.AggregateParams.
        data_extractors: must be an instance of pipeline_dp.DataExtractors.

    Raises:
        ValueError: if col is None or falsy, or if params or data_extractors
          is None.
        TypeError: if params or data_extractors has the wrong type.
    """
    # None is falsy, so a single truthiness test covers both cases.
    if not col:
        raise ValueError("col must be non-empty")

    params_message = "params must be set to a valid AggregateParams"
    if params is None:
        raise ValueError(params_message)
    if not isinstance(params, pipeline_dp.AggregateParams):
        raise TypeError(params_message)

    extractors_message = "data_extractors must be set to a DataExtractors"
    if data_extractors is None:
        raise ValueError(extractors_message)
    if not isinstance(data_extractors, pipeline_dp.DataExtractors):
        raise TypeError(extractors_message)