our
@ISA
=
qw(Exporter)
;
our
@EXPORT_OK
=
qw(queue_job)
;
our
%EXPORT_TAGS
= (
all
=> \
@EXPORT_OK
);
our
$VERSION
=
'0.97'
;
use
constant
FIRST_DEFERRED_ID
=> -2_100_000;
use
constant
QUEUE_PRIORITY_INCREMENT
=> 1.0E-6;
use
constant
DEFAULT_QUEUE_MONITOR_FREQ
=> 30;
our
(
@QUEUE
,
$QUEUE_MONITOR_PID
,
$QUEUE_MONITOR_PPID
,
$QUEUE_MONITOR_FREQ
);
our
$QUEUE_DEBUG
=
$ENV
{FORKS_SUPER_QUEUE_DEBUG} || 0;
our
$QUEUE_MONITOR_LIFESPAN
= 14400;
our
$DEFAULT_QUEUE_PRIORITY
= 0;
our
$INHIBIT_QUEUE_MONITOR
= 1;
my
$NEXT_DEFERRED_ID
= FIRST_DEFERRED_ID;
our
$OLD_QINTERRUPT_SIG
;
our
(
$MAIN_PID
,
$_LOCK
) = ($$,0);
our
$CHECK_FOR_REAP
= 1;
sub
get_default_priority {
my
$q
=
$DEFAULT_QUEUE_PRIORITY
;
$DEFAULT_QUEUE_PRIORITY
-=
&QUEUE_PRIORITY_INCREMENT
;
return
$q
;
}
sub
init {
tie
$QUEUE_MONITOR_FREQ
,
'Forks::Super::Deferred::QueueMonitorFreq'
,
&DEFAULT_QUEUE_MONITOR_FREQ
;
tie
$INHIBIT_QUEUE_MONITOR
,
'Forks::Super::Deferred::InhibitQueueMonitor'
,
&IS_WIN32
;
tie
$Forks::Super::QUEUE_INTERRUPT
,
'Forks::Super::Deferred::QueueInterrupt'
, (
''
,
keys
%SIG
);
if
(
grep
{/USR1/}
keys
%SIG
) {
$Forks::Super::QUEUE_INTERRUPT
=
'USR1'
;
}
return
;
}
sub
deinit {
untie
$QUEUE_MONITOR_FREQ
;
untie
$INHIBIT_QUEUE_MONITOR
;
untie
$Forks::Super::QUEUE_INTERRUPT
;
}
sub
init_child {
@QUEUE
= ();
undef
$QUEUE_MONITOR_PID
;
if
(
$Forks::Super::QUEUE_INTERRUPT
&&
$Forks::Super::SysInfo::CONFIG
{SIGUSR1}) {
$SIG
{
$Forks::Super::QUEUE_INTERRUPT
} =
'DEFAULT'
;
}
return
;
}
sub
_launch_queue_monitor {
if
(!
$Forks::Super::SysInfo::CONFIG
{
'SIGUSR1'
}) {
debug(
'_lqm returning: no SIGUSR1'
)
if
$QUEUE_DEBUG
;
return
;
}
if
(
defined
$QUEUE_MONITOR_PID
) {
debug(
'_lqm returning: $QUEUE_MONITOR_PID defined'
)
if
$QUEUE_DEBUG
;
return
;
}
if
(
$Forks::Super::SysInfo::CONFIG
{
'setitimer'
}) {
_launch_queue_monitor_setitimer();
}
else
{
_launch_queue_monitor_fork();
}
return
;
}
sub
_check_queue {
check_queue();
return
;
}
sub
_launch_queue_monitor_setitimer {
$QUEUE_MONITOR_PPID
= $$;
$QUEUE_MONITOR_PID
=
'setitimer'
;
$XSIG
{ALRM}[2] = \
&_check_queue
;
Time::HiRes::setitimer(
&Time::HiRes::ITIMER_REAL
,
$QUEUE_MONITOR_FREQ
,
$QUEUE_MONITOR_FREQ
);
return
;
}
sub
_launch_queue_monitor_fork {
if
(!
$Forks::Super::QUEUE_INTERRUPT
) {
debug(
'_lqm returning: $Forks::Super::QUEUE_INTERRUPT not set'
)
if
$QUEUE_DEBUG
;
return
;
}
$OLD_QINTERRUPT_SIG
=
$SIG
{
$Forks::Super::QUEUE_INTERRUPT
};
$SIG
{
$Forks::Super::QUEUE_INTERRUPT
} = \
&check_queue
;
$QUEUE_MONITOR_PPID
= $$;
$QUEUE_MONITOR_PID
= CORE::
fork
();
if
(not
defined
$QUEUE_MONITOR_PID
) {
warn
'Forks::Super: '
,
"queue monitoring sub process could not be launched: $!\n"
;
undef
$QUEUE_MONITOR_PPID
;
return
;
}
if
(
$QUEUE_MONITOR_PID
== 0) {
_launch_queue_monitor_fork_child();
exit
0;
}
return
;
}
sub
_launch_queue_monitor_fork_child {
$0 =
"QMon:$QUEUE_MONITOR_PPID"
;
if
(
$DEBUG
||
$QUEUE_DEBUG
) {
debug(
"Launching queue monitor process $$ "
,
"SIG $Forks::Super::QUEUE_INTERRUPT "
,
"PPID $QUEUE_MONITOR_PPID "
,
"FREQ $QUEUE_MONITOR_FREQ "
);
}
if
(
defined
&Forks::Super::init_child
) {
Forks::Super::init_child();
}
else
{
init_child();
}
close
STDIN;
open
STDIN,
'<'
, Forks::Super::Util::DEVNULL();
close
STDOUT;
open
STDOUT,
'>'
, Forks::Super::Util::DEVNULL();
if
(!
$DEBUG
&& !
$QUEUE_DEBUG
) {
close
STDERR;
open
STDERR,
'>'
, Forks::Super::Util::DEVNULL();
}
umask
0;
chdir
'/'
;
$SIG
{
'TERM'
} =
'DEFAULT'
;
my
$expire
=
time
+
$QUEUE_MONITOR_LIFESPAN
;
my
$consecutive_failures
= 0;
while
(
time
<
$expire
&&
$consecutive_failures
< 10) {
CORE::
sleep
$QUEUE_MONITOR_FREQ
;
if
(
$DEBUG
||
$QUEUE_DEBUG
) {
debug(
"queue monitor $$ passing signal to $QUEUE_MONITOR_PPID"
);
}
if
(CORE::
kill
$Forks::Super::QUEUE_INTERRUPT
,
$QUEUE_MONITOR_PPID
) {
$consecutive_failures
= 0;
}
else
{
$consecutive_failures
++;
}
last
if
time
>
$expire
;
}
return
;
}
sub
_kill_queue_monitor {
if
(
defined
(
$QUEUE_MONITOR_PPID
) && $$ ==
$QUEUE_MONITOR_PPID
) {
if
(
defined
$QUEUE_MONITOR_PID
) {
if
(
$DEBUG
||
$QUEUE_DEBUG
) {
debug(
"killing queue monitor $QUEUE_MONITOR_PID"
);
}
if
(
$QUEUE_MONITOR_PID
eq
'setitimer'
) {
$XSIG
{ALRM}[1] =
undef
;
$XSIG
{ALRM}[2] =
undef
;
Time::HiRes::setitimer(
&Time::HiRes::ITIMER_REAL
, 0);
undef
$QUEUE_MONITOR_PID
;
undef
$QUEUE_MONITOR_PPID
;
}
elsif
(
$QUEUE_MONITOR_PID
> 0) {
CORE::
kill
'TERM'
,
$QUEUE_MONITOR_PID
;
my
$z
= CORE::
waitpid
$QUEUE_MONITOR_PID
, 0;
if
(
$DEBUG
||
$QUEUE_DEBUG
) {
debug(
"kill queue monitor result: $z"
);
}
undef
$QUEUE_MONITOR_PID
;
undef
$QUEUE_MONITOR_PPID
;
if
(
defined
$OLD_QINTERRUPT_SIG
) {
$SIG
{
$Forks::Super::QUEUE_INTERRUPT
} =
$OLD_QINTERRUPT_SIG
;
}
}
}
}
return
;
}
sub
_cleanup {
_kill_queue_monitor();
return
;
}
sub
queue_job {
my
$job
=
shift
;
if
(
&Forks::Super::Job::_INSIDE_END_QUEUE
) {
return
;
}
if
(
defined
$job
) {
$job
->{state} =
'DEFERRED'
;
$job
->{queued} ||= Time::HiRes::
time
();
$job
->{pid} =
$NEXT_DEFERRED_ID
--;
$Forks::Super::ALL_JOBS
{
$job
->{pid}} =
$job
;
if
(
$job
->{debug} ||
$QUEUE_DEBUG
) {
debug(
'queueing job '
,
$job
->toString());
}
}
my
@q
=
grep
{
$_
->{state} eq
'DEFERRED'
}
@Forks::Super::ALL_JOBS
;
@QUEUE
=
@q
;
if
(
@QUEUE
> 0 && !
$QUEUE_MONITOR_PID
&& !
$INHIBIT_QUEUE_MONITOR
) {
_launch_queue_monitor();
}
elsif
(
@QUEUE
== 0 &&
defined
(
$QUEUE_MONITOR_PID
)) {
_kill_queue_monitor();
}
return
;
}
sub
_check_for_reap {
if
(
$CHECK_FOR_REAP
&&
$Forks::Super::Sigchld::REAP
> 0) {
if
(
$DEBUG
||
$QUEUE_DEBUG
) {
debug(
'reap during queue examination -- restart'
);
}
return
1;
}
return
;
}
sub
run_queue {
my
(
$ignore
) =
@_
;
if
(
@QUEUE
<= 0) {
return
;
}
if
(
&Forks::Super::Job::_INSIDE_END_QUEUE
) {
return
;
}
{
no
warnings
'once'
;
return
if
$Forks::Super::CHILD_FORK_OK
<= 0
&& $$ != (
$Forks::Super::MAIN_PID
||
$MAIN_PID
);
}
queue_job();
return
if
@QUEUE
<= 0;
if
(
$_LOCK
++ > 0) {
$_LOCK
--;
return
;
}
debug(
'run_queue(): examining deferred jobs'
)
if
$DEBUG
||
$QUEUE_DEBUG
;
while
(_attempt_to_launch_deferred_jobs()) { 1 }
$_LOCK
--;
return
;
}
sub
_get_deferred_jobs {
my
@deferred_jobs
=
grep
{
defined
(
$_
->{state}) &&
$_
->{state} eq
'DEFERRED'
}
@Forks::Super::ALL_JOBS
;
@deferred_jobs
=
sort
{
(
$b
->{queue_priority}||0) <=> (
$a
->{queue_priority}||0)
}
@deferred_jobs
;
return
@deferred_jobs
;
}
sub
_attempt_to_launch_deferred_jobs {
$Forks::Super::Sigchld::REAP
= 0;
foreach
my
$job
(_get_deferred_jobs()) {
if
(
$job
->can_launch) {
if
(
$job
->{debug}) {
debug(
"Launching deferred job $job->{pid}"
)
}
$job
->{state} =
'LAUNCHING'
;
if
(_check_for_reap()) {
$job
->{state} =
'DEFERRED'
;
return
1;
}
my
$pid
=
$job
->launch();
if
(
$pid
== 0) {
if
(
defined
(
$job
->{
sub
}) ||
defined
(
$job
->{cmd})
||
defined
(
$job
->{
exec
})) {
$_LOCK
--;
croak
'Forks::Super::run_queue(): '
,
'fork on deferred job unexpectedly returned '
,
"a process id of 0!\n"
;
}
$_LOCK
--;
croak
'Forks::Super::run_queue(): '
,
'deferred job must have a '
,
"'sub', 'cmd', or 'exec' option!\n"
;
}
return
1;
}
elsif
(
$job
->{debug}) {
debug(
'Still must wait to launch job '
,
$job
->toShortString());
}
}
return
0;
}
sub
suspend_resume_jobs {
my
@jobs
=
grep
{
defined
(
$_
->{suspend}) &&
(
$_
->{state} eq
'ACTIVE'
||
$_
->{state} eq
'SUSPENDED'
)
}
@Forks::Super::ALL_JOBS
;
return
if
@jobs
<= 0;
if
(
$_LOCK
++ > 0) {
$_LOCK
--;
return
;
}
debug(
'suspend_resume_jobs(): examining jobs'
)
if
$DEBUG
||
$QUEUE_DEBUG
;
foreach
my
$job
(
@jobs
) {
no
strict
'refs'
;
my
$job_is_suspended
=
$job
->{state} =~ /SUSPEND/;
my
$action
=
$job
->{suspend}->();
if
(
$action
< 0 && !
$job_is_suspended
) {
$job
->suspend;
debug(
"suspend_resume_jobs: suspend callback value $action for "
,
'job '
,
$job
->{pid},
' ... suspending'
)
if
$job
->{debug};
}
elsif
(
$action
> 0 &&
$job_is_suspended
) {
$job
->resume;
debug(
"suspend_resume_jobs: suspend callback value $action for "
,
'job '
,
$job
->{pid},
' ... resuming'
)
if
$job
->{debug};
}
}
$_LOCK
--;
return
;
}
sub
check_queue {
run_queue()
if
!
$_LOCK
;
suspend_resume_jobs()
if
!
$_LOCK
;
return
;
}
sub
Forks::Super::Deferred::QueueMonitorFreq::TIESCALAR {
my
(
$class
,
$value
) =
@_
;
$value
=
int
$value
;
if
(
$value
== 0) {
$value
= 1;
}
elsif
(
$value
< 0) {
$value
=
&DEFAULT_QUEUE_MONITOR_FREQ
;
}
debug(
'new F::S::D::QueueMonitorFreq obj'
)
if
$QUEUE_DEBUG
;
return
bless
\
$value
,
$class
;
}
sub
Forks::Super::Deferred::QueueMonitorFreq::FETCH {
my
$self
=
shift
;
debug(
"F::S::D::QueueMonitorFreq::FETCH: $$self"
)
if
$QUEUE_DEBUG
;
return
$$self
;
}
sub
Forks::Super::Deferred::QueueMonitorFreq::STORE {
my
(
$self
,
$new_value
) =
@_
;
$new_value
=
int
(
$new_value
) || 1;
if
(
$new_value
< 0) {
$new_value
=
&DEFAULT_QUEUE_MONITOR_FREQ
;
}
if
(
$new_value
==
$$self
) {
debug(
"F::S::D::QueueMonitorFreq::STORE noop $$self"
)
if
$QUEUE_DEBUG
;
return
$$self
;
}
if
(
$QUEUE_DEBUG
) {
debug(
"F::S::D::QueueMonitorFreq::STORE $$self <== $new_value"
);
}
$$self
=
$new_value
;
_kill_queue_monitor();
check_queue();
if
(
@QUEUE
> 0) {
_launch_queue_monitor();
}
return
;
}
sub
Forks::Super::Deferred::InhibitQueueMonitor::TIESCALAR {
my
(
$class
,
$value
) =
@_
;
$value
= 0+!!
$value
;
return
bless
\
$value
,
$class
;
}
sub
Forks::Super::Deferred::InhibitQueueMonitor::FETCH {
my
$self
=
shift
;
return
$$self
;
}
sub
Forks::Super::Deferred::InhibitQueueMonitor::STORE {
my
(
$self
,
$new_value
) =
@_
;
$new_value
= 0+!!
$new_value
;
if
(
$$self
!=
$new_value
) {
if
(
$new_value
) {
_kill_queue_monitor();
}
else
{
queue_job();
}
}
$$self
=
$new_value
;
return
$$self
;
}
{
no
warnings
'once'
;
*Forks::Super::Deferred::QueueInterrupt::TIESCALAR
= \
&Forks::Super::Tie::Enum::TIESCALAR
;
*Forks::Super::Deferred::QueueInterrupt::FETCH
= \
&Forks::Super::Tie::Enum::FETCH
;
}
sub
Forks::Super::Deferred::QueueInterrupt::STORE {
my
(
$self
,
$new_value
) =
@_
;
if
(
uc
$new_value
eq
uc
Forks::Super::Tie::Enum::get_value(
$self
)) {
return
;
}
if
(!Forks::Super::Tie::Enum::has_attr(
$self
,
$new_value
)) {
return
;
}
_kill_queue_monitor();
$Forks::Super::Tie::Enum::VALUE
{
$self
} =
$new_value
;
if
(
@QUEUE
> 0) {
_launch_queue_monitor();
}
return
;
}
1;