# %% [markdown]
# # Monitor data quality for promotion planning

# %%
from sagemaker import get_execution_role
from sagemaker.image_uris import retrieve
from sagemaker.model import Model
from sagemaker.model_monitor import (BatchTransformInput,
                                     CronExpressionGenerator,
                                     DataCaptureConfig, DefaultModelMonitor,
                                     MonitoringDatasetFormat)
from sagemaker.model_monitor.dataset_format import DatasetFormat
# Session is imported from its defining module rather than through the
# incidental re-export in sagemaker.model.
from sagemaker.session import Session

# %%
sagemaker_session = Session()
sagemaker_runtime_client = sagemaker_session.sagemaker_runtime_client

# %%
# Shared configuration for the whole notebook.
role = get_execution_role()
region = 'us-east-1'
instance_count = 1
instance_type = 'ml.m4.xlarge'
data_bucket = 'adp-rnd-ml-datasets'
model_bucket = 'adp-rnd-ml-models'
stage_bucket = 'adp-rnd-ml-stage'

# Both monitors are configured identically, so the settings live in one place.
monitor_settings = dict(
    role=role,
    instance_count=instance_count,
    instance_type=instance_type,
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
    base_job_name='promotion-planning',
    sagemaker_session=sagemaker_session,
)
# One monitor watches the real-time endpoint, the other the batch transform output.
endpoint_monitor = DefaultModelMonitor(**monitor_settings)
transform_monitor = DefaultModelMonitor(**monitor_settings)

# %% [markdown]
# ## Model

# %% jupyter={"source_hidden": true} tags=[]
image_uri = retrieve(framework='xgboost',
                     region=region,
                     version='0.90-1')

# Location of the trained model artifact inside the model bucket.
bucket_prefix = 'promotion-planning/model/promotion-planning-train-job-2023-01-31-084806/output'
model_file_name = 'model.tar.gz'
model_s3_key = f'{bucket_prefix}/{model_file_name}'
model_url = f's3://{model_bucket}/{model_s3_key}'
model_name = 'promotion-planning-2023-01-31-084806'

model = Model(image_uri=image_uri,
              model_data=model_url,
              role=role,
              name=model_name)

model.create()

# %% [markdown]
# ### Deploy model

# %%
initial_instance_count = 1
endpoint_instance_type = 'ml.t2.medium'
endpoint_name = 'promotion-planning-endpoint-084806'
endpoint_capture_destination_s3_uri = f's3://{stage_bucket}/captured_data'

# Capture every request/response so the endpoint monitor has data to analyse.
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=endpoint_capture_destination_s3_uri
)

# %%
# NOTE(review): a plain Model without predictor_cls presumably returns None
# from deploy() - confirm; `predictor` is not referenced again in this notebook.
predictor = model.deploy(
    initial_instance_count=initial_instance_count,
    instance_type=endpoint_instance_type,
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,
)

# %% [markdown]
# ## Suggest baseline
# %%
# The training set doubles as the baseline dataset; results go to the stage bucket.
training_dataset = 'promotion-planning/train/data.csv'
baseline_data_uri = f's3://{data_bucket}/{training_dataset}'
baseline_results_uri = f's3://{stage_bucket}/promotion-planning/baseline_results'

# %%
# Run the baselining job and block until it finishes (wait=True).
baseline_job_settings = {
    'baseline_dataset': baseline_data_uri,
    'dataset_format': DatasetFormat.csv(header=True),
    'output_s3_uri': baseline_results_uri,
    'wait': True,
}
result = endpoint_monitor.suggest_baseline(**baseline_job_settings)

# %% [markdown]
# ## Create monitoring schedule

# %%
# Report destination plus the two baseline artifacts under baseline_results_uri.
s3_report_path = f's3://{stage_bucket}/monitoring-results'
statistics_path = f'{baseline_results_uri}/statistics.json'
constraints_path = f'{baseline_results_uri}/constraints.json'

# %% [markdown]
# ### Real-time endpoint

# %%
endpoint_mon_schedule_name = 'promotion-planning-endpoint-model-monitor-schedule'

# %%
# Hourly schedule that checks the endpoint's captured data against the baseline.
endpoint_schedule_settings = {
    'monitor_schedule_name': endpoint_mon_schedule_name,
    'endpoint_input': endpoint_name,
    'output_s3_uri': s3_report_path,
    'statistics': statistics_path,
    'constraints': constraints_path,
    'schedule_cron_expression': CronExpressionGenerator.hourly(),
    'enable_cloudwatch_metrics': True,
}
endpoint_monitor.create_monitoring_schedule(**endpoint_schedule_settings)

# %% [markdown]
# ### Batch transform
# %%
# The 'batch_tranform' spelling is part of the S3 prefix that the transform job
# captures to and the monitor reads from, so it is kept as is.
transform_capture_destination_s3_uri = f's3://{stage_bucket}/captured_data/batch_tranform/promotion-planning'
transform_mon_schedule_name = 'promotion-planning-transform-model-monitor-schedule'

# %%
# Hourly schedule over the data captured by batch transform jobs.
transform_capture_input = BatchTransformInput(
    data_captured_destination_s3_uri=transform_capture_destination_s3_uri,
    destination='/opt/ml/processing/input',
    dataset_format=MonitoringDatasetFormat.csv(header=False),
)
transform_monitor.create_monitoring_schedule(
    monitor_schedule_name=transform_mon_schedule_name,
    batch_transform_input=transform_capture_input,
    output_s3_uri=s3_report_path,
    statistics=statistics_path,
    constraints=constraints_path,
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

# %% [markdown]
# ## Model monitor information

# %%
# Point each monitor object at its schedule by name.
endpoint_monitor.monitoring_schedule_name = endpoint_mon_schedule_name
transform_monitor.monitoring_schedule_name = transform_mon_schedule_name

# %% [markdown]
# ### Endpoint monitor

# %%
model_monitor, mon_schedule_name = endpoint_monitor, endpoint_mon_schedule_name

# %% [markdown]
# or

# %% [markdown]
# ### Batch transform monitor

# %%
# When both selection cells run in order, this later assignment is the one in effect.
model_monitor, mon_schedule_name = transform_monitor, transform_mon_schedule_name
# %% [markdown]
# ### describe schedule

# %%
schedule_description = model_monitor.describe_schedule()
print('{}\n{}\n{}'.format(
    schedule_description.get('MonitoringScheduleName'),
    schedule_description.get('MonitoringScheduleStatus'),
    schedule_description.get('EndpointName')
))

# %%
schedule_description

# %% [markdown]
# ### last execution

# %%
list_executions = model_monitor.list_executions()
if list_executions:
    last_list_execution = list_executions[-1].describe()
    print('{}\n{}\n{}'.format(
        last_list_execution.get('ProcessingJobStatus'),
        last_list_execution.get('ExitMessage'),
        last_list_execution['ProcessingEndTime'].isoformat() if last_list_execution.get('ProcessingEndTime') else ''
    ))

# %%
# Guarded so that a schedule with no executions yet (empty list) or an execution
# without a violations report (None) shows nothing instead of raising.
latest_violations = list_executions[-1].constraint_violations() if list_executions else None
latest_violations.body_dict if latest_violations is not None else None

# %% [markdown]
# ### describe monitoring schedule

# %%
describe_monitoring_schedule = sagemaker_session.describe_monitoring_schedule(monitoring_schedule_name=mon_schedule_name)
# The summary key is absent until the schedule has executed; look it up once.
last_execution_summary = describe_monitoring_schedule.get('LastMonitoringExecutionSummary')
print('{}\n{}\n{}'.format(
    describe_monitoring_schedule['MonitoringScheduleStatus'],
    last_execution_summary['MonitoringExecutionStatus'] if last_execution_summary else '',
    last_execution_summary['ScheduledTime'].isoformat() if last_execution_summary else '',
))

# %% [markdown]
# ### list monitoring schedules

# %%
monitoring_schedules = sagemaker_session.list_monitoring_schedules(endpoint_name=endpoint_name)
[{ 'MonitoringScheduleName': s.get('MonitoringScheduleName'),
   'MonitoringScheduleStatus': s.get('MonitoringScheduleStatus'),
   'EndpointName': s.get('EndpointName'),
   'MonitoringType': s.get('MonitoringType'),
 } for s in monitoring_schedules['MonitoringScheduleSummaries']]

# %% [markdown]
# ### list monitoring executions

# %%
monitoring_executions = sagemaker_session.list_monitoring_executions(monitoring_schedule_name=mon_schedule_name)
# Only the first five summaries are shown to keep the output short.
[{ 'MonitoringExecutionStatus': s.get('MonitoringExecutionStatus'),
   'ScheduledTime': s.get('ScheduledTime').isoformat() if s.get('ScheduledTime') else '',
 } for s in monitoring_executions['MonitoringExecutionSummaries'][:5]]

# %% [markdown]
# ## Model usage

# %% [markdown]
# ### Real-time endpoint

# %%
stage_file_path = f's3://{stage_bucket}/promotion-planning/input/stage.csv'

# %%
# Cell magic kept in jupytext's commented form; it runs as `%pip ...` in the
# notebook. %pip (rather than !pip) installs into the running kernel's environment.
# %pip install -Uq awswrangler
# %%
import time

# Content type sent with every invocation.
_CSV_CONTENT_TYPE = "text/csv"

# Number of synthetic rows sent when predict_spoiled_data gets max_lines=None.
DEFAULT_SPOILED_LINES = 100

# Feature row whose first field is empty (the line starts with a comma).
_NULL_ELEMENT_ROW = ',0.05,0.0,0.0,0.0,0.0,0.0,7.0,10.0,9.0,0.0,216.92,216.92,216.92,216.92,0.0,0.0,0.0,0.0,216.92,216.92,216.92,216.92,86.77,2.4996543,2.4996543,2.4996543,2.4996543,0.9999539,0.9999539,0.9999539,0.9999539,2.4996543,2.4996543,2.4996543,2.4996543,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0'

# Same row with the first two fields left as placeholders for out-of-range values.
_ELEMENT_ROW_TEMPLATE = '{},{},0.0,0.0,0.0,0.0,0.0,7.0,10.0,9.0,0.0,216.92,216.92,216.92,216.92,0.0,0.0,0.0,0.0,216.92,216.92,216.92,216.92,86.77,2.4996543,2.4996543,2.4996543,2.4996543,0.9999539,0.9999539,0.9999539,0.9999539,2.4996543,2.4996543,2.4996543,2.4996543,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0'


def _invoke_rows(payloads, runtime_client, endpoint, delay):
    """Send each CSV payload to the endpoint and print the decoded prediction."""
    for payload in payloads:
        response = runtime_client.invoke_endpoint(
            EndpointName=endpoint,
            Body=payload,
            ContentType=_CSV_CONTENT_TYPE,
        )
        prediction = response["Body"].read().decode()
        print(prediction, end=' ')
        # Pause between requests so the endpoint is not hit in a tight loop.
        time.sleep(delay)


def _csv_payloads(df, limit):
    """Yield one comma-joined line per DataFrame row, at most `limit` rows."""
    for position, (_, series) in enumerate(df.iterrows()):
        # Count rows by position so the limit does not depend on index labels.
        if limit is not None and position >= limit:
            return
        yield ','.join(series.astype(str).array)


def _spoiled_rows(count):
    """Build `count` rows: the empty-field row, then rows with extreme values."""
    if count <= 0:
        return []
    rows = [_NULL_ELEMENT_ROW]
    for i in range(count - 1):
        # Values run -20000.0, -10000.0, 0.0, 10000.0, ... in both placeholders.
        value = (i - 2) * 10000.0
        rows.append(_ELEMENT_ROW_TEMPLATE.format(value, value))
    return rows


def predict_from_file(file_path: str, max_lines=None, *,
                      runtime_client=None, endpoint=None, delay=0.1):
    """Replay rows of a CSV file on S3 against the real-time endpoint.

    Args:
        file_path: S3 URI of a CSV file whose first line is a header.
        max_lines: Maximum number of data rows to send; None sends every row,
            a non-positive value sends nothing.
        runtime_client: Object exposing ``invoke_endpoint``; defaults to the
            notebook-level ``sagemaker_runtime_client``.
        endpoint: Endpoint name; defaults to the notebook-level ``endpoint_name``.
        delay: Seconds to sleep after each request.
    """
    # Imported here so this cell can be defined before awswrangler is installed.
    import awswrangler as wr

    client = sagemaker_runtime_client if runtime_client is None else runtime_client
    target = endpoint_name if endpoint is None else endpoint
    df = wr.s3.read_csv(file_path, header=0)
    _invoke_rows(_csv_payloads(df, max_lines), client, target, delay)


def predict_spoiled_data(max_lines=None, *,
                         runtime_client=None, endpoint=None, delay=0.1):
    """Send deliberately malformed rows to the real-time endpoint.

    The first request carries a row with an empty first field; each following
    request carries a row whose first two fields hold the same extreme value.

    Args:
        max_lines: Total number of requests to send; None means
            ``DEFAULT_SPOILED_LINES``, a non-positive value sends nothing.
        runtime_client: Object exposing ``invoke_endpoint``; defaults to the
            notebook-level ``sagemaker_runtime_client``.
        endpoint: Endpoint name; defaults to the notebook-level ``endpoint_name``.
        delay: Seconds to sleep after each request.
    """
    client = sagemaker_runtime_client if runtime_client is None else runtime_client
    target = endpoint_name if endpoint is None else endpoint
    count = DEFAULT_SPOILED_LINES if max_lines is None else max_lines
    _invoke_rows(_spoiled_rows(count), client, target, delay)
# %%
predict_from_file(file_path=stage_file_path, max_lines=100)

# %%
predict_spoiled_data(max_lines=100)

# %% [markdown]
# ### Batch transform

# %%
from sagemaker.transformer import Transformer
from sagemaker.inputs import BatchDataCaptureConfig

transform_instance_count = 1
transform_instance_type = 'ml.m4.xlarge'
base_transform_job_name='promotion-planning'
batch_input = 's3://{}/promotion-planning/input/'.format(stage_bucket)
batch_output = 's3://{}/promotion-planning/output'.format(stage_bucket)

# Batch job definition: one record per request, output lines joined back per input file.
transformer = Transformer(
    model_name=model_name,
    instance_count=transform_instance_count,
    instance_type=transform_instance_type,
    strategy='SingleRecord',
    assemble_with='Line',
    output_path=batch_output,
    max_concurrent_transforms=16,
    max_payload=6,
    base_transform_job_name=base_transform_job_name,
)

# %% tags=[]
# input_filter '$[1:]' drops the first column of each record before inference;
# captured data lands where the transform monitor schedule reads it.
transformer.transform(
    data=batch_input,
    content_type='text/csv',
    split_type='Line',
    input_filter='$[1:]',
    batch_data_capture_config=BatchDataCaptureConfig(
        destination_s3_uri=transform_capture_destination_s3_uri,
    ),
)

# %% [markdown]
# ## Clean up

# %%
# Monitoring schedules are removed first, before the endpoint they watch.
endpoint_monitor.delete_monitoring_schedule()

# %%
transform_monitor.delete_monitoring_schedule()

# %%
# The endpoint is deleted before its configuration, so the configuration is
# never removed while a live endpoint still uses it.
sagemaker_session.delete_endpoint(endpoint_name=endpoint_name)

# %%
# NOTE(review): assumes the endpoint configuration created by deploy() carries
# the same name as the endpoint - confirm.
sagemaker_session.delete_endpoint_config(endpoint_config_name=endpoint_name)

# %%
sagemaker_session.delete_model(model_name=model_name)
+ "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.13" + }, + "vscode": { + "interpreter": { + "hash": "1b93f42604f61d1fcc96f49e8a7dce25aa3467e961bc3f64701b51556c6bc026" + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}