diff --git a/docs/macros.md b/docs/macros.md index a028863..fe10da9 100644 --- a/docs/macros.md +++ b/docs/macros.md @@ -3053,6 +3053,162 @@ Generates SQL to build an Extended Tracking Satellite table using the provided p SELECT * FROM records_to_insert ``` +--- + +### sts + +###### view source: +[![Snowflake](./assets/images/platform_icons/snowflake.png)](https://github.com/Datavault-UK/dbtvault/blob/release/0.8.3/macros/tables/snowflake/sts.sql) + +Generates SQL to build a Status Tracking Satellite table using the provided parameters. + +#### Usage + +``` jinja +{{ dbtvault.sts(src_pk=src_pk, src_ldts=src_ldts, src_source=src_source, + src_status=src_status, src_hashdiff=src_hashdiff, source_model=source_model) }} +``` + +#### Parameters + +| Parameter | Description | Type | Required? | +|--------------|---------------------------------------------|---------|-----------------------------------------------| +| src_pk | Source primary key column | String | :fontawesome-solid-check-circle:{ .required } | +| src_ldts | Source load date timestamp column | String | :fontawesome-solid-check-circle:{ .required } | +| src_source | Name of the column containing the source ID | String | :fontawesome-solid-check-circle:{ .required } | +| src_status | Source data status column | String | :fontawesome-solid-check-circle:{ .required } | +| src_hashdiff | Name of the status hashdiff column | String | :fontawesome-solid-check-circle:{ .required } | +| source_model | Staging model name | String | :fontawesome-solid-check-circle:{ .required } | + +!!! 
tip + [Read the tutorial](tutorial/tut_sts.md) for more details + +#### Example Metadata + +[See examples](metadata.md#status-tracking-satellites) + +#### Example Output + +=== "Snowflake" + + === "Base Load" + + ```sql + WITH source_data AS ( + SELECT a.CUSTOMER_HK, a.LOAD_DATE, a.RECORD_SOURCE + FROM DBTVAULT.TEST.STG_CUSTOMER AS a + WHERE a.CUSTOMER_HK IS NOT NULL + ), + + records_with_status AS ( + SELECT DISTINCT stage.CUSTOMER_HK, stage.LOAD_DATE, stage.RECORD_SOURCE, + 'I' AS STATUS + FROM source_data AS stage + ), + + records_with_status_and_hashdiff AS ( + SELECT d.CUSTOMER_HK, d.LOAD_DATE, d.RECORD_SOURCE, d.STATUS, + CAST((MD5_BINARY(NULLIF(UPPER(TRIM(CAST(STATUS AS VARCHAR))), ''))) AS BINARY(16)) AS STATUS_HASHDIFF + FROM records_with_status AS d + ), + + records_to_insert AS ( + SELECT DISTINCT stage.CUSTOMER_HK, stage.LOAD_DATE, stage.RECORD_SOURCE, stage.STATUS, stage.STATUS_HASHDIFF + FROM records_with_status_and_hashdiff AS stage + ) + + SELECT * FROM records_to_insert + ``` + + === "Subsequent Loads" + + ```sql + WITH source_data AS ( + SELECT a.CUSTOMER_HK, a.LOAD_DATE, a.RECORD_SOURCE + FROM DBTVAULT.TEST.STG_CUSTOMER AS a + WHERE a.CUSTOMER_HK IS NOT NULL + ), + + stage_datetime AS ( + SELECT MAX(b.LOAD_DATE) AS LOAD_DATETIME + FROM source_data AS b + ), + + latest_records AS ( + SELECT c.CUSTOMER_HK, c.LOAD_DATE, c.RECORD_SOURCE, c.STATUS, c.STATUS_HASHDIFF + FROM ( + SELECT current_records.CUSTOMER_HK, current_records.LOAD_DATE, current_records.RECORD_SOURCE, current_records.STATUS, current_records.STATUS_HASHDIFF, + RANK() OVER ( + PARTITION BY current_records.CUSTOMER_HK + ORDER BY current_records.LOAD_DATE DESC + ) AS rank + FROM DBTVAULT.TEST.STS AS current_records + ) AS c + WHERE c.rank = 1 + ), + + records_with_status AS ( + SELECT DISTINCT stage.CUSTOMER_HK, stage.LOAD_DATE, stage.RECORD_SOURCE, + 'I' AS STATUS + FROM source_data AS stage + WHERE NOT EXISTS ( + SELECT 1 + FROM latest_records + WHERE 
(latest_records.CUSTOMER_HK = stage.CUSTOMER_HK + AND latest_records.STATUS != 'D') + ) + + UNION ALL + + SELECT DISTINCT latest_records.CUSTOMER_HK, + stage_datetime.LOAD_DATETIME AS LOAD_DATE, + latest_records.RECORD_SOURCE, + 'D' AS STATUS + FROM latest_records + INNER JOIN stage_datetime + ON 1 = 1 + WHERE NOT EXISTS ( + SELECT 1 + FROM source_data AS stage + WHERE latest_records.CUSTOMER_HK = stage.CUSTOMER_HK + ) + AND latest_records.STATUS != 'D' + AND stage_datetime.LOAD_DATETIME IS NOT NULL + + UNION ALL + + SELECT DISTINCT stage.CUSTOMER_HK, stage.LOAD_DATE, stage.RECORD_SOURCE, + 'U' AS STATUS + FROM source_data AS stage + WHERE EXISTS ( + SELECT 1 + FROM latest_records + WHERE (latest_records.CUSTOMER_HK = stage.CUSTOMER_HK + AND latest_records.STATUS != 'D' + AND stage.LOAD_DATE != latest_records.LOAD_DATE) + ) + ), + + records_with_status_and_hashdiff AS ( + SELECT d.CUSTOMER_HK, d.LOAD_DATE, d.RECORD_SOURCE, d.STATUS, + CAST((MD5_BINARY(NULLIF(UPPER(TRIM(CAST(STATUS AS VARCHAR))), ''))) AS BINARY(16)) AS STATUS_HASHDIFF + FROM records_with_status AS d + ), + + records_to_insert AS ( + SELECT DISTINCT stage.CUSTOMER_HK, stage.LOAD_DATE, stage.RECORD_SOURCE, stage.STATUS, stage.STATUS_HASHDIFF + FROM records_with_status_and_hashdiff AS stage + LEFT JOIN latest_records + ON latest_records.CUSTOMER_HK = stage.CUSTOMER_HK + WHERE latest_records.STATUS_HASHDIFF != stage.STATUS_HASHDIFF + OR latest_records.STATUS_HASHDIFF IS NULL + ) + + SELECT * FROM records_to_insert + ``` + +--- + ### pit ###### view source: diff --git a/docs/metadata.md b/docs/metadata.md index c059866..08ffd06 100644 --- a/docs/metadata.md +++ b/docs/metadata.md @@ -957,6 +957,52 @@ derived_columns: ___ +### Status Tracking Satellites + +#### Parameters + +[sts macro parameters](macros.md#sts) + +#### Metadata + +=== "Per-model - YAML strings" + + ```jinja + {%- set yaml_metadata -%} + source_model: v_stg_customer + src_pk: CUSTOMER_HK + src_ldts: LOAD_DATE + src_source: RECORD_SOURCE 
+ src_status: STATUS + src_hashdiff: STATUS_HASHDIFF + {%- endset -%} + + {% set metadata_dict = fromyaml(yaml_metadata) %} + + {{ dbtvault.sts(src_pk=metadata_dict["src_pk"], + src_ldts=metadata_dict["src_ldts"], + src_source=metadata_dict["src_source"], + src_status=metadata_dict["src_status"], + src_hashdiff=metadata_dict["src_hashdiff"], + source_model=metadata_dict["source_model"]) }} + ``` + +=== "Per-Model - Variables" + + ```jinja + {%- set source_model = "v_stg_customer" -%} + {%- set src_pk = "CUSTOMER_HK" -%} + {%- set src_ldts = "LOAD_DATE" -%} + {%- set src_source = "RECORD_SOURCE" -%} + {%- set src_status = "STATUS" -%} + {%- set src_hashdiff = "STATUS_HASHDIFF" -%} + + {{ dbtvault.sts(src_pk=src_pk, src_ldts=src_ldts, src_source=src_source, + src_status=src_status, src_hashdiff=src_hashdiff, + source_model=source_model) }} + ``` +___ + ### Point-In-Time (PIT) Tables #### Parameters diff --git a/docs/tutorial/tut_sts.md b/docs/tutorial/tut_sts.md new file mode 100644 index 0000000..8f9dfaf --- /dev/null +++ b/docs/tutorial/tut_sts.md @@ -0,0 +1,110 @@ +Status Tracking Satellites are an +optional Data Vault 2.0 entity. They +can be maintained where there is no change data capture (CDC) system operating on the source, +to track the status of the source business entity; +for instance, data with deleted status can be excluded from downstream business reporting. + +!!! Note + Unlike other raw vault loads, the source data provided must be a full snapshot of the source entity's business keys + that has been staged in the normal manner to add the primary key, load date and record source. + + +### Structure + +In general, Status Tracking Satellites consist of 5 columns, described below. + +#### Primary Key (src_pk) +A primary key (or surrogate key) which is usually a hashed representation of the natural key. +For a Status Tracking Satellite, this should be the same as the corresponding Hub's PK. + +#### Load date (src_ldts) +A load date or load date timestamp. 
This identifies when the record was first loaded into the database. + +#### Record Source (src_source) +The source for the record. This can be a code which is assigned to a source name in an external lookup table, +or a string directly naming the source system. + +#### Status (src_status) +The status of the record, calculated during processing. It can be one of the values Insert (I), Update (U), or Delete (D). + +#### Hashdiff (src_hashdiff) +This is the name of the column that will store the hash of the record status value calculated during processing. + +### Creating Status Tracking Satellite models + +Create a new dbt model as before. We'll call this one `sts_customer`. + +=== "sts_customer.sql" + + ```jinja + {{ dbtvault.sts(src_pk=src_pk, src_ldts=src_ldts, src_source=src_source, + src_status=src_status, src_hashdiff=src_hashdiff, source_model=source_model) }} + ``` + +To create an STS model, we simply copy and paste the above template into a model named after the STS we +are creating. dbtvault will generate an STS using parameters provided in the next steps. + +#### Materialisation + +The materialisation for **Status Tracking Satellites** must be `incremental`, as we only load and add new records +to the existing data set for a single point in time. + +### Adding the metadata + +Let's look at the metadata we need to provide to the [sts macro](../macros.md#sts). + +We provide the column names which we would like to select from the staging area (`source_model`). 
+ +Using our [knowledge](#structure) of what columns we need in our `sts_customer` STS, we can identify columns in our +staging layer which map to them: + +| Parameter | Value | +|--------------|-----------------| +| source_model | v_stg_customer | +| src_pk | CUSTOMER_HK | +| src_ldts | LOAD_DATE | +| src_source | RECORD_SOURCE | +| src_status | STATUS | +| src_hashdiff | STATUS_HASHDIFF | + +When we provide the metadata above, our model should look like the following: + +=== "sts_customer.sql" + + ```jinja + {{ config(materialized='incremental') }} + + {%- set source_model = "v_stg_customer" -%} + {%- set src_pk = "CUSTOMER_HK" -%} + {%- set src_ldts = "LOAD_DATE" -%} + {%- set src_source = "RECORD_SOURCE" -%} + {%- set src_status = "STATUS" -%} + {%- set src_hashdiff = "STATUS_HASHDIFF" -%} + + {{ dbtvault.sts(src_pk=src_pk, src_ldts=src_ldts, src_source=src_source, + src_status=src_status, src_hashdiff=src_hashdiff, + source_model=source_model) }} + ``` + +!!! Note + See our [metadata reference](../metadata.md#status-tracking-satellites) for more detail on how to provide metadata to Status Tracking Satellites. + +### Running dbt + +With our metadata provided and our model complete, we can run dbt to create our `sts_customer` Status Tracking Satellite, as follows: + +=== "< dbt v0.20.x" + `dbt run -m +sts_customer` +=== "> dbt v0.21.0" + `dbt run -s +sts_customer` + +The resulting Status Tracking Satellite will look like this: + +| CUSTOMER_HK | LOAD_DATE | RECORD_SOURCE | STATUS | STATUS_HASHDIFF | +|-------------|-------------|---------------|--------|-----------------| +| B8C37E... | 1993-01-01 | * | I | DD7536... | +| . | . | . | . | . | +| . | . | . | . | . | +| FED333... | 1993-01-01 | * | U | 4C6143... 
| + +--8<-- "includes/abbreviations.md" \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index 0c24b72..9468bce 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -37,6 +37,7 @@ nav: - Links: 'tutorial/tut_links.md' - Transactional Links: 'tutorial/tut_t_links.md' - Satellites: 'tutorial/tut_satellites.md' + - Status Tracking Satellites: 'tutorial/tut_sts.md' - Effectivity Satellites: 'tutorial/tut_eff_satellites.md' - Multi-Active Satellites: 'tutorial/tut_multi_active_satellites.md' - Extended Tracking Satellites: 'tutorial/tut_xts.md'