
In the previous post we discussed the Snowflake data pipeline built by integrating Kinesis Data Streams, Kinesis Firehose, and Snowflake.
This post covers the technical implementation of that architecture. I have included screenshots of the flow from source to destination (Kinesis Firehose and Snowflake), as well as snapshots of the Python code and the Snowflake code.

Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service. KDS can continuously capture gigabytes of data per second from sources such as:

  • Website clickstreams, database event streams, financial transactions, social media feeds, IT logs, and location-tracking events. The collected data is available within milliseconds, enabling real-time analytics use cases such as real-time dashboards, real-time anomaly detection, dynamic pricing, and more.
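
Because the data is readable almost as soon as it is written, a consumer can tail the stream directly. The snippet below is a minimal consumer sketch using boto3 against the same 'mystream' stream used by the producer scripts further below; the single-shard assumption, region, and polling interval are illustrative only and not part of the original implementation.

#kinesis_read.py (illustrative sketch)
import json
import time

import boto3

my_stream_name = 'mystream'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

# Assume a single shard for brevity; a real consumer would iterate over every shard.
shard_id = kinesis_client.list_shards(StreamName=my_stream_name)['Shards'][0]['ShardId']

# Start reading only records that arrive after this point in time.
shard_iterator = kinesis_client.get_shard_iterator(
    StreamName=my_stream_name,
    ShardId=shard_id,
    ShardIteratorType='LATEST')['ShardIterator']

while True:
    response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=100)
    for record in response['Records']:
        print(json.loads(record['Data']))
    shard_iterator = response['NextShardIterator']
    time.sleep(1)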

Kinesis Firehose is Amazon’s data-ingestion offering for Kinesis. It captures and loads streaming data into other AWS services such as S3 and Redshift. A Firehose setup consists of producers, delivery streams, and a destination: producers create data records and send them to a delivery stream, which delivers them to the configured destination (see the sketch below).
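
For completeness, a producer can also write directly to a Firehose delivery stream with boto3. The sketch below is illustrative only; the delivery stream name 'my-delivery-stream' is a placeholder and is not part of the original setup, where records flow from the Kinesis stream into Firehose.

#firehose_put.py (illustrative sketch)
import json

import boto3

# Placeholder delivery stream name -- replace with your own Firehose delivery stream.
delivery_stream_name = 'my-delivery-stream'

firehose_client = boto3.client('firehose', region_name='us-east-1')

payload = {'prop': '42', 'timestamp': '1700000000', 'thing_id': 'aa-bb'}

# Firehose buffers the record and delivers it to the configured destination (e.g. S3).
put_response = firehose_client.put_record(
    DeliveryStreamName=delivery_stream_name,
    Record={'Data': (json.dumps(payload) + '\n').encode('utf-8')})

print(put_response['RecordId'])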

Please find the Python code and the Snowflake code attached below.

Note: The attached implementation is one approach to achieving the desired output; there are certainly multiple ways to achieve this.

#Kinesis_load.py
import boto3
import json
from datetime import datetime
import calendar
import random
import time

my_stream_name = 'mystream'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

def put_to_stream(thing_id, property_value, property_timestamp):
    # Build the JSON payload for a single record.
    payload = {
                'prop': str(property_value),
                'timestamp': str(property_timestamp),
                'thing_id': thing_id
              }

    print(payload)

    # Write the record to the Kinesis stream, partitioned by thing_id.
    put_response = kinesis_client.put_record(
                        StreamName=my_stream_name,
                        Data=json.dumps(payload),
                        PartitionKey=thing_id)

# Generate a random reading every 5 seconds and push it to the stream.
while True:
    property_value = random.randint(40, 120)
    property_timestamp = calendar.timegm(datetime.utcnow().timetuple())
    thing_id = 'aa-bb'

    put_to_stream(thing_id, property_value, property_timestamp)

    # wait for 5 seconds
    time.sleep(5)

#kinesis_load_csv.py
import boto3
import json
import csv

my_stream_name = 'mystream'
thing_id = 'aa-bb'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

with open("Customer_Invoice.csv") as f:
    # DictReader returns each CSV row as an ordered dict keyed by the header columns.
    reader = csv.DictReader(f)
    for row in reader:
        # Send each row to the Kinesis stream as a JSON record.
        put_response = kinesis_client.put_record(
                StreamName=my_stream_name,
                Data=json.dumps(row),
                PartitionKey=thing_id)
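
The Snowflake side of the pipeline is covered in the attached Snowflake code. As a rough sketch of what that setup could look like, the snippet below uses the snowflake-connector-python package to create a landing table, an external stage over the S3 prefix that Firehose writes to, and a Snowpipe that auto-ingests new files. All object names, credentials, the S3 URL, and the storage integration are placeholders, not the original implementation.

#snowpipe_setup.py (illustrative sketch)
import snowflake.connector

# Placeholder connection parameters -- substitute your own account details.
conn = snowflake.connector.connect(
    user='MY_USER',
    password='MY_PASSWORD',
    account='MY_ACCOUNT',
    warehouse='MY_WH',
    database='MY_DB',
    schema='PUBLIC')

statements = [
    # Landing table that holds each JSON record delivered by Firehose.
    "CREATE OR REPLACE TABLE customer_invoice_raw (record VARIANT)",
    # External stage pointing at the S3 prefix Firehose writes to (placeholder URL/integration).
    """CREATE OR REPLACE STAGE firehose_stage
         URL = 's3://my-firehose-bucket/kinesis/'
         STORAGE_INTEGRATION = my_s3_integration""",
    # Snowpipe that auto-ingests new files from the stage into the landing table.
    """CREATE OR REPLACE PIPE customer_invoice_pipe AUTO_INGEST = TRUE AS
         COPY INTO customer_invoice_raw
         FROM @firehose_stage
         FILE_FORMAT = (TYPE = 'JSON')""",
]

cur = conn.cursor()
try:
    for stmt in statements:
        cur.execute(stmt)
finally:
    cur.close()
    conn.close()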
