package Analizo::Batch::Runner::Parallel;
use strict;
use warnings;
use ZMQ::FFI qw(ZMQ_PUSH ZMQ_PULL ZMQ_REQ ZMQ_REP);
use YAML::XS;
use parent qw( Analizo::Batch::Runner );
$YAML::XS::LoadBlessed = 1;
$YAML::XS::LoadCode = 1;
$YAML::XS::DumpCode = 1;
sub new {
my ($class, $parallelism) = @_;
$parallelism ||= 2;
$class->SUPER::new(parallelism => $parallelism);
}
sub parallelism {
my ($self) = @_;
return $self->{parallelism};
}
sub actually_run {
my ($self, $batch, $output) = @_;
$self->start_workers();
$self->coordinate_workers($batch, $output);
$self->wait_for_workers();
}
sub _socket_spec {
my ($name, $ppid) = @_;
return "ipc:///tmp/.analizo-$name-$ppid";
}
sub start_workers {
my ($self) = @_;
$self->{workers} = [];
my $n = $self->parallelism();
my $ppid = $$;
for my $i (1..$n) {
my $pid = fork();
if ($pid) {
# on parent
push(@{$self->{workers}}, $pid);
} else {
# on child
$0 = '[analizo worker]';
worker($ppid);
exit();
}
}
my $distributor_pid = fork();
if ($distributor_pid) {
push(@{$self->{workers}}, $distributor_pid);
} else {
$0 = '[analizo queue]';
distributor($ppid, $n);
exit();
}
}
sub wait_for_workers {
my ($self) = @_;
for my $pid (@{$self->{workers}}) {
waitpid($pid, 0);
}
}
sub coordinate_workers {
my ($self, $batch, $output) = @_;
my $context = ZMQ::FFI->new();
my $queue = $context->socket(ZMQ_PUSH);
$queue->bind(_socket_spec('queue', $$));
my $results = $context->socket(ZMQ_PULL);
$results->bind(_socket_spec('results', $$));
# push jobs to queue
my $results_expected = 0;
while (my $job = $batch->next()) {
$queue->send(Dump($job));
$results_expected++;
}
$queue->send(Dump({}));
# collect results
my $results_received = 0;
while ($results_received < $results_expected) {
my $msg = $results->recv();
my $job = Load($msg);
$output->push($job);
$results_received++;
$self->report_progress($job, $results_received, $results_expected);
}
}
sub distributor {
my ($parent_pid, $number_of_workers) = @_;
my $context = ZMQ::FFI->new();
my $queue = $context->socket(ZMQ_PULL);
$queue->connect(_socket_spec('queue', $parent_pid));
my $job_source = $context->socket(ZMQ_REP);
$job_source->bind(_socket_spec('job_source', $parent_pid));
my @queue;
my $job;
while(1) {
my $msg = $queue->recv();
$job = Load($msg);
last if !exists($job->{id});
push(@queue, $job);
}
my $workers_finished = 0;
while ($workers_finished < $number_of_workers) {
$job_source->recv();
if(scalar(@queue) > 0) {
$job = shift(@queue);
$job_source->send(Dump($job));
} else {
$job_source->send(Dump({}));
$workers_finished++;
}
}
}
sub worker {
my ($parent_pid) = @_;
my $context = ZMQ::FFI->new();
my $source = $context->socket(ZMQ_REQ);
$source->connect(_socket_spec('job_source', $parent_pid));
my $results = $context->socket(ZMQ_PUSH);
$results->connect(_socket_spec('results', $parent_pid));
my $run = 1;
my $last_job = undef;
while ($run) {
$source->send('');
my $msg = $source->recv();
my $job = Load($msg);
if (exists($job->{id})) {
$last_job = $job;
$job->parallel_prepare();
$job->execute();
$results->send(Dump($job));
} else {
# a job without an id means that there are no more jobs to process, we
# should exit.
$run = 0;
}
}
if ($last_job) {
$last_job->parallel_cleanup();
}
}
1;