From Code to Community: Sponsoring The Perl and Raku Conference 2025 Learn more

our $VERSION = '1.20141230'; # VERSION
use strict;
use Carp;
sub save_aggregate {
my ( $self, $agg ) = @_;
croak "policy_published must be a Mail::DMARC::Policy object"
if 'Mail::DMARC::Policy' ne ref $agg->policy_published;
#warn Dumper($meta); ## no critic (Carp)
foreach my $f ( qw/ org_name email begin end / ) {
croak "meta field $f required" if ! $agg->metadata->$f;
}
my $rid = $self->get_report_id( $agg )
or croak "failed to create report!";
# on 6/8/2013, Microsoft spat out a bunch of reports with zero records.
if ( ! $agg->record ) {
warn "\ta report with ZERO records! Illegal.\n"; ## no critic (Carp)
return $rid;
};
foreach my $rec ( @{ $agg->record } ) {
$self->insert_agg_record($rid, $rec);
};
return $rid;
};
sub retrieve {
my ( $self, %args ) = @_;
my $query = $self->get_report_query;
my @params;
if ( $args{rid} ) {
$query .= " AND r.id=?";
push @params, $args{rid};
};
if ( $args{begin} ) {
$query .= " AND r.begin>=?";
push @params, $args{begin};
};
if ( $args{end} ) {
$query .= " AND r.end<=?";
push @params, $args{end};
};
if ( $args{author} ) {
$query .= " AND a.org_name=?";
push @params, $args{author};
};
if ( $args{from_domain} ) {
$query .= " AND fd.domain=?";
push @params, $args{from_domain};
};
my $reports = $self->query( $query, \@params );
foreach (@$reports ) {
$_->{begin} = join(" ", split(/T/, $self->epoch_to_iso( $_->{begin} )));
$_->{end} = join(" ", split(/T/, $self->epoch_to_iso( $_->{end} )));
};
return $reports;
}
sub retrieve_todo {
my ( $self, @args ) = @_;
# this method extracts the data from the SQL tables and populates an
# Aggregate report object with it.
my $reports = $self->query( $self->get_todo_query, [ time ] );
return if ! @$reports;
my $report = $reports->[0];
my $agg = Mail::DMARC::Report::Aggregate->new();
$self->populate_agg_metadata( \$agg, \$report );
my $pp = $self->get_report_policy_published( $report->{rid} );
$pp->{domain} = $report->{from_domain};
$agg->policy_published( Mail::DMARC::Policy->new( %$pp ) );
$self->populate_agg_records( \$agg, $report->{rid} );
return $agg;
};
sub delete_report {
my $self = shift;
my $report_id = shift or croak "missing report ID";
print "deleting report $report_id\n";
# deletes with FK don't cascade in SQLite? Clean each table manually
my $rows = $self->query( 'SELECT id FROM report_record WHERE report_id=?',
[$report_id] );
my $row_ids = join( ',', map { $_->{id} } @$rows ) or return 1;
foreach my $table (
qw/ report_record_spf report_record_dkim report_record_reason /)
{
print "deleting $table rows $row_ids\n";
$self->query(
"DELETE FROM $table WHERE report_record_id IN ($row_ids)");
}
foreach my $table (qw/ report_policy_published report_record /) {
$self->query( "DELETE FROM $table WHERE report_id=?", [$report_id] );
}
$self->query( "DELETE FROM report_error WHERE report_id=?", [$report_id] );
# In MySQL, where FK constraints DO cascade, this is the only query needed
$self->query( "DELETE FROM report WHERE id=?", [$report_id] );
return 1;
}
sub get_domain_id {
my ( $self, $domain ) = @_;
croak "missing domain calling " . ( caller(0) )[3] if !$domain;
my $r = $self->query( 'SELECT id FROM domain WHERE domain=?', [$domain] );
if ( $r && scalar @$r ) {
return $r->[0]{id};
}
return $self->query( 'INSERT INTO domain (domain) VALUES (?)', [$domain]);
}
sub get_author_id {
my ( $self, $meta ) = @_;
croak "missing author name" if !$meta->org_name;
my $r = $self->query( 'SELECT id FROM author WHERE org_name=?',
[ $meta->org_name ] );
if ( $r && scalar @$r ) {
return $r->[0]{id};
}
carp "missing email" if !$meta->email;
return $self->query(
'INSERT INTO author (org_name,email,extra_contact) VALUES (??)',
[ $meta->org_name, $meta->email, $meta->extra_contact_info ]
);
}
sub get_report_id {
my ( $self, $aggr ) = @_;
my $meta = $aggr->metadata;
my $pol = $aggr->policy_published;
# check if report exists
my $author_id = $self->get_author_id( $meta ) or croak;
my $from_dom_id = $self->get_domain_id( $pol->domain ) or croak;
my $ids;
if ( $meta->report_id ) {
# reports arriving via the wire will have an author ID & report ID
$ids = $self->query(
'SELECT id FROM report WHERE uuid=? AND author_id=?',
[ $meta->report_id, $author_id ]
);
}
else {
# Reports submitted by our local MTA will not have a report ID
# They aggregate on the From domain, where the DMARC policy was discovered
$ids = $self->query(
'SELECT id FROM report WHERE from_domain_id=? AND end > ?',
[ $from_dom_id, time ]
);
};
if ( scalar @$ids ) { # report already exists
return $self->{report_id} = $ids->[0]{id};
}
my $rid = $self->{report_id} = $self->query(
'INSERT INTO report (from_domain_id, begin, end, author_id, uuid) VALUES (??)',
[ $from_dom_id, $meta->begin, $meta->end, $author_id, $meta->uuid ]
) or return;
$self->insert_policy_published( $rid, $pol );
return $rid;
}
sub get_report {
my ($self,@args) = @_;
croak "invalid parameters" if @args % 2;
my %args = @args;
my $query = $self->get_report_query;
my @params;
my @known = qw/ rid author from_domain begin end /;
my %known = map { $_ => 1 } @known;
# TODO: allow custom search ops? 'searchOper' => 'eq',
if ( $args{searchField} && $known{ $args{searchField} } ) {
$query .= " AND $args{searchField}=?";
push @params, $args{searchString};
};
foreach my $known ( @known ) {
next if ! defined $args{$known};
$query .= " AND $known=?";
push @params, $args{$known};
};
if ( $args{sidx} && $known{$args{sidx}} ) {
$query .= " ORDER BY " . $args{sidx};
if ( $args{sord} ) {
$query .= $args{sord} eq 'desc' ? ' DESC' : ' ASC';
};
};
my $total_recs = $self->dbix->query('SELECT COUNT(*) FROM report')->list;
my $total_pages = 0;
if ( $args{rows} ) {
if ( $args{page} ) {
$total_pages = POSIX::ceil($total_recs / $args{rows});
my $start = ($args{rows} * $args{page}) - $args{rows};
$start = 0 if $start < 0;
$query .= " LIMIT ?,?";
push @params, $start, $args{rows};
}
else {
$query .= " LIMIT ?";
push @params, $args{rows};
};
};
# warn "query: $query\n" . join(", ", @params) . "\n";
my $reports = $self->query($query, \@params);
foreach (@$reports ) {
$_->{begin} = join('<br>', split(/T/, $self->epoch_to_iso( $_->{begin} )));
$_->{end} = join('<br>', split(/T/, $self->epoch_to_iso( $_->{end} )));
};
# return in the format expected by jqGrid
return {
cur_page => $args{page},
total_pages => $total_pages,
total_rows => $total_recs,
rows => $reports,
};
};
sub get_report_policy_published {
my ($self, $rid) = @_;
my $pp_query = 'SELECT * from report_policy_published WHERE report_id=?';
my $pp = $self->query($pp_query, [ $rid ] )->[0];
$pp->{p} ||= 'none';
$pp = Mail::DMARC::Policy->new( v=>'DMARC1', %$pp );
return $pp;
};
sub get_report_query {
my $self = shift;
return <<'EO_REPORTS'
SELECT r.id AS rid,
r.uuid,
r.begin AS begin,
r.end AS end,
a.org_name AS author,
fd.domain AS from_domain
FROM report r
LEFT JOIN author a ON r.author_id=a.id
LEFT JOIN domain fd ON r.from_domain_id=fd.id
WHERE 1=1
EO_REPORTS
;
};
sub get_todo_query {
return <<'EO_TODO_QUERY'
SELECT r.id AS rid,
r.begin AS begin,
r.end AS end,
a.org_name AS author,
fd.domain AS from_domain
FROM report r
LEFT JOIN report_record rr ON r.id=rr.report_id
LEFT JOIN author a ON r.author_id=a.id
LEFT JOIN domain fd ON r.from_domain_id=fd.id
WHERE rr.count IS NULL
AND rr.report_id IS NOT NULL
AND r.end < ?
GROUP BY r.id
ORDER BY r.id
LIMIT 1
EO_TODO_QUERY
;
};
sub get_rr {
my ($self,@args) = @_;
croak "invalid parameters" if @args % 2;
my %args = @args;
#warn Dumper(\%args);
croak "missing report ID (rid)!" if ! defined $args{rid};
my $rows = $self->query( $self->get_rr_query, [ $args{rid} ] );
foreach ( @$rows ) {
$_->{reasons} = $self->query('SELECT type,comment FROM report_record_reason WHERE report_record_id=?', [ $_->{id} ] );
$_->{source_ip} = $self->any_inet_ntop( $_->{source_ip} );
};
return {
cur_page => 1,
total_pages => 1,
total_rows => scalar @$rows,
rows => $rows,
};
};
sub get_row_spf {
my ($self, $rowid) = @_;
my $spf_query = <<"EO_SPF_ROW"
SELECT d.domain AS domain,
s.result AS result,
s.scope AS scope
FROM report_record_spf s
LEFT JOIN domain d ON s.domain_id=d.id
WHERE s.report_record_id=?
EO_SPF_ROW
;
return $self->query( $spf_query, [ $rowid ] );
};
sub get_row_dkim {
my ($self, $rowid) = @_;
my $dkim_query = <<"EO_DKIM_ROW"
SELECT d.domain AS domain,
k.selector AS selector,
k.result AS result,
k.human_result AS human_result
FROM report_record_dkim k
LEFT JOIN domain d ON k.domain_id=d.id
WHERE report_record_id=?
EO_DKIM_ROW
;
return $self->query( $dkim_query, [ $rowid ] );
};
sub get_row_reason {
my ($self, $rowid) = @_;
my $row_query = <<"EO_ROW_QUERY"
SELECT type,comment
FROM report_record_reason
WHERE report_record_id=?
EO_ROW_QUERY
;
return $self->query( $row_query, [ $rowid ] );
};
sub get_rr_query {
return <<'EO_ROW_QUERY'
SELECT rr.*,
etd.domain AS envelope_to,
efd.domain AS envelope_from,
hfd.domain AS header_from
FROM report_record rr
LEFT JOIN domain etd ON etd.id=rr.envelope_to_did
LEFT JOIN domain efd ON efd.id=rr.envelope_from_did
LEFT JOIN domain hfd ON hfd.id=rr.header_from_did
WHERE report_id = ?
ORDER BY id
EO_ROW_QUERY
;
};
sub populate_agg_metadata {
my ($self, $agg_ref, $report_ref) = @_;
$$agg_ref->metadata->report_id( $$report_ref->{rid} );
foreach my $f ( qw/ org_name email extra_contact_info / ) {
$$agg_ref->metadata->$f( $self->config->{organization}{$f} );
};
foreach my $f ( qw/ begin end / ) {
$$agg_ref->metadata->$f( $$report_ref->{$f} );
};
my $errors = $self->query('SELECT error FROM report_error WHERE report_id=?',
[ $$report_ref->{rid} ]
);
foreach ( @$errors ) {
$$agg_ref->metadata->error( $_->{error} );
};
return 1;
};
sub populate_agg_records {
my ($self, $agg_ref, $rid) = @_;
my $recs = $self->query( $self->get_rr_query, [ $rid ] );
# aggregate the connections per IP-Disposition-DKIM-SPF uniqueness
my (%ips, %uniq, %pe, %auth, %ident, %reasons);
foreach my $rec ( @$recs ) {
my $key = join('-', $rec->{source_ip},
@$rec{ qw/ disposition dkim spf / }); # hash slice
$uniq{ $key }++;
$ips{$key} = $rec->{source_ip};
$ident{$key}{header_from} ||= $rec->{header_from};
$ident{$key}{envelope_from} ||= $rec->{envelope_from};
$ident{$key}{envelope_to} ||= $rec->{envelope_to};
$pe{$key}{disposition} ||= $rec->{disposition};
$pe{$key}{dkim} ||= $rec->{dkim};
$pe{$key}{spf} ||= $rec->{spf};
$auth{$key}{spf } ||= $self->get_row_spf($rec->{id});
$auth{$key}{dkim} ||= $self->get_row_dkim($rec->{id});
my $reasons = $self->get_row_reason( $rec->{id} );
foreach my $reason ( @$reasons ) {
my $type = $reason->{type} or next;
$reasons{$key}{$type} = $reason->{comment}; # flatten reasons
}
}
foreach my $u ( keys %uniq ) {
$$agg_ref->record( {
identifiers => $ident{$u},
auth_results => $auth{$u},
row => {
source_ip => $self->any_inet_ntop( $ips{$u} ),
count => $uniq{ $u },
policy_evaluated => {
%{ $pe{$u} },
$reasons{$u} ? ( reason => [ map { { type => $_, comment => $reasons{$u}{$_} } } sort keys %{ $reasons{$u} } ] ) : (),
},
},
} );
}
return $$agg_ref->record;
}
sub row_exists {
my ($self, $rid, $rec ) = @_;
if ( ! defined $rec->{row}{count} ) {
carp "\tnew record";
return;
};
my $rows = $self->query('SELECT id FROM report_record WHERE report_id=? AND source_ip=? AND count=?',
[ $rid, $rec->{row}{source_ip}, $rec->{row}{count}, ]
);
return 1 if scalar @$rows;
return;
};
sub insert_agg_record {
my ($self, $row_id, $rec) = @_;
return 1 if $self->row_exists( $row_id, $rec);
$row_id = $self->insert_rr( $row_id, $rec )
or croak "failed to insert report row";
my $reasons = $rec->{row}{policy_evaluated}{reason};
if ( $reasons ) {
foreach my $reason ( @$reasons ) {
next if !$reason || !$reason->{type};
$self->insert_rr_reason( $row_id, $reason->{type}, $reason->{comment} );
};
}
my $spf_ref = $rec->{auth_results}{spf};
if ( $spf_ref ) {
foreach my $spf (@$spf_ref) {
$self->insert_rr_spf( $row_id, $spf );
}
}
my $dkim = $rec->{auth_results}{dkim};
if ($dkim) {
foreach my $sig (@$dkim) {
next if ! $sig || ! $sig->{domain};
$self->insert_rr_dkim( $row_id, $sig );
}
}
return 1;
}
sub insert_error {
my ( $self, $rid, $error ) = @_;
# wait >5m before trying to deliver this report again
$self->query('UPDATE report SET end=? WHERE id=?', [time + (5*60), $rid]);
return $self->query(
'INSERT INTO report_error (report_id, error ) VALUES (??)',
[ $rid, $error ]
);
}
sub insert_rr_reason {
my ( $self, $row_id, $type, $comment ) = @_;
return $self->query(
'INSERT INTO report_record_reason (report_record_id, type, comment) VALUES (?,?,?)',
[ $row_id, $type, ($comment || '') ]
);
}
sub insert_rr_dkim {
my ( $self, $row_id, $dkim ) = @_;
my (@fields, @values);
foreach ( qw/ domain selector result human_result / ) {
next if ! $dkim->{$_};
if ( 'domain' eq $_ ) {
push @fields, 'domain_id';
push @values, $self->get_domain_id( $dkim->{domain} );
next;
};
push @fields, $_;
push @values, $dkim->{$_};
};
my $fields_str = join ',', @fields;
my $query = <<"EO_DKIM"
INSERT INTO report_record_dkim
(report_record_id, $fields_str)
VALUES (??)
EO_DKIM
;
$self->query( $query, [ $row_id, @values ] );
return 1;
}
sub insert_rr_spf {
my ( $self, $row_id, $spf ) = @_;
my (@fields, @values);
for ( qw/ domain scope result / ) {
next if ! $spf->{$_};
if ( 'domain' eq $_ ) {
push @fields, 'domain_id';
push @values, $self->get_domain_id( $spf->{domain} );
next;
};
push @fields, $_;
push @values, $spf->{$_};
};
my $fields_str = join ',', @fields;
my $query = "INSERT INTO report_record_spf (report_record_id, $fields_str) VALUES(??)";
$self->query( $query, [ $row_id, @values ]);
return 1;
}
sub insert_rr {
my ( $self, $report_id, $rec ) = @_;
$report_id or croak "report ID required?!";
my $query = <<'EO_ROW_INSERT'
INSERT INTO report_record
(report_id, source_ip, count, header_from_did, envelope_to_did, envelope_from_did,
disposition, dkim, spf)
VALUES (??)
EO_ROW_INSERT
;
my @args = ( $report_id,
$self->any_inet_pton( $rec->{row}{source_ip} ),
$rec->{row}{count},
);
foreach ( qw/ header_from envelope_to envelope_from / ) {
push @args, $rec->{identifiers}{$_} ?
$self->get_domain_id( $rec->{identifiers}{$_} ) : undef;
};
push @args, map { $rec->{row}{policy_evaluated}{$_} } qw/ disposition dkim spf /;
my $rr_id = $self->query( $query, \@args ) or croak;
return $self->{report_row_id} = $rr_id;
}
sub insert_policy_published {
my ( $self, $id, $pub ) = @_;
my $query = <<"EO_RPP"
INSERT INTO report_policy_published
(report_id, adkim, aspf, p, sp, pct, rua)
VALUES (??)
EO_RPP
;
return $self->query( $query,
[ $id, @$pub{ qw/ adkim aspf p sp pct rua /} ]
)
or croak "failed to insert published policy";
}
sub db_connect {
my $self = shift;
return $self->{dbix} if $self->{dbix}; # caching
my $dsn = $self->config->{report_store}{dsn} or croak;
my $user = $self->config->{report_store}{user};
my $pass = $self->config->{report_store}{pass};
my $needs_tables;
if ( $dsn =~ /sqlite/i ) {
my ($db) = ( split /=/, $dsn )[-1];
if ( !$db || $db eq ':memory:' || !-e $db ) {
my $schema = 'mail_dmarc_schema.sqlite';
$needs_tables = $self->get_db_schema($schema)
or croak
"can't locate DB $db AND can't find $schema! Create $db manually.\n";
}
}
$self->{dbix} = DBIx::Simple->connect( $dsn, $user, $pass )
or return $self->error( DBIx::Simple->error );
if ($needs_tables) {
$self->apply_db_schema($needs_tables);
}
return $self->{dbix};
}
sub db_check_err {
my ( $self, $err ) = @_;
## no critic (PackageVars)
return if !defined $DBI::errstr;
return if !$DBI::errstr;
return if $DBI::errstr eq 'DBI error: ';
croak $err . $DBI::errstr;
}
sub dbix { return $_[0]->{dbix} if $_[0]->{dbix}; return $_[0]->db_connect(); }
sub apply_db_schema {
my ( $self, $file ) = @_;
my $setup = $self->slurp($file);
foreach ( split /;/, $setup ) {
# warn "$_\n";
$self->dbix->query($_);
}
return;
}
sub get_db_schema {
my ( $self, $file ) = @_;
return "share/$file" if -f "share/$file"; # when testing
return File::ShareDir::dist_file( 'Mail-DMARC', $file ); # when installed
}
sub query {
my ( $self, $query, $params, @extra ) = @_;
my @c = caller;
my $err = sprintf( "query called by %s, %s\n", $c[0], $c[2] )
. "\t$query\n\t";
my @params;
if ( defined $params ) {
@params = ref $params eq 'ARRAY' ? @$params : $params;
no warnings; ## no critic (NoWarnings)
$err .= join( ', ', @params );
}
croak "too many arguments to exec_query!" if @extra;
my $dbix = $self->db_connect() or croak DBIx::Simple->error;
return $self->query_insert( $query, $err, @params )
if $query =~ /^INSERT/ix;
return $self->query_replace( $query, $err, @params )
if $query =~ /^(?:REPLACE|UPDATE)/ix;
return $self->query_delete( $query, $err, @params )
if $query =~ /^DELETE/ix;
return $self->query_any( $query, $err, @params );
}
sub query_any {
my ( $self, $query, $err, @params ) = @_;
#warn "query: $query\n" . join(", ", @params) . "\n";
my $r;
eval { $r = $self->dbix->query( $query, @params )->hashes; } or print '';
$self->db_check_err($err);
die "something went wrong with: $err\n" if ! $r; ## no critic (Carp)
return $r;
}
sub query_insert {
my ( $self, $query, $err, @params ) = @_;
eval { $self->dbix->query( $query, @params ) } or do {
warn DBIx::Simple->error;
croak $err;
};
$self->db_check_err($err);
# If the table has no autoincrement field, last_insert_id is zero
my ( undef, undef, $table ) = split /\s+/, $query;
($table) = split( /\(/, $table ) if $table =~ /\(/;
croak "unable to determine table in query: $query" if !$table;
return $self->dbix->last_insert_id( undef, undef, $table, undef );
}
sub query_replace {
my ( $self, $query, $err, @params ) = @_;
$self->dbix->query( $query, @params ) or croak $err;
$self->db_check_err($err);
return 1; # sorry, no indication of success
}
sub query_delete {
my ( $self, $query, $err, @params ) = @_;
$self->dbix->query( $query, @params ) or croak $err;
$self->db_check_err($err);
my $affected = 0;
eval { $affected = $self->dbix->query("SELECT ROW_COUNT()")->list }; ## no critic (Eval)
return 1 if $@; # succeed for SQLite
return $affected;
}
1;
# ABSTRACT: store and retrieve reports from a SQL RDBMS
__END__
=pod
=head1 NAME
Mail::DMARC::Report::Store::SQL - store and retrieve reports from a SQL RDBMS
=head1 VERSION
version 1.20141230
=head1 DESCRIPTION
Uses ANSI SQL syntax, keeping the SQL as portable as possible.
DB engine specific features are to be avoided.
=head1 SYPNOSIS
Store and retrieve DMARC reports from SQL data store.
Tested with SQLite and MySQL.
=head1 AUTHORS
=over 4
=item *
Matt Simerson <msimerson@cpan.org>
=item *
Davide Migliavacca <shari@cpan.org>
=back
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Matt Simerson.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut