diff --git a/dask_sql/physical/rel/custom/create_experiment.py b/dask_sql/physical/rel/custom/create_experiment.py
index 642456937..3d510ac18 100644
--- a/dask_sql/physical/rel/custom/create_experiment.py
+++ b/dask_sql/physical/rel/custom/create_experiment.py
@@ -168,12 +168,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
                 f"Can not import tuner {experiment_class}. Make sure you spelled it correctly and have installed all packages."
             )

-        try:
-            from dask_ml.wrappers import ParallelPostFit
-        except ImportError:  # pragma: no cover
-            raise ValueError(
-                "dask_ml must be installed to use automl and tune hyperparameters"
-            )
+        from dask_sql.physical.rel.custom.wrappers import ParallelPostFit

         model = ModelClass()

@@ -199,12 +194,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
                 f"Can not import automl model {automl_class}. Make sure you spelled it correctly and have installed all packages."
             )

-        try:
-            from dask_ml.wrappers import ParallelPostFit
-        except ImportError:  # pragma: no cover
-            raise ValueError(
-                "dask_ml must be installed to use automl and tune hyperparameters"
-            )
+        from dask_sql.physical.rel.custom.wrappers import ParallelPostFit

         automl = AutoMLClass(**automl_kwargs)
         # should be avoided if data doesn't fit in memory
diff --git a/dask_sql/physical/rel/custom/create_model.py b/dask_sql/physical/rel/custom/create_model.py
index 2e6cdeb0a..179dd7971 100644
--- a/dask_sql/physical/rel/custom/create_model.py
+++ b/dask_sql/physical/rel/custom/create_model.py
@@ -1,6 +1,7 @@
 import logging
 from typing import TYPE_CHECKING

+import numpy as np
 from dask import delayed

 from dask_sql.datacontainer import DataContainer
@@ -43,7 +44,7 @@ class CreateModelPlugin(BaseRelPlugin):
           unsupervised algorithms). This means, you typically
           want to set this parameter.
         * wrap_predict: Boolean flag, whether to wrap the selected
-          model with a :class:`dask_ml.wrappers.ParallelPostFit`.
+          model with a :class:`dask_sql.physical.rel.custom.wrappers.ParallelPostFit`.
           Have a look into the
           [dask-ml docu](https://ml.dask.org/meta-estimators.html#parallel-prediction-and-transformation)
           to learn more about it. Defaults to false. Typically you set
@@ -165,10 +166,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
             model = Incremental(estimator=model)

         if wrap_predict:
-            try:
-                from dask_ml.wrappers import ParallelPostFit
-            except ImportError:  # pragma: no cover
-                raise ValueError("Wrapping requires dask-ml to be installed.")
+            from dask_sql.physical.rel.custom.wrappers import ParallelPostFit

             # When `wrap_predict` is set to True we train on single partition frames
             # because this is only useful for non dask distributed models
@@ -183,7 +181,16 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
             delayed_model = [delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
             model = delayed_model[0].compute()
-            model = ParallelPostFit(estimator=model)
+            if "sklearn" in model_class:
+                output_meta = np.array([])
+                model = ParallelPostFit(
+                    estimator=model,
+                    predict_meta=output_meta,
+                    predict_proba_meta=output_meta,
+                    transform_meta=output_meta,
+                )
+            else:
+                model = ParallelPostFit(estimator=model)

         else:
             model.fit(X, y, **fit_kwargs)

diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py
new file mode 100644
index 000000000..7ed0d0dea
--- /dev/null
+++ b/dask_sql/physical/rel/custom/wrappers.py
@@ -0,0 +1,497 @@
+# Copyright 2017, Dask developers
+# Dask-ML project - https://github.com/dask/dask-ml
+"""Meta-estimators for parallelizing estimators using the scikit-learn API."""
+import logging
+import warnings
+
+import dask.array as da
+import dask.dataframe as dd
+import dask.delayed
+import numpy as np
+
+try:
+    import sklearn.base
+    import sklearn.metrics
+except ImportError:  # pragma: no cover
+    raise ImportError("sklearn must be installed")
+
+logger = logging.getLogger(__name__)
+
+
+class ParallelPostFit(sklearn.base.BaseEstimator, sklearn.base.MetaEstimatorMixin):
+    """Meta-estimator for parallel predict and transform.
+
+    Parameters
+    ----------
+    estimator : Estimator
+        The underlying estimator that is fit.
+
+    scoring : string or callable, optional
+        A single string (see :ref:`scoring_parameter`) or a callable
+        (see :ref:`scoring`) to evaluate the predictions on the test set.
+
+        For evaluating multiple metrics, either give a list of (unique)
+        strings or a dict with names as keys and callables as values.
+
+        NOTE that when using custom scorers, each scorer should return a
+        single value. Metric functions returning a list/array of values
+        can be wrapped into multiple scorers that return one value each.
+
+        See :ref:`multimetric_grid_search` for an example.
+
+        .. warning::
+
+           If None, the estimator's default scorer (if available) is used.
+           Most scikit-learn estimators will convert large Dask arrays to
+           a single NumPy array, which may exhaust the memory of your worker.
+           You probably want to always specify `scoring`.
+
+    predict_meta : pd.Series, pd.DataFrame, np.array, default: None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches
+        the output type of the estimator's ``predict`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
+    predict_proba_meta : pd.Series, pd.DataFrame, np.array, default: None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches
+        the output type of the estimator's ``predict_proba`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
+    transform_meta : pd.Series, pd.DataFrame, np.array, default: None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches
+        the output type of the estimator's ``transform`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
+    """
+
+    class_name = "ParallelPostFit"
+
+    def __init__(
+        self,
+        estimator=None,
+        scoring=None,
+        predict_meta=None,
+        predict_proba_meta=None,
+        transform_meta=None,
+    ):
+        self.estimator = estimator
+        self.scoring = scoring
+        self.predict_meta = predict_meta
+        self.predict_proba_meta = predict_proba_meta
+        self.transform_meta = transform_meta
+
+    def _check_array(self, X):
+        """Validate an array for post-fit tasks.
+
+        Parameters
+        ----------
+        X : Union[Array, DataFrame]
+
+        Returns
+        -------
+        same type as 'X'
+
+        Notes
+        -----
+        The following checks are applied.
+
+        - Ensure that the array is blocked only along the samples.
+        """
+        if isinstance(X, da.Array):
+            if X.ndim == 2 and X.numblocks[1] > 1:
+                logger.debug("auto-rechunking 'X'")
+                if not np.isnan(X.chunks[0]).any():
+                    X = X.rechunk({0: "auto", 1: -1})
+                else:
+                    X = X.rechunk({1: -1})
+        return X
+
+    @property
+    def _postfit_estimator(self):
+        # The estimator instance to use for post-fit tasks like score
+        return self.estimator
+
+    def fit(self, X, y=None, **kwargs):
+        """Fit the underlying estimator.
+
+        Parameters
+        ----------
+        X, y : array-like
+        **kwargs
+            Additional fit-kwargs for the underlying estimator.
+
+        Returns
+        -------
+        self : object
+        """
+        logger.info("Starting fit")
+        result = self.estimator.fit(X, y, **kwargs)
+
+        # Copy over learned attributes
+        copy_learned_attributes(result, self)
+        copy_learned_attributes(result, self.estimator)
+        return self
+
+    def partial_fit(self, X, y=None, **kwargs):
+        logger.info("Starting partial_fit")
+        result = self.estimator.partial_fit(X, y, **kwargs)
+
+        # Copy over learned attributes
+        copy_learned_attributes(result, self)
+        copy_learned_attributes(result, self.estimator)
+        return self
+
+    def transform(self, X):
+        """Transform block or partition-wise for dask inputs.
+
+        For dask inputs, a dask array or dataframe is returned. For other
+        inputs (NumPy array, pandas dataframe, scipy sparse matrix), the
+        regular return value is returned.
+
+        If the underlying estimator does not have a ``transform`` method, then
+        an ``AttributeError`` is raised.
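+
+        When ``transform_meta`` is not provided, the output type is inferred
+        by running ``transform`` on a small sample of the input, which can
+        guess wrong for some estimators; passing an explicit meta skips this
+        inference (see ``_get_output_dask_ar_meta_for_estimator`` below).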
+
+        Parameters
+        ----------
+        X : array-like
+
+        Returns
+        -------
+        transformed : array-like
+        """
+        self._check_method("transform")
+        X = self._check_array(X)
+        output_meta = self.transform_meta
+
+        if isinstance(X, da.Array):
+            if output_meta is None:
+                output_meta = _get_output_dask_ar_meta_for_estimator(
+                    _transform, self._postfit_estimator, X
+                )
+            return X.map_blocks(
+                _transform, estimator=self._postfit_estimator, meta=output_meta
+            )
+        elif isinstance(X, dd._Frame):
+            if output_meta is None:
+                output_meta = _transform(X._meta_nonempty, self._postfit_estimator)
+            try:
+                return X.map_partitions(
+                    _transform,
+                    self._postfit_estimator,
+                    output_meta,
+                    meta=output_meta,
+                )
+            except ValueError:
+                if output_meta is None:
+                    # dask-dataframe relies on dd.core.no_default
+                    # for inferring meta
+                    output_meta = dd.core.no_default
+                return X.map_partitions(
+                    _transform, estimator=self._postfit_estimator, meta=output_meta
+                )
+        else:
+            return _transform(X, estimator=self._postfit_estimator)
+
+    def score(self, X, y, compute=True):
+        """Returns the score on the given data.
+
+        Parameters
+        ----------
+        X : array-like, shape = [n_samples, n_features]
+            Input data, where n_samples is the number of samples and
+            n_features is the number of features.
+
+        y : array-like, shape = [n_samples] or [n_samples, n_output], optional
+            Target relative to X for classification or regression;
+            None for unsupervised learning.
+
+        Returns
+        -------
+        score : float
+            The score of the underlying estimator on the given data.
+        """
+        scoring = self.scoring
+        X = self._check_array(X)
+        y = self._check_array(y)
+
+        if not scoring:
+            if type(self._postfit_estimator).score == sklearn.base.RegressorMixin.score:
+                scoring = "r2"
+            elif (
+                type(self._postfit_estimator).score
+                == sklearn.base.ClassifierMixin.score
+            ):
+                scoring = "accuracy"
+        else:
+            scoring = self.scoring
+
+        if scoring:
+            if not dask.is_dask_collection(X) and not dask.is_dask_collection(y):
+                scorer = sklearn.metrics.get_scorer(scoring)
+            else:
+                # TODO: implement Dask-ML's get_scorer() function
+                # scorer = get_scorer(scoring, compute=compute)
+                raise NotImplementedError("get_scorer function not implemented")
+            return scorer(self, X, y)
+        else:
+            return self._postfit_estimator.score(X, y)
+
+    def predict(self, X):
+        """Predict for X.
+
+        For dask inputs, a dask array or dataframe is returned. For other
+        inputs (NumPy array, pandas dataframe, scipy sparse matrix), the
+        regular return value is returned.
+
+        Parameters
+        ----------
+        X : array-like
+
+        Returns
+        -------
+        y : array-like
+        """
+        self._check_method("predict")
+        X = self._check_array(X)
+        output_meta = self.predict_meta
+
+        if isinstance(X, da.Array):
+            if output_meta is None:
+                output_meta = _get_output_dask_ar_meta_for_estimator(
+                    _predict, self._postfit_estimator, X
+                )
+
+            result = X.map_blocks(
+                _predict,
+                estimator=self._postfit_estimator,
+                drop_axis=1,
+                meta=output_meta,
+            )
+            return result
+
+        elif isinstance(X, dd._Frame):
+            if output_meta is None:
+                # dask-dataframe relies on dd.core.no_default
+                # for inferring meta
+                output_meta = _predict(X._meta_nonempty, self._postfit_estimator)
+            try:
+                return X.map_partitions(
+                    _predict,
+                    self._postfit_estimator,
+                    output_meta,
+                    meta=output_meta,
+                )
+            except ValueError:
+                if output_meta is None:
+                    output_meta = dd.core.no_default
+                return X.map_partitions(
+                    _predict, estimator=self._postfit_estimator, meta=output_meta
+                )
+        else:
+            return _predict(X, estimator=self._postfit_estimator)
+
+    def predict_proba(self, X):
+        """Probability estimates.
+
+        For dask inputs, a dask array or dataframe is returned. For other
+        inputs (NumPy array, pandas dataframe, scipy sparse matrix), the
+        regular return value is returned.
+
+        If the underlying estimator does not have a ``predict_proba``
+        method, then an ``AttributeError`` is raised.
+
+        Parameters
+        ----------
+        X : array or dataframe
+
+        Returns
+        -------
+        y : array-like
+        """
+        X = self._check_array(X)
+
+        self._check_method("predict_proba")
+
+        output_meta = self.predict_proba_meta
+
+        if isinstance(X, da.Array):
+            if output_meta is None:
+                output_meta = _get_output_dask_ar_meta_for_estimator(
+                    _predict_proba, self._postfit_estimator, X
+                )
+            # XXX: multiclass
+            return X.map_blocks(
+                _predict_proba,
+                estimator=self._postfit_estimator,
+                meta=output_meta,
+                chunks=(X.chunks[0], len(self._postfit_estimator.classes_)),
+            )
+        elif isinstance(X, dd._Frame):
+            if output_meta is None:
+                # dask-dataframe relies on dd.core.no_default
+                # for inferring meta
+                output_meta = _predict_proba(X._meta_nonempty, self._postfit_estimator)
+            try:
+                return X.map_partitions(
+                    _predict_proba,
+                    self._postfit_estimator,
+                    output_meta,
+                    meta=output_meta,
+                )
+            except ValueError:
+                if output_meta is None:
+                    output_meta = dd.core.no_default
+                return X.map_partitions(
+                    _predict_proba, estimator=self._postfit_estimator, meta=output_meta
+                )
+        else:
+            return _predict_proba(X, estimator=self._postfit_estimator)
+
+    def predict_log_proba(self, X):
+        """Log of probability estimates.
+
+        For dask inputs, a dask array or dataframe is returned. For other
+        inputs (NumPy array, pandas dataframe, scipy sparse matrix), the
+        regular return value is returned.
+
+        If the underlying estimator does not have a ``predict_proba``
+        method, then an ``AttributeError`` is raised.
+
+        Parameters
+        ----------
+        X : array or dataframe
+
+        Returns
+        -------
+        y : array-like
+        """
+        self._check_method("predict_log_proba")
+        return da.log(self.predict_proba(X))
+
+    def _check_method(self, method):
+        """Check if self.estimator has 'method'.
+
+        Raises
+        ------
+        AttributeError
+        """
+        estimator = self._postfit_estimator
+        if not hasattr(estimator, method):
+            msg = "The wrapped estimator '{}' does not have a '{}' method.".format(
+                estimator, method
+            )
+            raise AttributeError(msg)
+        return getattr(estimator, method)
+
+
+def _predict(part, estimator, output_meta=None):
+    if part.shape[0] == 0 and output_meta is not None:
+        empty_output = handle_empty_partitions(output_meta)
+        if empty_output is not None:
+            return empty_output
+    return estimator.predict(part)
+
+
+def _predict_proba(part, estimator, output_meta=None):
+    if part.shape[0] == 0 and output_meta is not None:
+        empty_output = handle_empty_partitions(output_meta)
+        if empty_output is not None:
+            return empty_output
+    return estimator.predict_proba(part)
+
+
+def _transform(part, estimator, output_meta=None):
+    if part.shape[0] == 0 and output_meta is not None:
+        empty_output = handle_empty_partitions(output_meta)
+        if empty_output is not None:
+            return empty_output
+    return estimator.transform(part)
+
+
+def handle_empty_partitions(output_meta):
+    if hasattr(output_meta, "__array_function__"):
+        if len(output_meta.shape) == 1:
+            shape = 0
+        else:
+            shape = list(output_meta.shape)
+            shape[0] = 0
+        ar = np.zeros(
+            shape=shape,
+            dtype=output_meta.dtype,
+            like=output_meta,
+        )
+        return ar
+    elif "scipy.sparse" in type(output_meta).__module__:
+        # sparse matrices don't support `like`
+        # due to a non-implemented __array_function__
+        # Refer https://github.com/scipy/scipy/issues/10362
+        # Note: below works for both cupy and scipy sparse matrices
+        if len(output_meta.shape) == 1:
+            shape = 0
+        else:
+            shape = list(output_meta.shape)
+            shape[0] = 0
+        ar = type(output_meta)(shape, dtype=output_meta.dtype)
+        return ar
+    elif hasattr(output_meta, "iloc"):
+        return output_meta.iloc[:0, :]
+
+
+def _get_output_dask_ar_meta_for_estimator(model_fn, estimator, input_dask_ar):
+    """
+    Returns the output metadata array for the model function
+    (predict, transform, etc.) by running the appropriate function
+    on dummy data of shape (1, n_features).
+
+    Parameters
+    ----------
+    model_fn : Model function
+        _predict, _transform, etc.
+
+    estimator : Estimator
+        The underlying estimator that is fit.
+
+    input_dask_ar : The input dask array
+
+    Returns
+    -------
+    metadata : metadata of the output dask array
+
+    """
+    # sklearn fails if the input array has zero size;
+    # it requires at least 1 sample to run successfully
+    input_meta = input_dask_ar._meta
+    if hasattr(input_meta, "__array_function__"):
+        ar = np.zeros(
+            shape=(1, input_dask_ar.shape[1]),
+            dtype=input_dask_ar.dtype,
+            like=input_meta,
+        )
+    elif "scipy.sparse" in type(input_meta).__module__:
+        # sparse matrices don't support `like`
+        # due to a non-implemented __array_function__
+        # Refer https://github.com/scipy/scipy/issues/10362
+        # Note: below works for both cupy and scipy sparse matrices
+        ar = type(input_meta)((1, input_dask_ar.shape[1]), dtype=input_dask_ar.dtype)
+    else:
+        func_name = model_fn.__name__.strip("_")
+        msg = (
+            f"Metadata for {func_name} is not provided, so Dask is "
+            f"running the {func_name} "
+            "function on a small dataset to guess output metadata. "
+            "As a result, it is possible that Dask will guess incorrectly."
+        )
+        warnings.warn(msg)
+        ar = np.zeros(shape=(1, input_dask_ar.shape[1]), dtype=input_dask_ar.dtype)
+    return model_fn(ar, estimator)
+
+
+def copy_learned_attributes(from_estimator, to_estimator):
+    attrs = {k: v for k, v in vars(from_estimator).items() if k.endswith("_")}
+
+    for k, v in attrs.items():
+        setattr(to_estimator, k, v)
diff --git a/docs/source/sql/ml.rst b/docs/source/sql/ml.rst
index 931cdc5ee..5c3a3b9d1 100644
--- a/docs/source/sql/ml.rst
+++ b/docs/source/sql/ml.rst
@@ -62,7 +62,7 @@ The key-value parameters control, how and which model is trained:
   want to set this parameter.

 * ``wrap_predict``: Boolean flag, whether to wrap the selected
-  model with a :class:`dask_ml.wrappers.ParallelPostFit`.
+  model with a :class:`dask_sql.physical.rel.custom.wrappers.ParallelPostFit`.
   Have a look into the
   `dask-ml docu on ParallelPostFit <https://ml.dask.org/meta-estimators.html#parallel-prediction-and-transformation>`_
   to learn more about it. Defaults to false. Typically you set
diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py
index 3c1bd1a69..ad48e5b44 100644
--- a/tests/integration/test_model.py
+++ b/tests/integration/test_model.py
@@ -934,3 +934,79 @@ def test_experiment_automl_regressor(c, client, training_df):
     ), "Best model was not registered"

     check_trained_model(c, "my_automl_exp2")
+
+
+# TODO - many ML tests fail on clusters without sklearn - can we avoid this?
+@skip_if_external_scheduler
+def test_predict_with_nullable_types(c):
+    df = pd.DataFrame(
+        {
+            "rough_day_of_year": [0, 1, 2, 3],
+            "prev_day_inches_rained": [0.0, 1.0, 2.0, 3.0],
+            "rained": [False, False, False, True],
+        }
+    )
+    c.create_table("train_set", df)
+
+    model_class = "'sklearn.linear_model.LogisticRegression'"
+
+    c.sql(
+        f"""
+        CREATE OR REPLACE MODEL model WITH (
+            model_class = {model_class},
+            wrap_predict = True,
+            wrap_fit = False,
+            target_column = 'rained'
+        ) AS (
+            SELECT *
+            FROM train_set
+        )
+        """
+    )
+
+    expected = c.sql(
+        """
+        SELECT * FROM PREDICT(
+            MODEL model,
+            SELECT * FROM train_set
+        )
+        """
+    )
+
+    df = pd.DataFrame(
+        {
+            "rough_day_of_year": pd.Series([0, 1, 2, 3], dtype="Int32"),
+            "prev_day_inches_rained": pd.Series([0.0, 1.0, 2.0, 3.0], dtype="Float32"),
+            "rained": pd.Series([False, False, False, True]),
+        }
+    )
+    c.create_table("train_set", df)
+
+    c.sql(
+        f"""
+        CREATE OR REPLACE MODEL model WITH (
+            model_class = {model_class},
+            wrap_predict = True,
+            wrap_fit = False,
+            target_column = 'rained'
+        ) AS (
+            SELECT *
+            FROM train_set
+        )
+        """
+    )
+
+    result = c.sql(
+        """
+        SELECT * FROM PREDICT(
+            MODEL model,
+            SELECT * FROM train_set
+        )
+        """
+    )
+
+    assert_eq(
+        expected,
+        result,
+        check_dtype=False,
+    )
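
A minimal usage sketch of the vendored wrapper, outside the SQL layer. It
assumes this patch is applied and scikit-learn is available; the toy data,
chunking, and the `LogisticRegression` choice are illustrative only. It
mirrors what `create_model.py` now does for sklearn models: an empty
`np.array([])` is passed as meta, so no inference run is needed and empty
partitions short-circuit through `handle_empty_partitions` instead of
reaching sklearn.

    import dask.array as da
    import numpy as np
    from sklearn.linear_model import LogisticRegression

    from dask_sql.physical.rel.custom.wrappers import ParallelPostFit

    X = np.array([[0.0], [1.0], [2.0], [3.0]])
    y = np.array([0, 0, 0, 1])

    # Fit on in-memory data, then predict block-wise on a dask collection.
    clf = ParallelPostFit(
        estimator=LogisticRegression().fit(X, y),
        predict_meta=np.array([]),
    )
    dX = da.from_array(X, chunks=(2, 1))
    pred = clf.predict(dX)  # lazy dask array, one task per block of rows
    print(pred.compute())   # concrete NumPy array of per-row predictions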