I have a use case where I initialise a HashMap that contains a set of lookup data (information about the physical location etc. of IoT devices). This lookup data serves as reference data for a second dataset, a PCollection, which provides the data that the IoT devices record. The device data flows through an Apache Beam pipeline that runs on Google Dataflow and reads from Google Cloud pub/sub.
When I process the PCollection (the device data), I link each pub/sub message to the related lookup entry in the HashMap.
I need to update the HashMap based on a second pub/sub subscription that pushes changes to its data. Here's how I'm getting a PCollection so far and doing a lookup using the HashMap:
HashMap -> contains pre-loaded lookup data (information about the IoT devices)
PCollection -> contains data from a pipeline dataflow (the data recorded by the IoT devices)
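Conceptually, the linking step looks something like this (a simplified sketch, not my actual code; parseDeviceId is a hypothetical helper that pulls the device id out of the payload):
// Simplified sketch of how each device message is linked to its lookup entry
// via the singleton HashMap. Parsing and field names are illustrative.
PCollection<String> linkedDeviceData = deviceData
    .apply("Link to lookup entry", ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            HashMap<Integer, DeviceReference> lookup = MyData.getInstance().referenceData;
            DeviceReference ref = lookup.get(parseDeviceId(c.element())); // hypothetical helper
            if (ref != null) {
                c.output(c.element() + " @ " + ref);
            }
        }
    }));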
I'm generating a HashMap for the IoT device lookup data as a singleton:
public class MyData {
    private static final MyData instance = new MyData();

    public HashMap<Integer, DeviceReference> referenceData;

    private MyData() {
        HashMap<Integer, DeviceReference> myDataMap = new HashMap<Integer, DeviceReference>();
        // ... logic to populate the map
        this.referenceData = myDataMap;
    }

    public static MyData getInstance() {
        return instance;
    }
}
I then use the HashMap in a different class where I'm subscribing to updates to the data (these are messages that e.g. give me new data that relates to the entities already stored in the HashMap). I'm subscribing to changes using a Google Cloud pub/sub subscription with Apache Beam:
HashMap<Integer, DeviceReference> referenceData = MyData.getInstance().referenceData;

Pipeline pipeLine = Pipeline.create(options);

// subscribe to changes in data
org.apache.beam.sdk.values.PCollection<String> myDataUpdates;
myDataUpdates = pipeLine.begin()
    .apply("Subscribe to data updates",
        PubsubIO.readStrings().fromTopic("myPubSubPath"));
What I want to do is to efficiently apply the data updates to the singleton HashMap (i.e. manipulate the HashMap based on my data subscription). How can I do this?
I have a limited understanding of Apache Beam, and I only know how to do transforms on pipeline data to create another, separate PCollection. I think that this is the point of Beam: it's for transforming large data sets into a different form. Is there a way of achieving what I need (updating a dataset based on a pub/sub subscription) using Apache Beam, or is there another way I can update the HashMap using pub/sub? (I can't poll for the data as it creates too much latency and cost; I need to update the HashMap using a subscription.)
The Google Cloud docs show a way of directly subscribing to a Google Cloud pub/sub topic that isn't linked to an Apache Beam pipeline. This is promising as a potential solution, and relies on the following Maven dependency:
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-pubsub</artifactId>
    <version>1.53.0</version>
</dependency>
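Along those lines, a minimal standalone subscriber would look roughly like this (project and subscription ids are placeholders; updating the HashMap from the callback is the part that would still need wiring up):
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

public class LookupDataSubscriber {
    public static void main(String[] args) {
        // placeholder project/subscription ids
        ProjectSubscriptionName subscription =
            ProjectSubscriptionName.of("my-project", "my-lookup-update-subscription");

        MessageReceiver receiver = new MessageReceiver() {
            @Override
            public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                String payload = message.getData().toStringUtf8();
                // ... parse the payload and update the singleton HashMap here
                consumer.ack();
            }
        };

        Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
        subscriber.startAsync().awaitRunning();
        subscriber.awaitTerminated(); // block so the callback keeps receiving
    }
}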
I'm getting a conflict though with the following Maven dependency for Apache Beam:
<dependency>
    <groupId>com.google.cloud.dataflow</groupId>
    <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
    <version>2.5.0</version>
</dependency>
The issue is documented in a separate question here - Maven conflict in Java app with google-cloud-core-grpc dependency. From what I can see, it doesn't seem to matter which version of the google-cloud-pubsub Maven artifact I use: the v2.5.0 Beam dependency (and below) appears to conflict with any current version of the Google dependency.
(I've raised this as an issue in the Beam Jira - https://issues.apache.org/jira/browse/BEAM-6118)
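A standard Maven workaround would be to exclude the clashing transitive artifact (the one named in the linked question) from the Beam dependency and declare the version needed explicitly; this is a sketch only, and I haven't verified that the resulting combination is binary-compatible, which is really the crux of the conflict:
<!-- illustrative only: exclude the transitive artifact named in the linked
     question and pin google-cloud-pubsub yourself -->
<dependency>
    <groupId>com.google.cloud.dataflow</groupId>
    <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
    <version>2.5.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-core-grpc</artifactId>
        </exclusion>
    </exclusions>
</dependency>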
I'm currently investigating side inputs and Combine as a way to achieve updating of the HashMap: https://www.programcreek.com/java-api-examples/?api=org.apache.beam.sdk.transforms.Combine
Example 10 shows a way that .getSideInputsMap() can be applied to a payload. I'm wondering if I can apply this somehow to my subscription to the lookup data changes. If I get a PCollection like this, I can't directly chain .getSideInputsMap() to the PCollection:
deviceReferenceDataUpdates = pipeLine.begin()
    .apply("Get changes to the IoT device lookup data",
        PubsubIO.readMessages().fromTopic("IoT device lookup data"));
I've asked a separate question specifically about how I might be able to use .getSideInputsMap() - Apache Beam - how can I apply .getSideInputsMap to a subscription to a Google pub/sub?
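The side-input pattern I'm looking at would be something like the following sketch. KeyByIdFn is a hypothetical DoFn<String, KV<String, String>> that keys each update by device id, the topic name is illustrative, and the window/trigger setup is just the common recipe for a slowly-changing side input:
// Sketch: expose the lookup-update stream as a map-valued side input.
// Needs: org.apache.beam.sdk.transforms.View, org.apache.beam.sdk.transforms.windowing.*,
//        org.apache.beam.sdk.values.*, org.joda.time.Duration
PCollectionView<Map<String, String>> lookupView = pipeLine
    .apply("Read lookup updates", PubsubIO.readStrings().fromTopic("myLookupUpdateTopic"))
    .apply("Key by device id", ParDo.of(new KeyByIdFn())) // hypothetical DoFn
    .apply("Window for side input", Window.<KV<String, String>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(View.asMap());

deviceData.apply("Enrich with side input", ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        Map<String, String> lookup = c.sideInput(lookupView);
        // ... look up the entry for this device and emit the enriched element
    }
}).withSideInputs(lookupView));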
I found a way of doing this within the Apache Beam framework, as follows (not fully tested).
Note - take into account the comment on the OP from @Serg M Ten that a better approach may be to consolidate the data later, instead of trying to join the lookup data as part of the transformation processing.
See my answer here - Accessing a HashMap from a different class
In main():

// initialise singleton HashMap containing lookup data on bootstrap:
LookupData lookupData = LookupData.getInstance();

org.apache.beam.sdk.values.PCollection<MyLookupData> lookupDataUpdateMessage;
lookupDataUpdateMessage = pipeLine.begin()
    .apply("Extract lookup update data",
        PubsubIO.readStrings().fromTopic("myLookupUpdatePubSubTopic"))
    .apply("Transform lookup update data",
        ParDo.of(new TransformLookupData.TransformFn()));
import org.apache.beam.sdk.transforms.DoFn;
import org.json.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Strings;
// plus imports for your own LookupData singleton and MyLookupData model classes

public class TransformLookupData {

    public static class TransformFn extends DoFn<String, MyLookupData> {

        @ProcessElement
        public void processElement(ProcessContext c) {
            LookupData lookupData = LookupData.getInstance();
            try {
                byte[] payload = c.element().getBytes();
                String myLookupDataJson = new JSONObject(new String(payload)).toString();
                ObjectMapper mapper = new ObjectMapper();
                MyLookupData myLookupDataUpdate =
                    mapper.readValue(myLookupDataJson, MyLookupData.class);
                String updatedLookupDataId = myLookupDataUpdate.id;

                // logic for HashMap updating, e.g. treat an empty payload field as a
                // removal (the condition is illustrative -- use whatever marks a
                // delete in your own update messages):
                if (Strings.isNullOrEmpty(myLookupDataUpdate.name)) {
                    lookupData.myHashMap.remove(updatedLookupDataId);
                }
                else {
                    lookupData.myHashMap.put(updatedLookupDataId, myLookupDataUpdate);
                }

                c.output(myLookupDataUpdate);
            }
            catch (Exception ex) {
                System.out.println("Error " + ex.getMessage());
            }
        }
    }
}
MyLookupData = class that forms the model for the lookup data
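For context, MyLookupData would be shaped roughly like this (field names are illustrative; id is whatever keys the HashMap):
import java.io.Serializable;

// Rough sketch of the lookup model used above; the real class has more fields.
public class MyLookupData implements Serializable {
    public String id;    // key used in the singleton HashMap
    public String name;  // example payload field (illustrative)

    public MyLookupData() {
    }
}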