# Names of the state/order constants this module offers for export, either
# individually or all together through the ':pfstates' tag.
our @PFSTATE_VARS = qw(
    PFSTATE_ERROR
    PFSTATE_STARTING
    PFSTATE_IDLE
    PFSTATE_BUSY
    PFSTATE_KILLED
    PFORDER_ACCEPT
    PFSTATE_GOT_SIGCHLD
);
our %EXPORT_TAGS = (pfstates => [@PFSTATE_VARS]);
our @EXPORT_OK   = @PFSTATE_VARS;

# Compile-time switch guarding the failure-simulation code paths below; the
# $TEST_MODE_* variables are only consulted when this constant is true.
use constant SUPPORT_TEST_INSTRUMENTATION => 0;

# Each value is used as the argument to rand(); a simulated failure is
# triggered when rand(VALUE) < 1.  Zero disables the simulation.
our $TEST_MODE_CAUSE_RANDOM_KID_FAILURES   = 0;
our $TEST_MODE_CAUSE_RANDOM_WRITE_RETRIES  = 0;
our $TEST_MODE_CAUSE_RANDOM_PING_FAILURES  = 0;

# Fixed-size (6 byte) orders written from the parent to a child.
use constant {
    PF_ACCEPT_ORDER => "A....\n",
    PF_PING_ORDER   => "P....\n",
};
# Constructor.  Can be called on a class name or on an existing object, in
# which case the object's class is reused.
#
# An optional hash reference may be passed as the second argument; it is
# blessed in place and becomes the object.  Without it a fresh empty hash is
# used.
#
# The following fields are always (re)initialised:
#   kids             => {}   (overwrites any value passed in)
#   overloaded       => 0
#   server_last_ping => current time
# min_children is set to 1 if it is missing or false.
#
# Returns the blessed object.
sub new {
    my ($proto, $self) = @_;

    my $class = ref($proto) || $proto;
    $self = {} unless defined $self;
    bless $self, $class;

    $self->{kids}             = {};
    $self->{overloaded}       = 0;
    $self->{min_children}   ||= 1;
    $self->{server_last_ping} = time;

    return $self;
}
# Records the child identified by $pid in the PFSTATE_STARTING state.
# Returns whatever set_child_state() returns.
sub add_child {
    my $self = shift;
    my $pid  = shift;

    return $self->set_child_state($pid, PFSTATE_STARTING);
}
# Notes that child $pid has exited: logs it, moves the child to the
# PFSTATE_GOT_SIGCHLD state and raises the child_just_exited flag, which
# main_server_poll() inspects after its select() call.
sub child_exited {
    my $self = shift;
    my $pid  = shift;

    dbg("prefork: child $pid: just exited");
    $self->set_child_state($pid, PFSTATE_GOT_SIGCHLD);

    return $self->{child_just_exited} = 1;
}
# Forgets every child currently marked PFSTATE_GOT_SIGCHLD: removes it from
# the kids table, asks the backchannel to drop its socket, and finally
# recomputes the lowest idle pid.  Does nothing when no child is in that
# state.
sub post_sigchld_cleanup {
    my $self = shift;

    # Collect first, delete afterwards, so the table is not modified while
    # its keys are being examined.
    my @exited;
    for my $pid (keys %{ $self->{kids} }) {
        push @exited, $pid
            if $self->{kids}->{$pid} == PFSTATE_GOT_SIGCHLD;
    }
    return unless @exited;

    for my $pid (@exited) {
        delete $self->{kids}->{$pid};
        $self->{backchannel}->delete_socket_for_child($pid);
    }

    return $self->compute_lowest_child_pid();
}
# Marks this object as shutting down.  adapt_num_children() returns early
# once the am_exiting flag is set.
sub set_exiting_flag {
    my $self = shift;

    return $self->{am_exiting} = 1;
}
# Gets rid of a child that misbehaved.
#
#   $pid  - process id of the child
#   $sock - the child's socket object; may be undefined
#
# Steps, in order: warn, mark the child PFSTATE_KILLED, send it SIGINT,
# remove its socket from the backchannel (and from the selector if the socket
# still has a fileno), close the socket, drop the child from the kids table
# and recompute the lowest idle pid.
sub child_error_kill {
    my ($self, $pid, $sock) = @_;

    my $fd_text = "undefined";
    if (defined $sock) {
        my $fd = $sock->fileno;
        $fd_text = $fd if defined $fd;
    }
    warn "prefork: killing failed child $pid fd=" . $fd_text;

    $self->set_child_state($pid, PFSTATE_KILLED);

    unless (kill('INT', $pid)) {
        warn "prefork: kill of failed child $pid failed: $!\n";
    }

    $self->{backchannel}->delete_socket_for_child($pid);
    if (defined $sock && defined $sock->fileno()) {
        $self->{backchannel}->remove_from_selector($sock);
    }

    if ($sock) {
        $sock->close
            or info("prefork: error closing socket: $!");
    }

    delete $self->{kids}->{$pid};
    $self->compute_lowest_child_pid();

    warn "prefork: killed child $pid\n";
}
# Stores $state for child $pid and recomputes the lowest idle pid.
#
# The STARTING, KILLED and GOT_SIGCHLD states are always recorded, even for a
# pid that is not in the kids table yet.  Any other state is only recorded
# for a pid that is already present; otherwise the request is logged and
# ignored.
sub set_child_state {
    my ($self, $pid, $state) = @_;

    my $always_recorded = ($state == PFSTATE_STARTING
                        || $state == PFSTATE_KILLED
                        || $state == PFSTATE_GOT_SIGCHLD);

    unless ($always_recorded || exists $self->{kids}->{$pid}) {
        return dbg("prefork: child $pid: ignored new state $state, already exited?");
    }

    $self->{kids}->{$pid} = $state;
    dbg("prefork: child $pid: entering state $state");

    return $self->compute_lowest_child_pid();
}
# Finds the numerically smallest pid among the children in PFSTATE_IDLE and
# stores it in $self->{lowest_idle_pid}.  The field is set to undef when no
# child is idle.
sub compute_lowest_child_pid {
    my ($self) = @_;

    my $lowest;
    for my $pid (keys %{ $self->{kids} }) {
        next unless $self->{kids}->{$pid} == PFSTATE_IDLE;
        $lowest = $pid if !defined $lowest || $pid < $lowest;
    }

    $self->{lowest_idle_pid} = $lowest;

    dbg("prefork: new lowest idle kid: " . ($lowest || 'none'));
}
# Replaces the stored list of server filehandles.
#
# Undefined entries in @fhs are skipped.  For every remaining handle the
# handle itself is appended to $self->{server_fh} and its fileno() to
# $self->{server_fileno}, so both arrays stay index-aligned.
sub set_server_fh {
    my ($self, @fhs) = @_;

    my (@handles, @filenos);
    $self->{server_fh}     = \@handles;
    $self->{server_fileno} = \@filenos;

    for my $fh (grep { defined } @fhs) {
        push @handles, $fh;
        push @filenos, $fh->fileno();
    }

    return;
}
# Runs one iteration of the parent's poll loop.
#
#   $tout - timeout passed to select(); the backup timer uses
#           ($tout * 2) + 60 seconds.
#
# Outline:
#   1. select() on a copy of the backchannel selector vector (with the server
#      filenos cleared while $self->{overloaded} is set), wrapped in a
#      Mail::SpamAssassin::Timeout timer.
#   2. Clean up children flagged by SIGCHLD before and after the select().
#   3. On select() failure: log and return (sleeping 1 second for unexpected
#      errors).
#   4. On an exception bit for a server fileno: warn and return.
#   5. On timeout (nothing ready): ping the children if more than
#      TOUT_PING_INTERVAL seconds have passed since the last ping; return.
#   6. On a readable server fileno: order an idle child to accept, or mark
#      the server overloaded if that fails; return.
#   7. Otherwise read one message from every readable child socket, hand the
#      pending work to a child that reports idle while overloaded, and finally
#      call adapt_num_children().
sub main_server_poll {
    my ($self, $tout) = @_;

    # Work on a copy so the backchannel's own vector stays untouched.
    my $rin = ${ $self->{backchannel}->{selector} };
    if ($self->{overloaded}) {
        $self->vec_all(\$rin, $self->{server_fileno}, 0);
    }

    $self->post_sigchld_cleanup();

    my ($rout, $eout, $nfound, $timeleft, $selerr);

    my $timer = Mail::SpamAssassin::Timeout->new({ secs => ($tout * 2) + 60 });
    $timer->run(sub {
        $self->{child_just_exited} = 0;
        ($nfound, $timeleft) = select($rout = $rin, undef, $eout = $rin, $tout);
        # Capture $! right away; later calls may overwrite it.
        $selerr = $! if !defined $nfound || $nfound < 0;
    });

    $self->post_sigchld_cleanup();

    if ($timer->timed_out) {
        # Treated exactly like select() reporting that nothing was ready.
        dbg("prefork: select timed out (via alarm)");
        $nfound   = 0;
        $timeleft = 0;
    }

    if (!defined $nfound || $nfound < 0) {
        if (exists &Errno::EINTR && $selerr == &Errno::EINTR) {
            dbg("prefork: select returned err $selerr, probably signalled");
            return;
        }

        # child_exited() raises child_just_exited; the flag was cleared just
        # before the select() call above.
        if ($self->{child_just_exited} && defined $nfound && $nfound == -1) {
            dbg("prefork: select returned -1 due to child exiting, ignored ($selerr)");
            return;
        }

        warn "prefork: select returned "
            . (defined $nfound ? $nfound : "undef")
            . "! recovering: $selerr\n";
        sleep 1;
        return;
    }

    if ($self->vec_all(\$eout, $self->{server_fileno})) {
        # $selerr is only assigned when select() itself failed, and that case
        # has already returned above, so it is undefined here.  Substitute an
        # empty string to keep the message text unchanged without triggering
        # an "uninitialized value" warning.
        my $select_err = defined $selerr ? $selerr : '';
        warn "prefork: select returned error on server filehandle: $select_err $!\n";
        return;
    }

    if (!$nfound) {
        my $now = time;
        if ($now - $self->{server_last_ping} > TOUT_PING_INTERVAL) {
            $self->main_ping_kids($now);
        }
        return;
    }

    if ($self->vec_all(\$rout, $self->{server_fileno})) {
        if (!$self->order_idle_child_to_accept()) {
            $self->{overloaded} = 1;
        }
        return;
    }

    foreach my $fh ($self->{backchannel}->select_vec_to_fh_list($rout)) {
        if ($self->read_one_message_from_child_socket($fh) == PFSTATE_IDLE) {
            dbg("prefork: child reports idle");
            if ($self->{overloaded}) {
                dbg("prefork: overloaded, immediately telling kid to accept");
                if (!$self->order_idle_child_to_accept()) {
                    warn "prefork: lost idle kids, so still overloaded";
                    $self->{overloaded} = 1;
                }
                else {
                    dbg("prefork: no longer overloaded");
                    $self->{overloaded} = 0;
                }
            }
        }
    }

    $self->adapt_num_children();
}
# Writes PF_PING_ORDER to every child socket known to the backchannel and
# records $now as the time of the last ping.
#
# A child is passed to child_error_kill() when its socket is undefined or
# has no fileno, when the write fails (up to 3 tries are requested from
# syswrite_with_retry), or when test instrumentation simulates a failure.
sub main_ping_kids {
    my ($self, $now) = @_;

    $self->{server_last_ping} = $now;

    # keys() in void context resets the hash's each() iterator, so the loop
    # below starts from the first entry.
    keys %{ $self->{backchannel}->{kids} };

    while (my ($kid, $sock) = each %{ $self->{backchannel}->{kids} }) {
        my $simulate_failure = SUPPORT_TEST_INSTRUMENTATION
            && $TEST_MODE_CAUSE_RANDOM_PING_FAILURES
            && rand($TEST_MODE_CAUSE_RANDOM_PING_FAILURES) < 1;

        if ($simulate_failure) {
            warn "prefork: TEST_MODE_CAUSE_RANDOM_PING_FAILURES simulating ping failure";
        }
        elsif (defined $sock && defined $sock->fileno) {
            # A successful ping needs no further action for this child.
            next if $self->syswrite_with_retry($sock, PF_PING_ORDER, $kid, 3);

            warn "prefork: write of ping failed to $kid fd=" . $sock->fileno . ": " . $!;
        }
        else {
            warn "prefork: cannot ping $kid, file handle not defined, child likely "
                . "to still be processing SIGCHLD handler after killing itself\n";
        }

        # Only reached when the ping could not be delivered.
        $self->child_error_kill($kid, $sock);
    }
}
# Reads one 6-byte status message from a child's socket and applies it.
#
# A message is a one-character tag ('I' for idle, 'B' for busy) followed by
# the child's pid as unpacked with the "l1" template, and a trailing newline.
#
# Returns PFSTATE_IDLE or PFSTATE_BUSY after updating the child's state, or
# PFSTATE_ERROR when nothing could be read (the socket is then removed from
# the selector and closed, provided it still has a fileno).  Dies on a
# message with any other tag.
sub read_one_message_from_child_socket {
    my ($self, $sock) = @_;

    my $line;
    my $nbytes = $self->sysread_with_timeout($sock, \$line, 6, TOUT_READ_MAX);

    if (!defined $nbytes || $nbytes == 0) {
        dbg("prefork: child closed connection");

        if (defined $sock->fileno) {
            $self->{backchannel}->remove_from_selector($sock);
            $sock->close
                or info("prefork: error closing socket: $!");
        }
        return PFSTATE_ERROR;
    }

    if ($nbytes < 6) {
        # Only a warning: the short message is still parsed below.
        warn("prefork: child gave short message: len=$nbytes bytes="
            . join(" ", unpack "C*", $line));
    }

    chomp $line;

    my $tag = substr($line, 0, 1);
    my $state;
    if ($tag eq 'I') {
        $state = PFSTATE_IDLE;
    }
    elsif ($tag eq 'B') {
        $state = PFSTATE_BUSY;
    }
    else {
        die "prefork: unknown message from child: '$line'";
    }

    my $pid = unpack("l1", substr($line, 1));
    $self->set_child_state($pid, $state);
    return $state;
}
# Tells the idle child with the lowest pid to accept a connection.
#
# Keeps trying with the next lowest idle child whenever the chosen one has
# no socket, cannot be written to, or does not confirm the order; children
# failing the first two checks are killed through child_error_kill() here.
#
# Returns the true value from wait_for_child_to_accept() on success, or an
# empty/undef result when no idle child is left.
sub order_idle_child_to_accept {
    my ($self) = @_;

    while (1) {
        # Re-read on every pass: killing a child recomputes this field.
        my $kid = $self->{lowest_idle_pid};
        unless (defined $kid) {
            dbg("prefork: no spare children to accept, waiting for one to complete");
            return;
        }

        my $sock = $self->{backchannel}->get_socket_for_child($kid);

        if (SUPPORT_TEST_INSTRUMENTATION
            && $TEST_MODE_CAUSE_RANDOM_KID_FAILURES
            && rand($TEST_MODE_CAUSE_RANDOM_KID_FAILURES) < 1)
        {
            $sock = undef;
            warn "prefork: TEST_MODE_CAUSE_RANDOM_KID_FAILURES simulating no socket for kid $kid";
        }

        if (!$sock) {
            warn "prefork: oops! no socket for child $kid, killing";
            $self->child_error_kill($kid, $sock);
            next;
        }

        if (!$self->syswrite_with_retry($sock, PF_ACCEPT_ORDER, $kid)) {
            warn "prefork: killing rogue child $kid, failed to write on fd "
                . $sock->fileno . ": $!\n";
            $self->child_error_kill($kid, $sock);
            next;
        }

        dbg("prefork: ordered $kid to accept");

        my $ret = $self->wait_for_child_to_accept($kid, $sock);
        return $ret if $ret;
    }
}
# Reads the reply of child $kid after it was ordered to accept.
#
# Returns 1 when the child reports PFSTATE_BUSY.  Returns an empty/undef
# result when reading failed (PFSTATE_ERROR), or when the child reported any
# other state; in the latter case the child is killed, adapt_num_children()
# is called and the caller is delayed by one second.
#
# Exactly one message is read: every outcome returns, so no loop is needed.
sub wait_for_child_to_accept {
    my ($self, $kid, $sock) = @_;

    my $state = $self->read_one_message_from_child_socket($sock);

    return 1 if $state == PFSTATE_BUSY;
    return   if $state == PFSTATE_ERROR;

    # blessed() yields undef for a missing or unblessed handle; check that
    # before the string comparison to avoid an "uninitialized value" warning.
    my $server_class = Scalar::Util::blessed($self->{server_fh}[0]);
    if (defined $server_class && $server_class eq 'IO::Socket::SSL') {
        warn "prefork: SSL connection protocol error";
    }

    warn "prefork: ordered child $kid to accept, but they reported state '$state', killing rogue";
    $self->child_error_kill($kid, $sock);
    $self->adapt_num_children();
    sleep 1;

    return;
}
# If the waiting_for_idle_child flag is set, writes PF_ACCEPT_ORDER to child
# $kid and clears the flag.  Dies when the write fails.  Does nothing when
# the flag is not set.
sub child_now_ready_to_accept {
    my ($self, $kid) = @_;

    return unless $self->{waiting_for_idle_child};

    my $sock = $self->{backchannel}->get_socket_for_child($kid);

    unless ($self->syswrite_with_retry($sock, PF_ACCEPT_ORDER, $kid)) {
        die "prefork: $kid claimed it was ready, but write failed on fd "
            . $sock->fileno . ": " . $!;
    }

    $self->{waiting_for_idle_child} = 0;
}
# Stores $pid in $self->{pid}; update_child_status_idle() and
# update_child_status_busy() send this value in their reports.
sub set_my_pid {
    my $self = shift;

    return $self->{pid} = shift;
}
# Sends an "idle" report through report_backchannel_socket(): the letter 'I',
# then $self->{pid} packed with the "l" template, then a newline.
sub update_child_status_idle {
    my $self = shift;

    my $report = join('', "I", pack("l", $self->{pid}), "\n");
    return $self->report_backchannel_socket($report);
}
# Sends a "busy" report through report_backchannel_socket(): the letter 'B',
# then $self->{pid} packed with the "l" template, then a newline.
sub update_child_status_busy {
    my $self = shift;

    my $report = join('', "B", pack("l", $self->{pid}), "\n");
    return $self->report_backchannel_socket($report);
}
# Writes $str to the socket returned by the backchannel's
# get_parent_socket().  Dies if syswrite_with_retry() reports failure;
# otherwise returns its (true) result.
sub report_backchannel_socket {
    my ($self, $str) = @_;

    my $parent_sock = $self->{backchannel}->get_parent_socket();
    my $written     = $self->syswrite_with_retry($parent_sock, $str, 'parent');

    die "syswrite() to parent failed: $!" unless $written;

    return $written;
}
# Blocks until an accept order arrives on the parent socket.
#
# Orders are 6-byte messages read with sysread_with_timeout().  A message
# starting with 'P' is a ping and is ignored (after a 2 second sleep on
# Windows); a message starting with 'A' makes the sub return PFORDER_ACCEPT.
#
# Exits the process when nothing can be read and the socket is at EOF.  Dies
# when nothing can be read for another reason, or on an unknown order.
sub wait_for_orders {
    my ($self) = @_;

    my $sock = $self->{backchannel}->get_parent_socket();

    ORDER:
    while (1) {
        my $line;
        my $nbytes = $self->sysread_with_timeout($sock, \$line, 6, TOUT_READ_MAX);

        if (!defined $nbytes || $nbytes == 0) {
            if ($sock->eof()) {
                dbg("prefork: parent closed, exiting");
                exit;
            }
            die "prefork: empty order from parent";
        }

        if ($nbytes < 6) {
            # Only a warning: the short message is still interpreted below.
            warn("prefork: parent gave short message: len=$nbytes bytes="
                . join(" ", unpack "C*", $line));
        }

        chomp $line;
        my $order = substr($line, 0, 1);

        if ($order eq "P") {
            dbg("prefork: periodic ping from spamd parent");
            sleep 2 if am_running_on_windows();
            next ORDER;
        }

        return PFORDER_ACCEPT if $order eq "A";

        die "prefork: unknown order from parent: '$line'";
    }
}
# Reads exactly $toread bytes from $sock into $$lineref, waiting with
# select() while the read reports EAGAIN / EWOULDBLOCK.
#
#   $sock    - handle object supporting ->sysread and ->fileno
#   $lineref - scalar reference receiving the data (cleared first)
#   $toread  - number of bytes wanted
#   $timeout - seconds; the deadline is fixed at the first would-block read
#
# Returns the number of bytes read: $toread on success, fewer (possibly 0)
# if EOF is hit first.  Returns an empty/undef result on any other read error
# or once the deadline has passed.
#
# Partial reads are logged with a warning and the remainder is read on the
# next pass of the loop.
sub sysread_with_timeout {
    my ($self, $sock, $lineref, $toread, $timeout) = @_;

    $$lineref = '';
    my $readsofar = 0;
    my $deadline;    # only set once a read would have blocked
    my $buf;

    while (1) {
        my $nbytes = $sock->sysread($buf, $toread);

        if (!defined $nbytes) {
            unless ((exists &Errno::EAGAIN && $! == &Errno::EAGAIN)
                || (exists &Errno::EWOULDBLOCK && $! == &Errno::EWOULDBLOCK))
            {
                # Not a "try again later" condition: give up.
                return;
            }

            my $now  = time();
            my $tout = $timeout;
            if (!defined $deadline) {
                $deadline = $now + $timeout;
            }
            elsif ($now > $deadline) {
                dbg("prefork: sysread(%d) failed after %.1f secs",
                    $sock->fileno, $timeout);
                return;
            }
            else {
                # Wait only for what is left, but never for zero seconds.
                $tout = $deadline - $now;
                $tout = 1 if ($tout <= 0);
            }

            dbg("prefork: sysread(%d) not ready, wait max %.1f secs",
                $sock->fileno, $tout);

            my $rin = '';
            vec($rin, $sock->fileno, 1) = 1;
            my $nfound = select($rin, undef, undef, $tout);
            defined $nfound && $nfound >= 0
                or info("prefork: sysread_with_timeout select error: %s", $!);
            next;
        }
        elsif ($nbytes == 0) {
            # EOF: report whatever was collected so far.
            return $readsofar;
        }
        elsif ($nbytes == $toread) {
            $readsofar += $nbytes;
            $$lineref  .= $buf;
            return $readsofar;
        }
        else {
            # Fields are space-separated, matching the partial-write message
            # in syswrite_with_retry().
            warn "prefork: partial read of $nbytes, toread=" . $toread
                . " sofar=" . $readsofar . " fd=" . $sock->fileno . ", recovering";
            $readsofar += $nbytes;
            $$lineref  .= $buf;
            $toread    -= $nbytes;
            next;
        }
    }

    die "assert: should not get here";
}
# Writes all of $buf to $sock, retrying on EAGAIN / EWOULDBLOCK and after
# partial writes.
#
#   $sock       - handle object supporting ->syswrite and ->fileno
#   $buf        - data to write
#   $targetname - label used in log messages
#   $numretries - maximum number of tries (defaults to 10 when false)
#
# Returns the total number of bytes written on success.  Returns an
# empty/undef result when the tries are used up or a write fails with any
# other error.
#
# Every pass after the first emits a warning and then waits up to one second
# in select() for the socket to become writable.
sub syswrite_with_retry {
    my ($self, $sock, $buf, $targetname, $numretries) = @_;

    $numretries ||= 10;
    my $written = 0;
    my $try     = 0;

    while (1) {
        $try++;

        if ($try > 1) {
            warn "prefork: syswrite(" . $sock->fileno . ") to $targetname failed on try $try";

            if ($try > $numretries) {
                warn "prefork: giving up";
                return;
            }

            my $wvec = '';
            vec($wvec, $sock->fileno, 1) = 1;
            my $nfound = select(undef, $wvec, undef, 1);
            defined $nfound && $nfound >= 0
                or info("prefork: syswrite_with_retry select error: %s", $!);
        }

        my $nbytes;
        if (SUPPORT_TEST_INSTRUMENTATION
            && $TEST_MODE_CAUSE_RANDOM_WRITE_RETRIES
            && rand($TEST_MODE_CAUSE_RANDOM_WRITE_RETRIES) < 1)
        {
            warn "prefork: TEST_MODE_CAUSE_RANDOM_WRITE_RETRIES simulating write failure";
            $nbytes = undef;
            $! = &Errno::EAGAIN;
        }
        else {
            $nbytes = $sock->syswrite($buf);
        }

        if (!defined $nbytes) {
            unless ((exists &Errno::EAGAIN && $! == &Errno::EAGAIN)
                || (exists &Errno::EWOULDBLOCK && $! == &Errno::EWOULDBLOCK))
            {
                # Not a "try again later" condition: give up.
                return;
            }

            warn "prefork: retrying syswrite(): $!";
            next;
        }

        $written += $nbytes;
        $buf = substr($buf, $nbytes);    # keep only the unsent remainder
        return $written if $buf eq '';

        warn "prefork: partial write of $nbytes to " . $targetname
            . ", towrite=" . length($buf)
            . " sofar=" . $written
            . " fd=" . $sock->fileno . ", recovering";
    }

    die "assert: should not get here";
}
# Logs a one-letter-per-child summary of the current child states (in
# ascending pid order) and then grows or shrinks the pool:
#
#   - fewer idle children than min_idle: add a server, unless the number of
#     children has reached max_children (then only log a hint);
#   - more idle children than max_idle while there are more than
#     min_children children: remove a server.
#
# Does nothing once the am_exiting flag is set.
sub adapt_num_children {
    my ($self) = @_;

    return if $self->{am_exiting};

    my $kids        = $self->{kids};
    my @pids        = sort { $a <=> $b } keys %{$kids};
    my $num_servers = scalar @pids;

    # Checked in this order; a state matching none of them is shown as '?'.
    my @letter_for_state = (
        [ PFSTATE_IDLE,        'I' ],
        [ PFSTATE_BUSY,        'B' ],
        [ PFSTATE_KILLED,      'K' ],
        [ PFSTATE_GOT_SIGCHLD, 'Z' ],
        [ PFSTATE_ERROR,       'E' ],
        [ PFSTATE_STARTING,    'S' ],
    );

    my $statestr = '';
    my $num_idle = 0;

    foreach my $pid (@pids) {
        my $k = $kids->{$pid};
        next unless defined $k;

        my $letter = '?';
        foreach my $entry (@letter_for_state) {
            next unless $k == $entry->[0];
            $letter = $entry->[1];
            last;
        }

        $statestr .= $letter;
        $num_idle++ if $k == PFSTATE_IDLE;
    }

    info("prefork: child states: " . $statestr . "\n");

    if ($num_idle < $self->{min_idle}) {
        if ($num_servers < $self->{max_children}) {
            $self->need_to_add_server($num_idle);
        }
        else {
            info("prefork: server reached --max-children setting, consider raising it\n");
        }
    }
    elsif ($num_idle > $self->{max_idle}
        && $num_servers > $self->{min_children})
    {
        $self->need_to_del_server($num_idle);
    }
}
# Starts one more child through main::spawn() and logs the adjustment.
#
#   $num_idle - current number of idle children (used for logging only)
sub need_to_add_server {
    my ($self, $num_idle) = @_;

    # Local copy only: the value behind cur_children_ref is not modified here.
    my $cur = ${ $self->{cur_children_ref} };
    $cur++;

    my $min_idle = $self->{min_idle};
    dbg("prefork: adjust: increasing, not enough idle children ($num_idle < $min_idle)");

    my $pid = main::spawn();

    info("prefork: adjust: %s idle children less than %s minimum idle children. Increasing spamd children: %s started.",
        $num_idle, $self->{min_idle}, $pid);
}
# Removes one idle child: the idle child with the highest pid is marked
# PFSTATE_KILLED and sent SIGINT.
#
#   $num_idle - current number of idle children (used for logging only)
#
# On Windows a ping order is written to the child's socket before the
# signal, and the socket is then removed from the backchannel and selector
# and closed here.
#
# Dies if no child is in PFSTATE_IDLE.
sub need_to_del_server {
    my ($self, $num_idle) = @_;

    # Local copy only: the value behind cur_children_ref is not modified here.
    my $cur = ${ $self->{cur_children_ref} };
    $cur--;

    # Pick the highest-numbered idle pid.
    my $pid;
    foreach my $k (keys %{ $self->{kids} }) {
        my $v = $self->{kids}->{$k};
        if ($v == PFSTATE_IDLE) {
            if (!defined $pid || $k > $pid) {
                $pid = $k;
            }
        }
    }

    if (!defined $pid) {
        die "prefork: oops! no idle kids in need_to_del_server?";
    }

    $self->set_child_state($pid, PFSTATE_KILLED);

    if (!am_running_on_windows()) {
        kill 'INT' => $pid;
    }
    else {
        my $sock = $self->{backchannel}->get_socket_for_child($pid);

        # The socket may be missing (the checks further down allow for that),
        # so only write to it when it exists instead of dying on an
        # undefined value.
        $sock->syswrite("P....\n") if defined $sock;

        kill 'INT' => $pid
            or warn "prefork: kill of child $pid failed: $!\n";

        $self->{backchannel}->delete_socket_for_child($pid);
        if (defined $sock && defined $sock->fileno()) {
            $self->{backchannel}->remove_from_selector($sock);
        }
        $sock->close if $sock;
    }

    dbg("prefork: adjust: decreasing, too many idle children ($num_idle > $self->{max_idle}), killed $pid");
    info("prefork: adjust: %s idle children more than %s maximum idle children. Decreasing spamd children: %s killed.",
        $num_idle, $self->{max_idle}, $pid);
}
# Sets or tests several bits of a select()-style bit vector at once.
#
#   $bitsref - reference to the bit vector string
#   $fhs     - array reference of bit offsets (filenos); undefined entries
#              are skipped
#   $value   - optional; when defined, every listed bit is set to it
#
# Returns 0 after setting bits.  Without $value, returns the OR of the
# listed bits: 1 if at least one is set, 0 otherwise.
sub vec_all {
    my ($self, $bitsref, $fhs, $value) = @_;

    if (defined $value) {
        foreach my $fileno (@{$fhs}) {
            vec($$bitsref, $fileno, 1) = $value if defined $fileno;
        }
        return 0;
    }

    my $any_set = 0;
    foreach my $fileno (@{$fhs}) {
        $any_set |= vec($$bitsref, $fileno, 1) if defined $fileno;
    }
    return $any_set;
}
1;