
In the last post (Kinesis Data stream to Snowflake), we saw how Snowpipe can ingest Parquet data into Snowflake. There, however, we had to create the Parquet table structure in advance, and Snowpipe simply ingested the data into that table. The ask now is to create the table at run time instead of creating it in advance, so we will develop a JavaScript procedure that creates the Parquet table at run time. Also consider the scenario where we receive Parquet files from multiple sources and the file structure varies by source system. The process should be robust enough to read the different file structures and create the table at run time.

We will leverage the Schema Detection feature and create a JavaScript procedure to achieve this. There are multiple ways to handle this scenario, but we wanted hands-on experience with JS procedures, so we followed this approach.
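Before creating a table from the detected schema, it can help to look at what Schema Detection actually finds in the staged files. The sketch below is not part of the original procedure: `buildInferSchemaSql` is a hypothetical helper that only builds the SQL text for an `INFER_SCHEMA` preview query, reusing the stage and file format names from this post. It assumes the stage path and format name are simple identifiers and rejects anything else, since the values are concatenated into SQL.

```javascript
// Hypothetical helper (not part of the original procedure): builds a query
// that previews the columns INFER_SCHEMA detects in the staged files.
function buildInferSchemaSql(stagePath, fileFormat) {
  // The values are concatenated into SQL text, so accept only simple
  // stage paths and file format names.
  if (!/^@[A-Za-z0-9_.$\/-]+$/.test(stagePath)) {
    throw new Error("Unexpected stage path: " + stagePath);
  }
  if (!/^[A-Za-z_][A-Za-z0-9_.$]*$/.test(fileFormat)) {
    throw new Error("Unexpected file format name: " + fileFormat);
  }
  return "select column_name, type, nullable from table(" +
         "infer_schema(location=>'" + stagePath + "', " +
         "file_format=>'" + fileFormat + "'))";
}

// Stage and file format names taken from this post.
var previewSql = buildInferSchemaSql("@ext_parquet_stage/", "parquet_format");
```

Inside a Snowflake JavaScript procedure, the resulting text could be passed to `snowflake.createStatement({sqlText: previewSql})` and executed like any other statement.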

CREATE OR REPLACE PROCEDURE Parquet_Table_Creation()
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
AS
$$
// Status message returned to the caller.
var result = "";

// Step 1: create the table from the schema detected in the staged Parquet files.
try {
    var sql_cmd = `create or replace table Parquet_Data_Load using template
        (select array_agg(object_construct(*)) from table
        (
            infer_schema(
                location=>'@ext_parquet_stage/',
                file_format=>'parquet_format'
            )
        ))`;
    var sql_stmt = snowflake.createStatement({sqlText: sql_cmd});
    sql_stmt.execute();
    result = "Table Creation completed successfully \n";
} catch (err) {
    // Without the table there is nothing to load into, so stop here.
    result = "Message: " + err.message;
    return result;
}

// Step 2: load the staged files, matching Parquet columns to table columns by name.
var dataload = `copy into Parquet_Data_Load from @demo_db.public.ext_parquet_stage/
    on_error = 'ABORT_STATEMENT'
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE`;

var cnt = `select count(*) from Parquet_Data_Load`;

try {
    var stmtload = snowflake.createStatement({sqlText: dataload});
    stmtload.execute();

    // Step 3: report how many rows the table now holds.
    var stmtcnt = snowflake.createStatement({sqlText: cnt});
    var rs = stmtcnt.execute();
    rs.next();
    var rowCount = (stmtcnt.getRowCount() > 0) ? rs.getColumnValue(1) : 0;

    result += "\n Total Rows loaded(" + rowCount + ")";
} catch (err) {
    result = "Failed: Code: " + err.code + "\n State: " + err.state;
    result += "\n Message: " + err.message;
    result += "\nStack Trace:\n" + err.stackTraceTxt;
}

return result;
$$;
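For the multi-source scenario described earlier, the same two steps (create the table from the detected schema, then copy the data in) can be repeated once per source. The following is a minimal sketch under stated assumptions, not the procedure used in this post: `loadAllSources`, the `sources` list, and its `table`/`stage` fields are hypothetical names, and the explicit `file_format` clause in the `COPY` statement is an addition for illustration. The `sf` argument is expected to behave like the `snowflake` object available inside a JavaScript procedure, which also makes the function easy to exercise with a stub.

```javascript
// Sketch only: `sources` and its table/stage values are hypothetical.
// `sf` must expose createStatement({sqlText}).execute(), as the `snowflake`
// object does inside a Snowflake JavaScript procedure.
function loadAllSources(sf, sources, fileFormat) {
  var report = [];
  for (var i = 0; i < sources.length; i++) {
    var src = sources[i];
    // Create (or replace) the table from the schema detected for this source.
    var createSql =
      "create or replace table " + src.table + " using template (" +
      "select array_agg(object_construct(*)) from table(" +
      "infer_schema(location=>'" + src.stage + "', " +
      "file_format=>'" + fileFormat + "')))";
    // Load the staged files, matching columns by name.
    var copySql =
      "copy into " + src.table + " from " + src.stage +
      " file_format = (format_name = '" + fileFormat + "')" +
      " on_error = 'ABORT_STATEMENT'" +
      " match_by_column_name = case_insensitive";
    try {
      sf.createStatement({ sqlText: createSql }).execute();
      sf.createStatement({ sqlText: copySql }).execute();
      report.push(src.table + ": loaded");
    } catch (err) {
      // Record the failure and continue with the next source.
      report.push(src.table + ": failed (" + err.message + ")");
    }
  }
  return report.join("\n");
}
```

Within a procedure body this could be invoked as `return loadAllSources(snowflake, sources, 'parquet_format');`, with one entry in `sources` per source system. A failure in one source is recorded in the report and does not stop the remaining sources from loading.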

JavaScript Procedure Call

The procedure has been executed and has loaded 200 rows into the Parquet table.

List Stage

As we see in the above screenshot, there are four Parquet files available in the staging area.

Table output
