
This blog post showcases a near-real-time data pipeline built in Snowflake. It combines Slowly Changing Dimension Type 2 (SCD2) tracking with Finalizer Tasks to keep your customer data fresh and accurate while preserving its history of changes.
Imagine a system that continuously generates customer data, including customer number, status, balance, and invoice information. The current pipeline has a critical gap: nothing guarantees that a report summarizing the pipeline's overall health is generated after each run. As a result, you cannot easily see whether individual tasks succeeded or failed. A task that fails silently can go unnoticed until someone spots a discrepancy and starts a manual investigation.
Snowflake’s Finalizer Tasks come to the rescue! A finalizer task guarantees that an action always runs after a task graph (your data pipeline) finishes, regardless of whether individual tasks succeed or fail.
The Snowflake Solution:
This pipeline uses a series of tasks and tables to ingest data in near real time and track its history:
- Source Table (CUSTOMER_SRC): A staging table that is emptied on every run and holds only the latest customer data received from your external system.
- Landing Zone (CUSTOMER_LANDING): Holds the current state of each customer. A MERGE keeps it up to date from CUSTOMER_SRC, and a stream on this table feeds the analytical load.
- Analytical Table (CUSTOMER_ANALYTIC): Stores the full history of each customer record. It adds STARTDATE and ENDDATE columns and a FLAG column that marks each version as active ('Y') or inactive ('N').
The Data Flow:
- CUST_TRUNC Task: A scheduled task runs every minute to truncate the CUSTOMER_SRC table, ensuring it only holds the latest data.
- CUST_LOAD Task: After the truncation completes, this task calls the stored procedure PROC_COPY_SF_LEAD_TO_RAW_STAGE() to populate CUSTOMER_SRC with the latest customer data.
- Upsert with MERGE (CUST_LANDING_LOAD Task): This task runs a MERGE statement that applies the incoming rows in CUSTOMER_SRC to the CUSTOMER_LANDING table:
  - Match and Update: If the customer already exists in CUSTOMER_LANDING with a different status, the existing row is updated with the new details.
  - New Customer: If the customer does not exist yet, a new row is inserted into CUSTOMER_LANDING.
- SCD2 with MERGE (ANALYTICS_LOAD Task): This task reads the change records that the stream CUST_STREAM captures on CUSTOMER_LANDING. It then applies them to CUSTOMER_ANALYTIC with a second MERGE statement:
  - Customer Deletion: If METADATA$ACTION in the stream is 'DELETE', the matching active record in CUSTOMER_ANALYTIC is flagged as inactive (FLAG = 'N') and its ENDDATE is set to the current timestamp. A standard stream also reports an update as a DELETE of the old row plus an INSERT of the new one. This step therefore closes the previous version of an updated customer and preserves the period during which it was valid.
  - New Customer Insertion: If METADATA$ACTION is 'INSERT', a new record is inserted into CUSTOMER_ANALYTIC with STARTDATE set to the current timestamp and FLAG set to active ('Y'), capturing the customer's initial (or updated) state.
- Finalizer Task (FINALIZE_TASKS):
  - Guaranteed Reporting: The finalizer task calls a stored procedure that creates a report, ensuring that a summary is generated after every pipeline run.
  - Data Integrity: If the pipeline encounters errors, the procedure can also trigger corrective actions, such as deleting data. This helps maintain data integrity by removing inconsistent rows introduced during a failed run.
Technical Implementation:

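Task Hierarchy:
CUST_TRUNC (root, runs every minute) → CUST_LOAD → CUST_LANDING_LOAD → ANALYTICS_LOAD, with FINALIZE_TASKS attached as the finalizer of the graph.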
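Once the tasks exist, one way to check that the graph is wired as described is to list the dependents of the root task. This query is a sketch that assumes the tasks live in your current database and schema.

```sql
-- List every task that (directly or indirectly) depends on the root task CUST_TRUNC.
-- Assumes the current database/schema contains the pipeline tasks.
SELECT NAME,
       PREDECESSORS,  -- the task(s) each task runs AFTER
       STATE          -- 'started' or 'suspended'
FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(
       TASK_NAME => 'CUST_TRUNC',
       RECURSIVE => TRUE));
```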
Table Details:
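The original schema is not shown, so the table definitions below are a minimal sketch. The column names (CUST_NO, CUST_STATUS, BALANCE, INVOICE_NO, INVOICE_DATE), their types, and the use of a TRANSIENT table for CUSTOMER_SRC are assumptions. Only the table names and the STARTDATE, ENDDATE, and FLAG columns come from the pipeline description.

```sql
-- Hypothetical columns and types; adjust them to match your source system.

-- Staging table, truncated and reloaded every run (TRANSIENT: no Fail-safe needed).
CREATE OR REPLACE TRANSIENT TABLE CUSTOMER_SRC (
    CUST_NO      NUMBER(10,0),   -- customer number
    CUST_STATUS  VARCHAR(20),    -- customer status
    BALANCE      NUMBER(12,2),   -- current balance
    INVOICE_NO   VARCHAR(30),    -- latest invoice number
    INVOICE_DATE DATE            -- latest invoice date
);

-- Current state of each customer; the stream CUST_STREAM is created on this table.
CREATE OR REPLACE TABLE CUSTOMER_LANDING (
    CUST_NO      NUMBER(10,0),
    CUST_STATUS  VARCHAR(20),
    BALANCE      NUMBER(12,2),
    INVOICE_NO   VARCHAR(30),
    INVOICE_DATE DATE
);

-- SCD2 history: one row per version of a customer record.
CREATE OR REPLACE TABLE CUSTOMER_ANALYTIC (
    CUST_NO      NUMBER(10,0),
    CUST_STATUS  VARCHAR(20),
    BALANCE      NUMBER(12,2),
    INVOICE_NO   VARCHAR(30),
    INVOICE_DATE DATE,
    STARTDATE    TIMESTAMP_LTZ,  -- when this version became active
    ENDDATE      TIMESTAMP_LTZ,  -- NULL while the version is still active
    FLAG         CHAR(1)         -- 'Y' = active, 'N' = inactive
);
```

A transient table suits CUSTOMER_SRC because it is rebuilt every minute. A session-scoped TEMPORARY table would not work here, since the tasks run in their own sessions and could not see it.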

Stream Details:
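CREATE OR REPLACE STREAM CUST_STREAM ON TABLE CUSTOMER_LANDING;
This must be a standard stream (the default) rather than APPEND_ONLY = TRUE. An append-only stream records only inserts, so the updates made by CUST_LANDING_LOAD would never reach ANALYTICS_LOAD as the DELETE/INSERT pairs it relies on.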
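To see what ANALYTICS_LOAD will consume, you can query the stream directly. A plain SELECT does not advance the stream offset; only a DML statement that reads the stream, such as a MERGE, does. The column names CUST_NO and CUST_STATUS are hypothetical.

```sql
-- Inspect pending change records without consuming them.
-- CUST_NO and CUST_STATUS are hypothetical column names.
SELECT CUST_NO,
       CUST_STATUS,
       METADATA$ACTION,    -- 'INSERT' or 'DELETE'
       METADATA$ISUPDATE,  -- TRUE when the row is one half of an UPDATE
       METADATA$ROW_ID     -- stable identifier of the changed row
FROM CUST_STREAM
ORDER BY CUST_NO, METADATA$ACTION;
```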
Task Details:
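The task graph can be sketched as follows. The warehouse name COMPUTE_WH and the column names are hypothetical. The MERGE conditions follow the rules in The Data Flow, including updating a landing row only when its status changes.

The sketch also assumes CUST_NO is unique in both CUSTOMER_SRC and CUSTOMER_LANDING. By default, Snowflake rejects a MERGE in which several source rows update the same target row.

```sql
-- Hypothetical warehouse (COMPUTE_WH) and column names; task names are from the pipeline.

-- Root task: runs every minute and empties the staging table.
CREATE OR REPLACE TASK CUST_TRUNC
  WAREHOUSE = COMPUTE_WH
  SCHEDULE = '1 MINUTE'
AS
  TRUNCATE TABLE CUSTOMER_SRC;

-- Reload CUSTOMER_SRC once the truncation has finished.
CREATE OR REPLACE TASK CUST_LOAD
  WAREHOUSE = COMPUTE_WH
  AFTER CUST_TRUNC
AS
  CALL PROC_COPY_SF_LEAD_TO_RAW_STAGE();

-- Upsert the latest batch into the landing table.
CREATE OR REPLACE TASK CUST_LANDING_LOAD
  WAREHOUSE = COMPUTE_WH
  AFTER CUST_LOAD
AS
  MERGE INTO CUSTOMER_LANDING AS tgt
  USING CUSTOMER_SRC AS src
    ON tgt.CUST_NO = src.CUST_NO
  -- Update only when the status changed (NULL-safe comparison).
  WHEN MATCHED AND tgt.CUST_STATUS IS DISTINCT FROM src.CUST_STATUS THEN
    UPDATE SET CUST_STATUS  = src.CUST_STATUS,
               BALANCE      = src.BALANCE,
               INVOICE_NO   = src.INVOICE_NO,
               INVOICE_DATE = src.INVOICE_DATE
  WHEN NOT MATCHED THEN
    INSERT (CUST_NO, CUST_STATUS, BALANCE, INVOICE_NO, INVOICE_DATE)
    VALUES (src.CUST_NO, src.CUST_STATUS, src.BALANCE, src.INVOICE_NO, src.INVOICE_DATE);

-- SCD2 load: close the active version on DELETE, open a new version on INSERT.
CREATE OR REPLACE TASK ANALYTICS_LOAD
  WAREHOUSE = COMPUTE_WH
  AFTER CUST_LANDING_LOAD
AS
  MERGE INTO CUSTOMER_ANALYTIC AS tgt
  USING CUST_STREAM AS s
    -- Only DELETE records can match, and only against the active version.
    ON tgt.CUST_NO = s.CUST_NO
   AND tgt.FLAG = 'Y'
   AND s.METADATA$ACTION = 'DELETE'
  WHEN MATCHED THEN
    UPDATE SET FLAG = 'N', ENDDATE = CURRENT_TIMESTAMP()
  -- INSERT records never satisfy the ON clause, so they land here.
  WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    INSERT (CUST_NO, CUST_STATUS, BALANCE, INVOICE_NO, INVOICE_DATE, STARTDATE, ENDDATE, FLAG)
    VALUES (s.CUST_NO, s.CUST_STATUS, s.BALANCE, s.INVOICE_NO, s.INVOICE_DATE,
            CURRENT_TIMESTAMP(), NULL, 'Y');

-- Tasks are created suspended: resume the children first, the root last.
ALTER TASK ANALYTICS_LOAD RESUME;
ALTER TASK CUST_LANDING_LOAD RESUME;
ALTER TASK CUST_LOAD RESUME;
ALTER TASK CUST_TRUNC RESUME;
```

If the graph is already running, suspend CUST_TRUNC before replacing any of its tasks, and resume it last. Because ANALYTICS_LOAD reads CUST_STREAM inside a MERGE, each successful run advances the stream offset, so every change is applied once.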


Stored Procedure:
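The original post does not show the body of PROC_COPY_SF_LEAD_TO_RAW_STAGE(). A minimal Snowflake Scripting sketch could simply run COPY INTO from a stage. The stage name @CUSTOMER_STAGE and the CSV file format are assumptions, and the files are assumed to list their columns in the same order as CUSTOMER_SRC.

```sql
-- Minimal sketch: the stage @CUSTOMER_STAGE and the CSV file format are hypothetical.
CREATE OR REPLACE PROCEDURE PROC_COPY_SF_LEAD_TO_RAW_STAGE()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  -- Load the files currently in the stage into the freshly truncated staging table.
  COPY INTO CUSTOMER_SRC
    FROM @CUSTOMER_STAGE
    FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)
    ON_ERROR = ABORT_STATEMENT;  -- fail the task run instead of loading partial data
  RETURN 'CUSTOMER_SRC loaded';
END;
$$;
```

TRUNCATE TABLE also clears the load metadata of CUSTOMER_SRC. As a result, COPY INTO reloads every file in the stage on each run, which suits a stage that always holds the latest full extract.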

Finalizer Stored Procedure:
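A sketch of the finalizer procedure and its task follows. The report table PIPELINE_RUN_REPORT, the procedure name PROC_FINALIZE_REPORT, and the warehouse COMPUTE_WH are all hypothetical.

The procedure calls SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID') to identify the current graph run, then copies each task's outcome from INFORMATION_SCHEMA.TASK_HISTORY. Depending on your setup, the task owner may need additional privileges (for example MONITOR on the tasks) to see that history.

```sql
-- Hypothetical report table; one row per task per graph run.
CREATE TABLE IF NOT EXISTS PIPELINE_RUN_REPORT (
    GRAPH_RUN_GROUP_ID VARCHAR,
    TASK_NAME          VARCHAR,
    STATE              VARCHAR,
    ERROR_MESSAGE      VARCHAR,
    COMPLETED_TIME     TIMESTAMP_LTZ,
    REPORTED_AT        TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Hypothetical procedure name.
CREATE OR REPLACE PROCEDURE PROC_FINALIZE_REPORT()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
  run_group_id VARCHAR;
  not_ok_count INTEGER;
BEGIN
  -- Identify the task graph run this finalizer belongs to.
  run_group_id := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID');

  -- Record the outcome of every task in this run except the finalizer itself.
  INSERT INTO PIPELINE_RUN_REPORT (GRAPH_RUN_GROUP_ID, TASK_NAME, STATE, ERROR_MESSAGE, COMPLETED_TIME)
    SELECT GRAPH_RUN_GROUP_ID, NAME, STATE, ERROR_MESSAGE, COMPLETED_TIME
    FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
           SCHEDULED_TIME_RANGE_START => DATEADD(HOUR, -1, CURRENT_TIMESTAMP())))
    WHERE GRAPH_RUN_GROUP_ID = :run_group_id
      AND NAME <> 'FINALIZE_TASKS';

  -- Count tasks that did not succeed (failed, skipped, cancelled, ...).
  SELECT COUNT(*) INTO :not_ok_count
    FROM PIPELINE_RUN_REPORT
    WHERE GRAPH_RUN_GROUP_ID = :run_group_id
      AND STATE <> 'SUCCEEDED';

  RETURN 'Run ' || run_group_id || ': ' || TO_VARCHAR(not_ok_count) || ' task(s) did not succeed';
END;
$$;

-- Attach the finalizer to the root task; suspend the root while changing the graph.
ALTER TASK CUST_TRUNC SUSPEND;

CREATE OR REPLACE TASK FINALIZE_TASKS
  WAREHOUSE = COMPUTE_WH
  FINALIZE = CUST_TRUNC
AS
  CALL PROC_FINALIZE_REPORT();

ALTER TASK FINALIZE_TASKS RESUME;
ALTER TASK CUST_TRUNC RESUME;
```

Corrective actions for the Data Integrity case, such as deleting rows loaded during a failed run, could be added to the procedure once the count of unsuccessful tasks is known.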

Task Output:
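To inspect recent runs yourself, a query along these lines lists the state of each pipeline task over the last 30 minutes. The time window and result limit are arbitrary choices for illustration.

```sql
-- Recent runs of the pipeline tasks, newest first (30-minute window is arbitrary).
SELECT NAME,
       STATE,
       ERROR_MESSAGE,
       SCHEDULED_TIME,
       COMPLETED_TIME,
       GRAPH_RUN_GROUP_ID  -- groups the tasks that belong to the same graph run
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
       SCHEDULED_TIME_RANGE_START => DATEADD(MINUTE, -30, CURRENT_TIMESTAMP()),
       RESULT_LIMIT => 100))
WHERE NAME IN ('CUST_TRUNC', 'CUST_LOAD', 'CUST_LANDING_LOAD',
               'ANALYTICS_LOAD', 'FINALIZE_TASKS')
ORDER BY SCHEDULED_TIME DESC;
```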


Benefits:
- Near-Real-Time Customer Insights: Because the pipeline runs every minute, you can access and analyze the latest customer data for informed decision-making.
- Accurate Historical Record: SCD2 keeps every version of a customer record together with its validity period (STARTDATE to ENDDATE), giving insight into how customer behavior changes over time.
- Guaranteed Reporting: The Finalizer Task guarantees a report is generated after each pipeline execution, keeping you informed about potential issues.