our
@ISA
=
qw()
;
our
$timer_resolution
;
BEGIN {
eval
{
$timer_resolution
= Time::HiRes->can(
'clock_getres'
)
? Time::HiRes::clock_getres(Time::HiRes::CLOCK_REALTIME())
: 0.001;
1;
} or
do
{
$timer_resolution
= 1;
};
}
sub
new {
my
$class
=
shift
;
$class
=
ref
(
$class
) ||
$class
;
my
(
$main
) =
@_
;
my
$self
= {
main
=>
$main
,
queries_started
=> 0,
queries_completed
=> 0,
pending_lookups
=> { },
pending_rules
=> { },
rules_for_key
=> { },
timing_by_query
=> { },
all_lookups
=> { },
};
bless
(
$self
,
$class
);
$self
;
}
sub
start_queue {
my
(
$self
) =
@_
;
$self
->{wait_queue} = 1;
}
sub
launch_queue {
my
(
$self
) =
@_
;
delete
$self
->{wait_queue};
if
(
$self
->{bgsend_queue}) {
dbg(
"async: launching queued lookups"
);
foreach
(@{
$self
->{bgsend_queue}}) {
$self
->bgsend_and_start_lookup(
@$_
);
}
delete
$self
->{bgsend_queue};
}
}
sub
bgsend_and_start_lookup {
my
$self
=
shift
;
my
(
$domain
,
$type
,
$class
,
$ent
,
$cb
,
%options
) =
@_
;
return
if
$self
->{main}->{resolver}->{no_resolver};
if
(
$self
->{wait_queue}) {
push
@{
$self
->{bgsend_queue}}, [
@_
];
dbg(
"async: DNS priority not reached, queueing lookup: $domain/$type"
);
return
$ent
;
}
if
(!
defined
$ent
->{rulename} && !
$self
->{rulename_warned}++) {
my
(
$package
,
$filename
,
$line
) =
caller
;
warn
"async: bgsend_and_start_lookup called without rulename, "
.
"from $package ($filename) line $line. You are likely using "
.
"a plugin that is not compatible with SpamAssasin 4.0.0."
;
}
$domain
=~ s/\.+\z//s;
$domain
= idn_to_ascii(
$domain
);
if
(utf8::is_utf8(
$domain
)) {
utf8::encode(
$domain
);
my
(
$package
,
$filename
,
$line
) =
caller
;
info(
"bgsend_and_start_lookup: Unicode domain name, expected octets: %s, "
.
"called from %s line %d"
,
$domain
,
$package
,
$line
);
}
elsif
(
$domain
=~
tr
/\x00-\x7F//c) {
my
(
$package
,
$filename
,
$line
) =
caller
;
info(
"bgsend_and_start_lookup: non-ASCII domain name: %s, "
.
"called from %s line %d"
,
$domain
,
$package
,
$line
);
}
my
$dnskey
=
uc
(
$type
).
'/'
.
lc
(
$domain
);
my
$dns_query_info
=
$self
->{all_lookups}{
$dnskey
};
$ent
= {}
if
!
$ent
;
$ent
->{id} =
undef
;
my
$key
=
$ent
->{key} =
$dnskey
;
$ent
->{query_type} =
$type
;
$ent
->{query_domain} =
$domain
;
$ent
->{type} =
$type
if
!
exists
$ent
->{type};
$ent
->{zone} =
$domain
if
!
exists
$ent
->{zone};
$cb
=
$ent
->{completed_callback}
if
!
$cb
;
my
@rulenames
=
grep
{
defined
} (
ref
$ent
->{rulename} ?
@{
$ent
->{rulename}} :
$ent
->{rulename});
$self
->{rules_for_key}->{
$key
}{
$_
} = 1
foreach
(
@rulenames
);
if
(
$dns_query_info
) {
my
$id
=
$ent
->{id} =
$dns_query_info
->{id};
return
if
!
defined
$id
;
my
$id_tail
=
$id
;
$id_tail
=~ s{^\d+/IN/}{};
lc
(
$id_tail
) eq
lc
(
$dnskey
)
or info(
"async: unmatched id %s, key=%s"
,
$id
,
$dnskey
);
my
$pkt
=
$dns_query_info
->{pkt};
if
(!
$pkt
) {
push
(@{
$dns_query_info
->{applicants}}, [
$ent
,
$cb
]);
$self
->{pending_rules}->{
$_
}{
$key
} = 1
foreach
(
@rulenames
);
dbg(
"async: query %s already underway, adding no.%d, rules: %s"
,
$id
,
scalar
@{
$dns_query_info
->{applicants}},
join
(
", "
,
@rulenames
));
}
else
{
delete
$self
->{pending_rules}->{
$_
}{
$key
}
foreach
(
@rulenames
);
if
(!
$cb
) {
dbg(
"async: query %s already done, re-using for %s, rules: %s"
,
$id
,
$key
,
join
(
", "
,
@rulenames
));
}
else
{
dbg(
"async: query %s already done, re-using for %s, callback, rules: %s"
,
$id
,
$key
,
join
(
", "
,
@rulenames
));
eval
{
$cb
->(
$ent
,
$pkt
); 1;
} or
do
{
chomp
$@;
die
"async: (1) $@\n"
if
$@ =~ /__alarm__ignore__\(.*\)/s;
warn
sprintf
(
"async: query %s completed, callback %s failed: %s\n"
,
$id
,
$key
, $@);
};
}
}
}
else
{
$dns_query_info
=
$self
->{all_lookups}{
$dnskey
} = {};
my
(
$id
,
$blocked
,
$check_dbrdom
);
my
$blocked_by
=
'dns_query_restriction'
;
my
$dns_query_blockages
=
$self
->{main}->{conf}->{dns_query_blocked};
my
$dns_block_domains
=
$self
->{main}->{conf}->{dns_block_rule_domains};
if
(
$dns_query_blockages
||
$dns_block_domains
) {
my
$search_list
= domain_to_search_list(
$domain
);
foreach
my
$parent_domain
((
@$search_list
,
'*'
)) {
if
(
$dns_query_blockages
) {
$blocked
=
$dns_query_blockages
->{
$parent_domain
};
last
if
defined
$blocked
;
}
if
(
$parent_domain
ne
'*'
&&
exists
$dns_block_domains
->{
$parent_domain
}) {
$check_dbrdom
=
$dns_block_domains
->{
$parent_domain
};
}
}
}
if
(!
$blocked
&&
$check_dbrdom
) {
my
$blockfile
=
$self
->{main}->sed_path(
"__global_state_dir__/dnsblock_${check_dbrdom}"
);
if
(
my
$mtime
= (
stat
(
$blockfile
))[9]) {
if
(
time
-
$mtime
<=
$self
->{main}->{conf}->{dns_block_time}) {
$blocked
= 1;
$blocked_by
=
'dns_block_rule'
;
}
else
{
dbg(
"async: dns_block_rule removing expired $blockfile"
);
unlink
(
$blockfile
);
}
}
}
if
(
$blocked
) {
dbg(
"async: blocked by %s: %s, rules: %s"
,
$blocked_by
,
$dnskey
,
join
(
", "
,
@rulenames
));
}
else
{
dbg(
"async: launching %s, rules: %s"
,
$dnskey
,
join
(
", "
,
@rulenames
));
$id
=
$self
->{main}->{resolver}->bgsend(
$domain
,
$type
,
$class
,
sub
{
my
(
$pkt
,
$pkt_id
,
$timestamp
) =
@_
;
if
(
$pkt_id
ne
$id
) {
warn
"async: mismatched dns id: got $pkt_id, expected $id\n"
;
return
;
}
$self
->set_response_packet(
$pkt_id
,
$pkt
,
$ent
->{key},
$timestamp
);
$dns_query_info
->{pkt} =
$pkt
;
my
$cb_count
= 0;
foreach
my
$tuple
(@{
$dns_query_info
->{applicants}}) {
my
(
$appl_ent
,
$appl_cb
) =
@$tuple
;
my
@rulenames
=
grep
{
defined
} (
ref
$appl_ent
->{rulename} ?
@{
$appl_ent
->{rulename}} :
$appl_ent
->{rulename});
foreach
(
@rulenames
) {
delete
$self
->{pending_rules}->{
$_
}{
$appl_ent
->{key}};
}
if
(
$appl_cb
) {
dbg(
"async: calling callback on key %s, rules: %s"
,
$key
,
join
(
", "
,
@rulenames
));
$cb_count
++;
eval
{
$appl_cb
->(
$appl_ent
,
$pkt
); 1;
} or
do
{
chomp
$@;
die
"async: (2) $@\n"
if
$@ =~ /__alarm__ignore__\(.*\)/s;
warn
sprintf
(
"async: query %s completed, callback %s failed: %s\n"
,
$id
,
$appl_ent
->{key}, $@);
};
}
}
delete
$dns_query_info
->{applicants};
dbg(
"async: query $id completed, no callbacks run"
)
if
!
$cb_count
;
});
}
return
if
!
defined
$id
;
$dns_query_info
->{id} =
$ent
->{id} =
$id
;
push
(@{
$dns_query_info
->{applicants}}, [
$ent
,
$cb
]);
$self
->{pending_rules}->{
$_
}{
$key
} = 1
foreach
(
@rulenames
);
$self
->_start_lookup(
$ent
,
$options
{master_deadline});
}
return
$ent
;
}
sub
start_lookup {
my
$self
=
shift
;
if
(!
$self
->{start_lookup_warned}++) {
my
(
$package
,
$filename
,
$line
) =
caller
;
warn
"async: deprecated start_lookup called, "
.
"from $package ($filename) line $line. You are likely using "
.
"a plugin that is not compatible with SpamAssasin 4.0.0."
;
}
return
if
$self
->{main}->{resolver}->{no_resolver};
$self
->_start_lookup(
@_
);
}
sub
_start_lookup {
my
(
$self
,
$ent
,
$master_deadline
) =
@_
;
my
$id
=
$ent
->{id};
my
$key
=
$ent
->{key};
defined
$id
&&
$id
ne
''
or
die
"oops, no id"
;
$key
or
die
"oops, no key"
;
$ent
->{type} or
die
"oops, no type"
;
my
$now
=
time
;
$ent
->{start_time} =
$now
if
!
defined
$ent
->{start_time};
my
$zone
=
$ent
->{zone};
my
$settings
;
my
$conf_by_zone
=
$self
->{main}->{conf}->{by_zone};
if
(
defined
$zone
&&
$conf_by_zone
) {
$zone
=~ s/^\.//;
$zone
=~ s/\.\z//;
for
(;;) {
if
(
exists
$conf_by_zone
->{
$zone
}) {
$settings
=
$conf_by_zone
->{
$zone
};
last
;
}
elsif
(
$zone
eq
''
) {
last
;
}
else
{
$zone
= (
$zone
=~ /^( (?: [^.] | \[ (?: \\. | [^\]\\] )* \] )* )
\. (.*) \z/xs) ? $2 :
''
;
}
}
}
dbg(
"async: applying by_zone settings for %s"
,
$zone
)
if
$settings
;
my
$t_init
=
$ent
->{timeout_initial};
$t_init
=
$settings
->{rbl_timeout}
if
$settings
&& !
defined
$t_init
;
$t_init
=
$self
->{main}->{conf}->{rbl_timeout}
if
!
defined
$t_init
;
$t_init
= 0
if
!
defined
$t_init
;
my
$t_end
=
$ent
->{timeout_min};
$t_end
=
$settings
->{rbl_timeout_min}
if
$settings
&& !
defined
$t_end
;
$t_end
=
$self
->{main}->{conf}->{rbl_timeout_min}
if
!
defined
$t_end
;
$t_end
= 0.2 *
$t_init
if
!
defined
$t_end
;
$t_end
= 0
if
$t_end
< 0;
$t_init
=
$t_end
if
$t_init
<
$t_end
;
my
$clipped_by_master_deadline
= 0;
if
(
defined
$master_deadline
) {
my
$time_avail
=
$master_deadline
-
time
;
$time_avail
= 0.5
if
$time_avail
< 0.5;
if
(
$t_init
>
$time_avail
) {
$t_init
=
$time_avail
;
$clipped_by_master_deadline
= 1;
$t_end
=
$time_avail
if
$t_end
>
$time_avail
;
}
}
$ent
->{timeout_initial} =
$t_init
;
$ent
->{timeout_min} =
$t_end
;
my
@rulenames
=
grep
{
defined
} (
ref
$ent
->{rulename} ?
@{
$ent
->{rulename}} :
$ent
->{rulename});
$ent
->{display_id} =
join
(
", "
,
grep
{
defined
}
map
{
$ent
->{
$_
} }
qw(type key)
);
$self
->{pending_lookups}->{
$key
} =
$ent
;
$self
->{queries_started}++;
dbg(
"async: starting: %s%s (timeout %.1fs, min %.1fs)%s"
,
@rulenames
?
join
(
", "
,
@rulenames
).
", "
:
''
,
$ent
->{display_id},
$ent
->{timeout_initial},
$ent
->{timeout_min},
!
$clipped_by_master_deadline
?
''
:
', capped by time limit'
);
$ent
;
}
sub
get_lookup {
my
(
$self
,
$key
) =
@_
;
warn
(
"async: deprecated get_lookup function used\n"
);
return
$self
->{pending_lookups}->{
$key
};
}
sub
log_lookups_timing {
my
(
$self
) =
@_
;
my
$timings
=
$self
->{timing_by_query};
for
my
$key
(
sort
{
$timings
->{
$a
} <=>
$timings
->{
$b
} }
keys
%$timings
) {
dbg(
"async: timing: %.3f %s"
,
$timings
->{
$key
},
$key
);
}
}
sub
complete_lookups {
my
(
$self
,
$timeout
,
$allow_aborting_of_expired
) =
@_
;
my
$alldone
= 0;
my
$anydone
= 0;
my
$allexpired
= 1;
my
%typecount
;
my
$pending
=
$self
->{pending_lookups};
my
$now
=
time
;
if
(
defined
$timeout
&&
$timeout
> 0 &&
%$pending
&&
$self
->{queries_started} > 0)
{
my
$r
=
$self
->{queries_completed} /
$self
->{queries_started};
my
$r2
=
$r
*
$r
;
my
$max_deadline
;
while
(
my
(
$key
,
$ent
) =
each
%$pending
) {
my
$t_init
=
$ent
->{timeout_initial};
my
$dt
=
$t_init
- (
$t_init
-
$ent
->{timeout_min}) *
$r2
;
my
$deadline
=
$ent
->{start_time} +
$dt
;
$max_deadline
=
$deadline
if
!
defined
$max_deadline
||
$deadline
>
$max_deadline
;
}
if
(
defined
$max_deadline
) {
$max_deadline
= 1 +
int
$max_deadline
if
$timer_resolution
== 1 &&
$max_deadline
>
int
$max_deadline
;
my
$sufficient_timeout
=
$max_deadline
-
$now
;
$sufficient_timeout
= 0
if
$sufficient_timeout
< 0;
if
(
$timeout
>
$sufficient_timeout
) {
dbg(
"async: reducing select timeout from %.1f to %.1f s"
,
$timeout
,
$sufficient_timeout
);
$timeout
=
$sufficient_timeout
;
}
}
}
eval
{
if
(
%$pending
) {
$self
->{last_poll_responses_time} =
$now
;
my
(
$nfound
,
$ncb
) =
$self
->{main}->{resolver}->poll_responses(
$timeout
);
dbg(
"async: select found %d responses ready (t.o.=%.1f), did %d callbacks"
,
$nfound
,
$timeout
,
$ncb
);
}
$now
=
time
;
while
(
my
(
$key
,
$ent
) =
each
%$pending
) {
my
$id
=
$ent
->{id};
if
(
exists
$self
->{finished}->{
$id
}) {
delete
$self
->{finished}->{
$id
};
$anydone
= 1;
$ent
->{finish_time} =
$now
if
!
defined
$ent
->{finish_time};
my
$elapsed
=
$ent
->{finish_time} -
$ent
->{start_time};
my
@rulenames
=
keys
%{
$self
->{rules_for_key}->{
$key
}};
dbg(
"async: completed in %.3f s: %s, rules: %s"
,
$elapsed
,
$ent
->{display_id},
join
(
", "
,
@rulenames
));
$self
->{timing_by_query}->{
". $key ($ent->{type})"
} +=
$elapsed
;
$self
->{queries_completed}++;
delete
$pending
->{
$key
};
}
}
if
(
%$pending
) {
my
$r
=
!
$allow_aborting_of_expired
|| !
$self
->{queries_started} ? 1.0
:
$self
->{queries_completed} /
$self
->{queries_started};
my
$r2
=
$r
*
$r
;
while
(
my
(
$key
,
$ent
) =
each
%$pending
) {
$typecount
{
$ent
->{type}}++;
my
$t_init
=
$ent
->{timeout_initial};
my
$dt
=
$t_init
- (
$t_init
-
$ent
->{timeout_min}) *
$r2
;
$dt
= 1 +
int
$dt
if
$timer_resolution
== 1 &&
$dt
>
int
$dt
;
$allexpired
= 0
if
$now
<=
$ent
->{start_time} +
$dt
;
}
}
if
(!
%$pending
) {
$alldone
= 1;
}
elsif
(
$allexpired
&&
$allow_aborting_of_expired
) {
dbg(
"async: escaping: lost or timed out requests or responses"
);
$self
->abort_remaining_lookups();
$alldone
= 1;
}
else
{
dbg(
"async: queries still pending: %s%s"
,
join
(
' '
,
map
{
"$_=$typecount{$_}"
}
sort
keys
%typecount
),
$allexpired
?
', all expired'
:
''
);
$alldone
= 0;
}
1;
} or
do
{
my
$eval_stat
= $@ ne
''
? $@ :
"errno=$!"
;
chomp
$eval_stat
;
die
"async: (3) $eval_stat\n"
if
$eval_stat
=~ /__alarm__ignore__\(.*\)/s;
dbg(
"async: caught complete_lookups death, aborting: %s"
,
$eval_stat
);
$alldone
= 1;
};
return
wantarray
? (
$alldone
,
$anydone
) :
$alldone
;
}
sub
abort_remaining_lookups {
my
(
$self
) =
@_
;
my
$pending
=
$self
->{pending_lookups};
my
$foundcnt
= 0;
my
$now
=
time
;
$self
->{pending_rules} = {};
while
(
my
(
$key
,
$ent
) =
each
%$pending
) {
my
$dur
=
$now
-
$ent
->{start_time};
my
@rulenames
=
keys
%{
$self
->{rules_for_key}->{
$key
}};
my
$msg
=
sprintf
(
"async: aborting after %.3f s, %s: %s, rules: %s"
,
$dur
,
(
defined
$ent
->{timeout_initial} &&
$now
>
$ent
->{start_time} +
$ent
->{timeout_initial}
?
'past original deadline'
:
'deadline shrunk'
),
$ent
->{display_id},
join
(
", "
,
@rulenames
) );
$dur
> 1 ? info(
$msg
) : dbg(
$msg
);
$foundcnt
++;
$self
->{timing_by_query}->{
"X $key"
} =
$dur
;
$ent
->{finish_time} =
$now
if
!
defined
$ent
->{finish_time};
delete
$pending
->{
$key
};
}
my
$all_lookups_ref
=
$self
->{all_lookups};
foreach
my
$dnskey
(
keys
%$all_lookups_ref
) {
my
$dns_query_info
=
$all_lookups_ref
->{
$dnskey
};
my
$cb_count
= 0;
foreach
my
$tuple
(@{
$dns_query_info
->{applicants}}) {
my
(
$ent
,
$cb
) =
@$tuple
;
if
(
$cb
) {
my
@rulenames
=
grep
{
defined
} (
ref
$ent
->{rulename} ?
@{
$ent
->{rulename}} :
$ent
->{rulename});
dbg(
"async: calling callback/abort on key %s, rules: %s"
,
$dnskey
,
join
(
", "
,
@rulenames
));
$cb_count
++;
eval
{
$cb
->(
$ent
,
undef
); 1;
} or
do
{
chomp
$@;
die
"async: (2) $@\n"
if
$@ =~ /__alarm__ignore__\(.*\)/s;
warn
sprintf
(
"async: query %s aborted, callback %s failed: %s\n"
,
$dnskey
,
$ent
->{key}, $@);
};
}
dbg(
"async: query $dnskey aborted, no callbacks run"
)
if
!
$cb_count
;
}
delete
$dns_query_info
->{applicants};
}
dbg(
"async: aborted %d remaining lookups"
,
$foundcnt
)
if
$foundcnt
> 0;
delete
$self
->{last_poll_responses_time};
$self
->{main}->{resolver}->bgabort();
1;
}
sub
set_response_packet {
my
(
$self
,
$id
,
$pkt
,
$key
,
$timestamp
) =
@_
;
$self
->{finished}->{
$id
} = 1;
$timestamp
=
time
if
!
defined
$timestamp
;
my
$pending
=
$self
->{pending_lookups};
if
(!
defined
$key
) {
if
(
$id
eq
$pending
->{
$id
}->{id}) {
$key
=
$id
;
}
else
{
for
my
$tkey
(
keys
%$pending
) {
if
(
$id
eq
$pending
->{
$tkey
}->{id}) {
$key
=
$tkey
;
last
}
}
}
dbg(
"async: got response on id $id, search found key $key"
);
}
if
(!
defined
$key
) {
info(
"async: no key, response packet not remembered, id $id"
);
}
else
{
my
$ent
=
$pending
->{
$key
};
my
$ent_id
=
$ent
->{id};
if
(!
defined
$ent_id
) {
info(
"async: ignoring response, id %s, ent_id is undef: %s"
,
$id
,
join
(
', '
,
%$ent
));
}
elsif
(
$id
ne
$ent_id
) {
info(
"async: ignoring response, mismatched id $id, expected $ent_id"
);
}
else
{
$ent
->{finish_time} =
$timestamp
;
$ent
->{response_packet} =
$pkt
;
}
}
1;
}
sub
report_id_complete {
my
(
$self
,
$id
,
$key
,
$timestamp
) =
@_
;
$self
->set_response_packet(
$id
,
undef
,
$key
,
$timestamp
);
}
sub
last_poll_responses_time {
my
(
$self
) =
@_
;
return
$self
->{last_poll_responses_time};
}
1;