Perl can do asynchronous programming with modules like IO::Async or Coro, but it's single threaded. You can compile Perl with threads, which provide multi-threaded computing.
A message queue is a data structure for holding messages from the time they're sent until the time the receiver retrieves and acts on them. Generally queues are used as a way to 'connect' producers (of data) & consumers (of data). A thread pool is a pool of threads that do some sort of processing.
I'm trying to accomplish the following:
Have a thread that reads data from a very large file say about 10GB and push them into the queue. (I do not wish for the queue to get very large either)
While the buildQueue
thread is pushing data to the queue at the same time have
about 5 worker threads de-queue and process data.
I've made an attempt but my other threads are unreachable because of a continuous loop in my buildQueue
thread.
My approach may be totally wrong. Thanks for any help, it's much appreciated.
Here's the code for buildQueue
:
sub buildQueue {
print "Enter a file name: ";
my $dict_path = <STDIN>;
chomp($dict_path);
open DICT_FILE, $dict_path or die("Sorry, could not open file!");
while (1) {
if (<DICT_FILE>) {
if ($queue->pending() < 100) {
my $query = <DICT_FILE>;
chomp($query);
$queue->enqueue($query);
my $count = $queue->pending();
print "Queue Size: $count Query: $query\n";
}
}
}
}
And as I've expected when this thread gets executed nothing else after will be executed because this thread will not finish.
my $builder = new Thread(&buildQueue);
Since the builder thread will be running for a long time I never get to create worker threads.
Here's the entire code:
#!/usr/bin/perl -w
use strict;
use Thread;
use Thread::Queue;
my $queue = new Thread::Queue();
my @threads;
sub buildQueue {
print "Enter a file name: ";
my $dict_path = <STDIN>;
chomp($dict_path);
open dict_file, $dict_path or die("Sorry, could not open file!");
while (1) {
if (<dict_file>) {
if ($queue->pending() < 100) {
my $query = <dict_file>;
chomp($query);
$queue->enqueue($query);
my $count = $queue->pending();
print "Queue Size: $count Query: $query\n";
}
}
}
}
sub processor {
my $query;
while (1) {
if ($query = $queue->dequeue) {
print "$query\n";
}
}
}
my $builder = new Thread(&buildQueue);
push @threads, new Thread(&processor) for 1..5;
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With