Lagom service consuming input from Kafka

I am trying to figure out how Lagom can be used to consume data from external systems communicating over Kafka.

I've run into this section of the Lagom documentation, which describes how a Lagom service can communicate with another Lagom service by subscribing to its topic:

helloService
  .greetingsTopic()
  .subscribe // <-- you get back a Subscriber instance
  .atLeastOnce(
    Flow.fromFunction(doSomethingWithTheMessage)
  )
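
For context, doSomethingWithTheMessage is just a function from the topic's message type to Done. A minimal hypothetical handler, assuming the GreetingMessage type from the docs' hello-service example:

import akka.Done

// Hypothetical handler; GreetingMessage is the message type of
// greetingsTopic() in the documentation's hello-service example.
val doSomethingWithTheMessage: GreetingMessage => Done = { msg =>
  println(s"Received greeting: ${msg.message}")
  Done
}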

However, what is the appropriate configuration when you want to subscribe to a Kafka topic that contains events produced by some arbitrary external system?

Is some sort of adapter needed for this functionality? To clarify, I have this at the moment:

import akka.Done
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.api.broker.kafka.{KafkaProperties, PartitionKeyStrategy}
import com.lightbend.lagom.scaladsl.api.{Descriptor, Service, ServiceCall}

object Aggregator {
  val TOPIC_NAME = "my-aggregation"
}

trait Aggregator extends Service {
  def aggregate(correlationId: String): ServiceCall[Data, Done]

  def aggregationTopic(): Topic[DataRecorded]

  override final def descriptor: Descriptor = {
    import Service._

    named("aggregator")
      .withCalls(
        pathCall("/api/aggregate/:correlationId", aggregate _)
      )
      .withTopics(
        topic(Aggregator.TOPIC_NAME, aggregationTopic())
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}

And I can invoke it via a simple POST request. However, I would like it to be invoked by consuming Data messages from some (external) Kafka topic.
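
For context, the implementation behind that call is essentially the following (a simplified sketch, using the Entity and RecordData types that appear later in this question):

class AggregatorImpl(registry: PersistentEntityRegistry)
  extends Aggregator {

  // POST /api/aggregate/:correlationId hands the Data payload
  // to the persistent entity as a RecordData command.
  override def aggregate(correlationId: String): ServiceCall[Data, Done] =
    ServiceCall { data =>
      registry.refFor[Entity](correlationId).ask(RecordData(data))
    }
}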

I was wondering if there is a way to configure the descriptor in a fashion similar to this mockup:

override final def descriptor: Descriptor = {
  ...
  kafkaTopic("my-input-topic")
    .subscribe(serviceCall(aggregate _))
    .withAtMostOnceDelivery
}

I've run into this discussion on Google Groups, but in the OP's question I do not see him actually doing anything with the EventMessage instances coming from some-topic, other than routing them to the topic defined by his service.

EDIT #1: Progress update

Looking at the documentation, I decided to try the following approach. I added two more modules, aggregator-kafka-proxy-api and aggregator-kafka-proxy-impl.

In the new api module, I defined a new service with no methods, just one topic, which represents my Kafka topic:

object DataKafkaPublisher {
  val TOPIC_NAME = "data-in"
}

trait DataKafkaPublisher extends Service {
  def dataInTopic: Topic[DataPublished]

  override final def descriptor: Descriptor = {
    import Service._
    import DataKafkaPublisher._

    named("data-kafka-in")
      .withTopics(
        topic(TOPIC_NAME, dataInTopic)
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[DataPublished](_.data.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}
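
For reference, the message class on this topic is a plain case class; Lagom's default message serializer needs an implicit Play JSON Format in scope. A minimal sketch (assuming Data itself already has a Format):

import play.api.libs.json.{Format, Json}

// Message published on the "data-in" topic. Lagom's default message
// serializer picks up the implicit Play JSON Format.
case class DataPublished(data: Data)

object DataPublished {
  implicit val format: Format[DataPublished] = Json.format[DataPublished]
}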

In the impl module, I simply did the standard implementation:

class DataKafkaPublisherImpl(persistentEntityRegistry: PersistentEntityRegistry)
  extends DataKafkaPublisher {

  // Publish the persistent entity's KafkaDataEvent stream onto the topic.
  override def dataInTopic: Topic[api.DataPublished] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry
        .eventStream(KafkaDataEvent.Tag, fromOffset)
        .map(ev => (convertEvent(ev), ev.offset))
    }

  private def convertEvent(evt: EventStreamElement[KafkaDataEvent]): api.DataPublished =
    evt.event match {
      case DataPublished(data) => api.DataPublished(data)
    }
}

Now, to actually consume these events, in my aggregator-impl module I added a "subscriber" service which takes these events and invokes the appropriate commands on the entity:

class DataKafkaSubscriber(persistentEntityRegistry: PersistentEntityRegistry, kafkaPublisher: DataKafkaPublisher) {

  // Turn every message from the topic into a RecordData command
  // on the entity identified by the message's correlation id.
  kafkaPublisher.dataInTopic.subscribe.atLeastOnce(
    Flow[DataPublished].mapAsync(1) { sd =>
      sessionRef(sd.data.correlationId).ask(RecordData(sd.data))
    }
  )

  private def sessionRef(correlationId: String) =
    persistentEntityRegistry.refFor[Entity](correlationId)
}

This effectively allowed me to publish a message to the Kafka topic "data-in", which was then proxied and converted to a RecordData command before being issued to the entity.
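
For completeness, the subscriber has to be instantiated eagerly for the subscription to start. A sketch of the wiring in the application loader, assuming Lagom's Kafka client components (the persistence components that provide persistentEntityRegistry are assumed to be mixed in and are omitted here):

import com.lightbend.lagom.scaladsl.broker.kafka.LagomKafkaClientComponents
import com.lightbend.lagom.scaladsl.server.{LagomApplication, LagomApplicationContext}

abstract class AggregatorApplication(context: LagomApplicationContext)
  extends LagomApplication(context)
  with LagomKafkaClientComponents {

  // Client for the proxy service's topic, resolved via the service locator.
  lazy val kafkaPublisher: DataKafkaPublisher =
    serviceClient.implement[DataKafkaPublisher]

  // Created eagerly (not lazy) so the topic subscription starts at boot.
  new DataKafkaSubscriber(persistentEntityRegistry, kafkaPublisher)
}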

However, it seems somewhat hacky to me. I am coupled to Kafka by Lagom internals, and I cannot easily swap the source of my data. For example, how would I consume external messages from RabbitMQ if I wanted to? What if I need to consume from another Kafka cluster (a different one than the one Lagom uses)?
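
For the "different Kafka cluster" case specifically, one escape hatch would be to bypass Lagom's broker API and use Alpakka Kafka (akka-stream-kafka) directly against the external cluster. A rough sketch, where the bootstrap address, group id and topic name are hypothetical, and Akka 2.6+ is assumed so the implicit ActorSystem provides the materializer:

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer

object ExternalKafkaConsumer extends App {
  implicit val system: ActorSystem = ActorSystem("external-consumer")

  // Points at the external cluster, not the Kafka that Lagom itself uses.
  val settings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("external-kafka:9092")
      .withGroupId("aggregator-external")

  Consumer
    .plainSource(settings, Subscriptions.topics("data-in"))
    .runWith(Sink.foreach(record => println(record.value())))
}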

EDIT #2: More docs

I've found a few relevant sections in the Lagom docs, notably this:

Consuming Topics from 3rd parties

You may want your Lagom service to consume data produced on services not implemented in Lagom. In that case, as described in the Service Clients section, you can create a third-party-service-api module in your Lagom project. That module will contain a Service Descriptor declaring the topic you will consume from. Once you have your ThirdPartyService interface and related classes implemented, you should add third-party-service-api as a dependency on your fancy-service-impl. Finally, you can consume from the topic described in ThirdPartyService as documented in the Subscribe to a topic section.
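
Following that recipe, the third-party-service-api module contains little more than a descriptor declaring the externally produced topic. A minimal sketch, where the service name, topic id and message type are hypothetical and must match whatever the external producer actually uses:

import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.api.{Descriptor, Service}
import play.api.libs.json.{Format, Json}

// Hypothetical shape of the externally produced message.
case class ExternalEvent(correlationId: String, payload: String)

object ExternalEvent {
  implicit val format: Format[ExternalEvent] = Json.format[ExternalEvent]
}

trait ThirdPartyService extends Service {
  def externalTopic: Topic[ExternalEvent]

  override final def descriptor: Descriptor = {
    import Service._
    // The topic id must be the actual name of the external Kafka topic.
    named("third-party-service")
      .withTopics(
        topic("external-events", externalTopic)
      )
  }
}

The impl side then subscribes to it exactly as it would to a Lagom-produced topic, e.g. thirdPartyService.externalTopic.subscribe.atLeastOnce(...).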

asked Feb 04 '19 by ioreskovic



1 Answer

I don't use Lagom, so this is maybe just an idea. But since Akka Streams is part of Lagom (at least I assume so), getting from this solution to what you need should be easy.

I used akka-stream-kafka and it went really nicely (I only built a prototype).

To consume messages, you would do something like:

Consumer
  .committableSource(
    consumerSettings(..),                        // Kafka connection config
    Subscriptions.topics("kafkaWsPathMsgTopic")) // topic to subscribe to
  .mapAsync(10) { msg =>
    business(msg.record)                 // do something with the record
      .map(_ => msg.committableOffset)   // keep the offset for committing
  }
  .runWith(Committer.sink(CommitterSettings(system))) // commit processed offsets

Check the well-written documentation.

You can find my whole example here: PathMsgConsumer

answered Oct 18 '22 by pme