package
DBIx::Class::Storage::BlockRunner;
has
storage
=> (
is
=>
'ro'
,
required
=> 1,
);
has
wrap_txn
=> (
is
=>
'ro'
,
required
=> 1,
);
has
retry_handler
=> (
is
=>
'ro'
,
required
=> 1,
isa
=> qsub
q{
(Scalar::Util::reftype($_[0])||'') eq 'CODE'
or DBIx::Class::Exception->throw('retry_handler must be a CODE reference')
}
,
);
has
retry_debug
=> (
is
=>
'rw'
,
default
=> qsub
'$ENV{DBIC_STORAGE_RETRY_DEBUG}'
,
lazy
=> 1,
);
has
max_attempts
=> (
is
=>
'ro'
,
default
=> 20,
);
has
failed_attempt_count
=> (
is
=>
'ro'
,
init_arg
=>
undef
,
writer
=>
'_set_failed_attempt_count'
,
default
=> 0,
lazy
=> 1,
trigger
=> qsub
q{
$_[0]->throw_exception( sprintf (
'Reached max_attempts amount of %d, latest exception: %s',
$_[0]->max_attempts, $_[0]->last_exception
)) if $_[0]->max_attempts <= ($_[1]||0);
}
,
);
has
exception_stack
=> (
is
=>
'ro'
,
init_arg
=>
undef
,
clearer
=>
'_reset_exception_stack'
,
default
=> qsub
q{ [] }
,
lazy
=> 1,
);
sub
last_exception {
shift
->exception_stack->[-1] }
sub
throw_exception {
shift
->storage->throw_exception (
@_
) }
sub
run {
my
$self
=
shift
;
$self
->_reset_exception_stack;
$self
->_set_failed_attempt_count(0);
my
$cref
=
shift
;
$self
->throw_exception(
'run() requires a coderef to execute as its first argument'
)
if
( reftype(
$cref
)||
''
) ne
'CODE'
;
my
$storage
=
$self
->storage;
return
$cref
->(
@_
)
if
(
$storage
->{_in_do_block}
and
!
$self
->wrap_txn
);
local
$storage
->{_in_do_block} = 1
unless
$storage
->{_in_do_block};
return
$self
->_run(
$cref
,
@_
);
}
sub
_run {
weaken(
my
$self
=
shift
);
weaken(
my
$cref
=
shift
);
my
$args
=
@_
? \
@_
: [];
my
$txn_init_depth
=
$self
->wrap_txn ?
$self
->storage->transaction_depth :
undef
;
my
$txn_begin_ok
;
my
$run_err
=
''
;
return
preserve_context {
try
{
if
(
defined
$txn_init_depth
) {
$self
->storage->txn_begin;
$txn_begin_ok
= 1;
}
$cref
->(
@$args
);
}
catch
{
$run_err
=
$_
;
();
};
}
replace
=>
sub
{
my
@res
=
@_
;
my
$storage
=
$self
->storage;
my
$cur_depth
=
$storage
->transaction_depth;
if
(
defined
$txn_init_depth
and ! is_exception
$run_err
) {
my
$delta_txn
= (1 +
$txn_init_depth
) -
$cur_depth
;
if
(
$delta_txn
) {
carp (
sprintf
'Unexpected reduction of transaction depth by %d after execution of '
.
'%s, skipping txn_commit()'
,
$delta_txn
,
$cref
,
)
unless
$delta_txn
== 1 and
$cur_depth
== 0;
}
else
{
$run_err
=
eval
{
$storage
->txn_commit; 1 } ?
''
: $@;
}
}
if
( is_exception
$run_err
) {
if
(
$txn_begin_ok
) {
my
$rollback_exception
=
$storage
->_seems_connected
? (!
eval
{
$storage
->txn_rollback; 1 }) ? $@ :
''
:
'lost connection to storage'
;
if
(
$rollback_exception
and (
!
defined
blessed
$rollback_exception
or
!
$rollback_exception
->isa(
'DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION'
)
) ) {
$run_err
=
"Transaction aborted: $run_err. Rollback failed: $rollback_exception"
;
}
}
push
@{
$self
->exception_stack },
$run_err
;
$self
->_set_failed_attempt_count(
$self
->failed_attempt_count + 1);
$storage
->throw_exception(
$run_err
)
if
(
(
defined
$txn_init_depth
and
$txn_init_depth
> (
$storage
->{_dbh_autocommit} ? 0 : 1 )
) or !
$self
->retry_handler->(
$self
)
);
carp(
sprintf
'Retrying %s (attempt %d) after caught exception: %s'
,
$cref
,
$self
->failed_attempt_count + 1,
$run_err
,
)
if
$self
->retry_debug;
$storage
->ensure_connected;
$storage
->throw_exception(
sprintf
'Unexpected transaction depth of %d on freshly connected handle'
,
$storage
->transaction_depth,
)
if
(
defined
$txn_init_depth
and
$storage
->transaction_depth);
return
$self
->_run(
$cref
,
@$args
);
}
return
wantarray
?
@res
:
$res
[0];
};
}
1;