I'm writing extension/plugin for some app.
In its documentation it's said, that there are 2 threads for plugin:
- "main thread" where all business logic must live
- "callback thread" where application invokes callbacks with predefined names by events, these callbacks must not do complicated things and return as soon as possible
documentation is not very clear so it's possible that callbacks are invoked not from one thread but from many threads.
I've wrote dummy mutex, that works like that:
Mutex = class {
_lock = function(self, owner)
assert(owner, 'owner must be set')
local ts = Timestamp.get()
local key = tostring(owner) .. ' ' .. tostring(ts)
self[key] = ts
for k, v in pairs(self) do
if k ~= key and v <= ts then
self[key] = nil
return false
end
end
return key
end,
lock = function(self, owner, wait)
local wait = wait or 0.01
local k
repeat
k = self:_lock(owner)
if k then return k else sleep(wait) end
until k
end,
unlock = function(self, key)
self[key] = nil
end
}
and use it for making thread safe queue like this:
ThreadSafeQueue = class {
new = function(cls)
return getmetatable(cls).new(cls, {
mx_queue = Mutex:new(),
mx_push = Mutex:new(),
})
end,
pop = function(self)
local lock_queue = self.mx_queue:lock(self)
local val
if #self then
val = table.remove(self, 1)
else
val = nil
end
self.mx_queue:unlock(lock_queue)
return val
end,
push = function(self, val)
if val == nil then return end
-- don't `push()` from few threads at the same time
local lock_push = self.mx_push:lock(val)
-- don't `pop()` when `push()` and `push()` when `pop()`
local lock_queue = self.mx_queue:lock(self)
self[#self + 1] = val
self.mx_queue:unlock(lock_queue)
self.mx_push:unlock(lock_push)
end
}
class here is helper that returns object with prototype lookups and :new() method which sets metatable.
Main problem is that I'm not sure what pairs() does.
- If original table will be modified while it's being iterated, will this loop return at least old state?
- Is it possible that some k, v will not be iterated in such case?
Other problem is that application I'm writing for is really black box and I'm not even sure on which os it will run (Win, Mac, Linux).
Everything that I know 100% that I have threads and socket module.
Could you review provided code?
Will it work?
Is there any other possibilities for mutex.
May be socket will give something?
option for socket:
try to create socket, if success, then mutex locked, else - wait it for close
local Mutex = class {
identities = {},
new = function(cls, identity)
assert(not cls.identities[identity])
local inst = getmetatable(cls).new(cls, {
port = identity,
server = nil
})
cls.identities[identity] = inst
return inst
end,
lock = function(self, wait)
local wait = wait or 0.01
local server
local ts = Timestamp.get()
repeat
server = socket.bind("*", self.port)
if server then
self.server = server
return true
else
sleep(wait)
end
assert(Timestamp.get() - ts < 3, 'deadlock')
until server
end,
unlock = function(self)
self.server:close()
self.server = nil
end
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With