has
'_drmaa_jobid'
=> (
is
=>
'rw'
,
init_arg
=>
undef
);
has
'_drmaa_stat'
=> (
is
=>
'rw'
,
init_arg
=>
undef
);
has
'_drmaa_rusage'
=> (
is
=>
'rw'
,
init_arg
=>
undef
);
has
'_qalter_command'
=> (
is
=>
'rw'
,
init_arg
=>
undef
,
predicate
=>
'_has_qalter_command'
,
lazy
=> 1,
default
=>
""
);
my
@_qa
= (
[
'-N'
,
'str1_parse'
,
'set_attr'
,
$DRMAA_JOB_NAME
,
'str_disp'
,
'name'
],
[
'-l'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'res_disp'
,
'resources_required'
],
[
'-o'
,
'str1_parse'
,
'set_netp'
,
$DRMAA_OUTPUT_PATH
,
'str_disp'
,
' '
],
[
'-e'
,
'str1_parse'
,
'set_netp'
,
$DRMAA_ERROR_PATH
,
'str_disp'
,
' '
],
[
'-cwd'
,
'no_parse'
,
'set_attr'
,
$DRMAA_WD
,
'str_disp'
,
' '
],
[
'SCRIPT'
,
'str1_parse'
,
'set_attr'
,
$DRMAA_REMOTE_COMMAND
,
'str_disp'
,
' '
],
[
'-@'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-a'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-ac'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-ar'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-A'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-binding'
,
'bind_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-c'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-ckpt'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-dc'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-display'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-dl'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-h'
,
'no_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-hold_jid'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-hold_jid_ad'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-j'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-js'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-M'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-m'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-masterq'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-notify'
,
'no_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-now'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-ot'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-P'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-p'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-pe'
,
'pe_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-pty'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-q'
,
'str1_parse'
,
'set_queue'
,
undef
,
'str_disp'
,
undef
],
[
'-R'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-r'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-sc'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
[
'-v'
,
'str1_parse'
,
'set_qalt'
,
undef
,
'str_disp'
,
undef
],
);
my
%_qaparse
;
my
%_qaset
;
my
%_qasetattr
;
my
%_qadisp
;
my
%_qaattr
;
has
'_extra_queue_arg'
=> (
is
=>
'rw'
,
default
=>
undef
,
init_arg
=>
undef
);
has
'_cpu_multiplier'
=> (
is
=>
'rw'
,
default
=> 1,
init_arg
=>
undef
);
for
my
$l
(
@_qa
) {
my
(
$k
,
$p
,
$s
,
$sa
,
$d
,
$a
) =
@$l
;
$_qaparse
{
$k
} =
$p
;
$_qaset
{
$k
} =
$s
;
$_qasetattr
{
$k
} =
$sa
;
$_qadisp
{
$k
} =
$d
;
$_qaattr
{
$k
} =
$a
if
$a
;
}
has
'_drmaa_job_template'
=> (
is
=>
'ro'
,
lazy
=> 1,
init_arg
=>
undef
,
builder
=>
'_init_drmaa_job_template'
,
);
sub
_init_drmaa_job_template {
my
$self
=
shift
;
my
$jt
=
$self
->Odrmaa_allocate_job_template;
my
$args
=
$self
->_qsub_args;
my
$skipped
=
$self
->_qsub_args_skipped;
my
$stage
=
$self
->stage;
my
$extra
=
$stage
->extra_sge_args_string;
$extra
=~ s/^\s*//;
while
(
$extra
) {
$extra
=~ s/^(-\w+)\s*\b//
or croak
"No known qsub option ($extra)"
;
my
$key
= $1;
my
$p
=
$_qaparse
{
$key
}
or croak
"No known qsub option ($key $extra)"
;
my
$val
=
$self
->
$p
(
$key
, \
$extra
);
if
(
exists
$_qaattr
{
$key
}) {
my
$attr
=
$_qaattr
{
$key
};
if
(
$attr
=~ /^\s*$/) {
$attr
=
''
;
}
else
{
$attr
=
" from stage attr ($attr)"
;
}
$stage
->
warn
(
"ignoring ($key $val) from extra_sge_args_string, it is determined internally$attr"
);
push
@$skipped
,
$key
;
push
@$skipped
,
$val
if
defined
$val
;
}
else
{
$self
->set(
$jt
,
$key
,
$val
);
my
$disp
=
$_qadisp
{
$key
};
$disp
=
$self
->
$disp
(
$val
);
push
@$args
,
$key
;
push
@$args
,
$disp
if
defined
$disp
;
}
}
for
my
$pair
(
( [
'-N'
,
$self
->unique_id ],
[
'-l'
,
$stage
->_use_resources_required ],
[
'-o'
,
$self
->_stdout ],
[
'-e'
,
$self
->_stderr ],
[
'-cwd'
, get_cwd() ],
[
'SCRIPT'
,
$stage
->script_file ]
) ) {
my
(
$key
,
$val
) =
@$pair
;
$self
->set(
$jt
,
$key
,
$val
);
push
@$args
,
$key
unless
$key
eq
'SCRIPT'
;
if
(
defined
$val
) {
my
$disp
=
$_qadisp
{
$key
};
$disp
=
$self
->
$disp
(
$val
);
push
@$args
,
$disp
;
}
}
return
$jt
;
}
sub
str1_parse {
my
$self
=
shift
;
my
$key
=
shift
;
my
$ref_extra
=
shift
;
$$ref_extra
=~ s/^\s*(\S+)\b\s*//
or croak
"no string arg found for extra qsub option ($key)"
;
return
$1;
}
sub
no_parse {
my
$self
=
shift
;
my
$key
=
shift
;
my
$ref_extra
=
shift
;
return
undef
;
}
sub
yn_parse {
my
$self
=
shift
;
my
$key
=
shift
;
my
$ref_extra
=
shift
;
$$ref_extra
=~ s/^\s*(y)(es)?\b\s*// or
$$ref_extra
=~ s/\s*(n)(o)?\b\s*//
or croak
"no yes/no arg found for extra qsub option ($key) in: $$ref_extra"
;
return
$1;
}
my
%bind_prefix
=
map
{ (
$_
=> 1) }
qw(env pe set)
;
sub
bind_parse {
my
$self
=
shift
;
my
$key
=
shift
;
my
$ref_extra
=
shift
;
my
$res
=
$self
->str1_parse(
$key
,
$ref_extra
);
return
$bind_prefix
{
$res
}
?
"$res "
.
$self
->str1_parse(
"$key $res"
,
$ref_extra
)
:
$res
;
}
sub
pe_parse {
my
$self
=
shift
;
my
$key
=
shift
;
my
$ref_extra
=
shift
;
my
$pe
=
$self
->str1_parse(
$key
,
$ref_extra
);
my
$range
=
$self
->str1_parse(
"$key $pe"
,
$ref_extra
);
$range
=~ /(^(\d+)-?\d*$)|(^-\d*$)/
or croak
"invalid range arg ($range) found for extra qsub option ($key name($pe))"
;
$self
->_cpu_multiplier( $2 )
if
defined
$2;
return
"$pe $range"
;
}
sub
str_disp {
my
(
$self
,
$val
) =
@_
;
return
$val
;
}
sub
ml_disp {
my
(
$self
,
$val
) =
@_
;
return
$val
?
'n'
:
'e'
;
}
sub
res_disp {
my
$self
=
shift
;
my
$val
=
$self
->_get_mapped_resource_string(
@_
);
my
$extra_queue_arg
=
$self
->_extra_queue_arg;
return
(
$val
&&
$extra_queue_arg
)
?
"$val,$extra_queue_arg"
: (
$extra_queue_arg
||
$val
);
}
sub
vec_disp {
my
(
$self
,
$val
) =
@_
;
return
join
','
,
@$val
;
}
sub
set {
my
(
$self
,
$jt
,
$key
,
$val
) =
@_
;
my
$s
=
$_qaset
{
$key
};
$self
->
$s
(
$jt
,
$key
,
$val
);
}
sub
set_attr {
my
(
$self
,
$jt
,
$key
,
$val
) =
@_
;
$self
->Odrmaa_set_attribute(
$jt
,
$_qasetattr
{
$key
},
$val
);
}
sub
set_netp {
my
(
$self
,
$jt
,
$key
,
$val
) =
@_
;
$self
->Odrmaa_set_attribute(
$jt
,
$_qasetattr
{
$key
},
":$val"
);
}
sub
set_vec {
my
(
$self
,
$jt
,
$key
,
$val
) =
@_
;
$self
->Odrmaa_set_vector_attribute(
$jt
,
$_qasetattr
{
$key
},
$val
);
}
sub
set_queue {
my
(
$self
,
$jt
,
$key
,
$val
) =
@_
;
$self
->_extra_queue_arg(
"qname=$val"
);
set_qalt(
@_
);
}
sub
set_qalt {
my
(
$self
,
$jt
,
$key
,
$val
) =
@_
;
$self
->append_qalter(
$key
);
if
(
defined
$val
) {
my
$disp
=
$_qadisp
{
$key
};
$self
->append_qalter(
$self
->
$disp
(
$val
) );
}
}
sub
append_qalter {
my
(
$self
,
$val
) =
@_
;
my
$qa
=
$self
->_qalter_command;
my
$newval
=
join
(
' '
,
(
$qa
? (
$qa
) : ()),
$val
);
$self
->_qalter_command(
$newval
);
}
sub
get_cwd {
my
$cwd
= `/bin/pwd`;
chomp
$cwd
;
return
$cwd
;
}
has
'_qsub_args'
=> (
is
=>
'rw'
,
lazy
=> 1,
init_arg
=>
undef
,
default
=>
sub
{ [ ] }
);
has
'_qsub_args_skipped'
=> (
is
=>
'rw'
,
lazy
=> 1,
init_arg
=>
undef
,
default
=>
sub
{ [ ] }
);
my
$unchanged
=
sub
{
return
$_
[0] };
my
$to_localtime
=
sub
{
return
DateTime->from_epoch(
epoch
=>
$_
[0] )->datetime };
my
$to_int
=
sub
{
my
$v
=
shift
;
$v
=~ s/\.0+$//;
return
$v
};
my
$to_KMGT
=
sub
{
my
$v
=
$to_int
->(
shift
);
my
$units
=
''
;
my
@conv
= ( [ 1024**4,
'T'
], [ 1024**3,
'G'
], [ 1024**2,
'M'
], [ 1024**1,
'K'
] );
for
my
$pair
(
@conv
) {
my
(
$div
,
$name
) =
@$pair
;
if
(
$v
>=
$div
) {
$v
=
sprintf
"%7.3f"
,
$v
/
$div
;
$v
=~ s/ //g;
$units
=
$name
;
last
;
}
}
return
"$v$units"
;
};
my
%fixer
= (
maxvmem
=>
$to_KMGT
,
acct_maxvmem
=>
$to_KMGT
,
submission_time
=>
$to_localtime
,
start_time
=>
$to_localtime
,
end_time
=>
$to_localtime
);
around
'_register_status'
=>
sub
{
my
$orig
=
shift
;
my
$self
=
shift
;
my
$status
=
shift
;
my
$stage
=
$self
->stage;
if
(
my
$abort
=
$self
->Odrmaa_wifaborted(
$status
)) {
$stage
->info(
"Abort code ($abort) returned for drmaa status ($status)"
);
$self
->abort(
$status
);
}
elsif
(
my
$signal
=
$self
->Odrmaa_wifsignaled(
$status
)) {
$self
->
kill
(
$signal
);
$self
->killsignal(
$self
->Odrmaa_wtermsig(
$status
) ||
'not provided by drmaa'
);
}
else
{
my
$estat
=
$self
->Odrmaa_wexitstatus(
$status
);
$stage
->info(
"Exit status ($estat) returned for drmaa status ($status)"
);
$self
->status(
$self
->Odrmaa_wexitstatus(
$status
) );
}
};
around
'_collect_qacct_info'
=>
sub
{
my
$orig
=
shift
;
my
$self
=
shift
;
my
$stage
=
$self
->stage;
my
%info
;
my
$exit_status
;
while
(
my
(
$continue
,
$value
) =
$self
->Odrmaa_get_next_attr_value(
$self
->_drmaa_rusage)) {
last
unless
$continue
;
if
(
my
(
$k
,
$v
) =
$value
=~ /(\w+)=(.*)/) {
$info
{
$k
} = (
$fixer
{
$k
} //
$to_int
)->(
$v
);
$exit_status
=
$info
{
$k
}
if
$k
eq
'exit_status'
;
}
}
$orig
->(
$self
)
unless
defined
$exit_status
&&
$exit_status
ne
'unknown'
;
return
\
%info
;
};
around
'_delete_job'
=>
sub
{
my
$orig
=
shift
;
my
$self
=
shift
;
$self
->Odrmaa_control(
$self
->_drmaa_jobid,
$DRMAA_CONTROL_TERMINATE
);
};
around
'hard_timeout'
=>
sub
{
my
$orig
=
shift
;
my
$self
=
shift
;
$self
->_delete_job;
};
around
'_get_submit_command'
=>
sub
{
my
$orig
=
shift
;
my
$self
=
shift
;
my
$stage
=
$self
->stage;
my
$shell_script
=
$stage
->script_file;
my
$output_file
=
$self
->_stdout;
my
$error_file
=
$self
->_stderr;
my
$jt
=
$self
->_drmaa_job_template;
my
@skipped_args
= @{
$self
->_qsub_args_skipped };
if
(
@skipped_args
) {
unshift
@skipped_args
,
'(unsupported qsub args:'
;
push
@skipped_args
,
')'
;
}
my
$qsub_equivalent_command
=
join
(
' '
,
'internal equivalent to:'
,
'qsub'
, @{
$self
->_qsub_args },
@skipped_args
);
return
$qsub_equivalent_command
;
};
around
'_submit_command'
=>
sub
{
my
$orig
=
shift
;
my
$self
=
shift
;
my
$jt
=
$self
->_drmaa_job_template;
if
(
$self
->_has_qalter_command) {
my
$qac
=
$self
->_qalter_command;
$self
->Odrmaa_set_attribute(
$jt
,
$DRMAA_NATIVE_SPECIFICATION
,
$qac
);
}
my
$jobid
=
$self
->Odrmaa_run_job(
$jt
);
$self
->_drmaa_jobid(
$jobid
);
$self
->stage->info(
"Job submitted with job id: $jobid"
);
return
$jobid
;
};
1;