I am trying to write a custom sink for flume-ng. I looked at the existing sinks and documentation and coded it up. However, the 'process()' method that's supposed to receive the events always ends up with null. I am doing Event event = channel.take(); but the event is null. I see in the logs that this method is called repeatedly as the event is still in the channel.
Can someone point me in the right direction?
This is the skeleton of a process function ...If you fail getting an event you rollback, change the status to BACKOFF . If not you commit and set status to READY . No matter what , you always close the transaction.
Status status = null;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin();
try {
Event event = channel.take();
if (event != null && validEvent(event.getBody()) >= 0) {
# make some printing
}
transaction.commit();
status = Status.READY;
} catch (Throwable ex) {
transaction.rollback();
status = Status.BACKOFF;
logger.error("Failed to deliver event. Exception follows.", ex);
throw new EventDeliveryException("Failed to deliver event: " + ex);
} finally {
transaction.close();
}
return status;
I am sure this gonna work :).
This is by design. The sink runner will poll the sink with null
events so it can be sure the sink is alive and ready to accept future events. When you receive a null
event, ensure you return Status.BACKOFF
, and sink processor will wait a bit before trying again.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With