
Kinesis firehose data transformation using Java

When I use a Java Lambda function for a Kinesis Data Firehose transformation, I get the error below. My transformed JSON looks like this:

{
"records": [
    {
        "recordId": "49586022990098427206724983301551059982279766660054253570000000",
        "result": "Ok",
        "data": "ZXlKMGFXTnJaWEpmYzNsdFltOXNJam9pVkVWVFZEY2lMQ0FpYzJWamRHOXlJam9pU0VWQlRGUklRMEZTUlNJc0lDSmphR0Z1WjJVaQ0KT2kwd0xqQTFMQ0FpY0hKcFkyVWlPamcwTGpVeGZRbz0="
    }
] 
}

The error in the Kinesis console is:

Invalid output structure: Please check your function and make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed

Does anyone have an idea what causes this? I could not find any example Java code for Kinesis data transformation.

https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

This document describes the expected output structure.

Java Programmer asked Oct 20 '25 14:10


2 Answers

I just finished struggling through this in Scala (the same approach works in Java). The key is to use com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsInputPreprocessingResponse as the handler's return type:

import java.nio.ByteBuffer

import com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsInputPreprocessingResponse._
import com.amazonaws.services.lambda.runtime.events.{KinesisAnalyticsInputPreprocessingResponse, KinesisFirehoseEvent}
import com.amazonaws.services.lambda.runtime.{Context, LambdaLogger, RequestHandler}

import scala.collection.JavaConversions._
import scala.language.implicitConversions

class Handler extends RequestHandler[KinesisFirehoseEvent, KinesisAnalyticsInputPreprocessingResponse] {

  override def handleRequest(in: KinesisFirehoseEvent, context: Context): KinesisAnalyticsInputPreprocessingResponse = {

    val logger: LambdaLogger = context.getLogger
    val records = in.getRecords
    // Map each input record to exactly one output record, keeping its recordId
    val transformed = records.map(record => {

      try {
        val changed = record.getData.array()
        // do some sort of transform on `changed` here
        new Record(record.getRecordId, Result.Ok, ByteBuffer.wrap(changed))
      } catch {
        case e: Exception => {
          logger.log(e.toString)
          // Hand back the original data and mark the record as Dropped
          new Record(record.getRecordId, Result.Dropped, record.getData)
        }
      }
    })

    val response = new KinesisAnalyticsInputPreprocessingResponse()
    response.setRecords(transformed.toList)
    response
  }
}

Chris Ellsworth answered Oct 23 '25 03:10


A Java example:

import java.nio.charset.StandardCharsets;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsInputPreprocessingResponse;
import com.amazonaws.services.lambda.runtime.events.KinesisFirehoseEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;


@Log4j2
@RequiredArgsConstructor
public class FirehoseHandler implements RequestHandler<KinesisFirehoseEvent, KinesisAnalyticsInputPreprocessingResponse> {

    private final ObjectMapper mapper;

    @Override
    public KinesisAnalyticsInputPreprocessingResponse handleRequest(KinesisFirehoseEvent kinesisFirehoseEvent, Context context) {
        return Flux.fromIterable(kinesisFirehoseEvent.getRecords())
                .flatMap(this::transformRecord)
                .collectList()
                .map(KinesisAnalyticsInputPreprocessingResponse::new)
                .block();
    }

    // doYourOwnThing(...) is a placeholder for your own transformation logic; it is not defined here.
    private Mono<KinesisAnalyticsInputPreprocessingResponse.Record> transformRecord(KinesisFirehoseEvent.Record record) {
        return Mono.just(record.getData())
                .map(StandardCharsets.UTF_8::decode)
                .flatMap(data -> Mono.fromCallable(() -> doYourOwnThing(data)))
                .map(StandardCharsets.UTF_8::encode)
                .map(data -> new KinesisAnalyticsInputPreprocessingResponse.Record(record.getRecordId(), KinesisAnalyticsInputPreprocessingResponse.Result.Ok, data))
                .onErrorResume(e -> Mono.just(new KinesisAnalyticsInputPreprocessingResponse.Record(record.getRecordId(), KinesisAnalyticsInputPreprocessingResponse.Result.ProcessingFailed, record.getData())));
    }
}
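
To see what the response classes above boil down to on the wire, here is a minimal sketch with no AWS dependencies, where plain maps stand in for the event and response objects. The class name FirehoseTransformSketch, the methods handle, toOutputRecord and transform, and the upper-casing transform itself are all made up for illustration. The field names recordId, result and data, and the three result values, come from the Firehose documentation linked in the question.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FirehoseTransformSketch {

    // Hypothetical transform: upper-cases the payload. Swap in your own logic.
    static String transform(String payload) {
        return payload.toUpperCase();
    }

    // Builds one output record: same recordId, a result status, Base64 data.
    static Map<String, Object> toOutputRecord(Map<String, Object> in) {
        Map<String, Object> out = new HashMap<>();
        out.put("recordId", in.get("recordId"));
        try {
            // Incoming data is Base64 text; decode it exactly once.
            byte[] raw = Base64.getDecoder().decode((String) in.get("data"));
            String changed = transform(new String(raw, StandardCharsets.UTF_8));
            out.put("result", "Ok");
            // Encode the transformed payload exactly once on the way out.
            out.put("data", Base64.getEncoder()
                    .encodeToString(changed.getBytes(StandardCharsets.UTF_8)));
        } catch (RuntimeException e) {
            // Missing or malformed data: report failure and return the input as-is.
            out.put("result", "ProcessingFailed");
            out.put("data", in.get("data"));
        }
        return out;
    }

    // Produces one output record per input record under the "records" key.
    static Map<String, Object> handle(Map<String, Object> event) {
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> records =
                (List<Map<String, Object>>) event.get("records");
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> record : records) {
            out.add(toOutputRecord(record));
        }
        Map<String, Object> response = new HashMap<>();
        response.put("records", out);
        return response;
    }
}
```

Calling handle with a single record whose data is the Base64 encoding of abc gives back one record with the same recordId, result Ok and data QUJD (the Base64 encoding of ABC). A record whose data is not valid Base64 comes back with result ProcessingFailed and its original data.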

Johan Perez answered Oct 23 '25 03:10


