Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use multiprocessing to drop duplicates in a very big list?

Let's say I have a huge list containing random numbers for example

L = [random.randrange(0,25000000000) for _ in range(1000000000)]

I need to get rid of the duplicates in this list

I wrote this code for lists containing a smaller number of elements

def remove_duplicates(list_to_deduplicate):
seen = set()
result=[]
for i in list_to_deduplicate:
    if i not in seen:
        result.append(i)
        seen.add(i)
return result

In the code above I create a set so I can memorize what numbers have already appeared in the list I'm working on if the number is not in the set then I add it to the result list I need to return and save it in the set so it won't be added again in the result list

Now for 1000000 number in a list all is good I can get a result fast but for numbers superior to let's say 1000000000 problems arise I need to use the different cores on my machine to try and break up the problem and then combine the results from multiple processes

My first guess was to make a set accessible to all processes but many complications will arise How can a process read while maybe another one is adding to the set and I don't even know if it is possible to share a set between processes I know we can use a Queue or a pipe but I'm not sure on how to use it

Can someone give me an advice on what is the best way to solve this problem I am open to any new idea

like image 595
Charbel.AY Avatar asked Jan 16 '20 02:01

Charbel.AY


1 Answers

I'm skeptic even your greatest list is big enough so that multiprocessing would improve timings. Using numpy and multithreading is probably your best chance.

Multiprocessing introduces quite some overhead and increases memory consumption like @Frank Merrow rightly mentioned earlier. That's not the case (to that extend) for multithreading, though. It's important to not mix these terms up because processes and threads are not the same. Threads within the same process share their memory, distinct processes do not.

The problem with going multi-core in Python is the GIL, which doesn't allow multiple threads (in the same process) to execute Python bytecode in parallel. Some C-extensions like numpy can release the GIL, this enables profiting from multi-core parallelism with multithreading. Here's your chance to get some speed up on top of a big improvement just by using numpy.

from multiprocessing.dummy import Pool  # .dummy uses threads
import numpy as np

r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)
n_threads = 8

result = np.unique(np.concatenate(
    Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()

Use numpy and a thread-pool, split up the array, make the sub-arrays unique in separate threads, then concatenate the sub-arrays and make the recombined array once more unique again. The final dropping of duplicates for the recombined array is necessary because within the sub-arrays only local duplicates can be identified.

For low entropy data (many duplicates) using pandas.unique instead of numpy.unique can be much faster. Unlike numpy.unique it also preserves order of appearance.

Note that using a thread-pool like above makes only sense if the numpy-function is not already multi-threaded under the hood by calling into low-level math libraries. So, always test to see if it actually improves performance and don't take it for granted.


Tested with 100M random generated integers in the range:

  • High entropy: 0 - 25_000_000_000 (199560 duplicates)
  • Low entropy: 0 - 1000

Code

import time
import timeit
from multiprocessing.dummy import Pool  # .dummy uses threads

import numpy as np
import pandas as pd


def time_stmt(stmt, title=None):
    t = timeit.repeat(
        stmt=stmt,
        timer=time.perf_counter_ns, repeat=3, number=1, globals=globals()
    )
    print(f"\t{title or stmt}")
    print(f"\t\t{min(t) / 1e9:.2f} s")


if __name__ == '__main__':

    n_threads = 8  # machine with 8 cores (4 physical cores)

    stmt_np_unique_pool = \
"""
np.unique(np.concatenate(
    Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()
"""

    stmt_pd_unique_pool = \
"""
pd.unique(np.concatenate(
    Pool(n_threads).map(pd.unique, np.array_split(r, n_threads)))
).tolist()
"""
    # -------------------------------------------------------------------------

    print(f"\nhigh entropy (few duplicates) {'-' * 30}\n")
    r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)

    r = list(r)
    time_stmt("list(set(r))")

    r = np.asarray(r)
    # numpy.unique
    time_stmt("np.unique(r).tolist()")
    # pandas.unique
    time_stmt("pd.unique(r).tolist()")    
    # numpy.unique & Pool
    time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
    # pandas.unique & Pool
    time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")

    # ---
    print(f"\nlow entropy (many duplicates) {'-' * 30}\n")
    r = np.random.RandomState(42).randint(0, 1000, 100_000_000)

    r = list(r)
    time_stmt("list(set(r))")

    r = np.asarray(r)
    # numpy.unique
    time_stmt("np.unique(r).tolist()")
    # pandas.unique
    time_stmt("pd.unique(r).tolist()")
    # numpy.unique & Pool
    time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
    # pandas.unique() & Pool
    time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")

Like you can see in the timings below, just using numpy without multithreading already accounts for the biggest performance improvement. Also note pandas.unique() being faster than numpy.unique() (only) for many duplicates.

high entropy (few duplicates) ------------------------------

    list(set(r))
        32.76 s
    np.unique(r).tolist()
        12.32 s
    pd.unique(r).tolist()
        23.01 s
    numpy.unique() & Pool
        9.75 s
    pandas.unique() & Pool
        28.91 s

low entropy (many duplicates) ------------------------------

    list(set(r))
        5.66 s
    np.unique(r).tolist()
        4.59 s
    pd.unique(r).tolist()
        0.75 s
    numpy.unique() & Pool
        1.17 s
    pandas.unique() & Pool
        0.19 s
like image 74
Darkonaut Avatar answered Sep 28 '22 05:09

Darkonaut