#!/usr/bin/env perl
# PODNAME: abq 'app basis queue' script to add things to queues
# ABSTRACT: add things to queues, using App::Basis::Queue

=head1 SYNOPSIS

add a message to queue fred, will be a simple queue, --type=simple is default

    > abq -q fred "message to tweet"

    # add a message to the work task queue
    > abq -q work --type=task "process /some/file/path"

    # add to a pubsub queue
    > abq --queue=notes --type=pubsub "started a program"

    # listen to all pubsub queue messages forever, messages are written to stdout
    > abq --queue=notes --type=pubsub --listen

    # take an item from a simple queue
    > abq -q fred --pop

    # process an item in a task queue, exit status will determine if it is processed
    # the queue message is passed to the exec command in quotes
    # obviously there are security concerns around doing this, clean your inputs!
    > abq -q work --type=task --exec "/command/to/run"

    # peek at work items in a task queue, --type=task is default for a peek
    > abq --peek --count=10 -q work

to get full help use

    > abq --help

=head1 DESCRIPTION

Add an item to a queue or process an item from a queue

config file is in ~/.abq

    abq:
      queue:
        dsn: dbi:SQLite:/tmp/abq.sqlite
        user:
        password:

The queue entry holds information about the queue database that you want to
connect to, this is obviously a perl DBI style connection

=cut

#
# (c) Kevin Mulholland, moodfarm@cpan.org
# this code is released under the Perl Artistic License

# -----------------------------------------------------------------------------

use 5.10.0 ;
use strict ;
use warnings ;
use POSIX qw(strftime) ;
use App::Basis ;
use App::Basis::Config ;
use DBI ;
use App::Basis::Queue ;
use Date::Manip::Date ;
use feature 'say' ;
use Lingua::EN::Inflexion ;

# what about Data::Dumper::GUI or YAML::Tiny::Color
use Data::Printer ;

# numeric --activates values up to this many seconds are treated as an offset
# from now rather than as an absolute epoch
use constant FIVE_DAYS    => 5 * 24 * 3600 ;
# number of items shown by --peek when --count is not given
use constant PEEK_DEFAULT => 10 ;
# number of messages handed to --exec when --count is not given
use constant EXEC_DEFAULT => 1 ;

# -----------------------------------------------------------------------------

use constant QUEUE_CONFIG => "$ENV{HOME}/.abq" ;

my @queue_types = qw(task simple pubsub) ;

# -----------------------------------------------------------------------------
# lets do the testing stuff with private variables
{
    my $testing = 0 ;

    # switch test mode on, any arguments are ignored
    sub set_testing
    {
        $testing = 1 ;
    }

    # true once set_testing has been called
    sub is_testing
    {
        $testing ;
    }
}

# -----------------------------------------------------------------------------

my $program = get_program() ;

# URL parsing from https://metacpan.org/pod/URI
# single quoted so that the \? escapes reach the regex engine intact, in a
# double quoted string they collapse to a bare ? and the pattern is invalid
my $valid_url
    = '(?:([^:/?#]+):)?(?://([^/?#]*))?([^?#]*)(?:\?([^#]*))?(?:#(.*))?' ;

# -----------------------------------------------------------------------------
# always create the datetime strings the same way
# takes an epoch value, a false or missing value means now
# returns a string like "2015-01-31 12:34:56 UTC"

sub std_datetime
{
    my ($secs) = @_ ;
    $secs ||= time() ;
    return strftime( "%Y-%m-%d %H:%M:%S UTC", gmtime($secs) ) ;
}

# -----------------------------------------------------------------------------
# convert something like a datetime string or an epoch value into a standardised
# datetime string and epoch value
# in list context returns ( datetime string, epoch ), in scalar context just
# the datetime string, a false input gives undef values back
# dies with "Invalid date, could not parse" if the string cannot be understood

sub parse_datetime
{
    my ($datetime) = @_ ;
    state $date = Date::Manip::Date->new() ;
    my @ret ;

    if ( !$datetime ) {
        return wantarray ? ( undef, undef ) : undef ;
    }
    elsif ( $datetime =~ /^\d+$/ ) {
        # assume anything less than five days is a time into the future
        $datetime += time() if ( $datetime <= FIVE_DAYS ) ;
        @ret = ( std_datetime($datetime), $datetime ) ;
    }
    else {
        # so parse will parse in locale time not as UTC
        $date->parse($datetime) ;
        {
            # if we get a warning about converting the date to a day, there
            # must be a problem with parsing the input date string
            local $SIG{__WARN__} = sub {
                die "Invalid date, could not parse" ;
            } ;
            # the value is not needed, the call is only made to provoke the warning
            my $day = $date->printf("%a") ;
        }

        my $d2 = $date->printf("%O %Z") ;
        # reparse the date to get it into UTC, best way I could think of :(
        $date->parse($d2) ;

        # secs_since_1970_GMT is epoch
        my $epoch = $date->secs_since_1970_GMT() ;
        @ret = ( std_datetime($epoch), $epoch ) ;
    }

    return wantarray ? @ret : $ret[0] ;
}

# -----------------------------------------------------------------------------
# build suitable config
# currently this only writes the passed config object back out via store()

sub create_default_config
{
    my ($cfg) = @_ ;
    $cfg->store() ;
}

# -----------------------------------------------------------------------------
# connect to the queue DB
# takes the DBI dsn, user and password plus the name of the default queue
# returns an App::Basis::Queue object, DBI errors are fatal (RaiseError)

sub connect_queue
{
    my ( $dsn, $user, $passwd, $qname ) = @_ ;
    my $dbh
        = DBI->connect( $dsn, $user, $passwd,
        { RaiseError => 1, PrintError => 0, AutoCommit => 1 } )
        or die "Could not connect to DB $dsn" ;

    if ( $dsn =~ /SQLite/i ) {
        $dbh->do("PRAGMA journal_mode = WAL") ;
        $dbh->do("PRAGMA synchronous = NORMAL") ;
    }

    my $queue = App::Basis::Queue->new(
        dbh           => $dbh,
        default_queue => $qname,
        debug         => 0,
    ) ;
    return $queue ;
}

# -----------------------------------------------------------------------------
# main

my $action ;

my %opt = init_app(
    help_text => "Simple script to queue messages for later action "
        . "use perldoc $program to get the setup for the ~/.$program config file",
    help_cmdline => "message to send",
    options      => {
        'verbose|v' => 'Output useful information',
        'queue|q=s' => { desc => 'queue to add things to', required => 1 },
        'size|s' => 'Disply the number of unprocessed items in a task queue',
        'peek|p' =>
            'Display the next few items in a task queue, use count to limit, default '
            . PEEK_DEFAULT,
        'type|t=s' => {
            desc => 'Type of the queue, one of ' . join( ", ", @queue_types ),
            # NOTE(review): the SYNOPSIS says simple is the default, confirm
            # which of the two is intended
            default  => 'task',
            required => 1,
            # exact match only, a regex match would accept things like 's'
            # or '.' and would treat user input as a pattern
            validate => sub {
                my $t = shift // '' ;
                grep { $_ eq $t } @queue_types ;
            },
        },
        'listen|l' =>
            'Listen for pubsub messages on the queue, use count to limit, default no limit',
        'exec|e=s' => {
            desc =>
                "command to run with the message, use count to limit, default "
                . EXEC_DEFAULT,
        },
        'activates|a=s' => {
            desc =>
                'Parsable UTC datetime after which the message should be valid',
        },
        'count|c=i' => 'Number of messages to read',
    }
) ;

# anything left on the command line is the message to queue
my $msg = join( ' ', @ARGV ) ;

if ( $opt{test} ) {
    set_verbose(1) ;
    set_testing(1) ;
}

# lets have the config named after this program
my $cfg = App::Basis::Config->new(
    filename     => "$ENV{HOME}/.$program",
    die_on_error => 1
) ;

create_default_config($cfg) ;

my ( $activates, $epoch ) = parse_datetime( $opt{activates} ) ;

# the queue name comes from the command line, without this assignment the
# sanity check below could never pass
my $queue = $opt{queue} ;

# NOTE(review): nothing visible here reads $settings, it looks like a left
# over from an earlier script, kept in case later code depends on it
my $settings ;
$settings->{file} ||= get_program() . "_$queue.tweets" ;

# update the config if it needs it
$cfg->store() ;

my $q = $cfg->get("/abq/queue") ;

# we need a queue name and a queue entry with a dsn before we can connect
msg_exit( "Could not find valid config in $ENV{HOME}/.$program", 2 )
    if ( !$queue || ref($q) ne 'HASH' || !$q->{dsn} ) ;

$q->{prefix} ||= "/" ;

my $theq = connect_queue( $q->{dsn}, $q->{user}, $q->{password},
    $q->{prefix} . $queue ) ;

if ( !$theq ) {
    msg_exit( "Could not connect to queue $q->{dsn}", 2 ) ;
}

# get the things out of the way that are information only
# if asking for size or peeking, then there is no message adding or sending
if ( $opt{size} || $opt{peek} ) {
    my $s = $theq->queue_size() ;
    if ( $opt{size} ) {
        say inflect "<#n:$s> <N:items> <V:were> found in the queue" ;
    }
    else {
        if ($s) {
            # --peek is only a flag, the number of items comes from --count
            my $limit = $opt{count} || PEEK_DEFAULT ;
            my $count = 1 ;
            say "-" x 80 ;
            foreach my $tweet ( $theq->peek( count => $limit ) ) {
                say $count++ . ":\n$tweet->{data}->{tweet}" ;
                say "-" x 80 ;
            }
        }
        else {
            say "The queue is empty" ;
        }
    }
}
else {
    # if we have a message then this should be added to the queue asap
    if ($msg) {
        $theq->add( data => { tweet => $msg }, activates => $activates ) ;
    }

    if ( !$action && !$msg ) {
        verbose("Parameters are required, or message queue is empty") ;
        exit 0 ;
    }
}