BEGIN { use_ok('IO::File') }
BEGIN { use_ok('Storable',qw(store retrieve)) }
BEGIN { use_ok('Thread::Queue') }
$SIG{ALRM} = sub {};
my $unresolved = 'unresolved';
my $resolved = 'resolved';
my $filter = 'filter';
my $filtered = 'filtered';
my $ip2domain = 'ip2domain';
diag( "Creating test log files" );
my @ip;
my %ip2domain;
my %domain2ip;
my $domainref = Thread::Pool::Resolve->can( 'rand_domain' );
my $ipref = Thread::Pool::Resolve->can( 'rand_ip' );
my $resolve;
my @shared : shared;
my %status : shared;
my $checkpointed : shared;
for (my $i = 1; $i <= 100; $i++) {
my ($ip,$domain) = (&$ipref,&$domainref);
push( @ip,$ip );
$ip2domain{$ip} = $domain;
$domain2ip{$domain} = $ip;
}
for (my $i = 1; $i <= 30; $i++) {
my $ip = &$ipref;
push( @ip,$ip );
$ip2domain{$ip} = '';
}
ok( store( \%ip2domain,$ip2domain ), 'save ip2domain hash with Storable' );
ok( open( my $out1,'>',$unresolved ), 'create unresolved log file' );
ok( open( my $out2,'>',$resolved ), 'create resolved log file' );
my $lines;
for ($lines = 1; $lines <= 1000; $lines++) {
my $ip = $ip[int(rand(@ip))];
my $domain = $ip2domain{$ip} || $ip;
my $hits = 1+int(rand(10));
foreach (1..$hits) {
print $out1 "$ip $lines\n";
print $out2 "$domain $lines\n";
$lines++;
}
$lines--;
}
$lines--;
my $checkpoints = int($lines/10);
ok( close( $out1 ), 'close unresolved log file' );
ok( close( $out2 ), 'close resolved log file' );
ok( open( my $script,'>',$filter ), 'create script file' );
print $script <<EOD;
BEGIN {\@INC = qw(@INC)};
\$SIG{ALRM} = sub {};
use Storable qw(retrieve);
my \$ip2domain = retrieve( '$ip2domain' );
sub gethostbyaddr { select( undef,undef,undef,rand() ); \$ip2domain->{\$_[0]} }
use Thread::Pool::Resolve;
my \$r = Thread::Pool::Resolve->new(
{
optimize => shift,
resolver => 'gethostbyaddr',
}
)->read;
EOD
ok( close( $script ), 'close script file' );
diag( "Test simple external log resolve filter ($optimize)" );
ok( !system( "$^X $filter $optimize <$unresolved >$filtered" ), 'filter unresolved' );
ok( check( $resolved,$filtered ), 'check result of script filter' );
diag( "Test resolving from a file ($optimize)" );
$checkpointed = 0;
$resolve = Thread::Pool::Resolve->new(
{
status => \%status,
optimize => $optimize,
checkpoint => sub {$checkpointed++},
frequency => 10,
resolver => 'gethostbyaddr',
},
$filtered
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
ok( $resolve->read( $unresolved ), 'read from file' );
$resolve->shutdown;
ok( check( $resolved,$filtered ), 'check result of file' );
cmp_ok( $checkpointed,'==',$checkpoints,'check checkpoints' );
diag( "Test resolving from an open()ed handle ($optimize)" );
ok( open( my $log,'<',$unresolved ), 'open GLOB log file' );
$resolve = Thread::Pool::Resolve->new(
{
status => \%status,
optimize => $optimize,
resolver => 'gethostbyaddr',
},
$filtered
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
ok( $resolve->read( $log ), 'read from opened GLOB' );
$resolve->shutdown;
ok( check( $resolved,$filtered ), 'check result of opened GLOB' );
ok( close( $log ), 'close GLOB log file' );
diag( "Test resolving from an IO::File->new handle ($optimize)" );
$log = IO::File->new( $unresolved,'<' );
isa_ok( $log,'IO::Handle', 'check IO::File handle' );
$resolve = Thread::Pool::Resolve->new(
{
status => \%status,
optimize => $optimize,
open => sub { IO::File->new( @_ ) },
resolver => 'gethostbyaddr',
},
$filtered
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
ok( $resolve->read( $log ), 'read from opened IO::File' );
$resolve->shutdown;
ok( check( $resolved,$filtered ), 'check result of opened IO::File' );
ok( close( $log ), 'close IO::File log file' );
diag( "Test resolving from a SCALAR handle ($optimize)" );
ok( open( $log,'<',$unresolved ), 'check opening unresolved file' );
my $scalar;
{local $/; $scalar = <$log>}
ok( close( $log ), 'check closing unresolved file' );
ok( open( $log,'<',\$scalar ), 'check SCALAR handle' );
my $output;
$resolve = Thread::Pool::Resolve->new(
{
optimize => $optimize,
pre => sub { open( $output,'>',$filtered ) or die "$filtered: $!" },
monitor => sub { print $output $_[0] },
post => sub { close( $output ) },
resolver => 'gethostbyaddr',
}
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
ok( $resolve->read( $log ), 'read from opened SCALAR' );
$resolve->shutdown;
ok( check( $resolved,$filtered ), 'check result of opened SCALAR' );
ok( close( $log ), 'close IO::File log file' );
diag( "Test resolving from a list ($optimize)" );
ok( open( $log,'<',$unresolved ), 'check opening unresolved file' );
my @array = <$log>;
ok( close( $log ), 'check closing unresolved file' );
$resolve = Thread::Pool::Resolve->new(
{
optimize => $optimize,
resolver => 'gethostbyaddr',
},
$filtered
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
ok( $resolve->lines( @array ), 'check object returned from lines' );
$resolve->shutdown;
ok( check( $resolved,$filtered ), 'check result from lines' );
diag( "Test resolving from different threads ($optimize)" );
@shared = @array;
$resolve = Thread::Pool::Resolve->new(
{
optimize => $optimize,
resolver => 'gethostbyaddr',
},
$filtered
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
my @thread;
push( @thread,threads->new( \&bythread ) ) foreach 1..10;
$_->join foreach @thread;
$resolve = undef; # needed to finalize streaming in time
ok( check( $resolved,$filtered ), 'check result different threads' );
diag( "Test resolving from Thread::Queue ($optimize)" );
@shared = (@array,undef);
my $queue = bless \@shared,'Thread::Queue';
isa_ok( $queue,'Thread::Queue', 'check object type' );
$resolve = Thread::Pool::Resolve->new(
{
status => \%status,
optimize => $optimize,
resolver => 'gethostbyaddr',
},
$filtered
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
ok( $resolve->read( $queue ), 'check result of reading with queue' );
$resolve = undef;
ok( check( $resolved,$filtered ), 'check result Thread::Queue' );
diag( "Test resolving using Thread::Conveyor ($optimize)" );
my $belt = Thread::Conveyor->new( {optimize => $optimize} );
isa_ok( $belt,'Thread::Conveyor', 'check object type' );
$belt->put( $_ ) foreach @array,undef;
$resolve = Thread::Pool::Resolve->new(
{
optimize => $optimize,
resolved => retrieve( $ip2domain ),
resolver => 'gethostbyaddr',
},
$filtered
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
ok( $resolve->read( $belt ), 'check result of reading with belt' );
$resolve = undef;
ok( check( $resolved,$filtered ), 'check result Thread::Conveyor' );
ok( unlink( $unresolved,$resolved,$filter,$filtered ), 'remove created files' );
#=======================================================================
# necessary subroutines
# adding resolve lines by shared array per thread
sub bythread {
local ($_);
READ: while (1) {
{lock( @shared );
my $line = shift( @shared );
last READ unless defined( $line );
$resolve->line( $line );
}
}
}
# resolve from the dummy hash
sub gethostbyaddr { select( undef,undef,undef,rand() ); $ip2domain{$_[0]} }
# check two files and return true if they are the same
sub check {
my ($file1,$file2) = @_;
open( my $handle1,'<',$file1 ) or return 0;
open( my $handle2,'<',$file2 ) or return 0;
while (readline( $handle1 )) {
my $line = readline( $handle2 );
return 0 unless defined($line);
next if $line eq $_;
return 0;
}
my $line = readline( $handle2 );
return !defined($line);
}