Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Julia pmap performance

I am trying to port some of my R code to Julia. Basically, I have rewritten the following R code in Julia:

# Provides mclapply(), which the multi-core timing of this script relies on.
library(parallel)

# Two independent vectors of one million standard-normal draws.
eps_1<-rnorm(1000000)
eps_2<-rnorm(1000000)

# 1,000,000 x 2 matrix of 0/1 indicators: 1 where the draw is positive, else 0.
large_matrix<-ifelse(cbind(eps_1,eps_2)>0,1,0)
# All four combinations of (0/1, 0/1), one per row; read by function_compare().
matrix_to_compare = expand.grid(c(0,1),c(0,1))
# First row of each consecutive 4-row block: 1, 5, 9, ...
indices<-seq(1,1000000,4)
# Replace the matrix by a list of 4 x 2 blocks, one block per start index.
large_matrix<-lapply(indices,function(i)(large_matrix[i:(i+3),]))

# Return the positions of the rows of `x` that agree with the same-numbered row
# of the global `matrix_to_compare` in both columns.
function_compare<-function(x){
  matches_per_row <- rowSums(x==matrix_to_compare)
  both_match <- matches_per_row==2
  # `%in% TRUE` turns any NA into FALSE before which() picks the positions.
  which(both_match %in% TRUE)
}

> system.time(lapply(large_matrix,function_compare))
   user  system elapsed 
 38.812   0.024  38.828 
> system.time(mclapply(large_matrix,function_compare,mc.cores=11))
   user  system elapsed 
 63.128   1.648   6.108 

As one can see, I get a significant speed-up when going from one core to 11. Now I am trying to do the same in Julia:

# Define the cluster: start 11 additional worker processes.

addprocs(11);

# Distributions is loaded on the master only; Iterators is loaded on every
# process because function_split (which runs on the workers) calls product().
using Distributions;
@everywhere using Iterators;
d = Normal();

# Two independent vectors of one million draws from the standard normal.
eps_1 = rand(d,1000000);
eps_2 = rand(d,1000000);


# Create a large matrix: 1,000,000 x 2 booleans, true where the draw is >= 0.
# NOTE(review): the R version above tests `> 0`; for continuous draws the
# difference should not matter.
large_matrix = hcat(eps_1,eps_2).>=0;
# First row of each consecutive 4-row block: 1, 5, 9, ... (250,000 entries).
indices = collect(1:4:1000000)

# Split the large matrix into a vector of 4 x 2 blocks, one per start index.
large_matrix = [large_matrix[i:(i+3),:] for i in indices];
#Define the function to apply:
# Return the row numbers at which `x` (expected to be a 4 x 2 boolean block)
# agrees with the same-numbered row of the 0/1 combination table in both
# columns. Declared with @everywhere so that every process has the definition.
@everywhere function function_split(x)
    # Combination table: the product of [0,1] with itself, viewed as a 2 x 4 Int
    # array, transposed to 4 x 2 and converted to booleans.
    combos = collect(product([0,1],[0,1]))
    table = transpose(reinterpret(Int,combos,(2,4))) .> 0
    # Per row, the number of columns in which `x` and the table agree.
    agreeing = sum(x.==table,2)
    find(agreeing.==2)
end

# Time the serial map over all blocks, then the same work handed out with pmap.
@time map(function_split,large_matrix )
@time pmap(function_split,large_matrix )

   5.167820 seconds (22.00 M allocations: 2.899 GB, 12.83% gc time)
   18.569198 seconds (40.34 M allocations: 2.082 GB, 5.71% gc time)

As one can see, I am not getting any speed-up with pmap; it is in fact more than three times slower than the serial map. Can somebody suggest alternatives?

like image 692
Vitalijs Avatar asked Jul 05 '16 15:07

Vitalijs


1 Answer

I think that part of the problem here is that @parallel and pmap don't always handle moving data to and from the workers very well. Thus, they tend to work best in situations where what you are executing doesn't require very much data movement at all. I also suspect that there are probably things that could be done to improve their performance, but I'm not certain of the details.

For situations in which you do need more data moving around, it may be best to stick with options that directly call functions on workers, with those functions then accessing objects within the memory space of those workers. I give one example below, which speeds up your function using multiple workers. It uses perhaps the simplest option, which is @everywhere, but @spawn, remotecall() etc. are also worth considering, depending on your situation.

# Start 11 additional worker processes.
addprocs(11);

# Distributions is loaded on the master only; Iterators is loaded on every
# process because function_split (which runs on the workers) calls product().
using Distributions;
@everywhere using Iterators;
d = Normal();

# Two independent vectors of one million draws from the standard normal.
eps_1 = rand(d,1000000);
eps_2 = rand(d,1000000);

# Create a large matrix: 1,000,000 x 2 booleans, true where the draw is >= 0.
large_matrix = hcat(eps_1,eps_2).>=0;
# First row of each consecutive 4-row block: 1, 5, 9, ... (250,000 entries).
indices = collect(1:4:1000000);

# Split the large matrix into a vector of 4 x 2 blocks, one per start index.
large_matrix = [large_matrix[i:(i+3),:] for i in indices];

# Convert the vector to element type BitArray; function_split below is declared
# for BitArray arguments and distribute_data for an Array.
large_matrix = convert(Array{BitArray}, large_matrix);

# For every keyword argument `name = value`, ask process `p` to assign `value`
# to a global called `name` in its `Main` module. The assignments are spawned
# and not waited for, and nothing is returned.
function sendto(p::Int; args...)
    for binding in args
        target, payload = binding
        @spawnat(p, eval(Main, Expr(:(=), target, payload)))
    end
end

# Fetch the value bound to `nm` in module `mod` (default `Main`) on process `p`.
# Blocks until the remote lookup has finished and returns the value.
function getfrom(p::Int, nm::Symbol; mod=Main)
    remote_value = @spawnat(p, getfield(mod, nm))
    return fetch(remote_value)
end

# Return the row numbers at which the boolean block `x` agrees with the
# same-numbered row of the 0/1 combination table in both columns.
# Declared with @everywhere so that every process has the definition.
@everywhere function function_split(x::BitArray)
    # product([0,1],[0,1]) viewed as a 2 x 4 Int array, transposed to 4 x 2 and
    # converted to booleans.
    reference = transpose(reinterpret(Int,collect(product([0,1],[0,1])),(2,4))) .> 0
    # Per row, the number of columns in which `x` and the reference agree.
    row_hits = sum(x.==reference,2)
    return find(row_hits.==2)
end


# Split `X` into contiguous chunks, one per worker, and assign each chunk to the
# global variable named `WorkerName` in `Main` on the corresponding worker.
#
# Arguments:
#   X          - array to distribute. Chunk boundaries are computed from
#                `size(X, 1)` and applied with linear indexing; in this file
#                the function is called with a vector.
#   WorkerName - symbol naming the global that is created on each worker.
#
# Chunks follow the order of `workers()`, so concatenating the per-worker data
# in that order reproduces `X[1:size(X, 1)]`. Chunk sizes differ by at most one
# element, and when there are more workers than elements the surplus workers
# receive an empty chunk. The call returns only after every worker has stored
# its chunk, so a failure on a worker is reported here instead of being lost.
function distribute_data(X::Array, WorkerName::Symbol)
    total = size(X, 1)
    # Every worker gets `base` elements; the first `extra` workers get one more.
    base, extra = divrem(total, nworkers())
    StartIdx = 1
    @sync for (idx, pid) in enumerate(workers())
        EndIdx = StartIdx + base + (idx <= extra ? 1 : 0) - 1
        # Slice on the calling process so that the closure sent to the worker
        # carries only this chunk rather than all of `X`.
        chunk = X[StartIdx:EndIdx]
        @spawnat(pid, Core.eval(Main, Expr(:(=), WorkerName, chunk)))
        StartIdx = EndIdx + 1
    end
end

# Hand each worker its own slice of the blocks, stored there as the global
# `large_matrix` (the name parallel_split reads on the workers).
distribute_data(large_matrix, :large_matrix)


# Run `function_split` over each worker's own `large_matrix` and return the
# combined results in `workers()` order.
function parallel_split()
    # Evaluated on every process: each one other than process 1 stores its
    # mapped results in its own global `result`.
    @everywhere begin
        if myid() != 1
            result = map(function_split, large_matrix)
        end
    end
    # Pull each worker's `result` back, one worker at a time.
    gathered = Any[getfrom(pid, :result) for pid in workers()]
    return vcat(gathered...)
end

## results given after running once to compile
## `a` is the serial result on the master; `b` is the result gathered from the workers.
@time a = map(function_split,large_matrix); ## 6.499737 seconds (22.00 M allocations: 2.899 GB, 13.99% gc time)
@time b = parallel_split();  ## 1.097586 seconds (1.50 M allocations: 64.508 MB, 3.28% gc time)

julia> a == b
true

Note: even with this, the speed-up from the multiple processes is not perfect. But this is to be expected, since there is still a moderate amount of data to be returned as the result of your function, and moving that data takes time.

P.S. See this post (Julia: How to copy data to another processor in Julia) or this package (https://github.com/ChrisRackauckas/ParallelDataTransfer.jl) for more on the sendto and getfrom functions I used here.

like image 168
Michael Ohlrogge Avatar answered Oct 23 '22 20:10

Michael Ohlrogge