Consider a Scenario where the Client manages a large Customer-Invoice details with constantly changing Invoice data. The company uses Snowflake to manage Invoice data in a table named S_INVOICE. This table receives daily loads(history + Current) to reflect changes in customer and invoice partial payment. The goal is to efficiently merge this dynamic data into a target table named S_INVOICE_TARGET while performing Insert operations using stored procedure.
Dynamic Merge Procedure:
The provided DYNAMIC_MERGE procedure can be directly utilized for this use case. Here’s a breakdown of its functionality:
- Dynamic Column Handling: The procedure retrieves column names from the source table S_INVOICE and constructs comma-separated lists for both Source and target tables. This ensures the procedure adapts to changes in the source table schema (adding/removing columns)
- Dynamic Merge Procedure:
The DYNAMIC_MERGE procedure takes two parameters:
P_TBL: The name of the staging table containing daily sales data.
P_TBL_TRGT: The name of the target table to merge data into.
- The procedure dynamically retrieves column names from the staging table using Snowflake’s INFORMATION_SCHEMA.COLUMNS.
- It constructs the MERGE INTO statement dynamically to merge data from the staging table into the target table.
- The ON condition uses the HASH(*) function to compare records based on all columns.
- When a record is not matched in the target table, it inserts the data from the staging table into the target table.
- After executing the merge query, the procedure returns a message indicating whether the merge was successful or failed.
Stored Procedure:
CREATE OR REPLACE PROCEDURE DYNAMIC_MERGE(P_TBL VARCHAR, P_TBL_TRGT VARCHAR)
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
AS
$$
try {
var return_value = "";
var target_value = "TGT.";
var SQL_COL_NAME = "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '" + P_TBL + "'";
var stmt_col = snowflake.createStatement({sqlText: SQL_COL_NAME});
var rs_col = stmt_col.execute();
while (rs_col.next()) {
var col_name = rs_col.getColumnValue(1);
return_value += (return_value === "") ? col_name : (", " + col_name);
target_value += (target_value === "TGT.") ? col_name : (", TGT." + col_name);
}
// Construct the merge query dynamically
var merge_query = `
MERGE INTO ` + P_TBL_TRGT + ` TGT
USING (
WITH CTE AS (
SELECT ` + return_value + `,
ROW_NUMBER() OVER (PARTITION BY hash(*) ORDER BY hash(*)) AS rnm,
hash(` + return_value + `) AS hashkey
FROM ` + P_TBL + `
)
SELECT *, hash(` + return_value + `) AS tgt_hashkey FROM CTE
) src
ON hash(SRC.INV_AMT, SRC.CRID, SRC.CUST_BAL, SRC.PHONE, SRC.CUST_NUM, SRC.SSN, SRC.EMAIL, SRC.CUST_STAT, SRC.INV_NO)
= hash(TGT.INV_AMT, TGT.CRID, TGT.CUST_BAL, TGT.PHONE, TGT.CUST_NUM, TGT.SSN, TGT.EMAIL, TGT.CUST_STAT, TGT.INV_NO)
WHEN NOT MATCHED THEN
INSERT (` + target_value + `, last_inserted)
VALUES (` + return_value + `, current_date())
`;
var stmt_merge = snowflake.createStatement({sqlText: merge_query});
var result = stmt_merge.execute();
if (result.next()) {
return "Merge completed successfully.";
} else {
return "Merge failed.";
}
} catch (err) {
// Handle any errors that occur during execution
return "Error: " + err.message;
}
$$;
Screenshot:
Now insert new record and keep existing record in S_INVOICE table.
Benefits:
- Automated Insert: This approach reducing the need for manual data manipulation and ensuring data consistency.
- Improved Performance: By dynamically generating the merge statement, the procedure avoids the overhead of pre-defined queries for each data update, potentially improving performance