
InsureTech Solutions processes thousands of car insurance policy documents daily. These policies arrive as PDFs from various sources: agents, brokers, and customers.

Document AI is one of the most powerful capabilities inside Snowflake Cortex. While most demos show how to extract text from PDF files, the real value emerges when you operationalize Document AI inside an incremental ingestion pipeline. This solution uses Snowflake Streams on both stages and tables to create a fully incremental, event-driven pipeline. New files trigger automatic processing, and only changed data flows through the transformation layers.

In this blog, I will walk you through an end-to-end incremental Document AI ingestion framework that:

  1. Automatically detects new documents
  2. Extracts structured JSON using Document AI
  3. Loads only new/changed files into a processing table
  4. Parses attributes into a normalized table
  5. Generates AI-based insights using Cortex COMPLETE and CLASSIFY_TEXT
  6. Runs entirely inside Snowflake using Stages, Streams, Tasks, and SQL stored procedures

Implementation Guide

Create the Document AI model first. I will not go into the details here, as the process is quite straightforward and is covered in other blogs. The following model was created to read my car insurance PDF files.

Doc AI Model

Step 1: Foundation Setup

CREATE DATABASE doc_ai_db;

CREATE SCHEMA doc_ai_db.doc_ai_schema;

CREATE OR REPLACE STAGE car_stage
DIRECTORY = (ENABLE = TRUE)
ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');

Now upload a file to the stage.

File Upload
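For readers who prefer a command-line upload over the Snowsight dialog, the following is a minimal sketch. It assumes a client that supports PUT (such as SnowSQL), and the local file path is purely hypothetical. AUTO_COMPRESS is switched off so the PDF lands in the stage unmodified.

```sql
-- Hypothetical local path; replace it with the location of your own PDF.
PUT file:///tmp/policies/car_policy_001.pdf @car_stage AUTO_COMPRESS = FALSE;

-- Refresh the directory table so the new file's metadata becomes visible.
ALTER STAGE car_stage REFRESH;

-- Confirm the file is registered in the directory table.
SELECT relative_path, size, last_modified
FROM DIRECTORY(@car_stage);
```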

Step 2: Create a Stream on the Stage

The stream captures only new files added to the stage.
This is the foundation of incremental Document AI pipelines.

CREATE OR REPLACE STREAM car_stream ON STAGE car_stage;

Car Stream
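To see what the stage stream has captured, something like the following can be run. This is only an inspection sketch. A plain SELECT on a stream does not advance its offset, so the pending files remain available for the pipeline.

```sql
-- Returns TRUE when the stream holds unconsumed change records.
SELECT SYSTEM$STREAM_HAS_DATA('car_stream');

-- Peek at the pending file-level changes without consuming them.
SELECT relative_path, size, last_modified, METADATA$ACTION
FROM car_stream;
```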

Step 3: Raw Ingestion Table to Store Document AI Output

CREATE OR REPLACE TABLE car_insurance_details (
file_name VARCHAR,
file_size VARIANT,
last_modified VARCHAR,
snowflake_file_url VARCHAR,
upload_date DATE,
json_content VARIANT
);

Create a stream on this table for downstream processing:

CREATE OR REPLACE STREAM car_insurance_details_stm ON TABLE car_insurance_details;

Step 4: Automated Document AI Processing
CREATE OR REPLACE TASK load_new_car_insurance_data
  WAREHOUSE = compute_wh
  SCHEDULE = '1 minute'
  WHEN SYSTEM$STREAM_HAS_DATA('car_stream') -- Only run when new files exist
AS
INSERT INTO car_insurance_details (
  SELECT
    RELATIVE_PATH AS file_name,
    size AS file_size,
    last_modified,
    file_url AS snowflake_file_url,
    CURRENT_DATE AS upload_date,
    POLICY_ANALYS!PREDICT(
      GET_PRESIGNED_URL('@car_stage', RELATIVE_PATH), 1
    ) AS json_content -- Document AI magic happens here
  FROM car_stream
  WHERE METADATA$ACTION = 'INSERT' -- Process only new files
);

Only incremental files from car_stream are processed.

Increment File
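While testing, it can be convenient to trigger the ingestion task by hand and then check how the run went, rather than waiting for the schedule. A small sketch, assuming your role is allowed to execute tasks:

```sql
-- Trigger a single run immediately instead of waiting for the schedule.
EXECUTE TASK load_new_car_insurance_data;

-- Review recent runs, including their state and any error message.
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    TASK_NAME => 'LOAD_NEW_CAR_INSURANCE_DATA'))
ORDER BY scheduled_time DESC
LIMIT 10;
```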

Step 5: Creating an Incremental Parser Using SQL Stored Procedure

This stored procedure checks if the final table exists, creates it if needed, and loads only incremental JSON rows from the stream.

DOC AI Proc
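Since the procedure appears only as a screenshot, here is a rough sketch of what such a procedure could look like in Snowflake Scripting. It is not the original code. The target column names are taken from the enrichment query later in this post, while the JSON keys (policy_no, insured_name, and so on) are assumptions that depend on the value names defined in your Document AI model.

```sql
CREATE OR REPLACE PROCEDURE handle_car_insurance_policy()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
    -- Create the final table on the first run only.
    CREATE TABLE IF NOT EXISTS car_insurance_new_policy (
        file_name             VARCHAR,
        upload_date           DATE,
        policy_no_value       VARCHAR,
        insured_name          VARCHAR,
        premium_amount        VARCHAR,
        idv_amount            VARCHAR,
        damage_premium_amount VARCHAR
    );

    -- Reading the stream inside a DML statement consumes it,
    -- so each raw row is parsed only once.
    INSERT INTO car_insurance_new_policy
    SELECT
        file_name,
        upload_date,
        json_content:policy_no[0].value::VARCHAR,        -- assumed key
        json_content:insured_name[0].value::VARCHAR,     -- assumed key
        json_content:premium_amount[0].value::VARCHAR,   -- assumed key
        json_content:idv_amount[0].value::VARCHAR,       -- assumed key
        json_content:damage_premium[0].value::VARCHAR    -- assumed key
    FROM car_insurance_details_stm
    WHERE METADATA$ACTION = 'INSERT';

    RETURN 'Incremental load complete';
END;
$$;
```

The amounts are kept as VARCHAR in this sketch because extracted values often carry currency symbols or separators. Casting to a numeric type is a reasonable next step once the format is known.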

Step 6: Parse JSON into a Clean Table
CREATE OR REPLACE TASK parse_car_insurance_data
WAREHOUSE = compute_wh
AFTER load_new_car_insurance_data -- Task chaining
AS
CALL handle_car_insurance_policy();

Orchestration Flow:

  1. New file uploaded → Stage stream captures it
  2. Task 1 runs → Document AI extracts data
  3. Table stream captures new row
  4. Task 2 runs → Parses and structures data into Final table

Doc AI Parsed Data
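Newly created tasks start in a suspended state, so the flow above only begins running once both tasks are resumed. A minimal sketch, resuming the child task before the root task:

```sql
-- Resume the child first, then the root that carries the schedule.
ALTER TASK parse_car_insurance_data RESUME;
ALTER TASK load_new_car_insurance_data RESUME;

-- Check the "state" column to confirm both tasks show as started.
SHOW TASKS LIKE '%car_insurance%';
```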

Step 7: AI-Powered Enrichment with Cortex

This query enriches each policy with:

  • Risk category (High/Medium/Low)
  • Underwriting summary
  • Potential anomalies
  • Business explanation of risk

SELECT
  *,
  -- Risk Classification
  SNOWFLAKE.CORTEX.CLASSIFY_TEXT(
    'Premium: ' || premium_amount ||
    ', IDV: ' || idv_amount ||
    ', Damage Premium: ' || damage_premium_amount,
    ARRAY_CONSTRUCT('High', 'Medium', 'Low')
  ) AS risk_category,

  -- Policy Summary Generation
  SNOWFLAKE.CORTEX.COMPLETE(
    'mistral-large',
    'You are an insurance underwriter. Summarize this policy: ' ||
    'Policy No: ' || policy_no_value ||
    ', Insured: ' || insured_name ||
    ', Premium: ' || premium_amount
  ) AS policy_summary,

  -- Anomaly Detection
  SNOWFLAKE.CORTEX.COMPLETE(
    'mistral-large',
    'Identify red flags for this policy: ' ||
    'Premium: ' || premium_amount ||
    ', IDV: ' || idv_amount
  ) AS anomaly_notes

FROM car_insurance_new_policy;
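CLASSIFY_TEXT returns an object with a label key rather than a plain string. If a simple text column is easier to filter or join on downstream, the label can be pulled out as in this sketch (risk_label is just an illustrative column name):

```sql
WITH classified AS (
    SELECT
        policy_no_value,
        SNOWFLAKE.CORTEX.CLASSIFY_TEXT(
            'Premium: ' || premium_amount || ', IDV: ' || idv_amount,
            ARRAY_CONSTRUCT('High', 'Medium', 'Low')
        ) AS risk_category
    FROM car_insurance_new_policy
)
SELECT
    policy_no_value,
    risk_category:label::STRING AS risk_label  -- 'High', 'Medium', or 'Low'
FROM classified;
```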

Next-Day Delta Run:

Now upload another file to the stage.

Delta Process
Delta Car Insurance Stream

Execute the procedure. It parses the data from the new PDF and loads it into the final target table.

Next Day Proc Run
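One way to confirm that only the delta was processed is to look at the table stream before and after the procedure call. This is a verification sketch. It assumes the ingestion task has already loaded the new file into car_insurance_details and that the procedure consumes the stream in a DML statement.

```sql
-- Rows waiting to be parsed; a plain SELECT does not consume the stream.
SELECT file_name, upload_date, METADATA$ACTION
FROM car_insurance_details_stm;

-- Parse the pending rows into the final table.
CALL handle_car_insurance_policy();

-- Once the stream has been consumed, this is expected to return FALSE.
SELECT SYSTEM$STREAM_HAS_DATA('car_insurance_details_stm');

-- The final table should have grown by the number of newly uploaded files.
SELECT COUNT(*) AS policy_rows
FROM car_insurance_new_policy;
```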

This architecture demonstrates how Snowflake Streams transform Document AI from a batch process into a near-real-time, event-driven pipeline. The combination of:

  • Stage Streams for file-level change detection
  • Table Streams for data-level change tracking
  • Task orchestration for automation
  • Cortex AI for intelligent enrichment

…creates a production-ready solution that’s cost-effective, scalable, and truly incremental.

 
