I have implemented a small IO class that can read the same file from multiple disks (e.g. two hard disks each holding a copy of the file). Read sequentially, each disk averages 60 MB/s over the file, but when I interleave the reads (e.g. 4 KB from disk 1, 4 KB from disk 2, then combine), the effective read speed drops to 40 MB/s instead of increasing. Why?
Context: Win 7 + JDK 7b70, 2 GB RAM, 2.2 GB test file. Basically, I am trying to mimic Win7's ReadyBoost and RAID x in a poor man's fashion.
At its core, when a read() is issued to the class, it creates two tasks, each with instructions to read from a pre-opened RandomAccessFile at a certain position and length. The tasks run on an executor service; once both Future.get() calls return, the data read is copied into a common buffer and returned to the caller.
Is there a conceptual error in my approach? (For example, will the OS caching mechanism always counteract it?)
protected <T> List<T> waitForAll(List<Future<T>> futures)
        throws MultiIOException {
    MultiIOException mex = null;
    int i = 0;
    List<T> result = new ArrayList<T>(futures.size());
    for (Future<T> f : futures) {
        try {
            result.add(f.get());
        } catch (InterruptedException ex) {
            if (mex == null) {
                mex = new MultiIOException();
            }
            mex.exceptions.add(new ExceptionPair(metrics[i].file, ex));
        } catch (ExecutionException ex) {
            if (mex == null) {
                mex = new MultiIOException();
            }
            mex.exceptions.add(new ExceptionPair(metrics[i].file, ex));
        }
        i++;
    }
    if (mex != null) {
        throw mex;
    }
    return result;
}
public int read(long position, byte[] output, int start, int length)
        throws IOException {
    if (start < 0 || start + length > output.length) {
        throw new IndexOutOfBoundsException(
            String.format("start=%d, length=%d, output=%d",
                start, length, output.length));
    }
    // compute the fragment sizes and positions
    int result = 0;
    final long[] positions = new long[metrics.length];
    final int[] lengths = new int[metrics.length];
    double speedSum = 0.0;
    double maxValue = 0.0;
    int maxIndex = 0;
    for (int i = 0; i < metrics.length; i++) {
        speedSum += metrics[i].readSpeed;
        if (metrics[i].readSpeed > maxValue) {
            maxValue = metrics[i].readSpeed;
            maxIndex = i;
        }
    }
    // adjust read lengths
    int lengthSum = length;
    for (int i = 0; i < metrics.length; i++) {
        int len = (int)Math.ceil(length * metrics[i].readSpeed / speedSum);
        lengths[i] = (len > lengthSum) ? lengthSum : len;
        lengthSum -= lengths[i];
    }
    if (lengthSum > 0) {
        lengths[maxIndex] += lengthSum;
    }
    // adjust read positions
    long positionDelta = position;
    for (int i = 0; i < metrics.length; i++) {
        positions[i] = positionDelta;
        positionDelta += (long)lengths[i];
    }
    List<Future<byte[]>> futures = new LinkedList<Future<byte[]>>();
    // read in parallel
    for (int i = 0; i < metrics.length; i++) {
        final int j = i;
        futures.add(exec.submit(new Callable<byte[]>() {
            @Override
            public byte[] call() throws Exception {
                byte[] buffer = new byte[lengths[j]];
                long t = System.nanoTime();
                long t0 = t;
                long currPos = metrics[j].handle.getFilePointer();
                metrics[j].handle.seek(positions[j]);
                t = System.nanoTime() - t;
                metrics[j].seekTime = t * 1024.0 * 1024.0 /
                    Math.abs(currPos - positions[j]) / 1E9;
                int c = metrics[j].handle.read(buffer);
                t0 = System.nanoTime() - t0;
                // adjust the read speed if we read something
                if (c > 0) {
                    metrics[j].readSpeed = (alpha * c * 1E9 / t0 / 1024 / 1024
                        + (1 - alpha) * metrics[j].readSpeed);
                }
                if (c < 0) {
                    return null;
                } else if (c == 0) {
                    return EMPTY_BYTE_ARRAY;
                } else if (c < buffer.length) {
                    return Arrays.copyOf(buffer, c);
                }
                return buffer;
            }
        }));
    }
    List<byte[]> data = waitForAll(futures);
    boolean eof = true;
    for (byte[] b : data) {
        if (b != null && b.length > 0) {
            System.arraycopy(b, 0, output, start + result, b.length);
            result += b.length;
            eof = false;
        } else {
            break; // the rest probably reached EOF
        }
    }
    // if there was no data at all, we reached the end of file
    if (eof) {
        return -1;
    }
    sequentialPosition = position + (long)result;
    // evaluate the fastest file to read
    double maxSpeed = 0;
    maxIndex = 0;
    for (int i = 0; i < metrics.length; i++) {
        if (metrics[i].readSpeed > maxSpeed) {
            maxSpeed = metrics[i].readSpeed;
            maxIndex = i;
        }
    }
    fastest = metrics[maxIndex];
    return result;
}
(The FileMetrics entries in the metrics array hold read-speed measurements used to adaptively size the reads from the various input channels. In my test, alpha = 0 and readSpeed = 1, which results in an equal distribution.)
Edit: I ran a non-interleaved test (i.e. reading the two files independently in separate threads) and got a combined effective speed of 110 MB/s.
Edit 2: I think I know why this is happening.
When I read in parallel and in sequence, each disk does not see a sequential read but a read-skip-read-skip pattern because of the interleaving (possibly riddled with allocation table lookups as well). This basically reduces the effective read speed per disk to half or worse.
As you said, a sequential read on a disk is much faster than a read-skip-read-skip pattern. Hard disks are capable of high bandwidth when reading sequentially, but the seek time (latency) is expensive.
Instead of storing a full copy of the file on each disk, try storing block i of the file on disk i (mod 2). This way each disk is read sequentially from start to end, and the two streams are recombined in memory.
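A minimal sketch of that striping idea, assuming Java 7 and two stripe files that stand in for the two disks. The class name StripedFile, its methods, and the block size are hypothetical and not part of the question's code. For brevity each stripe is loaded whole with Files.readAllBytes; for a 2.2 GB file on a 2 GB machine you would stream fixed-size chunks instead.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: block i of the original data lives in stripe i % 2.
class StripedFile {
    /** Splits data into two stripes of alternating blocks. */
    static byte[][] split(byte[] data, int blockSize) {
        ByteArrayOutputStream[] out = {
            new ByteArrayOutputStream(), new ByteArrayOutputStream()
        };
        for (int off = 0, block = 0; off < data.length; off += blockSize, block++) {
            int len = Math.min(blockSize, data.length - off);
            out[block % 2].write(data, off, len);
        }
        return new byte[][] { out[0].toByteArray(), out[1].toByteArray() };
    }

    /** Re-interleaves two stripes produced by split() with the same block size. */
    static byte[] merge(byte[] s0, byte[] s1, int blockSize) {
        byte[][] stripes = { s0, s1 };
        int[] offsets = new int[2];
        byte[] result = new byte[s0.length + s1.length];
        for (int pos = 0, block = 0; pos < result.length; block++) {
            int k = block % 2;
            int len = Math.min(blockSize, stripes[k].length - offsets[k]);
            if (len <= 0) {
                throw new IllegalArgumentException("stripes do not match block size");
            }
            System.arraycopy(stripes[k], offsets[k], result, pos, len);
            offsets[k] += len;
            pos += len;
        }
        return result;
    }

    private static Callable<byte[]> readAll(final Path p) {
        return new Callable<byte[]>() {
            @Override
            public byte[] call() throws IOException {
                return Files.readAllBytes(p); // one sequential pass over the stripe
            }
        };
    }

    /** Reads both stripe files in parallel, then recombines them in memory. */
    static byte[] readStriped(Path disk0, Path disk1, int blockSize)
            throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        try {
            Future<byte[]> f0 = exec.submit(readAll(disk0));
            Future<byte[]> f1 = exec.submit(readAll(disk1));
            return merge(f0.get(), f1.get(), blockSize);
        } finally {
            exec.shutdown();
        }
    }
}
```

Each disk now only ever sees one forward pass over its own stripe, so the skip half of the read-skip pattern disappears. The trade-off is that neither disk holds a complete copy of the file any more.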
If you want to do a parallel read, break the read into two sequential reads. Find the halfway point and read the first half from the first file and the second half from the second file.
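A rough sketch of that halfway split, assuming Java 7 and two identical copies of the file. HalfSplitReader, readTask, and readRange are hypothetical names, not the asker's API. It uses RandomAccessFile.readFully, which keeps reading until the requested byte count has arrived and throws EOFException if the range runs past the end of the file.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: one contiguous read per disk instead of interleaved chunks.
class HalfSplitReader {
    /** Task that reads `length` bytes at `offset` of one copy into dest[destOff..]. */
    static Callable<Void> readTask(final String path, final long offset,
                                   final byte[] dest, final int destOff,
                                   final int length) {
        return new Callable<Void>() {
            @Override
            public Void call() throws IOException {
                try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
                    raf.seek(offset);                     // a single seek per request
                    raf.readFully(dest, destOff, length); // then one sequential run
                }
                return null;
            }
        };
    }

    /** Reads [position, position + length): first half from copyA, rest from copyB. */
    static byte[] readRange(String copyA, String copyB, long position, int length)
            throws InterruptedException, ExecutionException {
        byte[] out = new byte[length];
        int firstHalf = length / 2;
        ExecutorService exec = Executors.newFixedThreadPool(2);
        try {
            // The two tasks write to disjoint regions of `out`, so no copy is needed.
            Future<Void> a = exec.submit(
                readTask(copyA, position, out, 0, firstHalf));
            Future<Void> b = exec.submit(
                readTask(copyB, position + firstHalf, out, firstHalf, length - firstHalf));
            a.get();
            b.get();
        } finally {
            exec.shutdown();
        }
        return out;
    }
}
```

This only helps if `length` is large (many megabytes per call), so that each disk spends its time streaming rather than seeking. With 4 KB requests the per-call seek dominates again.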