{
use
5.10.0;
# Version components. A non-zero $DEV appends a third zero-padded field to
# $VERSION, separated by '_' unless $DEV is negative.
our $MAJOR = 0.074;
our $MINOR = 0;
our $DEV   = 3;
our $VERSION = do {
    my $layout = '%1.3f%03d';
    if ($DEV) {
        $layout .= '_' if $DEV >= 0;
        $layout .= '%03d';
    }
    sprintf $layout, $MAJOR, $MINOR, abs $DEV;
};
# Constructor hook with no work of its own; it simply returns true. The
# `after 'BUILD'` modifiers declared in this file wrap this method.
sub BUILD { return 1 }
# Optional Net::BitTorrent client this DHT belongs to. Read-only;
# has_client() reports whether one was supplied.
has 'client' => (
    is        => 'ro',
    isa       => 'Net::BitTorrent',
    predicate => 'has_client',
);
# Post-construction setup.
#   * With a client: extend the 'client' attribute so methods matching the
#     pattern below (udpN..., _has_udpN..., ip_filter, port) are delegated
#     to it, and stop there.
#   * Without a client: apply the Standalone role to this instance, passing
#     the constructor arguments as rebless_params, then call udp6 and udp4
#     (presumably to force the sockets into existence -- verify in the role).
after 'BUILD' => sub {
    my ($dht, $ctor_args) = @_;

    if ($dht->has_client) {
        return has '+client' =>
            (handles => qr[^(?:(?:_has_)?udp\d.*?|ip_filter|port)]);
    }

    Moose::Util::apply_all_roles(
        $dht,
        'Net::BitTorrent::DHT::Standalone',
        {rebless_params => $ctor_args}
    );
    $dht->udp6;
    $dht->udp4;
};
# Traffic statistics counters.
#
# Declares private, read-only Int attributes with the 'Counter' trait, each
# starting at 0 and each with a matching '_inc<attr>' incrementer:
#   _recv_invalid_{count,length}
#   _{recv,send}_{requests,replies}_{count,length}
#
# The invalid-packet counters do not depend on the request/reply type, so
# they are declared once per $var instead of once per ($type, $var) pair as
# before (which declared each of them twice).
{
    # Declares one counter attribute named $attr.
    my $declare_counter = sub {
        my ($attr) = @_;
        has $attr => (
            isa      => 'Int',
            is       => 'ro',
            init_arg => undef,
            traits   => ['Counter'],
            handles  => {'_inc' . $attr => 'inc'},
            default  => 0
        );
    };

    for my $var (qw[count length]) {
        $declare_counter->(join '_', '', 'recv_invalid', $var);
        for my $type (qw[requests replies]) {
            for my $dir (qw[recv send]) {
                $declare_counter->(join '_', '', $dir, $type, $var);
            }
        }
    }
}
# This node's DHT identifier. Read-only, coerced to NBTypes::DHT::NodeID,
# and produced lazily by _build_nodeid when no value was given.
has 'nodeid' => (
    is         => 'ro',
    isa        => 'NBTypes::DHT::NodeID',
    coerce     => 1,
    lazy_build => 1,
    builder    => '_build_nodeid',
);
# Builder for 'nodeid': returns the raw SHA-1 digest of a seed made from a
# random number (bounded by time * script start time), the program name and
# a fixed string.
sub _build_nodeid {
    my $seed = rand(time * $^T) . $0 . 'Sanko was here.';
    return Digest::SHA::sha1($seed);
}
# Sends $packet to $node over the UDP socket matching the node's address
# family and updates the outgoing statistics.
#
# Parameters:
#   $s      - the DHT object
#   $node   - destination node (host, port, ipv6, sockaddr, routing_table)
#   $packet - payload to transmit
#   $reply  - true when the packet is a reply, false when it is a request
#
# Returns the value of the builtin send() (undef on failure), an empty
# list when no suitable socket exists, or the result of del_node() when the
# destination is rejected by the IP filter.
#
# Changes from the previous version:
#   * a banned node is removed through $node->routing_table (as
#     _on_udp4_in does); this class defines no routing_table method
#   * an IPv6 node is never sent through the IPv4 socket
#   * a failed send no longer increments the sent counters
#   * the builtin is called as CORE::send to avoid ambiguity with this sub
sub send {
    my ($s, $node, $packet, $reply) = @_;

    my $rule = $s->ip_filter->is_banned($node->host);
    if (defined $rule) {
        $s->trigger_ip_filter(
            {   protocol => ($node->ipv6 ? 'udp6' : 'udp4'),
                severity => 'debug',
                event    => 'ip_filter',
                address  => [$node->host, $node->port],
                rule     => $rule,
                message  => 'Outgoing data was blocked by ipfilter'
            }
        );
        return $node->routing_table->del_node($node);
    }

    # Pick the socket for the node's own address family only.
    my $sock
        = $node->ipv6
        ? ($s->_has_udp6_sock ? $s->udp6_sock : undef)
        : ($s->_has_udp4_sock ? $s->udp4_sock : undef);
    return if !$sock;

    my $sent = CORE::send($sock, $packet, 0, $node->sockaddr);

    # Only account for datagrams that were actually handed to the kernel.
    if (defined $sent) {
        if ($reply) {
            $s->_inc_send_replies_count;
            $s->_inc_send_replies_length($sent);
        }
        else {
            $s->_inc_send_requests_count;
            $s->_inc_send_requests_length($sent);
        }
    }
    return $sent;
}
# One lazily built routing table per address family:
#   ipv4_routing_table / ipv6_routing_table
# Each delegates <family>_add_node and <family>_buckets to the table's
# add_node and buckets methods.
for my $family (qw[ipv4 ipv6]) {
    has "${family}_routing_table" => (
        is         => 'ro',
        isa        => 'Net::BitTorrent::Protocol::BEP05::RoutingTable',
        lazy_build => 1,
        handles    => {
            "${family}_add_node" => 'add_node',
            "${family}_buckets"  => 'buckets',
        },
    );
}
# Builder for 'ipv4_routing_table': a new routing table owned by this DHT.
sub _build_ipv4_routing_table {
    my ($dht) = @_;
    return Net::BitTorrent::Protocol::BEP05::RoutingTable->new(dht => $dht);
}
# Builder for 'ipv6_routing_table': a new routing table owned by this DHT.
sub _build_ipv6_routing_table {
    my ($dht) = @_;
    return Net::BitTorrent::Protocol::BEP05::RoutingTable->new(dht => $dht);
}
# Adds a node to the routing table of its address family and asks it for
# nodes close to our own id.
#
# Parameters:
#   $s - the DHT object
#   $n - either an already constructed node object, or an array ref of
#        [host, port]
#
# Returns the result of the new node's find_node() call, or nothing when
# the address cannot be resolved or the routing table does not accept the
# node.
#
# Changes from the previous version:
#   * `next` outside of any loop replaced by `return`
#   * node objects are no longer dereferenced as [host, port] arrays
#   * a false result from the routing table's add_node() is not
#     dereferenced (the get_peers reply handler in this file already treats
#     that result as possibly false)
sub add_node {
    my ($s, $n) = @_;

    if (!blessed($n)) {
        my $sockaddr = sockaddr($n->[0], $n->[1]);
        return if !$sockaddr;
        $n = Net::BitTorrent::Protocol::BEP05::Node->new(
            host          => $n->[0],
            port          => $n->[1],
            sockaddr      => $sockaddr,

            # A 28 byte packed address selects the IPv6 table.
            routing_table => (
                  length($sockaddr) == 28
                ? $s->ipv6_routing_table
                : $s->ipv4_routing_table
            )
        );
    }

    my $table = $n->ipv6 ? $s->ipv6_routing_table : $s->ipv4_routing_table;
    my $added = $table->add_node($n);
    return if !$added;
    return $added->find_node($s->nodeid);
}
# Cached 'ArrayRef[NBTypes::Network::Addr]' constraint, created on first use.
my $boot_constraint;

# Seeds the routing tables from the 'boot_nodes' constructor argument.
#
# The list is checked against the constraint above. Previously validate()
# was handed the flattened list (so only the first element was examined,
# against an ArrayRef constraint) and its result was thrown away. The array
# ref itself is now validated and a failure is reported with a warning;
# nodes are still added afterwards, as before, so existing callers keep
# working.
# NOTE(review): if the constraint is meant to be enforced, turn the warning
# into an exception once callers are known to pass conforming data.
after 'BUILD' => sub {
    my ($self, $args) = @_;
    my $boot_nodes = $args->{'boot_nodes'};
    return if !defined $boot_nodes;

    $boot_constraint
        //= Moose::Util::TypeConstraints::create_parameterized_type_constraint(
        'ArrayRef[NBTypes::Network::Addr]');

    # validate() returns undef on success or an error message on failure.
    my $problem = $boot_constraint->validate($boot_nodes);
    warn "Invalid boot_nodes: $problem" if defined $problem;

    $self->add_node($_) for @{$boot_nodes};
};
# Quest bookkeeping. For each of get_peers, announce_peer and find_node this
# declares a private '_<type>_quests' array attribute with helpers:
#   add_<type>_quest, <type>_quests, get_<type>_quest,
#   grep_<type>_quests, map_<type>_quests
# After every add_<type>_quest call the newest entry is weakened, so the
# list itself does not keep a quest alive.
for my $type (qw[get_peers announce_peer find_node]) {
    my $slot = "_${type}_quests";

    has $slot => (
        is       => 'ro',
        isa      => 'ArrayRef[Ref]',
        init_arg => undef,
        traits   => ['Array'],
        default  => sub { [] },
        handles  => {
            "add_${type}_quest"   => 'push',
            "${type}_quests"      => 'elements',
            "get_${type}_quest"   => 'get',
            "grep_${type}_quests" => 'grep',
            "map_${type}_quests"  => 'map',
        },
    );

    after "add_${type}_quest" => sub {
        my ($dht) = @_;
        Scalar::Util::weaken($dht->{$slot}->[-1]);
    };
}
# Cached 'NBTypes::DHT::NodeID' type constraint, looked up on first use and
# shared by the quest and packet handling subs below.
my $onesixty_constraint;

# Starts a get_peers quest for $infohash.
#
# Parameters:
#   $self     - the DHT object
#   $infohash - target infohash; coerced with the NodeID constraint
#   $code     - callback stored in the quest
#
# Returns the quest: [ $infohash, $code, [], $timer ]. The timer fires
# immediately and then every 15 seconds, sending get_peers to every node in
# the nearest bucket of both routing tables. The quest is also registered
# through add_get_peers_quest.
sub get_peers {
    my ($self, $infohash, $code) = @_;

    $onesixty_constraint
        //= Moose::Util::TypeConstraints::find_type_constraint(
        'NBTypes::DHT::NodeID');
    $infohash = $onesixty_constraint->coerce($infohash);

    # The timer callback must not keep the DHT object alive.
    Scalar::Util::weaken($self);
    my $ask_nearest = sub {
        return if !$self;
        for my $table ($self->ipv6_routing_table, $self->ipv4_routing_table)
        {
            my $closest = $table->nearest_bucket($infohash)->nodes;
            for my $member (@{$closest}) {
                $member->get_peers($infohash);
            }
        }
    };

    my $quest = [$infohash, $code, [], AE::timer(0, 0.25 * 60, $ask_nearest)];
    $self->add_get_peers_quest($quest);
    return $quest;
}
# Starts an announce_peer quest for $infohash on $port.
#
# Parameters:
#   $self     - the DHT object
#   $infohash - target infohash; coerced with the NodeID constraint
#   $port     - port to announce
#   $code     - callback stored in the quest
#
# Returns the quest: [ $infohash, $code, $port, [], $timer ]. The timer
# first fires after 10 seconds and then every 15 seconds, sending
# announce_peer to every node in the nearest bucket of both routing tables.
# The quest is also registered through add_announce_peer_quest.
sub announce_peer {
    my ($self, $infohash, $port, $code) = @_;

    $onesixty_constraint
        //= Moose::Util::TypeConstraints::find_type_constraint(
        'NBTypes::DHT::NodeID');
    $infohash = $onesixty_constraint->coerce($infohash);

    # The timer callback must not keep the DHT object alive.
    Scalar::Util::weaken($self);
    my $announce_nearest = sub {
        return if !$self;
        for my $table ($self->ipv6_routing_table, $self->ipv4_routing_table)
        {
            my $closest = $table->nearest_bucket($infohash)->nodes;
            for my $member (@{$closest}) {
                $member->announce_peer($infohash, $port);
            }
        }
    };

    my $quest = [
        $infohash, $code, $port, [],
        AE::timer(10, 0.25 * 60, $announce_nearest)
    ];
    $self->add_announce_peer_quest($quest);
    return $quest;
}
# Starts a find_node quest for $target.
#
# Parameters:
#   $self   - the DHT object
#   $target - node id to look for; coerced with the NodeID constraint
#   $code   - callback stored in the quest
#
# Returns the quest: [ $target, $code, [], $timer ]. The timer fires
# immediately and then every 15 seconds, sending find_node to every node in
# the nearest bucket of both routing tables. The quest is also registered
# through add_find_node_quest.
sub find_node {
    my ($self, $target, $code) = @_;

    $onesixty_constraint
        //= Moose::Util::TypeConstraints::find_type_constraint(
        'NBTypes::DHT::NodeID');
    $target = $onesixty_constraint->coerce($target);

    # The timer callback must not keep the DHT object alive.
    Scalar::Util::weaken($self);
    my $search_nearest = sub {
        return if !$self;
        for my $table ($self->ipv6_routing_table, $self->ipv4_routing_table)
        {
            my $closest = $table->nearest_bucket($target)->nodes;
            for my $member (@{$closest}) {
                $member->find_node($target);
            }
        }
    };

    my $quest = [$target, $code, [], AE::timer(0, 0.25 * 60, $search_nearest)];
    $self->add_find_node_quest($quest);
    return $quest;
}
# Handles one datagram received on the IPv6 UDP socket.
#
# Parameters:
#   $self, $sock, $sockaddr, $host, $port, $data, $flags
#
# The payload is bdecoded and rejected (counted in the invalid-packet
# statistics) unless it is a non-empty hash with a 'y' key -- the same
# validation _on_udp4_in applies; the 'y' check was previously missing
# here. The sending node is then looked up in the IPv6 routing table, or
# constructed when unknown.
#
# NOTE(review): unlike _on_udp4_in, nothing is done with the packet after
# that point; IPv6 queries and replies are not processed by this sub.
# Returns nothing.
sub _on_udp6_in {
    my ($self, $sock, $sockaddr, $host, $port, $data, $flags) = @_;

    my $packet = bdecode($data);
    if (   !$packet
        || !ref $packet
        || ref $packet ne 'HASH'
        || !keys %$packet
        || !defined $packet->{'y'})
    {
        $self->_inc_recv_invalid_count;
        $self->_inc_recv_invalid_length(length $data);
        return;
    }

    my $node = $self->ipv6_routing_table->find_node_by_sockaddr($sockaddr);
    if (!defined $node) {
        $node = Net::BitTorrent::Protocol::BEP05::Node->new(
            host          => $host,
            port          => $port,
            routing_table => $self->ipv6_routing_table,
            sockaddr      => $sockaddr
        );
    }
    return;
}
# Handles one datagram received on the IPv4 UDP socket.
#
# Parameters:
#   $self, $sock, $sockaddr, $host, $port, $data, $flags
#
# Flow:
#   1. bdecode the payload; anything that is not a non-empty hash with a
#      'y' key is counted as invalid and dropped.
#   2. Find the sending node in the IPv4 routing table (or construct it).
#      A node that claims our own id is removed from its routing table.
#   3. Dispatch on the 'y' key:
#        'r' - reply to one of our requests (ping, find_node, get_peers,
#              announce_peer), matched through the transaction id 't'
#        'q' - incoming query, answered through the node's _reply_* methods
#        'e' - error message, reported with a warning
#      anything else is reported with a warning.
#
# Changes from the previous version:
#   * the error branch repeated the query condition and could never run;
#     it now tests for y eq 'e'
#   * in get_peers replies the 'values' and 'token' handling was nested
#     inside the 'nodes' check, so replies carrying peers but no nodes were
#     ignored; the three keys are now handled independently
#   * the two `...` placeholders (malformed get_peers reply, unsupported
#     query) threw an "Unimplemented" exception on remote input; they now
#     record a failure / warn instead
sub _on_udp4_in {
    my ($self, $sock, $sockaddr, $host, $port, $data, $flags) = @_;

    my $packet = bdecode($data);
    if (   !$packet
        || !ref $packet
        || ref $packet ne 'HASH'
        || !keys %$packet
        || !defined $packet->{'y'})
    {
        $self->_inc_recv_invalid_count;
        $self->_inc_recv_invalid_length(length $data);
        return;
    }

    my $node = $self->ipv4_routing_table->find_node_by_sockaddr($sockaddr);
    if (!defined $node) {
        $node = Net::BitTorrent::Protocol::BEP05::Node->new(
            host          => $host,
            port          => $port,
            routing_table => $self->ipv4_routing_table,
            sockaddr      => $sockaddr
        );
    }

    # A remote node using our own id is dropped.
    return $node->routing_table->del_node($node)
        if $node->has_nodeid
        && ($node->nodeid->Lexicompare($self->nodeid) == 0);
    $node->touch;

    if ($packet->{'y'} eq 'r') {
        if (defined $packet->{'r'}) {
            if ($node->is_expecting($packet->{'t'})) {
                $self->_inc_recv_replies_count;
                $self->_inc_recv_replies_length(length $data);
                $node->_v($packet->{'v'})
                    if !$node->_has_v && defined $packet->{'v'};
                my $req = $node->del_request($packet->{'t'});
                $req->{'cb'}->($packet, $host, $port)
                    if defined $req->{'cb'};
                my $type = $req->{'type'};
                $node->_nodeid($packet->{'r'}{'id'})
                    if !$node->has_nodeid;

                if ($type eq 'ping') {
                    # Nothing beyond the bookkeeping above.
                }
                elsif ($type eq 'find_node') {
                    my ($quest) = $self->grep_find_node_quests(
                        sub {
                            defined $_
                                && $req->{'target'}->equal($_->[0]);
                        }
                    );
                    return if !defined $quest;
                    my @nodes = map {
                        Net::BitTorrent::Protocol::BEP23::Compact::uncompact_ipv4(
                            $_)
                        }
                        ref $packet->{'r'}{'nodes'}
                        ? @{$packet->{'r'}{'nodes'}}
                        : $packet->{'r'}{'nodes'};

                    # Merge into the quest's result list without duplicates.
                    {
                        my %seen = ();
                        @{$quest->[2]}
                            = grep { !$seen{$_->[0]}{$_->[1]}++ }
                            @{$quest->[2]}, @nodes;
                    }
                    $self->ipv4_add_node($_) for @nodes;
                    $quest->[1]->($quest->[0], $node, \@nodes);
                }
                elsif ($type eq 'get_peers') {
                    if (!(     defined $packet->{'r'}{'nodes'}
                            || defined $packet->{'r'}{'values'}
                        )
                        )
                    {
                        # Malformed reply: neither closer nodes nor peers.
                        $node->inc_fail;
                        return;
                    }

                    # Closer nodes: add them and continue the search there.
                    if (defined $packet->{'r'}{'nodes'}) {
                        for my $new_node (
                            Net::BitTorrent::Protocol::BEP23::Compact::uncompact_ipv4(
                                $packet->{'r'}{'nodes'}
                            )
                            )
                        {
                            $new_node = $self->ipv4_add_node($new_node);
                            $new_node->get_peers($req->{'info_hash'})
                                if $new_node;
                        }
                    }

                    # Peers for the infohash: hand them to the quest.
                    if (defined $packet->{'r'}{'values'}) {
                        my ($quest) = $self->grep_get_peers_quests(
                            sub {
                                defined $_
                                    && $req->{'info_hash'}->equal($_->[0]);
                            }
                        );
                        return if !defined $quest;
                        require Net::BitTorrent::Protocol::BEP23::Compact;
                        my @peers = map {
                            Net::BitTorrent::Protocol::BEP23::Compact::uncompact_ipv4(
                                $_)
                            }
                            ref $packet->{'r'}{'values'}
                            ? @{$packet->{'r'}{'values'}}
                            : $packet->{'r'}{'values'};
                        {
                            my %seen = ();
                            @{$quest->[2]}
                                = grep { !$seen{$_->[0]}{$_->[1]}++ }
                                @{$quest->[2]}, @peers;
                        }
                        $quest->[1]
                            ->($req->{'info_hash'}, $node, \@peers);
                    }

                    # Token to present in a later announce_peer.
                    if (defined $packet->{'r'}{'token'}) {
                        $node->_set_announce_peer_token_in(
                            $req->{'info_hash'}->to_Hex,
                            $packet->{'r'}{'token'});
                    }
                }
                elsif ($type eq 'announce_peer') {
                    my ($quest) = $self->grep_announce_peer_quests(
                        sub {
                            defined $_
                                && $req->{'info_hash'}->equal($_->[0]);
                        }
                    );
                    return if !defined $quest;
                    push @{$quest->[3]}, [$node->host, $node->port];
                    $quest->[1]
                        ->($req->{'info_hash'}, $node, $quest->[2]);
                    $node->get_prev_get_peers(0)
                        if $node->defined_prev_get_peers(
                        $req->{'info_hash'});
                }
                else {
                    warn sprintf '%s:%d', $node->host, $node->port;
                }
            }
            else {
                # Reply to a transaction we never started.
                $node->inc_fail;
                $self->_inc_recv_invalid_count;
                $self->_inc_recv_invalid_length(length $data);
            }
        }
    }
    elsif ($packet->{'y'} eq 'q' && defined $packet->{'a'}) {
        $self->_inc_recv_requests_count;
        $self->_inc_recv_requests_length(length $data);
        my $type = $packet->{'q'};
        $node->_nodeid($packet->{'a'}{'id'})
            if !$node->has_nodeid;

        if ($type eq 'ping' && defined $packet->{'t'}) {
            return $node->_reply_ping($packet->{'t'});
        }
        elsif ($type eq 'get_peers'
            && defined $packet->{'a'}{'info_hash'})
        {
            $onesixty_constraint
                //= Moose::Util::TypeConstraints::find_type_constraint(
                'NBTypes::DHT::NodeID');
            return $node->_reply_get_peers(
                $packet->{'t'},
                $onesixty_constraint->coerce($packet->{'a'}{'info_hash'})
            );
        }
        elsif ($type eq 'find_node'
            && defined $packet->{'a'}{'target'})
        {
            $onesixty_constraint
                //= Moose::Util::TypeConstraints::find_type_constraint(
                'NBTypes::DHT::NodeID');
            return $node->_reply_find_node(
                $packet->{'t'},
                $onesixty_constraint->coerce($packet->{'a'}{'target'})
            );
        }
        elsif ($type eq 'announce_peer'
            && defined $packet->{'a'}{'info_hash'})
        {
            $onesixty_constraint
                //= Moose::Util::TypeConstraints::find_type_constraint(
                'NBTypes::DHT::NodeID');
            return $node->_reply_announce_peer(
                $packet->{'t'},
                $onesixty_constraint->coerce($packet->{'a'}{'info_hash'}),
                $packet->{'a'},
            );
        }
        else {
            # Unsupported or incomplete query: report it, do not die.
            warn sprintf 'Unhandled query from %s:%d', $node->host,
                $node->port;
        }
    }
    elsif ($packet->{'y'} eq 'e') {
        warn sprintf 'Error from %s:%d', $node->host, $node->port;
    }
    else {
        warn sprintf '%s:%d', $node->host, $node->port;
    }
}
# Describes the IPv4 routing table. In list context the report lines are
# returned; otherwise each line is printed with say().
sub dump_ipv4_buckets {
    my ($dht) = @_;
    my @lines = _dump_buckets($dht, $dht->ipv4_routing_table());
    return @lines if wantarray;
    return sub { say $_ for @_ }
        ->(@lines);
}
# Describes the IPv6 routing table. In list context the report lines are
# returned; otherwise each line is printed with say().
sub dump_ipv6_buckets {
    my ($dht) = @_;
    my @lines = _dump_buckets($dht, $dht->ipv6_routing_table());
    return @lines if wantarray;
    return sub { say $_ for @_ }
        ->(@lines);
}
# Builds a human readable report for one routing table.
#
# Parameters:
#   $self          - the DHT object (source of the node id and counters)
#   $routing_table - the routing table to describe
#
# Returns a list of lines: a header, one line per bucket followed by one
# line per node in it, the node totals, the number of outstanding add
# nodes, and the received/sent traffic statistics.
sub _dump_buckets {
    my ($self, $routing_table) = @_;

    my @report = (
        sprintf 'Num buckets: %d. My DHT ID: %s',
        $routing_table->count_buckets,
        $self->nodeid->to_Hex
    );

    my ($index, $primary_total, $backup_total) = (0, 0, 0);
    for my $bucket (@{$routing_table->buckets}) {
        push @report,
            sprintf(
            'Bucket %s: %s (replacement cache: %d)',
            $index++, $bucket->floor->to_Hex, $bucket->count_backup_nodes
            );
        for my $member (@{$bucket->nodes}) {
            push @report,
                sprintf(
                ' %s %s:%d fail:%d seen:%d age:%s ver:%s',
                $member->nodeid->to_Hex,
                $member->host,
                $member->port,
                $member->fail || 0,
                $member->seen,
                __duration(time - $member->birth),
                $member->v || '?'
                );
        }
        $primary_total += $bucket->count_nodes;
        $backup_total  += $bucket->count_backup_nodes;
    }

    push @report,
        sprintf(
        'Total peers: %d (in replacement cache %d)',
        $primary_total + $backup_total,
        $backup_total
        ),
        sprintf(
        'Outstanding add nodes: %d',
        scalar $routing_table->outstanding_add_nodes
        ),
        sprintf(
        'Received: %d requests (%s), %d replies (%s), %d invalid (%s)',
        $self->_recv_requests_count,
        __data($self->_recv_requests_length),
        $self->_recv_replies_count,
        __data($self->_recv_replies_length),
        $self->_recv_invalid_count,
        __data($self->_recv_invalid_length)
        ),
        sprintf(
        'Sent: %d requests (%s), %d replies (%s)',
        $self->_send_requests_count,
        __data($self->_send_requests_length),
        $self->_send_replies_count,
        __data($self->_send_replies_length)
        );
    return @report;
}
# Formats a number of seconds as days, hours, minutes and seconds, e.g.
# '1d 1h 1m 1s'. Units whose value is zero are left out, so 0 seconds
# yields an empty string.
sub __duration ($) {
    my ($seconds) = @_;
    my @units = (
        [d => int($seconds / (24 * 60 * 60))],
        [h => ($seconds / (60 * 60)) % 24],
        [m => ($seconds / 60) % 60],
        [s => $seconds % 60],
    );
    my @shown;
    for my $unit (@units) {
        my ($suffix, $amount) = @{$unit};
        push @shown, $amount . $suffix if $amount;
    }
    return join ' ', @shown;
}
# Formats a byte count for display: two decimals with a GB, MB or KB suffix
# (powers of 1024) once the respective threshold is reached, otherwise the
# plain number followed by ' bytes'.
sub __data($) {
    my ($bytes) = @_;
    return sprintf('%0.2f GB', $bytes / 1073741824) if $bytes >= 1073741824;
    return sprintf('%0.2f MB', $bytes / 1048576)    if $bytes >= 1048576;
    return sprintf('%0.2f KB', $bytes / 1024)       if $bytes >= 1024;
    return $bytes . ' bytes';
}
}
1;