our
$VERSION
=
'0.24'
;
sub
new {
my
$class
=
shift
;
my
%new
=
@_
;
my
$self
={};
if
(
$#_
% 2 == 0) {
$new
{Host}=
shift
;
}
bless
(
$self
,
$class
||
'Net::SMTP::AnyEvent'
);
$self
->{new}=\
%new
;
$self
->{encode} = ( (
$new
{Encode}||
''
) eq
'1'
) ?
'utf8'
:
''
;
$self
->{debug} = ((
$new
{Debug}||0) >= 1) ?
int
(
$new
{Debug}):0;
if
( (
$new
{DebugPath}||
''
) ne
''
) {
$self
->{debug_path} = (
lc
(
$new
{DebugPath}) eq
'default'
) ?
'debug_[HOST]_[THREAD].txt'
:
$new
{DebugPath};
}
$self
->{func} =
$new
{Callbacks};
$self
->{global_timeout} =
$new
{GlobalTimeout}||120;
$self
->{last_active}=
time
;
$self
->{start_time}=
time
;
$self
->{timer_actions}=[];
$self
->{runtime_limit}=
$new
{RuntimeLimit}||0;
$self
->{defaults}={
threads
=>
$new
{Threads}||2,
port
=>
$new
{Port}||25,
timeout
=>
$new
{Timeout}||30,
secure
=>
$new
{Secure}||0,
pipe
=>
$new
{Pipeline}||0,
sleep
=>{
hang
=>0,
fail
=>0,
},
retry
=>{
global_hang
=>1,
hang
=>1,
fail
=>5
}
};
$self
->{cv} = AnyEvent->condvar;
if
(
exists
(
$new
{Hosts})) {
$self
->_PREPARE(
$new
{Hosts});
}
else
{
$self
->_PREPARE([\
%new
]);
}
return
$self
;
}
sub
auth {
my
$self
=
shift
;
my
$type
=
shift
;
my
$user
=
shift
;
my
$pass
=
shift
;
my
$k
=
shift
||
$self
->{
last
}[0];
if
( !
defined
(
$type
) and
exists
(
$self
->{auth}{
$k
->[0] }{
$k
->[1] }[0])) {
}
elsif
( !
defined
(
$pass
) or
ref
(
$pass
) eq
'ARRAY'
) {
$k
=
$pass
if
ref
(
$pass
) eq
'ARRAY'
;
$self
->{auth}{
$k
->[0] }{
$k
->[1] }=[
'AUTO'
,
$type
,
$user
];
}
else
{
$self
->{auth}{
$k
->[0] }{
$k
->[1] }=[
$type
,
$user
,
$pass
];
}
}
sub
mail {
my
$self
=
shift
;
my
$user
=
shift
;
my
$k
=
shift
||
$self
->{
last
}[0];
$self
->{queue}{
$k
->[0] }{
$k
->[1] }[
$self
->{queue_size}{
$k
->[0] }{
$k
->[1] }]{mail}=
$user
;
}
sub
to {
my
$self
=
shift
;
my
$user
=
shift
;
my
$k
=
shift
||
$self
->{
last
}[0];
$self
->{queue}{
$k
->[0] }{
$k
->[1] }[
$self
->{queue_size}{
$k
->[0] }{
$k
->[1] }]{to}=
$user
;
}
sub
data {
my
$self
=
shift
;
my
$k
=
shift
||
$self
->{
last
}[0];
$self
->{queue}{
$k
->[0] }{
$k
->[1] }[
$self
->{queue_size}{
$k
->[0] }{
$k
->[1] }]{data}=
''
;
}
sub
datasend {
my
$self
=
shift
;
my
$data
=
shift
;
my
$k
=
shift
||
$self
->{
last
}[0];
$self
->{queue}{
$k
->[0] }{
$k
->[1] }[
$self
->{queue_size}{
$k
->[0] }{
$k
->[1] }]{data}.=
$data
;
}
sub
dataend {
my
$self
=
shift
;
my
$k
=
shift
||
$self
->{
last
}[0];
$self
->{queue_size}{
$k
->[0] }{
$k
->[1] }++;
$self
->_NEXT();
}
sub
quit {
my
$self
=
shift
;
my
$id
=
shift
||
time
;
my
$timer
;
print
"start timer\n"
;
$timer
= AnyEvent->timer(
interval
=> 1,
cb
=>
sub
{
if
(
$self
->{global_timeout} > 0 and (
$self
->{last_active} +
$self
->{global_timeout}) <
time
) {
my
$r
=
$self
->_FUNC(
'global_hang'
,
$self
,[-1,-1],0,[]);
my
$end
=1;
foreach
my
$h
(
keys
(%{
$self
->{threads} }) ) {
foreach
my
$t
( 0..
$self
->{threads}{
$h
} ) {
$self
->{last_active}=
time
;
$self
->_DEBUG([
$h
,
$t
],
'GLOBAL TIMEOUT : (PASS:'
.(
$self
->{stats}{
$h
}{
$t
}{queue}{pass}).
'|HANG:'
.(
$self
->{stats}{
$h
}{
$t
}{queue}{hang}).
'|FAIL:'
.(
$self
->{stats}{
$h
}{
$t
}{queue}{fail}).
'|TOTAL:'
.
$self
->{stats}{
$h
}{
$t
}{queue}{total}.
')'
,5)
if
$self
->{debug} >= 1;
if
(
$self
->{retry}{
$k
->[0] }{global_hang} >=
$self
->{retry}{
$k
->[0] }{global_hang_count} ) {
$self
->{retry}{
$k
->[0] }{global_hang_count}++;
$self
->reconnect([
$h
,
$t
],99,1);
$end
=0;
}
else
{
$self
->{fh}{
$h
}{
$t
}->destroy
if
defined
(
$self
->{fh}{
$h
}{
$t
});
}
}
}
if
(
$end
== 1) {
undef
(
$timer
);
$self
->{cv}->
send
;
}
}
my
@actions
;
foreach
my
$i
( 0 .. $
$self
->{timer_actions}[
$i
][0]++;
if
(
$self
->{timer_actions}[
$i
][0] >=
$self
->{timer_actions}[
$i
][1] ) {
&{
$self
->{timer_actions}[
$i
][2] }( @{
$self
->{timer_actions}[
$i
][3] } );
}
else
{
push
(
@actions
,
$self
->{timer_actions}[
$i
]);
}
}
$self
->{timer_actions}=\
@actions
;
if
(
$self
->{runtime_limit} and
$self
->{start_time} +
$self
->{runtime_limit} <
time
) {
print
"LIMIT!\n"
;
$self
->_FUNC(
'runtime_limit'
,
$self
,
$k
,0,[
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }]);
undef
(
$timer
);
$self
->{cv}->
send
;
}
},
);
my
$total
=0;
foreach
my
$h
(
keys
(%{
$self
->{threads} }) ) {
foreach
my
$t
( 0..
$self
->{threads}{
$h
} ) {
$self
->{stats}{
$h
}{
$t
}{queue}{id}=
$id
;
$self
->{stats}{
$h
}{
$t
}{queue}{total}=($
$total
+=
$self
->{stats}{
$h
}{
$t
}{queue}{total};
$self
->_DEBUG([
$h
,
$t
],
'Set Queue : '
.
$self
->{stats}{
$h
}{
$t
}{queue}{total},5)
if
$self
->{debug} >= 1;
if
( $
$self
->{cv}->begin;
$self
->_CONNECT([
$h
,
$t
]);
}
}
}
$self
->{cv}->
recv
if
$total
> 0;
undef
(
$timer
)
if
defined
(
$timer
);
my
$rt1
=
$self
->_FUNC(
'complete'
,
$self
,[-1,-1],0,[]);
foreach
my
$h
(
keys
(%{
$self
->{threads} }) ) {
foreach
my
$t
( 0..
$self
->{threads}{
$h
} ) {
my
$rt2
=
$self
->_FUNC(
'end_thread'
,
$self
,
$k
,0,[]);
$self
->{fh}{
$h
}{
$t
}->destroy
if
defined
(
$self
->{fh}{
$h
}{
$t
});
$self
->_DEBUG([
$h
,
$t
],
'End Queue : (PASS:'
.(
$self
->{stats}{
$h
}{
$t
}{queue}{pass}).
'|HANG:'
.(
$self
->{stats}{
$h
}{
$t
}{queue}{hang}).
'|FAIL:'
.(
$self
->{stats}{
$h
}{
$t
}{queue}{fail}).
'|TOTAL:'
.
$self
->{stats}{
$h
}{
$t
}{queue}{total}.
')'
,5)
if
$self
->{debug} >= 1;
$self
->{stats}{
$h
}{
$t
}{queue}{id}=0;
}
}
foreach
my
$dfh
(
keys
(%{
$self
->{debug_fh} }) ) {
close
(
$self
->{debug_fh}{
$dfh
})
}
}
sub
reconnect {
my
$self
=
shift
;
my
$k
=
shift
||
$self
->{
last
}[0];
my
$retry
=
shift
||0;
my
$global
=
shift
||0;
if
(
$retry
> (
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }{retry}||0) ) {
$self
->{queue_size}{
$k
->[0] }{
$k
->[1] }++;
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }{retry} = (
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }||0) + 1
if
$global
== 0;
push
(@{
$self
->{queue}{
$k
->[0] }{
$k
->[1] } },
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] });
}
$self
->{fh}{
$k
->[0] }{
$k
->[1] }->destroy
if
defined
(
$self
->{fh}{
$k
->[0] }{
$k
->[1] });
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'BEGIN'
,0];
$self
->_CONNECT(
$k
);
}
sub
_PREPARE {
my
$self
=
shift
;
my
$hosts
=
shift
;
$self
->{order}=[];
$self
->{open_threads}=0;
foreach
my
$i
( 0..$
my
%new
=( %{
$hosts
->[
$i
]} );
$self
->{host}{
$new
{Host} }||=
$new
{Host};
$self
->{
last
}=[[
$new
{Host},0],0]
if
$i
== 0;
if
(
$self
->{host}{
$new
{Host} }=~s/\:(\d+?)$//is) {
$self
->{port}{
$new
{Host} }=$1;
}
$self
->{secure}{
$new
{Host} }=
$new
{Secure}||
$self
->{defaults}{secure};
$self
->{
pipe
}{
$new
{Host} }=
$new
{Pipeline}||
$self
->{defaults}{
pipe
};
$self
->{port}{
$new
{Host} }||=
$new
{Port}||
$self
->{defaults}{port};
$self
->{helo}{
$new
{Host} }=
$new
{Hello}||
$self
->{host}{
$new
{Host} };
$self
->{timeout}{
$new
{Host} }=
$new
{Timeout}||
$self
->{defaults}{timeout};
$self
->{threads}{
$new
{Host} }=(
$new
{Threads}||
$self
->{defaults}{threads}) - 1;
$self
->{
sleep
}{
$new
{Host} }{hang}=
$new
{Sleep}{Hang}||
$self
->{defaults}{
sleep
}{hang};
$self
->{
sleep
}{
$new
{Host} }{fail}=
$new
{Sleep}{Fail}||
$self
->{defaults}{
sleep
}{fail};
$self
->{retry}{
$new
{Host} }{hang}=
$new
{Retry}{Hang}||
$self
->{defaults}{retry}{hang};
$self
->{retry}{
$new
{Host} }{global_hang}=
$new
{Retry}{GlobalHang}||
$self
->{defaults}{retry}{global_hang};
$self
->{retry}{
$new
{Host} }{global_hang_count}=0;
$self
->{retry}{
$new
{Host} }{fail}=
$new
{Retry}{Hang}||
$self
->{defaults}{retry}{fail};
$self
->{open_threads}+=
$self
->{threads}{
$new
{Host} };
foreach
my
$t
( 0..
$self
->{threads}{
$new
{Host} } ) {
if
(
exists
(
$self
->{debug_path}) ) {
my
$path
=
$self
->_STRFTIME(
''
.
$self
->{debug_path},
time
);
$path
=~s/\[HOST\]/
$new
{Host}/gs;
$path
=~s/\[THREAD\]/
$t
/gs;
$self
->_MKDIRF(
$path
);
open
(
$self
->{debug_fh}{
$new
{Host}.
':'
.
$t
} ,
'>>'
.
$path
);
binmode
(
$self
->{debug_fh}{
$new
{Host}.
':'
.
$t
} ,
':utf8'
);
}
$self
->{auth}{
$new
{Host} }{
$t
}=
$new
{Auth}||[0,
''
];
$self
->{objects}{
$new
{Host} }{
$t
}={};
$self
->{queue}{
$new
{Host} }{
$t
}=[];
$self
->{queue_size}{
$new
{Host} }{
$t
}=0;
$self
->{pipe_mode}{
$new
{Host} }{
$t
}=[0,[],{}];
$self
->{stage}{
$new
{Host} }{
$t
}=[
'BEGIN'
,0];
$self
->{stats}{
$new
{Host} }{
$t
}={
queue
=> {
total
=>0,
pass
=>0,
hang
=>0,
fail
=>0,
count
=>0,
id
=>0
}
};
push
(@{
$self
->{order}}, [
$new
{Host},
$t
,1] );
}
}
}
sub
_CONNECT {
my
$self
=
shift
;
my
$k
=
shift
;
my
%extra
;
if
(
$self
->{secure}{
$k
->[0] } == 1 ) {
%extra
=(
tls
=>
'connect'
,
tls_ctx
=> {
verify
=> 0,
verify_peername
=>
"smtp"
}
);
}
elsif
(
$self
->{secure}{
$k
->[0] } == 2 ) {
%extra
=(
tls
=>
'connect'
,
tls_ctx
=> {
verify
=> 1,
verify_peername
=>
"smtp"
}
);
}
$self
->_DEBUG(
$k
,
"Connecting to $self->{host}{ $k->[0] } on port $self->{port}{ $k->[0] }"
)
if
$self
->{debug} >= 1;
$self
->{fh}{
$k
->[0] }{
$k
->[1] } = new AnyEvent::Handle(
connect
=> [
$self
->{host}{
$k
->[0] },
$self
->{port}{
$k
->[0] }],
on_read
=>
sub
{
$self
->_READ(
$k
); },
timeout
=> (
$self
->{timeout}{
$k
->[0] }||60),
on_timeout
=>
sub
{
$self
->{stats}{
$k
->[0] }{
$k
->[1] }{queue}{hang}++;
my
$r
=
$self
->_FUNC(
'hang'
,
$self
,
$k
,0,[
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }]);
$self
->_DEBUG(
$k
,
'Email : '
.(++
$self
->{stats}{
$k
->[0] }{
$k
->[1] }{queue}{count}).
' : '
.
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }{to}.
' : HANG : '
.
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0].
','
.
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[1], 7)
if
$self
->{debug} >= 1;
if
(
$r
== 1 or
$r
== 103 ) {
if
( (
$self
->{
sleep
}{
$k
->[0] }{hang}||0) > 0 ) {
push
(@{
$self
->{timer_actions}},[0,
$self
->{
sleep
}{
$k
->[0] }{hang},
'reconnect'
,[
$self
,
$k
,
$self
->{retry}{
$k
->[0] }{hang}]] );
}
else
{
$self
->reconnect(
$k
,
$self
->{retry}{
$k
->[0] }{hang} );
}
}
},
on_error
=>
sub
{
$self
->{stats}{
$k
->[0] }{
$k
->[1] }{queue}{fail}++;
my
$r
=
$self
->_FUNC(
'fail'
,
$self
,
$k
,0,[
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }]);
$self
->_DEBUG(
$k
,
'Email : '
.(++
$self
->{stats}{
$k
->[0] }{
$k
->[1] }{queue}{count}).
' : '
.
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }{to}.
' : FAIL : '
.
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0].
','
.
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[1], 7)
if
$self
->{debug} >= 1;
if
( (
$self
->{
sleep
}{
$k
->[0] }{fail}||0) > 0 ) {
push
(@{
$self
->{timer_actions}},[0,
$self
->{
sleep
}{
$k
->[0] }{fail},
'reconnect'
,[
$self
,
$k
,
$self
->{retry}{
$k
->[0] }{fail}]] );
}
else
{
$self
->reconnect(
$k
,
$self
->{retry}{
$k
->[0] }{fail} );
}
},
%extra
);
}
sub
_FUNC {
my
$self
=
shift
;
no
strict;
my
$func
=
shift
;
return
&{
$self
->{func}{
$func
}}(
@_
)
if
exists
(
$self
->{func}{
$func
});
return
1;
}
sub
_BULK {
my
$self
=
shift
;
}
sub
_NEXT {
my
$self
=
shift
;
my
$k
=
shift
;
my
@next
;
while
(!
exists
(
$next
[0])) {
$self
->{
last
}[1]++;
if
(
exists
(
$self
->{order}[
$self
->{
last
}[1] ])) {
@next
=(
$self
->{order}[
$self
->{
last
}[1] ],
$self
->{
last
}[1] );
}
else
{
@next
=(
$self
->{order}[0],0);
}
}
$self
->{
last
}=\
@next
;
}
sub
_HELO {
my
$self
=
shift
;
my
$k
=
shift
;
$self
->_WRITE(
$k
,
'EHLO '
.
$self
->{helo}{
$k
->[0] });
}
sub
_PIPE {
my
$self
=
shift
;
my
$k
=
shift
;
my
$mode
=
shift
;
if
(
$self
->{
pipe
}{
$k
->[0] } > 0) {
if
(
$mode
== 1 ) {
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[0]=
$self
->{
pipe
}{
$k
->[0] };
}
}
}
sub
_STARTTLS {
my
$self
=
shift
;
my
$k
=
shift
;
if
(
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'HEADER'
) {
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'STARTTLS'
,0];
$self
->_WRITE(
$k
,
'STARTTLS'
);
}
else
{
my
@extra
;
if
(
$self
->{secure}{
$k
->[0] } == 3 ) {
@extra
=(
'connect'
,
{
verify
=> 0,
verify_peername
=>
"smtp"
}
);
}
elsif
(
$self
->{secure}{
$k
->[0] } == 4 ) {
@extra
=(
'connect'
,
{
verify
=> 1,
verify_peername
=>
"smtp"
}
);
}
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'BEGIN'
,0];
$self
->{fh}{
$k
->[0] }{
$k
->[1] }->starttls(
@extra
);
$self
->_HELO(
$k
);
$self
->{header}{
$k
->[0] }{
$k
->[1] }={};
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'HELO'
,1];
}
}
sub
_AUTH {
my
$self
=
shift
;
my
$k
=
shift
;
if
(
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'AUTH'
) {
if
(
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[1] == 1) {
if
(
my
$str
=
$self
->{objects}{
$k
->[0] }{
$k
->[1] }{sasl_client}->client_start() ) {
$self
->_WRITE(
$k
,MIME::Base64::encode_base64(
$str
,
''
));
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[1]++;
return
;
}
}
my
$msg
= MIME::Base64::decode_base64(
$self
->{status_text}{
$k
->[0] }{
$k
->[1] });
$self
->_WRITE(
$k
, MIME::Base64::encode_base64(
$self
->{objects}{
$k
->[0] }{
$k
->[1] }{sasl_client}->client_step(
$msg
),
''
) );
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[1]++;
}
else
{
my
$mech
=(
$self
->{auth}{
$k
->[0] }{
$k
->[1] }[0] eq
'AUTO'
) ?
uc
(
join
(
' '
,@{
$self
->{header}{
$k
->[0] }{
$k
->[1] }{auth}})) :
$self
->{auth}{
$k
->[0] }{
$k
->[1] }[0];
$self
->{objects}{
$k
->[0] }{
$k
->[1] }{sasl} = Authen::SASL->new (
mechanism
=>
$mech
,
callback
=> {
user
=>
$self
->{auth}{
$k
->[0] }{
$k
->[1] }[1],
pass
=>
$self
->{auth}{
$k
->[0] }{
$k
->[1] }[2],
authname
=>
$self
->{auth}{
$k
->[0] }{
$k
->[1] }[1],
}
);
$self
->{objects}{
$k
->[0] }{
$k
->[1] }{sasl_client} =
$self
->{objects}{
$k
->[0] }{
$k
->[1] }{sasl}->client_new(
"smtp"
,
$k
->[0],1);
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'AUTH'
,0];
$self
->_WRITE(
$k
,
'AUTH '
.
$self
->{objects}{
$k
->[0] }{
$k
->[1] }{sasl_client}->mechanism);
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'AUTH'
,1];
}
}
sub
_MAIL {
my
$self
=
shift
;
my
$k
=
shift
;
$self
->{queue_size}{
$k
->[0] }{
$k
->[1] }--;
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }=
shift
(@{
$self
->{queue}{
$k
->[0] }{
$k
->[1] } });
if
(
$self
->{queue_size}{
$k
->[0] }{
$k
->[1] } == -1 ) {
$self
->_WRITE(
$k
,
'QUIT'
);
$self
->{fh}{
$k
->[0] }{
$k
->[1] }->destroy;
$self
->{cv}->end;
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'END'
,0];
}
else
{
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'MAIL'
,1];
my
$write
=
'MAIL FROM: '
.
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }{mail};
if
(
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[0] == 0 ) {
$self
->_WRITE(
$k
,
$write
);
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'MAIL'
,2];
}
else
{
push
( @{
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[1] },
$write
);
$self
->_TO(
$k
);
}
}
}
sub
_TO {
my
$self
=
shift
;
my
$k
=
shift
;
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'TO'
,1];
my
$write
=
'RCPT TO: '
.
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }{to};
if
(
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[0] == 0 ) {
$self
->_WRITE(
$k
,
$write
);
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'TO'
,2];
}
else
{
push
( @{
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[1] },
$write
);
$self
->_DATA(
$k
);
}
}
sub
_DATA {
my
$self
=
shift
;
my
$k
=
shift
;
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'DATA'
,1];
my
$write
=
'DATA'
;
if
(
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[0] == 0 ) {
$self
->_WRITE(
$k
,
$write
);
}
else
{
push
( @{
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[1] },
$write
);
$write
=
join
(
"\n"
,@{
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[1] });
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[1]=[];
$self
->_WRITE(
$k
,
$write
);
}
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'DATA'
,2];
}
sub
_DATAEND {
my
$self
=
shift
;
my
$k
=
shift
;
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'DATAEND'
,1];
my
$write
=
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }{data}.
"\r\n."
;
if
(
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[0] < 2 ) {
$write
=
join
(
"\n"
, (@{
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[1] },
$write
) );
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[1]=[];
$self
->_WRITE(
$k
,
$write
);
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'DATAEND'
,2];
$self
->{data}{
$k
->[0] }{
$k
->[1] }={};
}
elsif
(
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[0] == 2 ) {
if
(
$self
->{queue_size}{
$k
->[0] }{
$k
->[1] } == 0 ) {
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[2]=1;
$write
=
join
(
"\n"
,@{
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[1] }).
"\n"
.
$write
;
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[1]=[];
$self
->_WRITE(
$k
,
$write
);
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'DATAEND'
,2];
$self
->{data}{
$k
->[0] }{
$k
->[1] }={};
}
else
{
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[2]{to}=
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }{to};
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[2]{queue}=
$self
->{queue}{
$k
->[0] }{
$k
->[1] };
push
( @{
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[1] },
$write
);
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'MAIL'
,0];
$self
->{data}{
$k
->[0] }{
$k
->[1] }={};
$self
->_MAIL(
$k
);
}
}
}
sub
_HEADER {
my
$self
=
shift
;
my
$k
=
shift
;
my
$line
=
shift
;
$line
=~m/^((\d{3})[ \-](\w+?)(?: (.*?)|)[\r\n]*?)$/is;
my
$status
=
lc
($2);
my
$head
=
lc
($3);
$self
->{header}{
$k
->[0] }{
$k
->[1] }{
$head
}=[
split
/ /,($4||
''
)];
}
sub
_READ {
my
$self
=
shift
;
my
$k
=
shift
;
$self
->{last_active}=
time
;
$self
->{fh}{
$k
->[0] }{
$k
->[1] }->push_read (
line
=>
sub
{
$self
->{handle}{
$k
->[0] }{
$k
->[1] }=
shift
;
$self
->{buffer}{
$k
->[0] }{
$k
->[1] }=
shift
;
$self
->_DEBUG(
$k
,
$self
->{buffer}{
$k
->[0] }{
$k
->[1] })
if
$self
->{debug} >= 1;
if
(
$self
->{buffer}{
$k
->[0] }{
$k
->[1] }=~m/^(\d+?)([ \-])(.*?)$/is) {
$self
->{status_code}{
$k
->[0] }{
$k
->[1] }=$1;
$self
->{status_mode}{
$k
->[0] }{
$k
->[1] }=$2;
$self
->{status_text}{
$k
->[0] }{
$k
->[1] }=$3;
my
$r_read
=
$self
->_FUNC(
'read'
,
$self
,
$k
,0,
$self
->{queue}{
$k
->[0] }{
$k
->[1] });
if
(
$self
->{status_code}{
$k
->[0] }{
$k
->[1] } == 250 and
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'DATAEND'
and
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[1] == 2 ) {
$self
->{stats}{
$k
->[0] }{
$k
->[1] }{queue}{pass}++;
$self
->_DEBUG(
$k
,
'Email : '
.(++
$self
->{stats}{
$k
->[0] }{
$k
->[1] }{queue}{count}).
' : '
.
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }{to}.
' : PASS'
,8)
if
$self
->{debug} >= 1;
my
$r
=
$self
->_FUNC(
'pass'
,
$self
,
$k
,0,
$self
->{queue}{
$k
->[0] }{
$k
->[1] });
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'MAIL'
,0];
}
elsif
(
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[0] == 2 and
$self
->{status_code}{
$k
->[0] }{
$k
->[1] } == 250 and
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'DATA'
and
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[1] == 2 and
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[2]{to} ne
''
) {
$self
->{stats}{
$k
->[0] }{
$k
->[1] }{queue}{pass}++;
$self
->_DEBUG(
$k
,
'Email : '
.(++
$self
->{stats}{
$k
->[0] }{
$k
->[1] }{queue}{count}).
' : '
.
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[2]{to}.
' : PASS'
,8)
if
$self
->{debug} >= 1;
my
$r
=
$self
->_FUNC(
'pass'
,
$self
,
$k
,0,
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[2]{queue});
$self
->{pipe_mode}{
$k
->[0] }{
$k
->[1] }[2]={};
}
if
(
$self
->{status_code}{
$k
->[0] }{
$k
->[1] } == 220 and (
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'BEGIN'
or
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'HELO'
) ) {
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'HELO'
,0];
if
(
$self
->{status_mode}{
$k
->[0] }{
$k
->[1] } eq
' '
) {
my
$r
=
$self
->_FUNC(
'connect_pass'
,
$self
,
$k
,0,
$self
->{queue}{
$k
->[0] }{
$k
->[1] });
$self
->_HELO(
$k
);
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'HELO'
,1];
}
}
elsif
(
$self
->{status_code}{
$k
->[0] }{
$k
->[1] } == 220 and
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'STARTTLS'
) {
$self
->_STARTTLS(
$k
);
}
elsif
(
$self
->{status_code}{
$k
->[0] }{
$k
->[1] } == 221 ) {
$self
->{fh}{
$k
->[0] }{
$k
->[1] }->destroy;
}
elsif
(
$self
->{status_code}{
$k
->[0] }{
$k
->[1] } == 421 ) {
my
$r
=
$self
->_FUNC(
'hang'
,
$self
,
$k
,0,[
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }]);
$self
->_DEBUG(
$k
,
'Email : '
.(++
$self
->{stats}{
$k
->[0] }{
$k
->[1] }{queue}{count}).
' : '
.
$self
->{on_queue}{
$k
->[0] }{
$k
->[1] }{to}.
' : HANG'
,8)
if
$self
->{debug} >= 1;
$self
->reconnect(
$k
,
$self
->{retry}{
$k
->[0] }{hang} )
if
(
$r
== 1 or
$r
== 103);
}
elsif
( (
$self
->{status_code}{
$k
->[0] }{
$k
->[1] } == 250 and
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'HELO'
) or
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'HEADER'
) {
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'HEADER'
,0];
$self
->_HEADER(
$k
,
$self
->{buffer}{
$k
->[0] }{
$k
->[1] });
if
(
$self
->{status_mode}{
$k
->[0] }{
$k
->[1] } eq
' '
) {
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'HEADER'
,1];
if
( (
$self
->{secure}{
$k
->[0] } == 3 or
$self
->{secure}{
$k
->[0] } == 4) and
exists
(
$self
->{header}{
$k
->[0] }{
$k
->[1] }{starttls}) ) {
$self
->_STARTTLS(
$k
);
}
elsif
( (
$self
->{auth}{
$k
->[0] }{
$k
->[1] }[0]||
'0'
) ne
'0'
) {
$self
->_PIPE(
$k
,1);
$self
->_AUTH(
$k
);
}
else
{
$self
->_PIPE(
$k
,1);
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'MAIL'
,0];
}
}
}
elsif
(
$self
->{status_code}{
$k
->[0] }{
$k
->[1] } == 334 and
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'AUTH'
and
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[1] >= 1 ) {
$self
->_AUTH(
$k
);
}
elsif
(
$self
->{status_code}{
$k
->[0] }{
$k
->[1] } == 235 and
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'AUTH'
and
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[1] >= 1 ) {
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'MAIL'
,0];
}
if
(
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'MAIL'
and
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[1] == 0 ) {
$self
->_MAIL(
$k
);
}
elsif
(
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'MAIL'
and
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[1] == 2 ) {
if
(
$self
->{status_code}{
$k
->[0] }{
$k
->[1] } == 250) {
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'TO'
,0];
$self
->_TO(
$k
);
}
}
elsif
(
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'TO'
and
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[1] == 2 ) {
if
(
$self
->{status_code}{
$k
->[0] }{
$k
->[1] } == 250) {
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'DATA'
,0];
$self
->_DATA(
$k
);
}
}
elsif
(
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[0] eq
'DATA'
and
$self
->{stage}{
$k
->[0] }{
$k
->[1] }[1] == 2 ) {
if
(
$self
->{status_code}{
$k
->[0] }{
$k
->[1] } == 354 ) {
$self
->{stage}{
$k
->[0] }{
$k
->[1] }=[
'DATAEND'
,0];
$self
->_DATAEND(
$k
);
}
}
}
});
}
sub
_WRITE {
my
$self
=
shift
;
my
$k
=
shift
;
my
$str
=
shift
;
$str
=~s/[\r\n]+?$//s;
$self
->{last_active}=
time
;
$self
->_DEBUG(
$k
,
'>>'
.
$str
)
if
$self
->{debug} >= 1;
$self
->{handle}{
$k
->[0] }{
$k
->[1] }->push_write( (
$self
->{encode} ne
''
) ? Encode::encode(
$self
->{encode}=>
$str
.
"\r\n"
):
$str
.
"\r\n"
);
}
sub
_DEBUG {
my
$self
=
shift
;
my
$k
=
shift
;
my
$str
=
shift
||
''
;
my
$dlevel
=
shift
||10;
if
(
$dlevel
<=
$self
->{debug} ) {
my
$out
=
'['
.
$k
->[0].
':'
.
$k
->[1].
':'
.
$self
->{stats}{
$k
->[0] }{
$k
->[1] }{queue}{id}.
']['
.
$self
->_STRFTIME(
'[YYYY]-[MM]-[DD] [hh]:[mm]:[ss]'
,
time
).
'] '
.
$str
.
"\n"
;
if
(
exists
(
$self
->{debug_path}) ) {
print
{
$self
->{debug_fh}{
$k
->[0].
':'
.
$k
->[1] } }
$out
;
}
else
{
print
$out
;
}
}
}
sub
_STRFTIME {
my
$self
=
shift
;
my
$format
=
shift
;
my
$time
=
shift
;
my
@time
=
localtime
(
$time
);
my
%DT
=(
'YYYY'
=>
$time
[5]+1900,
'MM'
=>
sprintf
(
'%.2d'
,
$time
[4]+1),
'DD'
=>
sprintf
(
'%.2d'
,
$time
[3]),
'hh'
=>
sprintf
(
'%.2d'
,
$time
[2]),
'mm'
=>
sprintf
(
'%.2d'
,
$time
[1]),
'ss'
=>
sprintf
(
'%.2d'
,
$time
[0]),
'WK'
=>(( (
$time
[7]+1-
$time
[6]) <= 7) ?
'01'
:
sprintf
(
'%.2d'
,(
$time
[7]+1-
$time
[6])/7)+1)
);
$format
=~s/\[(YYYY|MM|DD|hh|mm|ss|MNA|DNAME|WK)\]/
$DT
{$1}/gs;
return
$format
;
}
sub
_MKDIRF {
my
$self
=
shift
;
my
$path
=
shift
;
my
$sep
=0;
my
$np2
=
''
;
if
(
$path
=~s!^(\w\:|)\/!!s) {
$np2
=
''
.($1||
''
);
}
else
{
$sep
=1;
}
my
@f
=
split
/\//,
$path
;
my
$fl
=
pop
(
@f
);
foreach
my
$f
(
@f
) {
if
(
$sep
== 0) {
$np2
.=
'/'
.
$f
;
}
else
{
$np2
.=
$f
;
$sep
=0;
}
if
(!(-e
$np2
)) {
my
$mk
=
mkdir
(
$np2
);
}
}
}
1;