How beneficial will it be to use a Python/PHP nonpersistent in-memory array for storing 6 GB+ of data with 800+ million rows, rather than a persistent database (MySQL/MongoDB/Cassandra/BigTable/etc.), when it comes to speed/latency of simple query execution?
For example, is it possible to find one name among 800+ million rows within 1 second? Does anyone have experience dealing with a dataset of more than 1-2 billion rows and getting the result of a simple search query within 1 second?
Is there a better, proven methodology to deal with billions of rows?
The difference should be very big, around 4-5 orders of magnitude. A database stores records in blocks (usually 4 KB), and bringing each such block into memory takes a few milliseconds. Divide the size of your table by 4 KB to get the picture. In contrast, the corresponding times for in-memory data are typically nanoseconds. There is no question that memory is faster; the real question is whether you have enough memory and how long you can keep your data there.
However, the above holds for a "select * from table" query. If you want "select * from table where name = something", you can create an index on name so that the database does not have to scan the whole file, and the results will be much, much better, probably quite satisfactory for practical use.
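To make this concrete, here is a minimal JDBC sketch of creating such an index and querying through it. The JDBC URL, credentials, and the people(id, name) table are all hypothetical, and real code would create the index once, not per query:

import java.sql.*;

public class IndexedNameLookup {
    public static void main(String[] args) throws Exception {
        try (Connection c = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "user", "password")) {
            try (Statement s = c.createStatement()) {
                // A B-tree index lets the server find matching rows in O(log n)
                // block reads instead of scanning the whole table.
                s.execute("CREATE INDEX idx_people_name ON people (name)");
            }
            try (PreparedStatement ps = c.prepareStatement(
                    "SELECT id FROM people WHERE name = ?")) {
                ps.setString(1, "some name");
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        System.out.println(rs.getLong("id"));
                    }
                }
            }
        }
    }
}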
4 bytes (int) * 1_000_000_000 ≈ 4 GB
4 bytes (int) * 1_000_000_000 / 64 bytes = 62_500_000 cache-line fetches (for the L1 cache)
the same 62_500_000 fetches for the L2 cache (same 64-byte line size)
Taking the latency numbers everyone should know: main memory reference is ~100 ns, so 1 billion accesses give ~100 s. If everything fits in the L1 cache (64-byte lines on Intel), it is about 31.25 ms; going through the L2/L3 caches (same line size) it would be about 218.75 ms. Reading 1 MB sequentially from memory takes ~250 µs (in other words, this is the best case), so for 4 GB it is about 4000 * 250 µs = 1_000_000 µs ≈ 1 s. An SSD has lower latency, but it is not that simple. There is research (possibly outdated by now) showing that most commercially available SSDs cannot sustain really high load rates (they fail, and, more interestingly, they have their own garbage collector, which can add large latency). There are also solutions adapted to SSD environments, such as Aerospike, and generally, of course, SSDs are faster than HDDs.
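To make the arithmetic above reproducible, a small sketch that recomputes the estimates from the well-known reference latencies (the constants are assumed reference numbers, not measurements):

public class LatencyEstimates {
    public static void main(String[] args) {
        final long elements = 1_000_000_000L;   // 1 billion ints
        final long bytes = elements * 4L;       // ~4 GB
        final long cacheLines = bytes / 64;     // 64-byte cache lines
        final double mainMemoryNs = 100.0;      // main memory reference, ns
        final double l1Ns = 0.5;                // L1 cache reference, ns
        final double seqReadPerMbUs = 250.0;    // read 1 MB sequentially from memory, µs

        System.out.printf("1e9 random memory accesses: %.0f s%n",
                elements * mainMemoryNs / 1e9);
        System.out.printf("all data served from L1:    %.2f ms%n",
                cacheLines * l1Ns / 1e6);
        System.out.printf("sequential read of 4 GB:    %.2f s%n",
                (bytes / 1e6) * seqReadPerMbUs / 1e6);
    }
}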
Just to give a feeling: on a typical laptop (mine: Intel Core i5, x64, 16 GB RAM) I need roughly 580 ms to 875 ms to compute a long sum over 1 billion int elements. I can also see ClickHouse reach 300 MB/s to 354.66 MB/s computing a sum over an Int32 column on my SSD. (Note that the sum itself is meaningless in both cases because of type overflow.)
Of course, we also have CUDA as an option, or even simple threading (with multiple threads each summing a chunk, we can easily achieve a speed-up), as sketched below.
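A minimal sketch of the threaded variant: a parallel reduction over an int array, accumulated into a long to avoid the overflow mentioned above. It uses 100 million elements to stay inside typical laptop RAM, and this is a quick timing, not a proper benchmark:

import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

public class ParallelSum {
    public static void main(String[] args) {
        int[] data = new int[100_000_000];
        Arrays.setAll(data, i -> ThreadLocalRandom.current().nextInt());

        long start = System.nanoTime();
        // Splits the array across all cores via the common fork/join pool.
        long sum = Arrays.stream(data).parallel().asLongStream().sum();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("sum = " + sum + " in " + elapsedMs + " ms");
    }
}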
So... What can we do?
There are two types of scaling: vertical and horizontal. Most databases prefer horizontal scaling, and I suppose the reason is simple: horizontal scaling is easier than vertical. Vertical scaling requires people (or you yourself) with very good expertise in different areas. For example, from my own experience, I have to know a lot about Java/HotSpot/OS architecture and many technologies/frameworks/databases to write, or to understand the benefits of, different decisions when creating high-performance applications/algorithms. And this is only the beginning; there are experts far stronger than me.
Other databases use vertical scaling; more precisely, they apply special optimizations for particular scenarios/queries.
All such decisions are compromises between different operations. For example, for the Top N problem, Vertica and Druid have specific implementations that solve exactly this task. In Cassandra, to make all selects fast you have to create multiple tables, or multiple views of one table, each with a representation efficient for a particular query, of course spending more storage because of data duplication (see the sketch below).
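The same idea in miniature (a hypothetical record type; each "view" is just a map keyed for one query pattern, so every record is stored twice):

import java.util.HashMap;
import java.util.Map;

public class DenormalizedViews {
    static final class User {
        final String name, city;
        User(String name, String city) { this.name = name; this.city = city; }
    }

    public static void main(String[] args) {
        // Two representations of the same rows, each efficient for one query,
        // at the cost of storing every record twice.
        Map<String, User> byName = new HashMap<>();
        Map<String, User> byCity = new HashMap<>();

        User u = new User("alice", "Berlin");
        byName.put(u.name, u);
        byCity.put(u.city, u);

        System.out.println(byName.get("alice").city);  // fast select by name
        System.out.println(byCity.get("Berlin").name); // fast select by city
    }
}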
One of the biggest real problems is that even if you can read 1 billion rows in one second, you most likely cannot write into the same table at the same time. In other words, the main problem is that it is hard to satisfy all users' requests for all users' tasks at the same time.
Is there a better, proven methodology to deal with billions of rows?
Some examples were already mentioned above (Vertica, Druid, Cassandra, ClickHouse, Aerospike). In some cases building your own solution can be more expensive (and more effective) than a ready-made one; in other cases it is not...
Comparison of strings is relatively expensive, so I suppose you need to start by calculating how much time it takes to compare two strings. This simple JMH benchmark shows how much time we need to compare two strings in Java:
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Threads(1)
public class StringEquals {

    @Param({"0", "5", "10"})
    int prefix;

    String theSamePart, theSamePartQuery;

    @Setup(Level.Invocation)
    public void setData() {
        // With prefix > 0 the query string begins with theSamePart and then has
        // extra characters appended; with prefix = 0 the two strings are
        // independent random values.
        String value = String.valueOf(ThreadLocalRandom.current().nextInt());
        theSamePart = prefix > 0 ? value.substring(Math.min(prefix, value.length())) : value;
        value = String.valueOf(ThreadLocalRandom.current().nextInt());
        theSamePartQuery = prefix > 0 ? theSamePart + value.substring(Math.min(prefix, value.length())) : value;
    }

    @Benchmark
    public boolean equals(StringEquals stringEquals) {
        return stringEquals.theSamePart.equals(stringEquals.theSamePartQuery);
    }

    public static void main(String[] args) throws Exception {
        new Runner(new OptionsBuilder()
                .include(StringEquals.class.getSimpleName())
                .measurementIterations(10)
                .warmupIterations(10)
                .build()).run();
    }
}
Results:
Benchmark (prefix) Mode Cnt Score Error Units
StringEquals.equals 0 sample 3482270 0.047 ± 0.011 us/op
StringEquals.equals:equals·p0.00 0 sample 0.022 us/op
StringEquals.equals:equals·p0.50 0 sample 0.035 us/op
StringEquals.equals:equals·p0.90 0 sample 0.049 us/op
StringEquals.equals:equals·p0.95 0 sample 0.058 us/op
StringEquals.equals:equals·p0.99 0 sample 0.076 us/op
StringEquals.equals:equals·p0.999 0 sample 0.198 us/op
StringEquals.equals:equals·p0.9999 0 sample 8.636 us/op
StringEquals.equals:equals·p1.00 0 sample 9519.104 us/op
StringEquals.equals 5 sample 2686616 0.037 ± 0.003 us/op
StringEquals.equals:equals·p0.00 5 sample 0.021 us/op
StringEquals.equals:equals·p0.50 5 sample 0.028 us/op
StringEquals.equals:equals·p0.90 5 sample 0.044 us/op
StringEquals.equals:equals·p0.95 5 sample 0.048 us/op
StringEquals.equals:equals·p0.99 5 sample 0.060 us/op
StringEquals.equals:equals·p0.999 5 sample 0.238 us/op
StringEquals.equals:equals·p0.9999 5 sample 8.677 us/op
StringEquals.equals:equals·p1.00 5 sample 1935.360 us/op
StringEquals.equals 10 sample 2989681 0.039 ± 0.001 us/op
StringEquals.equals:equals·p0.00 10 sample 0.021 us/op
StringEquals.equals:equals·p0.50 10 sample 0.030 us/op
StringEquals.equals:equals·p0.90 10 sample 0.049 us/op
StringEquals.equals:equals·p0.95 10 sample 0.056 us/op
StringEquals.equals:equals·p0.99 10 sample 0.074 us/op
StringEquals.equals:equals·p0.999 10 sample 0.222 us/op
StringEquals.equals:equals·p0.9999 10 sample 8.576 us/op
StringEquals.equals:equals·p1.00 10 sample 325.632 us/op
So, taking the p0.9999 latency of roughly 8 µs per comparison from the table above: if you have to compare against 1_000_000_000 strings sequentially, you need approximately 8_000_000_000 µs = 8000 s to process 1 billion strings in 99.99% of cases.
In contrast, we could try to do it in parallel:
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.concurrent.*;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(1)
public class SearchBillionForkJoin {

    static final int availableProcessors = 4; // Runtime.getRuntime().availableProcessors()
    static final int size = 10_000_000, bucketSize = size / availableProcessors;
    static final int handlersCount = availableProcessors;

    @Param({"50", "100"})
    int spinner;

    String[] a;
    Callable<Integer>[] callables;
    ForkJoinTask<Integer>[] tasks;
    QueryHolder queryHolder;

    @Setup(Level.Trial)
    public void setup() {
        callables = new Callable[handlersCount];
        queryHolder = new QueryHolder();
        a = new String[size];
        // One search callable per bucket: [0, bucketSize), [bucketSize, 2 * bucketSize), ...
        for (int i = 0; i < callables.length; ++i) {
            switch (i) {
                case 0:
                    callables[i] = createForBucket(queryHolder, a, 0, bucketSize);
                    break;
                case 1:
                    callables[i] = createForBucket(queryHolder, a, bucketSize, bucketSize * 2);
                    break;
                case 2:
                    callables[i] = createForBucket(queryHolder, a, bucketSize * 2, bucketSize * 3);
                    break;
                case 3:
                    callables[i] = createForBucket(queryHolder, a, bucketSize * 3, size);
                    break;
            }
        }
        tasks = new ForkJoinTask[handlersCount];
    }

    @Setup(Level.Invocation)
    public void setData() {
        for (int i = 0; i < a.length; ++i) {
            a[i] = String.valueOf(ThreadLocalRandom.current().nextInt());
        }
        queryHolder.query = String.valueOf(ThreadLocalRandom.current().nextInt());
    }

    @Benchmark
    public Integer forkJoinPoolWithoutCopy() {
        try {
            for (int i = 0; i < tasks.length; ++i) {
                tasks[i] = ForkJoinPool.commonPool().submit(callables[i]);
            }
            Integer position = -1;
            boolean findMore = true;
            head:
            while (position == -1 && findMore) {
                findMore = false;
                for (int i = 0; i < tasks.length; ++i) {
                    if (tasks[i].isDone() && !tasks[i].isCancelled()) {
                        final Integer value = tasks[i].get();
                        if (value > -1) {
                            // The first bucket that reports a hit wins; cancel the rest.
                            position = value;
                            for (int j = 0; j < tasks.length; ++j) {
                                if (j != i && !tasks[j].isDone()) {
                                    tasks[j].cancel(true);
                                }
                            }
                            break head;
                        }
                    } else {
                        findMore = true;
                    }
                }
                // Busy-wait a little between polls instead of blocking.
                int counter = spinner;
                while (counter > 0) --counter;
            }
            return position;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        new Runner(new OptionsBuilder()
                .include(SearchBillionForkJoin.class.getSimpleName())
                .jvmArgs("-Xmx10G")
                .measurementIterations(10)
                .warmupIterations(10)
                .build()).run();
    }

    // Helper (not used in this benchmark).
    static boolean isDone(ForkJoinTask[] tasks) {
        for (int i = 0; i < tasks.length; ++i) {
            if (!tasks[i].isDone()) {
                return false;
            }
        }
        return true;
    }

    static Callable<Integer> createForBucket(QueryHolder queryHolder, String[] a, int start, int end) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                // Linear scan of one bucket; return the index of the match, or -1.
                for (int j = start; j < end; ++j) {
                    if (queryHolder.query.equals(a[j])) {
                        return j;
                    }
                }
                return -1;
            }
        };
    }

    static class QueryHolder {
        String query = null;
    }
}
I use 10_000_000 elements and 4 threads (for 4 CPU cores) because I don't have enough memory for the full billion. The results still do not look adequate.
Benchmark (spinner) Mode Cnt Score Error Units
SearchBillionForkJoin.forkJoinPoolWithoutCopy 50 sample 166 47.136 ± 1.989 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.00 50 sample 5.521 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.50 50 sample 47.055 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.90 50 sample 54.788 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.95 50 sample 56.653 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.99 50 sample 61.352 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.999 50 sample 63.635 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.9999 50 sample 63.635 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p1.00 50 sample 63.635 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy 100 sample 162 51.288 ± 4.031 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.00 100 sample 5.448 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.50 100 sample 49.840 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.90 100 sample 67.030 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.95 100 sample 90.505 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.99 100 sample 110.920 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.999 100 sample 121.242 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.9999 100 sample 121.242 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p1.00 100 sample 121.242 ms/op
In other words, scaling from 10 million to 1 billion elements, the worst case becomes 63.635 ms * 100 = 6363.5 ms ≈ 6 s. These results could be improved, for example, by using affinity locks (one full CPU core per thread), as sketched below, but that could become too complex.
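For reference, a minimal sketch of what thread pinning could look like; the JDK itself has no affinity API, so this assumes the third-party OpenHFT Java-Thread-Affinity library (net.openhft.affinity):

import net.openhft.affinity.AffinityLock;

public class PinnedSearchWorker implements Runnable {
    @Override
    public void run() {
        // Reserves a CPU core for this thread for the duration of the block,
        // avoiding migrations and cache thrashing between buckets.
        try (AffinityLock lock = AffinityLock.acquireLock()) {
            // ... scan this thread's bucket here ...
        }
    }
}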
Let's try to use segments (pre-bucketing the strings by hash code), just to show the idea:
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(1)
public class SearchInMapBillionForkJoin {

    static final int availableProcessors = 8; // Runtime.getRuntime().availableProcessors()
    static final int size = 10_000_000, bucketSize = size / availableProcessors;
    static final int handlersCount = availableProcessors;
    // The oversized-segment search path below splits work into this many tasks.
    static final int searchTasks = 4;

    Map<Integer, List<StringWithIndex>> strings;
    QueryHolder queryHolder;
    ForkJoinTask<Integer>[] tasks;
    Callable<Integer>[] callables;

    @Param({"50", "100"})
    int spinner;

    @Setup(Level.Trial)
    public void setup() throws Exception {
        queryHolder = new QueryHolder();
        strings = new ConcurrentHashMap<>();
        tasks = new ForkJoinTask[handlersCount];
        callables = new Callable[handlersCount];
        setData();
    }

    public void setData() throws Exception {
        // Generate the data in parallel, one generator per handler.
        final int callableBucket = size / handlersCount;
        for (int i = 0; i < handlersCount; ++i) {
            callables[i] = createGenerateForBucket(strings, callableBucket);
            tasks[i] = ForkJoinPool.commonPool().submit(callables[i]);
        }
        while (!isDone(tasks)) {
            int counter = spinner;
            while (counter > 0) --counter;
        }
        Map<Integer, Integer> distribution = new HashMap<>();
        for (List<StringWithIndex> stringWithIndices : strings.values()) {
            distribution.compute(stringWithIndices.size(), (key, value) -> value == null ? 1 : value + 1);
        }
        int maxListSize = 0;
        for (int i = 0; i < handlersCount; ++i) {
            Integer max = tasks[i].get();
            if (max > maxListSize) {
                maxListSize = max;
            }
        }
        System.out.println("maxListSize = " + maxListSize);
        System.out.println("list size distribution = " + distribution);
        System.out.println("map size = " + strings.size());
        distribution = null;
        queryHolder.query = String.valueOf(ThreadLocalRandom.current().nextInt());
    }

    @Benchmark
    public Integer findInSegment() {
        final String query = this.queryHolder.query;
        final Integer hashCode = query.hashCode();
        final Map<Integer, List<StringWithIndex>> strings = this.strings;
        if (strings.containsKey(hashCode)) {
            List<StringWithIndex> values = strings.get(hashCode);
            if (!values.isEmpty()) {
                final int valuesSize = values.size();
                if (valuesSize > 100_000) {
                    // Oversized segment: split it across searchTasks parallel scans.
                    final int bucketSize = valuesSize / searchTasks;
                    callables[0] = createSearchForBucket(query, values, 0, bucketSize);
                    callables[1] = createSearchForBucket(query, values, bucketSize, bucketSize * 2);
                    callables[2] = createSearchForBucket(query, values, bucketSize * 2, bucketSize * 3);
                    callables[3] = createSearchForBucket(query, values, bucketSize * 3, values.size());
                    try {
                        // Only submit and poll the first searchTasks slots; the
                        // remaining slots still hold the finished generator tasks.
                        for (int i = 0; i < searchTasks; ++i) {
                            tasks[i] = ForkJoinPool.commonPool().submit(callables[i]);
                        }
                        Integer position = -1;
                        boolean findMore = true;
                        head:
                        while (position == -1 && findMore) {
                            findMore = false;
                            for (int i = 0; i < searchTasks; ++i) {
                                if (tasks[i].isDone() && !tasks[i].isCancelled()) {
                                    final Integer value = tasks[i].get();
                                    if (value > -1) {
                                        position = value;
                                        for (int j = 0; j < searchTasks; ++j) {
                                            if (j != i && !tasks[j].isDone()) {
                                                tasks[j].cancel(true);
                                            }
                                        }
                                        break head;
                                    }
                                } else {
                                    findMore = true;
                                }
                            }
                            int counter = spinner;
                            while (counter > 0) --counter;
                        }
                        return position;
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                } else {
                    // Typical case: the segment is tiny, so scan it sequentially.
                    for (StringWithIndex stringWithIndex : values) {
                        if (query.equals(stringWithIndex.value)) {
                            return stringWithIndex.index;
                        }
                    }
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) throws Exception {
        new Runner(new OptionsBuilder()
                .include(SearchInMapBillionForkJoin.class.getSimpleName())
                .jvmArgs("-Xmx6G")
                .measurementIterations(10)
                .warmupIterations(10)
                .build()).run();
    }

    static class StringWithIndex implements Comparable<StringWithIndex> {
        final int index;
        final String value;

        public StringWithIndex(int index, String value) {
            this.index = index;
            this.value = value;
        }

        @Override
        public int compareTo(StringWithIndex o) {
            int a = this.value.compareTo(o.value);
            if (a == 0) {
                return Integer.compare(this.index, o.index);
            }
            return a;
        }

        @Override
        public int hashCode() {
            return this.value.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof StringWithIndex) {
                return this.value.equals(((StringWithIndex) obj).value);
            }
            return false;
        }
    }

    static class QueryHolder {
        String query = null;
    }

    static Callable<Integer> createSearchForBucket(String query, List<StringWithIndex> values, int start, int end) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                for (int j = start; j < end; ++j) {
                    StringWithIndex stringWithIndex = values.get(j);
                    if (query.equals(stringWithIndex.value)) {
                        return stringWithIndex.index;
                    }
                }
                return -1;
            }
        };
    }

    static Callable<Integer> createGenerateForBucket(Map<Integer, List<StringWithIndex>> strings,
                                                     int count) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int maxListSize = 0;
                for (int i = 0; i < count; ++i) {
                    String value = String.valueOf(ThreadLocalRandom.current().nextInt());
                    // Several generators add concurrently, so the per-segment lists
                    // must be synchronized (a plain ArrayList would be a data race).
                    List<StringWithIndex> values = strings.computeIfAbsent(value.hashCode(),
                            k -> Collections.synchronizedList(new ArrayList<>()));
                    // Note: indices are local to each generator bucket in this sketch.
                    values.add(new StringWithIndex(i, value));
                    if (values.size() > maxListSize) {
                        maxListSize = values.size();
                    }
                }
                return maxListSize;
            }
        };
    }

    static boolean isDone(ForkJoinTask[] tasks) {
        for (int i = 0; i < tasks.length; ++i) {
            if (!tasks[i].isDone()) {
                return false;
            }
        }
        return true;
    }
}
Results:
Benchmark (spinner) Mode Cnt Score Error Units
SearchInMapBillionForkJoin.findInSegment 50 sample 5164328 ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.00 50 sample ≈ 10⁻⁵ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.50 50 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.90 50 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.95 50 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.99 50 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.999 50 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.9999 50 sample 0.009 ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p1.00 50 sample 18.973 ms/op
SearchInMapBillionForkJoin.findInSegment 100 sample 4642775 ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.00 100 sample ≈ 10⁻⁵ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.50 100 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.90 100 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.95 100 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.99 100 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.999 100 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.9999 100 sample 0.005 ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p1.00 100 sample 0.038 ms/op
Before drawing any global conclusions, it is good to remember the criticisms this kind of synthetic example deserves (random data, an artificial query distribution, simplified measurement).
Still, the idea comes through: checking whether a key exists in a map (or a memory segment) is obviously better than going over all the data. This topic is very broad. Many people work on performance and will tell you that "performance optimization is an infinite process". :) I should also remind you that "premature optimization is bad", and add from myself that this does not mean you should design your system thoughtlessly, irrationally.
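The core idea fits in a few lines: a hash lookup inspects one small bucket instead of every element (toy data, not a benchmark):

import java.util.HashMap;
import java.util.Map;

public class LookupVsScan {
    public static void main(String[] args) {
        String[] all = {"alice", "bob", "carol"}; // imagine 10^9 entries
        Map<String, Integer> index = new HashMap<>();
        for (int i = 0; i < all.length; ++i) {
            index.put(all[i], i);
        }

        // O(n): touch every element until a match is found.
        int scanHit = -1;
        for (int i = 0; i < all.length; ++i) {
            if (all[i].equals("carol")) { scanHit = i; break; }
        }

        // O(1) expected: one hash computation plus a short bucket walk.
        Integer mapHit = index.get("carol");
        System.out.println(scanHit + " / " + mapHit);
    }
}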
Disclaimer: All this information could be wrong. It is intended for informational purposes only and may not be incorporated into any contract. Before using it in production scenarios, you should verify it on your own, and you should not justify production code by referring to me. I'm not responsible for any possible loss of money. None of this information refers to any company I have ever worked for. I'm not affiliated with MySQL/MongoDB/Cassandra/BigTable/BigData, nor with Apache Ignite/Hazelcast/Vertica/ClickHouse/Aerospike or any other database.