
Nim inter-thread message passing: How to avoid a global TChannel?

I have the following simple example of an inter-thread communication problem: I want to run arbitrary "anytime" algorithms in a background thread. An anytime algorithm performs some computation of result type T incrementally, i.e., it sporadically produces newer, more precise results. In Nim parlance, they are probably best represented by an iterator. In the main thread, I now want to wrap such iterators each in its own thread, with the possibility to query the threads for things like "is there a new value available" or "what is the current computation result".

Since I'm not familiar with Nim's concurrency concepts, I have trouble implementing the required inter-thread communication. My idea was to use a TChannel for the communication. According to this forum post, a TChannel cannot be used in combination with spawn; it requires createThread instead. I managed to get the following to compile and run:

import os, threadpool

proc spawnBackgroundJob[T](f: iterator (): T): TChannel[T] =

  type Args = tuple[iter: iterator (): T, channel: ptr TChannel[T]]

  # I think I have to wrap the iterator to pass it to createThread
  proc threadFunc(args: Args) {.thread.} =
    echo "Thread is starting"
    let iter = args.iter
    var channel = args.channel[]

    for i in iter():
      echo "Sending ", i
      channel.send(i)

  var thread: TThread[Args]
  var channel: TChannel[T]
  channel.open()

  let args = (f, channel.addr)
  createThread(thread, threadFunc, args)

  result = channel


# example use in some main thread:
iterator test(): int {.closure.} =
  sleep(500)
  yield 1
  sleep(500)
  yield 2

var channel = spawnBackgroundJob[int](test)

for i in 0 .. 10:
  sleep(200)
  echo channel.peek()

echo "Finished"

Unfortunately, this does not behave as expected: I never receive anything in the main thread. I was told on IRC that the problem is that I do not use global variables. But even after thinking about it for a long time, I see neither why exactly this fails nor whether there is a way to solve it. The problem is that I cannot simply make the variables thread and channel global, since they depend on the type T. I also want to avoid restricting this to running only a single anytime algorithm (or some other fixed number N). I was also told that the approach does not really make sense overall, so maybe I'm just missing that this problem has an entirely different solution?

asked Apr 29 '15 18:04 by bluenote10


1 Answer

Reason:

You're using two different channels for sending and receiving.

Object assignment in Nim copies the value, so each of the following two lines creates a separate channel object:

var channel = args.channel[]

and

result = channel

To explain it, see code snippet below:

type
  A = object
    x: int
    y: int

var a,b: A

var c = cast[ptr A](allocShared0(sizeof(A))) # shared memory allocation

a = c[]
b = c[]

echo a.x, a.y, b.x, b.y, c.x, c.y # output: 000000

a.x = 1
a.y = 2

echo a.x, a.y, b.x, b.y, c.x, c.y # output: 120000

b.x = 3
b.y = 4

echo a.x, a.y, b.x, b.y, c.x, c.y # output: 123400
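By contrast, copying the pointer rather than the object it points to keeps both names referring to the same memory. A minimal sketch of that, reusing the type A from above (the proc name aliasDemo is made up for illustration):

```nim
type
  A = object
    x: int
    y: int

# Hypothetical helper: writes through one pointer, reads through the other.
proc aliasDemo(): tuple[x, y: int] =
  let c = cast[ptr A](allocShared0(sizeof(A)))  # shared memory allocation
  let alias = c          # copies only the pointer; both point to the same A
  alias[].x = 1
  alias[].y = 2
  result = (c[].x, c[].y)  # the writes made via alias are visible via c
  deallocShared(c)

echo aliasDemo()
```

Here the values read back through c are 1 and 2, because no object copy was made. This is why the channel has to be handed around as a ptr and never dereferenced into a new variable.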

Solution for passing a channel into and out of a proc:

To pass a channel as a parameter or return value, please refer to Jehan's answer in the Nim forum.

Jehan's answer is pasted here for quick reference, adjusted so that it compiles with Nim 0.11.2:

type SharedChannel[T] = ptr TChannel[T]

proc newSharedChannel[T](): SharedChannel[T] =
  result = cast[SharedChannel[T]](allocShared0(sizeof(TChannel[T])))
  open(result[])

proc close[T](ch: var SharedChannel[T]) =
  close(ch[])
  deallocShared(ch)
  ch = nil

proc send[T](ch: SharedChannel[T], content: T) =
  ch[].send(content)


proc recv[T](ch: SharedChannel[T]): T =
  result = ch[].recv

proc someThread(ch: (SharedChannel[string], SharedChannel[bool])) {.thread.} =
  let (mainChannel, responseChannel) = ch
  while true:
    let s = mainChannel.recv
    if s == nil:
      break
    echo s
    responseChannel.send(true)
  responseChannel.send(false)

proc main() =
  var
    mainChannel = newSharedChannel[string]()
    responseChannel = newSharedChannel[bool]()
    th: TThread[(SharedChannel[string], SharedChannel[bool])]
  createThread(th, someThread, (mainChannel, responseChannel))
  for i in 0..2:
    echo("main thread send: " & $i)
    mainChannel.send($i)
    if not responseChannel.recv:
      break
  mainChannel.send(nil)
  joinThread(th)
  close(mainChannel)
  close(responseChannel)

main()

Output:

main thread send: 0
0
main thread send: 1
1
main thread send: 2
2

Going one step further, a solution to this question:

import os, threadpool, macros

template spawnBackgroundJob(t: typedesc, chan:ptr TChannel[t], iter: expr): stmt {.immediate.}=
  block:
    proc threadFunc(channel: ptr TChannel[t]) {.thread.} =
      echo "Thread is starting"

      for i in iter:
        echo "Sending ", i
        channel[].send(i)

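    # Open the channel passed to the template (chan), not whatever
    # variable named `channel` happens to exist at the call site.
    chan[].open()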

    var thread: TThread[ptr TChannel[t]]
    createThread(thread, threadFunc, chan)
    #joinThread(thread)


# example use in some main thread:
iterator testJob(): int =
  yield 0
  sleep(500)
  yield 1
  sleep(500)
  yield 2

var channel: ptr TChannel[int]
channel = cast[ptr TChannel[int]](allocShared0(sizeof(TChannel[int])))
spawnBackgroundJob(type(int), channel, testJob())

for i in 1 .. 10:
  sleep(200)
  echo channel[].peek()

channel[].close()
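For readers on a newer compiler, here is a rough sketch of the same idea. It assumes a recent Nim release in which TChannel and TThread are spelled Channel and Thread, and it assumes threads are enabled (--threads:on where that is not the default). The names IntChannel, worker and pollWorker are invented for this example, and it is fixed to int rather than generic to keep it short. The main thread uses tryRecv to ask "is there a new value available" without blocking:

```nim
import std/os

type IntChannel = ptr Channel[int]   # hypothetical alias for this sketch

# Background job: produces two results with a delay in between.
proc worker(ch: IntChannel) {.thread.} =
  for i in 1 .. 2:
    sleep(100)
    ch[].send(i)

# Starts the worker and polls the shared channel until both values arrived.
proc pollWorker(): seq[int] =
  let ch = createShared(Channel[int])  # zero-initialised shared memory
  ch[].open()
  var th: Thread[IntChannel]
  createThread(th, worker, ch)         # pass the pointer, never a copy
  while result.len < 2:
    let (available, value) = ch[].tryRecv()
    if available:
      result.add value
    else:
      sleep(20)                        # nothing new yet; do other work here
  joinThread(th)
  ch[].close()
  deallocShared(ch)

echo pollWorker()
```

Both threads only ever touch the channel through the same pointer, which is the point of the fix. The thread is joined before the channel is closed and freed, so the worker cannot send into released memory.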
answered Jan 04 '23 05:01 by Roger