# Version number of this module.
our
$VERSION
= 0.09;
# This class inherits from the generic Mail::SpamAssassin::BayesStore class.
our
@ISA
=
qw( Mail::SpamAssassin::BayesStore )
;
# Constructor.  Object creation is delegated to the parent class; afterwards
# the bayes_sql_dsn setting (a ';'-separated list of name=value pairs) is
# parsed, expiry times are derived from the configuration, and the state
# flags and the timeout helper are initialised.
# Returns the new object, or nothing when a DSN element has no '='.
sub new {
  my $class = shift;
  $class = ref($class) || $class;

  my $self  = $class->SUPER::new(@_);
  my $bconf = $self->{bayes}->{conf};

  foreach my $pair (split(';', $bconf->{bayes_sql_dsn})) {
    my ($name, $value) = split(/=/, $pair, 2);
    if (!defined $value) {
      warn("bayes: invalid bayes_sql_dsn config\n");
      return;
    }
    if ($name eq 'database') {
      $self->{db_id} = $value;
    }
    elsif ($name eq 'password') {
      $self->{password} = $value;
    }
    else {
      # every other pair is collected in redis_conf; the literal string
      # 'undef' stands for an undefined value
      push @{$self->{redis_conf}},
           $name, ($value eq 'undef' ? undef : untaint_var($value));
    }
  }

  if ($bconf->{bayes_auto_expire}) {
    # a negative TTL is stored as undef, i.e. no expiry
    my $token_ttl = $bconf->{bayes_token_ttl};
    my $seen_ttl  = $bconf->{bayes_seen_ttl};
    $self->{expire_token} = ($token_ttl && $token_ttl < 0) ? undef : $token_ttl;
    $self->{expire_seen}  = ($seen_ttl  && $seen_ttl  < 0) ? undef : $seen_ttl;
  }
  else {
    $self->{expire_token} = undef;
    $self->{expire_seen}  = undef;
    warn("bayes: the setting bayes_auto_expire is off, this is ".
         "not a recommended setting for the Redis bayes backend");
  }

  $self->{supported_db_version} = 3;
  $self->{connected}            = 0;
  $self->{is_officially_open}   = 0;
  $self->{is_writable}          = 0;

  # timeout (seconds) applied to operations run through the timer
  my $timeout_secs = $self->{conf}->{redis_timeout} || 10;
  $self->{timer} = Mail::SpamAssassin::Timeout->new({ secs => $timeout_secs });

  return $self;
}
# Closes the Redis session (if one is marked as connected) and drops the
# client object.  $@ and $! are localized so the caller's values survive.
sub disconnect {
  my $self = shift;

  local ($@, $!);
  if ($self->{connected}) {
    dbg("bayes: Redis disconnect");
    # clear the flag first, then close the client connection
    $self->{connected} = 0;
    $self->{redis}->disconnect;
  }
  undef $self->{redis};
}
# Destructor: marks the object as disconnected and drops the client object
# without calling its disconnect method.  $@, $! and $_ are localized so
# that object destruction does not disturb the surrounding code.
sub DESTROY {
  my $self = shift;

  local ($@, $!, $_);
  dbg("bayes: Redis destroy");
  $self->{connected} = 0;
  undef $self->{redis};
}
# Callback run on a freshly established connection.
#   $r      client object (must provide a call() method)
#   $db_id  database index to SELECT, defaults to 0
#   $pwd    optional password, used only if the server demands authentication
# Dies if the database cannot be selected; returns 1 otherwise.
sub on_connect {
  my ($r, $db_id, $pwd) = @_;

  $db_id ||= 0;
  dbg("bayes: Redis on-connect, db_id %d", $db_id);

  my $selected = eval { $r->call('SELECT', $db_id) eq 'OK' ? 1 : 0 };
  if (!$selected) {
    my $err = $@;
    if ($err =~ /^NOAUTH\b/ || $err =~ /^ERR operation not permitted/) {
      # the server wants a password: authenticate, then retry the SELECT
      die "Redis server requires authentication, no password provided"
        if !defined $pwd;
      $r->call('AUTH', $pwd);
      $r->call('SELECT', $db_id);
    }
    else {
      chomp $err;
      die "Command 'SELECT $db_id' failed: $err";
    }
  }

  # naming the client is best-effort; a failure is only logged
  my $named = eval { $r->call('CLIENT', 'SETNAME', 'sa['.$$.']') };
  if (!$named) {
    dbg("bayes: CLIENT SETNAME command failed, don't worry, ".
        "possibly an old redis version: %s", $@);
  }

  return 1;
}
# (Re)establishes the connection to the Redis server.  An existing session
# is closed first.  The client is created under the object's timer; on a
# timeout or any other failure the client object is dropped and the method
# dies.  Sets the connected flag on success.
sub connect {
  my $self = shift;

  $self->disconnect if $self->{connected};
  undef $self->{redis};

  my $err = $self->{timer}->run_and_catch(sub {
    $self->{opened_from_pid} = $$;

    # copied into lexicals so that the callback does not need $self
    my $db_id = $self->{db_id};
    my $pwd   = $self->{password};
    my $hook  = sub { on_connect($_[0], $db_id, $pwd) };

    $self->{redis} = Mail::SpamAssassin::Util::TinyRedis->new(
                       @{$self->{redis_conf}}, on_connect => $hook);
    $self->{redis} or die "Error: $!";
  });

  my $timed_out = $self->{timer}->timed_out();
  if ($timed_out || $err) {
    undef $self->{redis};
    die "bayes: Redis connection timed out!" if $timed_out;
    die "bayes: Redis failed: $err";
  }

  return $self->{connected} = 1;
}
# If a session is currently open, unties the database and disconnects.
# Does nothing when not connected.
sub prefork_init {
  my $self = shift;

  if ($self->{connected}) {
    dbg("bayes: prefork_init, closing a session ".
        "with a Redis server in a parent process");
    $self->untie_db;
    $self->disconnect;
  }
}
# If a session is currently open (per the log text: one taken over from
# the parent), unties the database and disconnects.  Does nothing when
# not connected.
sub spamd_child_init {
  my $self = shift;

  if ($self->{connected}) {
    dbg("bayes: spamd_child_init, closing a parent's session ".
        "to a Redis server in a child process");
    $self->untie_db;
    $self->disconnect;
  }
}
# Opens the database for reading.  When already connected only the
# "officially open" flag is set; otherwise _open_db does the work.
# Always clears the writable flag.  Returns a true value on success.
sub tie_db_readonly {
  my $self = shift;

  $self->{is_writable} = 0;

  my $success = $self->{connected} ? ($self->{is_officially_open} = 1)
                                   : $self->_open_db();
  return $success;
}
# Opens the database for writing.  When already connected only the
# "officially open" flag is set; otherwise _open_db does the work.
# The writable flag is set only if opening succeeded.
# Returns a true value on success.
sub tie_db_writable {
  my $self = shift;

  $self->{is_writable} = 0;

  my $success = $self->{connected} ? ($self->{is_officially_open} = 1)
                                   : $self->_open_db();
  if ($success) {
    $self->{is_writable} = 1;
  }
  return $success;
}
# Connects to the server and prepares the database for use.
#
# When already connected only the "officially open" flag is set.  Otherwise
# the method reads the db configuration, connects, queries INFO once to
# learn the server version and whether Lua is available, then checks the
# stored database version and token format, initialising an empty database
# if no version is stored yet.
#
# Returns 1 on success, 0 if initialisation fails or the stored version or
# token format is unsupported.  May die via connect() or the Redis client.
sub _open_db {
  my $self = shift;

  dbg("bayes: _open_db(%s)",
      $self->{connected} ? 'already connected' : 'not yet connected');
  if ($self->{connected}) {
    $self->{is_officially_open} = 1;
    return 1;
  }

  $self->read_db_configs();
  $self->connect;

  if (!defined $self->{redis_server_version}) {
    my $info = $self->{info} = $self->{redis}->call("INFO");
    if (defined $info) {
      local $1;
      $self->{redis_server_version} =
        $info =~ /^redis_version:\s*(.*?)\r?$/m ? $1 : '';
      # Lua availability is inferred from the used_memory_lua line
      $self->{have_lua} = $info =~ /^used_memory_lua:/m ? 1 : 0;
      # default to 0 so a reply without a used_memory line does not
      # trigger an "uninitialized value" warning in the division below
      my $redis_mem = $info =~ /^used_memory:\s*(.*?)\r?$/m ? $1 : 0;
      dbg("bayes: redis server version %s, memory used %.1f MiB, Lua %s",
          $self->{redis_server_version}, $redis_mem/1024/1024,
          $self->{have_lua} ? 'is available' : 'is not available');
    }
  }

  $self->{db_version} = $self->{redis}->call('GET', 'v:DB_VERSION');

  if (!$self->{db_version}) {
    # no version stored: treat the database as empty and initialise it
    $self->{db_version} = $self->DB_VERSION;
    my $ret = $self->{redis}->call('MSET',
                                   'v:DB_VERSION', $self->{db_version},
                                   'v:NSPAM', 0,
                                   'v:NHAM', 0,
                                   'v:TOKEN_FORMAT', 2 );
    unless ($ret) {
      warn("bayes: failed to initialize database");
      return 0;
    }
    dbg("bayes: initialized empty database, version $self->{db_version}");
  }
  else {
    dbg("bayes: found bayes db version %s", $self->{db_version});
    if ($self->{db_version} ne $self->DB_VERSION) {
      warn("bayes: bayes db version $self->{db_version} not supported, aborting\n");
      return 0;
    }
    my $token_format = $self->{redis}->call('GET', 'v:TOKEN_FORMAT') || 0;
    if ($token_format < 2) {
      warn("bayes: bayes old token format $token_format not supported, ".
           "consider backup/restore or initialize a database\n");
      return 0;
    }
  }

  # load the Lua helper script once per object
  if ($self->{have_lua} && !defined $self->{multi_hmget_script}) {
    $self->_define_lua_scripts;
  }

  $self->{is_officially_open} = 1;
  return 1;
}
# Marks the database as closed and not writable.  The connection itself
# is left untouched.  Returns nothing.
sub untie_db {
  my ($self) = @_;

  $self->{is_writable}        = 0;
  $self->{is_officially_open} = 0;
  return;
}
# Always reports that no sync is due (returns 0).
sub sync_due { 0 }
# Always reports that no expiry run is due (returns 0).
sub expiry_due { 0 }
# Looks up the "seen" entry stored for a message id.
#   $msgid  message id; the Redis key is "s:$msgid"
# Returns whatever the GET command yields for that key.
sub seen_get {
  my ($self, $msgid) = @_;

  # reconnect on demand, like the token accessors do; after a disconnect
  # the client object is undefined and could not be called
  $self->connect if !$self->{connected};

  return $self->{redis}->call('GET', "s:$msgid");
}
# Stores the "seen" flag for a message id under the key "s:$msgid".
# When an expiry time for seen entries is configured the entry is written
# with SETEX, otherwise with a plain SET.  Returns 1.
sub seen_put {
  my ($self, $msgid, $flag) = @_;

  # reconnect on demand, like the token accessors do; after a disconnect
  # the client object is undefined and could not be called
  $self->connect if !$self->{connected};

  my $r   = $self->{redis};
  my $ttl = $self->{expire_seen};
  if ($ttl) {
    $r->call('SETEX', "s:$msgid", $ttl, $flag);
  }
  else {
    $r->call('SET', "s:$msgid", $flag);
  }
  return 1;
}
# Removes the "seen" entry of a message id (key "s:$msgid").  Returns 1.
sub seen_delete {
  my ($self, $msgid) = @_;

  # reconnect on demand, like the token accessors do; after a disconnect
  # the client object is undefined and could not be called
  $self->connect if !$self->{connected};

  $self->{redis}->call('DEL', "s:$msgid");
  return 1;
}
# Fetches database-wide variables (stored under keys prefixed with "v:").
#   @varnames  names to fetch; when empty a fixed default list is used
# Returns the values in request order with undefined ones replaced by 0,
# or nothing if the MGET call yields a false value.
sub get_storage_variables {
  my ($self, @varnames) = @_;

  # NOTE(review): LAST_JOURNAL_SYNC appears twice in the default list;
  # presumably positional padding - confirm before changing.
  @varnames = qw{LAST_JOURNAL_SYNC NSPAM NHAM NTOKENS LAST_EXPIRE
                 OLDEST_TOKEN_AGE DB_VERSION LAST_JOURNAL_SYNC
                 LAST_ATIME_DELTA LAST_EXPIRE_REDUCE NEWEST_TOKEN_AGE
                 TOKEN_FORMAT}  if !@varnames;

  # reconnect on demand, like the token accessors do; after a disconnect
  # the client object is undefined and could not be called
  $self->connect if !$self->{connected};

  my $values = $self->{redis}->call('MGET', map { 'v:'.$_ } @varnames);
  return if !$values;
  return map { defined $_ ? $_ : 0 } @$values;
}
# Stub: always returns 0.
sub get_running_expire_tok { 0 }
# Stub: always returns 0.
sub set_running_expire_tok { 0 }
# Stub: always returns 1.
sub remove_running_expire_tok { 1 }
# Fetches the data of a single token via tok_get_all.
# Returns elements 1..3 of the first result row (in the rows built by
# tok_get_all: spam count, ham count, and a constant 0), or nothing
# when the token was not found.
sub tok_get {
  my ($self, $token) = @_;

  my $rows = $self->tok_get_all($token);
  return unless $rows && @$rows;

  my $first = $rows->[0];
  return (@$first)[1, 2, 3];
}
# Fetches spam/ham counts for a list of tokens (keys "w:<token>").
#
# Without Lua on the server one HMGET per token is pipelined.  With Lua a
# single EVALSHA call returns all counts as one space-separated string
# terminated by a nonce, which is used to verify the reply.  If the script
# is not known to the server (NOSCRIPT) it is loaded and the call retried.
#
# Returns a reference to a list of [token, spam, ham, 0] rows; tokens
# with no counts are left out.  Disconnects and dies on a malformed reply.
sub tok_get_all {
  my ($self, @tokens) = @_;

  my @found;

  $self->connect if !$self->{connected};
  my $r = $self->{redis};

  if (!$self->{have_lua}) {
    $r->b_call('HMGET', 'w:'.$_, 's', 'h') for @tokens;
    my $replies = $r->b_results;

    if (@$replies != @tokens) {
      $self->disconnect;
      die sprintf("bayes: tok_get_all got %d entries, expected %d\n",
                  scalar @$replies, scalar @tokens);
    }

    for my $idx (0 .. $#$replies) {
      my ($spam, $ham) = @{$replies->[$idx]};
      next unless $spam || $ham;
      push(@found, [$tokens[$idx], ($spam||0)+0, ($ham||0)+0, 0]);
    }
  }
  else {
    my $nonce = sprintf("%06x", rand(0xffffff));

    # the script SHA is read at call time, so a retry picks up a reload
    my $run_script = sub {
      return $r->call('EVALSHA', $self->{multi_hmget_script},
                      scalar @tokens, map('w:'.$_, @tokens), $nonce);
    };

    my $result;
    if (!eval { $result = $run_script->(); 1 }) {
      if ($@ !~ /^NOSCRIPT/) {
        $self->disconnect;   # localizes $@, so the message survives
        die "bayes: Redis LUA error: $@\n";
      }
      $self->_define_lua_scripts;
      $result = $run_script->();
    }

    my @items = split(' ', $result);
    my $r_nonce = pop(@items);

    if ($r_nonce ne $nonce) {
      $self->disconnect;
      die sprintf("bayes: tok_get_all nonce mismatch, expected %s, got %s\n",
                  $nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
    }
    if (@items != @tokens) {
      $self->disconnect;
      die sprintf("bayes: tok_get_all got %d entries, expected %d\n",
                  scalar @items, scalar @tokens);
    }

    for my $idx (0 .. $#items) {
      # each item has the form "spam/ham", with zero fields possibly empty
      my ($spam, $ham) = split(m{/}, $items[$idx], 2);
      next unless $spam || $ham;
      push(@found, [$tokens[$idx], ($spam||0)+0, ($ham||0)+0, 0]);
    }
  }

  dbg("bayes: tok_get_all found %d tokens out of %d",
      scalar @found, scalar @tokens);

  return \@found;
}
# Changes the counts of a single token; a thin wrapper that hands a
# one-element token set to multi_tok_count_change.
sub tok_count_change {
  my ($self, $dspam, $dham, $token, $newatime) = @_;

  my %single = ($token => 1);
  return $self->multi_tok_count_change($dspam, $dham, \%single, $newatime);
}
# Adds $dspam / $dham to the spam / ham counters of a set of tokens.
#   $dspam, $dham  count deltas (may be negative; undef counts as 0)
#   $tokens        hash reference whose keys are the tokens
#   $newatime      accepted for interface compatibility, not used here
#
# Increments are pipelined.  Decrements are issued one by one because the
# resulting value decides whether the field is deleted (count dropped to
# zero or below) or the key's expiry is refreshed.  Returns 1.
sub multi_tok_count_change {
  my ($self, $dspam, $dham, $tokens, $newatime) = @_;

  $dspam ||= 0;
  $dham  ||= 0;
  dbg("bayes: multi_tok_count_change learning %d spam, %d ham",
      $dspam, $dham);

  my $ttl = $self->{expire_token};

  $self->connect if !$self->{connected};
  my $r = $self->{redis};

  # Tokens are walked with keys() rather than each(): each() continues
  # from wherever the hash's shared iterator was left, so a stale
  # iterator could make the loop silently skip tokens.

  if ($dspam > 0 || $dham > 0) {
    for my $token (keys %$tokens) {
      my $key = 'w:'.$token;
      $r->b_call('HINCRBY', $key, 's', int $dspam) if $dspam > 0;
      $r->b_call('HINCRBY', $key, 'h', int $dham)  if $dham > 0;
      $r->b_call('EXPIRE',  $key, $ttl)            if $ttl;
    }
    $r->b_results;
  }

  if ($dspam < 0 || $dham < 0) {
    for my $token (keys %$tokens) {
      my $key = 'w:'.$token;
      # spam field first, then ham field
      for my $change ([ 's', $dspam ], [ 'h', $dham ]) {
        my ($field, $delta) = @$change;
        next if $delta >= 0;

        my $result = $r->call('HINCRBY', $key, $field, int $delta);
        if (!$result || $result <= 0) {
          $r->call('HDEL', $key, $field);
        }
        elsif ($ttl) {
          $r->call('EXPIRE', $key, $ttl);
        }
      }
    }
  }

  return 1;
}
# Returns the stored numbers of learned spam and ham messages, as
# delivered by get_storage_variables for NSPAM and NHAM.
sub nspam_nham_get {
  my $self = shift;

  my @counts = $self->get_storage_variables('NSPAM', 'NHAM');
  dbg("bayes: nspam_nham_get nspam=%s, nham=%s", @counts);
  return @counts;
}
# Adds $ds / $dh to the stored spam / ham message counters.  Both
# increments are pipelined and run under the object's timer.
# Returns 1; disconnects and dies on a timeout or any other error.
sub nspam_nham_change {
  my ($self, $ds, $dh) = @_;

  # nothing to change
  return 1 if !$ds && !$dh;

  $self->connect if !$self->{connected};
  my $r = $self->{redis};

  my $err = $self->{timer}->run_and_catch(sub {
    $r->b_call('INCRBY', "v:NSPAM", $ds) if $ds;
    $r->b_call('INCRBY', "v:NHAM",  $dh) if $dh;
    $r->b_results;
  });

  my $timed_out = $self->{timer}->timed_out();
  if ($timed_out || $err) {
    $self->disconnect;
    die("bayes: Redis connection timed out!") if $timed_out;
    die("bayes: failed to increment nspam $ds nham $dh: $err");
  }

  return 1;
}
# Refreshes a single token; a thin wrapper around tok_touch_all.
sub tok_touch {
  my ($self, $token, $atime) = @_;

  my @single = ($token);
  return $self->tok_touch_all(\@single, $atime);
}
# Resets the expiry time of each given token to the configured token TTL.
#   $tokens    array reference of tokens
#   $newatime  accepted for interface compatibility, not used here
# Does nothing when no TTL is configured or the list is empty.  Returns 1.
sub tok_touch_all {
  my ($self, $tokens, $newatime) = @_;

  my $ttl = $self->{expire_token};
  return 1 unless $ttl && $tokens && @$tokens;

  dbg("bayes: tok_touch_all setting expire to %s on %d tokens",
      $ttl, scalar @$tokens);

  $self->connect if !$self->{connected};
  my $r = $self->{redis};

  # pipeline one EXPIRE per token, then collect the replies
  for my $token (@$tokens) {
    $r->b_call('EXPIRE', 'w:'.$token, $ttl);
  }
  $r->b_results;

  return 1;
}
# Stub: nothing to clean up, always returns 1.
sub cleanup { 1 }
# Stub: nothing to synchronise, always returns 1.
sub sync { 1 }
# Stub: no upgrade is performed, always returns 1.
sub perform_upgrade { 1 }
# Does not delete anything: only warns that the database is assumed to
# be empty and tells how to clear it manually.  Returns 1.
sub clear_database {
  my $self = shift;

  warn("bayes: note: assuming the database is empty; ".
       "to manually clear a database: redis-cli -n <db-ind> FLUSHDB\n");
  return 1;
}
# Prints every stored token, formatted with $template.
#   $template  printf format receiving: probability, spam count, ham count,
#              access time (the current time), hex-encoded token
#   $regex     accepted but not used by this implementation
#   @vars      elements 1 and 2 are passed on to the probability calculation
#
# Token keys are fetched with KEYS and processed in chunks of 1000, either
# with pipelined HMGET calls or, when Lua is available, with one EVALSHA
# call per chunk whose reply is verified by a nonce.
# Returns 0 if the database cannot be opened, nothing otherwise.
sub dump_db_toks {
  my ($self, $template, $regex, @vars) = @_;

  return 0 unless $self->tie_db_readonly;

  $self->connect if !$self->{connected};
  my $r = $self->{redis};

  my $atime = time;

  my $keys = $r->call('KEYS', 'w:*');
  dbg("bayes: fetched %d token keys", scalar @$keys);

  my $last = $#$keys;
  for (my $start = 0; $start <= $last; $start += 1000) {
    my $end = $start + 999;
    $end = $last if $end >= $last;

    my @tokensdata;

    if (!$self->{have_lua}) {
      for my $k ($start .. $end) {
        $r->b_call('HMGET', $keys->[$k], 's', 'h');
      }
      my $replies = $r->b_results;

      my $idx = $start;
      foreach my $reply (@$replies) {
        my ($s, $h) = @$reply;
        # substr(...,2) strips the "w:" key prefix
        push(@tokensdata, [substr($keys->[$idx], 2), ($s||0)+0, ($h||0)+0])
          if $s || $h;
        $idx++;
      }
    }
    else {
      my $nonce  = sprintf("%06x", rand(0xffffff));
      my @tokens = @{$keys}[$start .. $end];
      my $result = $r->call('EVALSHA', $self->{multi_hmget_script},
                            scalar @tokens, @tokens, $nonce);
      my @items   = split(' ', $result);
      my $r_nonce = pop(@items);

      if (!defined $r_nonce) {
        $self->disconnect;
        die "bayes: dump_db_toks received no results\n";
      }
      if ($r_nonce ne $nonce) {
        $self->disconnect;
        die sprintf("bayes: dump_db_toks nonce mismatch, ".
                    "expected %s, got %s\n", $nonce, $r_nonce);
      }
      if (@items != @tokens) {
        $self->disconnect;
        die sprintf("bayes: dump_db_toks got %d entries, expected %d\n",
                    scalar @items, scalar @tokens);
      }

      # items and tokens are now known to line up one to one
      for my $n (0 .. $#tokens) {
        my ($s, $h) = split(m{/}, $items[$n], 2);
        push(@tokensdata, [substr($tokens[$n], 2), ($s||0)+0, ($h||0)+0]);
      }
    }

    my $probabilities_ref =
      $self->{bayes}->_compute_prob_for_all_tokens(\@tokensdata,
                                                   $vars[1], $vars[2]);

    foreach my $tokendata (@tokensdata) {
      # one probability is consumed per token, even for skipped tokens
      my $prob = shift(@$probabilities_ref);
      my ($token, $s, $h) = @$tokendata;
      next if !$s && !$h;
      $prob = 0.5 if !defined $prob;
      my $encoded = unpack("H*", $token);
      printf($template, $prob, $s, $h, $atime, $encoded)
        or die "Error writing tokens: $!";
    }
  }
  dbg("bayes: written token keys");

  $self->untie_db();

  return;
}
# Writes a dump of the database to the currently selected output handle:
# three "v" lines (db version, number of spam, number of ham), one "t" line
# per token with non-zero counts, and one "s" line per seen entry.
# Keys are fetched with KEYS and processed in chunks of 1000.
# Returns 0 if the database cannot be opened, 1 otherwise.
sub backup_database {
  my $self = shift;

  return 0 unless $self->tie_db_readonly;

  $self->connect if !$self->{connected};
  my $r = $self->{redis};

  my $atime = time;

  my @vars = $self->get_storage_variables(qw(DB_VERSION NSPAM NHAM));
  print "v\t$vars[0]\tdb_version # this must be the first line!!!\n";
  print "v\t$vars[1]\tnum_spam\n";
  print "v\t$vars[2]\tnum_nonspam\n";

  # --- tokens ---
  my $keys = $r->call('KEYS', 'w:*');
  dbg("bayes: fetched %d token keys", scalar @$keys);

  my $last = $#$keys;
  for (my $start = 0; $start <= $last; $start += 1000) {
    my $end = $start + 999;
    $end = $last if $end >= $last;

    if (!$self->{have_lua}) {
      for my $k ($start .. $end) {
        $r->b_call('HMGET', $keys->[$k], 's', 'h');
      }
      my $replies = $r->b_results;

      my $idx = $start;
      foreach my $reply (@$replies) {
        # substr(...,2) strips the "w:" key prefix
        my $encoded = unpack("H*", substr($keys->[$idx++], 2));
        my ($s, $h) = @$reply;
        next unless $s || $h;
        printf("t\t%d\t%d\t%s\t%s\n", $s||0, $h||0, $atime, $encoded);
      }
    }
    else {
      my $nonce  = sprintf("%06x", rand(0xffffff));
      my @tokens = @{$keys}[$start .. $end];
      my $result = $r->call('EVALSHA', $self->{multi_hmget_script},
                            scalar @tokens, @tokens, $nonce);
      my @items   = split(' ', $result);
      my $r_nonce = pop(@items);

      if (!defined $r_nonce) {
        $self->disconnect;
        die "bayes: backup_database received no results\n";
      }
      if ($r_nonce ne $nonce) {
        $self->disconnect;
        die sprintf("bayes: backup_database nonce mismatch, ".
                    "expected %s, got %s\n", $nonce, $r_nonce);
      }
      if (@items != @tokens) {
        $self->disconnect;
        die sprintf("bayes: backup_database got %d entries, expected %d\n",
                    scalar @items, scalar @tokens);
      }

      # items and tokens are now known to line up one to one
      for my $n (0 .. $#tokens) {
        my ($s, $h) = split(m{/}, $items[$n], 2);
        next if !$s && !$h;
        my $encoded = unpack("H*", substr($tokens[$n], 2));
        printf("t\t%d\t%d\t%s\t%s\n", $s||0, $h||0, $atime, $encoded);
      }
    }
  }
  dbg("bayes: written token keys");

  # --- seen entries ---
  $keys = $r->call('KEYS', 's:*');
  dbg("bayes: fetched %d seen keys", scalar @$keys);

  $last = $#$keys;
  for (my $start = 0; $start <= $last; $start += 1000) {
    my $end = $start + 999;
    $end = $last if $end >= $last;

    my @chunk  = @{$keys}[$start .. $end];
    my $values = $r->call('MGET', @chunk);
    for my $n (0 .. $#$values) {
      next unless defined $values->[$n];
      # substr(...,2) strips the "s:" key prefix
      printf("s\t%s\t%s\n", $values->[$n], substr($chunk[$n], 2));
    }
  }
  dbg("bayes: written seen keys");

  $self->untie_db();

  return 1;
}
# Loads a dump file (as written by backup_database) into the database.
#   $filename  path of the dump file
#   $showdots  when true, a progress dot is printed to STDERR every
#              1000 lines
#
# The first line must be the "v ... db_version" line with version 2 or 3.
# "t" lines restore tokens (version 2 tokens are hashed, version 3 tokens
# are hex-decoded), "s" lines restore seen entries, "v" lines carry the
# message counters.  Commands are pipelined and flushed every 1000 entries.
# When TTLs are configured, each restored key gets a randomised expiry
# between 0.7 and 1.7 times the TTL.
#
# Returns 1 on success and 0 on failure; dies on read or close errors.
# Every failure return after the database was opened for writing unties
# it again, as the success path does.
sub restore_database {
  my ($self, $filename, $showdots) = @_;

  local *DUMPFILE;
  if (!open(DUMPFILE, '<', $filename)) {
    warn("bayes: unable to open backup file $filename: $!");
    return 0;
  }

  return 0 unless $self->clear_database();
  return 0 unless $self->tie_db_writable;

  $self->connect if !$self->{connected};
  my $r = $self->{redis};

  my $token_count = 0;
  my $num_spam    = 0;
  my $num_ham     = 0;
  my $line_count  = 0;
  my $db_version;

  my $line = <DUMPFILE>;
  defined $line or die "Error reading dump file: $!";
  $line_count++;

  if ($line =~ m/^v\s+(\d+)\s+db_version/) {
    $db_version = $1;
  }
  else {
    warn("bayes: database version must be the first line in the backup file, correct and re-run");
    $self->untie_db();
    return 0;
  }

  unless ($db_version == 2 || $db_version == 3) {
    warn("bayes: database version $db_version is unsupported, must be version 2 or 3\n");
    $self->untie_db();
    return 0;
  }

  my $q_cnt     = 0;   # number of entries queued since the start
  my $token_ttl = $self->{expire_token};
  my $seen_ttl  = $self->{expire_seen};

  # $! is reset before each read so that a read error can be told apart
  # from a plain end of file after the loop
  for ($!=0; defined($line=<DUMPFILE>); $!=0) {
    chomp($line);
    $line_count++;

    print STDERR "." if $showdots && $line_count % 1000 == 0;

    if ($line =~ /^t\s+/) {   # token line
      my @parsed_line = split(/\s+/, $line, 5);
      my $spam_count = $parsed_line[1] + 0;
      my $ham_count  = $parsed_line[2] + 0;
      my $token      = $parsed_line[4];

      $spam_count = 0 if $spam_count < 0;
      $ham_count  = 0 if $ham_count < 0;
      next if !$spam_count && !$ham_count;

      if ($db_version < 3) {
        # version 2 dumps carry raw tokens: keep the last 5 bytes of SHA1
        $token = substr(sha1($token), -5);
      }
      else {
        # version 3 dumps carry hex-encoded tokens
        $token = pack("H*", $token);
      }

      my $key = 'w:'.$token;
      $r->b_call('HINCRBY', $key, 's', int $spam_count) if $spam_count > 0;
      $r->b_call('HINCRBY', $key, 'h', int $ham_count)  if $ham_count > 0;
      if ($token_ttl) {
        # randomised expiry time for each restored token
        $r->b_call('EXPIRE', $key, int($token_ttl * (rand()+0.7)));
      }

      $r->b_results if ++$q_cnt % 1000 == 0;
      $token_count++;
    }
    elsif ($line =~ /^s\s+/) {   # seen line
      my @parsed_line = split(/\s+/, $line, 3);
      my $flag  = $parsed_line[1];
      my $msgid = $parsed_line[2];

      unless ($flag eq 'h' || $flag eq 's') {
        dbg("bayes: unknown seen flag ($flag) for line: $line, skipping");
        next;
      }
      unless ($msgid) {
        dbg("bayes: blank msgid for line: $line, skipping");
        next;
      }

      if (!$seen_ttl) {
        $r->b_call('SET', "s:$msgid", $flag);
      }
      else {
        # randomised expiry time for each restored seen entry
        $r->b_call('SETEX', "s:$msgid", int($seen_ttl * (rand()+0.7)), $flag);
      }

      $r->b_results if ++$q_cnt % 1000 == 0;
    }
    elsif ($line =~ /^v\s+/) {   # variable line
      my @parsed_line = split(/\s+/, $line, 3);
      my $value = $parsed_line[1] + 0;
      if ($parsed_line[2] eq 'num_spam') {
        $num_spam = $value;
      }
      elsif ($parsed_line[2] eq 'num_nonspam') {
        $num_ham = $value;
      }
      else {
        dbg("bayes: restore_database: skipping unknown line: $line");
      }
    }
    else {
      dbg("bayes: skipping unknown line: $line");
      next;
    }
  }

  # flush whatever is still queued
  $r->b_results;

  defined $line || $!==0  or
    $!==EBADF ? dbg("bayes: error reading dump file: $!")
              : die "error reading dump file: $!";
  close(DUMPFILE) or die "Can't close dump file: $!";

  print STDERR "\n" if $showdots;

  if ($num_spam <= 0 && $num_ham <= 0) {
    warn("bayes: no num_spam/num_ham found, aborting");
    $self->untie_db();
    return 0;
  }
  else {
    $self->nspam_nham_change($num_spam, $num_ham);
  }

  dbg("bayes: parsed $line_count lines");
  dbg("bayes: created database with $token_count tokens ".
      "based on $num_spam spam messages and $num_ham ham messages");

  $self->untie_db();

  return 1;
}
# Returns the "officially open" flag, i.e. true while the database is tied.
sub db_readable {
  my $self = shift;

  return $self->{is_officially_open};
}
# Returns true only while the database is tied and was opened for writing.
sub db_writable {
  my $self = shift;

  my $is_open = $self->{is_officially_open};
  return $is_open && $self->{is_writable};
}
# Loads the Lua helper script into the server with SCRIPT LOAD and keeps
# the value returned by that call in $self->{multi_hmget_script} for use
# with EVALSHA.  The script fetches the 's' and 'h' fields of every key
# passed to it and returns them as one space-separated string of "s/h"
# items, followed by the nonce given as its first argument.  Returns 1.
sub _define_lua_scripts {
  my ($self) = @_;

  dbg("bayes: defining Lua scripts");

  $self->connect if !$self->{connected};
  my $r = $self->{redis};

  my $multi_hmget_source = <<'END';
local rcall = redis.call
local nonce = ARGV[1]
local KEYS = KEYS
local r = {}
for j = 1, #KEYS do
local sh = rcall("HMGET", KEYS[j], "s", "h")
-- returns counts as a list of spam/ham pairs, zeroes may be omitted
local s, h = sh[1] or "0", sh[2] or "0"
local pair
if h == "0" then
pair = s -- just a spam field, possibly zero; a ham field omitted
elseif s == "0" then
pair = "/" .. h -- just a ham field, zero in a spam field suppressed
else
pair = s .. "/" .. h
end
r[#r+1] = pair
end
r[#r+1] = nonce
-- return counts as a single string, avoids overhead of multiresult parsing
return table.concat(r," ")
END

  $self->{multi_hmget_script} = $r->call('SCRIPT', 'LOAD', $multi_hmget_source);

  return 1;
}
# A module file must end with a true value for require/use to succeed.
1;