I have a DynamoDB table with streams enabled, and I've created a trigger on this table that calls an AWS Lambda function. Within this Lambda function, I'm trying to read the new image (the DynamoDB item after the modification) from the DynamoDB stream and get the pure JSON string out of it. My question is: how can I get the pure JSON string of the DynamoDB item that's been sent over the stream? I'm using the code snippet below to get the new image, but I have no clue how to get the JSON string out of it. I'd appreciate your help.
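import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;

import java.util.Map;

public class LambdaFunctionHandler implements RequestHandler<DynamodbEvent, Object> {

    @Override
    public Object handleRequest(DynamodbEvent input, Context context) {
        context.getLogger().log("Input: " + input);
        for (DynamodbStreamRecord record : input.getRecords()) {
            context.getLogger().log(record.getEventID());
            context.getLogger().log(record.getEventName());
            context.getLogger().log(record.getDynamodb().toString());
            // The new image is the item as it looks after the modification.
            Map<String, AttributeValue> currentRecord = record.getDynamodb().getNewImage();
            // how to get the pure json string of the new image
            // ..............................................
        }
        return "Successfully processed " + input.getRecords().size() + " records.";
    }
}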
The AWS SDKs use JSON to send data to DynamoDB, and DynamoDB responds with JSON.
The listener's purpose is to detect when a new item is added to the DynamoDB table remotely. When it detects a new item in the DB, the app informs the user with a push notification that also includes one of the item's attributes.
You can store a JSON document as an attribute in a DynamoDB table. To do this, use the withJSON method of Item. This method parses the JSON document and maps each element to a native DynamoDB data type.
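To make the mapping described above concrete, here is a minimal sketch of how plain JSON-style values line up with DynamoDB type descriptors (S, N, BOOL, NULL, M, L). It does not call the AWS SDK; the class name DynamoJsonMarshaller and its marshal method are hypothetical and exist only for illustration, and the real withJSON implementation may differ in details.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the AWS SDK: illustrates the
// JSON -> DynamoDB type mapping using only plain Java collections.
public class DynamoJsonMarshaller {

    /** Wraps a plain value in its DynamoDB type descriptor, e.g. "a" becomes {"S": "a"}. */
    public static Map<String, Object> marshal(Object value) {
        if (value == null) {
            return Collections.singletonMap("NULL", (Object) Boolean.TRUE);
        }
        if (value instanceof String) {
            return Collections.singletonMap("S", value);
        }
        if (value instanceof Number) {
            // DynamoDB transmits numbers as strings.
            return Collections.singletonMap("N", (Object) value.toString());
        }
        if (value instanceof Boolean) {
            return Collections.singletonMap("BOOL", value);
        }
        if (value instanceof Map) {
            Map<String, Object> members = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                members.put(String.valueOf(e.getKey()), marshal(e.getValue()));
            }
            return Collections.singletonMap("M", (Object) members);
        }
        if (value instanceof List) {
            List<Object> elements = new ArrayList<>();
            for (Object element : (List<?>) value) {
                elements.add(marshal(element));
            }
            return Collections.singletonMap("L", (Object) elements);
        }
        throw new IllegalArgumentException("Unsupported value type: " + value.getClass());
    }
}
```

For instance, calling DynamoJsonMarshaller.marshal on a map with the key "name" and the value "Alice" yields an M descriptor whose "name" member is an S descriptor holding "Alice".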
In C#, you can convert NewImage to pure JSON using the DynamoDB Document class: var jsonResult = Document.FromAttributeMap(streamRecord.Dynamodb.NewImage).ToJson(); If you want to go further and convert the JSON to an object, you can use Newtonsoft: TModel model = JsonConvert.DeserializeObject<TModel>(jsonResult);
Whenever items are created, updated, or deleted in a table, DynamoDB Streams writes a stream record with the primary key attributes of the modified items. A stream record contains information about a data modification to a single item in a DynamoDB table.
DynamoDB Streams works particularly well with AWS Lambda because of its event-driven nature. Lambda functions scale with the amount of data pushed through the stream and are invoked only when there is data to process. In Serverless Framework, you subscribe a Lambda function to a DynamoDB stream by adding a stream event, with the stream's ARN, to the function's events list.
Map<String, AttributeValue> dynamoDbAttributes = objectMapper.convertValue(dynamoDbMap, new TypeReference<Map<String, AttributeValue>>() {}); and then convert this DynamoDB Map into a plain Map (equivalent to the JSON originally pushed into DynamoDB).
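The conversion from a DynamoDB-typed map to a plain map can be sketched without the SDK. The example below assumes the image has already been parsed into nested Java maps and lists in DynamoDB's typed JSON form (for example {"name": {"S": "Alice"}}) rather than into AttributeValue objects. The class name DynamoJsonUnmarshaller is hypothetical, and binary types (B, BS) are left out of this sketch.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the AWS SDK: strips DynamoDB type
// descriptors so the result mirrors the JSON originally stored.
public class DynamoJsonUnmarshaller {

    /** Converts attribute name -> typed value into attribute name -> plain value. */
    public static Map<String, Object> unmarshal(Map<String, Object> image) {
        Map<String, Object> plain = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : image.entrySet()) {
            plain.put(e.getKey(), unwrap(e.getValue()));
        }
        return plain;
    }

    @SuppressWarnings("unchecked")
    private static Object unwrap(Object typed) {
        Map<String, Object> wrapper = (Map<String, Object>) typed;
        if (wrapper.size() != 1) {
            throw new IllegalArgumentException("Expected exactly one type descriptor: " + wrapper);
        }
        Map.Entry<String, Object> entry = wrapper.entrySet().iterator().next();
        Object value = entry.getValue();
        switch (entry.getKey()) {
            case "S":
            case "BOOL":
            case "SS":
                return value; // already a plain string, boolean, or list of strings
            case "N":
                return new BigDecimal((String) value); // numbers arrive as strings
            case "NULL":
                return null;
            case "M":
                return unmarshal((Map<String, Object>) value);
            case "L": {
                List<Object> elements = new ArrayList<>();
                for (Object element : (List<Object>) value) {
                    elements.add(unwrap(element));
                }
                return elements;
            }
            case "NS": {
                List<BigDecimal> numbers = new ArrayList<>();
                for (Object n : (List<Object>) value) {
                    numbers.add(new BigDecimal((String) n));
                }
                return numbers;
            }
            default:
                throw new IllegalArgumentException("Unsupported type: " + entry.getKey());
        }
    }
}
```

The resulting plain map can then be handed to any JSON library (Gson, Jackson) to produce the standard JSON string.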
Below is the complete code for converting from Dynamo JSON to Standard JSON:
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.internal.InternalUtils;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.google.gson.Gson;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * Main Lambda class to receive the event stream and print the new image
 * of each inserted or modified item as standard JSON.
 */
public class SurveyEventProcessor implements RequestHandler<DynamodbEvent, String> {

    private static final String INSERT = "INSERT";
    private static final String MODIFY = "MODIFY";
    private static final Gson GSON = new Gson();

    @Override
    public String handleRequest(DynamodbEvent ddbEvent, Context context) {
        for (DynamodbStreamRecord record : ddbEvent.getRecords()) {
            // Only INSERT and MODIFY events carry a new image; skip everything else.
            if (!INSERT.equals(record.getEventName()) && !MODIFY.equals(record.getEventName())) {
                continue;
            }
            Map<String, AttributeValue> newImage = record.getDynamodb().getNewImage();
            if (newImage == null) {
                continue; // the stream view type may not include new images
            }
            try {
                // Convert the typed attribute map into a document Item, then into a plain map.
                List<Item> items = InternalUtils.toItemList(Collections.singletonList(newImage));
                String json = GSON.toJson(items.get(0).asMap());
                System.out.println("JSON is ");
                System.out.println(json);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return "Successfully processed " + ddbEvent.getRecords().size() + " records.";
    }
}