Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Update singleton HashMap using Google pub/sub

I have a use case where I initialise a HashMap that contains a set of lookup data (information about the physical location etc. of IoT devices). This lookup data serves as reference data for a 2nd dataset which is a PCollection. This PCollection is a dataflow that provides the data that the IoT devices record. The dataflow from the IoT devices uses an Apache Beam pipeline that runs as a Google Dataflow utilising Google Cloud pub/sub.

When I process the PCollection (the device data), I link the Google Cloud pub/sub data to the related lookup entry in the HashMap.

I need to update the HashMap, based on a 2nd pub/sub that pushes changes to its data. Here's how I'm getting a PCollection so far and doing a lookup using the HashMap:

HashMap -> contains pre-loaded lookup data (information about the IoT devices)

PCollection -> contains data from a pipeline dataflow (the data recorded by the IoT devices)

I'm generating a HashMap for the IoT device lookup data as a singleton:

public class MyData {

    private static final MyData instance = new MyData ();

    private MyData () {     
            HashMap myDataMap = new HashMap<String, String>();          
               ... logic to populate the map

            this.referenceData = myDataMap;

    }

    public HashMap<Integer, DeviceReference> referenceData;

    public static DeviceData getInstance(){
        return instance;
    }
}

I then use the HashMap in a different class where I'm subscribing to updates to the data (these are messages that e.g. give me new data that relates to the entities already stored in the HashMap). I'm subscribing to changes using a Google pub/sub with Apache beam:

HashMap<String, String> referenceData = MyData.getInstance().referenceData;

Pipeline pipeLine = Pipeline.create(options);           

// subscribe to changes in data

org.apache.beam.sdk.values.PCollection myDataUpdates;

myDataUpdates = pipeLine.begin()
    .apply(String.format("Subscribe to data updates"),
        PubsubIO.readStrings().fromTopic(
                String.format("myPubSubPath")));

What I want to do is to efficiently apply the data updates to the singleton HashMap (i.e. manipulate the HashMap based on my data subscription). How can I do this?

I have a limited understanding of Apache Beam and I only know how to do transforms on pipeline data to create another, separate PCollection. I think that this is the point of Beam, that it is for transforming large data sets into a different form. Is there a way of achieving what I need (updating a dataset based on a pub/sub subscription) using Apache Beam, or is there another way I can update the HashMap using pub/sub? (I can't poll for the data as it creates too much latency and cost, I need to update the HashMap using a subscription).


The Google cloud docs show a way of directly subscribing to a Google Cloud pub/sub that isn't linked to an Apache Beam pipeline. This is promising as a potential solution, and relies on the following Maven dependency:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>1.53.0</version>
</dependency>

I'm getting a conflict though, which is a conflict with the following Maven dependencies for Apache Beam:

<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <version>2.5.0</version>
</dependency>

The issue is documented in a separate question here - Maven conflict in Java app with google-cloud-core-grpc dependency. From what I'm seeing it seems that it doesn't matter which version of the google-cloud-pubsub Maven artifact I use, as from what I've figured out it looks like the v.2.5.0 beam dependency and below will always conflict with any current version of the google dependency.

(I've raised this as an issue in the Beam Jira - https://issues.apache.org/jira/browse/BEAM-6118)


I'm currently investigating side inputs and combine as a way to achieve updating of the HashMap:

https://www.programcreek.com/java-api-examples/?api=org.apache.beam.sdk.transforms.Combine

Example 10 shows a way that .getSideInputsMap() can be applied to a payload. I'm wondering if I can apply this somehow to my subscription to the lookup data changes. If I get a PCollection like this, I can't directly chain .getSideInputsMap() to the PCollection

deviceReferenceDataUpdates = pipeLine.begin()
    .apply("Get changes to the IoT device lookup data"),
         PubsubIO.readMessages().fromTopic("IoT device lookup data")).

I've asked a separate question specifically about how I might be able to use .getSideInputsMap() - Apache Beam - how can I apply .getSideInputsMap to a subscription to a Google pub/sub?

like image 280
Chris Halcrow Avatar asked Nov 21 '18 23:11

Chris Halcrow


1 Answers

I found a way of doing this within the Apache Beam framework, as follows (not fully tested).

Note - take into account the comment on the OP from @Serg M Ten that a better approach may be to consolidate the data later, instead of trying to join the lookup data as part of the transformation processing.


Singleton HashMap

See my answer here - Accessing a HashMap from a different class


Pipeline (on single thread, implemented in main)

// initialise singleton HashMap containing lookup data on bootstrap:
LookupData lookupData = LookupData.getInstance();

org.apache.beam.sdk.values.PCollection lookupDataUpdateMessage;

lookupDataUpdateMessage = pipeLine.begin()
                              .apply("Extract lookup update data", PubsubIO.readStrings().fromTopic("myLookupUpdatePubSubTopic"))
                              .apply("Transform lookup update data",
                                 ParDo.of(new TransformLookupData.TransformFn()));

                     org.apache.beam.sdk.values.PCollection lookupDataMessage;

Transform

import java.io.Serializable;

import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.json.JSONObject;

import myLookupSingletonClass;
import myLookupUpObjectClass;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Strings;


public class TransformDeviceMeta

    public static class TransformFn extends DoFn<String, MyLookupData> {

        @ProcessElement
        public void processElement(ProcessContext c) 
        {   
            LookupData lookupData = LookupData.getInstance();

            MyLookupData myLookupDataUpdate = new MyLookupData();

            try 
            {           
                byte[] payload = c.element().getBytes();
                String myLookUpDataJson = new JSONObject(new String(payload)).toString();

                ObjectMapper mapper = new ObjectMapper();
                myLookUpDataUpdate = mapper.readValue(myLookUpDataJson , MyLookupData.class);

                String updatedLookupDataId = updatedLookupDataId.id;

                // logic for HashMap updating e.g:

                    lookupData.myHashMap.remove(updatedDeviceId);
                }
                else {
                    lookupData.myHashMap.put(updatedDeviceId, deviceMetaUpdate);    
                }
            } 
            catch (Exception ex) {
                Log.error(ex.getMessage());
                System.out.println("Error " + ex.getMessage());
            }
        }       
    }   
}

MyLookupData = Class that forms the model for the lookup data

like image 130
Chris Halcrow Avatar answered Oct 03 '22 17:10

Chris Halcrow