Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to have ZeroMQ to timeout messages that are outbound queued, but haven't been sent within a set time?

I have an application running on a server which takes requests from a phone app and then load balances the request across worker servers. I'm trying to add a timeout in the case that messages on the main server that have been in the outbound queue for the length of the timeout are removed from the queue. More specifically, the application on the main server is written in golang and implements the Paranoid Pirate Pattern of load balancing. The code I currently have is:

import (
    "fmt"
    zmq "github.com/pebbe/zmq4"
    "time"
)

const (
    HEARTBEAT_LIVENESS = 3
    HEARTBEAT_INTERVAL = 1500 * time.Millisecond

    MESSAGE_READY     = "\001"
    MESSAGE_HEARTBEAT = "\002"
)

var (
    client *zmq.Socket
    backend *zmq.Socket
    frontend *zmq.Socket
    workerPoller *zmq.Poller
    brokerPoller *zmq.Poller
    workerQueue []Worker
)

type Worker struct {
    Id string
    Expire time.Time
}

type RequestWrapper {
    RequestToSend Request

}

func NewWorker(id string) Worker {
    return Worker{
        Id: id,
        Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
    }
}

func AddReadyWorker(workers []Worker, worker Worker) []Worker {
    fmt.Println(worker.Id, " joined")
    for i, w := range workers {
        if worker.Id == w.Id {
            if i == 0 {
                workers = workers[1:]
            } else if i == len(workers)-1 {
                workers = workers[:i]
            } else {
                workers = append(workers[:i], workers[i+1:]...)
            }
            break
        }
    }
    return append(workers, worker)
}

func PurgeInactiveWorkers() {
    now := time.Now()
    for i, worker := range workerQueue {
        if now.Before(worker.Expire) {
            workerQueue = workerQueue[i:]
            return
        }
    }

    workerQueue = workerQueue[0:0]
}

func LoadBalance() {
// Loop:
    heartbeat := time.Tick(HEARTBEAT_INTERVAL)
    for {
        var sockets []zmq.Polled

        // If you have available workers, poll on the both front and backend
        // If not poll on backend with infinite timeout
        if len(workerQueue) > 0 {
            sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL)
        } else {
            sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL)
        }

        for _, socket := range sockets {
            switch socket.Socket {
                // backend is a router
                case backend:
                    workerId, _ := backend.Recv(0)
                    workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId))
                    clientId, _ := backend.Recv(0)
                    if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT {
                        route, _ := backend.Recv(0)
                        message, _ := backend.RecvBytes(0)

                        fmt.Println("Received response")
                        RouteResponse(route, message)

                        // frontend.Send(clientId, zmq.SNDMORE)
                        // frontend.Send("", zmq.SNDMORE)
                        // frontend.SendBytes(message, 0)
                    }
                // frontend is a dealer
                case frontend:
                    clientId, _ := frontend.Recv(0)
                    route, _ := frontend.Recv(0)
                    message, _ := frontend.RecvBytes(0)

                    backend.Send(workerQueue[0].Id, zmq.SNDMORE)
                    backend.Send(clientId, zmq.SNDMORE)
                    backend.Send(route, zmq.SNDMORE)
                    backend.SendBytes(message, 0)

                    workerQueue = workerQueue[1:]
            }
        }

        select {
            case <-heartbeat:
                for _, worker := range workerQueue {
                    backend.Send(worker.Id, zmq.SNDMORE)
                    backend.Send(MESSAGE_HEARTBEAT, 0)
                }
                break
            default:
        }

        PurgeInactiveWorkers()
    }
}

If the backend sends a message, but it is not actually sent to a worker in some period of time, I want it to expire and not ever be sent. Is there a socket option that can accomplish this? If not, what would I have to do to accomplish this?

Two ways I think I can do this without socket options are:

1) Have the backend wrap the message in a wrapper and send to a golang queue and not through zeromq. The wrapper contains the time that the message was "sent". The backend concurrently pulls from the front of the golang queue one at a time and checks if the message is expired. If so, don't send, if not, send the message. I could have the backend add the message to the golang queue first and then truly send it out after in the same block of code. That way, I don't need a lock.

2) Send the wrapper message through zeromq to a retriever and the retriever checks if its expired and returns early. I don't like this because it seems like its bad for performance.

like image 414
Riyadh Rahman Avatar asked Oct 30 '22 06:10

Riyadh Rahman


1 Answers

What you're trying to do is use communication as an execution rendezvous. The sender wants to know something about when the receiver gets messages.

ZMQ implements the Actor model. What you need is a modification of the Communicating Sequential Processes model (one where sends timeout). Basically you need to add control message flows to/from the workers, the idea being that the server asks the worker to receive a message and the server waits for the reply. The reply means that the worker is ready to receive a message right now, and that the server and worker have both rendezvoused at a send/receive in their program flows. If that reply fails to arrive within timeout seconds, then the server doesn't send the actual message.

Or you could cheat by having everything going to the workers regardless, wrapped in a message that carries a "sent at time X" field, and have the worker decide to discard old messages.

like image 114
bazza Avatar answered Nov 15 '22 10:11

bazza