Replace dask_ml.wrappers.Incremental with custom Incremental class #855
Changes from all commits
28771dc
4734d8a
dc49752
8d593b0
8268096
7c60bd3
95d6ec6
0f91006
09fc4a6
f0dc935
8356dd4
8fcb67c
ebe2348
aadc04d
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,17 +30,9 @@ class CreateExperimentPlugin(BaseRelPlugin): | |
* model_class: Full path to the class of the model which has to be tuned. | ||
Any model class with sklearn interface is valid, but might or | ||
might not work well with Dask dataframes. | ||
Have a look into the | ||
[dask-ml documentation](https://ml.dask.org/index.html) | ||
for more information on which models work best. | ||
You might need to install necessary packages to use | ||
the models. | ||
* experiment_class : Full path of the Hyperparameter tuner | ||
from dask_ml, choose dask tuner class carefully based on what you | ||
exactly need (memory vs compute constraints), refer: | ||
[dask-ml documentation](https://ml.dask.org/hyper-parameter-search.html) | ||
(for tuning hyperparameter of the models both model_class and experiment class are | ||
required parameters.) | ||
* tune_parameters: | ||
Key-value pairs of hyperparameters to tune, i.e. the search space for | ||
the particular model to tune | ||
|
@@ -64,7 +56,7 @@ class CreateExperimentPlugin(BaseRelPlugin): | |
|
||
CREATE EXPERIMENT my_exp WITH( | ||
model_class = 'sklearn.ensemble.GradientBoostingClassifier', | ||
experiment_class = 'dask_ml.model_selection.GridSearchCV', | ||
experiment_class = 'sklearn.model_selection.GridSearchCV', | ||
|
||
tune_parameters = (n_estimators = ARRAY [16, 32, 2], | ||
learning_rate = ARRAY [0.1,0.01,0.001], | ||
max_depth = ARRAY [3,4,5,10] | ||
|
@@ -174,7 +166,11 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai | |
|
||
search = ExperimentClass(model, {**parameters}, **experiment_kwargs) | ||
logger.info(tune_fit_kwargs) | ||
search.fit(X, y, **tune_fit_kwargs) | ||
search.fit( | ||
X.to_dask_array(lengths=True), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could experimentClass be a gpu based model or is it limited to cpu based ones only? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure, since I think every example I've seen with |
||
y.to_dask_array(lengths=True), | ||
**tune_fit_kwargs, | ||
) | ||
df = pd.DataFrame(search.cv_results_) | ||
df["model_class"] = model_class | ||
|
||
|
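For orientation, the `tune_parameters` in the `CREATE EXPERIMENT` example above describe a grid that a grid-search class expands into one candidate per combination. The following is a minimal pure-Python sketch of that expansion. The helper name `expand_grid` is hypothetical, and this is not how dask-sql or scikit-learn implement the search internally.

```python
from itertools import product

# Search space copied from the CREATE EXPERIMENT example in the docstring.
tune_parameters = {
    "n_estimators": [16, 32, 2],
    "learning_rate": [0.1, 0.01, 0.001],
    "max_depth": [3, 4, 5, 10],
}


def expand_grid(space):
    """Hypothetical helper: yield one parameter dict per grid point."""
    names = list(space)
    for values in product(*(space[name] for name in names)):
        yield dict(zip(names, values))


candidates = list(expand_grid(tune_parameters))
print(len(candidates))  # number of grid points: 3 * 3 * 4
print(candidates[0])    # first combination, in declaration order
```

Each candidate corresponds to one model configuration that the search would fit and score, so the size of the grid is a rough guide to the cost of the experiment.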
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,9 +32,6 @@ class CreateModelPlugin(BaseRelPlugin): | |
* model_class: Full path to the class of the model to train. | ||
Any model class with sklearn interface is valid, but might or | ||
might not work well with Dask dataframes. | ||
Have a look into the | ||
[dask-ml documentation](https://ml.dask.org/index.html) | ||
for more information on which models work best. | ||
You might need to install necessary packages to use | ||
the models. | ||
* target_column: Which column from the data to use as target. | ||
|
@@ -45,16 +42,12 @@ class CreateModelPlugin(BaseRelPlugin): | |
want to set this parameter. | ||
* wrap_predict: Boolean flag, whether to wrap the selected | ||
model with a :class:`dask_sql.physical.rel.custom.wrappers.ParallelPostFit`. | ||
Have a look into the | ||
[dask-ml docu](https://ml.dask.org/meta-estimators.html#parallel-prediction-and-transformation) | ||
to learn more about it. Defaults to false. Typically you set | ||
it to true for sklearn models if predicting on big data. | ||
Defaults to false. Typically you set it to true for | ||
sklearn models if predicting on big data. | ||
Comment on lines 44 to +46
Review comment: Unrelated to this PR, can you file an issue to clean up the For |
||
* wrap_fit: Boolean flag, whether to wrap the selected | ||
model with a :class:`dask_ml.wrappers.Incremental`. | ||
Have a look into the | ||
[dask-ml docu](https://ml.dask.org/incremental.html) | ||
to learn more about it. Defaults to false. Typically you set | ||
it to true for sklearn models if training on big data. | ||
model with a :class:`dask_sql.physical.rel.custom.wrappers.Incremental`. | ||
Defaults to false. Typically you set it to true for | ||
sklearn models if training on big data. | ||
* fit_kwargs: keyword arguments sent to the call to fit(). | ||
|
||
All other arguments are passed to the constructor of the | ||
|
@@ -76,7 +69,7 @@ class CreateModelPlugin(BaseRelPlugin): | |
Examples: | ||
|
||
CREATE MODEL my_model WITH ( | ||
model_class = 'dask_ml.xgboost.XGBClassifier', | ||
model_class = 'xgboost.XGBClassifier', | ||
target_column = 'target' | ||
) AS ( | ||
SELECT x, y, target | ||
|
@@ -95,11 +88,10 @@ class CreateModelPlugin(BaseRelPlugin): | |
dask dataframes. | ||
|
||
* if you are training on relatively small amounts | ||
of data but predicting on large data samples | ||
(and you are not using a model built for usage with dask | ||
from the dask-ml package), you might want to set | ||
`wrap_predict` to True. With this option, | ||
model inference will be parallelized/distributed. | ||
of data but predicting on large data samples, | ||
you might want to set `wrap_predict` to True. | ||
With this option, model inference will be | ||
parallelized/distributed. | ||
* If you are training on large amounts of data, | ||
you can try setting wrap_fit to True. This will | ||
do the same on the training step, but works only on | ||
|
@@ -158,10 +150,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai | |
|
||
model = ModelClass(**kwargs) | ||
if wrap_fit: | ||
try: | ||
from dask_ml.wrappers import Incremental | ||
except ImportError: # pragma: no cover | ||
raise ValueError("Wrapping requires dask-ml to be installed.") | ||
from dask_sql.physical.rel.custom.wrappers import Incremental | ||
|
||
model = Incremental(estimator=model) | ||
|
||
|
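The `wrap_fit` option described above wraps the model in an `Incremental` class, which in dask-ml is intended for estimators that expose `partial_fit`, so that training can proceed one block of data at a time. The following is a minimal sketch of that idea only, not the vendored `Incremental` class. `SequentialPartialFit` and `RunningMean` are hypothetical names, and the blocks are plain Python lists rather than Dask partitions.

```python
class RunningMean:
    """Toy estimator (hypothetical) that learns the mean of y incrementally."""

    def __init__(self):
        self.count_ = 0
        self.mean_ = 0.0

    def partial_fit(self, X, y):
        # Update the running mean with one block of targets.
        for value in y:
            self.count_ += 1
            self.mean_ += (value - self.mean_) / self.count_
        return self

    def predict(self, X):
        return [self.mean_ for _ in X]


class SequentialPartialFit:
    """Hypothetical wrapper: feed blocks to ``partial_fit`` one after another."""

    def __init__(self, estimator):
        self.estimator = estimator

    def fit(self, X_blocks, y_blocks):
        # Only one block needs to be in memory at any given time.
        for X_block, y_block in zip(X_blocks, y_blocks):
            self.estimator.partial_fit(X_block, y_block)
        return self

    def predict(self, X):
        return self.estimator.predict(X)


X_blocks = [[[0], [1]], [[2], [3], [4]]]
y_blocks = [[1.0, 3.0], [5.0, 7.0, 9.0]]
model = SequentialPartialFit(RunningMean()).fit(X_blocks, y_blocks)
print(model.predict([[10]]))
```

The design point is that the wrapper never needs the full dataset at once, which is why this approach only suits estimators that can update their state from partial data.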
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,206 @@ | ||
# Copyright 2017, Dask developers | ||
# Dask-ML project - https://github.com/dask/dask-ml | ||
from typing import Optional, TypeVar | ||
|
||
import dask | ||
import dask.array as da | ||
import numpy as np | ||
import sklearn.metrics | ||
import sklearn.utils.multiclass | ||
from dask.array import Array | ||
from dask.utils import derived_from | ||
|
||
ArrayLike = TypeVar("ArrayLike", Array, np.ndarray) | ||
|
||
|
||
def accuracy_score( | ||
y_true: ArrayLike, | ||
y_pred: ArrayLike, | ||
normalize: bool = True, | ||
sample_weight: Optional[ArrayLike] = None, | ||
compute: bool = True, | ||
) -> ArrayLike: | ||
"""Accuracy classification score. | ||
In multilabel classification, this function computes subset accuracy: | ||
the set of labels predicted for a sample must *exactly* match the | ||
corresponding set of labels in y_true. | ||
Read more in the :ref:`User Guide <accuracy_score>`. | ||
Parameters | ||
---------- | ||
y_true : 1d array-like, or label indicator array | ||
Ground truth (correct) labels. | ||
y_pred : 1d array-like, or label indicator array | ||
Predicted labels, as returned by a classifier. | ||
normalize : bool, optional (default=True) | ||
If ``False``, return the number of correctly classified samples. | ||
Otherwise, return the fraction of correctly classified samples. | ||
sample_weight : 1d array-like, optional | ||
Sample weights. | ||
.. versionadded:: 0.7.0 | ||
Returns | ||
------- | ||
score : scalar dask Array | ||
If ``normalize == True``, return the fraction of correctly classified | ||
samples (float), else return the number of correctly classified | ||
samples (int). | ||
The best performance is 1 with ``normalize == True`` and the number | ||
of samples with ``normalize == False``. | ||
Notes | ||
----- | ||
In binary and multiclass classification, this function is equal | ||
to the ``jaccard_similarity_score`` function. | ||
|
||
""" | ||
|
||
if y_true.ndim > 1: | ||
differing_labels = ((y_true - y_pred) == 0).all(1) | ||
score = differing_labels != 0 | ||
else: | ||
score = y_true == y_pred | ||
|
||
if normalize: | ||
score = da.average(score, weights=sample_weight) | ||
elif sample_weight is not None: | ||
score = da.dot(score, sample_weight) | ||
else: | ||
score = score.sum() | ||
|
||
if compute: | ||
score = score.compute() | ||
return score | ||
|
||
|
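The three return paths of `accuracy_score` (normalized, weighted count, plain count) can be illustrated with a small NumPy-only sketch. This is an illustration under the assumption that inputs fit in memory, not the vendored Dask code. The name `accuracy_sketch` is hypothetical, and the multilabel branch is written as a direct row-wise comparison.

```python
import numpy as np


def accuracy_sketch(y_true, y_pred, normalize=True, sample_weight=None):
    """NumPy-only illustration of the three branches; not the vendored code."""
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    if y_true.ndim > 1:
        # Multilabel: a row counts only if every label matches.
        score = (y_true == y_pred).all(axis=1)
    else:
        score = y_true == y_pred

    if normalize:
        # Fraction of correct samples, optionally weighted.
        return float(np.average(score, weights=sample_weight))
    if sample_weight is not None:
        # Total weight of the correct samples.
        return float(np.dot(score, sample_weight))
    # Plain count of correct samples.
    return int(score.sum())


y_true = [0, 1, 2, 2]
y_pred = [0, 2, 2, 2]
print(accuracy_sketch(y_true, y_pred))                   # fraction correct
print(accuracy_sketch(y_true, y_pred, normalize=False))  # count correct
```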
||
def _log_loss_inner( | ||
x: ArrayLike, y: ArrayLike, sample_weight: Optional[ArrayLike], **kwargs | ||
): | ||
# da.map_blocks wasn't able to concatenate together the results | ||
# when we reduce down to a scalar per block. So we make an | ||
# array with 1 element. | ||
if sample_weight is not None: | ||
sample_weight = sample_weight.ravel() | ||
return np.array( | ||
[sklearn.metrics.log_loss(x, y, sample_weight=sample_weight, **kwargs)] | ||
) | ||
|
||
|
||
def log_loss( | ||
y_true, y_pred, eps=1e-15, normalize=True, sample_weight=None, labels=None | ||
): | ||
if not (dask.is_dask_collection(y_true) and dask.is_dask_collection(y_pred)): | ||
return sklearn.metrics.log_loss( | ||
y_true, | ||
y_pred, | ||
eps=eps, | ||
normalize=normalize, | ||
sample_weight=sample_weight, | ||
labels=labels, | ||
) | ||
|
||
if y_pred.ndim > 1 and y_true.ndim == 1: | ||
y_true = y_true.reshape(-1, 1) | ||
drop_axis: Optional[int] = 1 | ||
if sample_weight is not None: | ||
sample_weight = sample_weight.reshape(-1, 1) | ||
else: | ||
drop_axis = None | ||
|
||
result = da.map_blocks( | ||
_log_loss_inner, | ||
y_true, | ||
y_pred, | ||
sample_weight, | ||
chunks=(1,), | ||
drop_axis=drop_axis, | ||
dtype="f8", | ||
eps=eps, | ||
normalize=normalize, | ||
labels=labels, | ||
) | ||
if normalize and sample_weight is not None: | ||
sample_weight = sample_weight.ravel() | ||
block_weights = sample_weight.map_blocks(np.sum, chunks=(1,), keepdims=True) | ||
return da.average(result, 0, weights=block_weights) | ||
elif normalize: | ||
return result.mean() | ||
else: | ||
return result.sum() | ||
|
||
|
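`log_loss` computes one loss per block and then combines the per-block values. The NumPy-only sketch below checks the arithmetic behind the weighted branch: averaging per-block losses, weighted by each block's total sample weight, reproduces the loss over all samples. It also shows that, as far as can be read from the diff, the unweighted `result.mean()` branch matches the global mean only when all blocks have the same number of rows. The helper `weighted_log_loss` and the sample data are assumptions made for illustration.

```python
import numpy as np


def weighted_log_loss(y_true, p, weights):
    """Weighted mean of per-sample binary log losses (illustrative only)."""
    losses = -(y_true * np.log(p) + (1 - y_true) * np.log(1 - p))
    return float(np.average(losses, weights=weights))


y_true = np.array([1, 0, 1, 1, 0])
p = np.array([0.9, 0.2, 0.6, 0.7, 0.4])
w = np.array([1.0, 2.0, 1.0, 0.5, 1.5])
blocks = [slice(0, 2), slice(2, 5)]  # two blocks of unequal size

# Reference: weighted loss over all samples at once.
global_loss = weighted_log_loss(y_true, p, w)

# Block-wise: one loss per block, then average weighted by each block's
# total sample weight.
block_losses = [weighted_log_loss(y_true[b], p[b], w[b]) for b in blocks]
block_weights = [w[b].sum() for b in blocks]
combined = float(np.average(block_losses, weights=block_weights))

# Unweighted case: a plain mean of per-block means vs. the global mean.
ones = np.ones_like(w)
global_unweighted = weighted_log_loss(y_true, p, ones)
mean_of_block_means = float(
    np.mean([weighted_log_loss(y_true[b], p[b], ones[b]) for b in blocks])
)

print(abs(combined - global_loss) < 1e-12)
print(global_unweighted, mean_of_block_means)
```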
||
def _check_sample_weight(sample_weight: Optional[ArrayLike]): | ||
if sample_weight is not None: | ||
raise ValueError("'sample_weight' is not supported.") | ||
|
||
|
||
@derived_from(sklearn.metrics) | ||
def mean_squared_error( | ||
y_true: ArrayLike, | ||
y_pred: ArrayLike, | ||
sample_weight: Optional[ArrayLike] = None, | ||
multioutput: Optional[str] = "uniform_average", | ||
squared: bool = True, | ||
compute: bool = True, | ||
) -> ArrayLike: | ||
_check_sample_weight(sample_weight) | ||
output_errors = ((y_pred - y_true) ** 2).mean(axis=0) | ||
|
||
if isinstance(multioutput, str) or multioutput is None: | ||
if multioutput == "raw_values": | ||
if compute: | ||
return output_errors.compute() | ||
else: | ||
return output_errors | ||
else: | ||
raise ValueError("Weighted 'multioutput' not supported.") | ||
result = output_errors.mean() | ||
if not squared: | ||
result = da.sqrt(result) | ||
if compute: | ||
result = result.compute() | ||
return result | ||
|
||
|
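The aggregation options of `mean_squared_error` can be sketched with NumPy alone. This is an in-memory illustration, not the vendored Dask implementation, and `mse_sketch` is a hypothetical name. As in the diffed code, the `"raw_values"` path returns per-output errors before `squared` is consulted.

```python
import numpy as np


def mse_sketch(y_true, y_pred, multioutput="uniform_average", squared=True):
    """NumPy-only illustration of the aggregation logic; not the vendored code."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    # One mean squared error per output column.
    output_errors = ((y_pred - y_true) ** 2).mean(axis=0)
    if multioutput == "raw_values":
        return output_errors
    # Uniform average across outputs, optionally converted to RMSE.
    result = output_errors.mean()
    return float(result if squared else np.sqrt(result))


y_true = [[0.0, 1.0], [2.0, 3.0]]
y_pred = [[0.0, 2.0], [2.0, 5.0]]
print(mse_sketch(y_true, y_pred, multioutput="raw_values"))
print(mse_sketch(y_true, y_pred))
print(mse_sketch(y_true, y_pred, squared=False))
```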
||
def _check_reg_targets( | ||
y_true: ArrayLike, y_pred: ArrayLike, multioutput: Optional[str] | ||
): | ||
if multioutput is not None and multioutput != "uniform_average": | ||
raise NotImplementedError("'multioutput' must be 'uniform_average'") | ||
|
||
if y_true.ndim == 1: | ||
y_true = y_true.reshape((-1, 1)) | ||
if y_pred.ndim == 1: | ||
y_pred = y_pred.reshape((-1, 1)) | ||
|
||
# TODO: y_type, multioutput | ||
return None, y_true, y_pred, multioutput | ||
|
||
|
||
@derived_from(sklearn.metrics) | ||
def r2_score( | ||
y_true: ArrayLike, | ||
y_pred: ArrayLike, | ||
sample_weight: Optional[ArrayLike] = None, | ||
multioutput: Optional[str] = "uniform_average", | ||
compute: bool = True, | ||
) -> ArrayLike: | ||
_check_sample_weight(sample_weight) | ||
_, y_true, y_pred, _ = _check_reg_targets(y_true, y_pred, multioutput) | ||
weight = 1.0 | ||
|
||
numerator = (weight * (y_true - y_pred) ** 2).sum(axis=0, dtype="f8") | ||
denominator = (weight * (y_true - y_true.mean(axis=0)) ** 2).sum(axis=0, dtype="f8") | ||
|
||
nonzero_denominator = denominator != 0 | ||
nonzero_numerator = numerator != 0 | ||
valid_score = nonzero_denominator & nonzero_numerator | ||
output_chunks = getattr(y_true, "chunks", [None, None])[1] | ||
output_scores = da.ones([y_true.shape[1]], chunks=output_chunks) | ||
with np.errstate(all="ignore"): | ||
output_scores[valid_score] = 1 - ( | ||
numerator[valid_score] / denominator[valid_score] | ||
) | ||
output_scores[nonzero_numerator & ~nonzero_denominator] = 0.0 | ||
|
||
result = output_scores.mean(axis=0) | ||
if compute: | ||
result = result.compute() | ||
return result |
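The masking logic in `r2_score` handles two edge cases: a perfect prediction scores 1, and a constant target with an imperfect prediction scores 0 instead of dividing by zero. A NumPy-only sketch of the same steps follows. It assumes in-memory arrays and unit weights, and `r2_sketch` is a hypothetical name rather than part of the PR.

```python
import numpy as np


def r2_sketch(y_true, y_pred):
    """NumPy-only illustration of the R^2 steps; not the vendored code."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    # Mirror _check_reg_targets: promote 1-D targets to a single column.
    if y_true.ndim == 1:
        y_true = y_true.reshape((-1, 1))
    if y_pred.ndim == 1:
        y_pred = y_pred.reshape((-1, 1))

    # Residual and total sums of squares, one value per output column.
    numerator = ((y_true - y_pred) ** 2).sum(axis=0)
    denominator = ((y_true - y_true.mean(axis=0)) ** 2).sum(axis=0)

    nonzero_denominator = denominator != 0
    nonzero_numerator = numerator != 0
    valid = nonzero_denominator & nonzero_numerator

    # Start from 1 (perfect fit) and overwrite where a ratio is defined.
    scores = np.ones(y_true.shape[1])
    scores[valid] = 1 - numerator[valid] / denominator[valid]
    # Constant target but imperfect prediction: score is set to 0.
    scores[nonzero_numerator & ~nonzero_denominator] = 0.0
    return float(scores.mean())


print(r2_sketch([3, -0.5, 2, 7], [2.5, 0.0, 2, 8]))
print(r2_sketch([1, 1, 1], [1, 2, 1]))  # constant target, imperfect prediction
```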