Replace dask_ml.wrappers.Incremental with custom Incremental class #855
Merged

Changes from 11 commits.

Commits (14, all by sarahyurick):
28771dc Create metrics.py
4734d8a Merge branch 'main' into incremental
dc49752 add incremental functionality
8d593b0 lint and some comments
8268096 update more comments
7c60bd3 add dask-ml fit function
95d6ec6 style fix
0f91006 DASK_2022_01_0
09fc4a6 add unit tests
f0dc935 style fix
8356dd4 remove scheduler
8fcb67c experiment_class comment
ebe2348 apply Vibhu's suggestions
aadc04d style fix
First changed file (the module defining `CreateModelPlugin`):
```diff
@@ -32,9 +32,6 @@ class CreateModelPlugin(BaseRelPlugin):
     * model_class: Full path to the class of the model to train.
       Any model class with sklearn interface is valid, but might or
       might not work well with Dask dataframes.
-      Have a look into the
-      [dask-ml documentation](https://ml.dask.org/index.html)
-      for more information on which models work best.
       You might need to install necessary packages to use
       the models.
     * target_column: Which column from the data to use as target.
```
```diff
@@ -45,16 +42,12 @@ class CreateModelPlugin(BaseRelPlugin):
       want to set this parameter.
     * wrap_predict: Boolean flag, whether to wrap the selected
       model with a :class:`dask_sql.physical.rel.custom.wrappers.ParallelPostFit`.
-      Have a look into the
-      [dask-ml docu](https://ml.dask.org/meta-estimators.html#parallel-prediction-and-transformation)
-      to learn more about it. Defaults to false. Typically you set
-      it to true for sklearn models if predicting on big data.
+      Defaults to false. Typically you set it to true for
+      sklearn models if predicting on big data.
     * wrap_fit: Boolean flag, whether to wrap the selected
-      model with a :class:`dask_ml.wrappers.Incremental`.
-      Have a look into the
-      [dask-ml docu](https://ml.dask.org/incremental.html)
-      to learn more about it. Defaults to false. Typically you set
-      it to true for sklearn models if training on big data.
+      model with a :class:`dask_sql.physical.rel.custom.wrappers.Incremental`.
+      Defaults to false. Typically you set it to true for
+      sklearn models if training on big data.
     * fit_kwargs: keyword arguments sent to the call to fit().

     All other arguments are passed to the constructor of the
```

Review comment on lines 44 to +46: "Unrelated to this PR, can you file an issue to clean up the For …"
```diff
@@ -76,7 +69,7 @@ class CreateModelPlugin(BaseRelPlugin):
     Examples:

         CREATE MODEL my_model WITH (
-            model_class = 'dask_ml.xgboost.XGBClassifier',
+            model_class = 'sklearn.ensemble.GradientBoostingClassifier',
             target_column = 'target'
         ) AS (
             SELECT x, y, target
```
```diff
@@ -95,11 +88,10 @@ class CreateModelPlugin(BaseRelPlugin):
     dask dataframes.

-    * if you are training on relatively small amounts
-      of data but predicting on large data samples
-      (and you are not using a model build for usage with dask
-      from the dask-ml package), you might want to set
-      `wrap_predict` to True. With this option,
-      model interference will be parallelized/distributed.
+    * if you are training on relatively small amounts
+      of data but predicting on large data samples,
+      you might want to set `wrap_predict` to True.
+      With this option, model interference will be
+      parallelized/distributed.
     * If you are training on large amounts of data,
       you can try setting wrap_fit to True. This will
       do the same on the training step, but works only on
```
```diff
@@ -158,10 +150,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai

         model = ModelClass(**kwargs)
         if wrap_fit:
-            try:
-                from dask_ml.wrappers import Incremental
-            except ImportError:  # pragma: no cover
-                raise ValueError("Wrapping requires dask-ml to be installed.")
+            from dask_sql.physical.rel.custom.wrappers import Incremental

             model = Incremental(estimator=model)

```
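The vendored `Incremental` class itself is not shown in this diff; assuming it follows dask-ml's design, the core idea is block-wise `partial_fit`, roughly:

```python
# Minimal sketch of the Incremental idea (illustration only, not the
# vendored class): call partial_fit once per dask block so the whole
# dataset never has to fit in memory at once.
import dask.array as da
import numpy as np
from sklearn.linear_model import SGDClassifier

X = da.random.random((1_000, 4), chunks=250)
y = da.from_array(np.random.randint(0, 2, 1_000), chunks=250)

est = SGDClassifier()
for xb, yb in zip(X.to_delayed().flatten(), y.to_delayed().flatten()):
    # Each block is materialized on its own; classes is passed so the
    # first partial_fit call already knows the full label set.
    est.partial_fit(xb.compute(), yb.compute(), classes=[0, 1])
```

dask-ml's version additionally exposes scoring and block-shuffling options; this sketch only shows the memory-bounding loop.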
Second file, newly added: `metrics.py`, vendored from the Dask-ML project (206 lines), shown in sections below.
```python
# Copyright 2017, Dask developers
# Dask-ML project - https://github.com/dask/dask-ml
from typing import Optional, TypeVar

import dask
import dask.array as da
import numpy as np
import sklearn.metrics
import sklearn.utils.multiclass
from dask.array import Array
from dask.utils import derived_from

ArrayLike = TypeVar("ArrayLike", Array, np.ndarray)


def accuracy_score(
    y_true: ArrayLike,
    y_pred: ArrayLike,
    normalize: bool = True,
    sample_weight: Optional[ArrayLike] = None,
    compute: bool = True,
) -> ArrayLike:
    """Accuracy classification score.

    In multilabel classification, this function computes subset accuracy:
    the set of labels predicted for a sample must *exactly* match the
    corresponding set of labels in y_true.

    Read more in the :ref:`User Guide <accuracy_score>`.

    Parameters
    ----------
    y_true : 1d array-like, or label indicator array
        Ground truth (correct) labels.
    y_pred : 1d array-like, or label indicator array
        Predicted labels, as returned by a classifier.
    normalize : bool, optional (default=True)
        If ``False``, return the number of correctly classified samples.
        Otherwise, return the fraction of correctly classified samples.
    sample_weight : 1d array-like, optional
        Sample weights.

        .. versionadded:: 0.7.0

    Returns
    -------
    score : scalar dask Array
        If ``normalize == True``, return the fraction of correctly
        classified samples (float), else return the number of correctly
        classified samples (int).

        The best performance is 1 with ``normalize == True`` and the number
        of samples with ``normalize == False``.

    Notes
    -----
    In binary and multiclass classification, this function is equal
    to the ``jaccard_similarity_score`` function.
    """
    if y_true.ndim > 1:
        differing_labels = ((y_true - y_pred) == 0).all(1)
        score = differing_labels != 0
    else:
        score = y_true == y_pred

    if normalize:
        score = da.average(score, weights=sample_weight)
    elif sample_weight is not None:
        score = da.dot(score, sample_weight)
    else:
        score = score.sum()

    if compute:
        score = score.compute()
    return score
```
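A quick usage sketch (mine, not part of the file). The import path is an assumption, inferred from the `dask_sql.physical.rel.custom.wrappers` import in the first file:

```python
# Usage sketch (illustration only; import path assumed, see above).
import dask.array as da
import numpy as np

from dask_sql.physical.rel.custom.metrics import accuracy_score

y_true = da.from_array(np.array([0, 1, 1, 0, 1, 0]), chunks=3)
y_pred = da.from_array(np.array([0, 1, 0, 0, 1, 1]), chunks=3)

print(accuracy_score(y_true, y_pred))                   # 0.666... (4 of 6)
print(accuracy_score(y_true, y_pred, normalize=False))  # 4
```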
metrics.py, continued:

```python
def _log_loss_inner(
    x: ArrayLike, y: ArrayLike, sample_weight: Optional[ArrayLike], **kwargs
):
    # da.map_blocks wasn't able to concatenate together the results
    # when we reduce down to a scalar per block. So we make an
    # array with 1 element.
    if sample_weight is not None:
        sample_weight = sample_weight.ravel()
    return np.array(
        [sklearn.metrics.log_loss(x, y, sample_weight=sample_weight, **kwargs)]
    )


def log_loss(
    y_true, y_pred, eps=1e-15, normalize=True, sample_weight=None, labels=None
):
    if not (dask.is_dask_collection(y_true) and dask.is_dask_collection(y_pred)):
        return sklearn.metrics.log_loss(
            y_true,
            y_pred,
            eps=eps,
            normalize=normalize,
            sample_weight=sample_weight,
            labels=labels,
        )

    if y_pred.ndim > 1 and y_true.ndim == 1:
        y_true = y_true.reshape(-1, 1)
        drop_axis: Optional[int] = 1
        if sample_weight is not None:
            sample_weight = sample_weight.reshape(-1, 1)
    else:
        drop_axis = None

    result = da.map_blocks(
        _log_loss_inner,
        y_true,
        y_pred,
        sample_weight,
        chunks=(1,),
        drop_axis=drop_axis,
        dtype="f8",
        eps=eps,
        normalize=normalize,
        labels=labels,
    )
    if normalize and sample_weight is not None:
        sample_weight = sample_weight.ravel()
        block_weights = sample_weight.map_blocks(np.sum, chunks=(1,), keepdims=True)
        return da.average(result, 0, weights=block_weights)
    elif normalize:
        return result.mean()
    else:
        return result.sum()
```
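The comment in `_log_loss_inner` is the key trick here: each block is reduced to a one-element array so the per-block results can be concatenated and then combined. The same pattern in isolation, with plain dask (my example, not from the PR):

```python
# Sketch of the per-block reduction pattern used by log_loss above.
import dask.array as da
import numpy as np

x = da.arange(10, chunks=5)  # two blocks: [0..4] and [5..9]

def block_sum(block):
    # Return a shape-(1,) array rather than a bare scalar so dask can
    # concatenate the block results into a 1-d array.
    return np.array([block.sum()])

per_block = x.map_blocks(block_sum, chunks=(1,), dtype="i8")
print(per_block.compute())        # [10 35], one entry per block
print(per_block.sum().compute())  # 45, combined across blocks
```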
metrics.py, continued:

```python
def _check_sample_weight(sample_weight: Optional[ArrayLike]):
    if sample_weight is not None:
        raise ValueError("'sample_weight' is not supported.")


@derived_from(sklearn.metrics)
def mean_squared_error(
    y_true: ArrayLike,
    y_pred: ArrayLike,
    sample_weight: Optional[ArrayLike] = None,
    multioutput: Optional[str] = "uniform_average",
    squared: bool = True,
    compute: bool = True,
) -> ArrayLike:
    _check_sample_weight(sample_weight)
    output_errors = ((y_pred - y_true) ** 2).mean(axis=0)

    if isinstance(multioutput, str) or multioutput is None:
        if multioutput == "raw_values":
            if compute:
                return output_errors.compute()
            else:
                return output_errors
    else:
        raise ValueError("Weighted 'multioutput' not supported.")
    result = output_errors.mean()
    if not squared:
        result = da.sqrt(result)
    if compute:
        result = result.compute()
    return result


def _check_reg_targets(
    y_true: ArrayLike, y_pred: ArrayLike, multioutput: Optional[str]
):
    if multioutput is not None and multioutput != "uniform_average":
        raise NotImplementedError("'multioutput' must be 'uniform_average'")

    if y_true.ndim == 1:
        y_true = y_true.reshape((-1, 1))
    if y_pred.ndim == 1:
        y_pred = y_pred.reshape((-1, 1))

    # TODO: y_type, multioutput
    return None, y_true, y_pred, multioutput
```
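Usage sketch under the same import-path assumption as above; the numbers match scikit-learn's familiar `mean_squared_error` example:

```python
import dask.array as da
import numpy as np

from dask_sql.physical.rel.custom.metrics import mean_squared_error  # assumed path

y_true = da.from_array(np.array([3.0, -0.5, 2.0, 7.0]), chunks=2)
y_pred = da.from_array(np.array([2.5, 0.0, 2.0, 8.0]), chunks=2)

print(mean_squared_error(y_true, y_pred))                 # 0.375
print(mean_squared_error(y_true, y_pred, squared=False))  # ~0.612 (RMSE)
```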
metrics.py, continued:

```python
@derived_from(sklearn.metrics)
def r2_score(
    y_true: ArrayLike,
    y_pred: ArrayLike,
    sample_weight: Optional[ArrayLike] = None,
    multioutput: Optional[str] = "uniform_average",
    compute: bool = True,
) -> ArrayLike:
    _check_sample_weight(sample_weight)
    _, y_true, y_pred, _ = _check_reg_targets(y_true, y_pred, multioutput)
    weight = 1.0

    numerator = (weight * (y_true - y_pred) ** 2).sum(axis=0, dtype="f8")
    denominator = (weight * (y_true - y_true.mean(axis=0)) ** 2).sum(axis=0, dtype="f8")

    nonzero_denominator = denominator != 0
    nonzero_numerator = numerator != 0
    valid_score = nonzero_denominator & nonzero_numerator
    output_chunks = getattr(y_true, "chunks", [None, None])[1]
    output_scores = da.ones([y_true.shape[1]], chunks=output_chunks)
    with np.errstate(all="ignore"):
        output_scores[valid_score] = 1 - (
            numerator[valid_score] / denominator[valid_score]
        )
        output_scores[nonzero_numerator & ~nonzero_denominator] = 0.0

    result = output_scores.mean(axis=0)
    if compute:
        result = result.compute()
    return result
```
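And the same pair of arrays through `r2_score` (same import-path assumption):

```python
import dask.array as da
import numpy as np

from dask_sql.physical.rel.custom.metrics import r2_score  # assumed path

y_true = da.from_array(np.array([3.0, -0.5, 2.0, 7.0]), chunks=2)
y_pred = da.from_array(np.array([2.5, 0.0, 2.0, 8.0]), chunks=2)

print(r2_score(y_true, y_pred))  # ~0.9486
```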