# Module version string.
our
$VERSION
=
'1.18'
;
# Ask Data::Dumper for terse output (bare values instead of "$VAR1 = ..."
# statements); this affects the Dumper() calls made by the debug helpers.
$Data::Dumper::Terse
= 1;
# Constructor. Takes a flat list of NAME => value options and returns a new
# object blessed into the invocant's class (an existing object may also be
# used as the invocant).
#
# Options copied into the object:
#   PORT, PREFORK, NOFORK, PROP_SIGUSR, DEBUG, SSL -- stored as given
#   MAXFORK -- 1024 when the given value is false
#   TIMEOUT -- 4 when the given value is false, then clamped to 1..3600
#   PX_IDLE -- 31 when the given value is false or below 1
# The whole option hash is kept by reference under OPT. When SSL is true,
# every option whose name starts with "SSL_" is also collected in SSL_OPTS.
# A negative PREFORK is replaced by its absolute value, and that value also
# becomes MAXFORK unless MAXFORK is already positive.
sub new
{
    my ( $proto, %opt ) = @_;
    my $class = ref($proto) || $proto;

    my $self = { OPT => \%opt };

    # options taken over verbatim
    $self->{$_} = $opt{$_} for qw( PORT PREFORK NOFORK PROP_SIGUSR DEBUG SSL );

    # options with a fallback for false values
    $self->{'MAXFORK'} = $opt{'MAXFORK'} || 1024;
    $self->{'TIMEOUT'} = $opt{'TIMEOUT'} || 4;
    $self->{'PX_IDLE'} = $opt{'PX_IDLE'} || 31;

    if ( $opt{'SSL'} ) {
        my @ssl_names = grep { /^SSL_/ } keys %opt;
        $self->{'SSL_OPTS'} = { map { $_ => $opt{$_} } @ssl_names };
    }

    my $prefork = $self->{'PREFORK'};
    if ( $prefork < 0 ) {
        my $count = abs($prefork);
        $self->{'PREFORK'} = $count;
        $self->{'MAXFORK'} = $count unless $self->{'MAXFORK'} > 0;
    }

    # keep the accept timeout within 1 second .. 1 hour
    for my $timeout ( $self->{'TIMEOUT'} ) {
        $timeout = 1    if $timeout < 1;
        $timeout = 3600 if $timeout > 3600;
    }

    $self->{'PX_IDLE'} = 31 if $self->{'PX_IDLE'} < 1;

    return bless $self, $class;
}
# Main entry point of the master process.
#
# Records the master PID, installs the master's signal handlers, calls the
# on_server_begin() hook and opens a non-blocking listening TCP socket on
# PORT (IO::Socket::SSL when SSL is enabled, IO::Socket::INET otherwise).
# It then loops until BREAK_MAIN_LOOP is set: each pass runs either the
# prefork or the fork-per-connection handler and afterwards refreshes the
# busy-kid statistics from the shared hash kept in $self->{SHA}.
#
# Returns 100 when the listening socket cannot be created and 0 after a
# normal shutdown. Dies when SSL is enabled but IO::Socket::SSL cannot be
# loaded.
sub run
{
    my $self = shift;

    $self->{'PARENT_PID'} = $$;

    if ( $self->ssl_in_use() ) {
        # Load the SSL socket class on demand. Without the eval/require the
        # error check could only ever see a stale $@ from unrelated code.
        eval { require IO::Socket::SSL; 1 }
            or die "SSL not available: $@";
    }

    $SIG{'INT'}   = sub { $self->break_main_loop(); };
    $SIG{'TERM'}  = sub { $self->break_main_loop(); };
    $SIG{'CHLD'}  = sub { $self->__sig_child(); };
    $SIG{'HUP'}   = sub { $self->__sig_hup(); };
    $SIG{'USR1'}  = sub { $self->__sig_usr1(); };
    $SIG{'USR2'}  = sub { $self->__sig_usr2(); };
    $SIG{'RTMIN'} = sub { $self->__sig_kid_idle() };
    $SIG{'RTMAX'} = sub { $self->__sig_kid_busy() };

    srand();

    $self->on_server_begin();

    my $sock_pkg;
    my %sock_opts;
    if ( $self->ssl_in_use() ) {
        # Fill the hash that is passed to the constructor below; declaring a
        # second, block-local hash here would silently drop every SSL_*
        # option and the error trap.
        %sock_opts = %{ $self->{'SSL_OPTS'} || {} };
        $sock_opts{'SSL_error_trap'} = sub { shift; $self->on_ssl_error( shift() ); };
        $sock_pkg = 'IO::Socket::SSL';
    }
    else {
        $sock_pkg = 'IO::Socket::INET';
    }

    my $server_socket = $sock_pkg->new(
        Proto     => 'tcp',
        LocalPort => $self->{'PORT'},
        Listen    => 128,
        ReuseAddr => 1,
        Blocking  => 0,
        %sock_opts,
    );

    # cannot listen: report failure to the caller
    return 100 if !$server_socket;

    binmode($server_socket);
    $self->{'SERVER_SOCKET'} = $server_socket;
    $self->on_listen_ok();

    $self->__reinstall_sha();

    while (1) {
        last if $self->{'BREAK_MAIN_LOOP'};

        if ( $self->{'PREFORK'} > 0 ) {
            $self->__run_prefork($server_socket);
            sleep(4);
        }
        else {
            $self->__run_forking($server_socket);
        }

        # Refresh the master's view of the kids from the shared hash.
        eval {
            $self->__sha_lock_rw('MASTER STATS UPDATE');

            $self->{'KIDS_BUSY'} = 0;
            for my $cpid ( keys %{ $self->{'SHA'}{'PIDS'} } ) {
                next unless $cpid > 0;
                if ( !exists $self->{'KID_PIDS'}{$cpid} ) {
                    # entry of a kid the master no longer tracks
                    delete $self->{'SHA'}{'PIDS'}{$cpid};
                }
                else {
                    # entries look like "<state>:<count>"; '*' marks a busy kid
                    my ($state) = split /:/, $self->{'SHA'}{'PIDS'}{$cpid};
                    $self->{'KIDS_BUSY'}++ if $state eq '*';
                }
            }

            $self->{'STAT'}{'BUSY_COUNT'} = $self->{'SHA'}{'STAT'}{'BUSY_COUNT'};

            $self->__sha_unlock('MASTER STATS UPDATE');
        };
        if ($@) {
            $self->log("error: main loop kids stats management: $@");
            $self->log("status: reinstalling SHA, trying to recover...");

            # best effort: the lock may or may not still be held
            eval { $self->__sha_unlock('MASTER STATS UPDATE'); };

            $self->stop_all_kids('TERM');
            $self->stop_all_kids('KILL');
            sleep(1);
            $self->{'KIDS_BUSY'} = 0;
            $self->__reinstall_sha();
        }
    }

    $self->stop_all_kids('TERM');

    tied( %{ $self->{'SHA'} } )->clean_up();
    delete $self->{'SHA'};

    $self->on_server_close($server_socket);
    close($server_socket);

    return 0;
}
# One pass of the fork-per-connection mode: wait up to TIMEOUT seconds for a
# pending connection on $server_socket, accept it and hand it to a freshly
# forked child (or process it in this very process when NOFORK is set).
#
# Returns '0E0' when no connection was handled (idle timeout, or accept()
# failed -- in which case on_accept_error() is called first). Returns an
# empty list when the connection was refused because MAXFORK kids are
# already running, in the parent after a successful fork, and at the end of
# a NOFORK pass. A forked child never returns: it exits after processing.
# Dies when fork() fails or the shared hash cannot be attached.
sub __run_forking
{
    my $self          = shift;
    my $server_socket = shift;

    if ( !socket_can_read( $server_socket, $self->{'TIMEOUT'} ) ) {
        $self->on_forking_idle();
        return '0E0';
    }

    my $client_socket = $server_socket->accept();
    if ( !$client_socket ) {
        # Previously this branch sat behind "accept() or return" and could
        # never run; the return value for a failed accept stays '0E0'.
        $self->on_accept_error();
        return '0E0';
    }

    binmode($client_socket);
    $self->{'CLIENT_SOCKET'} = $client_socket;

    $self->on_accept_ok($client_socket);

    my $mf = $self->{'MAXFORK'};
    if ( $mf > 0 and $self->{'KIDS'} >= $mf ) {
        # too many kids already: refuse this connection
        $self->on_maxforked($client_socket);
        $self->on_close($client_socket);
        $client_socket->close();
        return;
    }

    if ( !$self->{'NOFORK'} ) {
        my $pid = fork();
        die "fatal: fork failed: $!" if !defined $pid;

        if ($pid) {
            # parent: account for the new kid and drop our copy of the socket
            $self->{'FORKS'}++;
            $self->{'KIDS'}++;
            $self->{'KID_PIDS'}{$pid} = 1;
            $self->on_fork_ok($pid);
            $client_socket->close();
            # "return", not "next": there is no loop in this sub, and a bare
            # "next" would leak into whatever loop the caller is running.
            return;
        }
    }

    # From here on we are the child (or the only process when NOFORK is set).
    $self->{'CHILD'} = 1;
    delete $self->{'SERVER_SOCKET'};
    delete $self->{'KIDS'};
    delete $self->{'KID_PIDS'};

    $SIG{'INT'}   = sub { $self->break_main_loop(); };
    $SIG{'TERM'}  = sub { $self->break_main_loop(); };
    $SIG{'CHLD'}  = 'IGNORE';
    $SIG{'HUP'}   = sub { $self->__child_sig_hup(); };
    $SIG{'USR1'}  = sub { $self->__child_sig_usr1(); };
    $SIG{'USR2'}  = sub { $self->__child_sig_usr2(); };
    $SIG{'RTMIN'} = sub { $self->__sig_kid_idle() };
    $SIG{'RTMAX'} = sub { $self->__sig_kid_busy() };

    # Explicit class-method call: the indirect "new IPC::Shareable ..." form
    # depends on parser heuristics, and this file defines its own "new".
    $self->{'SHA'} = IPC::Shareable->new( key => $self->{'SHA_KEY'} )
        or die "fatal: cannot attach shared memory segment\n";

    srand();

    $client_socket->autoflush(1);
    $self->on_child_start();

    $self->im_busy();
    $self->on_process($client_socket);
    $self->on_close($client_socket);
    $client_socket->close();
    $self->im_idle();

    $self->on_child_exit();

    exit() if !$self->{'NOFORK'};

    return;
}
# Epoch time after which __run_prefork() takes its next statistics sample.
my $__prefork_next_stat = time() + 4;

# One pass of the master in prefork mode: decide how many kids should exist
# and fork until that many are running.
#
# The target is PREFORK kids; when the number of idle kids is at or below
# 1 + 10% of the current kids, the target becomes "current kids + PREFORK".
# A positive MAXFORK caps the target. Roughly every 4 seconds the idle-kid
# histogram in STAT/IDLE_FREQ is updated (only while some kid is busy) and,
# in DEBUG mode, statistics are printed.
#
# In the master this returns once enough kids exist. Each forked kid stays
# inside this sub: it serves connections through __run_preforked_child()
# until told to stop and then exits. Dies when fork() fails or, in a kid,
# when the shared hash cannot be attached.
sub __run_prefork
{
    my $self          = shift;
    my $server_socket = shift;

    my $prefork_count = $self->{'PREFORK'};

    my $kk = $self->{'KIDS'};                  # kids currently running
    my $bk = $self->get_busy_kids_count();     # kids counted as busy
    my $ik = $kk - $bk;                        # idle kids

    my $tk = $prefork_count;                   # target kid count
    $tk = $kk + $prefork_count if $ik <= ( 1 + $kk / 10 );
    my $mf = $self->{'MAXFORK'};
    $tk = $mf if $mf > 0 and $tk > $mf;

    if ( time() > $__prefork_next_stat ) {
        # histogram of idle-kid counts, in buckets of 5
        $self->{'STAT'}{'IDLE_FREQ'}{ int( $ik / 5 ) * 5 }++ if $bk > 0;
        $self->__prefork_print_stat() if $self->{'DEBUG'};
        $__prefork_next_stat = time() + 4;
    }

    my $tbk = $self->{'STAT'}{'BUSY_COUNT'};
    $self->log_debug("debug: kids: $kk busy: $bk idle: $ik to_fork: $tk will_fork?: $kk < $tk total busy count: $tbk");

    while ( $self->{'KIDS'} < $tk ) {
        my $pid = fork();
        die "fatal: fork failed: $!" if !defined $pid;

        if ($pid) {
            # master: account for the new kid and keep forking
            $self->{'FORKS'}++;
            $self->{'KIDS'}++;
            $self->{'KID_PIDS'}{$pid} = 1;
            $self->on_fork_ok($pid);
            $self->{'STAT'}{'SPAWNS'}++;
            next;
        }

        # ---- kid process from here on; it never returns to the caller ----
        $self->{'CHILD'}  = 1;
        $self->{'SPTIME'} = time();            # spawn time
        delete $self->{'SERVER_SOCKET'};
        delete $self->{'KIDS'};
        delete $self->{'KID_PIDS'};

        $SIG{'INT'}   = sub { $self->break_main_loop(); };
        $SIG{'TERM'}  = sub { $self->break_main_loop(); };
        $SIG{'CHLD'}  = 'IGNORE';
        $SIG{'HUP'}   = sub { $self->__child_sig_hup(); };
        $SIG{'USR1'}  = sub { $self->__child_sig_usr1(); };
        $SIG{'USR2'}  = sub { $self->__child_sig_usr2(); };
        $SIG{'RTMIN'} = sub { $self->__sig_kid_idle() };
        $SIG{'RTMAX'} = sub { $self->__sig_kid_busy() };

        # Explicit class-method call: the indirect "new IPC::Shareable ..."
        # form depends on parser heuristics, and this file defines "new".
        $self->{'SHA'} = IPC::Shareable->new( key => $self->{'SHA_KEY'} )
            or die "fatal: cannot attach shared memory segment\n";

        srand();

        $self->on_child_start();
        $self->im_idle();

        my $kid_idle;
        while (1) {
            last if $self->{'BREAK_MAIN_LOOP'};
            last unless $self->__run_preforked_child($server_socket);

            # Seconds since the last processed connection; while nothing has
            # been processed yet, minus the seconds since this kid spawned.
            $kid_idle =
                $self->{'LPTIME'} > 0
                ? time() - $self->{'LPTIME'}
                : -( time() - $self->{'SPTIME'} );

            # retire a kid that has worked before but idled longer than PX_IDLE
            last if $self->{'LPTIME'} > 0 and $kid_idle > $self->{'PX_IDLE'};

            # show the idle time in the process title
            my $tt = $0;
            $tt =~ s/ \[-?\d+\]//;
            $0 = $tt . " [$kid_idle]";
        }

        $self->on_child_exit();
        exit;
    }
}
# Serve at most one connection inside a preforked kid.
#
# Waits up to TIMEOUT seconds for $server_socket to become readable. When
# nothing arrives, on_prefork_child_idle() is called and '0E0' is returned;
# '0E0' is also returned when accept() yields no socket. Otherwise the
# connection is processed and the return value of on_process() is handed
# back; the kid loop in __run_prefork() stops when that value is false.
#
# Side effects for a processed connection: CLIENT_SOCKET is set,
# BUSY_COUNT is incremented, the kid is marked busy and then idle again,
# and LPTIME records the completion time.
sub __run_preforked_child
{
    my $self          = shift;
    my $server_socket = shift;

    if ( !socket_can_read( $server_socket, $self->{'TIMEOUT'} ) ) {
        $self->on_prefork_child_idle();
        return '0E0';
    }

    my $client_socket = $server_socket->accept() or return '0E0';

    binmode($client_socket);
    $self->{'CLIENT_SOCKET'} = $client_socket;

    # The peer/local host and port used to be looked up here into variables
    # that were never read; those four lookups per connection are gone.

    $self->on_accept_ok($client_socket);

    $self->{'BUSY_COUNT'}++;
    $self->im_busy();
    $client_socket->autoflush(1);
    my $res = $self->on_process($client_socket);
    $self->on_close($client_socket);
    $client_socket->close();
    $self->im_idle();

    $self->{'LPTIME'} = time();    # last processing time

    return $res;
}
# Debug helper: log a dump of the shared hash and of the master's STAT
# data, followed by the most frequent idle-kid counts from STAT/IDLE_FREQ
# (sorted by descending frequency, at most 11 lines).
# If taking or releasing the shared read lock throws, only
# "debug: stats unavailable" is logged. All output goes through log_debug().
sub __prefork_print_stat
{
    my ($self) = @_;

    eval {
        $self->__sha_lock_ro('MASTER SHARED STATE');
        my $dump = Dumper( $self->{'SHA'} );
        $self->log_debug( "debug: shared memory state:\n" . $dump );
        $self->__sha_unlock('MASTER SHARED STATE');
    };
    if ($@) {
        $self->log_debug("debug: stats unavailable");
        return;
    }

    $self->log_debug( "debug: stats:\n" . Dumper( $self->{'STAT'} ) );

    my @idle_counts = keys %{ $self->{'STAT'}{'IDLE_FREQ'} };
    my $freq_of     = $self->{'STAT'}{'IDLE_FREQ'};
    my @by_freq     = sort { $freq_of->{$b} <=> $freq_of->{$a} } @idle_counts;

    my $printed = 0;
    for my $idle (@by_freq) {
        my $times = $freq_of->{$idle};
        $self->log_debug( sprintf( "debug: %3d idle(s) => %3d time(s)", $idle, $times ) );
        last if ++$printed > 10;
    }
}
# Ask all kids to stop: while KIDS is non-zero, log the number of remaining
# kids, send them $sig (default 'TERM') via propagate_signal() and sleep
# one second, giving up once 8 seconds have passed. Always returns 1.
sub stop_all_kids
{
    my ( $self, $sig ) = @_;
    $sig ||= 'TERM';

    my $deadline = time() + 8;
    while ( $self->{'KIDS'} and time() < $deadline ) {
        my $left = $self->{'KIDS'};
        $self->log( "status: waiting current kids, sending $sig: " . $left );
        $self->propagate_signal($sig);
        sleep(1);
    }

    return 1;
}
# (Re)create the shared hash used for kid state and statistics.
#
# Any existing shared hash is cleaned up first. A new key built from the
# program name, PID and current time is stored in SHA_KEY, and a new
# IPC::Shareable object (256 KiB, mode 0600, create => 1) is stored in SHA
# with empty PIDS and STAT sub-hashes. Returns 1; dies when the shared
# object cannot be created.
sub __reinstall_sha
{
    my $self = shift;

    tied( %{ $self->{'SHA'} } )->clean_up() if $self->{'SHA'};

    my $tm      = time();
    my $sha_key = $self->{'SHA_KEY'} = "$0.$$.$tm";

    # Explicit class-method call: the indirect "new IPC::Shareable ..." form
    # depends on parser heuristics, and this file defines its own "new".
    $self->{'SHA'} = IPC::Shareable->new(
        key    => $sha_key,
        size   => 256 * 1024,
        mode   => 0600,
        create => 1,
    ) or die "fatal: cannot create shared memory segment\n";

    $self->{'SHA'}{'PIDS'} = {};
    $self->{'SHA'}{'STAT'} = {};

    return 1;
}
# Take a shared (read) lock on the shared hash. Any extra argument passed
# by the caller is ignored. Returns whatever __sha_obtain_lock() returns.
sub __sha_lock_ro
{
    my ($self) = @_;
    my $result = $self->__sha_obtain_lock( IPC::Shareable::LOCK_SH, 'SH' );
    return $result;
}
# Take an exclusive (write) lock on the shared hash. Any extra argument
# passed by the caller is ignored. Returns whatever __sha_obtain_lock()
# returns.
sub __sha_lock_rw
{
    my ($self) = @_;
    my $result = $self->__sha_obtain_lock( IPC::Shareable::LOCK_EX, 'EX' );
    return $result;
}
# Release the lock on the shared hash. Any extra argument passed by the
# caller is ignored. Returns whatever __sha_obtain_lock() returns.
sub __sha_unlock
{
    my ($self) = @_;
    my $result = $self->__sha_obtain_lock( IPC::Shareable::LOCK_UN, 'UN' );
    return $result;
}
# Perform lock operation $op on the object tied to the shared hash; $str is
# a short label for $op used only in log messages.
#
# Returns the (true) result of lock() on success. A failed attempt is
# retried immediately when errno is EINTR or EAGAIN; any other failure is
# logged and retried after one second, until BREAK_MAIN_LOOP is set or the
# retry budget is used up. At that point the failure is logged once more
# and the sub dies.
sub __sha_obtain_lock
{
    my ( $self, $op, $str ) = @_;

    my $retries_left = 4;
    while (1) {
        my $got = tied( %{ $self->{'SHA'} } )->lock($op);
        return $got if $got;

        # interrupted or temporarily unavailable: try again right away
        next if $!{EINTR} or $!{EAGAIN};

        $self->log("error: cannot obtain $str lock for SHA! [$got] $! retry in 1 second");
        last if $self->{'BREAK_MAIN_LOOP'};
        last unless $retries_left--;
        sleep(1);
    }

    $self->log("error: cannot obtain $str lock for SHA! $!");
    my $ppid = $self->get_parent_pid();
    die "error: [$ppid/$$] cannot obtain $str lock for SHA! $!, will exit\n";
}
# Return the stored SERVER_SOCKET, or undef when the object has no such key.
sub get_server_socket
{
    my ($self) = @_;
    return undef unless exists $self->{'SERVER_SOCKET'};
    return $self->{'SERVER_SOCKET'};
}
# Return the stored CLIENT_SOCKET, or undef when the object has no such key.
sub get_client_socket
{
    my ($self) = @_;
    return undef unless exists $self->{'CLIENT_SOCKET'};
    return $self->{'CLIENT_SOCKET'};
}
# In list context return ( KIDS_BUSY, KIDS ); otherwise return KIDS_BUSY.
sub get_busy_kids_count
{
    my ($self) = @_;
    return $self->{'KIDS_BUSY'} unless wantarray;
    return ( $self->{'KIDS_BUSY'}, $self->{'KIDS'} );
}
# Return the value stored under KIDS.
sub get_kids_count
{
    my ($self) = @_;
    return $self->{'KIDS'};
}
# Return the value stored under PARENT_PID (set by run() to the PID of the
# process that called it).
sub get_parent_pid
{
    my ($self) = @_;
    return $self->{'PARENT_PID'};
}
# Return the PIDs recorded in KID_PIDS (in no particular order). Returns an
# empty list when called in a child process or when no kids are recorded.
sub get_kid_pids
{
    my ($self) = @_;
    return () if $self->is_child();

    my $pids = $self->{'KID_PIDS'} || {};
    return keys %$pids;
}
# Mark the current process as busy: delegates to __im_in_state('*') and
# returns its result.
sub im_busy
{
    my ($self) = @_;
    my $busy_mark = '*';
    return $self->__im_in_state($busy_mark);
}
# Mark the current process as idle: delegates to __im_in_state('-') and
# returns its result.
sub im_idle
{
    my ($self) = @_;
    my $idle_mark = '-';
    return $self->__im_in_state($idle_mark);
}
# Publish the state of the current kid ('*' busy, '-' idle; the argument is
# upper-cased first).
#
# Does nothing and returns 0 when called in the master itself (parent PID
# equals $$). Otherwise, under the exclusive shared lock, it stores
# "<state>:<BUSY_COUNT>" for this PID in SHA/PIDS and, for '*', increments
# SHA/STAT/BUSY_COUNT. The same text is appended to the process title after
# " | ". Finally the master is signalled: RTMIN for '-', RTMAX for '*'; the
# result of kill() is returned, or 0 for any other state.
sub __im_in_state
{
    my ( $self, $flag ) = @_;
    my $state = uc $flag;

    my $master = $self->get_parent_pid();
    return 0 if $master == $$;

    my $record = join ':', $state, $self->{'BUSY_COUNT'};

    $self->__sha_lock_rw('KID STATE');
    $self->{'SHA'}{'PIDS'}{$$} = $record;
    if ( $state eq '*' ) {
        $self->{'SHA'}{'STAT'}{'BUSY_COUNT'}++;
    }
    $self->__sha_unlock('KID STATE');

    # replace any previous " | state" suffix of the process title
    ( my $title = $0 ) =~ s/ \| .+//;
    $0 = $title . ' | ' . $record;

    my %signal_for = ( '-' => 'RTMIN', '*' => 'RTMAX' );
    return 0 unless exists $signal_for{$state};
    return kill( $signal_for{$state}, $master );
}
# Request termination of the main loop by setting BREAK_MAIN_LOOP to 1
# (the loops in this module check that flag). Returns 1.
sub break_main_loop
{
    my ($self) = @_;
    return $self->{'BREAK_MAIN_LOOP'} = 1;
}
# Return the value of the SSL option (true when SSL was requested).
sub ssl_in_use
{
    my ($self) = @_;
    return $self->{'SSL'};
}
# Return the CHILD flag (set to 1 in forked kids, and in the NOFORK
# in-process handler).
sub is_child
{
    my ($self) = @_;
    return $self->{'CHILD'};
}
# Send signal $sig to every PID returned by get_kid_pids().
sub propagate_signal
{
    my ( $self, $sig ) = @_;
    kill( $sig, $_ ) for $self->get_kid_pids();
}
# SIGCHLD handler of the master: reap every kid that has exited, without
# blocking. For each reaped PID the KIDS counter is decremented, the PID is
# removed from KID_PIDS and on_sig_child($pid) is called. The handler is
# reinstalled afterwards.
sub __sig_child
{
    my ($self) = @_;

    while (1) {
        my $cpid = waitpid( -1, WNOHANG );
        last unless $cpid > 0;

        $self->{'KIDS'}--;
        delete $self->{'KID_PIDS'}{$cpid};
        $self->on_sig_child($cpid);
    }

    $SIG{'CHLD'} = sub { $self->__sig_child(); };
}
# SIGHUP handler of the master: call the on_sig_hup() hook and reinstall
# the handler.
sub __sig_hup
{
    my $self = shift;

    $self->on_sig_hup();

    # The key must be exactly 'HUP'; the former 'HUP ' (trailing space) is
    # not a signal name, so the handler was never actually reinstalled.
    $SIG{'HUP'} = sub { $self->__sig_hup(); };
}
# SIGUSR1 handler of the master: call on_sig_usr1(), forward USR1 to the
# kids when PROP_SIGUSR is set, and reinstall the handler.
sub __sig_usr1
{
    my ($self) = @_;

    $self->on_sig_usr1();
    if ( $self->{'PROP_SIGUSR'} ) {
        $self->propagate_signal('USR1');
    }

    $SIG{'USR1'} = sub { $self->__sig_usr1(); };
}
# SIGUSR2 handler of the master: call on_sig_usr2(), forward USR2 to the
# kids when PROP_SIGUSR is set, and reinstall the handler.
sub __sig_usr2
{
    my ($self) = @_;

    $self->on_sig_usr2();
    if ( $self->{'PROP_SIGUSR'} ) {
        $self->propagate_signal('USR2');
    }

    $SIG{'USR2'} = sub { $self->__sig_usr2(); };
}
# SIGHUP handler used in kids: call on_child_sig_hup() and reinstall the
# handler.
sub __child_sig_hup
{
    my ($self) = @_;
    $self->on_child_sig_hup();
    my $handler = sub { $self->__child_sig_hup(); };
    $SIG{'HUP'} = $handler;
}
# SIGUSR1 handler used in kids: call on_child_sig_usr1() and reinstall the
# handler.
sub __child_sig_usr1
{
    my ($self) = @_;
    $self->on_child_sig_usr1();
    my $handler = sub { $self->__child_sig_usr1(); };
    $SIG{'USR1'} = $handler;
}
# SIGUSR2 handler used in kids: call on_child_sig_usr2() and reinstall the
# handler.
sub __child_sig_usr2
{
    my ($self) = @_;
    $self->on_child_sig_usr2();
    my $handler = sub { $self->__child_sig_usr2(); };
    $SIG{'USR2'} = $handler;
}
# RTMIN handler (the signal kids send when they become idle): it performs
# no work of its own and only reinstalls itself.
sub __sig_kid_idle
{
    my ($self) = @_;
    my $handler = sub { $self->__sig_kid_idle() };
    $SIG{'RTMIN'} = $handler;
}
# RTMAX handler (the signal kids send when they become busy): it performs
# no work of its own and only reinstalls itself.
sub __sig_kid_busy
{
    my ($self) = @_;
    my $handler = sub { $self->__sig_kid_busy() };
    $SIG{'RTMAX'} = $handler;
}
# ---------------------------------------------------------------------------
# Event hooks. Every hook below is a no-op that returns nothing; they exist
# so that the rest of this file can call them unconditionally and so that
# they can be overridden. The notes say where this file calls each hook.
# ---------------------------------------------------------------------------

# Called by run() after the signal handlers are installed, before the
# listening socket is created.
sub on_server_begin { return; }

# Called by run() once the listening socket has been created.
sub on_listen_ok { return; }

# Called from the accept-failure branch of __run_forking().
sub on_accept_error { return; }

# Called with the client socket after a connection has been accepted.
sub on_accept_ok { return; }

# Called in the parent with the PID of a freshly forked kid.
sub on_fork_ok { return; }

# Called with the client socket to handle a connection. In prefork mode the
# return value matters: a false value makes the kid leave its serving loop.
sub on_process { return; }

# Called in a preforked kid when no connection arrived within TIMEOUT.
sub on_prefork_child_idle { return; }

# Called in fork-per-connection mode when no connection arrived within
# TIMEOUT.
sub on_forking_idle { return; }

# Called with the client socket when a connection is refused because
# MAXFORK kids are already running.
sub on_maxforked { return; }

# Called in a kid after its signal handlers and shared hash are set up.
sub on_child_start { return; }

# Called in a kid right before it finishes.
sub on_child_exit { return; }

# Called with the client socket right before that socket is closed.
sub on_close { return; }

# Called by run() with the listening socket right before it is closed.
sub on_server_close { return; }

# Called from the SSL_error_trap callback with the callback's second
# argument.
sub on_ssl_error { return; }

# Called by the master's SIGCHLD handler with the PID of each reaped kid.
sub on_sig_child { return; }

# Called by the master's SIGHUP handler.
sub on_sig_hup { return; }

# Called by the master's SIGUSR1 handler.
sub on_sig_usr1 { return; }

# Called by the master's SIGUSR2 handler.
sub on_sig_usr2 { return; }

# Called by a kid's SIGHUP handler.
sub on_child_sig_hup { return; }

# Called by a kid's SIGUSR1 handler.
sub on_child_sig_usr1 { return; }

# Called by a kid's SIGUSR2 handler.
sub on_child_sig_usr2 { return; }
# Plain function (not a method): return true when $sock becomes writable
# within $timeout seconds according to select(), false otherwise (including
# when select() fails).
sub socket_can_write
{
    my ( $sock, $timeout ) = @_;

    my $write_bits;
    vec( $write_bits, fileno($sock), 1 ) = 1;

    my $ready = select( undef, $write_bits, undef, $timeout );
    return $ready > 0;
}
# Plain function (not a method): return true when $sock becomes readable
# within $timeout seconds according to select(), false otherwise (including
# when select() fails).
sub socket_can_read
{
    my ( $sock, $timeout ) = @_;

    my $read_bits;
    vec( $read_bits, fileno($sock), 1 ) = 1;

    my $ready = select( $read_bits, undef, undef, $timeout );
    return $ready > 0;
}
# Write each message to STDERR, one per line.
sub log
{
    my ( $self, @messages ) = @_;
    print STDERR "$_\n" for @messages;
}
# Like log(), but only when the DEBUG option is true; otherwise nothing is
# written and nothing is returned.
sub log_debug
{
    my ( $self, @messages ) = @_;
    return $self->log(@messages) if $self->{'DEBUG'};
    return;
}
1;