#!/usr/bin/env perl
# Turn on autoflush for the currently selected output handle so that output
# is not held back in a buffer.
$| ++;

# Package variables of MYDan::Util::OptConf, set before load() is called.
# NOTE(review): their exact meaning is defined in that module; presumably THIS
# names the configuration section, and CONF holds option-parser settings.
$MYDan::Util::OptConf::THIS = 'agent';
$MYDan::Util::OptConf::ARGC = -1;
@MYDan::Util::OptConf::CONF = qw( no_ignore_case pass_through );

my $option = MYDan::Util::OptConf->load();

# Defaults (timeout 60, addr 0) are applied first, then the command line
# options below are read and the merged settings are dumped into %o.
my %o = $option->set( timeout => 60, addr => 0 )->get(
    qw( range=s sudo=s timeout=i max=i verbose env=s user=s port=s version secret=s immediately addr=s listen=i )
)->dump();

$option->assert( 'range' );

# First positional argument: the name of the code to run.
my $code = shift;

# Asking for the code named 'version' behaves like passing --version.
# The defined() guard avoids comparing undef when no argument was given.
$o{version} = 1 if defined $code && $code eq 'version';

# Default the user to the current login name. chomp (rather than chop) only
# removes a trailing newline, so the name is never truncated by accident.
unless ( $o{user} )
{
    $o{user} = `id -un`;
    chomp $o{user} if defined $o{user};
}
# --immediately: start a local TCP listener and add a MYDan_rlog entry
# (uuid:addr:port) to $o{env}. Peers that connect must start their stream with
# "<uuid>:<host>:"; everything they send afterwards is printed line by line,
# prefixed with that host name.
if ( $o{immediately} )
{
    my $listen = $o{listen};

    unless ( $listen )
    {
        # No --listen given: choose a port from 65112..65535 that does not
        # appear among the port numbers extracted from column 4 of
        # `netstat -tun` (the local address column on Linux netstat).
        my $scan = `netstat -tun|awk '{print \$4}'|awk -F: '{print \$2}'`;
        my %open = map { $_ => 1 } $scan =~ /(\d+)/g;
        my %port = map { $_ => 1 } 65112 .. 65535;
        ( $listen ) = grep { !$open{$_} } keys %port;

        # Without a port the MYDan_rlog entry below would advertise nothing.
        die "[MYDan]no free port between 65112 and 65535\n" unless $listen;
    }

    my $cv    = AE::cv();
    my $index = 0;                                  # connection counter
    my $uuid  = Data::UUID->new->create_str();      # token peers must present
    my %index;                                      # connection id => { handle => ... }
    my %head;                                       # "ip:port" => host name, set once authenticated

    tcp_server( undef, $listen, sub {
        my ( $fh, $ip, $port ) = @_ or die "[MYDan]tcp_server: $!";

        my $peer = "$ip:$port";
        my $id   = ++$index;
        warn "[MYDan]tcp connet from $ip:$port\n";

        my $handle;

        # Drop all state kept for this connection. Removing the %head entry
        # matters: otherwise a later connection from the same ip:port would be
        # treated as already authenticated and skip the uuid check.
        my $release = sub {
            delete $head{$peer};
            delete $index{$id};
            $handle->destroy if $handle;
        };

        $handle = AnyEvent::Handle->new(
            fh       => $fh,
            rbuf_max => 10240000,
            wbuf_max => 10240000,
            autocork => 1,
            on_read  => sub {
                my $self = shift;

                # Consume whatever is currently buffered as one chunk.
                $self->unshift_read(
                    chunk => length $self->{rbuf},
                    sub {
                        my $data = $_[1];

                        # Authenticated peer: print each line with its host name.
                        if ( my $host = $head{$peer} )
                        {
                            print "$host:$_\n" for split "\n", $data;
                            return;
                        }

                        # First chunk: strip and verify the "<uuid>:<host>:" header.
                        # The captures are only read when the substitution
                        # matched, so stale values of $1/$2 are never used.
                        my ( $token, $host );
                        ( $token, $host ) = ( $1, $2 )
                            if $data =~ s/^([^:]+):([^:]+)://;

                        unless ( defined $token && $token eq $uuid )
                        {
                            $handle->push_shutdown;
                            return;
                        }

                        warn "[MYDan]host:$host ip:$ip port:$port:\n";
                        print $data;
                        $head{$peer} = $host;
                    },
                );
            },
            on_error => sub {
                warn "[MYDan]tcp error: $ip:$port\n";
                $release->();
            },
            on_eof => sub {
                warn "[MYDan]tcp close: $ip:$port\n";
                $release->();
            },
        );

        # Keep the handle referenced for as long as the connection is open.
        $index{$id}{handle} = $handle;
    } );

    # NOTE(review): $int is lexical to this block, so this watcher is released
    # when the block ends, and nothing visible here waits on $cv. Confirm
    # whether the INT handler is meant to stay active while the job runs.
    my $int = AnyEvent->signal(
        signal => "INT",
        cb     => sub { warn "kill.\n"; $cv->send; },
    );

    # Advertise the listener; any user supplied env entries are kept after it.
    my $env = 'MYDan_rlog=' . join( ":", $uuid, $o{addr}, $listen );
    $o{env} = $o{env} ? join( ';', $env, $o{env} ) : $env;
}
# Build the query that is sent to every node.
my $time = time;
my %query = (
    code => $code,
    argv => argv( $code ),
    map { $_ => $o{$_} } qw( user sudo ),
);

# Whole seconds spent building the query (argv() may run user supplied code)
# are taken off the timeout; a timeout that is used up is reset to 3.
my $use = int( time() - $time );
$o{timeout} -= $use if $use;
$o{timeout} = 3 if $o{timeout} <= 0;

# --secret 'k1=v1;k2=v2' becomes query keys __k1, __k2. The option is removed
# from %o so that it is not passed on with the other run() options.
if ( $o{secret} )
{
    for my $pair ( split /;/, delete $o{secret} )
    {
        $query{"__$1"} = $2 if $pair =~ /^([^=]+)=(.+)$/;
    }
}

# --env 'NAME=value;NAME=value' becomes the query's env hash. Each entry is
# split at its FIRST '=' (the same rule as for --secret above), so a value may
# itself contain '='. Entries without a name or without a value are skipped.
if ( $o{env} )
{
    my %env;
    for my $pair ( split /;/, $o{env} )
    {
        $env{$1} = $2 if $pair =~ /\A([^=]+)=(.+)\z/s;
    }
    $query{env} = \%env if %env;
}

# sudo travels inside the query only; take it out of the run() options.
$query{sudo} = delete $o{sudo} if $o{sudo};
# Expand the --range expression into a node list and run the query on it.
my $range = MYDan::Node->new( $option->dump( 'range' ) );

# 'range' is removed from %o before the remaining options are handed to run().
# Note that the client receives the inverse of the --verbose flag.
my %result = MYDan::Agent::Client->new(
    $range->load( delete $o{range} )->list
)->run(
    %o,
    verbose => !$o{verbose},
    query   => \%query,
);

# Without --verbose the results are handed to the shared printer as a whole.
unless ( $o{verbose} )
{
    MYDan::VSSH::Print::result( 'rcal' => %result );
    exit;
}

# With --verbose every output line is printed with its node name: to STDOUT
# when the node's output ended with the "--- 0\n" trailer (which is stripped),
# otherwise to STDERR.
for my $node ( sort keys %result )
{
    # Plain assignment: "my ... if ..." is documented as undefined behaviour.
    my $succ = $result{$node} =~ s/--- 0\n$//;

    for my $line ( split /\n/, $result{$node} )
    {
        if ( $succ )
        {
            print "$node:$line\n";
        }
        else
        {
            warn "$node:$line\n";
        }
    }
}

exit 0;
# Produce the 'argv' value of the query for the code named by the first
# argument.
#
# A file with that name is looked up in the directory $o{argv}. When it
# exists and evaluates to a CODE reference, that code is called with @ARGV and
# its result is returned (in the caller's context). In every other case a
# reference to @ARGV is returned unchanged.
sub argv
{
    # do FILE searches @INC for relative paths, so make the path absolute to
    # load exactly the file that the -f test below looks at.
    my $file = File::Spec->rel2abs( File::Spec->join( $o{argv}, shift ) );

    return \@ARGV unless -f $file;

    my $loaded = do $file;

    # Still fall back to the raw arguments, but say why the file was ignored.
    warn "[MYDan]argv: failed to load $file: $@" if !defined $loaded && $@;

    return ref( $loaded ) eq 'CODE' ? $loaded->( @ARGV ) : \@ARGV;
}