
InsureTech Solutions processes thousands of car insurance policy documents daily. These policies arrive as PDFs from various sources – agents, brokers, and customers. Document AI is one of the most powerful capabilities inside Snowflake Cortex. While most demos show how to extract text from PDF files, the real value emerges when you operationalize Document AI inside an incremental ingestion pipeline. This solution uses Snowflake Streams on both Stages and Tables to create a fully incremental, event-driven pipeline. New files trigger automatic processing, and only changed data flows through the transformation layers.
In this blog, I will walk you through an end-to-end incremental Document AI ingestion framework that:
- Automatically detects new documents
- Extracts structured JSON using Document AI
- Loads only new/changed files into a processing table
- Parses attributes into a normalized table
- Generates AI-based insights using Cortex Complete & Classify
- Runs fully serverless using Stages, Streams, Tasks, and SQL stored procedures
Implementation Guide
First, create the Document AI model. I won't go into detail here because the process is straightforward and well covered in other blogs. For this walkthrough I created a model build, referenced as POLICY_ANALYS in Step 4, that reads my car insurance PDF file.

Step 1: Foundation Setup
CREATE DATABASE doc_ai_db;
CREATE SCHEMA doc_ai_db.doc_ai_schema;
CREATE OR REPLACE STAGE car_stage
  DIRECTORY = (ENABLE = TRUE)
  ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
Now upload a file to the stage.
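One way to do the upload is the PUT command from SnowSQL or another client that supports it. The sketch below assumes a hypothetical local file path. AUTO_COMPRESS is switched off because PUT compresses files with gzip by default, and the PDF should be stored as-is.

```sql
-- Hypothetical local path; replace it with the location of your own policy PDF.
PUT file:///tmp/policies/car_policy_001.pdf @car_stage AUTO_COMPRESS = FALSE;

-- Confirm the file landed in the stage.
LIST @car_stage;
```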

Step 2: Create a Stream on the Stage
The stream captures only new files added to the stage.
This is the foundation of incremental Document AI pipelines.
CREATE OR REPLACE STREAM car_stream ON STAGE car_stage;
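A stage stream tracks changes to the stage's directory table, so a file only appears in the stream once the directory table knows about it. As far as I know, a stage defined as in Step 1 does not refresh its directory table on its own, so a manual refresh after each upload is the safe route. A minimal sketch:

```sql
-- Sync the directory table with the files currently in the stage.
-- Files registered by this refresh show up in car_stream as INSERT records.
ALTER STAGE car_stage REFRESH;

-- Peek at pending changes; a plain SELECT does not advance the stream offset.
SELECT relative_path, size, last_modified, METADATA$ACTION
FROM car_stream;
```

Run the refresh after the stream exists; files registered before the stream was created are not part of its change set.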

Step 3: Raw Ingestion Table to Store Document AI Output
CREATE OR REPLACE TABLE car_insurance_details (
  file_name VARCHAR,
  file_size VARIANT,
  last_modified VARCHAR,
  snowflake_file_url VARCHAR,
  upload_date DATE,
  json_content VARIANT
);
Create a stream on this table for downstream processing:
CREATE OR REPLACE STREAM car_insurance_details_stm ON TABLE car_insurance_details;
Step 4: Automated Document AI Processing
CREATE OR REPLACE TASK load_new_car_insurance_data
  WAREHOUSE = compute_wh
  SCHEDULE = '1 minute'
  WHEN SYSTEM$STREAM_HAS_DATA('car_stream') -- Only run when new files exist
AS
  INSERT INTO car_insurance_details (
    SELECT
      RELATIVE_PATH AS file_name,
      size AS file_size,
      last_modified,
      file_url AS snowflake_file_url,
      CURRENT_DATE AS upload_date,
      POLICY_ANALYS!PREDICT(
        GET_PRESIGNED_URL('@car_stage', RELATIVE_PATH), 1
      ) AS json_content -- Document AI magic happens here
    FROM car_stream
    WHERE METADATA$ACTION = 'INSERT' -- Process only new files
  );
Only incremental files from car_stream are processed.
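Before relying on the task, it can help to run the same PREDICT call ad hoc against one staged file and look at the JSON it returns. The sketch below reads from the directory table instead of the stream, so it does not consume any pending changes. In my experience, Document AI returns each extracted value as an array of objects carrying a score and a value, plus a __documentMetadata entry with an OCR score, but check this against your own output.

```sql
-- Run the model build against a single staged file to inspect the raw JSON.
-- Version 1 matches the build version used in the task above.
SELECT
    relative_path,
    POLICY_ANALYS!PREDICT(GET_PRESIGNED_URL(@car_stage, relative_path), 1) AS json_content
FROM DIRECTORY(@car_stage)
LIMIT 1;
```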

Step 5: Creating an Incremental Parser Using SQL Stored Procedure
This stored procedure checks if the final table exists, creates it if needed, and loads only incremental JSON rows from the stream.
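The procedure body is not reproduced in this post, so the following is only a minimal sketch of what handle_car_insurance_policy could look like, not the exact original. The target table name and most column names come from the Step 7 query. The JSON keys (policy_no, insured_name, and so on) are assumptions, because they depend on the value names defined in your own model build. Extracted values are kept as VARCHAR here on the assumption that Document AI returns them as text.

```sql
CREATE OR REPLACE PROCEDURE handle_car_insurance_policy()
RETURNS NUMBER
LANGUAGE SQL
AS
$$
BEGIN
    -- Create the final table on the first run only.
    CREATE TABLE IF NOT EXISTS car_insurance_new_policy (
        file_name             VARCHAR,
        upload_date           DATE,
        policy_no_value       VARCHAR,
        insured_name          VARCHAR,
        premium_amount        VARCHAR,
        idv_amount            VARCHAR,
        damage_premium_amount VARCHAR
    );

    -- Reading the stream in a DML statement advances its offset,
    -- so each raw row is parsed only once.
    INSERT INTO car_insurance_new_policy
    SELECT
        file_name,
        upload_date,
        -- The key names below are assumed; use the value names from your model build.
        json_content['policy_no'][0]['value']::VARCHAR,
        json_content['insured_name'][0]['value']::VARCHAR,
        json_content['premium_amount'][0]['value']::VARCHAR,
        json_content['idv_amount'][0]['value']::VARCHAR,
        json_content['damage_premium_amount'][0]['value']::VARCHAR
    FROM car_insurance_details_stm
    WHERE METADATA$ACTION = 'INSERT';

    -- Number of rows inserted by the statement above.
    RETURN SQLROWCOUNT;
END;
$$;
```

Taking only the first array element ([0]) assumes each field appears once per policy; fields that repeat would need LATERAL FLATTEN instead.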


Step 6: Parse JSON into a Clean Table
CREATE OR REPLACE TASK parse_car_insurance_data
  WAREHOUSE = compute_wh
  AFTER load_new_car_insurance_data -- Task chaining
AS
  CALL handle_car_insurance_policy();
Orchestration Flow:
- New file uploaded → Stage stream captures it
- Task 1 runs → Document AI extracts data
- Table stream captures new row
- Task 2 runs → Parses and structures data into Final table
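Newly created tasks start in a suspended state, so the flow above only begins once both tasks are resumed. A short sketch of that step, followed by a history check on the root task:

```sql
-- Resume the child first, then the root, so the whole graph is active.
ALTER TASK parse_car_insurance_data RESUME;
ALTER TASK load_new_car_insurance_data RESUME;

-- Review recent runs of the root task, including any error messages.
SELECT name, state, scheduled_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'LOAD_NEW_CAR_INSURANCE_DATA'))
ORDER BY scheduled_time DESC
LIMIT 10;
```

To change either task later, suspend the root task first and resume it again afterwards.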

Step 7: AI-Powered Enrichment with Cortex
This query enriches each policy with:
- Risk category (High/Medium/Low)
- Underwriting summary
- Potential anomalies
- Business explanation of risk
SELECT
  *,
  -- Risk Classification
  SNOWFLAKE.CORTEX.CLASSIFY_TEXT(
    'Premium: ' || premium_amount ||
    ', IDV: ' || idv_amount ||
    ', Damage Premium: ' || damage_premium_amount,
    ARRAY_CONSTRUCT('High', 'Medium', 'Low')
  ) AS risk_category,
  -- Policy Summary Generation
  SNOWFLAKE.CORTEX.COMPLETE(
    'mistral-large',
    'You are an insurance underwriter. Summarize this policy: ' ||
    'Policy No: ' || policy_no_value ||
    ', Insured: ' || insured_name ||
    ', Premium: ' || premium_amount
  ) AS policy_summary,
  -- Anomaly Detection
  SNOWFLAKE.CORTEX.COMPLETE(
    'mistral-large',
    'Identify red flags for this policy: ' ||
    'Premium: ' || premium_amount ||
    ', IDV: ' || idv_amount
  ) AS anomaly_notes
FROM car_insurance_new_policy;
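Two details are worth handling if you persist these results. CLASSIFY_TEXT returns an object with a label key rather than a plain string, and concatenating a NULL with || makes the whole prompt NULL. A possible variation, assuming a hypothetical target table named car_insurance_policy_insights, extracts the label and guards against missing values:

```sql
-- car_insurance_policy_insights is a hypothetical table name.
-- This statement rebuilds the table from all policies each time it runs.
CREATE OR REPLACE TABLE car_insurance_policy_insights AS
SELECT
    policy_no_value,
    insured_name,
    -- Pull the label string out of the object returned by CLASSIFY_TEXT.
    SNOWFLAKE.CORTEX.CLASSIFY_TEXT(
        'Premium: ' || COALESCE(premium_amount::VARCHAR, 'unknown') ||
        ', IDV: ' || COALESCE(idv_amount::VARCHAR, 'unknown') ||
        ', Damage Premium: ' || COALESCE(damage_premium_amount::VARCHAR, 'unknown'),
        ARRAY_CONSTRUCT('High', 'Medium', 'Low')
    )['label']::STRING AS risk_category
FROM car_insurance_new_policy;
```

Since every row triggers an LLM call, it is worth keeping an eye on Cortex usage when running this over a large table.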
Next Day: Delta Load
Now upload another file to the stage.


Execute the procedure; it parses the data from the new PDF and loads it into the final target table.
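The delta run can be checked step by step. The sketch below assumes the new file has already been uploaded and that the loading task from Step 4 gets a chance to run between the second and third statements:

```sql
-- Register the newly uploaded file in the directory table so car_stream sees it.
ALTER STAGE car_stage REFRESH;

-- Returns TRUE while the stage stream holds unprocessed file records.
SELECT SYSTEM$STREAM_HAS_DATA('car_stream');

-- After load_new_car_insurance_data has run, the table stream holds the new row.
SELECT file_name, METADATA$ACTION
FROM car_insurance_details_stm;

-- Parse only the new rows into the final table.
CALL handle_car_insurance_policy();
```

Calling the procedure a second time loads nothing, because the first call has already consumed the stream.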

This architecture demonstrates how Snowflake Streams transform Document AI from a batch process into a near-real-time, event-driven pipeline that picks up new files within about a minute. The combination of:
- Stage Streams for file-level change detection
- Table Streams for data-level change tracking
- Task orchestration for automation
- Cortex AI for intelligent enrichment
…creates a production-ready solution that’s cost-effective, scalable, and truly incremental.