In this post, we will explore a use case involving Snowflake Dynamic Tables. Imagine a scenario where a customer uploads a feed file to an S3 bucket. Snowpipe is configured on the bucket to ingest each file into a Snowflake staging table as soon as a file upload notification is received. Dynamic Tables built on top of these staging tables then hold the most recent data received from the source system.
Our discussion will focus on implementing the following requirements:
- Configure Snowpipe on the S3 bucket to load the tables in the Staging Layer.
- The source table may contain duplicate records for the same customer number. The Cleansed Layer will keep only the latest record per customer, based on the customer creation date.
- Customer accessories may have multiple prices for the same product. The Cleansed Layer should keep the record with the highest price.
- The Cleansed Layer will consist of two Dynamic Tables built on the base tables in the Staging Layer.
- These Dynamic Tables will refresh automatically, driven by the refresh frequency of the Target Layer Dynamic Table.
- The Cleansed Layer's Dynamic Tables will contain the most up-to-date data, curated according to the business logic.
- Finally, the Target Layer will combine the Customer and Accessory details and calculate the Price per Accessory.
- The Dynamic Table in the Target Layer has a target lag of 1 minute, which triggers the refresh of the Dynamic Tables in the Cleansed Layer.
- If an error occurs while refreshing a Dynamic Table at any layer, an alert notification will be sent to stakeholders with details of the impacted Dynamic Table.
Technical Implementation:
Staging Layer: Assume the customer has uploaded the files to the bucket and Snowpipe has ingested them into the staging tables, i.e. the Customer and Accessory_Item tables.
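The staging setup might look roughly like the following. This is a minimal sketch, not the author's exact code: the schema, table, stage, and pipe names, the bucket path, the storage integration `s3_int`, and all column names are hypothetical, and a storage integration for the bucket is assumed to exist already. A stage and pipe for Accessory_Item would follow the same pattern as the customer ones.

```sql
-- Hypothetical staging tables (column names are assumptions).
CREATE TABLE IF NOT EXISTS staging.customer (
    customer_number        VARCHAR,
    customer_name          VARCHAR,
    customer_creation_date TIMESTAMP_NTZ
);

CREATE TABLE IF NOT EXISTS staging.accessory_item (
    customer_number VARCHAR,
    accessory_id    VARCHAR,
    accessory_name  VARCHAR,
    accessory_price NUMBER(10, 2),
    accessory_count NUMBER
);

-- External stage on a hypothetical bucket prefix, using an existing storage integration.
CREATE OR REPLACE STAGE staging.customer_s3_stage
  URL = 's3://my-feed-bucket/customer/'
  STORAGE_INTEGRATION = s3_int
  FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1);

-- AUTO_INGEST = TRUE lets S3 event notifications trigger the load.
CREATE OR REPLACE PIPE staging.customer_pipe
  AUTO_INGEST = TRUE
AS
  COPY INTO staging.customer
  FROM @staging.customer_s3_stage;

-- The notification_channel column holds the SQS ARN to use as the
-- bucket's event notification target.
SHOW PIPES LIKE 'CUSTOMER_PIPE' IN SCHEMA staging;
```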
Cleansed Layer: We will create two Dynamic Tables that apply the business logic described above.
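A possible definition of the customer Dynamic Table, sketched under assumptions: the column names (`customer_number`, `customer_name`, `customer_creation_date`), the schemas, and the warehouse `transform_wh` are hypothetical. `QUALIFY` with `ROW_NUMBER()` keeps one row per customer number, the one with the latest creation date.

```sql
-- Cleansed customer table: keep only the latest record per customer number.
CREATE OR REPLACE DYNAMIC TABLE cleansed.customer_dt
  TARGET_LAG = DOWNSTREAM      -- refresh only when a dependent Dynamic Table needs it
  WAREHOUSE = transform_wh     -- hypothetical warehouse name
AS
SELECT
    customer_number,
    customer_name,
    customer_creation_date
FROM staging.customer
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY customer_number
    ORDER BY customer_creation_date DESC
) = 1;
```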
DOWNSTREAM: Setting TARGET_LAG = DOWNSTREAM means the Dynamic Table is refreshed only when other Dynamic Tables that depend on it need to refresh. In our scenario, the Target Layer's Dynamic Table triggers the refresh of the Cleansed Layer Dynamic Tables.
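The accessory Dynamic Table can follow the same pattern. In this sketch (hypothetical table, column, and warehouse names) the window is ordered by price, so for each customer and accessory only the highest-priced record survives.

```sql
-- Cleansed accessory table: keep the highest-priced record per customer and accessory.
CREATE OR REPLACE DYNAMIC TABLE cleansed.accessory_item_dt
  TARGET_LAG = DOWNSTREAM
  WAREHOUSE = transform_wh
AS
SELECT
    customer_number,
    accessory_id,
    accessory_name,
    accessory_price,
    accessory_count
FROM staging.accessory_item
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY customer_number, accessory_id
    ORDER BY accessory_price DESC
) = 1;
```

If two records share the highest price, `ROW_NUMBER()` keeps one of them arbitrarily; adding a tiebreaker column to the `ORDER BY` makes the choice deterministic.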
Target Layer:
Note: Price_Per_Accessory is calculated as Accessory Price / Accessory Count.
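A sketch of the Target Layer Dynamic Table under the same hypothetical names. Its 1-minute target lag drives the refresh of the Cleansed Layer tables, which use DOWNSTREAM. Because the price is divided by the count with a plain `/`, a row with an accessory count of 0 makes Snowflake raise a division-by-zero error and the refresh fails.

```sql
-- Target table: join customers with their accessories and derive the per-unit price.
CREATE OR REPLACE DYNAMIC TABLE target.customer_accessory_dt
  TARGET_LAG = '1 minute'      -- this lag drives refreshes of the DOWNSTREAM tables it reads
  WAREHOUSE = transform_wh
AS
SELECT
    c.customer_number,
    c.customer_name,
    a.accessory_name,
    a.accessory_price,
    a.accessory_count,
    a.accessory_price / a.accessory_count AS price_per_accessory
FROM cleansed.customer_dt AS c
JOIN cleansed.accessory_item_dt AS a
  ON c.customer_number = a.customer_number;
```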
Now the Dynamic Table is refreshed:
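Instead of waiting for the scheduled refresh, a refresh can also be triggered manually. A minimal sketch, using the hypothetical table name `target.customer_accessory_dt`:

```sql
-- Trigger an immediate refresh instead of waiting for the 1-minute target lag.
ALTER DYNAMIC TABLE target.customer_accessory_dt REFRESH;

-- Inspect the refreshed result.
SELECT *
FROM target.customer_accessory_dt
ORDER BY customer_number;
```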
Output:
Dynamic Table Graph:
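Snowsight displays this graph visually. The same dependency information can also be queried through the INFORMATION_SCHEMA.DYNAMIC_TABLE_GRAPH_HISTORY table function; the sketch below assumes its `name`, `inputs`, `target_lag_type`, and `valid_to` columns, with a NULL `valid_to` marking the current definition of each Dynamic Table.

```sql
-- Each row lists a Dynamic Table and the objects it reads from (INPUTS).
SELECT name, inputs, target_lag_type
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_GRAPH_HISTORY())
WHERE valid_to IS NULL      -- keep only the current definition of each table
ORDER BY name;
```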
Second Iteration:
During the second iteration, we inserted a few records into the respective staging tables.
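The second iteration can be reproduced with a couple of inserts. All values here are hypothetical, except that C-104 is given an accessory count of 0, matching the record that is corrected at the end of this post. Table and column names are the same assumed ones used for the staging tables.

```sql
-- Hypothetical second-iteration rows (values are illustrative only).
INSERT INTO staging.customer (customer_number, customer_name, customer_creation_date)
VALUES ('C-104', 'Customer 104', '2024-01-15 10:00:00');

-- An accessory count of 0 for C-104 makes price / count divide by zero.
INSERT INTO staging.accessory_item
    (customer_number, accessory_id, accessory_name, accessory_price, accessory_count)
VALUES ('C-104', 'A-10', 'Phone Case', 25.00, 0);
```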
After the DT refresh, we observed the following behavior in the respective Dynamic Tables:
Checking the refresh history of the Target Layer Dynamic Table, we found the following error:
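The refresh history can also be queried directly. A minimal sketch, assuming the hypothetical Target Layer table name CUSTOMER_ACCESSORY_DT; for a failed refresh, `state` is FAILED and `state_message` carries the error text.

```sql
-- Most recent refreshes of the target Dynamic Table, newest first.
SELECT name, state, state_message, refresh_start_time
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
WHERE name = 'CUSTOMER_ACCESSORY_DT'
ORDER BY refresh_start_time DESC;
```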
To automate this check, we configure an ALERT that scans DYNAMIC_TABLE_REFRESH_HISTORY for failed refresh entries of any Dynamic Table. If it finds a failure, the alert sends an email to the stakeholders.
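A minimal sketch of such an alert. It assumes a hypothetical email notification integration `dt_email_int`, a placeholder recipient address (SYSTEM$SEND_EMAIL only delivers to verified user emails in the account), and the warehouse `transform_wh`. The window between SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME() and SNOWFLAKE.ALERT.SCHEDULED_TIME() limits each run to refreshes that ended since the previous successful run. For simplicity the email body is static; including the affected table names in the mail would need extra scripting. A newly created alert starts in a suspended state.

```sql
-- Email integration; recipients must be verified users in the account.
CREATE OR REPLACE NOTIFICATION INTEGRATION dt_email_int
  TYPE = EMAIL
  ENABLED = TRUE
  ALLOWED_RECIPIENTS = ('data-team@example.com');

-- Every minute, look for Dynamic Table refreshes that failed since the last run.
CREATE OR REPLACE ALERT DT_FAILURE
  WAREHOUSE = transform_wh
  SCHEDULE = '1 MINUTE'
  IF (EXISTS (
    SELECT 1
    FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
    WHERE state = 'FAILED'
      AND refresh_end_time BETWEEN SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
                               AND SNOWFLAKE.ALERT.SCHEDULED_TIME()
  ))
  THEN
    CALL SYSTEM$SEND_EMAIL(
      'dt_email_int',
      'data-team@example.com',
      'Dynamic Table refresh failure',
      'At least one Dynamic Table refresh failed. Check DYNAMIC_TABLE_REFRESH_HISTORY for details.'
    );
```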
Before resuming the alert, we can run its condition query on its own to confirm that it returns the FAILED entries along with the affected Dynamic Table names.
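A minimal sketch of such a manual check, without any time window, listing every failed refresh with its Dynamic Table name and error message:

```sql
-- Failed refreshes for any Dynamic Table in the current database, newest first.
SELECT name, state, state_message, refresh_end_time
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
WHERE state = 'FAILED'
ORDER BY refresh_end_time DESC;
```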
ALTER ALERT DT_FAILURE RESUME;
Once the alert runs, the stakeholders receive an email notifying them of the error.
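To confirm that the alert actually fired, its execution history can be checked with the INFORMATION_SCHEMA.ALERT_HISTORY table function. This sketch assumes the alert name DT_FAILURE used above; a `state` of TRIGGERED indicates the condition was met and the email action ran.

```sql
-- Recent runs of the DT_FAILURE alert, newest first.
SELECT name, state, scheduled_time, completed_time
FROM TABLE(INFORMATION_SCHEMA.ALERT_HISTORY())
WHERE name = 'DT_FAILURE'
ORDER BY scheduled_time DESC;
```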
To resolve the issue, we can upload another file to the bucket with an Accessory Count greater than 0 for the C-104 record. Assume the count has been updated to 5 in the staging table. After the next refresh, the same value is reflected in the Dynamic Tables as well.
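In the post, the fix arrives as a new file through Snowpipe. For a quick test, the same effect can be simulated with a direct update, assuming the hypothetical staging and target names used throughout these sketches.

```sql
-- Simulate the corrected feed: set C-104's accessory count to 5.
UPDATE staging.accessory_item
SET accessory_count = 5
WHERE customer_number = 'C-104';

-- Refresh now instead of waiting for the target lag, then check C-104.
ALTER DYNAMIC TABLE target.customer_accessory_dt REFRESH;

SELECT customer_number, accessory_count, price_per_accessory
FROM target.customer_accessory_dt
WHERE customer_number = 'C-104';
```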