use
5.008000;
# Module version string.
our $VERSION = '2.30_5272';

# Tuning knobs read by completed() when inserting the job history record
# fails: the retry threshold, and the number of seconds slept between tries.
our $D_OD_RETRIES        = 3;
our $D_OD_RETRY_INTERVAL = 5;
# setConfig($config)
# Stores $config in this object's "config" slot and returns the stored value.
sub setConfig {
    my ($self, $config) = @_;
    return $self->{config} = $config;
}
# getConfig()
# Returns whatever is currently held in this object's "config" slot.
sub getConfig {
    my ($self) = @_;
    return $self->{config};
}
# setArgs($args)
# Stores $args in this object's "args" slot and returns the stored value.
sub setArgs {
    my ($self, $args) = @_;
    return $self->{args} = $args;
}
# getArgs()
# Returns whatever is currently held in this object's "args" slot.
sub getArgs {
    my ($self) = @_;
    return $self->{args};
}
# setJobid($jobid)
# Sets the job id on the underlying job object (the one returned by job()),
# so that getJobid() reads back the value written here.
#
# The previous implementation assigned $jobid to $self->{args}, which both
# destroyed the parsed job arguments and left the job's id untouched.
#
# Returns whatever the job's jobid() accessor returns.
sub setJobid {
    my ($self, $jobid) = @_;
    return $self->job()->jobid($jobid);
}
# getJobid()
# Returns the result of calling jobid() on the object returned by job().
sub getJobid {
    my ($self) = @_;
    my $job = $self->job();
    return $job->jobid;
}
# setFuncid($funcid)
# Passes $funcid to the funcid() accessor of the object returned by job()
# and returns whatever that call returns.
sub setFuncid {
    my ($self, $funcid) = @_;
    my $job = $self->job();
    return $job->funcid($funcid);
}
# getFuncid()
# Returns the result of calling funcid() on the object returned by job().
sub getFuncid {
    my ($self) = @_;
    my $job = $self->job();
    return $job->funcid;
}
# setFailures($failures)
# Passes $failures to the failures() accessor of the object returned by job()
# and returns whatever that call returns.
sub setFailures {
    my ($self, $failures) = @_;
    my $job = $self->job();
    return $job->failures($failures);
}
# getFailures()
# Returns the result of calling failures() on the object returned by job().
sub getFailures {
    my ($self) = @_;
    my $job = $self->job();
    return $job->failures;
}
# setFuncname($funcname)
# Passes $funcname to the funcname() accessor of the object returned by job()
# and returns whatever that call returns.
sub setFuncname {
    my ($self, $funcname) = @_;
    my $job = $self->job();
    return $job->funcname($funcname);
}
# getFuncname()
# Returns the result of calling funcname() on the object returned by job().
sub getFuncname {
    my ($self) = @_;
    my $job = $self->job();
    return $job->funcname;
}
# setUniqkey($uniqkey)
# Passes $uniqkey to the uniqkey() accessor of the object returned by job()
# and returns whatever that call returns.
sub setUniqkey {
    my ($self, $uniqkey) = @_;
    my $job = $self->job();
    return $job->uniqkey($uniqkey);
}
# getUniqkey()
# Returns the result of calling uniqkey() on the object returned by job().
sub getUniqkey {
    my ($self) = @_;
    my $job = $self->job();
    return $job->uniqkey;
}
# setRunAfter($run_after)
# Passes $run_after to the run_after() accessor of the object returned by
# job() and returns whatever that call returns.
sub setRunAfter {
    my ($self, $run_after) = @_;
    my $job = $self->job();
    return $job->run_after($run_after);
}
# getRunAfter()
# Returns the result of calling run_after() on the object returned by job().
sub getRunAfter {
    my ($self) = @_;
    my $job = $self->job();
    return $job->run_after;
}
# setGrabbedUntil($grabbed_until)
# Passes $grabbed_until to the grabbed_until() accessor of the object
# returned by job() and returns whatever that call returns.
sub setGrabbedUntil {
    my ($self, $grabbed_until) = @_;
    my $job = $self->job();
    return $job->grabbed_until($grabbed_until);
}
# getGrabbedUntil()
# Returns the result of calling grabbed_until() on the object returned by
# job().
sub getGrabbedUntil {
    my ($self) = @_;
    my $job = $self->job();
    return $job->grabbed_until;
}
# setCoalesce($coalesce)
# Passes $coalesce to the coalesce() accessor of the object returned by job()
# and returns whatever that call returns.
sub setCoalesce {
    my ($self, $coalesce) = @_;
    my $job = $self->job();
    return $job->coalesce($coalesce);
}
# getCoalesce()
# Returns the result of calling coalesce() on the object returned by job().
sub getCoalesce {
    my ($self) = @_;
    my $job = $self->job();
    return $job->coalesce;
}
# job([$job])
# Combined getter/setter for the wrapped job object held in the "job" slot.
# Called with an argument, stores it (even if it is undef) and returns it;
# called with none, returns the currently stored value.
sub job {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{job} = $new_value[0];
    }
    return $self->{job};
}
# setArgXML($argxml)
# Stores $argxml in this object's "argxml" slot and returns the stored value.
sub setArgXML {
    my ($self, $argxml) = @_;
    return $self->{argxml} = $argxml;
}
# getArgXML()
# Returns whatever is currently held in this object's "argxml" slot.
sub getArgXML {
    my ($self) = @_;
    return $self->{argxml};
}
# debug([$flag])
# Combined getter/setter for the "debug" slot. Called with an argument,
# stores it and returns it; called with none, returns the stored value.
sub debug {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{debug} = $new_value[0];
    }
    return $self->{debug};
}
# new($schwartz_job) / new(@job_params)
# Constructor; may be invoked on a class name or on an existing instance
# (in which case the instance's class is reused).
#
# If the first argument is an object that isa TheSchwartz::Job, that object
# is wrapped as-is and the first element of its arg() array is stored as the
# argument XML. Otherwise all arguments are handed to TheSchwartz::Job->new()
# and the resulting job is wrapped; no argument XML is stored in that case.
sub new {
    my ($caller, @params) = @_;
    my $class = ref($caller) || $caller;
    my $self  = bless {}, $class;

    my $candidate = $params[0];
    my $is_schwartz_job = defined($candidate)
                       && ref($candidate)
                       && $candidate->isa('TheSchwartz::Job');

    unless ($is_schwartz_job) {
        my $schwartz_job = TheSchwartz::Job->new(@params);
        $self->job($schwartz_job);
        return $self;
    }

    $self->job($candidate);
    $self->setArgXML( $candidate->arg()->[0] );
    return $self;
}
# parseArgXML($xml)
# Parses the job argument XML string with XML::Simple (empty elements become
# undef, the root element is kept, and <job> elements are always forced into
# an array) and returns the resulting data structure.
#
# Throws Helios::Error::InvalidArg if the XML cannot be parsed. The message
# is taken from the caught exception; the previous code reported $!, which is
# the OS errno and says nothing about why the XML parse failed.
sub parseArgXML {
    my ($self, $xml) = @_;

    my $xs = XML::Simple->new(
        SuppressEmpty => undef,
        KeepRoot      => 1,
        ForceArray    => ['job'],
    );

    my $args;
    try {
        $args = $xs->XMLin($xml);
    } otherwise {
        my $e = shift;
        Helios::Error::InvalidArg->throw( $e->text );
    };
    return $args;
}
# parseArgs()
# Parses the first element of the wrapped job's arg() array via parseArgXML()
# and selects the argument hash from the result:
#   - a defined {metajob} entry is used as the args, and its own {metajob}
#     key is set to 1;
#   - otherwise a defined {job} entry yields the {params} of its first job;
#   - otherwise the top-level {params} entry is used.
# The selected args are stored with setArgs() and returned.
sub parseArgs {
    my ($self) = @_;
    my $job       = $self->job();
    my $parsedxml = $self->parseArgXML( $job->arg()->[0] );

    my $is_metajob = defined( $parsedxml->{metajob} );
    my $args
        = $is_metajob                  ? $parsedxml->{metajob}
        : defined( $parsedxml->{job} ) ? $parsedxml->{job}->[0]->{params}
        :                                $parsedxml->{params};

    # Flag the args so isaMetaJob() can recognise them later.
    $args->{metajob} = 1 if $is_metajob;

    $self->setArgs($args);
    return $args;
}
# isaMetaJob()
# Returns 1 if the job's args carry a {metajob} entry numerically equal to 1,
# and 0 otherwise. Uses the args from getArgs() when that returns a true
# value, falling back to parseArgs() when it does not.
sub isaMetaJob {
    my ($self) = @_;
    my $args = $self->getArgs() || $self->parseArgs();

    return 1 if defined( $args->{metajob} ) && $args->{metajob} == 1;
    return 0;
}
# completed()
# Records a successful run of the wrapped job: inserts a Helios::JobHistory
# row (exitstatus 0, complete_time = now) through the driver returned by
# getDriver(), then calls completed() on the job itself. Returns 0.
#
# If the history insert fails it is retried up to $D_OD_RETRIES times,
# sleeping $D_OD_RETRY_INTERVAL seconds between attempts. Once the retries
# are exhausted a Helios::Error::DatabaseError carrying the last error text
# is thrown and the job is NOT marked completed.
#
# The earlier version used "next RETRY" on a bare block. In Perl, "next" on a
# bare block leaves the block instead of restarting it, so the insert was
# never actually retried and the job was marked completed even though the
# history row was missing. A real loop is used here, and the retry decision
# is made outside the try/otherwise closures so no loop control has to jump
# out of them.
sub completed {
    my $self = shift;
    my $job  = $self->job();

    my $retries = 0;
    while (1) {
        my $insert_failed = 0;
        my $error_text;

        try {
            my $driver     = $self->getDriver();
            my $jobhistory = Helios::JobHistory->new(
                jobid         => $job->jobid,
                funcid        => $job->funcid,
                arg           => $job->arg()->[0],
                uniqkey       => $job->uniqkey,
                insert_time   => $job->insert_time,
                run_after     => $job->run_after,
                grabbed_until => $job->grabbed_until,
                priority      => $job->priority,
                coalesce      => $job->coalesce,
                complete_time => time(),
                exitstatus    => 0,
            );
            $driver->insert($jobhistory);
        } otherwise {
            my $e = shift;
            $insert_failed = 1;
            $error_text    = $e->text;
        };

        last unless $insert_failed;

        # Give up once the configured number of retries has been used.
        if ( $retries >= $D_OD_RETRIES ) {
            Helios::Error::DatabaseError->throw($error_text);
        }
        $retries++;
        sleep $D_OD_RETRY_INTERVAL;
    }

    $job->completed();
    return 0;
}
# failed($error, $exitstatus)
# Records a failed run of the wrapped job: inserts a Helios::JobHistory row
# with the given exit status through the driver returned by getDriver(), then
# calls failed() on the job with the error message and exit status.
#
#   $error      - error message; only its first 254 characters are passed on
#                 to the job (an undefined message is passed as '')
#   $exitstatus - exit status; undefined or 0 is replaced by 1
#
# Returns the (possibly adjusted) exit status.
#
# If the history insert fails it is retried up to 3 times with a 10 second
# pause between attempts. Once the retries are exhausted the job is still
# marked failed and a Helios::Error::DatabaseError carrying the last error
# text is thrown.
#
# The earlier version used "next RETRY" on a bare block, which leaves the
# block rather than restarting it, so the insert was never retried. It also
# passed the untruncated message to the job on the give-up path; both paths
# now pass the same truncated message.
sub failed {
    my $self       = shift;
    my $error      = shift;
    my $exitstatus = shift;
    my $job        = $self->job();

    # A failure must never be recorded with a zero/undefined exit status.
    if ( !defined($exitstatus) || $exitstatus == 0 ) {
        $exitstatus = 1;
    }

    my $message = defined($error) ? substr( $error, 0, 254 ) : '';

    my $retries        = 0;
    my $retry_limit    = 3;
    my $retry_interval = 10;    # seconds

    while (1) {
        my $insert_failed = 0;
        my $error_text;

        try {
            my $driver     = $self->getDriver();
            my $jobhistory = Helios::JobHistory->new(
                jobid         => $job->jobid,
                funcid        => $job->funcid,
                arg           => $job->arg()->[0],
                uniqkey       => $job->uniqkey,
                insert_time   => $job->insert_time,
                run_after     => $job->run_after,
                grabbed_until => $job->grabbed_until,
                priority      => $job->priority,
                coalesce      => $job->coalesce,
                complete_time => time(),
                exitstatus    => $exitstatus,
            );
            $driver->insert($jobhistory);
        } otherwise {
            my $e = shift;
            $insert_failed = 1;
            $error_text    = $e->text;
        };

        last unless $insert_failed;

        if ( $retries >= $retry_limit ) {
            # History could not be written; still mark the job as failed
            # before reporting the database problem.
            $job->failed( $message, $exitstatus );
            Helios::Error::DatabaseError->throw($error_text);
        }
        $retries++;
        sleep $retry_interval;
    }

    $job->failed( $message, $exitstatus );
    return $exitstatus;
}
# failedNoRetry($error, $exitstatus)
# Like failed(), but marks the job with permanent_failure() instead of
# failed(): inserts a Helios::JobHistory row with the given exit status
# through the driver returned by getDriver(), then calls permanent_failure()
# on the job with the error message and exit status.
#
#   $error      - error message; only its first 254 characters are passed on
#                 to the job (an undefined message is passed as '')
#   $exitstatus - exit status; undefined or 0 is replaced by 1
#
# Returns the (possibly adjusted) exit status.
#
# If the history insert fails it is retried up to 3 times with a 10 second
# pause between attempts. Once the retries are exhausted the job is still
# marked permanently failed and a Helios::Error::DatabaseError carrying the
# last error text is thrown.
#
# The earlier version used "next RETRY" on a bare block, which leaves the
# block rather than restarting it, so the insert was never retried. It also
# passed the untruncated message to the job on the give-up path; both paths
# now pass the same truncated message.
sub failedNoRetry {
    my $self       = shift;
    my $error      = shift;
    my $exitstatus = shift;
    my $job        = $self->job();

    # A failure must never be recorded with a zero/undefined exit status.
    if ( !defined($exitstatus) || $exitstatus == 0 ) {
        $exitstatus = 1;
    }

    my $message = defined($error) ? substr( $error, 0, 254 ) : '';

    my $retries        = 0;
    my $retry_limit    = 3;
    my $retry_interval = 10;    # seconds

    while (1) {
        my $insert_failed = 0;
        my $error_text;

        try {
            my $driver     = $self->getDriver();
            my $jobhistory = Helios::JobHistory->new(
                jobid         => $job->jobid,
                funcid        => $job->funcid,
                arg           => $job->arg()->[0],
                uniqkey       => $job->uniqkey,
                insert_time   => $job->insert_time,
                run_after     => $job->run_after,
                grabbed_until => $job->grabbed_until,
                priority      => $job->priority,
                coalesce      => $job->coalesce,
                complete_time => time(),
                exitstatus    => $exitstatus,
            );
            $driver->insert($jobhistory);
        } otherwise {
            my $e = shift;
            $insert_failed = 1;
            $error_text    = $e->text;
        };

        last unless $insert_failed;

        if ( $retries >= $retry_limit ) {
            # History could not be written; still mark the job as
            # permanently failed before reporting the database problem.
            $job->permanent_failure( $message, $exitstatus );
            Helios::Error::DatabaseError->throw($error_text);
        }
        $retries++;
        sleep $retry_interval;
    }

    $job->permanent_failure( $message, $exitstatus );
    return $exitstatus;
}
# submit()
# Submits this job to the job queue. Builds a TheSchwartz client from the
# dsn/user/password entries of the config returned by getConfig(), inserts a
# job whose function name is getFuncname() and whose single argument is the
# XML returned by getArgXML(), then records the new job id via setJobid().
#
# Returns the id of the newly inserted job.
#
# Throws Helios::Error::DatabaseError if the client returns no job handle.
# Previously that case died on "$sjh->jobid" with an unhelpful "Can't call
# method on an undefined value" message.
sub submit {
    my $self = shift;

    my $config    = $self->getConfig();
    my $params    = $self->getArgXML();
    my $job_class = $self->getFuncname;

    my $databases = [
        {
            dsn  => $config->{dsn},
            user => $config->{user},
            pass => $config->{password},
        }
    ];
    my $args = [$params];

    my $client = TheSchwartz->new( databases => $databases, verbose => 1 );
    my $sjh    = $client->insert( $job_class, $args );

    # No handle back means the job was not inserted; fail loudly and clearly.
    unless ( defined $sjh ) {
        Helios::Error::DatabaseError->throw(
            'Job submission failed: no job handle was returned by the job queue'
        );
    }

    my $jobid = $sjh->jobid;
    $self->setJobid($jobid);
    return $jobid;
}
# burst()
# Splits a metajob into its constituent jobs. Every entry under
# {jobs}->{job} of the current args (getArgs()) is serialised back to XML and
# wrapped in a new TheSchwartz::Job; the wrapped job is then replaced by the
# whole set via replace_with(). The new jobs use the {class} entry of the
# args as their function name when it is defined, and the wrapped job's own
# funcname otherwise.
#
# If anything in that process throws, the job is marked failed with the error
# text and a Helios::Error::Fatal is thrown. On success the job is marked
# completed; in debug mode the id of each new job is printed.
#
# Returns the number of jobs created.
sub burst {
    my ($self) = @_;
    my $job  = $self->job();
    my $args = $self->getArgs();

    my $xs = XML::Simple->new( SuppressEmpty => undef, ForceArray => ['job'] );
    my @newjobs;

    my $classname = defined( $args->{class} ) ? $args->{class} : $job->funcname;

    try {
        foreach my $job_arg ( @{ $args->{jobs}->{job} } ) {
            my $newxml = $xs->XMLout(
                $job_arg,
                NoAttr   => 1,
                NoIndent => 1,
                RootName => undef,
            );
            my $newjob = TheSchwartz::Job->new(
                funcname => $classname,
                arg      => [$newxml],
            );
            push( @newjobs, $newjob );
        }
        $job->replace_with(@newjobs);
    } otherwise {
        my $e = shift;
        $self->failed( $e->text );
        Helios::Error::Fatal->throw( $e->text );
    };

    $self->completed;

    if ( $self->debug ) {
        for my $created (@newjobs) {
            print "JOBID: ", $created->jobid, "\n";
        }
    }

    return scalar(@newjobs);
}
# getDriver()
# Creates and returns a new Data::ObjectDriver::Driver::DBI built from the
# dsn, user and password entries of the config returned by getConfig().
# A fresh driver object is constructed on every call.
#
# In debug mode the connection settings and the resulting driver are printed.
# NOTE(review): the debug output includes the database password in clear
# text, with no separators between the three values -- confirm that this is
# acceptable wherever debug output ends up.
sub getDriver {
    my ($self) = @_;
    my $config = $self->getConfig();

    print $config->{dsn}, $config->{user}, $config->{password}, "\n"
        if $self->debug;

    my @connect_params = (
        dsn      => $config->{dsn},
        username => $config->{user},
        password => $config->{password},
    );
    my $driver = Data::ObjectDriver::Driver::DBI->new(@connect_params);

    print "DRIVER: ", $driver, "\n" if $self->debug;

    return $driver;
}
1;