I'm trying to understand how the actor model works by modeling a bank. First, here's some code illustrating why we need models for concurrent systems:
import time
from threading import Thread

bank = {'joe': 100}

class Withdrawal(Thread):
    """
    Models a concurrent withdrawal for 'joe'. In this example, 'bank'
    is a shared resource not protected and accessible from any thread.

    Args:
        amount (double) how much to withdraw
        sleep (bool) config to sleep the thread during the withdrawal
    """
    def __init__(self, amount, sleep = False):
        self.amount = amount
        self.sleep = sleep
        Thread.__init__(self)

    def run(self):
        """
        Overrides method in Thread.
        Returns: void
        """
        balance = bank['joe']
        if balance >= self.amount:
            if self.sleep:
                time.sleep(5)
            bank['joe'] -= self.amount

t1 = Withdrawal(80, True)
t2 = Withdrawal(80)
t1.start()
t2.start()
After running the code, the balance for 'joe' should be -60 after five seconds. This is because bank is unprotected from concurrent access: both threads read a balance of 100 and pass the check, and the five-second pause in the first thread leaves a window in which the second thread changes the balance. The second thread withdraws first, leaving 20. The first thread then wakes up and withdraws without rechecking that a withdrawal is still possible. As a result the account goes negative.
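The interleaving described above can be reproduced without waiting five seconds. This is only a sketch, not the code from the question: it swaps `time.sleep(5)` for two `threading.Event` objects so the ordering is forced rather than left to timing, and the names `run_race`, `slow`, `fast`, `checked` and `withdrawn` are made up for the illustration.

```python
from threading import Thread, Event

def run_race():
    """Force the check-then-act interleaving and return the final balance."""
    bank = {'joe': 100}
    checked = Event()    # set once the slow thread has passed its balance check
    withdrawn = Event()  # set once the fast thread has finished withdrawing

    def slow(amount):
        if bank['joe'] >= amount:   # check passes against a balance of 100
            checked.set()
            withdrawn.wait()        # stands in for time.sleep(5)
            bank['joe'] -= amount   # acts on the now stale check

    def fast(amount):
        checked.wait()              # start only after the slow thread has checked
        if bank['joe'] >= amount:   # also sees 100, so the check passes
            bank['joe'] -= amount
        withdrawn.set()

    threads = [Thread(target=slow, args=(80,)), Thread(target=fast, args=(80,))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return bank['joe']

print(run_race())  # -60
```

Both checks run against the original balance, so both withdrawals go through: 100 - 80 - 80 = -60.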
If we model the bank and withdrawals as actors, we can protect access to the account since its state is managed on a different thread that's separate from those trying to withdraw from it.
from queue import Queue
from threading import Thread
import time
import random
class Actor(Thread):
    """
    Models an actor in the actor model for concurrent computation
    see https://en.wikipedia.org/wiki/Actor_model for theoretical overview

    Args:
        handles (dict) mapping of public methods that are callable
            on message data after message has been read
    """
    def __init__(self, handles):
        self.handles = handles
        self.mailbox = Queue()
        Thread.__init__(self, daemon=True)

    def run(self):
        """
        Overrides method in Thread. Once the thread has started,
        we listen for messages and process one by one when they are received
        Returns: void
        """
        self.read_messages()

    def send(self, actor, message):
        """
        Puts a Message in the recipient actor's mailbox

        Args:
            actor (Actor) to receive message
            message (Message) object to send actor
        Returns: void
        """
        actor.mailbox.put(message)

    def read_messages(self):
        """
        Reads messages one at a time and calls the target class handler
        Returns: void
        """
        while True:
            message = self.mailbox.get()
            action = message.target
            if action in self.handles:
                self.handles[action](message.data)
class Message:
    """
    Models a message in the actor model

    Args:
        sender (Actor) instance that owns the message
        data (dict) message data that can be consumed
        target (string) name of the handler in the recipient Actor that we'd like to run when the message is read
    """
    def __init__(self, sender, data, target):
        self.sender = sender
        self.data = data
        self.target = target
class Bank(Actor):
    """
    Models a bank. Can be used in concurrent computations.

    Args:
        bank (dict) name to amount mapping that models state of Bank
    """
    def __init__(self, bank):
        self.bank = bank
        Actor.__init__(self, {'withdraw': self.withdraw})

    def withdraw(self, data):
        """
        Action handler for 'withdraw' messages. Withdraw
        if we can cover the requested amount

        Args:
            data (dict) message data
        Returns: void
        """
        name, amount = data['name'], data['amount']
        if self.bank[name] >= amount:
            if data['sleep']:
                time.sleep(2)
            self.bank[name] -= amount
class Withdrawal(Actor):
    """
    Models a withdrawal. Can be used in concurrent computations.

    Args:
        bank (Bank) shared resource to transact with
        sleep (bool) config to request that the bank sleep during a withdrawal
    """
    def __init__(self, bank, sleep=False):
        self.bank = bank
        self.sleep = sleep
        Actor.__init__(self, {})

    def withdraw(self, name, amount):
        """
        Wrapper for sending a withdrawal message

        Args:
            name (string) owner of the account in our bank
            amount (double) amount we'd like to withdraw
        Returns: void
        """
        data = {'sleep': self.sleep, 'name': name, 'amount': amount}
        self.send(self.bank, Message(self, data, 'withdraw'))
Let's now test:
bank = Bank({'joe': 100})
bank.start()
actors = []
for _ in range(100):
    a = Withdrawal(bank, random.randint(0, 1))
    a.start()
    actors.append(a)
for a in actors:
    a.withdraw('joe', 15)
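One caveat with the test above: the actors are daemon threads and nothing waits for the bank, so when this is run as a script the program can exit while withdraw messages are still sitting in the mailbox. A minimal sketch of one way to wait for the mailbox to drain, assuming the read loop is changed to call `task_done()` after each message (the original `read_messages` does not do this) so that `Queue.join()` can be used. The function name `drain_demo` and the tuple message format are made up for the illustration.

```python
from queue import Queue
from threading import Thread

def drain_demo():
    """Send 100 withdrawals of 15 and wait until all have been handled."""
    mailbox = Queue()
    balances = {'joe': 100}

    def read_messages():
        while True:
            name, amount = mailbox.get()
            try:
                if balances[name] >= amount:
                    balances[name] -= amount
            finally:
                mailbox.task_done()  # mark this message as fully handled

    Thread(target=read_messages, daemon=True).start()
    for _ in range(100):
        mailbox.put(('joe', 15))
    mailbox.join()                   # blocks until every queued message is handled
    return balances['joe']

print(drain_demo())  # 10
```

Only six withdrawals of 15 fit into 100, so the remaining 94 requests are refused and the balance ends at 10 rather than going negative.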
Is this understanding correct? Even though the bank sleeps during withdrawals, no simultaneous withdrawal can corrupt the data because it's managed on a different thread than the withdrawals.
Simultaneous withdrawal can no longer occur, true, but it's because the withdraw messages are handled serially, not concurrently, by the single Bank thread inside the Bank.read_messages loop. This means that the sleep commands are also executed serially; the entire message queue is going to stall and yield control for 2 seconds whenever the bank has to sleep during a withdrawal. (Given the Bank's modeled action, this is essentially unavoidable.)
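The stall can be measured directly. The following is a rough sketch rather than the code from the question: a single consumer thread takes messages from a `Queue` and sleeps for each one, and the function name `timed_serial_handling` and the shortened delay are assumptions made for the illustration.

```python
import time
from queue import Queue
from threading import Thread

def timed_serial_handling(n_messages, delay):
    """Return how long one thread takes to handle n_messages that each sleep."""
    mailbox = Queue()

    def serve():
        for _ in range(n_messages):
            mailbox.get()
            time.sleep(delay)  # stands in for the bank's sleep during a withdrawal

    for _ in range(n_messages):
        mailbox.put('withdraw')

    server = Thread(target=serve)
    start = time.monotonic()
    server.start()
    server.join()
    return time.monotonic() - start

elapsed = timed_serial_handling(3, 0.2)
print(f"{elapsed:.2f}s")  # roughly 0.6s or more: the sleeps add up, they do not overlap
```

With the 2-second sleep from the question, every sleeping withdrawal would delay all messages queued behind it by the same 2 seconds.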