
Lua: simple mutex for a threaded app

I'm writing an extension/plugin for an application.

Its documentation says that a plugin runs in two threads:
- the "main thread", where all business logic must live
- the "callback thread", where the application invokes callbacks with predefined names in response to events; these callbacks must not do anything complicated and must return as soon as possible

The documentation is not very clear, so it's possible that callbacks are invoked from several threads rather than just one.

I've written a dummy mutex that works like this:

Mutex = class {
    _lock = function(self, owner)
        assert(owner, 'owner must be set')
        local ts = Timestamp.get()
        -- register a claim keyed by owner and timestamp
        local key = tostring(owner) .. ' ' .. tostring(ts)
        self[key] = ts
        -- back off if any other claim is as old as or older than ours
        for k, v in pairs(self) do
            if k ~= key and v <= ts then
                self[key] = nil
                return false
            end
        end
        return key
    end,
    lock = function(self, owner, wait)
        local wait = wait or 0.01
        local k
        repeat
            k = self:_lock(owner)
            if k then return k else sleep(wait) end
        until k
    end,
    unlock = function(self, key)
        self[key] = nil
    end
}

and I use it to build a thread-safe queue like this:

ThreadSafeQueue = class {
    new = function(cls)
        return getmetatable(cls).new(cls, {
            mx_queue = Mutex:new(),
            mx_push = Mutex:new(),
        })
    end,
    pop = function(self)
        local lock_queue = self.mx_queue:lock(self)
        local val
        -- `#self` alone is always truthy in Lua (even 0), so compare explicitly
        if #self > 0 then
            val = table.remove(self, 1)
        else
            val = nil
        end
        self.mx_queue:unlock(lock_queue)
        return val
    end,
    push = function(self, val)
        if val == nil then return end
        -- don't `push()` from several threads at the same time
        local lock_push = self.mx_push:lock(val)
        -- don't `pop()` during a `push()`, or `push()` during a `pop()`
        local lock_queue = self.mx_queue:lock(self)
        self[#self + 1] = val
        self.mx_queue:unlock(lock_queue)
        self.mx_push:unlock(lock_push)
    end
}
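
A note on the length check in `pop()`: in Lua only `nil` and `false` are falsy, so the number 0 counts as true and an emptiness test needs an explicit comparison. A minimal illustration (the `truthy` helper is a hypothetical name used only for this example):

```lua
-- Hypothetical helper: reports how Lua treats a value inside a condition.
local function truthy(v)
    if v then return true else return false end
end

local empty = {}
print(truthy(#empty))      --> true   (#empty is 0, and 0 is truthy)
print(truthy(#empty > 0))  --> false  (explicit comparison detects emptiness)

-- Removing from an empty table is harmless and yields nil when assigned.
local val = table.remove(empty, 1)
print(val == nil)          --> true
```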

Here, class is a helper that returns an object with prototype lookups and a :new() method that sets the metatable.
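
The real helper is not shown, so the following is only a sketch of what such a `class` function might look like, under the assumption that it matches the two call shapes used above (`Mutex:new()` and `getmetatable(cls).new(cls, {...})`). The names `Base` and `Counter` are hypothetical.

```lua
-- Assumed shape of the `class` helper; the real one may differ.
local Base = {}
Base.__index = Base

-- Default constructor: turns `init` (or a fresh table) into an instance
-- whose missing fields are looked up in `cls`.
function Base.new(cls, init)
    return setmetatable(init or {}, { __index = cls })
end

local function class(proto)
    -- Missing fields on the class itself (such as `new`) fall back to Base.
    return setmetatable(proto, Base)
end

-- Usage: a class that overrides `new` and delegates to the default constructor.
local Counter = class {
    new = function(cls)
        return getmetatable(cls).new(cls, { n = 0 })
    end,
    inc = function(self)
        self.n = self.n + 1
        return self.n
    end,
}

local c = Counter:new()
c:inc()
print(c:inc())  --> 2
```

With this shape, iterating an instance with pairs() only visits the instance's own fields, not the methods inherited through the metatable.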

The main problem is that I'm not sure how pairs() behaves.
- If the original table is modified while it is being iterated, will the loop at least return the old state?
- Is it possible that some k, v pairs will be skipped in that case?
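
For reference, the Lua manual's description of `next` (which `pairs` uses by default) says that behavior is undefined if a value is assigned to a non-existent field during traversal, while modifying or clearing existing fields is allowed. `pairs` does not take a snapshot. Within a single thread, one way to get a stable view is to copy the keys first. The sketch below assumes exactly that; `snapshot_keys` is a hypothetical helper name, and copying keys does not by itself make access from another OS thread safe.

```lua
-- Hypothetical helper: copy the current keys of `t` into a list.
local function snapshot_keys(t)
    local keys = {}
    for k in pairs(t) do
        keys[#keys + 1] = k
    end
    return keys
end

local t = { a = 1, b = 2, c = 3 }
local visited = 0
for _, k in ipairs(snapshot_keys(t)) do
    -- We iterate the copied key list, so adding new fields to `t`
    -- here does not disturb the traversal.
    t[k .. "_copy"] = t[k]
    visited = visited + 1
end
print(visited)  --> 3
```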

The other problem is that the application I'm writing for is really a black box, and I'm not even sure which OS it will run on (Windows, Mac, Linux).
All I know for certain is that I have threads and the socket module.


Could you review the code above?
Will it work?

Are there any other ways to implement a mutex?
Maybe the socket module offers something?

akaRem asked Jun 09 '26 13:06


1 Answer

An option using sockets:
try to bind a socket to a fixed port; if that succeeds, the mutex is locked; otherwise, wait until the current holder closes it.

local Mutex = class {
    identities = {},
    new = function(cls, identity)
        assert(not cls.identities[identity])
        local inst = getmetatable(cls).new(cls, {
            port = identity,
            server = nil
        })
        cls.identities[identity] = inst
        return inst
    end,
    lock = function(self, wait)
        local wait = wait or 0.01
        local server
        local ts = Timestamp.get()
        repeat
            -- binding fails while another holder still has the port open
            server = socket.bind("*", self.port)
            if server then
                self.server = server
                return true
            else
                sleep(wait)
            end
            -- give up instead of spinning forever
            assert(Timestamp.get() - ts < 3, 'deadlock')
        until server
    end,
    unlock = function(self)
        self.server:close()
        self.server = nil
    end
}
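
With this kind of mutex, a lock that is never released blocks every other caller until the 'deadlock' assert fires. A minimal sketch of a wrapper that always unlocks, even if the protected function raises an error, might look like this. `with_lock`, `finish` and `fake` are hypothetical names; the sketch only assumes a mutex object with the `lock()`/`unlock()` methods shown above, and uses a stand-in object so it runs without sockets.

```lua
-- Releases the mutex, then re-raises the error or passes results through.
local function finish(mutex, ok, ...)
    mutex:unlock()
    if not ok then error((...), 0) end
    return ...
end

-- Runs fn(...) while holding the mutex; pcall ensures unlock always happens.
local function with_lock(mutex, fn, ...)
    mutex:lock()
    return finish(mutex, pcall(fn, ...))
end

-- Stand-in mutex for demonstration only (not thread safe).
local fake = { held = false }
function fake:lock()
    assert(not self.held, "already locked")
    self.held = true
end
function fake:unlock()
    self.held = false
end

local sum = with_lock(fake, function(a, b) return a + b end, 2, 3)
print(sum)  --> 5

local ok, err = pcall(with_lock, fake, function() error("boom", 0) end)
print(ok, err, fake.held)  --> false  boom  false
```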
akaRem answered Jun 11 '26 06:06