Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

SWI-Prolog: Using message queues for thread-safe database read/writes with `library(persistency)`

SWI-Prolog advertises itself as a single-language replacement for the LAMP stack. When it comes to replacing the M(ySQL), the documentation enumerates several approaches, of which library(persistency) seems to be the simplest approach capable of providing a persistent, updateable database.

The documentation for library(persistency) provides the following example, which shows how to use mutexes to avoid invalid states when reading from, and making updates to, the database:

:- module(user_db,
          [ attach_user_db/1,       % +File
            current_user_role/2,    % ?User, ?Role
            add_user/2,         % +User, +Role
            set_user_role/2     % +User, +Role
          ]).
:- use_module(library(persistency)).

:- persistent
        user_role(name:atom, role:oneof([user,administrator])).

attach_user_db(File) :-
        db_attach(File, []).

%%  current_user_role(+Name, -Role) is semidet.

current_user_role(Name, Role) :-
        with_mutex(user_db, user_role(Name, Role)).

add_user(Name, Role) :-
        assert_user_role(Name, Role).

set_user_role(Name, Role) :-
        user_role(Name, Role), !.
set_user_role(Name, Role) :-
        with_mutex(user_db,
                   (  retractall_user_role(Name, _),
                      assert_user_role(Name, Role))).

However, in the documentation on thread synchronization via mutex, it points out that

The predicate with_mutex/2 behaves as once/1 with respect to the guarded goal. This means that our predicate address/2 is no longer a nice logical non-deterministic relation.

Having to give up "nice logical non-deterministic relations" in order to preserve validity seems like a bad bargain, since nice logical non-deterministic relations are the principle advantage of Prolog! Fortunately, a a more appealing alternative is suggested:

Message queues (see message_queue_create/3) often provide simpler and more robust ways for threads to communicate.

It sounds like I should be able to use message queues, as presented in the documentation on thread communication in order to achieve safe reads/updates without sacrificing the essence of the LP paradigm. Unfortunately, being quite new to threads, I cannot figure out what this use of message queues would look like!

I am hoping someone will be able to transform the library(persistency) example, replacing the use of mutexes with a more congenial use of message queues, assuring the constant validity of the database's state without sacrificing non-deterministic relations.

like image 390
Shon Avatar asked Mar 10 '23 21:03

Shon


1 Answers

In SWI Prolog, every thread has its own message queue. So you can run your database server on a thread and have clients post queries to the database's message queue. The database will process each request one at a time, so that the database is always valid. The queries will still be deterministic (as in once/1), but as you point out, unlike in the cases of with_mutex/2, the database relations themselves can be specified relationally.

[Note, that I'm showing how to do this using the built in SWI Prolog message queues, but you could also use Pengines for this, which is possibly more user friendly, and has built in support for remote execution.]

First, I'll remove the with_mutex calls:

:- module(user_db,
          [ attach_user_db/1,       % +File
            current_user_role/2,    % ?User, ?Role
            add_user/2,         % +User, +Role
            set_user_role/2     % +User, +Role
          ]).
:- use_module(library(persistency)).

:- persistent
        user_role(name:atom, role:oneof([user,administrator])).

attach_user_db(File) :-
        db_attach(File, []).

%%  current_user_role(+Name, -Role) is semidet.

current_user_role(Name, Role) :-
        user_role(Name, Role).

add_user(Name, Role) :-
        assert_user_role(Name, Role).

set_user_role(Name, Role) :-
    user_role(Name, Role), !.
set_user_role(Name, Role) :-
    retractall_user_role(Name, _),
    assert_user_role(Name, Role).

I just added the database server code in the same file, but it should probably go somewhere else. Also, turn on debug messages with :- debug(db), which I find indispensable in multithreaded code.

We need a predicate to start the db_thread. It's name is db and it is "detached," so it will be cleaned up when the system exits. The thread is started with a call to db_run/0.

db_up(File, DbThreadId) :-
    db_attach(File, []),
    thread_create(db_run, DbThreadId, [detached(true), alias(db)]),
    debug(db, 'db thread created~n').   

`db_run/0' is a failure driven loop that runs in the db thread and inspects its message queue for new messages. When a message is received, it is called. Once complete, the loop starts again.

db_run :-
    debug(db, 'db_run:...', []),
    repeat, 
      thread_get_message(db, Msg, []),
      debug(db, 'Received: ~p', [Msg]),
      Msg,            
      debug(db, 'db_query succeeded', []),
      fail.

The client sends messages of the form db_query(<Query>, <ClientThreadId>), so we need a predicate called db_query/2 that actually runs the computation. It sends either a success, failure, or exception message to client thread.

:- meta_predicate user_db:db_query(0,*).
db_query(Goal, ClientId) :-
    catch((Goal -> Status = true; Status = false),
          Err,
          Status = err(Err)), 
    (   Status = true ->
            Response = db_response(succ(Goal))
     ;
     Status = false ->
         Response = db_response(fail)
     ;
     Status = err(Err) ->
         Response = db_response(err(Err))
    ),
    debug(db, 'db_query/2: sending message ~w to ~p', [Response, ClientId]),
    thread_send_message(ClientId, Response).

Finally, we need a predicate that posts a query from the client to the db. After the message is sent, the client waits for a response using client_wait/1.

:- meta_predicate client_post(0).
client_post(Goal) :-
    thread_self(Me),
    Msg = db_query(Goal, Me),
    debug(db, 'client_post/1: sending message ~p...', [Msg]),
    thread_send_message(db, Msg),
    debug(db, 'client_post/1: waiting...', []),
    client_wait(Goal).

client_wait/1 waits for a message of the form db_response() (for at most 1 second before failing, but you probably want to do something smarter). It

:- meta_predicate client_wait(0).
client_wait(Goal) :-
    thread_self(Me), 
    thread_get_message(Me, db_response(Term), [timeout(1)]), % blocks until db_response(_) arrives
    Msg = db_response(Term),
    debug(db, 'Client received ~p', [Msg]),
    (   Term = succ(Goal) ->
            debug(db, 'client_wait/1: exit with true', []),
            true
     ;
     Term = fail ->
         fail
     ;
     Term = err(Err) ->
         throw(Err)
     ;
     domain_error(db_response_message, Msg)
    ).

With this, we can create the db and send queries:

$ swipl -l db_thread.pl 
Welcome to SWI-Prolog (Multi-threaded, 64 bits, Version 7.3.24-127-g9b94a9f-DIRTY)
Copyright (c) 1990-2016 University of Amsterdam, VU Amsterdam
SWI-Prolog comes with ABSOLUTELY NO WARRANTY. This is free software,
and you are welcome to redistribute it under certain conditions.
Please visit http://www.swi-prolog.org for details.

For help, use ?- help(Topic). or ?- apropos(Word).

?- user_db:db_up('db.pl', DB).
db thread created
DB = db.

?- Xs = [bob-administrator, john-user, bill-user], user_db:client_post(forall(member(U-R, Xs), add_user(U, R))).
Xs = [bob-administrator, john-user, bill-user].

?- findall(U, user_db:client_post(current_user_role(U, user)), Users).  %% queries are posted as in once/1
Users = [john].

?- user_db:client_post(findall(U, current_user_role(U, user), Users)).  %% but db predicates are themselves relational
Users = [john, bill].

A small test that this setup preserves the consistency of the database. In this file test_db.pl, I create the db, and run two threads. The one running toggle/0 switches between two user-role dbs and the other, running print/0 just prints out the users and their roles. We switch worlds, randomly spaced in time, 200 times. At the same time, the other thread prints out the db at 200 randomly spaced times.

test_db.pl:

:- use_module(user_db).

:- initialization user_db:db_up('db.pl', _), test.

world(1, [bob-administrator,
       john-user]).

world(2, [bob-user,
         john-administrator]).

set_world(I) :-
    world(I, Xs),
    forall(member(U-R, Xs),
           set_user_role(U, R)).

print_world :-
    findall(U-R,
            current_user_role(U, R),
            URs),
    sort(URs, URs1),
    format('~p~n', [URs1]).



random_sleep :-
    random(R), 
    X is R * 0.05,
    sleep(X).

toggle(0) :- !. 
toggle(N) :-
    forall(world(I, _), 
           (user_db:client_post(set_world(I)),
             random_sleep)),
    succ(N0, N),
    toggle(N0).

print(0) :- !. 
print(N) :-
    user_db:client_post(print_world),
    succ(N0, N),
    random_sleep, 
    print(N0).



test :-

    thread_create(toggle(100), Id1, []), 
    thread_create(print(200), Id2, []),
    thread_join(Id1, _),
    thread_join(Id2, _).

We run this with $ swipl -l test_db.pl:

[bob-administrator,john-user]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
...
like image 184
Eyal Avatar answered Apr 06 '23 05:04

Eyal