
Consider a scenario in which a client maintains a large set of customer invoice records whose data changes constantly. The company stores this data in a Snowflake table named S_INVOICE. The table receives daily loads (history plus current) that reflect changes to customers and partial invoice payments. The goal is to merge this changing data efficiently into a target table named S_INVOICE_TARGET, inserting only the records that are not already present, by means of a stored procedure.

Dynamic Merge Procedure:

The provided DYNAMIC_MERGE procedure can be directly utilized for this use case. Here’s a breakdown of its functionality:

  1. Dynamic Column Handling: The procedure retrieves the column names of the source table S_INVOICE and builds comma-separated column lists for both the source and the target table, so the generated lists follow schema changes in the source table (added or removed columns).
  2. Dynamic Merge Procedure:

The DYNAMIC_MERGE procedure takes two parameters:

P_TBL: The name of the staging table containing the daily invoice data (S_INVOICE in this scenario).

P_TBL_TRGT: The name of the target table to merge data into (S_INVOICE_TARGET in this scenario).

  • The procedure dynamically retrieves column names from the staging table using Snowflake’s INFORMATION_SCHEMA.COLUMNS.
  • It constructs the MERGE INTO statement dynamically to merge data from the staging table into the target table.
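  • The ON condition compares a HASH over the source columns with a HASH over the same columns in the target, so a record counts as matched only when every column value is identical.
  • When a record is not matched in the target table, the procedure inserts it from the staging table and stamps it with the current date in the last_inserted column.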
  • After executing the merge query, the procedure returns a message indicating whether the merge was successful or failed.
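To make the list-building step concrete, here is a minimal sketch in plain JavaScript that can be run outside Snowflake. The helper names qualifyColumns and hashJoinCondition are hypothetical, and the three column names are just a subset of those used in the procedure. The sketch only builds strings and does not execute any SQL.

```javascript
// Hypothetical helpers, not part of the original procedure.

// Prefix every column with a table alias and join them with commas.
function qualifyColumns(columns, alias) {
  return columns.map(function (c) { return alias + "." + c; }).join(", ");
}

// Build the hash-based ON condition for the same columns on both sides.
function hashJoinCondition(columns) {
  return "hash(" + qualifyColumns(columns, "SRC") + ") = hash(" +
         qualifyColumns(columns, "TGT") + ")";
}

var columns = ["INV_NO", "CUST_NUM", "INV_AMT"];
console.log(qualifyColumns(columns, "TGT"));
// TGT.INV_NO, TGT.CUST_NUM, TGT.INV_AMT
console.log(hashJoinCondition(columns));
// hash(SRC.INV_NO, SRC.CUST_NUM, SRC.INV_AMT) = hash(TGT.INV_NO, TGT.CUST_NUM, TGT.INV_AMT)
```

Inside the procedure, the column array would come from the INFORMATION_SCHEMA.COLUMNS result set rather than a literal.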

Stored Procedure:

CREATE OR REPLACE PROCEDURE DYNAMIC_MERGE(P_TBL VARCHAR, P_TBL_TRGT VARCHAR)
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
AS
$$
try {
    // Read the staging table's column names in their defined order.
    // Assumes the staging table lives in the current schema.
    var SQL_COL_NAME = "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS"
        + " WHERE TABLE_SCHEMA = CURRENT_SCHEMA() AND TABLE_NAME = ?"
        + " ORDER BY ORDINAL_POSITION";

    var stmt_col = snowflake.createStatement({sqlText: SQL_COL_NAME, binds: [P_TBL]});
    var rs_col = stmt_col.execute();

    var cols = [];
    while (rs_col.next()) {
        cols.push(rs_col.getColumnValue(1));
    }
    if (cols.length === 0) {
        return "Error: no columns found for table " + P_TBL;
    }

    // Plain, source-qualified, and target-qualified column lists
    var col_list = cols.join(", ");
    var src_list = cols.map(function (c) { return "SRC." + c; }).join(", ");
    var tgt_list = cols.map(function (c) { return "TGT." + c; }).join(", ");

    // Construct the merge query dynamically.
    // rnm = 1 keeps one copy of each fully identical staging row.
    var merge_query = `
MERGE INTO ` + P_TBL_TRGT + ` TGT
USING (
    WITH CTE AS (
        SELECT ` + col_list + `,
               ROW_NUMBER() OVER (PARTITION BY hash(` + col_list + `) ORDER BY hash(` + col_list + `)) AS rnm
        FROM ` + P_TBL + `
    )
    SELECT ` + col_list + ` FROM CTE WHERE rnm = 1
) SRC
ON hash(` + src_list + `) = hash(` + tgt_list + `)
WHEN NOT MATCHED THEN
    INSERT (` + col_list + `, last_inserted)
    VALUES (` + src_list + `, current_date())
`;

    var stmt_merge = snowflake.createStatement({sqlText: merge_query});
    var result = stmt_merge.execute();

    // MERGE returns one row of counts when it completes
    if (result.next()) {
        return "Merge completed successfully.";
    } else {
        return "Merge failed.";
    }
} catch (err) {
    // Handle any errors that occur during execution
    return "Error: " + err.message;
}
$$;
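Because the procedure concatenates P_TBL and P_TBL_TRGT directly into SQL text, it may be worth checking that both values look like plain identifiers before building the statement. The sketch below is one possible guard, assuming unquoted names that are optionally qualified as database.schema.table. The function name isSafeIdentifier and the pattern are illustrative and not part of the original procedure.

```javascript
// Hypothetical guard: accept only unquoted identifiers, optionally qualified
// with up to two prefixes (database.schema.table).
function isSafeIdentifier(name) {
  // Each part starts with a letter or underscore, followed by letters,
  // digits, underscores, or dollar signs.
  var part = "[A-Za-z_][A-Za-z0-9_$]*";
  var pattern = new RegExp("^" + part + "(\\." + part + "){0,2}$");
  return typeof name === "string" && pattern.test(name);
}

console.log(isSafeIdentifier("S_INVOICE"));               // true
console.log(isSafeIdentifier("S_INVOICE; DROP TABLE X")); // false
```

A check like this could run at the top of the try block and return an error message when either parameter fails it.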

Screenshots:

[Screenshot: source and target tables]

[Screenshot: running the procedure]

[Screenshot: verifying the data]

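Now insert a new record into the S_INVOICE table while keeping the existing records, and run the procedure again. Only the new record is inserted into S_INVOICE_TARGET.

[Screenshot: delta insert]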
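The delta behaviour can be illustrated with a small in-memory sketch in plain JavaScript, runnable outside Snowflake. It is a stand-in, not the procedure itself: rows are plain objects, a JSON string of the column values takes the place of hash(...), and the function names, sample rows, and dates are made up for illustration. The sketch also skips duplicate source rows, which is a choice made here.

```javascript
// Stand-in for hash(col1, col2, ...): a JSON string of the values in column order.
function rowKey(row, columns) {
  return JSON.stringify(columns.map(function (c) { return row[c]; }));
}

// Insert-only merge: add source rows whose key is not yet in the target.
function insertOnlyMerge(source, target, columns, today) {
  var seen = new Set(target.map(function (r) { return rowKey(r, columns); }));
  var inserted = 0;
  source.forEach(function (row) {
    var key = rowKey(row, columns);
    if (!seen.has(key)) {
      seen.add(key); // also skips duplicate source rows
      var copy = {};
      columns.forEach(function (c) { copy[c] = row[c]; });
      copy.LAST_INSERTED = today;
      target.push(copy);
      inserted += 1;
    }
  });
  return inserted;
}

var columns = ["INV_NO", "INV_AMT"];
var target = [];
var source = [{ INV_NO: 1, INV_AMT: 100 }, { INV_NO: 2, INV_AMT: 250 }];

console.log(insertOnlyMerge(source, target, columns, "2024-01-01")); // 2
source.push({ INV_NO: 2, INV_AMT: 150 }); // a partial payment changes the amount
console.log(insertOnlyMerge(source, target, columns, "2024-01-02")); // 1
console.log(target.length); // 3
```

The second run sees the two unchanged rows as matched and inserts only the changed invoice, so the target keeps both the old and the new version of invoice 2.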

Benefits:

  • Automated Insert: This approach reduces the need for manual data manipulation and helps keep the target data consistent.
  • Improved Performance: Because the merge statement is generated dynamically, there is no need to write and maintain a pre-defined query for each data update, and each load is handled by a single MERGE statement, which can potentially improve performance.

 
