#!/usr/bin/env perl
use strict;use warnings;
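use lib 'lib';
use Getopt::Std;            # provides getopt()
use Data::Dumper;
use POE;                    # provides POE::Kernel
use App::MultiModule;
use App::MultiModule::API;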
=pod
=head1 NAME
MultiModule - Framework to intelligently manage many parallel tasks
=head1 SYNOPSIS
MultiModule [-q <control_qname>] [-p <module_prefixes>] [-c <config_file>] [-s <state_dir>] [-d <debug_level>] [-m <module>] [-o <out_of_band_config>] [-T <transit_config_dir>]
=head1 WARNING
This is a very early release. That means it has a whole pile of
technical debt. One clear example is that, at this point, this
distribution doesn't even try to function on any OS except Linux.
=head1 DESCRIPTION
The idea is to break a really big problem or job down into many very small
pieces of code that receive and emit messages. Each piece of code is called
a 'task', and runs either in the process space of the control daemon under
a cooperative multitasking paradigm, or in a separate process that is still
controlled and monitored by the MultiModule daemon.
These tasks communicate ONLY via IPC::Transit messages. IPC::Transit uses
the same interface to communicate inside of a process space with direct,
in-memory arrays (that is, about as fast as it is possible to communicate)
as well as between processes on the same box via the venerable System V
IPC Message Queue system. MultiModule will pass messages between tasks using
the most efficient method. The developer need not be over-concerned about
whether various tasks are inside the same process space, or in separate
processes.
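
To make the "same interface, different transport" idea concrete, here is a
minimal sketch of an in-process message bus keyed by queue name. It is a toy
stand-in written only for illustration: C<toy_send> and C<toy_receive> are
hypothetical names, and this is not how IPC::Transit itself is implemented.
See the IPC::Transit documentation for the real interface.

```perl
use strict;
use warnings;

# Toy in-process message bus: one array per queue name.
# Illustration only; this is NOT IPC::Transit.
my %queues;

sub toy_send {
    my (%args) = @_;
    push @{ $queues{ $args{qname} } }, $args{message};
    return 1;
}

sub toy_receive {
    my (%args) = @_;
    my $queue = $queues{ $args{qname} } or return;
    return shift @$queue;    # undef when the queue is empty
}

toy_send(qname => 'Example2', message => { ct => 1, outstr => 'howdy' });
toy_send(qname => 'Example2', message => { ct => 2, outstr => 'howdy' });

while (my $message = toy_receive(qname => 'Example2')) {
    print "received: $message->{ct} ($message->{outstr})\n";
}
```

A task written against such an interface only names a queue and never needs
to know whether the receiver lives in the same process.
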
A given task can be either stateful or stateless. Stateful tasks have some
in-memory structures that must persist between restarts and crashes.
The MultiModule daemon automatically handles the saving, loading, and
sanity checking of this state.
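
The save, load, and sanity-check cycle can be pictured roughly as follows.
This is a sketch under stated assumptions, not the daemon's actual code: the
on-disk format (one Storable file per task), the file naming, and the helper
names C<save_state> and C<load_state> are all hypothetical.

```perl
use strict;
use warnings;
use File::Spec;
use File::Temp qw(tempdir);
use Storable qw(nstore retrieve);

my $state_dir = tempdir(CLEANUP => 1);    # stand-in for the -s state_dir

sub state_file {
    my ($task_name) = @_;
    return File::Spec->catfile($state_dir, "$task_name.state");
}

# Write to a temporary file first, then rename, so a crash mid-write
# cannot leave a truncated state file behind.
sub save_state {
    my ($task_name, $state) = @_;
    my $file = state_file($task_name);
    nstore($state, "$file.tmp");
    rename "$file.tmp", $file or die "rename failed: $!";
    return 1;
}

# Return the saved state, or the supplied default if nothing usable exists.
sub load_state {
    my ($task_name, $default) = @_;
    my $file = state_file($task_name);
    return $default unless -e $file;
    my $state = eval { retrieve($file) };
    return $default unless ref $state eq 'HASH';    # sanity check
    return $state;
}

my $state = load_state('Example1', { ct => 0 });
$state->{ct} += 5;
save_state('Example1', $state);

my $reloaded = load_state('Example1', { ct => 0 });
print "ct after reload: $reloaded->{ct}\n";
```
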
At this time there are some limitations, which will likely be lifted
later.
Each external process must have one and only one stateful task. Note that
every process has access to every stateless task, so a stateful, external
task can send messages to any stateless task, and those will be handled
in process.
This also means that a stateless task cannot exist as an external process.
As stated above, every stateless task is available in every process. But
only the stateless tasks running in the MultiModule daemon process space
will receive and process messages sent from sources outside of MultiModule.
An example: consider a stateless task named 'Foo'. A message sent to 'Foo'
from any MultiModule task will be delivered to the 'Foo' instance in whatever
process space the message originated from. If some other program or process
sends messages to 'Foo', only the 'Foo' task inside the MultiModule daemon
will receive them.
All tasks, internal and external, are monitored for various resources:
CPU
Memory
Read/Write disk IO
File Handle Count
Queue Fullness
Repeated crashes/restarts
...others...
Some of these have 'failsafe' levels: that is, if a task's resource
utilization passes the 'failsafe' level, the task is shut down and will not
restart without human intervention.
This is all fully configurable.
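
As a rough illustration of a two-tier check (a softer alert level plus a hard
failsafe level), consider the sketch below. The resource key names, the
numbers, and the C<check_resources> helper are made up for this example and
do not reflect MultiModule's real configuration keys.

```perl
use strict;
use warnings;

# Hypothetical limits; key names and numbers are invented for illustration.
my $limits = {
    memory_mb    => { alert => 256, failsafe => 1024 },
    file_handles => { alert => 200, failsafe => 1000 },
};

# Classify one sample of resource usage as 'ok', 'alert' or 'failsafe'.
sub check_resources {
    my ($usage, $lim) = @_;
    my $verdict = 'ok';
    for my $resource (sort keys %$lim) {
        my $value = $usage->{$resource};
        next unless defined $value;
        return 'failsafe' if $value > $lim->{$resource}{failsafe};
        $verdict = 'alert' if $value > $lim->{$resource}{alert};
    }
    return $verdict;
}

print check_resources({ memory_mb => 100,  file_handles => 50 }, $limits), "\n";
print check_resources({ memory_mb => 300,  file_handles => 50 }, $limits), "\n";
print check_resources({ memory_mb => 2048, file_handles => 50 }, $limits), "\n";
```

In this sketch a 'failsafe' verdict is the point at which a supervisor would
stop the task and refuse to restart it until a human steps in.
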
Tasks can also migrate from internal to external and back. Simply update
the config for a given task with the internal/external parameter, and
MultiModule will automatically handle the migration, if any.
External tasks are rigorously monitored and managed, but they do not have
the MultiModule daemon as parent. An external task can therefore restart, and
MultiModule itself can restart or even be down, without complications.
=head1 EXAMPLES
The file 'test.conf' contains:

    {   Example1 => { outstr => 'howdy' },
        Router => {
            routes => [
                {   match => {
                        source => 'Example1'
                    },
                    forwards => [
                        { qname => 'Example2' }
                    ]
                }
            ]
        }
    }

Example1.pm contains:

    package MultiModuleTest::Example1;
    use strict;use warnings;
    use parent 'App::MultiModule::Task';
    sub message {
        my $self = shift; my $message = shift;
        if(defined $message->{new_ct}) {
            $self->{state}->{ct} = $message->{new_ct};
            print STDERR "Example1: set ct to $message->{new_ct}\n";
        }
    }
    sub set_config {
        my $self = shift; my $config = shift;
        $self->{config} = $config; #not necessary in this example
        $self->{state} = { ct => 0 } unless $self->{state};
        $self->named_recur(
            recur_name => 'Example1',
            repeat_interval => 1,
            work => sub {
                my $message = {
                    ct => $self->{state}->{ct}++,
                    outstr => $config->{outstr},
                };
                $self->emit($message);
            },
        );
    }
    sub is_stateful { return 'yes!'; }
    1;

Example2.pm contains:

    package MultiModuleTest::Example2;
    use strict;use warnings;
    use parent 'App::MultiModule::Task';
    sub message {
        my $self = shift; my $message = shift;
        print STDERR "Example2: received a message: $message->{ct} ($message->{outstr})\n"
            if $message->{ct} and $message->{outstr};
    }
    1;

Run:

    $ bin/MultiModule -p MultiModuleTest:: -c test.conf
    ...
    Example2: received a message: 1 (howdy)
    Example2: received a message: 2 (howdy)
    Example2: received a message: 3 (howdy)
    Example2: received a message: 4 (howdy)
    Example2: received a message: 5 (howdy)
    ^Ccaught SIGINT. starting orderly exit
    $ bin/MultiModule -p MultiModuleTest:: -c test.conf
    Example2: received a message: 6 (howdy)
    Example2: received a message: 7 (howdy)
    ...

In another window:

    $ strans '{new_ct => 999}' Example1

Back to the first window:

    Example2: received a message: 8 (howdy)
    Example2: received a message: 9 (howdy)
    Example2: received a message: 10 (howdy)
    Example1: received a message: $VAR1 = {
        '.ipc_transit_meta' => {
            'send_ts' => 1234567890
        },
        'new_ct' => 999
    };
    Example1: set ct to 999
    Example2: received a message: 11 (howdy)
    Example2: received a message: 999 (howdy)
    Example2: received a message: 1000 (howdy)
    Example2: received a message: 1001 (howdy)
    Example2: received a message: 1002 (howdy)
    ^Ccaught SIGINT. starting orderly exit

The Example1 task simply emits a message every second containing an
incrementing integer. It will accept a configurable starting number. It also
accepts messages that set the counter to a new value (the 'new_ct' field).
The Example2 task does nothing except receive messages and display the
integer counter.
Messages are routed per the configuration sent to the Router task.
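
The Router section of test.conf can be read as "for every route whose match
fields equal the message's fields, forward the message to each listed qname".
The sketch below illustrates that reading only. The matching rule (exact
string equality on every key) and the presence of a 'source' field naming the
emitting task are assumptions, and C<route_matches> and C<destinations> are
hypothetical helpers rather than the Router task's real code.

```perl
use strict;
use warnings;

# Same shape as the Router section of test.conf.
my $router_config = {
    routes => [
        {   match    => { source => 'Example1' },
            forwards => [ { qname => 'Example2' } ],
        },
    ],
};

# Assumption: a route matches when every key in 'match' is present in the
# message with an equal string value.
sub route_matches {
    my ($match, $message) = @_;
    for my $key (keys %$match) {
        return 0 unless defined $message->{$key};
        return 0 unless $message->{$key} eq $match->{$key};
    }
    return 1;
}

# Return the queue names a message should be forwarded to.
sub destinations {
    my ($config, $message) = @_;
    my @qnames;
    for my $route (@{ $config->{routes} }) {
        next unless route_matches($route->{match}, $message);
        push @qnames, map { $_->{qname} } @{ $route->{forwards} };
    }
    return @qnames;
}

# Assumption: emitted messages carry the emitting task's name in 'source'.
my @to = destinations($router_config,
    { source => 'Example1', ct => 3, outstr => 'howdy' });
print "forward to: @to\n";
```
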
=head1 OPTIONS
=over 4
=item B<-q> control_qname
Control the MultiModule daemon itself with this queue. This includes:
=over 4
=item B<configuration changes>
=item B<exit_cleanly>
=item B<debugging>
=back
=item B<-p> module_prefixes
Every task that this daemon controls lives in a single module: <task_name>.pm.
This defaults to App::MultiModule::Tasks::, which ships with this
distribution. Any additional name spaces to check must be referenced in this
option.
This does not affect Perl's @INC. Every task/module you use must exist in
@INC, name space prefixed as above.
B<EXAMPLE>
-p Our::Personal::Stuff::
If a task named 'Foo' is referenced, it is looked up as
App::MultiModule::Tasks::Foo
Our::Personal::Stuff::Foo
as iterated across all of the entries in @INC.
In other words, you probably need to:
use lib '/local/code/path';
Assuming Foo.pm is in:
/local/code/path/Our/Personal/Stuff/Foo.pm
=item B<-s> state_dir
This defaults to a directory called 'state' in the current directory.
=item B<-d> debug_level
Just an integer; there is a very deep, rich debugging system available,
and the notion of such an integer influencing it is not fully established
at this point.
=item B<-o> out_of_band_config
<channel_name1>:<handling1>,<channel_nameN>:<handlingN>
B<handling>
1 : STDOUT
2 : STDERR
router : send to the Transit Router
anything else : a filename
B<standard channels>
=over 4
=item B<log>
Basic, run-time information.
Defaults to 1 (STDOUT)
=item B<error>
All kinds of errors, fatal and otherwise. These should always be actionable.
Defaults to 2 (STDERR)
=item B<alert>
Resource alerts: details about tasks and tasklets that use more than their
pre-configured resource levels.
Defaults to 2 (STDERR)
=item B<debug>
Debugging information.
Defaults to './debug.out' (file in run-time directory)
B<EXAMPLE>
-o log:1,alert:my_alert_queue,error:/var/log/mything.err,debug:router
Log to STDOUT, send resource alerts to the queue 'my_alert_queue', append
errors to /var/log/mything.err, and send all debug messages to the
Router task, to be handled as configured there.
=back
=item B<-m> module
Users do not normally use this option. For external tasks, the daemon calls
itself with this option set to the name of the external task.
=back
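
The comma-separated values of B<-p> and B<-o> are split along the same lines
as in the script body that follows this documentation. The standalone sketch
below mirrors that logic so the resulting structures are easy to inspect.
C<parse_prefixes> and C<parse_oob> are illustrative helper names, not part of
the distribution, and the split limit of 2 is a small deviation noted in the
code.

```perl
use strict;
use warnings;

# Split a -p value into a list of name space prefixes.
sub parse_prefixes {
    my ($arg) = @_;
    return [ split ',', $arg ];
}

# Merge a -o value over the default channel handling.
sub parse_oob {
    my ($arg) = @_;
    my $oob = {
        alert => 2,
        error => 2,
        log   => 1,
        debug => './debug.out',
    };
    return $oob unless defined $arg and length $arg;
    for my $chunk (split ',', $arg) {
        # Limit of 2 keeps any further ':' inside the handling value;
        # this differs slightly from the plain split in the script.
        my ($channel, $handling) = split ':', $chunk, 2;
        $oob->{$channel} = $handling;
    }
    return $oob;
}

my $prefixes = parse_prefixes('MultiModuleTest::,Our::Personal::Stuff::');
print "prefix: $_\n" for @$prefixes;

my $oob = parse_oob('log:1,error:/var/log/mything.err,debug:router');
print "$_ => $oob->{$_}\n" for sort keys %$oob;
```

Channels that are not mentioned in the B<-o> value keep their defaults, so in
this run 'alert' still points at 2 (STDERR).
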
=cut
my $opts = {};
{ my $pristine_opts = join ' ', @ARGV;
my $opt_trans = {
q => 'qname', #optional; defaults to MultiModule_control
s => 'state_dir', #optional
p => 'module_prefixes', #optional
d => 'debug', #optional: level, an integer
m => 'module', #optional
o => 'oob', #optional: out of band; log, error, alert, debug
c => 'config_file', #optional
T => 'transit_config_dir', #optional
};
my $o = {};
getopt('csqpdmoT', $o);
while(my($key, $value) = each %{$o}) {
$opts->{$opt_trans->{$key}} = $value;
}
if($opts->{transit_config_dir}) {
$IPC::Transit::config_dir = $opts->{transit_config_dir};
}
$opts->{pristine_opts} = $pristine_opts;
my $oob = {
alert => 2,
error => 2,
log => 1,
debug => './debug.out',
};
if($opts->{oob}) {
my @chunks = split ',', $opts->{oob};
foreach my $chunk (@chunks) {
my ($channel_name, $channel_action) = split ':', $chunk;
$oob->{$channel_name} = $channel_action;
}
}
$opts->{oob} = $oob;
}
$opts->{qname} = 'MultiModule_control' unless $opts->{qname};
$opts->{state_dir} = 'state' unless $opts->{state_dir};
if($opts->{module} and $opts->{module} eq 'main') {
die "MultiModule: optional option -m <module>, if passed, must not be 'main'";
}
$opts->{module} = 'main' unless $opts->{module};
$opts->{debug} = 0 unless $opts->{debug};
if($opts->{module_prefixes}) {
my @m = split ',', $opts->{module_prefixes};
$opts->{module_prefixes} = \@m;
}
my $api = App::MultiModule::API->new(
state_dir => $opts->{state_dir},
);
my $state = $api->get_task_state($opts->{module});
my $status = $api->get_task_status($opts->{module});
if($status->{is_running}) {
my $running_pid = $status->{pid} || 'unknown';
die "MultiModule: program is already running under $running_pid\n";
}
my $m = App::MultiModule->new(%$opts);
$m->log(Data::Dumper::Dumper($opts));
$m->log('started');
$state->{this} = 'that';
$api->save_task_state($opts->{module}, $state);
$api->save_task_status($opts->{module}, $status);
POE::Kernel->run();
=head1 AUTHOR
Dana M. Diederich, C<< <diederich@gmail.com> >>