register
'register_worker'
=>
sub
{
my
(
$self
,
$first
,
$second
) = plugin_args(
@_
);
my
$workerconf
= (
ref
$first
eq
'HASH'
?
$first
: {});
my
$code
= (
ref
$first
eq
'CODE'
?
$first
:
$second
);
return
error
"bad param to register_worker"
unless
((
ref
sub
{} eq
ref
$code
) and (
ref
{} eq
ref
$workerconf
));
my
$package
= (
caller
)[0];
(
$workerconf
->{
package
} =
$package
) =~ s/^App::Netdisco::Worker::Plugin:://;
if
(
$package
=~ m/Plugin::(\w+)(?:::(\w+))?/) {
$workerconf
->{action} ||=
lc
($1);
$workerconf
->{namespace} ||=
lc
($2)
if
$2;
}
return
error
"failed to parse action in '$package'"
unless
$workerconf
->{action};
$workerconf
->{title} ||=
''
;
$workerconf
->{phase} ||=
'user'
;
$workerconf
->{namespace} ||=
'_base_'
;
$workerconf
->{priority} ||= (
exists
$workerconf
->{driver}
? (setting(
'driver_priority'
)->{
$workerconf
->{driver}} || 0) : 0);
my
$worker
=
sub
{
my
$job
=
shift
or
die
'missing job param'
;
debug YELLOW,
"\N{RIGHTWARDS BLACK ARROW} worker "
, GREY10,
$workerconf
->{
package
},
(
$workerconf
->{pyworklet} ? (
' '
.
$workerconf
->{pyworklet}) :
''
),
GREY10,
' p'
, MAGENTA,
$workerconf
->{priority},
(
$workerconf
->{title} ? (GREY10,
' "'
, BRIGHT_BLUE,
$workerconf
->{title}, GREY10,
'"'
) :
''
),
RESET;
if
(
$job
->is_cancelled) {
return
$job
->add_status( Status->info(
'skip: job is cancelled'
) );
}
if
(
$job
->is_offline
and
$workerconf
->{phase} eq
'main'
and
$workerconf
->{priority} > 0
and
$workerconf
->{priority} < setting(
'driver_priority'
)->{
'direct'
}) {
return
$job
->add_status( Status->info(
'skip: networked worker but job is running offline'
) );
}
return
$job
->add_status( Status->info(
'skip: namespace passed at higher priority'
) )
if
$job
->namespace_passed(
$workerconf
);
if
(
$job
->only_namespace and
$workerconf
->{phase} ne
'check'
) {
if
(not (
$workerconf
->{namespace} eq
lc
(
$job
->only_namespace )
or ((
$job
->only_namespace ne
'hooks'
) and (
$workerconf
->{phase} eq
'early'
)
and (
$job
->device and not
$job
->device->in_storage)) )) {
return
;
}
}
my
@newuserconf
= ();
my
@userconf
= @{ dclone (setting(
'device_auth'
) || []) };
if
(
ref
$job
->device) {
my
$no
= (
exists
$workerconf
->{
no
} ?
$workerconf
->{
no
} :
undef
);
my
$only
= (
exists
$workerconf
->{only} ?
$workerconf
->{only} :
undef
);
return
$job
->add_status( Status->info(
'skip: acls restricted'
) )
if
(
$no
and acl_matches(
$job
->device,
$no
))
or (
$only
and not acl_matches_only(
$job
->device,
$only
));
foreach
my
$stanza
(
@userconf
) {
next
if
exists
$stanza
->{driver} and
exists
$workerconf
->{driver}
and ((
$stanza
->{driver} ||
''
) ne (
$workerconf
->{driver} ||
''
));
next
if
exists
$stanza
->{action}
and not _find_matchaction(
$workerconf
,
lc
(
$stanza
->{action}));
push
@newuserconf
, dclone
$stanza
;
}
return
$job
->add_status( Status->info(
'skip: driver or action not applicable'
) )
if
0 ==
scalar
@newuserconf
and
$workerconf
->{priority} > 0
and
$workerconf
->{priority} < setting(
'driver_priority'
)->{
'direct'
};
}
my
$guard
= guard { set(
device_auth
=> \
@userconf
) };
set(
device_auth
=> \
@newuserconf
);
if
(
$ENV
{ND2_WORKER_ROLL_CALL}) {
return
Status->info(
'-'
);
}
else
{
$code
->(
$job
,
$workerconf
);
}
};
push
@{ vars->{
'workers'
}->{
$workerconf
->{action}}
->{
$workerconf
->{phase}}
->{
$workerconf
->{namespace}}
->{
$workerconf
->{priority}} },
$worker
;
};
sub
_find_matchaction {
my
(
$conf
,
$action
) =
@_
;
return
true
if
!
defined
$action
;
$action
= [
$action
]
if
ref
[] ne
ref
$action
;
foreach
my
$f
(
@$action
) {
return
true
if
$f
eq
$conf
->{action} or
$f
eq
"$conf->{action}::$conf->{namespace}"
;
}
return
false;
}
register_plugin;
true;