Is there are way to push data from a Lambda function to a Kinesis stream? I have searched the internet but have not found any examples related to it.
Thanks.
To put data into the stream, you must specify the name of the stream, a partition key, and the data blob to be added to the stream. The partition key is used to determine which shard in the stream the data record is added to. All the data in the shard is sent to the same worker that is processing the shard.
You can use an AWS Lambda function to process records in an Amazon Kinesis data stream. A Kinesis data stream is a set of shards. Each shard contains a sequence of data records. A consumer is an application that processes the data from a Kinesis data stream.
Yes, you can send information from Lambda to Kinesis Stream and it is very simple to do. Make sure you are running Lambda with the right permissions.
const AWS = require('aws-sdk');
const kinesisConstant = require('./kinesisConstants'); //Keep it consistent
const kinesis = new AWS.Kinesis({
apiVersion: kinesisConstant.API_VERSION, //optional
//accessKeyId: '<you-can-use-this-to-run-it-locally>', //optional
//secretAccessKey: '<you-can-use-this-to-run-it-locally>', //optional
region: kinesisConstant.REGION
});
const savePayload = (payload) => {
//We can only save strings into the streams
if( typeof payload !== kinesisConstant.PAYLOAD_TYPE) {
try {
payload = JSON.stringify(payload);
} catch (e) {
console.log(e);
}
}
let params = {
Data: payload,
PartitionKey: kinesisConstant.PARTITION_KEY,
StreamName: kinesisConstant.STREAM_NAME
};
kinesis.putRecord(params, function(err, data) {
if (err) console.log(err, err.stack);
else console.log('Record added:',data);
});
};
exports.save = (payload) => {
const params = {
StreamName: kinesisConstant.STREAM_NAME,
};
kinesis.describeStream(params, function(err, data) {
if (err) console.log(err, err.stack);
else {
//Make sure stream is able to take new writes (ACTIVE or UPDATING are good)
if(data.StreamDescription.StreamStatus === kinesisConstant.STATE.ACTIVE
|| data.StreamDescription.StreamStatus === kinesisConstant.STATE.UPDATING ) {
savePayload(payload);
} else {
console.log(`Kinesis stream ${kinesisConstant.STREAM_NAME} is ${data.StreamDescription.StreamStatus}.`);
console.log(`Record Lost`, JSON.parse(payload));
}
}
});
};
module.exports = {
STATE: {
ACTIVE: 'ACTIVE',
UPDATING: 'UPDATING',
CREATING: 'CREATING',
DELETING: 'DELETING'
},
STREAM_NAME: '<your-stream-name>',
PARTITION_KEY: '<string-value-if-one-shard-anything-will-do',
PAYLOAD_TYPE: 'String',
REGION: '<the-region-where-you-have-lambda-and-kinesis>',
API_VERSION: '2013-12-02'
}
const kinesis = require('./kinesis');
exports.handler = (event, context, callback) => {
console.log('LOADING handler');
const done = (err, res) => callback(null, {
statusCode: err ? '400' : '200',
body: err || res,
headers: {
'Content-Type': 'application/json',
},
});
kinesis.save(event); // here we send it to the stream
done(null, event);
}
This should be done exactly like doing it on your computer.
Here's an example in nodejs
:
let aws = require('aws');
let kinesis = new aws.Kinesis();
// data that you'd like to send
let data_object = { "some": "properties" };
let data = JSON.stringify(data_object);
// push data to kinesis
const params = {
Data: data,
PartitionKey: "1",
StreamName: "stream name"
}
kinesis.putRecord(params, (err, data) => {
if (err) console.error(err);
else console.log("data sent");
}
Please note, this piece of code will not work, as the Lambda
has no permissions to your stream.
When accessing AWS
resources through Lambda
, it is better to use IAM
roles;
Lambda
, you can choose existing / create a role.IAM
, then Roles, and pick the role name you assigned to the Lambda
function.putRecord
, putRecords
).Then, test the Lambda
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With