use
Dancer
qw/:moose :syntax :script/
;
qw/jq_locked jq_getsome jq_lock jq_warm_thrusters/
;
sub
worker_begin {
my
$self
=
shift
;
my
$wid
=
$self
->wid;
return
debug
"mgr ($wid): no need for manager... skip begin"
if
setting(
'workers'
)->{
'no_manager'
};
debug
"entering Manager ($wid) worker_begin()"
;
jq_warm_thrusters;
$self
->{queue}->enqueuep(200,
App::Netdisco::Backend::Job->new({
job
=> 0,
action
=>
'primeskiplist'
}));
debug
"mgr ($wid): searching for jobs booked to this processing node"
;
my
@jobs
= jq_locked;
if
(
scalar
@jobs
) {
info
sprintf
"mgr (%s): found %s jobs booked to this processing node"
,
$wid
,
scalar
@jobs
;
$self
->{queue}->enqueuep(100,
@jobs
);
}
}
my
$memoize
=
sub
{
no
warnings
'uninitialized'
;
my
$job
=
shift
;
return
join
chr
(28),
map
{
$job
->{
$_
}}
(
qw/action port subaction/
, (
$job
->{device_key} ?
'device_key'
:
'device'
));
};
sub
worker_body {
my
$self
=
shift
;
my
$wid
=
$self
->wid;
if
(setting(
'workers'
)->{
'no_manager'
}) {
prctl
sprintf
'nd2: #%s mgr: inactive'
,
$wid
;
return
debug
"mgr ($wid): no need for manager... quitting"
}
while
(1) {
prctl
sprintf
'nd2: #%s mgr: gathering'
,
$wid
;
my
$num_slots
= 0;
my
%seen_job
= ();
$num_slots
= parse_max_workers( setting(
'workers'
)->{tasks} )
-
$self
->{queue}->pending();
debug
"mgr ($wid): getting potential jobs for $num_slots workers"
if
not
$ENV
{ND2_SINGLE_WORKER};
foreach
my
$job
( jq_getsome(
$num_slots
) ) {
next
if
$seen_job
{
$memoize
->(
$job
) }++;
next
unless
jq_lock(
$job
);
info
sprintf
"mgr (%s): job %s booked out for this processing node"
,
$wid
,
$job
->id;
$self
->{queue}->enqueuep(
$job
->job_priority,
$job
);
}
debug
"mgr ($wid): sleeping now..."
if
not
$ENV
{ND2_SINGLE_WORKER};
prctl
sprintf
'nd2: #%s mgr: idle'
,
$wid
;
sleep
( setting(
'workers'
)->{sleep_time} || 1 );
}
}
1;