I have the following simple example of an inter-thread communication problem: I want to run arbitrary "anytime" algorithms in a background thread. An anytime algorithm performs some computation of result type T
incrementally, i.e., it sporadically produces newer, more precise results. In Nim parlance, they are probably best represented by an iterator. In the main thread, I now want to wrap such iterators each in its own thread, with the possibility to query the threads for things like "is there a new value available" or "what is the current computation result".
Since I'm not familiar with Nim's concurrency concepts I have trouble to implement the required inter-thread communication. My idea was to use a TChannel
for the communication. According to this forum post, a TChannel
cannot be used in combination with spawn
but requires to use createThread
. I managed to get the following to compile and run:
import os, threadpool
proc spawnBackgroundJob[T](f: iterator (): T): TChannel[T] =
type Args = tuple[iter: iterator (): T, channel: ptr TChannel[T]]
# I think I have to wrap the iterator to pass it to createThread
proc threadFunc(args: Args) {.thread.} =
echo "Thread is starting"
let iter = args.iter
var channel = args.channel[]
for i in iter():
echo "Sending ", i
channel.send(i)
var thread: TThread[Args]
var channel: TChannel[T]
channel.open()
let args = (f, channel.addr)
createThread(thread, threadFunc, args)
result = channel
# example use in some main thread:
iterator test(): int {.closure.} =
sleep(500)
yield 1
sleep(500)
yield 2
var channel = spawnBackgroundJob[int](test)
for i in 0 .. 10:
sleep(200)
echo channel.peek()
echo "Finished"
Unfortunately, this does not have the expected behavior, i.e., I never receive anything in the main thread. I was told on IRC that the problem is that I do not use global variables. But even after a long time thinking I neither do see exactly why this fails, nor if there is a way to solve it. The problem is that I cannot simply make the variables thread
and channel
global, since they depend on the type T
. I also want to avoid restricting this to only run a single anytime algorithm (or some other fixed number N). I was also told that the approach does not really make sense overall, so maybe I'm just missing that this problem has an entirely different solution?
You're using two different channels in send and recv.
object assign in Nim is deep copy, they're different object.
var channel = args.channel[]
and
result = channel
To explain it, see code snippet below:
type
A = object
x: int
y: int
var a,b: A
var c = cast[ptr A](allocShared0(sizeof(A))) # shared memory allocation
a = c[]
b = c[]
echo a.x, a.y, b.x, b.y, c.x, c.y # output: 000000
a.x = 1
a.y = 2
echo a.x, a.y, b.x, b.y, c.x, c.y # output: 120000
b.x = 3
b.y = 4
echo a.x, a.y, b.x, b.y, c.x, c.y # output: 123400
To pass Channel as parameter and return value, please refer to Jehan's Answer in Nim forum.
paste Jehan's Answer here for quick reference, and make it compile pass in Nim 0.11.2
type SharedChannel[T] = ptr TChannel[T]
proc newSharedChannel[T](): SharedChannel[T] =
result = cast[SharedChannel[T]](allocShared0(sizeof(TChannel[T])))
open(result[])
proc close[T](ch: var SharedChannel[T]) =
close(ch[])
deallocShared(ch)
ch = nil
proc send[T](ch: SharedChannel[T], content: T) =
ch[].send(content)
proc recv[T](ch: SharedChannel[T]): T =
result = ch[].recv
proc someThread(ch: (SharedChannel[string], SharedChannel[bool])) {.thread.} =
let (mainChannel, responseChannel) = ch
while true:
let s = mainChannel.recv
if s == nil:
break
echo s
responseChannel.send(true)
responseChannel.send(false)
proc main() =
var
mainChannel = newSharedChannel[string]()
responseChannel = newSharedChannel[bool]()
th: TThread[(SharedChannel[string], SharedChannel[bool])]
createThread(th, someThread, (mainChannel, responseChannel))
for i in 0..2:
echo("main thread send: " & $i)
mainChannel.send($i)
if not responseChannel.recv:
break
mainChannel.send(nil)
joinThread(th)
close(mainChannel)
close(responseChannel)
main()
Output:
main thread send: 0
0
main thread send: 1
1
main thread send: 2
2
import os, threadpool, macros
template spawnBackgroundJob(t: typedesc, chan:ptr TChannel[t], iter: expr): stmt {.immediate.}=
block:
proc threadFunc(channel: ptr TChannel[t]) {.thread.} =
echo "Thread is starting"
for i in iter:
echo "Sending ", i
channel[].send(i)
channel[].open()
var thread: TThread[ptr TChannel[t]]
createThread(thread, threadFunc, chan)
#joinThread(thread)
# example use in some main thread:
iterator testJob(): int =
yield 0
sleep(500)
yield 1
sleep(500)
yield 2
var channel: ptr TChannel[int]
channel = cast[ptr TChannel[int]](allocShared0(sizeof(TChannel[int])))
spawnBackgroundJob(type(int), channel, testJob())
for i in 1 .. 10:
sleep(200)
echo channel[].peek()
channel[].close()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With