—#! /usr/bin/env perl
use
v5.10;
use
Getopt::Std;
use
File::Basename;
use
DB_File;
$|++;
# disable buffering on STDOUT - autoflush
our
$VERSION
=
'1.0'
;
our
$re_line
=
qr/(?<n>\w+):\s*(?<p>.+?)\s*(\[(?<dt>\w+)\]\s*)?:\s*(?<c>.*)\s*(?<a>\&)?/
;
sub
daemonize {
defined
(
my
$pid
=
fork
) or
die
"Can't fork: $!"
;
exit
if
$pid
;
setsid or
die
"Can't start a new session: $!"
;
umask
0;
}
sub
load_config {
my
$config_filename
=
shift
;
my
$data
=
shift
;
return
if
(! -e
$config_filename
);
my
$config_mt
= (
stat
$config_filename
)[9];
my
@config
= ();
'-'
x10,
' loading rules '
,
'-'
x10,
"\n"
;
my
$lines
=
do
{
# narrow scope
local
$/;
# Enter file slurp mode localized
open
my
$in_fh
,
'<'
,
$config_filename
or
"Cannot read '$config_filename': $!\n"
;
<
$in_fh
>;
# slurp whole input file in a run
};
for
my
$line
(
split
(/\n/,
$lines
) ) {
if
(
$line
!~ /^
#/ and $line =~ /:/) { # not starts with '#' and has ':'
if
(
$line
=~ /
$re_line
/) {
$line
,
"\n"
;
my
$name
= $+{n};
my
$pattern
= $+{p};
my
$dt_str
= $+{dt} //
'1'
;
for
(
qw/1 s m h d/
) {
$dt_str
=~ s/s/*1/;
$dt_str
=~ s/m/*60/;
$dt_str
=~ s/h/*3600/;
$dt_str
=~ s/d/*24*3600/;
$dt_str
=~ s/w/*7*24*3600/;
}
my
$dt
=
eval
$dt_str
;
my
$command
= $+{c};
my
$ampersand
= $+{a};
push
@config
, [
$name
,
$pattern
,
$dt
,
$command
,
$ampersand
];
$data
->{
$name
} = ()
if
( !
$data
->{
$name
} );
}
}
}
'-'
x35,
"\n"
;
return
\
@config
,
$config_mt
;
}
sub
taskflow {
my
(
$folder
,
$config_filename
,
$cache_filename
,
$target_name
,
$sleep
) =
@_
;
tie
my
%data
,
"DB_File"
,
$cache_filename
;
my
(
$config
,
$config_mt
) = load_config(
$config_filename
, \
%data
);
my
%processes
= ();
while
(
@$config
){
my
$pause
= 1;
(
$config
,
$config_mt
) = load_config(
$config_filename
, \
%data
)
if
(
$config_mt
< (
stat
$config_filename
)[9]);
return
if
(!
@$config
);
for
my
$clear
(<.taskflow.*.clear>) {
my
$rule
=
substr
(
$clear
, 10, -6);
delete
$data
{
$rule
};
unlink
(
$clear
);
}
for
my
$cfg
(
@$config
) {
next
if
(!
defined
$cfg
);
my
(
$name
,
$pattern
,
$dt
,
$action
,
$ampersand
) =
@$cfg
;
for
my
$filename
(
glob
(
$pattern
)) {
next
if
(!
$filename
);
my
$mt
= (
stat
$filename
)[9];
next
if
(
$mt
>
time
-
$dt
);
my
$pid_file
=
$filename
.
".$name.pid"
;
my
$log_file
=
$filename
.
".$name.out"
;
my
$err_file
=
$filename
.
".$name.err"
;
(
my
$key
=
$pattern
.
'='
.
$filename
.
':'
.
$action
) =~ s/\s+/ /g;
unless
(-e
$pid_file
or -e
$err_file
) {
if
(!
exists
$data
{
$key
} or
$data
{
$key
} !=
$mt
){
(
my
$command
=
$action
) =~ s/\Q
$target_name
\E/
$filename
/g;
my
$return
;
if
(
my
$pid
=
fork
) {
# parent - child process pid is available in $pid
open
my
$fh
,
'>'
,
$pid_file
or
die
$!;
$fh
$pid
;
# write pid
close
$fh
;
waitpid
(
$pid
, 0)
unless
(
$ampersand
);
}
else
{
# $pid is zero here if defined
die
"cannot fork: $!"
unless
defined
$pid
;
# parent process pid is available with getppid
open
STDOUT,
'>'
,
$log_file
;
open
STDERR,
'>'
,
$log_file
;
$return
=
system
$command
;
close
STDOUT;
close
STDERR;
}
$processes
{
$pid_file
} = [
$filename
,
$command
,
$return
];
}
}
my
@pids
=
keys
%processes
;
if
(
$pid_file
~~
@pids
and
exists
$processes
{
$pid_file
}[2] and
$processes
{
$pid_file
}[2] == 0) {
my
(
$filename
,
$command
,
$return
) = @{
$processes
{
$pid_file
}};
if
(
$return
){
open
my
$fh
,
'>'
,
$err_file
or
die
$!;
$fh
$return
;
close
$fh
;
}
else
{
$data
{
$key
} =
$mt
;
$data
{
$name
} = (
defined
$data
{
$name
}) ?
$data
{
$name
}.
' '
.
$key
:
$key
;
}
delete
$processes
{
$pid_file
};
unlink
$pid_file
;
$pause
= 0;
}
elsif
(-e
$pid_file
and
$pid_file
!~
@pids
) {
unlink
$pid_file
;
$pause
= 0;
}
}
sleep
$sleep
if
(
$pause
);
}
}
}
sub
version {
my
$ver
=
shift
//
$VERSION
;
"Version: $ver\n"
;
exit
0; }
sub
usage {
system
(
"perldoc $0"
);
exit
0; }
my
$myname
= basename($0);
my
%opt
=(
s
=> 1,
n
=>
'$0'
,
f
=>
'./'
,
x
=>
"$myname.config"
,
y
=>
"$myname.cache"
);
getopts(
'hvf:s:n:x:y:dc:'
, \
%opt
);
usage()
if
defined
$opt
{h};
version()
if
defined
$opt
{v};
daemonize()
if
defined
$opt
{d};
if
(
defined
$opt
{c}){
open
my
$fh
,
'>'
,
sprintf
(
".$myname.%s.clear"
,
$opt
{c});
$fh
scalar
localtime
;
exit
0;
}
taskflow(
$opt
{f},
$opt
{x},
$opt
{y},
$opt
{n},
$opt
{s}
);
1;
__END__
=head1 NAME
tasklet - light version and one file script of I<taskflow>: a light
weight file based taskflow engine
=head1 VERSION
Version 1.0
=cut
=head1 SYNOPSIS
=over 4
=item * create a configuration file, synatx defined below - default
name will be `tasklet.config`
=item * run `tasklet`
=back
=head1 DESCRIPTION
`tasklet` is a tiny workflow engine. It could be run as a normal
process, as a background process or as a daemon.
It automates many routine tasks such as moving and deleting files,
sending emails if some new files created or other tasks.
Files will be chosen by patterns and these patterns can be written in a
configuration file. For each file which is mached by a pattern, a new
process will be started.
If the process produces any output, it will be saved in a file with
`out` ending. Additionaly there is a file with ending `err` for error
message of started process.
The information about each processed file will be stored in a binary file
named `tasklet.cach`. Such a file will not be processed again, only if
=over 4
=item * the modified-time of the file changes (you edit or touch the file)
=item * the rule is cleaned up
=back
A rule can be cleaned up with:
tasklet -c rulename
or
tasklet --cache rulename
This command creates a file named `.tasklet.rulename.clear`. The next
running `tasklet` will clear corresponding entries in `tasklet.cache`.
It causes that the rule will be run again.
If you delete the `tasklet.cache` file, all rules will be run again.
If `tasklet` crashes, you find the name of rule and file which causes
such crashes in `<filename>.<rulename>.pid`. These pid files will be
deleted, if you restart `tasklet`.
If a rule results in an error and a `<filename>.<rulename>.err` is
created, the file is not processed again according to the rule, unless
the error file is deleted.
If a file is edited or touched and the rule runs again, the
`<filename>.<rulename>.out` will be overwritten.
Unless otherwise specified each file is processed 1s after it is last
modified. It is possible that a different process is still writing the
file but it is pausing more than 1s between writes (for example the file
is being downloaded via a slow connection). In this case it is best to
download the file with a different name than the name used for the
patterm and rename the file to its proper name after the write of the
file is completed. This must be handled outside of tasklet. tasklet
has no way of knowing when a file is completed or not.
Any change to `tasklet.config` file causes that it is realaoded
automatically by running `tasklet`.
=head1 tasklet options
-f <path> processing and monitoring directory
-s <seconds> the time interval between checks for new files
-n <name> the current filename, defaults to `$0`
-x <path> name of config file ( default: tasklet.config)
-y <path> name of cache file to use (default: tasklet.cache)
-d daemonizes `tasklet`
-c <rulename> clears a rule without starting the `tasklet`
-h display this documentation
-v display current version
=head1 `tasklet.config` syntax
`tasklet.config` consists of a series of rules with the following syntax
rulename: pattern [dt]: command
here:
=over 4
=item * `rulename`: name of the rule (no space)
=item * `pattern`: pattern of monitoring files
=item * `dt`: time interval (default:1 sec). Only files newer than `dt`
seconds will be considered, time units are as expected (see also
examples):
s = seconds, m = minutes, h = hours, d = days, w = weeks
=item * `command`: command to execute matching files (see `pattern`),
that are created more than `dt` seconds ago and not processed already.
`&` executes command in background.In case of missing `&`, it blocks the process
until its completion.
`$0` refers to name of matching file.
`\` used for command continuation for more than one line
`#` at the start of a line marks that line as comment and ignored
=back
=head2 Examples of rules
=over 4
=item * Delete all `*.log` files older than one day
delete_old_logs: *.log [1d]: rm $0
=item * Move all `*.txt` files older than one hour to other directory
move_old_txt: *.txt [1h]: mv $0 otherfolder/$0
=item * Email me when when a new `*.doc` file is created
email_me_on_new_doc: *.doc: mail -s 'new file: $0' me@example.com < /dev/null
=item * Process new `*.dat` files using a Perl script
process_dat: *.dat: perl process.pl $0
=item * Crate a finite state machine for each `*.src` file
rule1: *.src [1s]: echo > $0.state.1
rule2: *.state.1 [1s]: mv $0 `expr "$0" : '\(.*\).1'`.2
rule3: *.state.2 [1s]: mv $0 `expr "$0" : '\(.*\).2'`.3
rule4: *.state.3 [1s]: rm $0
=back
head1 NOTE
The original `taskflow` has uses the logging and a more sophisticated
persistancy module. These mechanisms are not part of Perl `core`, and so
`taskflow` can not be used out of box or on an older system without
CPAN connection. In other words the I<App::Taskflow> is modular but is
not portable enough as a one file script. So this is for absolute
minimalists between us.
=head1 ACKNOWLEDGEMENTS
Ideas for this script are borrowed from a Python script written by
'Massimo Di Pierro'.
=head1 SEE ALSO
There are similar solutions in CPAN for managing tasks or workflows. But
most of them are heavy and need more sophisticated configuration.
Some of the other modules:
L<Workflow>, L<Helios>
=head1 LICENSE AND COPYRIGHT
Copyright 2012 Farhad Fouladi.
This program is free software; you can redistribute it and/or modify it
under the terms of either: the GNU General Public License as published
by the Free Software Foundation; or the Artistic License.
See http://dev.perl.org/licenses/ for more information.
=cut