sub
get_type {
"group"
}
sub
get_jobs {
shift
->{jobs} }
sub
get_fail_with_members {
shift
->{fail_with_members} }
sub
get_stop_on_failure {
shift
->{stop_on_failure} }
sub
get_parallel {
shift
->{parallel} }
sub
get_scheduler {
shift
->{scheduler} }
sub
get_member_finished_callbacks {
shift
->{member_finished_callbacks} }
sub
set_jobs {
shift
->{jobs} =
$_
[1] }
sub
set_fail_with_members {
shift
->{fail_with_members} =
$_
[1] }
sub
set_stop_on_failure {
shift
->{stop_on_failure} =
$_
[1] }
sub
set_parallel {
shift
->{parallel} =
$_
[1] }
sub
set_member_finished_callbacks {
shift
->{member_finished_callbacks} =
$_
[1] }
sub
new {
my
$class
=
shift
;
my
%par
=
@_
;
my
(
$jobs
,
$fail_with_members
,
$stop_on_failure
) =
@par
{
'jobs'
,
'fail_with_members'
,
'stop_on_failure'
};
my
(
$parallel
,
$scheduler
,
$member_finished_callbacks
) =
@par
{
'parallel'
,
'scheduler'
,
'member_finished_callbacks'
};
$jobs
= []
unless
defined
$jobs
;
$fail_with_members
= 1
unless
defined
$fail_with_members
;
$stop_on_failure
= 1
unless
defined
$stop_on_failure
;
my
$self
=
$class
->SUPER::new(
@_
);
for
my
$cb
(
$member_finished_callbacks
) {
$cb
||= Event::ExecFlow::Callbacks->new;
$cb
= Event::ExecFlow::Callbacks->new(
$cb
)
if
ref
$cb
eq
'CODE'
;
}
$self
->set_jobs(
$jobs
);
$self
->set_fail_with_members(
$fail_with_members
);
$self
->set_stop_on_failure(
$stop_on_failure
);
$self
->set_parallel(
$parallel
);
$self
->set_scheduler(
$scheduler
);
$self
->set_member_finished_callbacks(
$member_finished_callbacks
);
return
$self
;
}
sub
set_frontend {
my
$self
=
shift
;
my
(
$frontend
) =
@_
;
$self
->SUPER::set_frontend(
$frontend
);
$_
->set_frontend(
$frontend
)
for
@{
$self
->get_jobs};
return
$frontend
;
}
sub
set_scheduler {
my
$self
=
shift
;
my
(
$scheduler
) =
@_
;
$self
->{scheduler} =
$scheduler
;
foreach
my
$job
( @{
$self
->get_jobs} ) {
$job
->set_scheduler(
$scheduler
)
if
$job
->get_type eq
'group'
;
}
return
$scheduler
;
}
sub
get_exec_type {
my
$self
=
shift
;
my
$job
=
$self
->get_next_job;
return
"sync"
if
not
$job
;
return
$job
->get_exec_type;
}
sub
get_diskspace_consumed {
my
$self
=
shift
;
my
$sum
=
$self
->SUPER::get_diskspace_consumed;
$sum
+=
$_
->get_diskspace_consumed
for
@{
$self
->get_jobs};
return
$sum
;
}
sub
get_diskspace_freed {
my
$self
=
shift
;
my
$sum
=
$self
->SUPER::get_diskspace_freed;
$sum
+=
$_
->get_diskspace_freed
for
@{
$self
->get_jobs};
return
$sum
;
}
sub
init {
my
$self
=
shift
;
$self
->SUPER::init();
foreach
my
$job
( @{
$self
->get_jobs} ) {
$job
->set_group(
$self
);
weaken(
$job
->{group});
$self
->add_child_post_callback(
$job
);
}
$self
->set_progress_max(
$self
->get_job_cnt);
1;
}
sub
reset_non_finished_jobs {
my
$self
=
shift
;
if
(
$self
->get_state ne
'finished'
) {
$self
->set_state(
"waiting"
);
$self
->set_cancelled(0);
$self
->set_error_message();
$self
->get_frontend->report_job_progress(
$self
);
}
foreach
my
$job
( @{
$self
->get_jobs} ) {
if
(
$job
->get_state ne
'finished'
) {
$job
->set_state(
"waiting"
);
$job
->set_cancelled(0);
$job
->set_error_message();
$self
->get_frontend->report_job_progress(
$job
);
}
$job
->reset_non_finished_jobs
if
$job
->get_type eq
'group'
;
}
1;
}
sub
get_job_cnt {
my
$self
=
shift
;
my
$cnt
= 0;
foreach
my
$job
( @{
$self
->get_jobs} ) {
$cnt
+=
$job
->get_job_cnt;
}
return
$cnt
;
}
sub
init_progress_state {
my
$self
=
shift
;
my
$progress_cnt
= 0;
foreach
my
$job
( @{
$self
->get_jobs} ) {
if
(
$job
->get_type eq
'group'
) {
$job
->init_progress_state;
$progress_cnt
+=
$job
->get_progress_cnt;
}
else
{
++
$progress_cnt
if
$job
->get_state eq
'finished'
||
$job
->get_state eq
'error'
;
}
}
$self
->set_progress_cnt(
$progress_cnt
);
$self
->set_progress_max(
$self
->get_job_cnt);
$self
->set_state(
"finished"
)
if
$self
->get_progress_cnt ==
$self
->get_progress_max;
1;
}
sub
set_group_in_all_childs {
my
$self
=
shift
;
foreach
my
$job
( @{
$self
->get_jobs} ) {
if
(
$job
->get_type eq
'group'
) {
$job
->set_group(
$self
);
weaken(
$job
->{group});
$job
->set_group_in_all_childs;
}
else
{
$job
->set_group(
$self
);
weaken(
$job
->{group});
}
}
1;
}
sub
increase_progress_max {
my
$self
=
shift
;
my
(
$add
) =
@_
;
my
$job
=
$self
;
while
(
$job
) {
$job
->set_progress_max(
$job
->get_progress_max +
$add
);
$job
=
$job
->get_group;
}
1;
}
sub
decrease_progress_max {
my
$self
=
shift
;
my
(
$del
) =
@_
;
my
$job
=
$self
;
while
(
$job
) {
$job
->set_progress_max(
$job
->get_progress_max -
$del
);
$job
=
$job
->get_group;
}
1;
}
sub
increase_progress_cnt {
my
$self
=
shift
;
my
(
$add
) =
@_
;
my
$job
=
$self
;
while
(
$job
) {
$job
->set_progress_cnt(
$job
->get_progress_cnt +
$add
);
$job
=
$job
->get_group;
}
1;
}
sub
decrease_progress_cnt {
my
$self
=
shift
;
my
(
$del
) =
@_
;
my
$job
=
$self
;
while
(
$job
) {
$job
->set_progress_cnt(
$job
->get_progress_cnt -
$del
);
$job
=
$job
->get_group;
}
1;
}
sub
add_job {
my
$self
=
shift
;
my
(
$job
) =
@_
;
push
@{
$self
->get_jobs},
$job
;
$job
->set_frontend(
$self
->get_frontend);
$job
->set_group(
$self
);
weaken(
$job
->{group});
my
$job_cnt
=
$job
->get_job_cnt;
$self
->increase_progress_max(
$job_cnt
)
if
$job_cnt
!= 0;
if
(
$self
->get_state eq
'finished'
||
$self
->get_state eq
'error'
) {
$self
->set_state(
"waiting"
);
}
$self
->add_child_post_callback(
$job
);
$self
->get_frontend->report_job_added(
$job
);
1;
}
sub
remove_job {
my
$self
=
shift
;
my
(
$job
) =
@_
;
my
$jobs
=
$self
->get_jobs;
my
$i
;
for
(
$i
=0;
$i
< @{
$jobs
}; ++
$i
) {
last
if
$jobs
->[
$i
] eq
$job
;
}
die
"Job with ID "
.
$job
->get_id.
" no member of this group"
if
$i
== @{
$jobs
};
splice
@{
$jobs
},
$i
, 1;
my
$job_cnt
=
$job
->get_job_cnt;
$self
->decrease_progress_max(
$job_cnt
)
if
$job_cnt
!= 0;
$self
->get_frontend->report_job_removed(
$job
);
1;
}
sub
get_job_by_name {
my
$self
=
shift
;
my
(
$job_name
) =
@_
;
foreach
my
$job
( @{
$self
->get_jobs} ) {
return
$job
if
$job
->get_name eq
$job_name
;
}
die
"Job '$job_name' not member of group '"
.
$self
->get_name.
"'"
;
}
sub
execute {
my
$self
=
shift
;
my
%par
=
@_
;
my
(
$skip
) =
$par
{
'skip'
};
$skip
=
""
if
!
defined
$skip
;
my
$blocked_job
;
while
( 1 ) {
if
(
$self
->get_cancelled
||
$self
->all_jobs_finished
|| (
$self
->get_error_message &&
$self
->get_stop_on_failure ) ) {
$self
->execution_finished;
if
(
$self
->get_scheduler &&
$self
->get_scheduler->is_exclusive ) {
$self
->get_scheduler->run;
}
return
;
}
return
if
$self
->get_scheduler &&
$self
->get_scheduler->is_exclusive;
my
$job
=
$self
->get_next_job(
blocked
=>
$blocked_job
);
next
if
defined
$job
&&
"$job"
eq
"$skip"
;
if
( !
$job
) {
$self
->try_reschedule_jobs(
skip
=>
$skip
);
last
;
}
if
(
$self
->get_scheduler ) {
my
$state
=
$self
->get_scheduler->schedule_job(
$job
);
return
if
$state
eq
'sched-blocked'
;
if
(
$state
eq
'job-blocked'
) {
$blocked_job
=
$job
;
next
;
}
die
"Illegal scheduler state '$state'"
unless
$state
eq
'ok'
;
}
$self
->start_child_job(
$job
);
last
if
!
$self
->get_parallel;
}
1;
}
sub
try_reschedule_jobs {
my
$self
=
shift
;
my
%par
=
@_
;
my
(
$skip
) =
$par
{
'skip'
};
my
$executed
= 0;
foreach
my
$job
( @{
$self
->get_jobs} ) {
next
if
"$job"
eq
"$skip"
;
if
(
$job
->get_type eq
'group'
&&
$job
->get_state eq
'running'
&&
$job
->get_parallel ) {
$job
->execute;
$executed
= 1;
}
}
if
( !
$executed
&&
$self
->get_group ) {
$self
->get_group->execute(
skip
=>
$self
);
}
1;
}
sub
cancel {
my
$self
=
shift
;
$self
->set_cancelled(1);
$_
->get_state eq
'running'
&&
$_
->cancel
for
@{
$self
->get_jobs};
1;
}
sub
pause_job {
my
$self
=
shift
;
$_
->get_state eq
'running'
&&
$_
->pause
for
@{
$self
->get_jobs};
1;
}
sub
add_child_post_callback {
my
$self
=
shift
;
my
(
$job
) =
@_
;
if
(
$job
->{_post_callbacks_added} ) {
return
;
Carp::confess(
$job
->get_info.
": callbacks added twice!"
);
}
$job
->{_post_callbacks_added} = 1;
$job
->get_post_callbacks->add(
sub
{
my
(
$job
) =
@_
;
$self
->child_job_finished(
$job
);
1;
});
1;
}
sub
start_child_job {
my
$self
=
shift
;
my
(
$job
) =
@_
;
$Event::ExecFlow::DEBUG
&&
print
"Group->start_child_job("
.
$job
->get_info.
")\n"
;
$self
->set_progress_cnt(0)
unless
defined
$self
->get_progress_cnt;
$self
->get_frontend->report_job_progress(
$self
);
$job
->start;
1;
}
sub
child_job_finished {
my
$self
=
shift
;
my
(
$job
) =
@_
;
$Event::ExecFlow::DEBUG
&&
print
"Group->child_job_finished("
.
$job
->get_info.
")\n"
;
$self
->get_member_finished_callbacks->execute()
if
$self
->get_member_finished_callbacks;
if
(
$job
->get_error_message && !
$job
->get_cancelled ) {
if
(
$self
->get_fail_with_members ) {
$self
->set_state(
"error"
);
$self
->add_job_error_message(
$job
);
}
}
if
(
$self
->get_scheduler ) {
$self
->get_scheduler->job_finished(
$job
);
}
$self
->execute;
1;
}
sub
add_job_error_message {
my
$self
=
shift
;
my
(
$job
) =
@_
;
my
$error_message
=
$self
->get_error_message ||
""
;
$error_message
.=
"Job '"
.
$job
->get_info.
"' "
.
"failed with error message:\n"
.
$job
->get_error_message.
"\n"
.
(
"-"
x80).
"\n"
;
$self
->set_error_message(
$error_message
);
1;
}
sub
get_first_job {
my
$self
=
shift
;
return
$self
->get_jobs->[0];
}
sub
get_next_job {
my
$self
=
shift
;
my
%par
=
@_
;
my
(
$blocked
) =
$par
{
'blocked'
};
$blocked
=
""
if
!
defined
$blocked
;
my
$next_job
;
foreach
my
$job
( @{
$self
->get_jobs} ) {
next
if
defined
$job
&&
"$job"
eq
"$blocked"
;
$Event::ExecFlow::DEBUG
&&
print
"Group("
.
$self
->get_info.
")->get_next_job: check "
.
$job
->get_info.
"=>"
.
$job
->get_state.
"\n"
;
if
(
$job
->get_state eq
'waiting'
&&
$self
->dependencies_ok(
$job
) ) {
$next_job
=
$job
;
last
;
}
}
$Event::ExecFlow::DEBUG
&&
print
"Group("
.
$self
->get_info.
")->get_next_job="
.
(
$next_job
?
$next_job
->get_info :
"NOJOB"
).
"\n"
;
return
$next_job
;
}
sub
dependencies_ok {
my
$self
=
shift
;
my
(
$job
) =
@_
;
foreach
my
$dep_job_name
( @{
$job
->get_depends_on} ) {
my
$dep_job
=
$self
->get_job_by_name(
$dep_job_name
);
$Event::ExecFlow::DEBUG
&&
print
"Job("
.
$job
->get_info.
")->dependencies_ok: check "
.
$dep_job
->get_info.
" =>"
.
$dep_job
->get_state.
"\n"
;
return
if
$dep_job
->get_state ne
'finished'
;
}
return
1;
}
sub
all_jobs_finished {
my
$self
=
shift
;
foreach
my
$job
( @{
$self
->get_jobs} ) {
return
0
if
$job
->get_state eq
'waiting'
||
$job
->get_state eq
'running'
;
}
return
1;
}
sub
get_max_diskspace_consumed {
my
$self
=
shift
;
my
(
$currently_consumed
,
$max_consumed
) =
@_
;
foreach
my
$job
( @{
$self
->get_jobs} ) {
(
$currently_consumed
,
$max_consumed
) =
$job
->get_max_diskspace_consumed
(
$currently_consumed
,
$max_consumed
);
}
return
(
$currently_consumed
,
$max_consumed
);
}
sub
backup_state {
my
$self
=
shift
;
my
$data_href
=
$self
->SUPER::backup_state();
delete
$data_href
->{jobs};
delete
$data_href
->{scheduler};
delete
$data_href
->{member_finished_callbacks};
my
$jobs
=
$self
->get_jobs;
foreach
my
$job
( @{
$jobs
} ) {
push
@{
$data_href
->{jobs}},
$job
->backup_state;
}
return
$data_href
;
}
sub
restore_state {
my
$self
=
shift
;
my
(
$data_href
) =
@_
;
my
$jobs
=
$self
->get_jobs;
$self
->SUPER::restore_state(
$data_href
);
my
$job_states
=
delete
$self
->{jobs};
my
$i
= 0;
foreach
my
$job
( @{
$jobs
} ) {
$job
->restore_state(
$job_states
->[
$i
]);
++
$i
;
}
$self
->set_jobs(
$jobs
);
1;
}
sub
add_stash_to_all_jobs {
my
$self
=
shift
;
my
(
$add_stash
) =
@_
;
$self
->add_stash(
$add_stash
);
foreach
my
$job
( @{
$self
->get_jobs} ) {
if
(
$job
->get_type eq
'group'
) {
$job
->add_stash_to_all_jobs(
$add_stash
);
}
else
{
$job
->add_stash(
$add_stash
);
}
}
}
sub
traverse_all_jobs {
my
$self
=
shift
;
my
(
$code
) =
@_
;
foreach
my
$job
( @{
$self
->get_jobs} ) {
$code
->(
$job
);
if
(
$job
->get_type eq
'group'
) {
$job
->traverse_all_jobs(
$code
);
}
}
1;
}
sub
get_job_with_id {
my
$self
=
shift
;
my
(
$job_id
) =
@_
;
my
$job
;
$self
->traverse_all_jobs(
sub
{
$job
=
$_
[0]
if
$_
[0]->get_id eq
$job_id
;
});
return
$job
;
}
1;