Continuing from my previous Dynamic Tables post, let’s look at implementing SCD2 using Dynamic Tables. I’ve come across several posts suggesting that Dynamic Tables (DT) are an excellent replacement for streams and an effective way to implement SCD2. Out of curiosity, I wanted to explore how a DT preserves historical data by keeping a separate record for each change. This is a basic example, but it is worth sharing to show how Dynamic Tables can be used for SCD2.
Previously, we relied on Streams and Tasks to implement SCD2 in Snowflake. Dynamic Tables simplify that process considerably. In this scenario, assume we have a historical feed file that needs to be loaded into Snowflake. On subsequent runs, we expect a delta feed file in the S3 bucket containing new or updated records. New records are simply inserted into the target table. For a record that already exists, the previous version is marked inactive (ACTIVE_FLG = ‘N’) and given an end date, and the incoming version is added as a new, active entry.
Overall, we are trying to implement the following flow.
Technical Implementation:
Staging Table: CUSTOMER_INFO
In the initial run, a customer feed file was loaded into the staging table using Snowpipe.
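As a rough sketch of this step, the staging table and pipe might look like the following. Only the customer ID and LOAD_DATE are implied by this post; the column list, the pipe name `CUSTOMER_INFO_PIPE`, the stage `CUSTOMER_STAGE`, and the CSV file format are all assumptions made for illustration.

```sql
-- Hypothetical staging table: column names other than LOAD_DATE are assumed.
CREATE OR REPLACE TABLE CUSTOMER_INFO (
    CUSTOMER_ID   NUMBER,
    CUSTOMER_NAME VARCHAR,
    LOAD_DATE     DATE
);

-- Hypothetical pipe. CUSTOMER_STAGE is an assumed external stage
-- pointing at the S3 bucket that receives the feed files.
CREATE OR REPLACE PIPE CUSTOMER_INFO_PIPE
  AUTO_INGEST = TRUE
AS
  COPY INTO CUSTOMER_INFO
  FROM @CUSTOMER_STAGE
  FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1);
```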
Next, we create a Dynamic Table on top of this staging table. The SCD2 logic works as follows:
- We rank the records that share the same customer ID, ordered by LOAD_DATE in descending order.
- So, if we have multiple entries for the same customer, the record with the highest LOAD_DATE gets a rank of 1, and the remaining records get ranks of 2, 3, and so on.
- The record with rank 1 is the latest one, so we derive two columns for it: ACTIVE_FLG = ‘Y’ and END_DATE = NULL. For the rest of the records, ACTIVE_FLG is set to ‘N’ and END_DATE is populated with the current date.
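The ranking logic above can be sketched as a Dynamic Table definition along these lines. This is a minimal sketch, not necessarily the exact definition used here: the column name `CUSTOMER_ID`, the one-minute target lag, and the warehouse `COMPUTE_WH` are assumptions, and `ROW_NUMBER()` is one reasonable choice for the rank (`RANK()` would assign the same rank to rows that tie on LOAD_DATE).

```sql
CREATE OR REPLACE DYNAMIC TABLE CUSTOMER_INFO_HISTORY
  TARGET_LAG = '1 minute'     -- assumed lag
  WAREHOUSE  = COMPUTE_WH     -- assumed warehouse name
AS
WITH ranked AS (
    SELECT
        src.*,
        -- Latest LOAD_DATE per customer gets RNK = 1
        ROW_NUMBER() OVER (
            PARTITION BY src.CUSTOMER_ID
            ORDER BY src.LOAD_DATE DESC
        ) AS RNK
    FROM CUSTOMER_INFO AS src
)
SELECT
    ranked.*,
    -- Only the most recent version of each customer is active
    CASE WHEN RNK = 1 THEN 'Y' ELSE 'N' END               AS ACTIVE_FLG,
    -- Active rows stay open-ended; older rows get the current date
    CASE WHEN RNK = 1 THEN NULL ELSE CURRENT_DATE() END   AS END_DATE
FROM ranked;
```

Snowflake places restrictions on non-deterministic functions such as `CURRENT_DATE()` under incremental refresh, so a definition like this may end up using full refresh. It is worth checking against the current documentation.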
We then verify the output after the initial run.
Now the incremental load starts, and we receive a feed file containing 1 new record and 2 existing records with updated data.
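Once the Dynamic Table refreshes, it reflects the new data: the new customer appears as an active row, and each of the two updated customers has its latest row marked active and its earlier row flagged ‘N’ with an END_DATE.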
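The table refreshes on its own according to its target lag. To check the result without waiting, a manual refresh followed by a query might look like this, assuming the customer key column is named `CUSTOMER_ID`:

```sql
-- Trigger a refresh now instead of waiting for the target lag
ALTER DYNAMIC TABLE CUSTOMER_INFO_HISTORY REFRESH;

-- Inspect every version of each customer, newest first
SELECT *
FROM CUSTOMER_INFO_HISTORY
ORDER BY CUSTOMER_ID, LOAD_DATE DESC;
```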
On the third day, say we receive one more entry for the Sachin Mittal record.
Now say the source team makes an ad hoc request: a few records need to be purged from the staging table, and the Dynamic Table should reflect the updated data accordingly.
Say we are asked to remove the following record.
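A purge like this is just a delete on the staging table. Because the Dynamic Table is defined declaratively over that table, the ranks and flags are recomputed on the next refresh. The sketch below uses session variables with hypothetical values to identify the row, and assumes the key column is named `CUSTOMER_ID`.

```sql
-- Hypothetical values identifying the record to purge
SET purge_customer_id = 101;
SET purge_load_date   = '2024-01-02';

-- Remove the record from the staging table
DELETE FROM CUSTOMER_INFO
WHERE CUSTOMER_ID = $purge_customer_id
  AND LOAD_DATE   = TO_DATE($purge_load_date);

-- Optionally refresh right away rather than waiting for the target lag
ALTER DYNAMIC TABLE CUSTOMER_INFO_HISTORY REFRESH;
```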
To expose only the active records, we create another Dynamic Table on top of the CUSTOMER_INFO_HISTORY Dynamic Table.
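A second Dynamic Table of this kind could be as simple as a filter on ACTIVE_FLG. The name `CUSTOMER_INFO_ACTIVE`, the target lag, and the warehouse below are assumptions for illustration.

```sql
CREATE OR REPLACE DYNAMIC TABLE CUSTOMER_INFO_ACTIVE   -- hypothetical name
  TARGET_LAG = '1 minute'     -- assumed lag
  WAREHOUSE  = COMPUTE_WH     -- assumed warehouse name
AS
-- Keep only the current version of each customer
SELECT *
FROM CUSTOMER_INFO_HISTORY
WHERE ACTIVE_FLG = 'Y';
```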
Querying this Dynamic Table returns all the active records.