Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Custom Sink for Flume-ng null event

Tags:

flume

I am trying to write a custom sink for flume-ng. I looked at the existing sinks and documentation and coded it up. However, the 'process()' method that's supposed to receive the events always ends up with null. I am doing Event event = channel.take(); but the event is null. I see in the logs that this method is called repeatedly as the event is still in the channel.

Can someone point me in the right direction?

like image 689
vicky Avatar asked Mar 10 '13 03:03

vicky


2 Answers

This is the skeleton of a process function ...If you fail getting an event you rollback, change the status to BACKOFF . If not you commit and set status to READY . No matter what , you always close the transaction.

    Status status = null;
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    transaction.begin();
    try {
        Event event = channel.take();

        if (event != null && validEvent(event.getBody()) >= 0) {
           # make some printing
        }
        transaction.commit();
        status = Status.READY;
    } catch (Throwable ex) {
        transaction.rollback();
        status = Status.BACKOFF;
        logger.error("Failed to deliver event. Exception follows.", ex);
        throw new EventDeliveryException("Failed to deliver event: " + ex);
    } finally {
        transaction.close();
    }
    return status;

I am sure this gonna work :).

like image 86
Spyros Lalos Avatar answered Sep 18 '22 00:09

Spyros Lalos


This is by design. The sink runner will poll the sink with null events so it can be sure the sink is alive and ready to accept future events. When you receive a null event, ensure you return Status.BACKOFF, and sink processor will wait a bit before trying again.

like image 27
andrewrjones Avatar answered Sep 19 '22 00:09

andrewrjones