$ENV{TERM} = 'xterm-256color';

our $VERSION = '0.06';
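
# Moose attributes. Attributes tagged with the 'NoGetopt' trait are internal
# state and are not exposed as command line options.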

has '+configfile' => (
    required      => 0,
    documentation => q{If you get tired of putting all your options on the command line create a config file instead.
---
infile: "/path/to/commands/testcommand.in"
outdir: "path/to/testdir"
module:
    - "R2"
    - "shared"
}
);

has 'module' => (
    is            => 'rw',
    isa           => 'ArrayRef',
    required      => 0,
    documentation => q{List of modules to load, e.g. R2, samtools, etc.},
    default       => sub { [] },
);

has 'afterok' => (
    is       => 'rw',
    isa      => 'ArrayRef',
    required => 0,
    default  => sub { return [] },
);

has 'cpus_per_task' => (
    is        => 'rw',
    isa       => 'Str',
    required  => 0,
    default   => 4,
    predicate => 'has_cpus_per_task',
    clearer   => 'clear_cpus_per_task',
);

has 'commands_per_node' => (
    is            => 'rw',
    isa           => 'Str',
    required      => 0,
    default       => 8,
    documentation => q{Commands to run on each node. This is not the same as concurrent_commands_per_node!},
    predicate     => 'has_commands_per_node',
    clearer       => 'clear_commands_per_node',
);

has 'nodes_count' => (
    is            => 'rw',
    isa           => 'Str',
    required      => 0,
    default       => 1,
    documentation => q{Number of nodes requested. You should only use this if submitting parallel jobs.},
    predicate     => 'has_nodes_count',
    clearer       => 'clear_nodes_count',
);

has 'partition' => (
    is            => 'rw',
    isa           => 'Str',
    required      => 0,
    default       => '',
    documentation => q{Slurm partition to submit jobs to. Defaults to the partition with the most available nodes},
    predicate     => 'has_partition',
    clearer       => 'clear_partition',
);

has 'nodelist' => (
    is            => 'rw',
    isa           => 'ArrayRef',
    required      => 0,
    default       => sub { return [] },
    documentation => q{List of nodes to submit jobs to. Defaults to the partition with the most nodes.},
);

has 'submit_to_slurm' => (
    is            => 'rw',
    isa           => 'Bool',
    default       => 1,
    required      => 1,
    documentation => q{Bool value for whether or not to submit to Slurm. If you want to debug your files, or this script, set this to zero.},
);

has 'first_pass' => (
    traits   => ['NoGetopt'],
    is       => 'rw',
    isa      => 'Bool',
    default  => 1,
    required => 1,
);

has 'template_file' => (
    is      => 'rw',
    isa     => 'Str',
    default => sub {
        my $self = shift;

        my ( $fh, $filename ) = tempfile();

        my $tt = <<EOF;
#!/bin/bash
#
#SBATCH --share
#SBATCH --get-user-env
#SBATCH --job-name=[% JOBNAME %]
#SBATCH --output=[% OUT %]
[% IF PARTITION %]
#SBATCH --partition=[% PARTITION %]
[% END %]
[% IF NODE %]
#SBATCH --nodelist=[% NODE %]
[% END %]
[% IF CPU %]
#SBATCH --cpus-per-task=[% CPU %]
[% END %]
[% IF AFTEROK %]
#SBATCH --dependency=afterok:[% AFTEROK %]
[% END %]
[% IF MODULE %]
[% FOR d = MODULE %]
module load [% d %]
[% END %]
[% END %]
[% COMMAND %]
EOF

        print $fh $tt;
        return $filename;
    },
    predicate     => 'has_template_file',
    clearer       => 'clear_template_file',
    documentation => q{Path to Slurm template file if you do not wish to use the default},
);

has 'serial' => (
    is            => 'rw',
    isa           => 'Bool',
    default       => 0,
    documentation => q{Use this if you wish to run each job one after another, with each job starting only after the previous has completed successfully},
    predicate     => 'has_serial',
    clearer       => 'clear_serial',
);

has 'user' => (
    is      => 'rw',
    isa     => 'Str',
    default => sub {
        return $ENV{LOGNAME} || $ENV{USER} || getpwuid($<);
    },
    required      => 1,
    documentation => q{This defaults to your current user ID. This can only be changed if running as an admin user},
);

has 'use_threads' => (
    is            => 'rw',
    isa           => 'Bool',
    default       => 0,
    required      => 0,
    documentation => q{Use threads to run jobs},
);

has 'use_processes' => (
    is            => 'rw',
    isa           => 'Bool',
    default       => 1,
    required      => 0,
    documentation => q{Use processes to run jobs},
);

has 'use_gnuparallel' => (
    is            => 'rw',
    isa           => 'Bool',
    default       => 0,
    required      => 0,
    documentation => q{Use gnu-parallel to run jobs and manage threads. This is the best option if you do not know how many threads your application uses!},
);

has 'custom_command' => (
    is        => 'rw',
    isa       => 'Str',
    predicate => 'has_custom_command',
    clearer   => 'clear_custom_command',
);

has 'template' => (
    traits   => ['NoGetopt'],
    is       => 'rw',
    required => 0,
    default  => sub { return Template->new( ABSOLUTE => 1 ) },
);

has 'cmd_counter' => (
    traits   => [ 'Counter', 'NoGetopt' ],
    is       => 'ro',
    isa      => 'Num',
    required => 1,
    default  => 0,
    handles  => {
        inc_cmd_counter   => 'inc',
        dec_cmd_counter   => 'dec',
        reset_cmd_counter => 'reset',
    },
);

has 'node_counter' => (
    traits   => [ 'Counter', 'NoGetopt' ],
    is       => 'ro',
    isa      => 'Num',
    required => 1,
    default  => 0,
    handles  => {
        inc_node_counter   => 'inc',
        dec_node_counter   => 'dec',
        reset_node_counter => 'reset',
    },
);

has 'batch_counter' => (
    traits   => [ 'Counter', 'NoGetopt' ],
    is       => 'ro',
    isa      => 'Num',
    required => 1,
    default  => 1,
    handles  => {
        inc_batch_counter   => 'inc',
        dec_batch_counter   => 'dec',
        reset_batch_counter => 'reset',
    },
);

has 'node' => (
    traits  => ['NoGetopt'],
    is      => 'rw',
    isa     => 'Str|Undef',
    lazy    => 1,
    default => sub {
        my $self = shift;
        return $self->nodelist()->[0] if $self->nodelist;
        return "";
    },
);

has 'batch' => (
    traits    => [ 'String', 'NoGetopt' ],
    is        => 'rw',
    isa       => 'Str',
    default   => q{},
    required  => 0,
    handles   => { add_batch => 'append', },
    clearer   => 'clear_batch',
    predicate => 'has_batch',
);

has 'cmdfile' => (
    traits   => [ 'String', 'NoGetopt' ],
    default  => q{},
    is       => 'rw',
    isa      => 'Str',
    required => 0,
    handles  => { clear_cmdfile => 'clear', },
);

has 'slurmfile' => (
    traits   => [ 'String', 'NoGetopt' ],
    default  => q{},
    is       => 'rw',
    isa      => 'Str',
    required => 0,
    handles  => { clear_slurmfile => 'clear', },
);

has 'slurm_decides' => (
    is      => 'rw',
    isa     => 'Bool',
    default => 0,
);

has 'job_stats' => (
    is      => 'rw',
    isa     => 'HashRef',
    default => sub {
        my $self = shift;
        my $href = {};
        $href->{total_processes} = 0;
        $href->{jobnames}        = {};
        $href->{total_batches}   = 0;
        $href->{batches}         = {};
        return $href;
    },
);
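
# run is the main entry point. It normalizes options, checks the output
# directory, then parses the infile twice: the first pass only collects job
# statistics (collect_stats/do_stats), the second pass writes the batch files
# and submits them.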
sub run {
    my $self = shift;

    if ( $self->serial ) {
        $self->procs(1);
    }

    $self->check_files;

    $self->first_pass(1);
    $self->parse_file_slurm;
    $self->do_stats;

    $DB::single = 2;

    $self->first_pass(0);
    $self->parse_file_slurm;
}
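
# do_stats runs after the first pass. It annotates each batch entry in
# job_stats with its position within its job (e.g. "2/5"), the total number of
# processes and batches, and a batch_count string used in the job metadata.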
sub do_stats {
    my $self = shift;

    my @jobs = keys %{ $self->job_stats->{jobnames} };

    foreach my $batch ( keys %{ $self->job_stats->{batches} } ) {
        my $href        = $self->job_stats->{batches}->{$batch};
        my $jobname     = $href->{jobname};
        my @job_batches = @{ $self->job_stats->{jobnames}->{$jobname} };

        my $index = firstidx { $_ eq $batch } @job_batches;
        $index += 1;
        my $lenjobs = $#job_batches + 1;
        $self->job_stats->{batches}->{$batch}->{job_batches} = $index . "/" . $lenjobs;

        $href->{total_processes} = $self->job_stats->{total_processes};
        $href->{total_batches}   = $self->job_stats->{total_batches};
        $href->{batch_count}     = $href->{batch} . "/" . $self->job_stats->{total_batches};
    }
}
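
# check_files strips any trailing slash from outdir, creates the output
# directory if it does not exist, and calls get_nodes.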
sub check_files {
    my ($self) = @_;
    my ($t);

    $t = $self->outdir;
    $t =~ s/\/$//g;
    $self->outdir($t);

    make_path( $self->outdir ) if !-d $self->outdir;

    $self->get_nodes;
}
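
# parse_file_slurm reads the infile line by line, resets the per-run counters,
# seeds the job dependency list from afterok if given, and hands each
# non-blank line to process_lines.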
sub parse_file_slurm {
    my $self = shift;

    my $fh = IO::File->new( $self->infile, q{<} )
        or print "Error opening file " . $self->infile . " " . $!;

    $self->reset_cmd_counter;
    $self->reset_node_counter;
    $self->reset_batch_counter;
    $self->jobref( [] );

    if ( $self->afterok ) {
        $self->wait(1);
        $self->jobref->[0] = $self->afterok;
        push( @{ $self->jobref }, [] );
    }

    while (<$fh>) {
        my $line = $_;
        next unless $line;
        next unless $line =~ m/\S/;
        $self->process_lines($line);
    }

    $self->work if $self->has_batch;
    push( @{ $self->jobref }, [] ) if $self->serial;
    close($fh);
}
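
# process_lines accumulates commands into the current batch. A new batch is
# started every commands_per_node commands. Lines ending in a backslash are
# treated as continuations, and the special lines "wait" and "newnode" close
# out the current batch (with "wait" also adding a job dependency boundary).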
sub process_lines {
    my $self = shift;
    my $line = shift;

    if ( $line =~ m/^nohup/ ) {
        die "You cannot submit jobs to the queue using nohup! Please remove nohup and try again.\n";
    }

    if (   $self->cmd_counter > 0
        && 0 == $self->cmd_counter % ( $self->commands_per_node )
        && $self->batch )
    {
        $self->work;
        push( @{ $self->jobref }, [] ) if $self->serial;
    }

    $self->check_meta($line);
    return if $line =~ m/^#/;

    if ( $self->has_cmd ) {
        $self->add_cmd($line);
        $self->add_batch($line);

        if ( $line =~ m/\\$/ ) {
            return;
        }
        else {
            $self->add_cmd("\n");
            $self->add_batch("\n");
            $self->clear_cmd;
            $self->inc_cmd_counter;
        }
    }
    else {
        $self->add_cmd($line);

        if ( $line =~ m/\\$/ ) {
            $self->add_batch($line);
            return;
        }
        elsif ( $self->match_cmd(qr/^wait$/) ) {
            $self->clear_cmd;
            $self->wait(1);
            $self->work if $self->has_batch;
            push( @{ $self->jobref }, [] );
        }
        elsif ( $self->match_cmd(qr/^newnode$/) ) {
            $self->clear_cmd;
            $self->work if $self->has_batch;
            push( @{ $self->jobref }, [] ) if $self->serial;
        }
        else {
            $self->inc_cmd_counter;
        }

        $self->add_batch( $line . "\n" ) if $self->has_cmd;
        $self->clear_cmd;
    }
}
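
# check_meta handles inline directives embedded in the infile as comments.
# Based on the regexes below, a directive looks like either
#   #HPC commands_per_node=12   (set an attribute)
#   #HPC partition              (call the attribute's clear_* method)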
sub check_meta {
    my $self = shift;
    my $line = shift;
    my ( @match, $t1, $t2 );

    return unless $line =~ m/^#HPC/;

    @match = $line =~ m/HPC (\w+)=(.+)$/;
    ( $t1, $t2 ) = ( $match[0], $match[1] );

    if ( !$self->can($t1) ) {
        print "Option $t1 is an invalid option!\n";
        return;
    }

    if ($t1) {
        if ( $t1 eq "module" ) {
            $self->$t1( [$t2] );
        }
        else {
            $self->$t1($t2);
        }
    }
    else {
        @match = $line =~ m/HPC (\w+)$/;
        $t1 = $match[0];
        return unless $t1;
        $t1 = "clear_$t1";
        $self->$t1;
    }
}
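
# work closes out the current batch: it records stats on the first pass, picks
# the next node from nodelist (cycling via node_counter), writes and submits
# the batch on the second pass, and resets the per-batch counters.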
sub work {
    my $self = shift;

    $DB::single = 2;

    $self->collect_stats if $self->first_pass;

    if ( $self->node_counter > ( scalar @{ $self->nodelist } ) ) {
        $self->reset_node_counter;
    }
    $self->node( $self->nodelist()->[ $self->node_counter ] ) if $self->nodelist;

    $self->process_batch unless $self->first_pass;

    $self->inc_batch_counter;
    $self->clear_batch;
    $self->inc_node_counter;
    $self->reset_cmd_counter;
}
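
# collect_stats (first pass only) records, per batch, the number of commands,
# the jobname, the batch number, and the global command range, keyed by
# "NNN_jobname" in job_stats.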
sub collect_stats {
    my $self = shift;

    return unless $self->first_pass;

    my $counter = $self->batch_counter;
    $counter = sprintf( "%03d", $counter );

    my $href = $self->job_stats;
    $href->{total_processes} += $self->cmd_counter;

    my $command_count = ( $href->{total_processes} - $self->cmd_counter ) + 1;

    $href->{batches}->{ $counter . "_" . $self->jobname } = {
        commands      => $self->cmd_counter,
        jobname       => $self->jobname,
        batch         => $self->batch_counter,
        command_count => $command_count . "-" . $href->{total_processes},
    };

    my $jobhref = {};
    $jobhref->{ $self->jobname } = [];

    if ( exists $href->{jobnames}->{ $self->jobname } ) {
        my $tarray = $href->{jobnames}->{ $self->jobname };
        push( @{$tarray}, $counter . "_" . $self->jobname );
    }
    else {
        $href->{jobnames}->{ $self->jobname } = [ $counter . "_" . $self->jobname ];
    }

    $href->{total_batches} += 1;
    $self->job_stats($href);
}
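
# process_batch writes the accumulated commands to NNN_jobname.in, renders the
# Slurm submission script NNN_jobname.sh from the Template Toolkit template,
# and submits it unless submit_to_slurm is off.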
sub process_batch {
    my $self = shift;
    my ( $cmdfile, $slurmfile, $slurmsubmit, $fh, $command );

    my $counter = $self->batch_counter;
    $counter = sprintf( "%03d", $counter );

    $self->cmdfile( $self->outdir . "/$counter" . "_" . $self->jobname . ".in" );
    $self->slurmfile( $self->outdir . "/$counter" . "_" . $self->jobname . ".sh" );

    $fh = IO::File->new( $self->cmdfile, q{>} )
        or print "Error opening file " . $self->cmdfile . " " . $!;

    print $fh $self->batch if defined $fh && defined $self->batch;
    $fh->close;

    my $ok;
    if ( $self->wait ) {
        $ok = join( ":", @{ $self->jobref->[-2] } ) if $self->jobref->[-2];
    }

    $command = $self->process_batch_command();

    $DB::single = 2;

    $self->template->process(
        $self->template_file,
        {
            JOBNAME   => $counter . "_" . $self->jobname,
            USER      => $self->user,
            NODE      => $self->node,
            CPU       => $self->cpus_per_task,
            PARTITION => $self->partition,
            AFTEROK   => $ok,
            OUT       => $self->logdir . "/$counter" . "_" . $self->jobname . ".log",
            MODULE    => $self->module,
            self      => $self,
            COMMAND   => $command,
        },
        $self->slurmfile
    ) || die $self->template->error;

    chmod 0777, $self->slurmfile;

    $self->submit_slurm if $self->submit_to_slurm;
}
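
# process_batch_command builds the command the Slurm job will run to execute
# the batch file: either a custom command, a gnu-parallel pipeline, a
# thread-based runner, or a process-based runner (mcerunner.pl), plus the job
# metadata and plugin arguments.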
sub process_batch_command {
    my ($self) = @_;
    my $command;

    my $counter = $self->batch_counter;
    $counter = sprintf( "%03d", $counter );

    if ( $self->has_custom_command ) {
        $command = "cd " . getcwd() . "\n";
        $command
            .= $self->custom_command
            . " --procs " . $self->procs
            . " --infile " . $self->cmdfile
            . " --outdir " . $self->outdir
            . " --logname $counter" . "_" . $self->jobname;
    }
    elsif ( $self->use_gnuparallel ) {
        $command = "cd " . getcwd() . "\n";
        $command
            .= "cat " . $self->cmdfile
            . " | parallelparser.pl | parallel --joblog " . $self->outdir
            . "/main.log --gnu -N 1 -q gnuparallelrunner.pl --command `echo {}` --outdir " . $self->outdir
            . " --logname $counter" . "_" . $self->jobname
            . " --seq {#}" . "\n";
    }
    elsif ( $self->use_threads ) {
        $command = "cd " . getcwd() . "\n";
        $command
            .= "paralellrunner.pl --procs " . $self->procs
            . " --infile " . $self->cmdfile
            . " --outdir " . $self->outdir
            . " --logname $counter" . "_" . $self->jobname;
    }
    elsif ( $self->use_processes ) {
        $command = "cd " . getcwd() . "\n";
        $command
            .= "mcerunner.pl --procs " . $self->procs
            . " --infile " . $self->cmdfile
            . " --outdir " . $self->outdir
            . " --logname $counter" . "_" . $self->jobname;
    }
    else {
        die "None of the job processes were chosen!\n";
    }

    my $metastr = $self->create_meta_str;
    $command .= $metastr if $metastr;

    my $pluginstr = $self->create_plugin_str;
    $command .= $pluginstr if $pluginstr;

    return $command;
}
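
# create_meta_str serializes this batch's job_stats entry to JSON and returns
# it as a " --metastr '...'" argument for the runner command.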
sub create_meta_str {
    my $self = shift;

    my $counter = $self->batch_counter;
    $counter = sprintf( "%03d", $counter );
    my $batchname = $counter . "_" . $self->jobname;

    my $batch = $self->job_stats->{batches}->{$batchname};

    my $json      = JSON->new->allow_nonref;
    my $json_text = $json->encode($batch);

    $DB::single = 2;

    $json_text = " --metastr \'$json_text\'";
    return $json_text;
}
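
# create_plugin_str turns the plugins attribute (a string or an array ref)
# into repeated " --plugins NAME" arguments for the runner command.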
sub create_plugin_str {
    my $self = shift;

    return unless $self->plugins;

    my $plugins   = $self->plugins;
    my $pluginstr = "";

    if ($plugins) {
        if ( ref($plugins) ) {
            my @plugins = @{$plugins};
            foreach my $plugin (@plugins) {
                $pluginstr .= " --plugins $plugin";
            }
        }
        else {
            $pluginstr = " --plugins $plugins";
        }
    }

    return $pluginstr;
}

__PACKAGE__->meta->make_immutable;
1;