Weird Hazelcast IMap#put() behaviour

My Hazelcast-based program can work in two modes: submitter and worker.

The submitter puts a POJO into the distributed map under some key, e.g.: hazelcastInstance.getMap(MAP_NAME).put(key, value);

The worker has an infinite loop (with Thread.sleep(1000L); inside as a pause) which must process entries from the map. For now I'm just printing the map size in this loop.

Now here's the problem. I start the worker app. Then I start four submitters simultaneously (each adds an entry to the map and terminates). But after all submitter apps are done, the worker app prints an arbitrary size: sometimes it detects that only one entry was added, sometimes two, sometimes three (it has actually never seen all four entries).

What is the problem with this simple flow? I've read in the Hazelcast docs that the put() method is synchronous, so it guarantees that after it returns, the entry is placed into the distributed map and is replicated. But it doesn't seem so in my experiment.

UPD (code)

Submitter:

public void submit(String key) {
    Object mySerializableObject = ...
    IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
    // Store the object unless the key is already present; the entry expires
    // after TASK_TTL_IN_HOURS hours.
    map.putIfAbsent(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS);
}

Worker:

public void process() {
    while (true) {
        IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
        System.out.println(map.size());

        // Optional<Map.Entry<String, Object>> objectToProcess = getObjectToProcess();
        // objectToProcess.ifPresent(objectToProcess-> processObject(id, objectToProcess));
        try {
            Thread.sleep(PAUSE);
        } catch (InterruptedException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}

I commented out the "processing" part itself, because for now I'm just trying to get a consistent state of the map. The code above prints different results each time, e.g. "4, 3, 1, 1, 1, 1, 1..." (so it can even see 4 submitted tasks for a moment, but then they... disappear).

UPD (log)

Worker:

...
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 1
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
...

Submitter 1:

Before: tasksMap.size() = 0
After: tasksMap.size() = 1

Submitter 2:

Before: tasksMap.size() = 1
After: tasksMap.size() = 4

Submitter 3:

Before: tasksMap.size() = 1
After: tasksMap.size() = 2

Submitter 4:

Before: tasksMap.size() = 3
After: tasksMap.size() = 4
asked Apr 24 '16 by Dmytro Titov

1 Answer

Well, I guess I've figured out the problem. As far as I understand, the distributed IMap returned by hazelcastInstance.getMap doesn't guarantee that the data is replicated over all existing nodes in the cluster: some portions of the data may be stored on some nodes, other portions on other nodes. That's why in my example some of the submitted tasks ended up not on the worker node (which runs perpetually), but on other submitter nodes, which terminate their execution right after submission. Such entries were therefore lost when the submitters exited.
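
A quick way to verify this is to ask Hazelcast which member owns the partition for a given key. The method below is only an illustrative sketch (logOwner is not part of my actual code), but PartitionService and Partition are standard Hazelcast API:

public void logOwner(HazelcastInstance hazelcastInstance, String key) {
    // Each key hashes to one of the map's partitions; the partition owner is
    // the member that holds the primary copy of the entry for that key.
    Partition partition = hazelcastInstance.getPartitionService().getPartition(key);
    Member owner = partition.getOwner();
    System.out.println("Key '" + key + "' -> partition " + partition.getPartitionId()
            + ", owned by " + owner);
}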

I solved this issue by replacing hazelcastInstance.getMap with hazelcastInstance.getReplicatedMap. This method returns a ReplicatedMap, which, AFAIK, guarantees that entries placed into it will be replicated to all nodes of the cluster. So now everything works fine in my system.
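
For reference, the submitter after the change looks roughly like this (a sketch based on my submit() method above; note that ReplicatedMap's put() accepts a TTL, while the plain putIfAbsent() inherited from ConcurrentMap does not, so put() is used here):

public void submit(String key) {
    Object mySerializableObject = ...
    // Entries put into a ReplicatedMap are replicated to all members of the
    // cluster, so they remain visible to the worker after a submitter exits.
    ReplicatedMap<String, Object> map = hazelcastInstance.getReplicatedMap(MAP_NAME);
    map.put(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS);
}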

answered Sep 25 '22 by Dmytro Titov