In this post, we will explore how to use EVENT tables to capture erroneous data. This newly introduced Snowflake public preview feature enables developers to actively monitor and debug their applications. Based on a business requirement, we developed a streamlined process that handles invalid records automatically, without manual intervention. A Python process reads the input feed file, identifies and excludes the flawed records, and simultaneously logs them in an event table. Additionally, an alert mechanism monitors the event table and sends email notifications whenever such bad records are detected.
We have implemented the framework below in our project to develop the data pipeline, log to event tables, and alert on bad data.
Prerequisites:
- Firstly, create an Event table “logging_event_table“ in your account.
- Secondly, associate it with your account.
- Create an email notification integration with your email address.
- In addition, create the below tables:
- RAW table (with a VARIANT column)
- PARSE table: to hold the parsed data in structured format
- Final table: to hold the final dataset along with two additional columns that indicate the record creation and update dates
- Above all, create a Stream on the PARSE table. (A setup sketch for these prerequisites follows this list.)
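A minimal setup sketch for these prerequisites is shown below. The database/schema, integration name, recipient address, and column types are illustrative assumptions; the column list is inferred from the MERGE statement later in this post.

-- Sketch only: object names, recipient, and column types are assumptions.
CREATE EVENT TABLE demo_db.public.logging_event_table;
ALTER ACCOUNT SET EVENT_TABLE = demo_db.public.logging_event_table;

CREATE OR REPLACE NOTIFICATION INTEGRATION my_email_int
  TYPE = EMAIL
  ENABLED = TRUE
  ALLOWED_RECIPIENTS = ('your.name@example.com');

-- RAW table with a single VARIANT column.
CREATE OR REPLACE TABLE customer_json_info (v VARIANT);

-- PARSE table: structured columns inferred from the MERGE statement below.
CREATE OR REPLACE TABLE customer_json_parse (
  CUST_ID VARCHAR, CUST_AMT NUMBER(12,2), CITY VARCHAR, STATE VARCHAR,
  FST_NAME VARCHAR, LST_NAME VARCHAR, COMPANY_NAME VARCHAR,
  INV_NO VARCHAR, INV_CD VARCHAR, INV_AMT NUMBER(12,2),
  CARD_NUM VARCHAR, CARD_TYPE VARCHAR);

-- Final table: same columns plus the two audit-date columns.
CREATE OR REPLACE TABLE customer_json_parse_final (
  CUST_ID VARCHAR, CUST_AMT NUMBER(12,2), CITY VARCHAR, STATE VARCHAR,
  FST_NAME VARCHAR, LST_NAME VARCHAR, COMPANY_NAME VARCHAR,
  INV_NO VARCHAR, INV_CD VARCHAR, INV_AMT NUMBER(12,2),
  CARD_NUM VARCHAR, CARD_TYPE VARCHAR,
  REC_CREATED DATE, RECORD_UPDATED DATE);

-- Stream to capture delta changes on the PARSE table.
CREATE OR REPLACE STREAM customer_json_parse_stream ON TABLE customer_json_parse;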
Implementation:
The following points outline the complete set of requirements:
- Initially, the Source team places the JSON feed file in the S3 bucket.
- A TASK is scheduled in Snowflake to COPY the file into the RAW table, which contains a VARIANT column. This task automates the data loading process into the designated table.
- A Python User-Defined Table Function (UDTF) parses the JSON data into a tabular format. The UDTF takes the JSON input, transforms it into structured tabular data, and ingests it into the CUSTOMER_JSON_PARSE table.
- To track the daily feed file changes and capture the delta updates, we have created a STREAM on the PARSE table. By leveraging this stream, we can efficiently identify the delta changes in the daily feed files.
- Additionally, as part of error handling and data quality control, the UDTF is designed to identify any erroneous records. These erroneous records are logged into an Event table for capturing and monitoring data-related issues.
- In addition, utilizing the stream, we perform a MERGE operation to incorporate the changes from the stream into the Target table. This enables us to update the Target table with the modified records and populate two additional date fields: REC_CREATED and RECORD_UPDATED.
- Finally, configure an alert to notify you via email whenever warnings of interest are detected in the event table.
Technical Implementation:
- Firstly, COPY the data from S3 to Snowflake.
copy into customer_json_info
from @demo_db.public.ext_csv_stage/Cust_Demographic.json
on_error = CONTINUE;
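Per the requirement above, this COPY runs on a schedule. A minimal task sketch is shown below; the task name, warehouse, and CRON expression are assumptions.

-- Sketch only: task name, warehouse, and schedule are assumptions.
CREATE OR REPLACE TASK load_customer_raw
  WAREHOUSE = demo_wh
  SCHEDULE = 'USING CRON 0 6 * * * UTC'  -- daily at 06:00 UTC
AS
  COPY INTO customer_json_info
  FROM @demo_db.public.ext_csv_stage/Cust_Demographic.json
  ON_ERROR = CONTINUE;

-- Tasks are created suspended; resume to start the schedule.
ALTER TASK load_customer_raw RESUME;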
- Verify the structure of the data loaded into the table:
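For example, a quick look at the VARIANT column (column v is the COPY target and is what the UDTF call later in the post reads):

SELECT v FROM customer_json_info LIMIT 10;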
- Develop the Python UDTF, which will emit logs to the event table. Import the below two modules (a sketch of the full function follows this list):
- import json : to read JSON data.
- import logging : to log into the Event table.
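A minimal sketch of such a UDTF is shown below. The output columns mirror the MERGE statement later in this post; the JSON key names, runtime version, and handler name are assumptions. Note that log messages are captured in the event table only if an appropriate log level is set (e.g., ALTER SESSION SET LOG_LEVEL = ERROR).

CREATE OR REPLACE FUNCTION CUSTOMER_DEMOGRAPHIC2(json_str VARCHAR)
RETURNS TABLE (
  CUST_ID VARCHAR, CUST_AMT FLOAT, CITY VARCHAR, STATE VARCHAR,
  FST_NAME VARCHAR, LST_NAME VARCHAR, COMPANY_NAME VARCHAR,
  INV_NO VARCHAR, INV_CD VARCHAR, INV_AMT FLOAT,
  CARD_NUM VARCHAR, CARD_TYPE VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'CustomerParser'
AS
$$
import json
import logging

# Messages emitted via the standard logging module land in the
# event table associated with the account.
logger = logging.getLogger("customer_demographic_udtf")

class CustomerParser:
    def process(self, json_str):
        try:
            rec = json.loads(json_str)
            # JSON key names are assumptions; adjust them to your feed layout.
            yield (
                rec.get("CUST_ID"), rec.get("CUST_AMT"), rec.get("CITY"),
                rec.get("STATE"), rec.get("FST_NAME"), rec.get("LST_NAME"),
                rec.get("COMPANY_NAME"), rec.get("INV_NO"), rec.get("INV_CD"),
                rec.get("INV_AMT"), rec.get("CARD_NUM"), rec.get("CARD_TYPE"),
            )
        except Exception as err:
            # Bad record: skip it and log the failure to the event table.
            logger.error("Bad JSON record skipped: %s", err)
$$;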
- After that, call the UDTF against the RAW table. This parses the JSON data and loads it into the CUSTOMER_JSON_PARSE table. If the function sees any issue in the JSON file, it logs the error into the EVENT table.
INSERT INTO customer_json_parse
SELECT t.* FROM customer_json_info, TABLE(CUSTOMER_DEMOGRAPHIC2(v::VARCHAR)) AS t;
- Verify the data in PARSE table and STREAM.
select * from customer_json_parse;
select * from customer_json_parse_stream;
- Moreover, schedule a Task to run the MERGE statement using the STREAM and load the Final table (a task sketch follows the MERGE below).
MERGE INTO customer_json_parse_final fnl USING customer_json_parse_stream strm
ON fnl.CUST_ID = strm.CUST_ID
WHEN MATCHED THEN UPDATE SET
fnl.CUST_AMT = strm.CUST_AMT,
fnl.CITY = strm.CITY,
fnl.STATE = strm.STATE,
fnl.FST_NAME = strm.FST_NAME,
fnl.LST_NAME = strm.LST_NAME,
fnl.COMPANY_NAME = strm.COMPANY_NAME,
fnl.INV_NO = strm.INV_NO,
fnl.INV_CD = strm.INV_CD,
fnl.INV_AMT = strm.INV_AMT,
fnl.CARD_NUM = strm.CARD_NUM,
fnl.CARD_TYPE = strm.CARD_TYPE,
fnl.RECORD_UPDATED = CURRENT_DATE()
WHEN NOT MATCHED THEN INSERT (CUST_ID,CUST_AMT,CITY,STATE,FST_NAME,LST_NAME,COMPANY_NAME,INV_NO,INV_CD,INV_AMT,CARD_NUM,CARD_TYPE,REC_CREATED,RECORD_UPDATED)
VALUES (strm.CUST_ID,strm.CUST_AMT,strm.CITY,strm.STATE,strm.FST_NAME,strm.LST_NAME,strm.COMPANY_NAME,strm.INV_NO,strm.INV_CD,strm.INV_AMT,strm.CARD_NUM,strm.CARD_TYPE,CURRENT_DATE(),CURRENT_DATE());
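A sketch of wrapping this MERGE in a scheduled task is shown below; the task name, warehouse, and interval are assumptions. The SYSTEM$STREAM_HAS_DATA condition skips runs when the stream is empty.

-- Sketch only: task name, warehouse, and interval are assumptions.
CREATE OR REPLACE TASK merge_customer_final
  WAREHOUSE = demo_wh
  SCHEDULE = '60 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('CUSTOMER_JSON_PARSE_STREAM')
AS
  MERGE INTO customer_json_parse_final fnl USING customer_json_parse_stream strm
  ON fnl.CUST_ID = strm.CUST_ID
  WHEN MATCHED THEN UPDATE SET
  fnl.CUST_AMT = strm.CUST_AMT, fnl.CITY = strm.CITY, fnl.STATE = strm.STATE,
  fnl.FST_NAME = strm.FST_NAME, fnl.LST_NAME = strm.LST_NAME,
  fnl.COMPANY_NAME = strm.COMPANY_NAME, fnl.INV_NO = strm.INV_NO,
  fnl.INV_CD = strm.INV_CD, fnl.INV_AMT = strm.INV_AMT,
  fnl.CARD_NUM = strm.CARD_NUM, fnl.CARD_TYPE = strm.CARD_TYPE,
  fnl.RECORD_UPDATED = CURRENT_DATE()
  WHEN NOT MATCHED THEN INSERT (CUST_ID,CUST_AMT,CITY,STATE,FST_NAME,LST_NAME,COMPANY_NAME,INV_NO,INV_CD,INV_AMT,CARD_NUM,CARD_TYPE,REC_CREATED,RECORD_UPDATED)
  VALUES (strm.CUST_ID,strm.CUST_AMT,strm.CITY,strm.STATE,strm.FST_NAME,strm.LST_NAME,strm.COMPANY_NAME,strm.INV_NO,strm.INV_CD,strm.INV_AMT,strm.CARD_NUM,strm.CARD_TYPE,CURRENT_DATE(),CURRENT_DATE());

-- Tasks are created suspended; resume to start the schedule.
ALTER TASK merge_customer_final RESUME;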
- Now verify the data inside the EVENT table; you will see an entry for each failed or bad JSON record.
SELECT TIMESTAMP,RESOURCE_ATTRIBUTES,RECORD,VALUE
FROM demo_db.public.logging_event_table order by timestamp desc;
- Above all, create the ALERT, which will read data from the EVENT table and send an EMAIL to the stakeholders (see the sketch below).
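A minimal alert sketch is shown below. The alert name, warehouse, schedule, recipient, and the severity filter on the event table's RECORD column are assumptions; SYSTEM$SEND_EMAIL uses the notification integration from the prerequisites.

-- Sketch only: names, schedule, recipient, and filter are assumptions.
CREATE OR REPLACE ALERT bad_record_alert
  WAREHOUSE = demo_wh
  SCHEDULE = '60 MINUTE'
  IF (EXISTS (
    SELECT 1
    FROM demo_db.public.logging_event_table
    WHERE RECORD:"severity_text" = 'ERROR'
      AND TIMESTAMP > COALESCE(SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME(),
                               DATEADD('hour', -1, SNOWFLAKE.ALERT.SCHEDULED_TIME()))))
  THEN CALL SYSTEM$SEND_EMAIL(
    'my_email_int',             -- notification integration from the prerequisites
    'your.name@example.com',    -- placeholder recipient
    'Bad JSON records detected',
    'Erroneous records were logged to logging_event_table. Please review.');

-- Alerts are created suspended; resume to start the schedule.
ALTER ALERT bad_record_alert RESUME;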
- Finally, verify that we have received the mail for the erroneous records.
In the upcoming post, we will explore event logging with both JavaScript procedures and SQL Scripting procedures.