Pushing AWS Lambda data to Kinesis Stream

Is there a way to push data from a Lambda function to a Kinesis stream? I have searched the internet but have not found any examples of it.

Thanks.

Harish asked Jun 29 '16 09:06



2 Answers

Yes, you can send data from Lambda to a Kinesis stream, and it is simple to do. Make sure the Lambda runs with the right permissions (the code below calls describeStream and putRecord).

  1. Create a file called kinesis.js. This file provides a 'save' function that receives a payload and sends it to the Kinesis stream, so we can include 'save' anywhere we want to send data to the stream. Code:

const AWS = require('aws-sdk');
const kinesisConstant = require('./kinesisConstant'); // must match the file name kinesisConstant.js
const kinesis = new AWS.Kinesis({
  apiVersion: kinesisConstant.API_VERSION, //optional
  //accessKeyId: '<you-can-use-this-to-run-it-locally>', //optional
  //secretAccessKey: '<you-can-use-this-to-run-it-locally>', //optional
  region: kinesisConstant.REGION
});

const savePayload = (payload) => {
//We can only save strings into the streams
  if( typeof payload !== kinesisConstant.PAYLOAD_TYPE) {
    try {
      payload = JSON.stringify(payload);
    } catch (e) {
      console.log(e);
    }
  }

  let params = {
    Data: payload,
    PartitionKey: kinesisConstant.PARTITION_KEY,
    StreamName: kinesisConstant.STREAM_NAME
  };

  kinesis.putRecord(params, function(err, data) {
    if (err) console.log(err, err.stack);
    else     console.log('Record added:',data);
  });
};

exports.save = (payload) => {
  const params = {
    StreamName: kinesisConstant.STREAM_NAME,
  };

  kinesis.describeStream(params, function(err, data) {
    if (err) console.log(err, err.stack);
    else {
      //Make sure stream is able to take new writes (ACTIVE or UPDATING are good)
      if(data.StreamDescription.StreamStatus === kinesisConstant.STATE.ACTIVE
        || data.StreamDescription.StreamStatus === kinesisConstant.STATE.UPDATING ) {
        savePayload(payload);
      } else {
        console.log(`Kinesis stream ${kinesisConstant.STREAM_NAME} is ${data.StreamDescription.StreamStatus}.`);
        // payload may still be an object here, so log it as-is instead of parsing it
        console.log('Record lost:', payload);
      }
    }
  });
};
  2. Create a kinesisConstant.js file to keep it consistent :)

module.exports = {
  STATE: {
    ACTIVE: 'ACTIVE',
    UPDATING: 'UPDATING',
    CREATING: 'CREATING',
    DELETING: 'DELETING'
  },
  STREAM_NAME: '<your-stream-name>',
  PARTITION_KEY: '<string-value-if-one-shard-anything-will-do>',
  PAYLOAD_TYPE: 'string', // typeof returns lowercase 'string'
  REGION: '<the-region-where-you-have-lambda-and-kinesis>',
  API_VERSION: '2013-12-02'
}
  3. Your handler file: the 'done' function sends a response back to whoever is sending data to the stream, but 'kinesis.save(event)' does all the work.

const kinesis = require('./kinesis');

exports.handler = (event, context, callback) => {
  console.log('LOADING handler');
  
  const done = (err, res) => callback(null, {
    statusCode: err ? '400' : '200',
    body: err || res,
    headers: {
      'Content-Type': 'application/json',
    },
  });
  
  kinesis.save(event); // here we send it to the stream
  done(null, event);
}
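
If the handler is written as an `async` function instead of using the callback, Lambda responds as soon as the returned promise settles, so the write should be awaited before replying. Here is a minimal sketch of that variant, assuming the v2 `aws-sdk`, where request objects expose `.promise()`. The names `makeHandler` and `client` are hypothetical; the client is injected only so the function can be exercised without AWS access, and the fixed partition key is an assumption that suits a single-shard stream.

```javascript
// Hypothetical factory: `client` is anything exposing a v2-style
// putRecord(params).promise(), for example new AWS.Kinesis().
const makeHandler = (client, streamName) => async (event) => {
  const params = {
    // Serialize non-string payloads to JSON before sending.
    Data: typeof event === 'string' ? event : JSON.stringify(event),
    // Assumption: a fixed key is acceptable for a single-shard stream.
    PartitionKey: 'default',
    StreamName: streamName,
  };
  try {
    // Wait for Kinesis to acknowledge the record before replying.
    const result = await client.putRecord(params).promise();
    return { statusCode: '200', body: JSON.stringify(result) };
  } catch (err) {
    console.log(err);
    return { statusCode: '400', body: JSON.stringify({ message: err.message }) };
  }
};

// In Lambda this might be wired up as:
//   const AWS = require('aws-sdk');
//   exports.handler = makeHandler(new AWS.Kinesis(), '<your-stream-name>');
```

With this shape a failed write surfaces in the response, rather than only in the logs.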
Max Cabrera answered Oct 18 '22 22:10


You do this exactly as you would from your own computer.

Here's an example in Node.js:

let aws = require('aws-sdk'); // the SDK package is named 'aws-sdk'
let kinesis = new aws.Kinesis();

// data that you'd like to send
let data_object = { "some": "properties" };
let data = JSON.stringify(data_object);

// push data to kinesis
const params = {
  Data: data,
  PartitionKey: "1",
  StreamName: "stream name"
}

kinesis.putRecord(params, (err, data) => {
  if (err) console.error(err);
  else console.log("data sent");
});

Please note that this code will not work until the Lambda has permission to write to your stream. When accessing AWS resources from Lambda, it is better to use IAM roles:

  1. When configuring a new Lambda, you can choose an existing role or create a new one.
  2. Go to IAM, then Roles, and pick the role name you assigned to the Lambda function.
  3. Add the relevant permissions (kinesis:PutRecord, kinesis:PutRecords).
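
As an illustration of step 3, here is a sketch that builds a policy document granting those actions and prints it as JSON for the IAM policy editor. The region, account ID, and stream name in the ARN are placeholders to fill in, and `kinesis:DescribeStream` is included on the assumption that your code also checks the stream status before writing.

```javascript
// Placeholders: replace region, account ID, and stream name with your own.
const streamArn = 'arn:aws:kinesis:<region>:<account-id>:stream/<your-stream-name>';

const policy = {
  Version: '2012-10-17',
  Statement: [
    {
      Effect: 'Allow',
      Action: [
        'kinesis:PutRecord',
        'kinesis:PutRecords',
        // Assumption: only needed if the code also calls describeStream.
        'kinesis:DescribeStream',
      ],
      // Scope the permissions to the one stream rather than '*'.
      Resource: streamArn,
    },
  ],
};

// Print JSON that can be pasted into the role's policy editor.
console.log(JSON.stringify(policy, null, 2));
```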

Then, test the Lambda.
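
Since the permissions above also cover putRecords, here is a sketch of preparing a batch write. PutRecords accepts at most 500 records per request, so larger inputs have to be split. `buildPutRecordsParams` and `keyOf` are hypothetical names used for illustration; the helper only builds the parameter objects and does not call AWS.

```javascript
// PutRecords accepts at most 500 records per request.
const MAX_RECORDS_PER_CALL = 500;

// Hypothetical helper: turns an array of objects into one or more
// PutRecords parameter objects. `keyOf` picks the partition key per item.
const buildPutRecordsParams = (streamName, items, keyOf) => {
  const batches = [];
  for (let i = 0; i < items.length; i += MAX_RECORDS_PER_CALL) {
    batches.push({
      StreamName: streamName,
      Records: items.slice(i, i + MAX_RECORDS_PER_CALL).map((item) => ({
        Data: JSON.stringify(item),
        PartitionKey: String(keyOf(item)),
      })),
    });
  }
  return batches;
};

// Example: 1200 items become batches of 500, 500, and 200 records.
const items = Array.from({ length: 1200 }, (_, id) => ({ id }));
const batches = buildPutRecordsParams('<your-stream-name>', items, (item) => item.id);
console.log(batches.map((b) => b.Records.length)); // [ 500, 500, 200 ]
```

Each batch can then be passed to kinesis.putRecords(batch, callback). The response includes a FailedRecordCount, so individual records that were rejected can be retried.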

johni answered Oct 18 '22 21:10