Is it possible to achieve distributed reads from HDSF cluster using an HDFS client on one machine?
I have carried out an experiment with a cluster consisting of 3 data nodes (DN1,DN2,DN3). Then I run 10 simultaneous reads from 10 independent files from a client program located on DN1, and it appeared to be only reading data from DN1. Other data nodes (DN2,DN3) were showing zero activity (judging from debug logs).
I have checked that all files' blocks are replicated across all 3 datanodes, so if I shut down DN1 then data is read from DN2 (DN2 only).
Increasing the amount of data read did not help (tried from 2GB to 30GB).
Since I have a need to read multiple large files and extract only a small amount of data from it (few Kb), I would like to avoid using map/reduce since it requires settings up more services and also requires writing the output of each split task back to HDFS. Rather it would be nice to have the result streamed directly back to my client program from the data nodes.
I am using SequenceFile
for reading/writing data, in this fashion (jdk7):
//Run in thread pool on multiple files simultaneously
List<String> result = new ArrayList<>();
LongWritable key = new LongWritable();
Text value = new Text();
try(SequenceFile.Reader reader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(filePath)){
reader.next(key);
if(key.get() == ID_I_AM_LOOKING_FOR){
reader.getCurrentValue(value);
result.add(value.toString());
}
}
return result; //results from multiple workers are merged later
Any help appreciated. Thanks!
I'm afraid the behavior you see is by-design. From Hadoop document:
Replica Selection
To minimize global bandwidth consumption and read latency, HDFS tries to satisfy a read request from a replica that is closest to the reader. If there exists a replica on the same rack as the reader node, then that replica is preferred to satisfy the read request. If angg/ HDFS cluster spans multiple data centers, then a replica that is resident in the local data center is preferred over any remote replica.
It can be further confirmed by corresponding Hadoop source code:
LocatedBlocks getBlockLocations(...) {
LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
if (blocks != null) {
//sort the blocks
DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(
clientMachine);
for (LocatedBlock b : blocks.getLocatedBlocks()) {
clusterMap.pseudoSortByDistance(client, b.getLocations());
// Move decommissioned datanodes to the bottom
Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
}
}
return blocks;
}
I.e., all available replicas are tried one after another if former one fails but the nearest one is always the first.
On the other hand, if you access HDFS files through HDFS Proxy, it does pick datanodes randomly. But I don't think that's what you want.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With