NAME

Thread::Pool::Resolve - resolve logs asynchronously

SYNOPSIS

 use Thread::Pool::Resolve;
 Thread::Pool::Resolve->timeout( 60 ); # only for default resolver
 
 $resolve = Thread::Pool::Resolve->new( {field => setting}, parameters );

 $resolve->read( | file | handle | socket | belt );
 $resolve->line( single_log_line );

DESCRIPTION

                 *** A note of CAUTION ***

 This module only functions on Perl versions 5.8.0 and later.
 And then only when threads are enabled with -Dusethreads.
 It is of no use with any version of Perl before 5.8.0 or
 without threads enabled.

                 *************************

The Thread::Pool::Resolve module allows you to resolve log-files (any source of data in which the first characters on a line constitute an IP number) in an asynchronous manner using threads. Because threads are used to resolve IP numbers in parallel, the wallclock time of the resolving process can be significantly reduced.

Because the Thread::Pool::Resolve module is very flexible in its input and output media, you can e.g. resolve log lines in real-time and store the result in a data-base (rather than in a text-file).

If you are more interested in as low a CPU usage as possible, you should probably just create a simple filter using the gethostbyaddr function.
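
Such a filter could look roughly like the sketch below. It is not part of this module: the names "lookup" and "resolve_line" are made up for illustration, and the optional second parameter of "resolve_line" exists only so that a different lookup routine can be substituted.

```perl
use strict;
use warnings;
use Socket qw(inet_aton AF_INET);

my %cache;    # IP number => host name ('' when resolving failed)

# Look up one IP number, remembering the answer for later lines.
sub lookup {
    my ($ip) = @_;
    unless ( exists $cache{$ip} ) {
        my $packed = inet_aton($ip);
        my $name =
          defined $packed ? gethostbyaddr( $packed, AF_INET ) : undef;
        $cache{$ip} = defined $name ? $name : '';
    }
    return $cache{$ip};
}

# Replace the leading IP number of a log line with its host name, if any.
# $lookup is optional and defaults to the caching lookup() above.
sub resolve_line {
    my ( $line, $lookup ) = @_;
    $lookup ||= \&lookup;
    $line =~ s/^(\d{1,3}(?:\.\d{1,3}){3})\b/ $lookup->($1) || $1 /e;
    return $line;
}

# As a filter:
#   print resolve_line($_) while <STDIN>;
```

Every lookup blocks until DNS answers, so this trades wallclock time for simplicity and low CPU usage.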

CLASS METHODS

These methods can be called without an instantiated Thread::Pool::Resolve object.

new

 $resolve = Thread::Pool::Resolve->new(
  {
   open => sub { print "open output handle with @_\n" },   # default: STDOUT
   close => sub { print "close output handle with @_\n" }, # default: none

   pre => sub { print "start monitoring yourself\n" },   # alternative
   monitor => sub { print "monitor yourself\n" },        # for
   post => sub { print "stop monitoring yourself\n" },   # open/close

   status => \%status,                    # monitor progress resolver
   checkpoint => sub { print "checkpointing\n" },
   frequency => 1000,

   resolved => \%resolved,                # default: empty hash
   resolver => \&myownresolver,           # default: gethostbyaddr()

   autoshutdown => 1, # default: 1 = yes

   workers => 10,     # default: 10
   maxjobs => 50,     # default: 5 * workers
   minjobs => 5,      # default: maxjobs / 2
  },

  qw(file layers)     # parameters to "open", "close", "pre" and "post"

 );

The "new" method returns an instantiated Thread::Pool::Resolve object.

The first input parameter is a reference to a hash with fields that adapt settings for the resolving process.

The other input parameters are optional. If specified, they are passed to the "open" or the "pre" routine when resolving is started.

There are basically two modes of operation: one mode in which the resolved log is simply written to a file (specified with the "open" field) and a mode in which you have complete control over what happens to each resolved log line (specified with the "monitor" field).

writing result to a file

The simplest way is to specify an "open" subroutine that returns the file handle to which the output should be written. This could be as simple as:

 open => sub { open( my $out,'>',$_[0] ) or die "$_[0]: $!"; $out },

The extra parameters that you specify with the new method, are passed to the "open" routine. So if you pass the filename, it can be used by the "open" routine to open the desired file for writing.

The reason it is not possible to specify a handle directly is that the file will only be opened in the monitoring thread. This prevents any locking or mutex problems at the file system level. It also allows you to use PerlIO layers that (as of yet) cannot be used in multiple threads at the same time, such as the ":gzip" (PerlIO::gzip) IO layer (available from CPAN). And it allows you to use DBI::DBD drivers that are not (as of yet) threadsafe.

If you specify neither an "open" routine nor a "monitor" routine, a default "open" routine is used that takes the first parameter to be the filename and the second parameter to be any PerlIO layers to apply. If no parameters are present, output will be sent to STDOUT.
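
An "open" routine that behaves like the default described above could be sketched as follows. This illustrates the documented behaviour and is not the module's own code; the name "open_output" is hypothetical.

```perl
use strict;
use warnings;

# Hypothetical "open" routine: receives the extra parameters given to new().
sub open_output {
    my ( $filename, $layers ) = @_;

    # No file name given: fall back to STDOUT
    return \*STDOUT unless defined $filename;

    # Append any PerlIO layers (e.g. ':utf8') to the open mode
    my $mode = '>' . ( defined $layers ? $layers : '' );
    open( my $out, $mode, $filename ) or die "$filename: $!";
    return $out;
}

# Assumed usage:
#   Thread::Pool::Resolve->new(
#    { open => \&open_output }, 'resolved.log', ':utf8'
#   );
```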

monitoring resolved lines yourself

If you're interested in any monitoring of the resolved log lines (e.g. if you want to filter out certain domain names), you can specify a "monitor" routine that will be called for each resolved log line. You can set up any file handles with the "pre" routine and perform any shutdown operations with a "post" routine.
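
As a sketch of this mode, the three routines below drop all lines from hosts under example.com and write the rest to a file. The routine names and the file name are made up for illustration, and the sketch assumes that each resolved line starts with the host name and still carries its line ending.

```perl
use strict;
use warnings;

my $out;    # handle used by the three routines below

# "pre": receives the extra parameters passed to new(); opens the output file
sub start_filtered {
    my ($filename) = @_;
    open( $out, '>', $filename ) or die "$filename: $!";
}

# "monitor": called with one resolved log line; drops hosts under example.com
sub keep_line {
    my ($line) = @_;
    return if $line =~ m/^\S*\.example\.com\s/i;
    print $out $line;
}

# "post": closes the output file
sub stop_filtered {
    close($out) or die "close: $!";
}

# Assumed usage:
#   Thread::Pool::Resolve->new(
#    {
#     pre     => \&start_filtered,
#     monitor => \&keep_line,
#     post    => \&stop_filtered,
#    },
#    'filtered.log'
#   );
```

Because all three routines run in the same thread, they can communicate through the file-scoped $out variable.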

These are all the possible fields that you may specify with the new method.

optimize
 optimize => 'cpu', # default: 'memory'

The "optimize" field specifies which implementation of the belt will be selected. Currently there are two choices: 'cpu' and 'memory'. By default, the "memory" optimization will be selected if no specific optimization is specified.

You can call the class method optimize to change the default optimization.

open
 open => 'open_output_file',            # name or code reference

The "open" field specifies the subroutine that determines to which file the resolved log lines should be written. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

The "open" routine is expected to return a file handle to which the resolved log lines should be written. Any extra parameters that are passed to new will be passed through to this subroutine.

If no "open" field is specified, and no "monitor" field is specified either, then a default "open" routine will be assumed that interprets the extra parameters passed to new as:

 1 name of file to write resolved log lines to (default: STDOUT)
 2 PerlIO layers to apply (default: none)

So, if you would like to write the log file to "resolved.gz" and use the :gzip PerlIO layer (available from CPAN), you would specify this as:

 $resolve = Thread::Pool::Resolve->new( {},'resolved.gz',':gzip' );

Please note the empty hash reference as the first parameter in this case.

close
 close => 'close_output_file',          # name or code reference

The "close" field specifies the subroutine that will be called when an "open" routine was explicitly specified and the monitoring thread is shutting down. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

The "close" routine allows you to do any extra cleanup operations. Please note however that the file to which the resolved log lines were written, is closed automatically, so you don't need to specify a "close" routine for that.

Any extra parameters that are passed to new, will be passed through to the "close" subroutine.

pre
 pre => 'prepare_monitoring',           # name or code reference

The "pre" field specifies the subroutine to be executed once before the first time the "monitor" routine is called. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

The specified subroutine should expect the following parameters to be passed:

 1..N  any additional parameters that were passed with the call to new.

The "pre" routine executes in the same thread as the "monitor" routine.

monitor
 monitor => 'in_order_read',            # name or code reference

The "monitor" field specifies the subroutine to be executed for monitoring the results of the resolving process. If specified, the "monitor" routine is called once for each resolved log line in the order in which the (unresolved) lines occur in the original log.

The specified subroutine should expect the following parameters to be passed:

 1 resolved log line

Whatever the "monitor" routine does with the resolved log line, is up to the developer. Normally it would write the line into a file or an external database.

post
 post => 'cleanup_after_monitoring',    # name or code reference

The "post" field specifies the subroutine to be executed when the Thread::Pool::Resolve object is shut down, either explicitly or implicitly when the object is destroyed. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

The specified subroutine should expect the following parameters to be passed:

 1..N  any additional parameters that were passed with the call to new.

The "post" routine executes in the same thread as the "monitor" routine.

status
 status => \%status,

The "status" field specifies a reference to a shared hash that will be filled with the status messages of the resolving process. A one line status message will be set by the thread doing the resolving process, keyed to the numerical thread id (tid) of the process.

No status will be set if no hash reference is specified (which is the default).
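
A status hash of this shape (thread id as key, one-line message as value) could be turned into a report with a small helper such as the one below. The helper name "status_report" and the output format are made up for illustration.

```perl
use strict;
use warnings;

# Turn a status hash (thread id => one-line message) into printable lines,
# sorted numerically by thread id.
sub status_report {
    my ($status) = @_;
    return map { sprintf "thread %d: %s\n", $_, $status->{$_} }
      sort { $a <=> $b } keys %$status;
}

# Assumed usage, with a shared hash that was passed as the "status" field:
#   print status_report( $resolve->status );
```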

checkpoint
 checkpoint => 'checkpointing',          # name or code reference

The "checkpoint" field specifies the subroutine to be executed every time a checkpoint should be made by the monitoring routine (e.g. for saving or updating status). It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

It only makes sense to specify a checkpoint routine if there is also a monitoring routine specified. No checkpointing will occur by default if a monitoring routine is specified. The frequency of checkpointing can be specified with the "frequency" field.

The specified subroutine should not expect any parameters to be passed. Any values returned by the checkpointing routine, will be lost.
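
A minimal sketch of a "monitor" and "checkpoint" pair is shown below. It assumes that both routines run in the same thread, so that they can share the $seen counter; the routine names and the "progress.txt" file name are hypothetical.

```perl
use strict;
use warnings;

my $seen     = 0;                 # lines handled so far by the monitor routine
my $progress = 'progress.txt';    # hypothetical progress file

# "monitor": count each resolved line (a real routine would also store it)
sub count_line { $seen++ }

# "checkpoint": takes no parameters; records how far we have come
sub save_progress {
    open( my $fh, '>', $progress ) or die "$progress: $!";
    print $fh "$seen\n";
    close($fh) or die "$progress: $!";
}

# Assumed usage:
#   Thread::Pool::Resolve->new(
#    {
#     monitor    => \&count_line,
#     checkpoint => \&save_progress,
#     frequency  => 500,
#    }
#   );
```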

frequency
 frequency => 100,                             # default = 1000

The "frequency" field specifies the number of jobs that should have been monitored before the "checkpoint" routine is called. If a checkpoint routine is specified but no frequency field is specified, then a frequency of 1000 will be assumed.

This field has no meaning if no checkpoint routine is specified with the "checkpoint" field. The default frequency can be changed with the frequency method, which is inherited from Thread::Pool.

resolved
 resolved => \%resolved,

The "resolved" field specifies a reference to a shared hash that contains domain names keyed to IP numbers. An empty shared hash will be used if the "resolved" field is not specified.

The "resolved" hash contains the IP numbers as the keys and the domain names as the associated values. An IP number should still exist in the hash if the IP number could not be resolved: its value could be either the undefined value, the empty string or the IP number.

Use the resolved object method to obtain a reference to the resolved hash after the resolving process is completed (after a shutdown). You can then save the resolved hash (e.g. with the Storable module) so that you can later use these results for future resolving of other log files.
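
Given the three possible "not resolved" values described above, the IP numbers that failed to resolve can be listed with a helper like this one. The name "unresolved_ips" is made up for illustration.

```perl
use strict;
use warnings;

# Return the IP numbers in a "resolved" hash that have no usable domain name:
# the value is undefined, empty, or just the IP number itself.
sub unresolved_ips {
    my ($resolved) = @_;
    my @ips = grep {
        my $name = $resolved->{$_};
        !defined $name or $name eq '' or $name eq $_;
    } keys %$resolved;
    return sort @ips;
}

# Assumed usage after a shutdown:
#   print "$_\n" foreach unresolved_ips( $resolve->resolved );
```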

resolver
 resolver => \&resolver,                # name or code reference

The "resolver" field specifies the subroutine that should be called to resolve an IP number. A special internal resolver subroutine (based on calling gethostbyaddr) will be assumed if the "resolver" field is not specified.

The "resolver" subroutine should expect the IP number (as a string) as its input parameter. It is expected to return the domain name associated with the IP number, or the undefined value or empty string if resolving failed.

You can call the resolver method to obtain the code reference of the "resolver" subroutine actually used.
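
A custom resolver following this contract might look like the sketch below. It answers private (RFC 1918) addresses without asking DNS and falls back to gethostbyaddr for everything else. The name "my_resolver" and the 'private.invalid' label are made up for illustration.

```perl
use strict;
use warnings;
use Socket qw(inet_aton AF_INET);

# Hypothetical custom resolver: IP number (string) in, domain name out.
sub my_resolver {
    my ($ip) = @_;

    # Reject anything that is not four dot-separated numbers from 0 to 255
    my @octet = $ip =~ m/^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/
      or return undef;
    return undef if grep { $_ > 255 } @octet;

    # 10.0.0.0/8, 172.16.0.0/12 and 192.168.0.0/16 are not looked up in DNS
    return 'private.invalid'
      if $octet[0] == 10
      or ( $octet[0] == 172 and $octet[1] >= 16 and $octet[1] <= 31 )
      or ( $octet[0] == 192 and $octet[1] == 168 );

    # Everything else: plain reverse lookup, empty string on failure
    my $name = gethostbyaddr( inet_aton($ip), AF_INET );
    return defined $name ? $name : '';
}

# Assumed usage:
#   Thread::Pool::Resolve->new( { resolver => \&my_resolver } );
```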

autoshutdown
 autoshutdown => 0, # default: 1

The "autoshutdown" field specifies whether the shutdown method should be called when the object is destroyed. By default, this flag is set to 1, indicating that the shutdown method should be called when the object is being destroyed. Setting the flag to a false value will cause the shutdown method not to be called, which may lead to loss of data and to error messages if threads have not finished when the program exits.

The setting of the flag can be later changed by calling the inherited autoshutdown method.

workers
 workers => 25, # default: 10

The "workers" field specifies the number of worker threads that should be created when the pool is created. If no "workers" field is specified, then ten worker threads will be created. The inherited workers method can be used to change the number of worker threads later.

maxjobs
 maxjobs => 125, # default: 5 * workers

The "maxjobs" field specifies the maximum number of lines that can be waiting to be handled. If a new log line would exceed this amount, submission of log lines will be halted until the number of lines waiting to be handled has become at least as low as the amount specified with the "minjobs" field.

If the "maxjobs" field is not specified, an amount of 5 * the number of worker threads will be assumed. If you do not want to have any throttling, you can specify the value "undef" for the field. But beware! Without throttling, you may wind up using excessive amounts of memory, because all of the log lines are stored in memory until they are handled.

The inherited maxjobs method can be called to change the throttling settings during the lifetime of the object.

minjobs
 minjobs => 10, # default: maxjobs / 2

The "minjobs" field specifies the minimum number of log lines that can be waiting to be handled before submission is allowed again (throttling).

If throttling is active and the "minjobs" field is not specified, then half of the "maxjobs" value will be assumed.

The inherited minjobs method can be called to change the throttling settings during the lifetime of the object.
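
The interplay of "workers", "maxjobs" and "minjobs" can be illustrated with the following sketch. It only restates the documented defaults and the halt/resume rule in code; it is not how the module implements throttling, and both routine names are made up.

```perl
use strict;
use warnings;

# Documented defaults: workers = 10, maxjobs = 5 * workers,
# minjobs = maxjobs / 2. An explicit maxjobs => undef disables throttling.
sub throttle_defaults {
    my (%field) = @_;
    my $workers = defined $field{workers} ? $field{workers} : 10;
    my $maxjobs = exists $field{maxjobs} ? $field{maxjobs} : 5 * $workers;

    my $minjobs = $field{minjobs};
    $minjobs = $maxjobs / 2 if !defined $minjobs and defined $maxjobs;

    return ( $maxjobs, $minjobs );
}

# Is submission halted, given the previous state and the number of lines
# currently waiting to be handled?
sub halted {
    my ( $was_halted, $waiting, $maxjobs, $minjobs ) = @_;
    return 0 unless defined $maxjobs;    # no throttling at all

    # Once halted, stay halted until waiting has dropped to minjobs
    return $waiting > $minjobs ? 1 : 0 if $was_halted;

    # Otherwise halt as soon as one more line would exceed maxjobs
    return $waiting >= $maxjobs ? 1 : 0;
}
```

With the default of ten workers this gives a maximum of 50 and a minimum of 25: submission stops when 50 lines are waiting and resumes once no more than 25 are left.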

optimize

 Thread::Pool::Resolve->optimize( 'cpu' );

 $optimize = Thread::Pool::Resolve->optimize;

The "optimize" class method allows you to specify the default optimization type that will be used if no "optimize" field has been explicitly specified with a call to new. It returns the current default type of optimization.

Currently two types of optimization can be selected:

memory

Attempt to use as little memory as possible. Currently, this is achieved by starting a separate thread which hosts an unshared array. This uses the "Thread::Conveyor::Thread" sub-class.

cpu

Attempt to use as little CPU as possible. Currently, this is achieved by using a shared array (using the "Thread::Conveyor::Array" sub-class), encapsulated in a hash reference if throttling is activated (then also using the "Thread::Conveyor::Throttled" sub-class).

timeout

 Thread::Pool::Resolve->timeout( 120 ); # default: 60

 Thread::Pool::Resolve->timeout( 0 );   # de-activate timeout checks

 $timeout = Thread::Pool::Resolve->timeout;

The "timeout" class method returns the current timeout setting (in seconds) that will be used by the default resolver. It can also be used to change the timeout setting used by the default resolver. A value of 0 can be used to disable timeout checking.

The timeout feature makes use of alarm(), which may cause problems on some operating systems and/or in conjunction with sleep().
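
For readers unfamiliar with alarm()-based timeouts, the common Perl idiom is sketched below. This is a generic illustration and not necessarily how the default resolver is implemented; the name "with_timeout" is made up.

```perl
use strict;
use warnings;

# Run $code, giving up after $timeout seconds. Returns the result of $code,
# or undef on timeout. A $timeout of 0 disables the check.
sub with_timeout {
    my ( $timeout, $code ) = @_;
    return $code->() unless $timeout;

    my $result = eval {
        local $SIG{ALRM} = sub { die "timeout\n" };
        alarm $timeout;
        my $value = $code->();
        alarm 0;
        $value;
    };
    alarm 0;    # make sure no alarm is left pending

    # Pass on any error that is not our own timeout
    die $@ if !defined $result and $@ and $@ ne "timeout\n";
    return $result;
}

# Possible usage inside a custom resolver (my_lookup is hypothetical):
#   my $name = with_timeout( 60, sub { my_lookup($ip) } );
```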

OBJECT METHODS

These methods can be called on instantiated Thread::Pool::Resolve objects.

read

 $resolve->read;
 $resolve->read( 'file' );
 $resolve->read( 'file.gz',':gzip' );
 $resolve->read( $known );
 $resolve->read( $strange,'method' );

The "read" method specifies the source from which log lines will be read. The STDIN handle will be assumed if no parameters are specified.

The first input parameter can either be a filename or a reference (to an object). If it is a file name, it is the name of the log file that will be read to resolve the IP numbers. In that case, the second input parameter may be used to specify any PerlIO layers that should be applied to the reading process.

If the first input parameter is a reference (to an object), it is assumed to have a method for obtaining lines one-by-one. If the reference or the object is recognized, the appropriate method will be automatically selected. The name of the method to be used must be specified as the second input parameter if the object is not recognized.

Currently the following objects and reference types are recognized:

 objects             | references
 ================================
 IO::File            | GLOB
 IO::Handle          | SCALAR
 IO::Socket          |
 Thread::Conveyor    |
 Thread::Queue       |

Other object types and references may be added in the future.
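
An object that is not in this list needs a method that hands out one line per call. The class below is a hypothetical example of such an object. It assumes that returning the undefined value signals the end of the input, which this documentation does not state explicitly.

```perl
use strict;
use warnings;

# Hypothetical line source that read() would not recognize by itself
package My::LineSource;

sub new {
    my ( $class, @lines ) = @_;
    return bless { lines => [@lines] }, $class;
}

# Hand out one line per call, undef when there is nothing left
sub next_line {
    my ($self) = @_;
    return shift @{ $self->{lines} };
}

package main;

my $source = My::LineSource->new( "1.2.3.4 first\n", "5.6.7.8 second\n" );

# Assumed usage:
#   $resolve->read( $source, 'next_line' );
```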

The "read" method returns the object itself, which may be handy for one-liners.

line

 $resolve->line( "1.2.3.4 accessed this" );

The "line" method allows you to submit a single line for resolving. It is intended to be used in real-time logging situations, specifically from multiple threads. Use the lines method to submit more than one log line at a time. Use the read method to submit all lines from a file or an object.

lines

 $resolve->lines( @logline );

The "lines" method allows you to submit more than one line at a time for resolving. Use the line method if you only want to submit a single line for resolving. Use the read method to submit all lines from a file or an object.

The "lines" method returns the object itself, which may be handy for one-liners.

resolved

 $resolved = $resolve->resolved;

The "resolved" method returns a reference to the shared hash that contains the IP number to domain name translations. It either is the same as what was specified with the new method and the "resolved" field, or it is a reference to a newly created shared hash. It can be used to provide persistence to the resolved hash, e.g. with the Storable module. Later incarnations can then specify the "resolved" field to continue resolving using the IP number to domain name translation information from previous sessions.

status

 $status = $resolve->status;

The "status" method returns a reference to the shared hash that contains the status information. It is the same as what was specified with the new method and the "status" field. It can be used to access the status information mechanism in a custom monitoring routine.

INSIDE JOB METHODS

These methods can be called inside the "open", "close", "pre", "post" and "monitor" routines.

resolved

 $resolved = Thread::Pool::Resolve->resolved;

The "resolved" method returns a reference to the shared hash that contains the IP number to domain name translations. It either is the same as what was specified with the new method and the "resolved" field, or it is a reference to a newly created shared hash.

resolver

 $resolver = Thread::Pool::Resolve->resolver;

The "resolver" method returns a code reference to the routine that is performing the actual resolving of IP numbers to domain names. It either is the same as what was specified with the new method and the "resolver" field, or it is a reference to the default resolver routine provided by this module itself.

status

 $status = Thread::Pool::Resolve->status;

The "status" method returns a reference to the hash that is used to keep status. It is the same as what was specified with the new method and the "status" field.

INHERITED METHODS

The following methods (in alphabetical order) are inherited from Thread::Pool. Please check the documentation there for more information about these methods.

 add            add one or more worker threads
 autoshutdown   set behaviour when object is destroyed
 frequency      (set default) checkpointing frequency
 maxjobs        set maximum number of lines waiting to be handled
 minjobs        set minimum number of lines waiting to be handled
 remove         remove one or more worker threads
 shutdown       shutdown the resolving process
 todo           number of IP numbers left to resolve (approximate)
 workers        set number of worker threads

DEBUG METHODS

The following methods are for debugging purposes only.

rand_domain

 $domain = Thread::Pool::Resolve->rand_domain;

The "rand_domain" class method returns a random domain name. It is a name that roughly conforms to what is considered to be a valid domain name. It can be used in a resolver routine if you want to quickly test your log resolving application.

rand_ip

 $ip = Thread::Pool::Resolve->rand_ip;

The "rand_ip" class method returns a random IP number. It is an IP number that roughly conforms to what is considered to be a valid IP number (basically a sequence of 4 numbers between 0 and 255, concatenated with periods). It can be used to create test log files (as is done in the test-suite of this module).
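
A test log can be generated along the following lines. To keep the sketch independent of the module, "fake_ip" is a made-up stand-in that follows the description above (four numbers between 0 and 255, joined with periods); with the module loaded you would call Thread::Pool::Resolve->rand_ip instead. The log line layout is arbitrary.

```perl
use strict;
use warnings;

# Stand-in for rand_ip: four numbers between 0 and 255, joined with periods
sub fake_ip {
    return join '.', map { int( rand(256) ) } 1 .. 4;
}

# Build $count log lines that each start with a random IP number
sub fake_log {
    my ($count) = @_;
    return
      map { fake_ip() . qq( - - "GET /page$_ HTTP/1.0" 200\n) } 1 .. $count;
}

# Possible usage:
#   print fake_log(1000);
```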

REQUIRED MODULES

 load (any)
 Thread::Pool (0.29)

BUGS

Release 5.8.0 of Perl has a bug related to the usage of signals with threads. Basically, signals within threads are only handled correctly if the main thread has "activated" the signal handler before the thread that is to receive a signal, is started.

Since the timeout function of the default resolver uses the alarm() function (which uses signals) to time out any DNS request that just takes too long, the ALRM signal must be assigned before any threads that use signals, are started. The new class method tries to be smart about this and attempts to fix this. However, it cannot fix this if it's not called from the main thread. So, in that case, it will die with the message:

 Cannot reliably use signals in threads

And when it is able to fix this, it will output a warning:

 Priming $SIG{ALRM} to get signals to work inside threads

If you are calling method new from a thread other than the main thread, or you do not want to be alarmed (pun intended) by the warning, then you must assign the ALRM signal in your main thread yourself. This can be as simple as:

 $SIG{ALRM} = sub {}; # workaround for 5.8.0 thread signal breakage

It can be placed anywhere in your main program, but before you call method new.

EXAMPLES

There are four examples right now. Of course, you can also check out the test-suite for more examples of the usage of Thread::Pool::Resolve.

simple log resolving filter

Creating a log resolving filter is really simple:

 # Make sure we have the right modules loaded
 # Start resolving from STDIN and write to STDOUT

 use Thread::Pool::Resolve;
 Thread::Pool::Resolve->new->read;

That's all there is to it. By default, resolving writes the result to STDOUT. And the "read" method reads from STDIN by default.

simple log resolving program

The following version allows the input file to be specified as the first parameter to the script, and the output file as the second parameter to the script. The first input parameter defaults to STDIN, the second to STDOUT.

 # Make sure we have the right modules loaded
 # Start resolving from given input file and write to given output file

 use Thread::Pool::Resolve;
 Thread::Pool::Resolve->new( {},$ARGV[1] )->read( $ARGV[0] );

Again, pretty simple, eh? Note the empty hash reference that needs to be specified now in the call to new.

resolve zipped log files

 # Make sure we have PerlIO::gzip (available from CPAN)
 # Make sure we can resolve

 use PerlIO::gzip;
 use Thread::Pool::Resolve;

 # Create a resolving object, writing gzipped to 'resolved.gz'
 # Read the unresolved file, gunzip on the fly and wait until all done

 my $resolve = Thread::Pool::Resolve->new( {},'resolved.gz',':gzip' );
 $resolve->read( 'unresolved.gz',':gzip' )->shutdown;

use existing IP number resolving information

 # Make sure we can resolve
 # Make sure we can have a persistent hash

 use Thread::Pool::Resolve;
 use Storable qw(store retrieve);

 # Create a resolver that starts with the information of the database

 my $resolve = Thread::Pool::Resolve->new(
  {
   resolved => retrieve( 'database.storable' )
  },
  'resolved'
 );

 # Read the log file from STDIN and wait until all done
 # Store the resulting database for later usage

 $resolve->read->shutdown;
 store( $resolve->resolved,'database.storable' );
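
The example above dies in retrieve() if 'database.storable' does not exist yet, e.g. on the very first run. A small helper, sketched here under the made-up name "load_resolved", could fall back to an empty hash in that case:

```perl
use strict;
use warnings;
use Storable qw(store retrieve);

# Return the stored IP number => domain name hash, or an empty hash when
# the database file does not exist yet.
sub load_resolved {
    my ($file) = @_;
    return -e $file ? retrieve($file) : {};
}

# Assumed usage:
#   my $resolve = Thread::Pool::Resolve->new(
#    { resolved => load_resolved('database.storable') },
#    'resolved'
#   );
```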

AUTHOR

Elizabeth Mattijsen, <liz@dijkmat.nl>.

Please report bugs to <perlbugs@dijkmat.nl>.

COPYRIGHT

Copyright (c) 2002-2003 Elizabeth Mattijsen <liz@dijkmat.nl>. All rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.

SEE ALSO

Thread::Pool.