From 631688c0e6f453536dd6f743fed28738c3a9e830 Mon Sep 17 00:00:00 2001 From: thangam-vaiyapuri Date: Fri, 25 Jul 2025 04:07:37 +0000 Subject: [PATCH 1/2] Sessionization transformwithstateinpandas initial commit This Schema Evolution Sessionization example shows how to use Apache Spark's transformWithStateInPandas API for schema evolution in stateful stream processing in PySpark. This notebook focuses on transformWithStateInPandas's ValueState capabilities to manage evolving session state schemas across processor versions while maintaining backward compatibility. --- .../README.md | 276 +++++++ .../sessionization_processor.ipynb | 671 ++++++++++++++++++ .../utils/processor.py | 326 +++++++++ .../utils/util.py | 216 ++++++ 4 files changed, 1489 insertions(+) create mode 100644 2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/README.md create mode 100644 2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/sessionization_processor.ipynb create mode 100644 2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/utils/processor.py create mode 100644 2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/utils/util.py diff --git a/2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/README.md b/2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/README.md new file mode 100644 index 0000000..ddc8ec6 --- /dev/null +++ b/2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/README.md @@ -0,0 +1,276 @@ +# Spark Structured Streaming with transformWithStateInPandas + +This Schema Evolution Sessionization example shows how to use Apache Spark's **transformWithStateInPandas** API for schema evolution in stateful stream processing in PySpark. This example focuses on transformWithStateInPandas's **ValueState** capabilities to manage evolving session state schemas across processor versions while maintaining backward compatibility. + +## Overview + +**transformWithStateInPandas** is a powerful Python API that enables schema evolution in Apache Spark Structured Streaming sessionization applications. It allows you to: + +- **Evolve session state schemas** from V1 to V2 formats without losing existing sessions +- **Maintain session continuity** across application upgrades and schema changes +- **Handle real-time customer journey reconstruction** with complex business logic +- **Process out-of-order and late-arriving events** with sophisticated state management +- **Automatically migrate existing state** when new fields are added or types are widened + +The repository demonstrates **ValueState schema evolution** through a real-world e-commerce sessionization use case, showing how customer sessions can seamlessly transition from basic tracking (V1) to enhanced analytics (V2) without data loss. + +## Requirements + +- Apache Spark 4.0+ (Databricks Runtime 16.3+) +- Python 3.12.3+ +- dbldatagen python package for synthetic data generation +- **RocksDB State Store Provider** (required for schema evolution) +- **Avro encoding format** (required for state serialization evolution) +- **RocksDB Changelog Checkpointing** (required for fault tolerance and faster recovery) + +## Business Use Case + +**StreamShop E-commerce Sessionization**: Track customer journeys in real-time as they navigate through an online retail platform. 
The system processes clickstream events (page views, searches, purchases, logouts) and groups them into meaningful sessions for: + +- **Real-time personalization** based on current session behavior +- **Conversion funnel analysis** to identify drop-off points +- **Customer journey reconstruction** across web and mobile platforms +- **Revenue attribution** per session with detailed analytics + +### The Schema Evolution Challenge + +Modern applications evolve continuously. Your mobile team deploys new tracking features, adding `device_type` and `page_category` fields, while your existing sessions are still active with the original V1 schema. Traditional approaches would lose this state or require full reprocessing. + +**transformWithStateInPandas** solves this by allowing **seamless schema evolution** where existing V1 sessions automatically upgrade to V2 format when new events arrive. + +## Example Application + +The sessionization example uses synthetic e-commerce clickstream data, processing user interactions across web and mobile platforms with automatic schema evolution. + +### ValueState Schema Evolution Example + +This example demonstrates using ValueState to evolve session schemas while preserving active session state: + +```python +class SessionizerV1(StatefulProcessor): + """V1 Processor - Establishes initial session state""" + + def init(self, handle: StatefulProcessorHandle) -> None: + # V1 State Schema - Basic session tracking + state_schema = StructType([ + StructField("session_id", StringType(), True), + StructField("user_id", StringType(), True), + StructField("event_count", IntegerType(), True), # Will be widened to Long + StructField("total_revenue", DoubleType(), True), + StructField("session_start", TimestampType(), True) + ]) + + self.session_state = handle.getValueState("session_state", state_schema) + self.terminal_events = {'purchase', 'logout'} +``` + +```python +class SessionizerV2(StatefulProcessor): + """V2 Processor - Demonstrates automatic schema evolution""" + + def init(self, handle: StatefulProcessorHandle) -> None: + # V2 State Schema - Enhanced with new fields and type widening + state_schema = StructType([ + StructField("session_id", StringType(), True), # Same as V1 + StructField("user_id", StringType(), True), # Same as V1 + StructField("event_count", LongType(), True), # TYPE WIDENING: Intβ†’Long + StructField("total_revenue", DoubleType(), True), # Same as V1 + StructField("session_start", TimestampType(), True), # Same as V1 + StructField("device_type", StringType(), True), # NEW FIELD + StructField("page_category", StringType(), True) # NEW FIELD + ]) + + # CRITICAL: Use same state variable name for automatic evolution + self.session_state = handle.getValueState("session_state", state_schema) + self.terminal_events = {'purchase', 'logout'} +``` + +### Schema Evolution Detection and Handling + +The V2 processor automatically detects and evolves V1 state: + +```python +def handleInputRows(self, key, rows: Iterator[pd.DataFrame], timer_values) -> Iterator[pd.DataFrame]: + # Process each event batch + for batch in rows: + for _, event in batch.iterrows(): + # Extract V2 event data with graceful field handling + device_type = str(event.get('device_type', 'unknown')) if event.get('device_type') is not None else 'unknown' + page_category = str(event.get('page_category', 'unknown')) if event.get('page_category') is not None else 'unknown' + + # Get current state - Databricks automatically handles schema evolution + current_state = self.session_state.get() + 
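+            # Note (an assumption based on the Avro state encoding): a V1 record
+            # read back through the V2 schema surfaces the two added fields as
+            # None, and that None check below is the evolution-detection signal.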
+            evolved_from_v1 = False
+
+            if current_state is None:
+                # Brand new V2 session
+                state = (session_id, user_id, 1, revenue, timestamp, device_type, page_category)
+            else:
+                # SCHEMA EVOLUTION DETECTION
+                # When V1 state is read by V2, new fields (indices 5,6) will be None
+                if current_state[5] is None or current_state[6] is None:
+                    print(f"[V2] 🎉 EVOLUTION DETECTED: {session_id} evolved from V1!")
+                    evolved_from_v1 = True
+
+                    # Evolve V1 state to V2 format
+                    state = (
+                        current_state[0],            # session_id (preserved)
+                        current_state[1],            # user_id (preserved)
+                        current_state[2] + 1,        # event_count + 1 (auto Int → Long)
+                        current_state[3] + revenue,  # total_revenue (accumulated)
+                        current_state[4],            # session_start (preserved)
+                        device_type,                 # device_type (populated from V2 event)
+                        page_category                # page_category (populated from V2 event)
+                    )
+                else:
+                    # Regular V2 session update
+                    state = (current_state[0], current_state[1], current_state[2] + 1,
+                             current_state[3] + revenue, current_state[4],
+                             current_state[5], current_state[6])
+```
+
+Key features:
+- **Automatic schema evolution** when V1 state is read by the V2 processor
+- **Type widening** from IntegerType to LongType (handled by Databricks)
+- **New field population** from current V2 events during evolution
+- **Evolution tracking** with the `evolved_from_v1` flag for analytics
+- **Session continuity** preservation across schema changes
+
+## Code Structure
+
+```
+/Workspace/Users/[username]/sessionization/
+├── sessionization_processor.ipynb   # Main notebook demonstrating schema evolution
+└── utils/
+    ├── util.py                      # Utility functions and synthetic data generation
+    └── processor.py                 # SessionizerV1 and SessionizerV2 classes
+```
+
+- **sessionization_processor.ipynb**: Demonstrates the complete schema evolution workflow
+- **processor.py**: Contains the SessionizerV1 and SessionizerV2 StatefulProcessor implementations
+- **util.py**: Provides synthetic e-commerce clickstream data generation and project setup utilities
+
+## How to Run
+
+These examples are designed to run in Databricks notebooks. Follow these steps:
+
+1. **Import the notebooks** into your Databricks workspace
+2. **Configure the required settings** for schema evolution:
+   ```python
+   # REQUIRED: Configure RocksDB and Avro for schema evolution
+   spark.conf.set("spark.sql.streaming.stateStore.providerClass",
+                  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
+   spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")
+   spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
+   ```
+3. **Ensure the volume path** in `util.py` exists and is accessible
+4. **Run the sessionization notebook** to see schema evolution in action
+
+### Execution Flow
+
+1. **Phase 1 - V1 Processing**: Establishes initial sessions with the V1 schema; some sessions complete while others remain active
+2. **State Store Inspection**: Examine persisted V1 session state
+3. **Phase 2 - V2 Processing**: Uses the same checkpoint location to demonstrate automatic schema evolution
+4. **Evolution Analysis**: Validate that V1 sessions successfully evolved to V2 format with enhanced fields
+
+Each phase:
+1. Generates synthetic clickstream data using the `create_demo_data` function
+2. Processes events with the appropriate processor (V1 or V2)
+3. Writes sessionized results to Delta tables
+4. Demonstrates state store evolution and session continuity
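+
+Between the two phases you can also read the persisted state directly with Spark's state store reader, the same reader the notebook uses for its inspection step. A minimal sketch; the checkpoint path is a placeholder for your own location:
+
+```python
+# Read the persisted "session_state" variable for the sessionization operator
+state_df = (spark.read.format("statestore")
+    .option("operatorId", "0")                # first stateful operator in the query
+    .option("stateVarName", "session_state")  # the name passed to getValueState()
+    .load("/Volumes/.../shared_checkpoint"))  # placeholder: your shared checkpoint path
+
+display(state_df)
+```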
+
+## Key Concepts
+
+### StatefulProcessor Schema Evolution
+
+The example implements the `StatefulProcessor` interface with schema evolution support:
+
+- **init**: Defines state schemas for each version (V1 basic, V2 enhanced)
+- **handleInputRows**: Processes events and automatically handles schema evolution
+- **close**: Cleans up resources when the processor shuts down
+
+### Critical Configuration Requirements
+
+Schema evolution **requires specific configuration**:
+
+```python
+# 1. Avro Encoding (REQUIRED for schema evolution)
+spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")
+
+# 2. RocksDB State Store (REQUIRED for schema evolution support)
+spark.conf.set("spark.sql.streaming.stateStore.providerClass",
+               "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
+
+# 3. RocksDB Changelog Checkpointing (REQUIRED for fault tolerance)
+# Enables faster recovery after failures by maintaining incremental changes
+spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
+```
+
+**Why Each Configuration is Required:**
+- **Avro Encoding**: Enables schema evolution capabilities in state serialization
+- **RocksDB State Store**: Provides the underlying storage engine that supports schema evolution
+- **Changelog Checkpointing**: Maintains incremental state changes for faster fault recovery and better performance
+
+### Schema Evolution Patterns
+
+The example demonstrates these evolution patterns:
+
+1. **Type Widening**: `IntegerType` → `LongType` (automatic conversion by Databricks)
+2. **Field Addition**: New fields appear as `None` when V1 state is read by V2
+3. **Evolution Detection**: Check for `None` values in new field positions
+4. **State Migration**: Populate new fields from current events during evolution
+5. 
**Backward Compatibility**: V2 can process both evolved V1 sessions and new V2 sessions + +### Business Session Logic + +Important sessionization concepts demonstrated: + +- **Terminal Events**: Sessions end on `purchase` or `logout` events +- **Session Continuity**: Active sessions persist across processor version changes +- **Revenue Tracking**: Accumulated across all events in a session +- **Context Enhancement**: V2 adds device and page category information +- **Evolution Tracking**: Maintains history of which sessions evolved from V1 + +## Expected Results + +### V1 Processing Results +- Sessions with basic schema (5 fields) +- Some sessions complete with terminal events +- Active sessions persist in state store + +### V2 Processing Results +- Sessions with enhanced schema (7 fields + evolution tracking) +- **`evolved_from_v1: true`** indicates successful schema evolution +- Enhanced context with `device_type` and `page_category` +- Preserved session data from V1 (event counts, revenue, timing) + +### Schema Evolution Validation +```sql +-- Analyze evolution patterns +SELECT evolved_from_v1, schema_version, COUNT(*) as session_count +FROM v2_sessions +GROUP BY evolved_from_v1, schema_version; + +-- View evolved sessions +SELECT session_id, user_id, event_count, total_revenue, device_type, page_category +FROM v2_sessions +WHERE evolved_from_v1 = true; +``` + +## Business Impact + +Schema evolution enables: + +- **Minimal Downtime Deployments**: Add new tracking fields without stopping the pipeline +- **Gradual Rollouts**: Deploy schema changes incrementally across services +- **Historical Continuity**: Preserve months of session state during upgrades +- **Analytics Continuity**: Maintain consistent metrics during platform evolution +- **Competitive Advantage**: Rapidly iterate on customer analytics without service disruption + +## References + +- [Apache Spark Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) +- [Databricks Documentation on Stateful Stream Processing](https://docs.databricks.com/en/structured-streaming/stateful-stream-processing.html) +- [PySpark API Reference - StatefulProcessor](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.streaming.StatefulProcessor.html) +- [Introducing transformWithState in Apache Spark Structured Streaming](https://www.databricks.com/blog/introducing-transformwithstate-apache-sparktm-structured-streaming) +- [Stateful Applications Schema Evolution](https://docs.databricks.com/en/stateful-applications/schema-evolution.html) \ No newline at end of file diff --git a/2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/sessionization_processor.ipynb b/2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/sessionization_processor.ipynb new file mode 100644 index 0000000..9e6bb91 --- /dev/null +++ b/2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/sessionization_processor.ipynb @@ -0,0 +1,671 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "8eee4215-1759-4a42-9e5d-ec6295f41be3", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "#### πŸ“‹ Summary \n", + "# \n", + "#### This Schema Evolution Sessionization example shows how to use Apache Spark's transformWithStateInPandas 
API for schema evolution in stateful stream processing in PySpark. This notebook focuses on transformWithStateInPandas's ValueState capabilities to manage evolving session state schemas across processor versions while maintaining backward compatibility.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "5db54ca1-bc13-4656-a380-0ea968841aac", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# =============================================================================\n", + "# PySpark Streaming Sessionization with Schema Evolution Demo\n", + "# =============================================================================\n", + "# This notebook demonstrates:\n", + "# 1. Stateful stream processing with PySpark\n", + "# 2. Schema evolution in streaming applications\n", + "# 3. State store management and persistence\n", + "# 4. Migration from V1 to V2 processor schemas\n", + "# =============================================================================" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "0e6f2fda-0389-4fd6-8771-cff015d07764", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# Install required library for synthetic data generation\n", + "!pip install dbldatagen" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "5306770b-8ed8-44de-8a81-42b241de7a9d", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# Import necessary modules for system and OS operations\n", + "import sys, os\n", + "\n", + "# Import the init module from the utils package\n", + "from utils import util\n", + "\n", + "# Get and display the project directory for reference\n", + "projectDir = util.get_project_dir()\n", + "print(\"project directory :\", projectDir)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "552c80da-a30c-47df-abc0-3bfce16c84d4", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# =============================================================================\n", + "# Spark Configuration for Stateful Processing\n", + "# =============================================================================\n", + "\n", + "# Configure Spark to use RocksDB as the state store provider\n", + "# RocksDB provides better performance and reliability for stateful operations\n", + "spark.conf.set(\n", + " \"spark.sql.streaming.stateStore.providerClass\",\n", + " \"com.databricks.sql.streaming.state.RocksDBStateStoreProvider\"\n", + ")\n", + "\n", + "# Enable changelog checkpointing for better fault tolerance\n", + "# This helps with faster recovery after failures by maintaining incremental changes\n", + "spark.conf.set(\n", + " \"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled\", \n", + " \"true\"\n", + ")\n", + "\n", + "# Use Avro encoding format for state serialization\n", + "# Avro provides efficient serialization and supports schema evolution\n", + 
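+    "# Note: the state encoding format is fixed once the query has run against\n",
+    "# a checkpoint, so this must be set before the stream first starts; an\n",
+    "# existing checkpoint cannot be switched to a different encoding\n",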
"spark.conf.set(\n", + " \"spark.sql.streaming.stateStore.encodingFormat\", \n", + " \"avro\"\n", + ")\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "1a2055f3-5c39-4715-be11-1f29e4bf4f37", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# =============================================================================\n", + "# Output and Checkpoint Path Configuration\n", + "# =============================================================================\n", + "\n", + "# Setup directory paths for checkpoints and outputs\n", + "# Using shared checkpoint to demonstrate schema evolution across processor versions\n", + "shared_checkpoint = f\"{projectDir}/sessionization/shared_checkpoint/\" # Shared state store location\n", + "v1_output = f\"{projectDir}/sessionization/v1_output/\" # V1 processor output\n", + "v2_output = f\"{projectDir}/sessionization/v2_output/\" # V2 processor output\n", + "\n", + "print(\"Shared checkpoint:\", shared_checkpoint)\n", + "print(\"V1 Output:\", v1_output)\n", + "print(\"V2 Output:\", v2_output)\n", + "\n", + "# Clean up any previous run data to ensure clean demo environment\n", + "# This removes existing checkpoints and outputs from previous executions\n", + "dbutils.fs.rm(shared_checkpoint, True) # Remove shared checkpoint directory\n", + "dbutils.fs.rm(v1_output, True) # Remove V1 output directory\n", + "dbutils.fs.rm(v2_output, True) # Remove V2 output directory\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "719d0416-d849-4a94-8915-5df3e6ed1f96", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# =============================================================================\n", + "# Schema Definitions for Output Data\n", + "# =============================================================================\n", + "\n", + "from pyspark.sql.types import (\n", + " StructType, StructField, StringType, IntegerType, LongType,\n", + " TimestampType, DoubleType, BooleanType\n", + ")\n", + "from pyspark.sql.functions import col, lit, when, count, avg, col, array_contains, array, struct, array\n", + "\n", + "# Define output schemas for each processor version\n", + "# These schemas define the structure of completed session records\n", + "\n", + "# V1 Output Schema - Basic sessionization fields\n", + "V1_OUTPUT_SCHEMA = StructType([\n", + " StructField(\"session_id\", StringType(), True), # Unique session identifier\n", + " StructField(\"user_id\", StringType(), True), # User who owns the session\n", + " StructField(\"event_count\", IntegerType(), True), # Number of events in session\n", + " StructField(\"total_revenue\", DoubleType(), True), # Total revenue generated in session\n", + " StructField(\"session_start\", TimestampType(), True), # When session began\n", + " StructField(\"schema_version\", StringType(), True) # Schema version identifier\n", + "])\n", + "\n", + "# V2 Output Schema - Enhanced with additional fields and type evolution\n", + "V2_OUTPUT_SCHEMA = StructType([\n", + " StructField(\"session_id\", StringType(), True), # Same as V1\n", + " StructField(\"user_id\", StringType(), True), # Same as V1\n", + " StructField(\"event_count\", LongType(), True), # TYPE EVOLUTION: Int β†’ 
Long\n", + " StructField(\"total_revenue\", DoubleType(), True), # Same as V1\n", + " StructField(\"session_start\", TimestampType(), True), # Same as V1\n", + " StructField(\"device_type\", StringType(), True), # NEW: Device used in session\n", + " StructField(\"page_category\", StringType(), True), # NEW: Page category information\n", + " StructField(\"schema_version\", StringType(), True), # Schema version identifier\n", + " StructField(\"evolved_from_v1\", BooleanType(), True) # NEW: Evolution tracking flag\n", + "])\n", + "\n", + "print(\"Output schemas defined\")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "7000c051-1b5a-4210-a2b2-a661c7a8fd74", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# =============================================================================\n", + "# Synthetic Data Generation\n", + "# =============================================================================\n", + "\n", + "# Generate overlapping clickstream data containing both V1 and V2 schema events\n", + "# The data is designed with overlapping session IDs to demonstrate schema evolution\n", + "print(\"Generating clickstream data with deterministic overlapping session IDs...\")\n", + "streaming_data = util.create_demo_data(spark)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "0f83d722-8dac-47ad-bf4f-83a60a35a3fe", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# =============================================================================\n", + "# PHASE 1: V1 Schema Processing - Establish Initial State Store\n", + "# =============================================================================\n", + "\n", + "print(\"=\" * 60)\n", + "print(\"PHASE 1: STARTING WITH V1 SCHEMA\")\n", + "print(\"=\" * 60)\n", + "\n", + "# Filter streaming data to process only V1 events in this phase\n", + "# This simulates a production system initially running with V1 schema\n", + "v1_only_df = streaming_data.filter(col(\"schema_version\") == \"v1\")\n", + "\n", + "print(\"V1 Schema:\")\n", + "v1_only_df.printSchema()\n", + "\n", + "# Start V1 sessionization using transformWithStateInPandas\n", + "# This establishes the initial state store with V1 schema format\n", + "print(\"Starting V1 sessionization to establish state store...\")\n", + "\n", + "v1_sessionization_query = v1_only_df \\\n", + " .groupBy(\"session_id\") \\\n", + " .transformWithStateInPandas(\n", + " statefulProcessor=util.processor.SessionizerV1(), # Use V1 processor implementation\n", + " outputStructType=V1_OUTPUT_SCHEMA, # Define expected output schema\n", + " outputMode=\"append\", # Only output new completed sessions\n", + " timeMode=\"ProcessingTime\" # Use processing time for triggers\n", + " )\n", + "\n", + "# Write V1 sessionized data to Delta table with checkpointing\n", + "# The checkpoint location will store the V1 state for later evolution\n", + "v1_stream_query = v1_sessionization_query.writeStream \\\n", + " .format(\"delta\") \\\n", + " .outputMode(\"append\") \\\n", + " .option(\"checkpointLocation\", shared_checkpoint) \\\n", + " .option(\"path\", v1_output) \\\n", + " .start()\n", + "\n", + "print(\"V1 sessionization query started...\")" + ] + }, + { + 
"cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "e6d8e257-e5a0-40e5-8201-68d1cefd7faa", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# Let V1 run for a period to establish state store with multiple sessions\n", + "import time\n", + "print(\"Letting V1 run for 60 seconds to establish state store...\")\n", + "time.sleep(60)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "b82281f0-f92e-4ee5-9f2b-2a327cfb176c", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# =============================================================================\n", + "# V1 Results Inspection\n", + "# =============================================================================\n", + "\n", + "# Display sessionized results from V1 processor\n", + "print(\"=== V1 Sessionized Results ===\")\n", + "sessions_df = spark.read.format(\"delta\").load(v1_output)\n", + "sessions_df.createOrReplaceTempView(\"v1_session\")\n", + "display(sessions_df)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "386ba649-1528-4c2b-a3ee-2d5373f6071a", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# =============================================================================\n", + "# State Store Inspection\n", + "# =============================================================================\n", + "\n", + "# Inspect the state store to see persisted session state\n", + "# This shows sessions that are still active (not yet completed)\n", + "print(\"\\nπŸ—„οΈ STATE STORE INSPECTION:\")\n", + "state_store_values_df = spark.read.format(\"statestore\") \\\n", + " .option(\"operatorId\", \"0\") \\\n", + " .option(\"stateVarName\", \"session_state\") \\\n", + " .load(shared_checkpoint)\n", + " \n", + "state_count = state_store_values_df.count()\n", + "print(f\"SESSIONS WITH PERSISTED STATE: {state_count}\")\n", + "\n", + "print(\"\\nSTATE STORE CONTENTS:\")\n", + "display(state_store_values_df)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "9636fd81-881d-42a1-b537-057658d7ecb0", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# =============================================================================\n", + "# Allow V1 Processing Time and Graceful Shutdown\n", + "# =============================================================================\n", + "\n", + "# Let V1 run for a period to establish state store with multiple sessions\n", + "import time\n", + "print(\"Letting V1 run for 60 seconds to establish state store...\")\n", + "time.sleep(60)\n", + "\n", + "# Gracefully stop V1 query while preserving state store\n", + "# The state remains in the checkpoint for V2 to read and evolve\n", + "v1_stream_query.stop()\n", + "print(\"βœ… V1 query stopped. 
State store established with V1 schema.\")\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {},
+     "inputWidgets": {},
+     "nuid": "4417a0fc-b488-4214-9ff2-539ee3690fdb",
+     "showTitle": false,
+     "tableResultSettingsMap": {},
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# =============================================================================\n",
+    "# V1 Results Analysis\n",
+    "# =============================================================================\n",
+    "\n",
+    "print(\"=\" * 60)\n",
+    "print(\"V1 RESULTS - INITIAL STATE ESTABLISHED\")\n",
+    "print(\"=\" * 60)\n",
+    "\n",
+    "# Read and analyze V1 results\n",
+    "v1_sessions_df = spark.read.format(\"delta\").load(v1_output)\n",
+    "v1_count = v1_sessions_df.count()\n",
+    "print(f\"V1 Sessions Generated: {v1_count}\")\n",
+    "\n",
+    "if v1_count > 0:\n",
+    "    # Analyze V1 session characteristics\n",
+    "    print(\"\\nV1 Schema Analysis:\")\n",
+    "    v1_sessions_df.groupBy(\"schema_version\").count().show()\n",
+    "\n",
+    "    print(\"\\nV1 Sample Sessions:\")\n",
+    "    display(v1_sessions_df)\n",
+    "\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {},
+     "inputWidgets": {},
+     "nuid": "849a7a9b-153c-4d17-ae59-8ea644a2baa5",
+     "showTitle": false,
+     "tableResultSettingsMap": {},
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# =============================================================================\n",
+    "# PHASE 2: V2 Schema Processing - Demonstrate Schema Evolution\n",
+    "# =============================================================================\n",
+    "\n",
+    "print(\"=\" * 60)\n",
+    "print(\"PHASE 2: SWITCHING TO V2 SCHEMA - SCHEMA EVOLUTION\")\n",
+    "print(\"=\" * 60)\n",
+    "\n",
+    "# Filter for V2 data (includes same session IDs as V1 for evolution demo)\n",
+    "# This simulates new events arriving with enhanced schema\n",
+    "v2_only_df = streaming_data.filter(col(\"schema_version\") == \"v2\")\n",
+    "\n",
+    "print(\"V2 Schema (note additional fields):\")\n",
+    "v2_only_df.printSchema()\n",
+    "\n",
+    "# Start V2 sessionization using SAME checkpoint location\n",
+    "# This is the key to schema evolution - V2 processor reads existing V1 state\n",
+    "# and automatically evolves it to V2 format when processing new events\n",
+    "print(\"Starting V2 sessionization with SAME checkpoint (demonstrates evolution)...\")\n",
+    "\n",
+    "v2_sessionization_query = v2_only_df \\\n",
+    "    .groupBy(\"session_id\") \\\n",
+    "    .transformWithStateInPandas(\n",
+    "        statefulProcessor=util.processor.SessionizerV2(),  # Use V2 processor with evolution logic\n",
+    "        outputStructType=V2_OUTPUT_SCHEMA,                 # Enhanced output schema\n",
+    "        outputMode=\"append\",                               # Only output completed sessions\n",
+    "        timeMode=\"ProcessingTime\"                          # Processing time triggers\n",
+    "    )\n",
+    "\n",
+    "# Write V2 sessionized data to separate output path for comparison\n",
+    "# Same checkpoint as V1, different output path - the shared checkpoint is\n",
+    "# what lets the V2 processor read and evolve the existing V1 state\n",
+    "v2_stream_query = v2_sessionization_query.writeStream \\\n",
+    "    .format(\"delta\") \\\n",
+    "    .outputMode(\"append\") \\\n",
+    "    .option(\"checkpointLocation\", shared_checkpoint) \\\n",
+    "    .option(\"path\", v2_output) \\\n",
+    "    .start()\n",
+    "\n",
+    "print(\"V2 sessionization query started with schema evolution...\")\n",
+    "\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
"application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "ed3b3f10-d98a-416e-bf84-402ec7aedc3c", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# Let V2 run for a period to establish state store with multiple sessions\n", + "import time\n", + "print(\"Letting V2 run for 60 seconds to establish state store...\")\n", + "time.sleep(60)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "86e0e32a-f90b-4a27-9615-15cd8011280c", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# =============================================================================\n", + "# V2 Results Inspection\n", + "# =============================================================================\n", + "\n", + "# Display sessionized results from V2 processor\n", + "print(\"=== V2 Sessionized Results ===\")\n", + "v2_sessions_df = spark.read.format(\"delta\").load(v2_output)\n", + "v2_sessions_df.createOrReplaceTempView(\"v2_session\")\n", + "display(v2_sessions_df)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "c1eab124-70e2-42a3-b713-54cdab952987", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# =============================================================================\n", + "# Allow V2 Processing and Schema Evolution\n", + "# =============================================================================\n", + "\n", + "# Let V2 run to process events and demonstrate schema evolution\n", + "print(\"Letting V2 run for 60 seconds to demonstrate schema evolution...\")\n", + "time.sleep(60)\n", + "\n", + "# Gracefully stop V2 query\n", + "v2_stream_query.stop()\n", + "print(\"βœ… V2 query stopped. 
Schema evolution demonstrated.\")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "314fcb36-5418-441f-8b92-63c3c45982be", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# =============================================================================\n", + "# Schema Evolution Analysis and Results\n", + "# =============================================================================\n", + "\n", + "print(\"=\" * 60)\n", + "print(\"SCHEMA EVOLUTION ANALYSIS\")\n", + "print(\"=\" * 60)\n", + "\n", + "# Analyze V2 results to understand schema evolution behavior\n", + "v2_sessions_df = spark.read.format(\"delta\").load(v2_output)\n", + "v2_sessions_df.createOrReplaceTempView(\"v2_session_out\")\n", + "v2_count = v2_sessions_df.count()\n", + "print(f\"V2 Sessions Generated: {v2_count}\")\n", + "\n", + "if v2_count > 0:\n", + " # Analyze evolution patterns\n", + " print(\"\\nEvolution Analysis:\")\n", + " v2_sessions_df.groupBy(\"evolved_from_v1\", \"schema_version\").count().show()\n", + " \n", + " # Show sessions that were evolved from V1 state\n", + " print(\"\\nV2 sessions evolved from V1 state:\")\n", + " display(spark.sql(\"select * from v2_session_out where evolved_from_v1 = true\"))\n", + " \n", + " # Show all V2 sessions with enhanced schema\n", + " print(\"\\nAll V2 Sample Sessions (note new fields: device_type, page_category):\")\n", + " display(spark.sql(\"select * from v2_session_out\"))" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "520e7e02-8294-4ff2-bae2-bdc30acf05aa", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# =============================================================================\n", + "# End of Schema Evolution Demo\n", + "# =============================================================================" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": null, + "dashboards": [], + "environmentMetadata": { + "base_environment": "", + "environment_version": "3" + }, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 2 + }, + "notebookName": "sessionization_processor", + "widgets": {} + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/utils/processor.py b/2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/utils/processor.py new file mode 100644 index 0000000..66452e7 --- /dev/null +++ b/2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/utils/processor.py @@ -0,0 +1,326 @@ +import pandas as pd +from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle +from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType, DoubleType, BooleanType +from typing import Iterator + +class SessionizerV1(StatefulProcessor): + """ + V1 Processor - ENHANCED with debug output to guarantee results + + This processor handles user session tracking by: + - Maintaining session state across streaming events + - Counting events and 
accumulating revenue per session + - Ending sessions when terminal events occur (purchase, logout) + - Outputting completed session summaries + """ + + def init(self, handle: StatefulProcessorHandle) -> None: + """ + Initialize the V1 processor with state schema and configuration. + + Args: + handle: StatefulProcessorHandle for managing state operations + """ + print("[V1 INIT] ========== INITIALIZING V1 PROCESSOR ==========") + + # Define V1 State Schema - basic session tracking fields + state_schema = StructType([ + StructField("session_id", StringType(), True), # Unique session identifier + StructField("user_id", StringType(), True), # User associated with session + StructField("event_count", IntegerType(), True), # Number of events in session + StructField("total_revenue", DoubleType(), True), # Cumulative revenue for session + StructField("session_start", TimestampType(), True) # When the session began + ]) + + # Initialize state store with the defined schema + self.session_state = handle.getValueState("session_state", state_schema) + + # Define events that trigger session completion + self.terminal_events = {'purchase', 'logout'} + + print(f"[V1 INIT] Terminal events: {self.terminal_events}") + print("[V1 INIT] ========== V1 PROCESSOR READY ==========") + + def handleInputRows(self, key, rows: Iterator[pd.DataFrame], timer_values) -> Iterator[pd.DataFrame]: + """ + Process incoming streaming data for a specific session key. + + Args: + key: Session identifier (usually session_id) + rows: Iterator of pandas DataFrames containing event data + timer_values: Timer information (not used in this implementation) + + Yields: + pd.DataFrame: Completed session summaries when terminal events occur + """ + # Extract session ID from key (handle both tuple and string formats) + session_id = key[0] if isinstance(key, tuple) else str(key) + print(f"\n[V1 BATCH] ========== PROCESSING BATCH FOR {session_id} ==========") + + # Process each batch of events for this session + for batch in rows: + # Skip empty batches + if batch.empty: + print(f"[V1 BATCH] Empty batch for {session_id}") + continue + + print(f"[V1 BATCH] Batch size: {len(batch)} events for {session_id}") + results = [] # Store completed sessions for output + + # Process each event in the batch + for idx, event in batch.iterrows(): + try: + # Extract event data with type conversion + event_type = str(event['event_type']) + user_id = str(event['user_id']) + revenue = float(event['revenue']) + timestamp = pd.Timestamp(event['event_timestamp']) + + print(f"[V1 EVENT] Processing {session_id}: {event_type} (revenue: {revenue})") + + # Retrieve current session state from state store + current_state = self.session_state.get() + + if current_state is None: + # Create new session state for first event + state = (session_id, user_id, 1, revenue, timestamp) + print(f"[V1 NEW] Created session {session_id} with first event {event_type}") + else: + # Update existing session state by incrementing counts + state = ( + current_state[0], # session_id (unchanged) + current_state[1], # user_id (unchanged) + current_state[2] + 1, # event_count + 1 + current_state[3] + revenue, # total_revenue + current revenue + current_state[4] # session_start (unchanged) + ) + print(f"[V1 UPDATE] Updated {session_id}: count={state[2]}, revenue={state[3]}") + + # Check if this event should terminate the session + is_terminal = event_type in self.terminal_events + print(f"[V1 CHECK] Event {event_type} terminal? 
{is_terminal}") + + if is_terminal: + # Session completed - create output record + result = { + 'session_id': state[0], + 'user_id': state[1], + 'event_count': int(state[2]), + 'total_revenue': float(state[3]), + 'session_start': state[4], + 'schema_version': 'v1' # Mark as V1 output + } + results.append(result) + print(f"[V1 COMPLETE] SESSION COMPLETED: {session_id} with {state[2]} events, revenue: {state[3]}") + + # Clear state since session is complete + self.session_state.clear() + print(f"[V1 COMPLETE] State cleared for {session_id}") + else: + # Session continues - update state store with new values + self.session_state.update(state) + print(f"[V1 CONTINUE] State updated for {session_id}, waiting for terminal event") + + except Exception as e: + # Log any processing errors and continue + print(f"[V1 ERROR] Error processing event: {e}") + import traceback + traceback.print_exc() + + print(f"[V1 BATCH] Batch complete. Results: {len(results)} sessions completed") + + # Output completed sessions or empty DataFrame with correct schema + if results: + result_df = pd.DataFrame(results) + print(f"[V1 OUTPUT] OUTPUTTING {len(results)} COMPLETED SESSIONS:") + for i, row in result_df.iterrows(): + print(f"[V1 OUTPUT] {row['session_id']}: {row['event_count']} events, ${row['total_revenue']}") + yield result_df + else: + # Return empty DataFrame with correct schema for downstream compatibility + empty_df = pd.DataFrame(columns=['session_id', 'user_id', 'event_count', 'total_revenue', 'session_start', 'schema_version']) + print(f"[V1 OUTPUT] No completed sessions in this batch") + yield empty_df + + def close(self) -> None: + """ + Clean up resources when processor is shut down. + """ + print("[V1 CLOSE] ========== V1 PROCESSOR CLOSED ==========") + pass + + +class SessionizerV2(StatefulProcessor): + """ + FIXED V2 Processor - Let Databricks handle schema evolution automatically + + This is an evolved version of the V1 processor that: + - Adds new fields (device_type, page_category) + - Widens event_count from Integer to Long + - Automatically handles schema evolution from V1 state + - Maintains backward compatibility with V1 sessions + """ + + def init(self, handle: StatefulProcessorHandle) -> None: + """ + Initialize the V2 processor with evolved state schema. 
+ + Args: + handle: StatefulProcessorHandle for managing state operations + """ + print("[V2 INIT] ========== INITIALIZING FIXED V2 ==========") + + # V2 State Schema - evolved from V1 with additional fields and type widening + state_schema = StructType([ + StructField("session_id", StringType(), True), # Same as V1 + StructField("user_id", StringType(), True), # Same as V1 + StructField("event_count", LongType(), True), # TYPE WIDENING: Int->Long for larger counts + StructField("total_revenue", DoubleType(), True), # Same as V1 + StructField("session_start", TimestampType(), True), # Same as V1 + StructField("device_type", StringType(), True), # NEW FIELD: Track user's device + StructField("page_category", StringType(), True) # NEW FIELD: Track page category + ]) + + # Initialize state store - Databricks automatically handles schema evolution + self.session_state = handle.getValueState("session_state", state_schema) + + # Same terminal events as V1 + self.terminal_events = {'purchase', 'logout'} + + print("[V2 INIT] V2 processor ready - Databricks will handle schema evolution automatically") + + def handleInputRows(self, key, rows: Iterator[pd.DataFrame], timer_values) -> Iterator[pd.DataFrame]: + """ + Process incoming streaming data with schema evolution support. + + This method can handle both: + - New V2 sessions with all fields + - Existing V1 sessions that need to be evolved to V2 format + + Args: + key: Session identifier + rows: Iterator of pandas DataFrames containing event data + timer_values: Timer information (not used) + + Yields: + pd.DataFrame: Completed session summaries with V2 schema + """ + session_id = key[0] if isinstance(key, tuple) else str(key) + print(f"[V2] ========== PROCESSING {session_id} ==========") + + for batch in rows: + if batch.empty: + continue + + results = [] # Store completed sessions + + # Process each event in the batch + for _, event in batch.iterrows(): + try: + # Extract event data with safe defaults for new V2 fields + event_type = str(event['event_type']) + user_id = str(event['user_id']) + revenue = float(event['revenue']) + timestamp = pd.Timestamp(event['event_timestamp']) + + # Handle new V2 fields with fallback to 'unknown' + device_type = str(event.get('device_type', 'unknown')) if event.get('device_type') is not None else 'unknown' + page_category = str(event.get('page_category', 'unknown')) if event.get('page_category') is not None else 'unknown' + + print(f"[V2] Event: {event_type}, Device: {device_type}, Session: {session_id}") + + # Get current state - Databricks automatically handles schema evolution + current_state = self.session_state.get() + evolved_from_v1 = False # Track if this session was evolved from V1 + + if current_state is None: + print(f"[V2] NEW SESSION: {session_id}") + # Create new V2 session with all fields + state = (session_id, user_id, 1, revenue, timestamp, device_type, page_category) + else: + print(f"[V2] EXISTING STATE: {session_id}") + + # Detect schema evolution: V1 state will have None values in new fields + # When V1 state is read by V2 processor, new fields (indices 5,6) will be None + if current_state[5] is None or current_state[6] is None: + print(f"[V2] πŸŽ‰ EVOLUTION DETECTED: {session_id} has V1 state with None values in new fields!") + evolved_from_v1 = True + + # Evolve V1 state to V2 format using current event's data + state = ( + current_state[0], # session_id (unchanged) + current_state[1], # user_id (unchanged) + current_state[2] + 1, # event_count (auto-converted Int->Long) + 1 + 
current_state[3] + revenue, # total_revenue + current revenue + current_state[4], # session_start (unchanged) + device_type, # device_type (was None, now set from event) + page_category # page_category (was None, now set from event) + ) + print(f"[V2] EVOLVED: {session_id} from V1 -> V2 with {state[2]} events") + else: + print(f"[V2] REGULAR UPDATE: {session_id} already has V2 format") + # Already V2 format - perform regular update + state = ( + current_state[0], # session_id (unchanged) + current_state[1], # user_id (unchanged) + current_state[2] + 1, # event_count + 1 + current_state[3] + revenue, # total_revenue + revenue + current_state[4], # session_start (unchanged) + current_state[5], # device_type (keep existing) + current_state[6] # page_category (keep existing) + ) + + # Check if this event terminates the session + is_terminal = event_type in self.terminal_events + print(f"[V2] Terminal check: {event_type} -> {is_terminal}") + + if is_terminal: + # Session completed - create V2 output record + result = { + 'session_id': state[0], + 'user_id': state[1], + 'event_count': state[2], # Now Long type + 'total_revenue': float(state[3]), + 'session_start': state[4], + 'device_type': state[5], # New V2 field + 'page_category': state[6], # New V2 field + 'schema_version': 'v2', # Mark as V2 output + 'evolved_from_v1': evolved_from_v1 # Track evolution history + } + results.append(result) + + # Log completion with evolution status + evolution_msg = "EVOLVED FROM V1" if evolved_from_v1 else "NEW V2 SESSION" + print(f"[V2] COMPLETED: {session_id} - {evolution_msg}") + print(f"[V2] OUTPUT: {state[2]} events, ${state[3]}, {state[5]} device") + + # Clear state since session is complete + self.session_state.clear() + else: + # Session continues - update state store + self.session_state.update(state) + print(f"[V2] CONTINUE: {session_id} waiting for terminal event") + + except Exception as e: + # Log processing errors and continue + print(f"[V2] ERROR: {e}") + import traceback + traceback.print_exc() + + # Output results with V2 schema + if results: + print(f"[V2] BATCH OUTPUT: {len(results)} completed sessions") + yield pd.DataFrame(results) + else: + # Return empty DataFrame with complete V2 schema + print(f"[V2] No completed sessions this batch") + empty_columns = ['session_id', 'user_id', 'event_count', 'total_revenue', + 'session_start', 'device_type', 'page_category', 'schema_version', 'evolved_from_v1'] + yield pd.DataFrame(columns=empty_columns) + + def close(self) -> None: + """ + Clean up resources when processor is shut down. + """ + print("[V2] V2 processor closed") + pass \ No newline at end of file diff --git a/2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/utils/util.py b/2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/utils/util.py new file mode 100644 index 0000000..5f50507 --- /dev/null +++ b/2025-07-sessionization-transformWithStateInPandas/python/sessionizationWithSchemaEvolution/utils/util.py @@ -0,0 +1,216 @@ +# ============================================================================= +# Utility Functions for PySpark Streaming Sessionization Demo +# ============================================================================= +# This module provides utility functions for: +# 1. Project directory management in Databricks environment +# 2. Synthetic streaming data generation for schema evolution testing +# 3. 
Ensuring proper V1/V2 state persistence and evolution scenarios +# ============================================================================= + +def get_project_dir(): + """ + Get project directory path for the sessionization demo. + + This function constructs a user-specific project directory path within + Databricks volumes, ensuring each user has their own isolated workspace + for the demo data and checkpoints. + + Returns: + str: Full path to the user's project directory in the format: + /Volumes/main/demos/demos_volume/{username}/python/transformwithstate/sessionization + + Note: + - Uses Databricks DBUtils to get the current user's username + - Creates a user-specific directory to avoid conflicts between users + - Uses Databricks Volumes for persistent storage across cluster restarts + """ + from pyspark.dbutils import DBUtils + from pyspark.sql import SparkSession + + # Get current Spark session + spark = SparkSession.builder.getOrCreate() + + # Initialize Databricks utilities for accessing workspace context + dbutils = DBUtils(spark) + + # Extract the current user's username from the notebook context + # This ensures each user gets their own isolated directory + username = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get() + + # Define base volume path for persistent storage + volume_path = "/Volumes/main/demos/demos_volume" + + # Construct user-specific project directory path + # Format: /Volumes/.../username/python/transformwithstate/sessionization + projectDir = f"{volume_path}/{username}/python/transformwithstate/sessionization" + + return projectDir + +def create_demo_data(spark): + """ + Create synthetic streaming data designed for schema evolution testing. + + This function generates a rate stream with carefully crafted data patterns + that demonstrate PySpark streaming sessionization and schema evolution: + + Key Features: + - Creates overlapping session IDs across V1 and V2 events + - Ensures some V1 sessions remain active for V2 processor evolution + - Generates both terminal and non-terminal events in specific patterns + - Adds V2-specific fields (device_type, page_category) only for V2 events + + Args: + spark: SparkSession object for creating streaming DataFrames + + Returns: + DataFrame: Streaming DataFrame with the following schema: + - session_id: String (session_1 to session_6, cycling) + - user_id: String (user_1 to user_6, matching session IDs) + - event_timestamp: Timestamp (from rate stream) + - event_type: String (page_view, purchase, logout, search) + - revenue: Double (25.0, 50.0, 35.0, or 0.0) + - schema_version: String ("v1" for events 0-11, "v2" for 12+) + - device_type: String (mobile, desktop, tablet, smartphone) - V2 only + - page_category: String (checkout, profile, product, home) - V2 only + + Evolution Strategy: + - V1 Events (0-11): Some sessions get terminal events, others remain active + - V2 Events (12+): Previously active V1 sessions get terminal events + - This ensures V2 processor can demonstrate evolution of existing V1 state + """ + from pyspark.sql.functions import when, lit, col + + # Create base rate stream - generates sequential events with timestamps + # rowsPerSecond=1 provides steady stream for demo purposes + rate_stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load() + + # Transform rate stream into sessionization demo data + # The 'value' column from rate stream (0, 1, 2, ...) 
drives all logic + demo_stream = rate_stream.select( + + # ============================================================================= + # Session ID Generation - Cycling Pattern + # ============================================================================= + # Create 6 different sessions that cycle based on value % 6 + # This ensures predictable session distribution across events + when(col("value") % 6 == 0, "session_1") + .when(col("value") % 6 == 1, "session_2") + .when(col("value") % 6 == 2, "session_3") + .when(col("value") % 6 == 3, "session_4") + .when(col("value") % 6 == 4, "session_5") + .otherwise("session_6").alias("session_id"), + + # ============================================================================= + # User ID Generation - Matches Session IDs + # ============================================================================= + # Each session belongs to a specific user (1:1 mapping) + # This simplifies the sessionization logic and makes results predictable + when(col("value") % 6 == 0, "user_1") + .when(col("value") % 6 == 1, "user_2") + .when(col("value") % 6 == 2, "user_3") + .when(col("value") % 6 == 3, "user_4") + .when(col("value") % 6 == 4, "user_5") + .otherwise("user_6").alias("user_id"), + + # ============================================================================= + # Timestamp - Direct from Rate Stream + # ============================================================================= + # Use the rate stream's timestamp for event ordering + col("timestamp").alias("event_timestamp"), + + # ============================================================================= + # Event Type Generation - CRITICAL FOR SCHEMA EVOLUTION + # ============================================================================= + # This is the most important part for demonstrating schema evolution: + # + # V1 Phase (events 0-11): + # - session_2 and session_4 get TERMINAL events (purchase/logout) + # - session_1, session_3, session_5, session_6 get NON-terminal events + # - This leaves 4 sessions with ACTIVE state in the state store + # + # V2 Phase (events 12+): + # - Previously active sessions (1,3,5,6) now get terminal events + # - This demonstrates V2 processor reading and evolving V1 state + when(col("value") < 12, # V1 events (first 12 events: 0-11) + when(col("value") % 6 == 1, "purchase") # session_2 terminates in V1 + .when(col("value") % 6 == 3, "logout") # session_4 terminates in V1 + .otherwise("page_view") # sessions 1,3,5,6 remain active + ).otherwise( # V2 events (events 12 and beyond) + when(col("value") % 6 == 0, "purchase") # session_1 terminates in V2 (evolution!) + .when(col("value") % 6 == 2, "logout") # session_3 terminates in V2 (evolution!) + .when(col("value") % 6 == 4, "purchase") # session_5 terminates in V2 (evolution!) 
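+            # Sessions that fall through to "search" (session_2, session_4, and
+            # session_6) never receive a terminal event in the V2 phase, so their
+            # state stays active in the state store until the demo is stopped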
+ .otherwise("search") # session_6 and new cycles continue + ).alias("event_type"), + + # ============================================================================= + # Revenue Generation - Matches Purchase Events + # ============================================================================= + # Add revenue only for purchase events to make results more realistic + # Different sessions generate different revenue amounts + when(col("value") % 6 == 1, 25.0) # session_2 purchase in V1 + .when(col("value") % 6 == 0, 50.0) # session_1 purchase in V2 (evolution) + .when(col("value") % 6 == 4, 35.0) # session_5 purchase in V2 (evolution) + .otherwise(0.0).alias("revenue"), # All other events have 0 revenue + + # ============================================================================= + # Schema Version - Temporal Separation + # ============================================================================= + # First 12 events (0-11) are V1 schema events + # Events 12+ are V2 schema events with additional fields + # This temporal separation simulates a real schema migration scenario + when(col("value") < 12, "v1").otherwise("v2").alias("schema_version"), + + # ============================================================================= + # Device Type - V2 Schema Addition + # ============================================================================= + # This field only exists for V2 events (value >= 12) + # V1 events will have NULL/None for this field + # This demonstrates schema evolution with new field addition + when(col("value") >= 12, + when(col("value") % 6 == 0, "mobile") # session_1 uses mobile + .when(col("value") % 6 == 2, "desktop") # session_3 uses desktop + .when(col("value") % 6 == 4, "tablet") # session_5 uses tablet + .otherwise("smartphone") # other sessions use smartphone + ).alias("device_type"), # NULL for V1 events + + # ============================================================================= + # Page Category - V2 Schema Addition + # ============================================================================= + # Another V2-only field that demonstrates schema evolution + # V1 events will have NULL/None for this field + # Maps to different page types for business context + when(col("value") >= 12, + when(col("value") % 6 == 0, "checkout") # session_1 on checkout page + .when(col("value") % 6 == 2, "profile") # session_3 on profile page + .when(col("value") % 6 == 4, "product") # session_5 on product page + .otherwise("home") # other sessions on home page + ).alias("page_category") # NULL for V1 events + ) + + return demo_stream + +# ============================================================================= +# Data Generation Summary +# ============================================================================= +# +# Expected Data Flow: +# +# V1 Events (0-11): +# - Events 0,6: session_1/user_1, page_view, revenue=0, v1 schema +# - Events 1,7: session_2/user_2, purchase, revenue=25, v1 schema (TERMINATES) +# - Events 2,8: session_3/user_3, page_view, revenue=0, v1 schema +# - Events 3,9: session_4/user_4, logout, revenue=0, v1 schema (TERMINATES) +# - Events 4,10: session_5/user_5, page_view, revenue=0, v1 schema +# - Events 5,11: session_6/user_6, page_view, revenue=0, v1 schema +# +# After V1: Sessions 1,3,5,6 have ACTIVE state in state store +# +# V2 Events (12+): +# - Event 12: session_1, purchase, revenue=50, mobile, checkout (EVOLVES + TERMINATES) +# - Event 14: session_3, logout, revenue=0, desktop, profile (EVOLVES + TERMINATES) +# - Event 16: 
session_5, purchase, revenue=35, tablet, product (EVOLVES + TERMINATES) +# - Other events continue the pattern... +# +# This design guarantees that V2 processor will encounter existing V1 state +# and demonstrate successful schema evolution in a controlled, predictable way. +# ============================================================================= \ No newline at end of file From 6470b855389261d8cca3c5bff5237438fb7644d7 Mon Sep 17 00:00:00 2001 From: thangam-vaiyapuri Date: Fri, 25 Jul 2025 04:17:31 +0000 Subject: [PATCH 2/2] Added CodeOwners information --- CODEOWNERS | 1 + 1 file changed, 1 insertion(+) diff --git a/CODEOWNERS b/CODEOWNERS index 2eb034c..09baf8d 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -21,3 +21,4 @@ /automated-claims-processing/* @samanthawise @antondusak /2025-05-transformWithStateInPandas/* @jpdatabricks /2025-06-uc-migration-workflow-scoping/* @prashsub +/2025-07-sessionization-transformWithStateInPandas/* @thangam-vaiyapuri