Functionality of take! and put! and Channels in Julia

I'm trying to run something that looks like this:

y = @parallel (min) for i in collection
    f(i)
end

where f(i) is essentially a while loop that counts how many iterations it takes to fulfill its conditions. Initially, one of the termination conditions is a predetermined number of iterations, n. However, if any f(i) returns a value less than n, I would ideally want to replace n with that value (i.e., since I am looking for the minimum f(i), if f(j) returns m, I want all other loops to stop once they reach m iterations).

I'm new to parallel computing, so I'm likely misinterpreting the documentation, but I think I should be able to do something like this:

x = Channel{Int64}(1)
put!(x,n)

y = @parallel (min) for i in collection
    f(i,x)
end

close(x)

where I've modified f to take a Channel parameter and now it looks something like this:

@everywhere function f(item,chan)
    going = true
    count = 0
    while (going)
        going = false
        # perform some operations
        if (count < fetch(chan) && !conditions_met())
            # conditions_met checks the other termination conditions
            going = true
            count += 1
        end
    end

    count += 1

    if (count < fetch(chan))
        take!(chan)
        put!(chan,count)
    end

    return count
end

If I replace the first count < fetch(chan) with count < n and remove the other if block and the Channel code, the script runs fine. However, since n will be several orders of magnitude larger than the minimum f(i), sharing the lowered bound as described would speed up computation significantly. Is this something I should be able to do, and if so, am I approaching it correctly?

Right now I am experiencing the following error (running with 4 procs):

ERROR (unhandled task failure): On worker 3:
cannot resize array with shared data
 in shift! at array.jl:501
 in take! at channels.jl:54
 in f at /home/michael/Documents/julia/script.jl:98
 [inlined code] from /home/michael/Documents/julia/script.jl:126
 in anonymous at no file:0
 in anonymous at multi.jl:913
 in run_work_thunk at multi.jl:651
 [inlined code] from multi.jl:913
 in anonymous at task.jl:63
 in remotecall_fetch at multi.jl:737
 in remotecall_fetch at multi.jl:740
 in anonymous at multi.jl:1519
ERROR: LoadError: On worker 2:
cannot resize array with shared data
 in shift! at array.jl:501
 in take! at channels.jl:54
 in f at /home/michael/Documents/julia/script.jl:98
 [inlined code] from /home/michael/Documents/julia/script.jl:126
 in anonymous at no file:0
 in anonymous at multi.jl:913
 in run_work_thunk at multi.jl:651
 [inlined code] from multi.jl:913
 in anonymous at task.jl:63
 in preduce at multi.jl:1523
 [inlined code] from multi.jl:1532
 in anonymous at expr.jl:113
 [inlined code] from /home/michael/Documents/julia/script.jl:125
 in anonymous at no file:0
while loading /home/michael/Documents/julia/script.jl, in expression starting on line 121
ERROR (unhandled task failure): On worker 4:
cannot resize array with shared data
 in shift! at array.jl:501
 in take! at channels.jl:54
 in f at /home/michael/Documents/julia/script.jl:98
 [inlined code] from /home/michael/Documents/julia/script.jl:126
 in anonymous at no file:0
 in anonymous at multi.jl:913
 in run_work_thunk at multi.jl:651
 [inlined code] from multi.jl:913
 in anonymous at task.jl:63
 in remotecall_fetch at multi.jl:737
 in remotecall_fetch at multi.jl:740
 in anonymous at multi.jl:1519
ERROR (unhandled task failure): On worker 5:
cannot resize array with shared data
 in shift! at array.jl:501
 in take! at channels.jl:54
 in f at /home/michael/Documents/julia/script.jl:98
 [inlined code] from /home/michael/Documents/julia/script.jl:126
 in anonymous at no file:0
 in anonymous at multi.jl:913
 in run_work_thunk at multi.jl:651
 [inlined code] from multi.jl:913
 in anonymous at task.jl:63
 in remotecall_fetch at multi.jl:737
 in remotecall_fetch at multi.jl:740
 in anonymous at multi.jl:1519

where line 98 is the take!(chan) statement in the function definition and line 126 is f(i,x) inside the parallel for loop.

michael asked Oct 19 '22 14:10


1 Answer

Channels implement CSP-like semantics for asynchronous communication, but they have no built-in mechanism for being shared across parallel processes. For that purpose you need to use a RemoteRef: http://docs.julialang.org/en/release-0.4/manual/parallel-computing/#remoterefs-and-abstractchannels
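
For readers on newer Julia versions, here is a minimal sketch of the same idea. It assumes Julia 1.x, where RemoteRef has been replaced by RemoteChannel (since Julia 0.5) and @parallel has been renamed @distributed. It is not the asker's actual script: the loop condition inside f is a stand-in that simply stops after `item` iterations, and parallel_min, bound, and the sample inputs are hypothetical names chosen for illustration. The channel lives on the master process, and workers read and update it through the RemoteChannel handle.

```julia
using Distributed

# Assumption for the example: start two workers if none exist yet.
nprocs() == 1 && addprocs(2)

@everywhere function f(item, bound)
    count = 0
    # Stand-in for the real termination test: stop after `item` iterations,
    # or as soon as the shared bound is reached. fetch does not remove the value.
    while count < fetch(bound) && count < item
        count += 1
    end
    # Lower the shared bound. take! removes the value, so only one process
    # holds it at a time; the other processes block until it is put back.
    current = take!(bound)
    put!(bound, min(current, count))
    return count
end

function parallel_min(collection, n)
    # The underlying Channel is created on process 1; the RemoteChannel
    # handle can be sent to any worker.
    bound = RemoteChannel(() -> Channel{Int}(1))
    put!(bound, n)
    y = @distributed (min) for i in collection
        f(i, bound)
    end
    return y, take!(bound)
end

y, final_bound = parallel_min([50, 20, 80, 35], 1_000)
println("minimum count: ", y, ", final bound: ", final_bound)
```

Every fetch, take!, and put! on the RemoteChannel is a round trip to the process that owns the channel, so checking the bound on every iteration may cost more than it saves; checking it every few hundred iterations is one possible compromise.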

Ruslan Prokopchuk answered Oct 23 '22 09:10