From 9674fe5ff45975f9f923a2b7eaff5b23eca07604 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Mon, 24 Oct 2022 11:51:38 -0700
Subject: [PATCH 1/9] initial pass

---
 .github/workflows/test-upstream.yml              | 9 +++------
 .github/workflows/test.yml                       | 9 +++------
 continuous_integration/environment-3.10-dev.yaml | 1 -
 continuous_integration/environment-3.8-dev.yaml  | 1 -
 continuous_integration/environment-3.9-dev.yaml  | 1 -
 continuous_integration/gpuci/environment.yaml    | 1 -
 docker/conda.txt                                 | 1 -
 docker/main.dockerfile                           | 1 -
 setup.py                                         | 1 -
 9 files changed, 6 insertions(+), 19 deletions(-)

diff --git a/.github/workflows/test-upstream.yml b/.github/workflows/test-upstream.yml
index 1e3e5caa9..381a9bf29 100644
--- a/.github/workflows/test-upstream.yml
+++ b/.github/workflows/test-upstream.yml
@@ -73,11 +73,10 @@ jobs:
           mamba install -c conda-forge "sasl>=0.3.1"
           docker pull bde2020/hive:2.3.2-postgresql-metastore
           docker pull bde2020/hive-metastore-postgresql:2.3.0
-      - name: Install upstream dev Dask / dask-ml
+      - name: Install upstream dev Dask
         if: env.which_upstream == 'Dask'
         run: |
           mamba update dask
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: Test with pytest
         run: |
           pytest --junitxml=junit/test-results.xml --cov-report=xml -n auto tests --dist loadfile
@@ -112,11 +111,10 @@ jobs:
         which python
         pip list
         mamba list
-      - name: Install upstream dev dask-ml
+      - name: Install upstream dev Dask
         if: env.which_upstream == 'Dask'
         run: |
           mamba update dask
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: run a dask cluster
         run: |
           if [[ $which_upstream == "Dask" ]]; then
@@ -161,12 +159,11 @@ jobs:
         which python
         pip list
         mamba list
-      - name: Install upstream dev Dask / dask-ml
+      - name: Install upstream dev Dask
         if: env.which_upstream == 'Dask'
         run: |
           python -m pip install --no-deps git+https://github.com/dask/dask
           python -m pip install --no-deps git+https://github.com/dask/distributed
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: Try to import dask-sql
         run: |
           python -c "import dask_sql; print('ok')"
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index a20ee2b14..bdf46c3f6 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -64,11 +64,10 @@ jobs:
           mamba install -c conda-forge "sasl>=0.3.1"
           docker pull bde2020/hive:2.3.2-postgresql-metastore
           docker pull bde2020/hive-metastore-postgresql:2.3.0
-      - name: Optionally install upstream dev Dask / dask-ml
+      - name: Optionally install upstream dev Dask
         if: needs.detect-ci-trigger.outputs.triggered == 'true'
         run: |
           mamba update dask
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: Test with pytest
         run: |
           pytest --junitxml=junit/test-results.xml --cov-report=xml -n auto tests --dist loadfile
@@ -108,11 +107,10 @@ jobs:
         which python
         pip list
         mamba list
-      - name: Optionally install upstream dev dask-ml
+      - name: Optionally install upstream dev Dask
         if: needs.detect-ci-trigger.outputs.triggered == 'true'
         run: |
           mamba update dask
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: run a dask cluster
         env:
           UPSTREAM: ${{ needs.detect-ci-trigger.outputs.triggered }}
@@ -153,12 +151,11 @@ jobs:
         which python
         pip list
         mamba list
-      - name: Optionally install upstream dev Dask / dask-ml
+      - name: Optionally install upstream dev Dask
         if: needs.detect-ci-trigger.outputs.triggered == 'true'
         run: |
           python -m pip install --no-deps git+https://github.com/dask/dask
           python -m pip install --no-deps git+https://github.com/dask/distributed
-          python -m pip install --no-deps git+https://github.com/dask/dask-ml
       - name: Try to import dask-sql
         run: |
           python -c "import dask_sql; print('ok')"
diff --git a/continuous_integration/environment-3.10-dev.yaml b/continuous_integration/environment-3.10-dev.yaml
index af55d33b5..2e644d1fd 100644
--- a/continuous_integration/environment-3.10-dev.yaml
+++ b/continuous_integration/environment-3.10-dev.yaml
@@ -3,7 +3,6 @@ channels:
 - conda-forge
 - nodefaults
 dependencies:
-- dask-ml>=2022.1.22
 - dask>=2022.3.0
 - fastapi>=0.69.0
 - fugue>=0.7.0
diff --git a/continuous_integration/environment-3.8-dev.yaml b/continuous_integration/environment-3.8-dev.yaml
index dca95257d..ca18dc15a 100644
--- a/continuous_integration/environment-3.8-dev.yaml
+++ b/continuous_integration/environment-3.8-dev.yaml
@@ -3,7 +3,6 @@ channels:
 - conda-forge
 - nodefaults
 dependencies:
-- dask-ml=2022.1.22
 - dask=2022.3.0
 - fastapi=0.69.0
 - fugue=0.7.0
diff --git a/continuous_integration/environment-3.9-dev.yaml b/continuous_integration/environment-3.9-dev.yaml
index 52ec271d3..a22924a76 100644
--- a/continuous_integration/environment-3.9-dev.yaml
+++ b/continuous_integration/environment-3.9-dev.yaml
@@ -3,7 +3,6 @@ channels:
 - conda-forge
 - nodefaults
 dependencies:
-- dask-ml>=2022.1.22
 - dask>=2022.3.0
 - fastapi>=0.69.0
 - fugue>=0.7.0
diff --git a/continuous_integration/gpuci/environment.yaml b/continuous_integration/gpuci/environment.yaml
index 2e7817cfc..fe1064f59 100644
--- a/continuous_integration/gpuci/environment.yaml
+++ b/continuous_integration/gpuci/environment.yaml
@@ -6,7 +6,6 @@ channels:
 - conda-forge
 - nodefaults
 dependencies:
-- dask-ml>=2022.1.22
 - dask>=2022.3.0
 - fastapi>=0.69.0
 - fugue>=0.7.0
diff --git a/docker/conda.txt b/docker/conda.txt
index 32a08c7a9..3d57e18dc 100644
--- a/docker/conda.txt
+++ b/docker/conda.txt
@@ -16,7 +16,6 @@ uvicorn>=0.13.4
 pyarrow>=6.0.1
 prompt_toolkit>=3.0.8
 pygments>=2.7.1
-dask-ml>=2022.1.22
 scikit-learn>=1.0.0
 intake>=0.6.0
 pre-commit>=2.11.1
diff --git a/docker/main.dockerfile b/docker/main.dockerfile
index 824f8ce27..0349b68b9 100644
--- a/docker/main.dockerfile
+++ b/docker/main.dockerfile
@@ -27,7 +27,6 @@ RUN mamba install --freeze-installed -y \
     nest-asyncio \
     # additional dependencies
     "pyarrow>=6.0.1" \
-    "dask-ml>=2022.1.22" \
     "scikit-learn>=1.0.0" \
     "intake>=0.6.0" \
     && conda clean -ay
diff --git a/setup.py b/setup.py
index c982e40a0..0f8520de9 100644
--- a/setup.py
+++ b/setup.py
@@ -59,7 +59,6 @@
         "mock>=4.0.3",
         "sphinx>=3.2.1",
         "pyarrow>=6.0.1",
-        "dask-ml>=2022.1.22",
         "scikit-learn>=1.0.0",
         "intake>=0.6.0",
         "pre-commit",

From 0027dd6217efd5edd93d6caeb310f25865b5c6ec Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Tue, 15 Nov 2022 11:01:18 -0800
Subject: [PATCH 2/9] remove all dask ml references

---
 dask_sql/physical/rel/custom/predict.py |  3 +--
 docs/source/machine_learning.rst        |  8 +++-----
 docs/source/sql/ml.rst                  |  3 +--
 notebooks/Feature Overview.ipynb        |  4 ++--
 tests/integration/test_model.py         | 22 ++++++++++------------
 5 files changed, 17 insertions(+), 23 deletions(-)

diff --git a/dask_sql/physical/rel/custom/predict.py b/dask_sql/physical/rel/custom/predict.py
index eb5e4b69f..03edb62ed 100644
--- a/dask_sql/physical/rel/custom/predict.py
+++ b/dask_sql/physical/rel/custom/predict.py
@@ -30,8 +30,7 @@ class PredictModelPlugin(BaseRelPlugin):
     Please note however, that it will need to act on Dask dataframes.
If you are using a model not optimized for this, it might be that you run out of memory if your data is larger than the RAM of a single machine. - To prevent this, have a look into the dask-ml package, - especially the [ParallelPostFit](https://ml.dask.org/meta-estimators.html) + To prevent this, have a look into the dask_sql.physical.rel.custom.wrappers.ParallelPostFit meta-estimator. If you are using a model trained with `CREATE MODEL` and the `wrap_predict` flag, this is done automatically. diff --git a/docs/source/machine_learning.rst b/docs/source/machine_learning.rst index fac0daacb..3dd301863 100644 --- a/docs/source/machine_learning.rst +++ b/docs/source/machine_learning.rst @@ -125,8 +125,7 @@ following sql statements Want to increase the performance of your model by tuning the parameters? Use the hyperparameter tuning directly in SQL using below SQL syntax, choose different tuners -from the dask_ml package based on memory and compute constraints and -for more details refer to the `dask ml documentation `_ +based on memory and compute constraints. .. TODO - add a GPU section to these examples once we have working CREATE EXPERIMENT tests for GPU @@ -135,7 +134,7 @@ for more details refer to the `dask ml documentation `_ +To prevent this, have a look into the `dask_sql.physical.rel.custom.wrappers.ParallelPostFit` meta-estimator. If you are using a model trained with ``CREATE MODEL`` and the ``wrap_predict`` flag set to true, this is done automatically. diff --git a/notebooks/Feature Overview.ipynb b/notebooks/Feature Overview.ipynb index 28538ab64..41085b31e 100644 --- a/notebooks/Feature Overview.ipynb +++ b/notebooks/Feature Overview.ipynb @@ -590,7 +590,7 @@ "metadata": {}, "source": [ "- Tune single model with different Hyperparameters \n", - " - install **dask_ml** for tunning\n", + " - install **sklearn** for tunning\n", "- Tune multiple model with different Hyperparameters\n", " - install **tpot** for Automl" ] @@ -604,7 +604,7 @@ "%%sql\n", "CREATE EXPERIMENT my_exp WITH (\n", " model_class = 'sklearn.ensemble.GradientBoostingClassifier',\n", - " experiment_class = 'dask_ml.model_selection.GridSearchCV',\n", + " experiment_class = 'sklearn.model_selection.GridSearchCV',\n", " tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],\n", " max_depth = ARRAY [3,4,5,10]),\n", " target_column = 'species'\n", diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py index ad48e5b44..67e2bd895 100644 --- a/tests/integration/test_model.py +++ b/tests/integration/test_model.py @@ -18,8 +18,6 @@ xgboost = None dask_cudf = None -pytest.importorskip("dask_ml") - def check_trained_model(c, model_name=None): if model_name is None: @@ -157,7 +155,7 @@ def test_clustering_and_prediction(c, training_df): c.sql( """ CREATE MODEL my_model WITH ( - model_class = 'dask_ml.cluster.KMeans' + model_class = 'sklearn.cluster.KMeans' ) AS ( SELECT x, y FROM timeseries @@ -244,7 +242,7 @@ def test_show_models(c, training_df): c.sql( """ CREATE MODEL my_model2 WITH ( - model_class = 'dask_ml.cluster.KMeans' + model_class = 'sklearn.cluster.KMeans' ) AS ( SELECT x, y FROM timeseries @@ -691,7 +689,7 @@ def test_ml_experiment(c, client, training_df): c.sql( """ CREATE EXPERIMENT my_exp WITH ( - experiment_class = 'dask_ml.model_selection.GridSearchCV', + experiment_class = 'sklearn.model_selection.GridSearchCV', tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001], max_depth = ARRAY [3,4,5,10]), target_column = 
             'target'
@@ -731,7 +729,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE EXPERIMENT IF NOT EXISTS my_exp WITH (
             model_class = 'that.is.not.a.python.class',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
                                max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -794,7 +792,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE EXPERIMENT my_exp WITH (
             model_class = 'sklearn.ensemble.GradientBoostingClassifier',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
                                max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -816,7 +814,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE EXPERIMENT my_exp WITH (
             model_class = 'sklearn.ensemble.GradientBoostingClassifier',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
                                max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -831,7 +829,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE EXPERIMENT IF NOT EXISTS my_exp WITH (
             model_class = 'sklearn.ensemble.GradientBoostingClassifier',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
                                max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -847,7 +845,7 @@ def test_ml_experiment(c, client, training_df):
         """
         CREATE OR REPLACE EXPERIMENT my_exp WITH (
             model_class = 'sklearn.ensemble.GradientBoostingClassifier',
-            experiment_class = 'dask_ml.model_selection.GridSearchCV',
+            experiment_class = 'sklearn.model_selection.GridSearchCV',
             tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001],
                                max_depth = ARRAY [3,4,5,10]),
             target_column = 'target'
@@ -867,8 +865,8 @@ def test_ml_experiment(c, client, training_df):
     c.sql(
         """
         CREATE EXPERIMENT my_exp1 WITH (
-            model_class = 'dask_ml.cluster.KMeans',
-            experiment_class = 'dask_ml.model_selection.RandomizedSearchCV',
+            model_class = 'sklearn.cluster.KMeans',
+            experiment_class = 'sklearn.model_selection.RandomizedSearchCV',
             tune_parameters = (n_clusters = ARRAY [3,4,16],tol = ARRAY [0.1,0.01,0.001],
                                max_iter = ARRAY [3,4,5,10])
         ) AS (

From c785aefe864806bf395449024abd10a444d168ae Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Tue, 15 Nov 2022 15:38:37 -0800
Subject: [PATCH 3/9] remove use_dask

---
 tests/integration/test_model.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py
index 67e2bd895..18d739689 100644
--- a/tests/integration/test_model.py
+++ b/tests/integration/test_model.py
@@ -887,7 +887,7 @@ def test_experiment_automl_classifier(c, client, training_df):
         """
         CREATE EXPERIMENT my_automl_exp1 WITH (
             automl_class = 'tpot.TPOTClassifier',
-            automl_kwargs = (population_size = 2 ,generations=2,cv=2,n_jobs=-1,use_dask=True),
+            automl_kwargs = (population_size = 2 ,generations=2,cv=2,n_jobs=-1),
             target_column = 'target'
         ) AS (
             SELECT x, y, x*y > 0 AS target
@@ -916,7 +916,6 @@ def test_experiment_automl_regressor(c, client, training_df):
             generations=2,
             cv=2,
             n_jobs=-1,
-            use_dask=True,
             max_eval_time_mins=1),
             target_column = 'target'

From 9007c0b1795635912d32e145535850676aae357f Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Tue, 15 Nov 2022 16:32:59 -0800
Subject: [PATCH 4/9] fix failing test

---
 dask_sql/physical/rel/custom/predict.py | 12 ++++++++++++
 notebooks/Feature Overview.ipynb        |  2 +-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/dask_sql/physical/rel/custom/predict.py b/dask_sql/physical/rel/custom/predict.py
index 03edb62ed..91de1d478 100644
--- a/dask_sql/physical/rel/custom/predict.py
+++ b/dask_sql/physical/rel/custom/predict.py
@@ -2,6 +2,9 @@
 import uuid
 from typing import TYPE_CHECKING
 
+import dask.dataframe as dd
+import pandas as pd
+
 from dask_sql.datacontainer import ColumnContainer, DataContainer
 from dask_sql.physical.rel.base import BaseRelPlugin
@@ -58,8 +61,17 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
         model, training_columns = context.schema[schema_name].models[model_name]
 
         df = context.sql(sql_select)
+        df = df.set_index(df.columns[0], drop=False)
         prediction = model.predict(df[training_columns])
+        # Convert numpy.ndarray to Dask Series
+        prediction = dd.from_pandas(
+            pd.Series(prediction, index=df.index),
+            npartitions=df.npartitions,
+        )
         predicted_df = df.assign(target=prediction)
+        # Need to drop first column to reset index
+        # because the first column is equal to the index
+        predicted_df = predicted_df.drop(columns=[df.columns[0]]).reset_index()
 
         # Create a temporary context, which includes the
         # new "table" so that we can use the normal
diff --git a/notebooks/Feature Overview.ipynb b/notebooks/Feature Overview.ipynb
index 41085b31e..ac23a9777 100644
--- a/notebooks/Feature Overview.ipynb
+++ b/notebooks/Feature Overview.ipynb
@@ -590,7 +590,7 @@
    "metadata": {},
    "source": [
     "- Tune single model with different Hyperparameters \n",
-    "    - install **sklearn** for tunning\n",
+    "    - install **sklearn** for tuning\n",
     "- Tune multiple model with different Hyperparameters\n",
     "    - install **tpot** for Automl"
    ]

From e139fa4656577d55a7ae0f056ae8d38475a3f483 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Tue, 15 Nov 2022 16:57:00 -0800
Subject: [PATCH 5/9] wrap in try/except

---
 dask_sql/physical/rel/custom/predict.py | 26 ++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/dask_sql/physical/rel/custom/predict.py b/dask_sql/physical/rel/custom/predict.py
index 91de1d478..1d1f2fd10 100644
--- a/dask_sql/physical/rel/custom/predict.py
+++ b/dask_sql/physical/rel/custom/predict.py
@@ -61,17 +61,21 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
         model, training_columns = context.schema[schema_name].models[model_name]
 
         df = context.sql(sql_select)
-        df = df.set_index(df.columns[0], drop=False)
-        prediction = model.predict(df[training_columns])
-        # Convert numpy.ndarray to Dask Series
-        prediction = dd.from_pandas(
-            pd.Series(prediction, index=df.index),
-            npartitions=df.npartitions,
-        )
-        predicted_df = df.assign(target=prediction)
-        # Need to drop first column to reset index
-        # because the first column is equal to the index
-        predicted_df = predicted_df.drop(columns=[df.columns[0]]).reset_index()
+        try:
+            prediction = model.predict(df[training_columns])
+            predicted_df = df.assign(target=prediction)
+        except TypeError:
+            df = df.set_index(df.columns[0], drop=False)
+            prediction = model.predict(df[training_columns])
+            # Convert numpy.ndarray to Dask Series
+            prediction = dd.from_pandas(
+                pd.Series(prediction, index=df.index),
+                npartitions=df.npartitions,
+            )
+            predicted_df = df.assign(target=prediction)
+            # Need to drop first column to reset index
+            # because the first column is equal to the index
+            predicted_df = predicted_df.drop(columns=[df.columns[0]]).reset_index()
 
         # Create a temporary context, which includes the
         # new "table" so that we can use the normal

From ac009610bd2d86f506e742a0a74fe25622a84c74 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Wed, 16 Nov 2022 13:31:27 -0800
Subject: [PATCH 6/9] add gpu test

---
 tests/integration/test_model.py | 45 +++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 33 insertions(+), 12 deletions(-)

diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py
index 18d739689..ecfd0a799 100644
--- a/tests/integration/test_model.py
+++ b/tests/integration/test_model.py
@@ -5,6 +5,7 @@
 import pandas as pd
 import pytest
 from dask.datasets import timeseries
+from dask.distributed import Client, LocalCluster
 
 from tests.integration.fixtures import skip_if_external_scheduler
 from tests.utils import assert_eq
@@ -151,18 +152,38 @@ def test_xgboost_training_prediction(c, gpu_training_df):
 
 # TODO - many ML tests fail on clusters without sklearn - can we avoid this?
 @skip_if_external_scheduler
-def test_clustering_and_prediction(c, training_df):
-    c.sql(
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_clustering_and_prediction(c, training_df, gpu):
+    if gpu:
+        df = timeseries(freq="1d").reset_index(drop=True)
+        c.create_table("gpu_timeseries", df, persist=True, gpu=True)
+
+        cluster = LocalCluster()
+        client = Client(cluster)
+
+        c.sql(
+            """
+            CREATE MODEL my_model WITH (
+                model_class = 'cuml.dask.cluster.KMeans'
+            ) AS (
+                SELECT x, y
+                FROM gpu_timeseries
+                LIMIT 100
+            )
+            """
+        )
+    else:
+        c.sql(
+            """
+            CREATE MODEL my_model WITH (
+                model_class = 'sklearn.cluster.KMeans'
+            ) AS (
+                SELECT x, y
+                FROM timeseries
+                LIMIT 100
+            )
         """
-        CREATE MODEL my_model WITH (
-            model_class = 'sklearn.cluster.KMeans'
-        ) AS (
-            SELECT x, y
-            FROM timeseries
-            LIMIT 100
         )
-    """
-    )
+        )
 
     check_trained_model(c)
@@ -887,7 +908,7 @@ def test_experiment_automl_classifier(c, client, training_df):
         """
         CREATE EXPERIMENT my_automl_exp1 WITH (
             automl_class = 'tpot.TPOTClassifier',
-            automl_kwargs = (population_size = 2 ,generations=2,cv=2,n_jobs=-1),
+            automl_kwargs = (population_size=2, generations=2, cv=2, n_jobs=-1),
             target_column = 'target'
         ) AS (
             SELECT x, y, x*y > 0 AS target
@@ -912,7 +933,7 @@ def test_experiment_automl_regressor(c, client, training_df):
         """
         CREATE EXPERIMENT my_automl_exp2 WITH (
             automl_class = 'tpot.TPOTRegressor',
-            automl_kwargs = (population_size = 2,
+            automl_kwargs = (population_size=2,
             generations=2,
             cv=2,
             n_jobs=-1,

From 07a060f1cfc9c72aee30cc0863875951aa91ce6a Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Wed, 16 Nov 2022 14:46:30 -0800
Subject: [PATCH 7/9] separate gpu test

---
 tests/integration/test_model.py | 56 ++++++++++++++++++++++++++++----------------------------
 1 file changed, 28 insertions(+), 28 deletions(-)

diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py
index ecfd0a799..49db01c98 100644
--- a/tests/integration/test_model.py
+++ b/tests/integration/test_model.py
@@ -152,38 +152,38 @@ def test_xgboost_training_prediction(c, gpu_training_df):
 
 # TODO - many ML tests fail on clusters without sklearn - can we avoid this?
 @skip_if_external_scheduler
-@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
-def test_clustering_and_prediction(c, training_df, gpu):
-    if gpu:
-        df = timeseries(freq="1d").reset_index(drop=True)
-        c.create_table("gpu_timeseries", df, persist=True, gpu=True)
-
-        cluster = LocalCluster()
-        client = Client(cluster)
-
-        c.sql(
-            """
-            CREATE MODEL my_model WITH (
-                model_class = 'cuml.dask.cluster.KMeans'
-            ) AS (
-                SELECT x, y
-                FROM gpu_timeseries
-                LIMIT 100
-            )
+def test_clustering_and_prediction(c, training_df):
+    c.sql(
         """
+        CREATE MODEL my_model WITH (
+            model_class = 'sklearn.cluster.KMeans'
+        ) AS (
+            SELECT x, y
+            FROM timeseries
+            LIMIT 100
         )
-    else:
-        c.sql(
-            """
-            CREATE MODEL my_model WITH (
-                model_class = 'sklearn.cluster.KMeans'
-            ) AS (
-                SELECT x, y
-                FROM timeseries
-                LIMIT 100
-            )
+    """
+    )
+
+    check_trained_model(c)
+
+
+@pytest.mark.gpu
+def test_gpu_clustering_and_prediction(c, gpu_training_df):
+    cluster = LocalCluster()
+    client = Client(cluster)
+
+    c.sql(
         """
+        CREATE MODEL my_model WITH (
+            model_class = 'cuml.dask.cluster.KMeans'
+        ) AS (
+            SELECT x, y
+            FROM timeseries
+            LIMIT 100
         )
+    """
+    )
 
     check_trained_model(c)

From 9a0a5bc76cf973702ee9510bfb0d957a9d3591dd Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Mon, 28 Nov 2022 11:21:47 -0800
Subject: [PATCH 8/9] use gpu_client

---
 tests/integration/test_model.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py
index 49db01c98..21df10045 100644
--- a/tests/integration/test_model.py
+++ b/tests/integration/test_model.py
@@ -169,10 +169,7 @@ def test_clustering_and_prediction(c, training_df):
 
 
 @pytest.mark.gpu
-def test_gpu_clustering_and_prediction(c, gpu_training_df):
-    cluster = LocalCluster()
-    client = Client(cluster)
-
+def test_gpu_clustering_and_prediction(c, gpu_training_df, gpu_client):
     c.sql(
         """
         CREATE MODEL my_model WITH (

From 264cc343c480bdddf389d5c2e525e03130339dd9 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Mon, 28 Nov 2022 11:39:11 -0800
Subject: [PATCH 9/9] remove imports

---
 tests/integration/test_model.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py
index 21df10045..d1d89248f 100644
--- a/tests/integration/test_model.py
+++ b/tests/integration/test_model.py
@@ -5,7 +5,6 @@
 import pandas as pd
 import pytest
 from dask.datasets import timeseries
-from dask.distributed import Client, LocalCluster
 
 from tests.integration.fixtures import skip_if_external_scheduler
 from tests.utils import assert_eq
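
The core technique in patches 4 and 5 — wrapping a plain numpy.ndarray prediction in a Dask Series that shares the source frame's index so it can be assigned as a new column — can be sketched outside of dask-sql. The snippet below is an illustration only, not part of the series: the timeseries demo data, the scikit-learn KMeans stand-in model, and the column names are assumptions, and the re-indexing on the first column that convert() performs is omitted for brevity.

    import dask.dataframe as dd
    import pandas as pd
    from dask.datasets import timeseries
    from sklearn.cluster import KMeans

    # Demo Dask DataFrame; "x" and "y" mirror the columns used by the tests above.
    df = timeseries(freq="1h")[["x", "y"]]

    # Fit on a small, eagerly computed sample (similar in spirit to the LIMIT 100 queries).
    model = KMeans(n_clusters=3, n_init=10).fit(df.head(100, npartitions=-1))

    # A plain scikit-learn .predict() returns a numpy.ndarray, which cannot be
    # assigned to a Dask DataFrame directly, so wrap it in a Dask Series that
    # reuses the frame's index before calling .assign().
    prediction = model.predict(df.compute())
    prediction = dd.from_pandas(
        pd.Series(prediction, index=df.index.compute()),
        npartitions=df.npartitions,
    )
    predicted_df = df.assign(target=prediction)
    print(predicted_df.head())

In the patched PredictModelPlugin.convert(), this conversion is the fallback path: the prediction is first assigned directly, which works for models wrapped with dask_sql's ParallelPostFit, and only when that raises a TypeError is the result re-wrapped as an index-aligned Dask Series.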