our
@EXPORT_API
=
qw(
NODE $NODE
configure
node_of port_is_local
snd kil
db_set db_del
db_mon db_family db_keys db_values
)
;
our
@EXPORT_OK
= (
qw(
%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
add_node load_func
)
,
@EXPORT_API
,
);
our
@EXPORT
=
qw(
snd_to_func snd_on eval_on
port_is_local
up_nodes mon_nodes node_is_up
)
;
our
@CARP_NOT
= (AnyEvent::MP::);
sub
load_func($) {
my
$func
=
$_
[0];
unless
(
defined
&$func
) {
my
$pkg
=
$func
;
do
{
$pkg
=~ s/::[^:]+$//
or
return
sub
{
die
"unable to resolve function '$func'"
};
local
$@;
unless
(
eval
"require $pkg; 1"
) {
my
$error
= $@;
$error
=~ /^Can't locate .*.pm in \
@INC
\(/
or
return
sub
{
die
$error
};
}
}
until
defined
&$func
;
}
\
&$func
}
my
@alnum
= (
'0'
..
'9'
,
'A'
..
'Z'
,
'a'
..
'z'
);
sub
nonce($) {
join
""
,
map
chr
rand
256, 1 ..
$_
[0]
}
sub
nonce62($) {
join
""
,
map
$alnum
[
rand
62], 1 ..
$_
[0]
}
our
$CONFIG
;
our
$SECURE
;
our
$RUNIQ
;
our
$UNIQ
;
our
$NODE
;
our
$ID
=
"a"
;
our
%NODE
;
our
(
%PORT
,
%PORT_DATA
);
our
%RMON
;
our
%LMON
;
our
%BINDS
;
our
$BINDS
;
our
$SRCNODE
;
our
$GLOBAL
;
{
my
$now
= AE::now;
$UNIQ
=
(
join
""
,
map
$alnum
[
$_
],
$$ / 62 % 62,
$$ % 62,
(
int
$now
) % 62,
(
int
$now
* 100) % 62,
(
int
$now
* 10000) % 62,
) . nonce62 4
;
$RUNIQ
= nonce62 10;
$RUNIQ
=~ s/(.)$/\U$1/;
$NODE
=
""
;
}
sub
NODE() {
$NODE
}
sub
node_of($) {
my
(
$node
,
undef
) =
split
/
$node
}
BEGIN {
*TRACE
=
$ENV
{PERL_ANYEVENT_MP_TRACE}
?
sub
() { 1 }
:
sub
() { 0 };
}
our
$DELAY_TIMER
;
our
@DELAY_QUEUE
;
our
$delay_run
=
sub
{
(
shift
@DELAY_QUEUE
or
return
undef
$DELAY_TIMER
)->()
while
1;
};
sub
delay($) {
push
@DELAY_QUEUE
,
shift
;
$DELAY_TIMER
||= AE::timer 0, 0,
$delay_run
;
}
sub
_inject {
warn
"RCV $SRCNODE -> "
.
eval
{ JSON::XS->new->encode (\
@_
) } .
"\n"
if
TRACE &&
@_
;
&{
$PORT
{+
shift
} or
return
};
}
sub
add_node {
$NODE
{
$_
[0]} ||
do
{
my
(
$node
) =
@_
;
length
$node
or Carp::croak
"'undef' or the empty string are not valid node/port IDs"
;
new AnyEvent::MP::Node::Remote
$node
}
}
sub
snd(@) {
my
(
$nodeid
,
$portid
) =
split
/
warn
"SND $nodeid <- "
.
eval
{ JSON::XS->new->encode ([
$portid
,
@_
]) } .
"\n"
if
TRACE &&
@_
;
(
$NODE
{
$nodeid
} || add_node
$nodeid
)
->{
send
} ([
"$portid"
,
@_
]);
}
sub
port_is_local($) {
my
(
$nodeid
,
undef
) =
split
/
$nodeid
eq
$NODE
}
sub
snd_to_func($$;@) {
my
$nodeid
=
shift
;
$AnyEvent::MP::Node::Self::DELAY
= 1
if
$nodeid
ne
$NODE
;
(
$NODE
{
$nodeid
} || add_node
$nodeid
)->{
send
} ([
""
,
@_
]);
}
sub
snd_on($@) {
my
$node
=
shift
;
snd
$node
,
snd
=>
@_
;
}
sub
eval_on($$;@) {
my
$node
=
shift
;
snd
$node
,
eval
=>
@_
;
}
sub
kil(@) {
my
(
$nodeid
,
$portid
) =
split
/
length
$portid
or Carp::croak
"$nodeid#$portid: killing a node port is not allowed, caught"
;
(
$NODE
{
$nodeid
} || add_node
$nodeid
)
->
kill
(
"$portid"
,
@_
);
}
sub
node_is_up($) {
(
$_
[0] eq
$NODE
) || (
$NODE
{
$_
[0]} or
return
)->{transport}
? 1 : 0
}
sub
up_nodes() {
map
$_
->{id},
grep
$_
->{transport},
values
%NODE
}
our
%MON_NODES
;
sub
mon_nodes($) {
my
(
$cb
) =
@_
;
$MON_NODES
{
$cb
+0} =
$cb
;
defined
wantarray
and Guard::guard {
delete
$MON_NODES
{
$cb
+0} }
}
sub
_inject_nodeevent($$;@) {
my
(
$node
,
$up
,
@reason
) =
@_
;
AE::
log
7
=>
"$node->{id} is "
. (
$up
?
"up."
:
"down (@reason)."
);
for
my
$cb
(
values
%MON_NODES
) {
eval
{
$cb
->(
$node
->{id},
$up
,
@reason
); 1 }
or AE::
log
die
=> $@;
}
}
sub
_kill {
my
$port
=
shift
;
delete
$PORT
{
$port
}
or
return
;
delete
$PORT_DATA
{
$port
};
my
$mon
=
delete
$LMON
{
$port
}
or !
@_
or AE::
log
die
=>
"unmonitored local port $port died with reason: @_"
;
$_
->(
@_
)
for
values
%$mon
;
}
sub
_monitor {
return
$_
[2](
no_such_port
=>
"cannot monitor nonexistent port"
,
"$NODE#$_[1]"
)
unless
exists
$PORT
{
$_
[1]};
$LMON
{
$_
[1]}{
$_
[2]+0} =
$_
[2];
}
sub
_unmonitor {
delete
$LMON
{
$_
[1]}{
$_
[2]+0}
if
exists
$LMON
{
$_
[1]};
}
sub
_secure_check {
$SECURE
and
die
"remote execution not allowed\n"
;
}
our
%NODE_REQ
;
%NODE_REQ
= (
mon0
=>
sub
{
my
$portid
=
shift
;
_unmonitor
undef
,
$portid
,
delete
$NODE
{
$SRCNODE
}{rmon}{
$portid
}
if
exists
$NODE
{
$SRCNODE
};
},
mon1
=>
sub
{
my
$portid
=
shift
;
Scalar::Util::weaken (
my
$node
=
$NODE
{
$SRCNODE
});
_monitor
undef
,
$portid
,
$node
->{rmon}{
$portid
} =
sub
{
delete
$node
->{rmon}{
$portid
};
$node
->
send
([
""
,
kil0
=>
$portid
,
@_
])
if
$node
&&
$node
->{transport};
};
},
kil0
=>
sub
{
my
$cbs
=
delete
$NODE
{
$SRCNODE
}{lmon}{+
shift
}
or
return
;
$_
->(
@_
)
for
@$cbs
;
},
kil1
=> \
&_kill
,
snd
=>
sub
{
&snd
},
can
=>
sub
{
my
$method
=
shift
;
snd
@_
,
exists
$NODE_REQ
{
$method
};
},
eval
=>
sub
{
&_secure_check
;
my
@res
=
do
{
package
main;
eval
shift
};
snd
@_
,
"$@"
,
@res
if
@_
;
},
time
=>
sub
{
snd
@_
, AE::now;
},
devnull
=>
sub
{
},
""
=>
sub
{
},
);
new AnyEvent::MP::Node::Self
$NODE
;
$PORT
{
""
} =
sub
{
my
$tag
=
shift
;
eval
{ &{
$NODE_REQ
{
$tag
} ||=
do
{
&_secure_check
; load_func
$tag
} } };
AE::
log
die
=>
"error processing node message from $SRCNODE: $@"
if
$@;
};
our
$MPROTO
= 1;
push
@AnyEvent::MP::Transport::HOOK_GREET
,
sub
{
$_
[0]{local_greeting}{mproto} =
$MPROTO
;
};
our
%SEED_NODE
;
our
%NODE_SEED
;
our
%SEED_CONNECT
;
our
$SEED_WATCHER
;
our
$SEED_RETRY
;
our
%GLOBAL_NODE
;
sub
seed_connect {
my
(
$seed
) =
@_
;
my
(
$host
,
$port
) = AnyEvent::Socket::parse_hostport
$seed
or Carp::croak
"$seed: unparsable seed address"
;
AE::
log
9
=>
"trying connect to seed node $seed."
;
$SEED_CONNECT
{
$seed
} ||= AnyEvent::MP::Transport::mp_connect
$host
,
$port
,
on_greeted
=>
sub
{
if
(
$_
[0]{remote_node} eq
$AnyEvent::MP::Kernel::NODE
) {
delete
$SEED_NODE
{
$seed
};
}
else
{
$SEED_NODE
{
$seed
} =
$_
[0]{remote_node};
$NODE_SEED
{
$_
[0]{remote_node}} =
$seed
;
snd
$_
[0]{remote_node},
"g_slave"
;
}
},
sub
{
delete
$SEED_CONNECT
{
$seed
};
}
;
}
sub
seed_all {
my
@seeds
=
grep
!(
defined
$SEED_NODE
{
$_
} && node_is_up
$SEED_NODE
{
$_
}),
keys
%SEED_NODE
;
if
(
@seeds
) {
seed_connect
$_
for
grep
!
exists
$SEED_CONNECT
{
$_
},
@seeds
;
$SEED_RETRY
=
$SEED_RETRY
* 2;
$SEED_RETRY
=
$AnyEvent::MP::Kernel::CONFIG
->{monitor_timeout}
if
$SEED_RETRY
>
$AnyEvent::MP::Kernel::CONFIG
->{monitor_timeout};
$SEED_WATCHER
= AE::timer
$SEED_RETRY
, 0, \
&seed_all
;
}
else
{
undef
$SEED_WATCHER
;
}
}
sub
seed_again {
$SEED_RETRY
= (1 +
rand
) * 0.6;
$SEED_WATCHER
||= AE::timer 0, 0, \
&seed_all
;
}
sub
set_seeds(@) {
%SEED_NODE
= ();
%NODE_SEED
= ();
%SEED_CONNECT
= ();
@SEED_NODE
{
@_
} = ();
seed_again;
}
$NODE_REQ
{g_global} =
sub
{
undef
$GLOBAL_NODE
{
$SRCNODE
};
};
mon_nodes
sub
{
delete
$GLOBAL_NODE
{
$_
[0]}
unless
$_
[1];
return
unless
exists
$NODE_SEED
{
$_
[0]};
if
(
$_
[1]) {
snd
$_
[0],
"g_slave"
;
}
else
{
seed_again;
}
};
our
$KEEPALIVE_RETRY
;
our
$KEEPALIVE_WATCHER
;
our
%KEEPALIVE
;
our
%KEEPALIVE_DOWN
;
sub
keepalive_all {
AE::
log
9
=>
"keepalive: trying to establish connections with: "
. (
join
" "
,
keys
%KEEPALIVE_DOWN
)
.
"."
;
(add_node
$_
)->
connect
for
keys
%KEEPALIVE_DOWN
;
$KEEPALIVE_RETRY
=
$KEEPALIVE_RETRY
* 2;
$KEEPALIVE_RETRY
=
$AnyEvent::MP::Kernel::CONFIG
->{monitor_timeout}
if
$KEEPALIVE_RETRY
>
$AnyEvent::MP::Kernel::CONFIG
->{monitor_timeout};
$KEEPALIVE_WATCHER
= AE::timer
$KEEPALIVE_RETRY
, 0, \
&keepalive_all
;
}
sub
keepalive_again {
$KEEPALIVE_RETRY
= (1 +
rand
) * 0.3;
keepalive_all;
}
sub
keepalive_add {
return
if
$KEEPALIVE
{
$_
[0]}++;
return
if
node_is_up
$_
[0];
undef
$KEEPALIVE_DOWN
{
$_
[0]};
keepalive_again;
}
sub
keepalive_del {
return
if
--
$KEEPALIVE
{
$_
[0]};
delete
$KEEPALIVE
{
$_
[0]};
delete
$KEEPALIVE_DOWN
{
$_
[0]};
undef
$KEEPALIVE_WATCHER
unless
%KEEPALIVE_DOWN
;
}
mon_nodes
sub
{
return
unless
exists
$KEEPALIVE
{
$_
[0]};
if
(
$_
[1]) {
delete
$KEEPALIVE_DOWN
{
$_
[0]};
undef
$KEEPALIVE_WATCHER
unless
%KEEPALIVE_DOWN
;
}
else
{
undef
$KEEPALIVE_DOWN
{
$_
[0]};
keepalive_again;
}
};
our
$MASTER
;
our
$MASTER_MON
;
our
%LOCAL_DB
;
our
$GPROTO
= 1;
push
@AnyEvent::MP::Transport::HOOK_GREET
,
sub
{
$_
[0]{local_greeting}{gproto} =
$GPROTO
;
};
our
%GLOBAL_REQ
;
sub
global_req_add {
my
(
$id
,
$req
) =
@_
;
return
if
exists
$GLOBAL_REQ
{
$id
};
$GLOBAL_REQ
{
$id
} =
$req
;
snd
$MASTER
,
@$req
if
$MASTER
;
}
sub
global_req_del {
delete
$GLOBAL_REQ
{
$_
[0]};
}
our
%GLOBAL_RES
;
our
$GLOBAL_RES_ID
=
"a"
;
sub
global_call {
my
$id
= ++
$GLOBAL_RES_ID
;
$GLOBAL_RES
{
$id
} =
pop
;
global_req_add
$id
, [
@_
,
$id
];
}
$NODE_REQ
{g_reply} =
sub
{
my
$id
=
shift
;
global_req_del
$id
;
my
$cb
=
delete
$GLOBAL_RES
{
$id
}
or
return
;
&$cb
};
sub
g_find {
global_req_add
"g_find $_[0]"
, [
g_find
=>
$_
[0]];
}
$NODE_REQ
{g_found} =
sub
{
global_req_del
"g_find $_[0]"
;
my
$node
=
$NODE
{
$_
[0]} or
return
;
$node
->connect_to (
$_
[1]);
};
sub
master_set {
$MASTER
=
$_
[0];
AE::
log
8
=>
"new master node: $MASTER."
;
$MASTER_MON
= mon_nodes
sub
{
if
(
$_
[0] eq
$MASTER
&& !
$_
[1]) {
undef
$MASTER
;
master_search ();
}
};
snd
$MASTER
,
g_slave
=> \
%LOCAL_DB
;
snd
$MASTER
,
@$_
for
values
%GLOBAL_REQ
;
}
sub
master_search {
AE::
log
9
=>
"starting search for master node."
;
for
(
keys
%NODE_SEED
) {
if
(node_is_up
$_
) {
master_set
$_
;
return
;
}
}
$MASTER_MON
= mon_nodes
sub
{
return
unless
$_
[1];
return
unless
$NODE_SEED
{
$_
[0]};
master_set
$_
[0];
};
}
$NODE_REQ
{g_slave} =
sub
{
&{
$NODE_REQ
{g_slave} }
};
our
$sv_eq_coder
= JSON::XS->new->utf8->allow_nonref;
sub
sv_eq($$) {
ref
$_
[0] ||
ref
$_
[1]
? (JSON::XS::encode
$sv_eq_coder
,
$_
[0]) eq (JSON::XS::encode
$sv_eq_coder
,
$_
[1])
:
$_
[0] eq
$_
[1]
&&
defined
$_
[0] ==
defined
$_
[1]
}
sub
db_del($@) {
my
$family
=
shift
;
my
@del
=
grep
exists
$LOCAL_DB
{
$family
}{
$_
},
@_
;
return
unless
@del
;
delete
@{
$LOCAL_DB
{
$family
} }{
@del
};
snd
$MASTER
,
g_upd
=>
$family
=>
undef
, \
@del
if
defined
$MASTER
;
}
sub
db_set($$;$) {
my
(
$family
,
$subkey
) =
@_
;
unless
(
exists
$LOCAL_DB
{
$family
}{
$subkey
} && sv_eq
$LOCAL_DB
{
$family
}{
$subkey
},
$_
[2]) {
$LOCAL_DB
{
$family
}{
$subkey
} =
$_
[2];
snd
$MASTER
,
g_upd
=>
$family
=> {
$subkey
=>
$_
[2] }
if
defined
$MASTER
;
}
defined
wantarray
and Guard::guard { db_del
$family
=>
$subkey
}
}
sub
db_family {
my
(
$family
,
$cb
) =
@_
;
global_call
g_db_family
=>
$family
,
$cb
;
}
sub
db_keys {
my
(
$family
,
$cb
) =
@_
;
global_call
g_db_keys
=>
$family
,
$cb
;
}
sub
db_values {
my
(
$family
,
$cb
) =
@_
;
global_call
g_db_values
=>
$family
,
$cb
;
}
our
%LOCAL_MON
;
our
%MON_DB
;
sub
db_mon($@) {
my
(
$family
,
$cb
) =
@_
;
if
(
my
$db
=
$MON_DB
{
$family
}) {
$LOCAL_MON
{
$family
}{
$cb
+0} =
sub
{ };
AE::postpone {
return
unless
exists
$LOCAL_MON
{
$family
}{
$cb
+0};
$LOCAL_MON
{
$family
}{
$cb
+0} =
$cb
;
$cb
->(
$db
, [
keys
%$db
]);
};
}
else
{
$LOCAL_MON
{
$family
}{
$cb
+0} =
$cb
;
global_req_add
"mon1 $family"
=> [
g_mon1
=>
$family
];
$MON_DB
{
$family
} = {};
}
defined
wantarray
and Guard::guard {
my
$mon
=
$LOCAL_MON
{
$family
};
delete
$mon
->{
$cb
+0};
unless
(
%$mon
) {
global_req_del
"mon1 $family"
;
snd
$MASTER
,
g_mon0
=>
$family
if
$MASTER
;
delete
$LOCAL_MON
{
$family
};
delete
$MON_DB
{
$family
};
}
}
}
$NODE_REQ
{g_chg1} =
sub
{
return
unless
$SRCNODE
eq
$MASTER
;
my
(
$f
,
$ndb
) =
@_
;
my
$db
=
$MON_DB
{
$f
};
my
(
@a
,
@c
,
@d
);
while
(
my
(
$k
,
$v
) =
each
%$ndb
) {
exists
$db
->{
$k
}
?
push
@c
,
$k
:
push
@a
,
$k
;
$db
->{
$k
} =
$v
;
}
for
(
grep
!
exists
$ndb
->{
$_
},
keys
%$db
) {
delete
$db
->{
$_
};
push
@d
,
$_
;
}
$_
->(
$db
, \
@a
, \
@c
, \
@d
)
for
values
%{
$LOCAL_MON
{
$_
[0]} };
};
$NODE_REQ
{g_chg2} =
sub
{
return
unless
$SRCNODE
eq
$MASTER
;
my
(
$family
,
$set
,
$del
) =
@_
;
my
$db
=
$MON_DB
{
$family
};
my
(
@a
,
@c
);
while
(
my
(
$k
,
$v
) =
each
%$set
) {
exists
$db
->{
$k
}
?
push
@c
,
$k
:
push
@a
,
$k
;
$db
->{
$k
} =
$v
;
}
delete
@$db
{
@$del
};
$_
->(
$db
, \
@a
, \
@c
,
$del
)
for
values
%{
$LOCAL_MON
{
$family
} };
};
sub
nodename {
(POSIX::uname ())[1]
}
sub
_resolve($) {
my
(
$nodeid
) =
@_
;
my
$cv
= AE::cv;
my
@res
;
$cv
->begin (
sub
{
my
%seen
;
my
@refs
;
for
(
sort
{
$a
->[0] <=>
$b
->[0] }
@res
) {
push
@refs
,
$_
->[1]
unless
$seen
{
$_
->[1]}++
}
shift
->
send
(
@refs
);
});
my
$idx
;
for
my
$t
(
split
/,/,
$nodeid
) {
my
$pri
= ++
$idx
;
$t
=
length
$t
? nodename .
":$t"
: nodename
if
$t
=~ /^\d*$/;
my
(
$host
,
$port
) = AnyEvent::Socket::parse_hostport
$t
, 0
or Carp::croak
"$t: unparsable transport descriptor"
;
$port
=
"0"
if
$port
eq
"*"
;
if
(
$host
eq
"*"
) {
$cv
->begin;
my
$get_addr
=
sub
{
my
@addr
;
local
$SIG
{ALRM} =
'DEFAULT'
;
alarm
2;
for
my
$if
(Net::Interface->interfaces) {
for
$_
(
$if
->address (Net::Interface::AF_INET ())) {
next
if
/^\x7f/;
push
@addr
,
$_
;
}
for
(
$if
->address (Net::Interface::AF_INET6 ())) {
next
unless
/^[\x20-\x3f\xfc\xfd]/;
push
@addr
,
$_
;
}
}
alarm
0;
@addr
};
my
@addr
;
if
(AnyEvent::WIN32) {
@addr
=
$get_addr
->();
}
else
{
pipe
my
$r
,
my
$w
or
die
"pipe: $!"
;
if
(
fork
eq 0) {
close
$r
;
syswrite
$w
,
pack
"(C/a*)*"
,
$get_addr
->();
POSIX::_exit (0);
}
else
{
close
$w
;
my
$addr
;
1
while
sysread
$r
,
$addr
, 1024,
length
$addr
;
@addr
=
unpack
"(C/a*)*"
,
$addr
;
}
}
for
my
$ip
(
@addr
) {
push
@res
, [
$pri
+= 1e-5,
AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address
$ip
,
$port
];
}
$cv
->end;
}
else
{
$cv
->begin;
AnyEvent::Socket::resolve_sockaddr
$host
,
$port
,
"tcp"
, 0,
undef
,
sub
{
for
(
@_
) {
my
(
$service
,
$host
) = AnyEvent::Socket::unpack_sockaddr
$_
->[3];
push
@res
, [
$pri
+= 1e-5,
AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address
$host
,
$service
];
}
$cv
->end;
};
}
}
$cv
->end;
$cv
}
our
@POST_CONFIGURE
;
sub
post_configure(&) {
die
"AnyEvent::MP::Kernel::post_configure must be called in void context"
if
defined
wantarray
;
push
@POST_CONFIGURE
,
@_
;
(
shift
@POST_CONFIGURE
)->()
while
$NODE
&&
@POST_CONFIGURE
;
}
sub
configure(@) {
unshift
@_
,
"profile"
if
@_
& 1;
my
(
%kv
) =
@_
;
my
$profile
=
delete
$kv
{profile};
$profile
= nodename
unless
defined
$profile
;
$CONFIG
= AnyEvent::MP::Config::find_profile
$profile
,
%kv
;
$SECURE
=
$CONFIG
->{secure};
my
$node
=
exists
$CONFIG
->{nodeid} ?
$CONFIG
->{nodeid} :
"$profile/"
;
$node
or Carp::croak
"$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"
;
my
$node_obj
=
delete
$NODE
{
$NODE
};
$NODE
=
$node
;
$NODE
=~ s/
%n
/nodename/ge;
if
(
$NODE
=~ s!(?:(?<=/)$|
%u
)!
$RUNIQ
!g) {
$UNIQ
=
""
;
}
$node_obj
->{id} =
$NODE
;
$NODE
{
$NODE
} =
$node_obj
;
my
$seeds
=
$CONFIG
->{seeds};
my
$binds
=
$CONFIG
->{binds};
$binds
||= [
"*"
];
AE::
log
8
=>
"node $NODE starting up."
;
$BINDS
= [];
%BINDS
= ();
for
(
map
_resolve
$_
,
@$binds
) {
for
my
$bind
(
$_
->
recv
) {
my
(
$host
,
$port
) = AnyEvent::Socket::parse_hostport
$bind
or Carp::croak
"$bind: unparsable local bind address"
;
my
$listener
= AnyEvent::MP::Transport::mp_server
$host
,
$port
,
prepare
=>
sub
{
my
(
undef
,
$host
,
$port
) =
@_
;
$bind
= AnyEvent::Socket::format_hostport
$host
,
$port
;
0
},
;
$BINDS
{
$bind
} =
$listener
;
push
@$BINDS
,
$bind
;
}
}
AE::
log
9
=>
"running post config hooks and init."
;
post_configure { };
db_set
"'l"
=>
$NODE
=>
$BINDS
;
AE::
log
8
=>
"node listens on [@$BINDS]."
;
set_seeds
map
$_
->
recv
,
map
_resolve
$_
,
@$seeds
;
master_search;
undef
&_resolve
;
*configure
=
sub
(@){ };
AE::
log
9
=>
"starting services."
;
for
(@{
$CONFIG
->{services} }) {
if
(
ref
) {
my
(
$func
,
@args
) =
@$_
;
(load_func
$func
)->(
@args
);
}
elsif
(s/::$//) {
eval
"require $_"
;
die
$@
if
$@;
}
else
{
(load_func
$_
)->();
}
}
eval
"#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}"
;
die
"$@"
if
$@;
}
1