NAME

App::Basis::Queue

SYNOPSIS

    use DBI ;
    use App::Basis::Queue;

    my $dsn = "dbi:SQLite:/location/of/sqlite_db.sqlite3" ;
    my $dbh = DBI->connect( $dsn, "", "",
        { RaiseError => 1, PrintError => 0, } )
        or die "Could not connect to DB $dsn" ;

    my $queue = App::Basis::Queue->new( dbh => $dbh) ;

    # save some application audit data for later processing
    $queue->add(
        queue => '/invoice/pay',
        data => {
            ip => '12.12.12.12',
            session_id => 12324324345,
            client_id => 248296432984,
            amount => 250.45,
            reply => '/payments/made'
            },
    ) ;

    # in another process, we want to process that data

    use DBI ;
    use App::Basis::Queue;

    # for the example this will be paying an invoice
    sub processing_callback {
        my ( $queue, $qname, $record ) = @_;

        # call the payment system
        # pay_money( $record->{client_id}, $record->{amount}) ;

        # chatter back that the payment has been made, assume it worked
        $queue->publish( queue => $record->{reply},
            data => {
                client_id => $record->{client_id},
                success => 1,
            }
        ) ;

        # a non-zero return marks the record as processed
        return 1 ;
    }


    my $dsn = "dbi:SQLite:/location/of/sqlite_db.sqlite3" ;
    my $dbh = DBI->connect( $dsn, "", "",
        { RaiseError => 1, PrintError => 0, } )
        or die "Could not connect to DB $dsn" ;
    my $queue = App::Basis::Queue->new( dbh => $dbh) ;
    $queue->process(
         queue => '/invoice/pay',
         count => 10,
         callback => \&processing_callback
    ) ;

    # for pubsub we do

    use DBI ;
    use App::Basis::Queue;

    my $dsn = "dbi:SQLite:/location/of/sqlite_db.sqlite3" ;
    my $dbh = DBI->connect( $dsn, "", "",
        { RaiseError => 1, PrintError => 0, } )
        or die "Could not connect to DB $dsn" ;
    my $queue = App::Basis::Queue->new( dbh => $dbh) ;
    # for a system that wants to know when servers have started
    $queue->publish( queue => '/chat/helo', data => { host => 'abc', msg => 'helo world' } ) ;

    # in another process

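    use DBI ;
    use App::Basis::Queue;

    my $dsn = "dbi:SQLite:/location/of/sqlite_db.sqlite3" ;
    my $dbh = DBI->connect( $dsn, "", "",
        { RaiseError => 1, PrintError => 0, } )
        or die "Could not connect to DB $dsn" ;
    my $queue = App::Basis::Queue->new( dbh => $dbh) ;

    # be told about servers as they start; the handler gets the queue object,
    # the subscription that matched, the actual queue name and the data
    $queue->subscribe(
        queue => '/chat/*',
        callback => sub {
            my ( $q, $qmatch, $qname, $data ) = @_ ;
            print "$data->{host}: $data->{msg}\n" ;
        }
    ) ;
    $queue->listen() ;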

DESCRIPTION

Why have another queuing system? I wanted a queuing system that did not require me to install and maintain another server (e.g. RabbitMQ), something that could run against existing DBs (e.g. PostgreSQL). PGQ was an option, but it throws away queued items if there is no listener, which made it useless for me. Some of the Job/Worker systems required you to create classes and plugins to process the queue. Queue::DBI almost made the grade but only has one queue. Minion might have done what was needed, but I did not find it in time.

I needed multiple queues plus a new requirement: queue wildcards!

So I created this simple/basic system. You need to expire items, clean the queue and do things like that by hand; there is no automation. You process items in the queue in chunks, not via a nice iterator.

There is no queue polling per se: you process the queue and try again when all items are done. There can only be one consumer of a record, which is a good thing. If you cannot process an item, it can be marked as failed, to be handled by a cleanup function that you will need to create.

NOTES

I would use msgpack instead of JSON to store the data, but processing BLOBS in PostgreSQL is tricky.

To make the various inserts and queries faster, I cache the prepared statement handles against a key and the fields that are being inserted; this speeds up the inserts roughly 3x.
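The caching idea in the note above can be sketched roughly as follows. This is not the module's own code: cached_sth, the table name 'queue' and the generated SQL are assumptions made for illustration, and a stand-in coderef replaces $dbh->prepare so that the sketch runs without a database.

```perl
use strict;
use warnings;

# Hypothetical helper: return a cached statement handle, preparing it only
# the first time a given key + field list is seen.
# $prepare is any coderef that builds a handle, for example
#   sub { $dbh->prepare( $_[0] ) }
sub cached_sth {
    my ( $cache, $key, $fields, $prepare ) = @_;

    # the cache id combines the caller's key with the sorted field names
    my $id = join( '|', $key, sort @$fields );
    if ( !exists $cache->{$id} ) {
        my $cols  = join( ', ', sort @$fields );
        my $marks = join( ', ', ('?') x @$fields );
        $cache->{$id} = $prepare->("INSERT INTO $key ($cols) VALUES ($marks)");
    }
    return $cache->{$id};
}

# stand-in for $dbh->prepare, counts how often a statement is prepared
my $prepared     = 0;
my $fake_prepare = sub { $prepared++; return { sql => $_[0] } };

my %cache;
cached_sth( \%cache, 'queue', [qw(queue_name data)], $fake_prepare ) for 1 .. 3;
print "prepared $prepared statement(s) for 3 inserts\n";
```

Sorting the field names means the same set of fields always maps to the same cached handle, whatever order the caller supplies them in.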

AUTHOR

kmulholland, moodfarm@cpan.org

VERSIONS

v0.1 2013-08-02, initial work

TODO

Currently the processing functions only process the earliest MAX_PROCESS_ITEMS, but by making use of the counter in the info table we could process the entire table, or at least a much bigger number, and do it in chunks of MAX_PROCESS_ITEMS.

Processing could be by date.

Add a method to move processed items to queue_name/processed and failures to queue_name/failures, or add them to these queues when marking them as processed or failed. This will need a number of other methods to be updated, but keeps fewer items in the unprocessed queue.

SEE ALSO

Queue::DBI, AnyMQ::Queue, Minion

API

new

Create a new instance of a queue

    prefix           - set a prefix name of the tables, allows you to have dev/test/live versions in the same database
    debug            - set basic STDERR debugging on or off
    skip_table_check - don't check to see if the tables need creating

    my $queue = App::Basis::Queue->new( dbh => $dbh ) ;

add

Add task data into a named queue. This creates a 'task' that needs to be processed.

    my $queue = App::Basis::Queue->new( dbh => $dbh) ;

    # save some application audit data
    $queue->add(
        queue => 'app_start',
        data => {
            ip => '12.12.12.12', session_id => 12324324345, client_id => 248296432984,
            appid => 2, app_name => 'twitter'
        },
    ) ;

* This does not handle wildcard queues *

queue

name of the queue

data

data to store against the queue, can be a scalar, hashref or arrayref

process

Process up to 100 tasks from the named queue(s).

A reference to the queue object is passed to the callback along with the name of the queue and the record that is to be processed.

If the callback returns a non-zero value then the record will be marked as processed. If the callback returns a zero value, then the processing is assumed to have failed and the failure count will be incremented by 1. If the failure count matches our maximum allowed limit then the item will not be available for any further processing.

    sub processing_callback {
        my ( $queue, $qname, $record ) = @_;

        return 1;
    }

    $queue->process(
        queue => 'queue_name',
        count => 5,
        callback => \&processing_callback
    ) ;
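The return-value rule for callbacks can be illustrated with a small in-memory simulation. This is only a sketch of the rule, not how the module stores or updates records: run_callback and the record fields are made up for the example, and the limit of 5 is an assumption taken from the purge_tasks description (process_failure >= 5).

```perl
use strict;
use warnings;

# Assumed limit, borrowed from the purge_tasks text (process_failure >= 5)
my $MAX_FAILURES = 5;

# Hypothetical helper: apply the callback to each available record and
# update its state according to the callback's return value.
sub run_callback {
    my ( $records, $callback ) = @_;
    for my $rec (@$records) {
        next if $rec->{processed};
        next if $rec->{process_failure} >= $MAX_FAILURES;    # no longer available
        if ( $callback->($rec) ) {
            $rec->{processed} = 1;        # non-zero return: mark as processed
        }
        else {
            $rec->{process_failure}++;    # zero return: count a failure
        }
    }
    return;
}

my @records = (
    { id => 1, amount => 10, processed => 0, process_failure => 0 },
    { id => 2, amount => -1, processed => 0, process_failure => 4 },
);

# this callback succeeds only for positive amounts
run_callback( \@records, sub { $_[0]{amount} > 0 ? 1 : 0 } );

for my $rec (@records) {
    printf "record %d: processed=%d failures=%d\n",
        $rec->{id}, $rec->{processed}, $rec->{process_failure};
}
```

Running the same call again would skip both records: the first is already processed and the second has reached the assumed failure limit.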

The queue name can contain wildcards, and all matching queues will be scanned.

    # add things to different queues, but with a common root
    $queue->add( queue => '/celestial/stars', data => { list => [ "sun", "alpha centauri"]}) ;
    $queue->add( queue => '/celestial/planets', data => { list => [ "earth", "pluto", "mars"]}) ;

    # process all the 'celestial' bodies queues
    $queue->process( queue => '/celestial/*', count => 5, callback => \&processing_callback) ;
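How a wildcard queue name might select queues can be sketched in plain Perl. The matching rule used here ('*' matches any run of characters, everything else is literal) is an assumption for illustration, and queue_matches is a hypothetical helper, not part of the module.

```perl
use strict;
use warnings;

# Assumption: '*' matches any run of characters; everything else is literal.
sub queue_matches {
    my ( $pattern, $qname ) = @_;

    # escape the literal parts and join them with '.*' where a '*' stood
    my $re = join( '.*', map { quotemeta($_) } split( /\*/, $pattern, -1 ) );
    return $qname =~ /\A$re\z/ ? 1 : 0;
}

my @queues  = ( '/celestial/stars', '/celestial/planets', '/invoice/pay' );
my @matched = grep { queue_matches( '/celestial/*', $_ ) } @queues;
print "$_\n" for @matched;
```

A name without a wildcard only matches itself, so the same check covers both plain and wildcard queue names.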

process_failures

Process up to 100 failed tasks from the queue. A reference to the queue object is passed to the callback along with the name of the queue and the record that is to be processed. As these are failures, we are not interested in the return value of the callback function.

    sub processing_failure_callback {
        my ( $queue, $qname, $record ) = @_;

        # items before 2013 were completely wrong so we can delete
        # (date strings need the string comparison operator lt)
        if( $record->{added} lt '2013-01-01') {
            $queue->delete_record( $record) ;
        } else {
            # failures in 2013 were down to a bad processing function
            $queue->reset_record( $record) ;
        }
    }

    $queue->process_failures(
        queue => 'queue_name',
        count => 5,
        callback => \&processing_failure_callback
    ) ;

    # again we can use wildcards here for queue names

    # add things to different queues, but with a common root
    $queue->add( queue => '/celestial/stars', data => { list => [ "sun", "alpha centauri"]}) ;
    $queue->add( queue => '/celestial/planets', data => { list => [ "moon", "pluto", "mars"]}) ;
    # process, obviously 'moon' will fail our planet processing
    $queue->process(
        queue => '/celestial/*',
        count => 5,
        callback => \&processing_callback
    ) ;

    # process all the 'celestial' bodies queues for failures - probably will just have the moon in it
    $queue->process_failures(
        queue => '/celestial/*',
        count => 5,
        callback => \&processing_failure_callback
    ) ;

queue_size

get the count of unprocessed TASK items in the queue

    my $count = $queue->queue_size( queue => 'queue_name') ;
    say "there are $count unprocessed items in the queue" ;

    # queue size can manage wildcards
    $queue->queue_size( queue => '/celestial/*') ;

list_queues

obtains a list of all the queues used by this database

    my $qlist = $queue->list_queues() ;
    foreach my $q (@$qlist) {
        say $q ;
    }

stats

obtains stats about the task data in the queue, this may be time/processor intensive so use with care!

provides counts of unprocessed, processed, failures, max process_failure, avg process_failure, earliest_added, latest_added, min_data_size, max_data_size, avg_data_size, total_records, avg_elapsed, max_elapsed, min_elapsed

    my $stats = $queue->stats( queue => 'queue_name') ;
    say "processed $stats->{processed}, failures $stats->{failures}, unprocessed $stats->{unprocessed}" ;

    # for all matching wildcard queues
    my $all_stats = $queue->stats( queue => '/celestial/*') ;

delete_record

Delete a single task record from the queue. Requires a data record, which contains the information we will use to identify the record.

may be used in processing callback functions

    sub processing_callback {
        my ( $queue, $qname, $record ) = @_;

        # let's remove records before 2013
        if( $record->{added} lt '2013-01-01') {
            $queue->delete_record( $record) ;
        }
        return 1 ;
    }

* This does not handle wildcard queues *

reset_record

Clear the failure flag from a failed task record. Requires a data record, which contains the information we will use to identify the record.

may be used in processing callback functions

    sub processing_callback {
        my ( $queue, $qname, $record ) = @_;

        # allow partially failed (and failed) records to be processed
        if( $record->{process_failure}) {
            $queue->reset_record( $record) ;
        }
        return 1 ;
    }

* This does not handle wildcard queues *

publish

Publish some chatter data into a named queue.

arguments

    queue   - the name of the queue to publish a chatter to
    data    - hashref of data to be stored

optional arguments

    persist - 0|1 flag that this message is to be the most recent persistent one

    my $queue = App::Basis::Queue->new( dbh => $dbh) ;

    # keep track of a bit of info
    $queue->publish( queue => 'app_log',
        data => {
            ip => '12.12.12.12', session_id => 12324324345, client_id => 248296432984,
            appid => 2, app_name => 'twitter'
        }
    ) ;

* This does not handle wildcard queues *

subscribe

Subscribe to a named queue with a callback.

arguments

    queue    - the name of the queue to listen to, wildcards allowed
    callback - function to handle any matched events

optional arguments

    after   - unix time after which to listen for events, defaults to now,
            if set will skip persistent item checks
    persist - include the most recent persistent item, if using a wild card, this
            will match all the queues and could find multiple persistent items

    my $queue = App::Basis::Queue->new( dbh => $dbh) ;

    # keep track of a bit of info
    $queue->subscribe( queue => 'app_logs/*', callback => \&handler) ;
    $queue->listen() ;

listen

Listen to all subscribed channels. Loops forever unless told to stop. If there are any persistent messages, these will be passed to the callbacks first.

optional arguments

    events - minimum number of events to listen for, stop after this many,
            may stop after more - this is across ALL the subscriptions
    datetime - unix epoch time when to stop listening, ie based on time()

returns number of chatter events actually passed to ALL the handlers

    my $queue = App::Basis::Queue->new( dbh => $dbh) ;
    $queue->subscribe( queue => '/logs/*', callback => \&handler) ;
    $queue->listen() ;    # listening forever
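The two optional stop conditions can be pictured with a small loop. This is a sketch under assumptions, not the module's implementation: listen_loop is a hypothetical helper, and $poll stands in for one pass over all subscriptions, returning the events it found as an array reference.

```perl
use strict;
use warnings;

# $poll is a hypothetical coderef returning the events found by one pass
# over all subscriptions (an array reference, possibly empty).
sub listen_loop {
    my ( $poll, %args ) = @_;
    my $handled = 0;
    while (1) {
        my $events = $poll->();
        $handled += scalar @$events;

        # a pass may deliver several events, so we can overshoot 'events'
        last if defined $args{events}   && $handled >= $args{events};
        last if defined $args{datetime} && time() >= $args{datetime};
    }
    return $handled;
}

# every pass finds two events; ask for at least three, or stop after a minute
my $handled = listen_loop(
    sub { [ 'a', 'b' ] },
    events   => 3,
    datetime => time() + 60,
);
print "handled $handled events\n";
```

Because a whole pass is counted before the limits are checked, the loop here stops after four events rather than three, which is the "may stop after more" behaviour described for the events argument.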

unsubscribe

Unsubscribe from a named queue.

    use feature qw( say state ) ;
    use Data::Dumper ;

    sub handler {
        state $counter = 0 ;
        my $q = shift ;             # we get the queue object
        # the queue trigger that matched, the actual queue name and the data
        my ($qmatch, $queue, $data) = @_ ;

        # we are only interested in 10 messages
        if( ++$counter > 10) {
            $q->unsubscribe( queue => $queue) ;
        } else {
            say Dumper( $data) ;
        }
    }

    my $queue = App::Basis::Queue->new( dbh => $dbh) ;
    $queue->subscribe( queue => '/logs/*', callback => \&handler) ;
    $queue->listen() ;

purge_tasks

purge will remove all processed task items and failures (process_failure >= 5). These are completely removed from the database

    my $before = $queue->stats( queue => 'queue_name', before => '2015-11-24') ;
    $queue->purge_tasks( queue => 'queue_name') ;
    my $after = $queue->stats( queue  => 'queue_name') ;

    say "removed " .( $before->{total_records} - $after->{total_records}) ;


The before argument is optional and will default to 'now'.
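The purge rule can be pictured as a filter over records. This sketch assumes that before is compared against each record's added date as an ISO date string; purge_sketch and the record layout are hypothetical, and the real work happens in the database.

```perl
use strict;
use warnings;

# Hypothetical helper: return only the records a purge would leave behind.
# A record is purgeable when it is processed or has failed 5 or more times,
# and (assumption) it was added before the cut-off date.
sub purge_sketch {
    my ( $records, $before ) = @_;
    return [
        grep {
            my $purgeable = $_->{processed} || $_->{process_failure} >= 5;
            !( $purgeable && $_->{added} lt $before )
        } @$records
    ];
}

my @records = (
    { id => 1, processed => 1, process_failure => 0, added => '2015-01-01' },
    { id => 2, processed => 0, process_failure => 5, added => '2015-06-01' },
    { id => 3, processed => 0, process_failure => 1, added => '2015-02-01' },
    { id => 4, processed => 1, process_failure => 0, added => '2015-12-01' },
);

my $left = purge_sketch( \@records, '2015-11-24' );
print "kept record $_->{id}\n" for @$left;
```

Record 3 survives because it is still unprocessed with few failures, and record 4 survives because it was added after the cut-off.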

purge_chatter

purge will remove all chatter messages. These are completely removed from the database

    my $del = $queue->purge_chatter( queue => 'queue_name', before => '2015-11-24') ;

    say "removed $del messages" ;

The before argument is optional and will default to 'now'.

remove_queue

remove a queue and all of its records (task and chatter)

    $queue->remove_queue( queue => 'queue_name') ;
    my $after = $queue->list_queues() ;
    # convert list into a hash for easier checking
    my %remaining = map { $_ => 1} @$after ;
    say "queue removed" if( !$remaining{queue_name}) ;

* This does not handle wildcard queues *

reset_failures

clear any process_failure values from all unprocessed task items

    my $before = $queue->stats( queue => 'queue_name') ;
    $queue->reset_failures( queue => 'queue_name') ;
    my $after = $queue->stats( queue => 'queue_name') ;

    say "reset " .( $after->{unprocessed} - $before->{unprocessed}) ;

remove_failures

permanently delete task failures from the database

    $queue->remove_failures( queue => 'queue_name') ;
    my $stats = $queue->stats( queue => 'queue_name') ;
    say "failures left " .( $stats->{failures}) ;

remove_tables

If you never need to use the database again, it can be completely removed

    $queue->remove_tables() ;