Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Understanding the actor model by modeling a bank

I'm trying to understand how the actor model works by modeling a bank. First, here's some code illustrating why we need models for concurrent systems:

import time

from threading import Thread

bank = {'joe': 100}

class Withdrawal(Thread):
    """
    Models a concurrent withdrawal for 'joe'. In this example, 'bank'
    is a shared resource not protected and accessible from any thread.

    Args:
        amount (double) how much to withdraw
        sleep  (bool)   config to sleep the thread during the withdrawal
    """

    def __init__(self, amount, sleep = False):
        self.amount = amount
        self.sleep  = sleep

        Thread.__init__(self)

    def run(self):
        """
        Overrides method in Thread.

        Returns: void
        """
        balance = bank['joe']
        if balance >= self.amount:
            if self.sleep:
                time.sleep(5)
            bank['joe'] -= self.amount

t1 = Withdrawal(80, True)
t2 = Withdrawal(80)

t1.start()
t2.start()

After running the code, the balance for 'joe' should be -60 after five seconds. This is because bank is unprotected from concurrent access, and pausing for five seconds during concurrent execution means that we can't guarantee that the data won't be accessed at different states. In this case, the first thread accesses the bank after the second thread has finished withdrawing but doesn't check that a withdrawal is still possible. As a result the account goes negative.

If we model the bank and withdrawals as actors, we can protect access to the account since its state is managed on a different thread that's separate from those trying to withdraw from it.

from queue     import Queue
from threading import Thread

import time
import random

class Actor(Thread):
    """
    Models an actor in the actor model for concurrent computation
    see https://en.wikipedia.org/wiki/Actor_model for theoretical overview

    Args:
        handles (dict) mapping of public methods that are callable
            on message data after message has been read
    """

    def __init__(self, handles):

        self.handles = handles
        self.mailbox = Queue()
        Thread.__init__(self, daemon=True)

    def run(self):
        """
        Overrides method in Thread. Once the thread has started,
        we listen for messages and process one by one when they are received

        Returns: void
        """

        self.read_messages()

    def send(self, actor, message):
        """
        Puts a Message in the recipient actor's mailbox

        Args:
            actor   (Actor)   to receive message
            message (Message) object to send actor

        Returns: void
        """

        actor.mailbox.put(message)

    def read_messages(self):
        """
        Reads messages one at a time and calls the target class handler

        Returns: void
        """

        while 1:
            message = self.mailbox.get()
            action  = message.target
            if action in self.handles:
                self.handles[action](message.data)

class Message:
    """
    Models a message in the actor model

    Args:
        sender (Actor)  instance that owns the message
        data   (dict)   message data that can be consumed
        target (string) function in the recipient Actor to we'd like run when read
    """

    def __init__(self, sender, data, target):
        self.sender = sender
        self.data   = data
        self.target = target

class Bank(Actor):
    """
    Models a bank. Can be used in concurrent computations.

    Args:
        bank (dict) name to amount mapping that models state of Bank
    """

    def __init__(self, bank):
        self.bank = bank
        Actor.__init__(self, {'withdraw': lambda data: self.withdraw(data)})

    def withdraw(self, data):
        """
        Action handler for 'withdraw' messages. Withdraw
        if we can cover the requested amount 

        Args:
            data (dict) message data

        Returns: void
        """

        name, amount = data['name'], data['amount']

        if self.bank[name] >= amount:
            if data['sleep']:
                time.sleep(2)
            self.bank[name] -= amount

class Withdrawal(Actor):
    """
    Models a withdrawal. Can be used in concurrent computations.

    Args:
        bank  (Bank) shared resource to transact with
        sleep (bool) config to request that the bank sleep during a withdrawal
    """

    def __init__(self, bank, sleep=False):
        self.bank = bank
        self.sleep = sleep
        Actor.__init__(self, {})

    def withdraw(self, name, amount):
        """
        Wrapper for sending a withdrawl message

        Args:
            name   (string) owner of the account in our bank
            amount (double) amount we'd like to withdraw

        Returns: void
        """

        data = {'sleep': self.sleep, 'name': name, 'amount': amount}
        Actor.send(self, self.bank, Message(self, data, 'withdraw'))

Let's now test:

bank = Bank({'joe': 100})
bank.start()

actors = []
for _ in range(100):
    a = Withdrawal(bank, random.randint(0, 1))
    a.start()
    actors.append(a)

for a in actors:
    a.withdraw('joe', 15)

Is this understanding correct? Even though the bank sleeps during withdrawals, no simultaneous withdrawal can corrupt the data because it's managed on a different thread than the withdrawals.

like image 825
rookie Avatar asked Aug 11 '16 23:08

rookie


1 Answers

Simultaneous withdrawal can no longer occur, true, but it's because the withdraw messages are handled serially, not concurrently, by the single Bank thread inside the Bank.read_messages loop. This means that the sleep commands are also executed serially; the entire message queue is going to stall and yield control for 2 seconds whenever the bank has to sleep during a withdrawal. (Given the Bank's modeled action, this is essentially unavoidable).

like image 133
Tore Eschliman Avatar answered Sep 27 '22 16:09

Tore Eschliman