Streaming The Data

2017-04-12, 19:06 Posted by: Aki Karjalainen

Kinesis

If you need to push large volumes of streaming data into AWS and do not want to set up streaming infrastructure yourself, Kinesis may be for you. Some recently announced features make it an even more compelling option than it used to be.

If you decided to set up your own streaming infrastructure, you would need to consider things like durability and availability. What about maintenance costs, upgrading, and applying security and other patches? Well, life with Kinesis is easier: scalability and elasticity are on the house. Compare that with the operational overhead of running other data ingestion tools such as Flume, Sqoop or Kafka yourself.

There are basically three different “flavors” of Kinesis: Streams, Firehose and Analytics. Amazon Kinesis Firehose is for delivering real-time streaming data to end destinations such as S3, Redshift or Elasticsearch. You configure your data producers to send data to Firehose, and it automatically delivers the data to the specified destination, with encryption, compression and all the bells and whistles. You can send data to your delivery stream using the Amazon Kinesis Agent or the Firehose API via the AWS SDK. A while ago AWS released the ability to perform light pre-processing of the incoming data before it is delivered to the end destination (e.g. S3). Other typical use cases include adding metadata to the data or converting the data to a different format before storing it. When you enable Firehose data transformation, Firehose buffers incoming data and invokes the specified Lambda function asynchronously with each buffered batch. The transformed data is sent from Lambda back to Firehose for buffering and then delivered to the destination. It is also possible to enable source record backup, which stores all your records in S3 unaltered, while the altered/transformed ones go to the end destination of Firehose.
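For example, a producer pushing purchase events into a delivery stream with the AWS SDK for Python could look roughly like the sketch below (the stream name purchase-stream and the record fields are made up for illustration):

import json
import boto3

# Firehose client from boto3; credentials and region come from the environment
firehose = boto3.client('firehose')

# A hypothetical purchase event
record = {'purchase': 1234, 'price': 59.90}

# Firehose expects raw bytes; a trailing newline keeps records separated in S3
firehose.put_record(
    DeliveryStreamName='purchase-stream',
    Record={'Data': (json.dumps(record) + '\n').encode('utf-8')}
)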

Luckily there are pre-built Lambda blueprints to get you started with pre-processing data records. For example, there are blueprints for converting Apache logs and system logs to JSON or CSV format, and you can customize the pre-built functions as you wish. Nice.


If you combine Firehose’s pre-processing capabilities with Kinesis Analytics, you can come up with cool scenarios. Kinesis Analytics is a service for processing streaming data in real time with SQL. It enables you to write queries against the data flowing through Firehose. And as usual, it is a fully managed service which scales automatically to match the volume and throughput of your data. The following example queries price data flowing through a Firehose pipeline, analyzes the data records in a one-minute window, and then invokes a Lambda function on the results of the analysis. With relatively simple SQL we can find the minimum and maximum amounts from all the purchases flowing in on our price stream.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "Purchase"  INTEGER,
    "Min_Price" DOUBLE,
    "Max_Price" DOUBLE);

-- CREATE OR REPLACE PUMP to insert into output
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "purchase",
                  MIN("price") AS Min_price,
                  MAX("price") AS Max_price
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY "purchase",
             FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE);





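The Lambda function invoked on the results of the analysis could then look like this; it simply inspects each analysis record and logs purchases whose maximum price exceeds 50 EUR: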
from __future__ import print_function
import base64
import json

print('Loading function')

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        print("Record: %s" % (record))
        # The record data arrives base64-encoded
        payload = base64.b64decode(record['data'])
        print("Payload: %s" % (payload))
        data = json.loads(payload)

        if int(data['Max_Price']) > 50:
            print("Someone just made a purchase over 50eur! Purchase: %s!" % (data['Purchase']))

        # Pass the record on unaltered; 'Ok' marks it as successfully processed
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}

 

Oh, and how about enhancing the example by pointing your purchase-data Firehose stream to a massively scalable Redshift endpoint and doing your enterprise data warehouse analytics, reporting and visualization on the purchases there, in near real time?
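Setting that up is mostly a matter of creating a delivery stream with a Redshift destination. As a rough sketch (the ARNs, cluster URL, credentials and table name below are made-up placeholders), this could be done with boto3 along these lines; Firehose stages the incoming records in S3 and then loads them into the Redshift table with a COPY command:

import boto3

firehose = boto3.client('firehose')

# All ARNs, endpoints and credentials below are placeholders
firehose.create_delivery_stream(
    DeliveryStreamName='purchases-to-redshift',
    RedshiftDestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',
        'ClusterJDBCURL': 'jdbc:redshift://my-cluster.abc123.eu-west-1.redshift.amazonaws.com:5439/warehouse',
        'CopyCommand': {'DataTableName': 'purchases'},
        'Username': 'firehose_user',
        'Password': 'store-this-somewhere-safer',
        # Intermediate S3 bucket used for staging before the COPY into Redshift
        'S3Configuration': {
            'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',
            'BucketARN': 'arn:aws:s3:::my-firehose-staging-bucket'
        }
    }
)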

Have fun.

