Connect AWS SQS to Apache Flink

Why is AWS SQS not a default connector for Apache Flink? Is there some technical limitation to doing this, or was it just something that didn't get done? I want to implement this; any pointers would be appreciated.

Chavan asked Nov 08 '22 04:11


2 Answers

This is probably too late to answer the original question, but I wrote an SQS consumer as a SourceFunction using the Amazon SQS Java Messaging Library, which implements the Java Message Service (JMS) API:

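import com.amazon.sqs.javamessaging.ProviderConfiguration;
import com.amazon.sqs.javamessaging.SQSConnection;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazon.sqs.javamessaging.SQSSession;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.PredefinedClientConfigurations;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class SQSConsumer extends RichParallelSourceFunction<String> {
   private volatile boolean isRunning;
   private transient AmazonSQS sqs;
   private transient SQSConnectionFactory connectionFactory;
   private transient ExecutorService consumerExecutor;

   @Override
   public void open(Configuration parameters) throws Exception {
      String region = ...
      AWSCredentialsProvider credsProvider = ...
      // maybe use a bounded, array-backed thread pool to handle surges?
      consumerExecutor = Executors.newCachedThreadPool();
      ClientConfiguration clientConfig = PredefinedClientConfigurations.defaultConfig();
      this.sqs = AmazonSQSAsyncClientBuilder.standard().withRegion(region).withCredentials(credsProvider)
            .withClientConfiguration(clientConfig)
            .withExecutorFactory(() -> consumerExecutor).build();
      this.connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), sqs);
      this.isRunning = true;
   }

   @Override
   public void run(SourceContext<String> ctx) throws Exception {
      SQSConnection connection = connectionFactory.createConnection();
      // acknowledge each message explicitly and individually
      Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);
      Queue queue = session.createQueue(<queueName>);
      MessageConsumer msgConsumer = session.createConsumer(queue);
      msgConsumer.setMessageListener(msg -> {
         try {
            String msgId = msg.getJMSMessageID(); // e.g. for logging or de-duplication
            String evt = ((TextMessage) msg).getText();
            // the listener runs on a JMS thread, so emit under Flink's checkpoint lock
            synchronized (ctx.getCheckpointLock()) {
               ctx.collect(evt);
            }
            // acknowledge only after the record has been handed to Flink
            msg.acknowledge();
         } catch (JMSException e) {
            // log and move on to the next message, or bail out with an exception;
            // configure a dead-letter queue so this message is not lost;
            // it is not acknowledged, so it may be picked up again by another consumer instance
         }
      });
      // check if we were canceled
      if (!isRunning) {
         return;
      }
      connection.start();
      // block until cancel() shuts the executor down
      while (!consumerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
         // keep waiting
      }
   }

   @Override
   public void cancel() {
      isRunning = false;
      // this method might be called before the task actually starts running
      if (sqs != null) {
         sqs.shutdown();
      }
      if (consumerExecutor != null) {
         consumerExecutor.shutdown();
         try {
            consumerExecutor.awaitTermination(1, TimeUnit.MINUTES);
         } catch (InterruptedException e) {
            // log e and restore the interrupt flag
            Thread.currentThread().interrupt();
         }
      }
   }

   @Override
   public void close() throws Exception {
      cancel();
      super.close();
   }
}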
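The comment in `open()` floats the idea of a bounded, array-backed pool instead of `Executors.newCachedThreadPool()`. Below is a minimal JDK-only sketch of such a pool; `BoundedExecutors` and `newBoundedPool` are hypothetical names, and the thread count and queue capacity are placeholders to tune. When every thread is busy and the queue is full, `CallerRunsPolicy` runs the task on the submitting thread, which slows the producer down instead of rejecting work.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a fixed-size pool backed by a bounded, array-based work queue.
public class BoundedExecutors {
    public static ExecutorService newBoundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,                          // fixed number of worker threads
                60L, TimeUnit.SECONDS,                     // keep-alive (unused while core == max)
                new ArrayBlockingQueue<>(queueCapacity),   // bounded backlog of pending tasks
                // when threads and queue are both full, run the task on the caller's thread
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```

To try it in the consumer above, assign `consumerExecutor = BoundedExecutors.newBoundedPool(8, 256);` in `open()` instead of the cached pool (the sizes are illustrative only). How much this matters depends on how much work the SDK actually schedules on that executor.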

Note: if you are using a standard SQS queue, which delivers each message at least once, you may have to de-duplicate messages, depending on whether you need exactly-once guarantees.
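As a starting point for that de-duplication, here is a minimal JDK-only sketch of an in-memory filter that remembers the most recently seen IDs. `RecentIdDeduplicator` is a hypothetical helper and the window size is an assumption. It only catches duplicates that arrive within the window, and as plain in-memory state of one parallel instance it is neither shared across instances nor restored from a Flink checkpoint.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: remembers the last `capacity` IDs and flags repeats within that window.
public class RecentIdDeduplicator {
    private final Map<String, Boolean> seen;

    public RecentIdDeduplicator(int capacity) {
        // access-ordered LinkedHashMap that evicts the least recently used ID beyond capacity
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Returns true the first time an ID is seen (within the window), false for a repeat.
    // Synchronized because the JMS listener may call it from its own thread.
    public synchronized boolean firstTime(String id) {
        return seen.put(id, Boolean.TRUE) == null;
    }
}
```

In the listener you could skip `ctx.collect(evt)`, while still acknowledging the message, whenever `firstTime(msgId)` returns false. If producers may send the same logical event twice, key on an application-level ID instead, since every send gets its own SQS message ID.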

Reference: Working with JMS and Amazon SQS

partheinstein answered Nov 15 '22 07:11


At the moment, Apache Flink has no connector for AWS SQS; have a look at the list of existing connectors. I assume you already know this, so here are some pointers. I was also looking for an SQS connector recently and found this mail thread.

The Flink connector for Amazon Kinesis is somewhat similar to what you would implement for SQS. See whether you can use it as a starting point.
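Whichever connector you start from, a source boils down to a `run()` loop that emits records and returns once `cancel()` flips a flag. The JMS-based answer emits directly from the listener thread; an alternative is to hand messages to the source thread through a bounded queue, which also gives back-pressure when downstream is slow. The JDK-only sketch below shows just that hand-off: `ListenerToRunLoop` is hypothetical, the `Consumer<T>` passed to `run()` stands in for `SourceContext.collect`, and `lock` stands in for the checkpoint lock.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: bridges a push-style listener thread to a pull-style run() loop.
public class ListenerToRunLoop<T> {
    private final BlockingQueue<T> buffer;
    // stands in for SourceContext.getCheckpointLock() in a real Flink source
    private final Object lock = new Object();
    private volatile boolean running = true;

    public ListenerToRunLoop(int capacity) {
        this.buffer = new LinkedBlockingQueue<>(capacity);
    }

    // Called on the listener thread; blocks while the buffer is full (back-pressure).
    public void onMessage(T msg) throws InterruptedException {
        buffer.put(msg);
    }

    // Called on the source thread; emits buffered messages until cancel() is called.
    public void run(Consumer<T> emit) throws InterruptedException {
        while (running) {
            // poll with a timeout so the loop re-checks the running flag regularly
            T msg = buffer.poll(100, TimeUnit.MILLISECONDS);
            if (msg != null) {
                synchronized (lock) {
                    emit.accept(msg);
                }
            }
        }
    }

    public void cancel() {
        running = false;
    }
}
```

With this design a message should still be acknowledged only after it has been emitted, so in a real source you would queue something that lets you acknowledge it later rather than just its text.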

Keet Sugathadasa answered Nov 15 '22 05:11