Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Raku Cro service subscribing to data "in the background" general guidance

Tags:

raku

cro

I am attempting to put together a Cro service that has a react/whenever block consuming data "in the background" So unlike many examples of websocket usage with Cro, this has nothing to do with routes that may be accessed via the browser.

My use case is to consume message received via an MQTT topic and do some processing with them. At a later stage in development I might create a supply out of this data, but for now, when data is received it will be stored in a variable and dependant on certain conditions, be sent to another service via a http post.

My thought was to include a provider() in the Cro::HTTP::Server setup like so:

use Cro::HTTP::Log::File;
use Cro::HTTP::Server;

use Routes;
use DataProvider; # Here

my Cro::Service $http = Cro::HTTP::Server.new(
        http => <1.1>,
        host => ...,
        port => ...,
        application => [routes(), provider()], # Made this into an array of subs?
        after => [
            Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
        ]
    );

And in the DataProvider.pm6:

use MQTT::Client;

sub provider() is export {
    my $mqtt  = MQTT::Client.new: server => 'localhost';
    react {
        whenever $mqtt.subscribe('some/mqtt/topic') {
            say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
        }
    }
}

This throws a bunch of errors:

A react block:
  in sub provider at DataProvider.pm6 (DataProvider) line 5
  in block <unit> at service.p6 line 26

Died because of the exception:
    Invocant of method 'write' must be an object instance of type
    'IO::Socket::Async', not a type object of type 'IO::Socket::Async'.  Did
    you forget a '.new'?
      in method subscribe at /home/cam/raku/share/perl6/site/sources/42C762836A951A1C11586214B78AD34262EC465F (MQTT::Client) line 133
      in sub provider at DataProvider.pm6 (DataProvider) line 6
      in block <unit> at service.p6 line 26

To be perfectly honest, I am totally guessing that this is how I would approach the need to subscribe to data in the background of a Cro service, but I was not able to find any information on what might be considered the recommended approach.

Initially I had my react/whenever block in the main service.pm6 file, but that did not seem right. And needed to be wrapped in a start{} block because as I have just learned, react is blocking :) and cro was not able to actually start.

But following the pattern of how Routes are implemented seemed logical, but I am missing something. The error speaks about setting up a new method, but I'm not convinced that is the root cause. Routes.pm6 does not have a constructor.

Can anyone point me in the right direction please?

like image 592
camstuart Avatar asked Feb 17 '21 13:02

camstuart


3 Answers

You seem to be fine now but when I first saw this I made this https://github.com/jonathanstowe/Cro-MQTT which turns the MQTT client into a first class Cro service.

I haven't released it yet but it may be instructive.

like image 65
Jonathan Stowe Avatar answered Jan 21 '23 01:01

Jonathan Stowe


Thanks to all who have provided information, this has been a very valuable learning exercise.

The approach of passing additional sub routines, along side router() in the application parameter to Cro::HTTP::Server.new gave further trouble. (an array is not allowed, and broke routing)

Instead, I have moved the background work into a class of it's own, and given it a start and stop method more akin to Cro::HTTP::Server.

My new approach:

service.pm6

use Cro::HTTP::Log::File;
use Cro::HTTP::Server;

use Routes;
use KlineDataSubscriber; # Moved mqtt functionality here 
use Database;

my $dsn         = "host=localhost port=5432 dbname=act user=.. password=..";
my $dbh         = Database.new :$dsn;

my $mqtt-host   = 'localhost';
my $subscriber  = KlineDataSubscriber.new :$mqtt-host;

$subscriber.start; # Inspired by $http.start below

my Cro::Service $http = Cro::HTTP::Server.new(
    http => <1.1>,
    host => ...,
    port => ...,
    application => routes($dbh), # Basically back the way it was originally 
    after => [
        Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
    ]
);

$http.start;
say "Listening at...";
react {
    whenever signal(SIGINT) {
        say "Shutting down...";
        $subscriber.stop;
        $http.stop;
        done;
    }
}

And in KlineDataSubscriber.pm6

use MQTT::Client;

class KlineDataSubscriber {
    has Str $.mqtt-host is required;
    has MQTT::Client $.mqtt = Nil;

    submethod TWEAK() {
        $!mqtt = MQTT::Client.new: server => $!mqtt-host;
        await $!mqtt.connect;
    }

    method start(Str $topic = 'act/feed/exchange/binance/kline-closed/+/json') {
        start {
            react {
                whenever $!mqtt.subscribe($topic) {
                    say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
                }
            }
        }
    }

    method stop() {
        # TODO Figure how to unsubscribe and cleanup nicely
    }
}

This feels much more "Cro idiomatic" to me, but I would be happy to be corrected. More importantly, it works as expected and I feel is somewhat future proof. I should be able to create a supply to make real-time data available to the router, and push data to any connected web clients.

I also intend to have a http GET endpoint /status with various checks to ensure everything healthy

like image 45
camstuart Avatar answered Jan 21 '23 00:01

camstuart


The root cause

The error speaks about setting up a new method, but I'm not convinced that is the root cause.

It's not about setting up a new method. It's about a value that should be defined instead being undefined. That typically means a failure to attempt to initialize it, which typically means a failure to call .new.

Can anyone point me in the right direction please?

Hopefully this question helps.

Finding information on a recommended approach

I am totally guessing that this is how I would approach the need to subscribe to data in the background of a Cro service, but I was not able to find any information on what might be considered the recommended approach.

It might be helpful for you to list which of the get-up-to-speed steps you've followed from Getting started with Cro, including the basics but also the "Learn about" steps at the end.

The error message

A react block:
  in sub provider ...

Died because of the exception:
    ... 
      in method subscribe ...

The error message begins with the built in react construct reporting that it caught an exception (and handled it by throwing its own exception in response). A "backtrace" corresponding to where the react appeared in your code is provided indented from the initial "A react block:".

The error message continues with the react construct summarizing its own exception (Died because ...) and explains itself by reporting the original exception, further indented, in subsequent lines. This includes another backtrace, this time one corresponding to the original exception, which will likely have occurred on a different thread with a different callstack.

(All of Raku's structured multithreading constructs[1] use this two part error reporting approach for exceptions they catch and handle by throwing another exception.)


The first backtrace indicates the react line:

in sub provider at DataProvider.pm6 (DataProvider) line 5
use MQTT::Client;

sub provider() is export {
    my $mqtt  = MQTT::Client.new: server => 'localhost';
    react {

The second backtrace is about the original exception:

    Invocant of method 'write' must be an object instance of type
    'IO::Socket::Async', not a type object of type 'IO::Socket::Async'. ...
      in method subscribe at ... (MQTT::Client) line 133

This reports that the write method called on line 133 of MQTT::Client requires its invocant is an instance of type 'IO::Socket::Async'. The value it got was of that type but was not an instance, but instead a "type object". (All values of non-native types are either type objects or instances of their type.).

The error message concludes with:

  Did you forget a '.new'?

This is a succinct hint based on the reality that 99 times out of a hundred the reason a type object is encountered when an instance is required is that code has failed to initialize a variable. (One of the things type objects are used for is to serve the role of "undefined" in languages like Perl.)

So, can you see why something that should have been an initialized instance of 'IO::Socket::Async' is instead an uninitialized one?

Footnotes

[1] Raku's constructs for parallelism, concurrency, and asynchrony follow the structured programming paradigm. See Parallelism, Concurrency, and Asynchrony in Raku for Jonathan Worthington's video presentation of this overall approach. Structured constructs like react can cleanly observe, contain, and manage events that occur anywhere within their execution scope, including errors such as error exceptions, even if they happen on other threads.

like image 30
raiph Avatar answered Jan 21 '23 01:01

raiph