I am trying to trigger an AWS Lambda function written in Java on DynamoDB stream events. Amazon has a guide for this using Node.js here: http://docs.aws.amazon.com/lambda/latest/dg/wt-ddb-create-test-function.html
The test input for Node.js (from the above link) looks like an SNS event, so I tried to use the corresponding SNSEvent class in Java as the input to my handler method.
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord;
import java.util.List;

public class RecomFunction {
    public void handler(SNSEvent event, Context context) {
        LambdaLogger logger = context.getLogger();
        List<SNSRecord> records = event.getRecords();
        if (records != null) {
            for (SNSRecord record : records) {
                if (record != null) {
                    logger.log("SNS record: " + record.getSNS().getMessage());
                }
            }
        }
    }
}
Unfortunately, record.getSNS() returns null, resulting in a NullPointerException.
There is a related question, but it did not receive a specific answer: Setup DynamoDB Trigger using Lambda
To configure your function to read from DynamoDB Streams in the Lambda console, create a DynamoDB trigger. Open the Functions page of the Lambda console. Choose the name of a function. Under Function overview, choose Add trigger.
You can invoke a Lambda function by creating an AWSLambda object and invoking its invoke method. Create an InvokeRequest object to specify additional information such as the function name and the payload to pass to the Lambda function.
We need to give the AWS Lambda function access to read from and write to the DynamoDB table. To do that, we first create a policy and then attach that policy to a role. To create a new policy, go to Services and then IAM, and click Policies under Access Management.
Lambda Triggers Explained with DynamoDB Integration: By enabling DynamoDB Streams on a table, you can associate the stream's ARN with your Lambda function. Immediately after an item in the table is modified, a new record appears in the table's stream.
This code worked for me. You can use it to receive and process DynamoDB events in a Lambda function:
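import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;

public class Handler implements RequestHandler<DynamodbEvent, Void> {
    @Override
    public Void handleRequest(DynamodbEvent dynamodbEvent, Context context) {
        // Each invocation delivers a batch of stream records.
        for (DynamodbStreamRecord record : dynamodbEvent.getRecords()) {
            if (record == null) {
                continue;
            }
            // Your code here
        }
        return null;
    }
}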
Similarly, you can use SNSEvent and SNSRecord to handle Amazon SNS events.
This worked for me in the case of DynamoDB stream events:
import com.amazonaws.services.lambda.runtime.RequestHandler;
...
public class DynamoStreamHandler implements RequestHandler<Object, Void> {
    @Override
    public Void handleRequest(Object o, Context context) {
        LinkedHashMap lhm = (LinkedHashMap) o;
        ...etc.
    }
}
It seems they use a customized JSON mapper which produces Map and List objects. It's quite straightforward (but tedious) to verify this for other event types by testing and printing to the logs. (sigh)
EDIT: If ~5 MB of overhead is OK, you can use DynamodbEvent.DynamodbStreamRecord provided by the aws-lambda-java-events library v1.1.0, as described in AWS Lambda Walkthrough 3: Process Amazon DynamoDB Events (Java) in the AWS Lambda documentation.
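For the plain Object approach, walking the Map and List structure might look roughly like the sketch below. It assumes the runtime hands the handler a Map whose "Records" entry is a List of Maps, each with an "eventName" key, as in the documented DynamoDB stream event JSON. StreamEventMaps and eventNames are hypothetical names made up for this illustration, not part of any AWS library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of any AWS library.
final class StreamEventMaps {

    private StreamEventMaps() {}

    // Collects the "eventName" value of every entry under "Records".
    // Assumption: the event is a Map with a "Records" List of Maps.
    // Returns an empty list if the input does not have that shape.
    static List<String> eventNames(Object event) {
        List<String> names = new ArrayList<>();
        if (!(event instanceof Map)) {
            return names;
        }
        Object records = ((Map<?, ?>) event).get("Records");
        if (!(records instanceof List)) {
            return names;
        }
        for (Object record : (List<?>) records) {
            // Skip null or unexpected entries instead of casting blindly.
            if (record instanceof Map) {
                Object name = ((Map<?, ?>) record).get("eventName");
                if (name != null) {
                    names.add(name.toString());
                }
            }
        }
        return names;
    }
}
```

Inside handleRequest(Object o, Context context) you could then call StreamEventMaps.eventNames(o) and log the result. Using instanceof checks rather than a direct cast avoids a ClassCastException if the runtime ever passes a different shape.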
Create a handler that takes an InputStream, read in the contents of the InputStream (which is just JSON) and then deserialize it to get the data that you need.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import com.amazonaws.services.lambda.runtime.Context;

public class MyHandler {
    public void handler(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        int letter;
        while ((letter = inputStream.read()) != -1) {
            baos.write(letter);
        }
        // Send the contents of baos to a JSON deserializer ...
    }
}
It's a bit cumbersome, but as far as I'm aware AWS doesn't currently provide a higher-level Java Lambda interface for consuming DynamoDB Streams. I have a full-blown example here with details on how I deserialized the stream of JSON to get Java objects for the data.
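As a small variation on the stream-reading step, here is a sketch that reads the InputStream in chunks and returns the JSON as a String, ready to hand to whatever JSON deserializer you choose. It uses only the standard library and assumes the payload is UTF-8 encoded. StreamText and readUtf8 are hypothetical names for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only.
final class StreamText {

    private StreamText() {}

    // Reads the whole stream in 4 KB chunks and decodes it as UTF-8.
    // Assumption: the event payload is UTF-8 encoded JSON text.
    static String readUtf8(InputStream in) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        int n;
        try {
            while ((n = in.read(buffer)) != -1) {
                baos.write(buffer, 0, n);
            }
        } catch (IOException e) {
            // Rethrow unchecked so callers need no throws clause.
            throw new UncheckedIOException(e);
        }
        return new String(baos.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Reading into a buffer avoids one read() call per byte, which matters little for small events but costs nothing to do.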