# Debug switch, fixed when this file is compiled: it takes the value of the
# MariaDB_NonBlocking_DEBUG environment variable, or 0 when that variable is
# not defined.  The `DEBUG && TELL ...` statements in this file are guarded
# by it.
use constant {
    DEBUG => $ENV{MariaDB_NonBlocking_DEBUG} // 0,
};
# Debug logger.  Writes a single line to STDERR of the form
#   <current package>: <arguments joined by one space>
# The (@) prototype is kept as-is; callers in this file invoke it without
# parentheses (`DEBUG && TELL "..."`).
sub TELL (@) {
    my $message = join " ", @_;
    say STDERR __PACKAGE__, ': ', $message;
}
# Constructor.
#   $class - class to instantiate.
#   $args  - optional hashref of options; an empty hashref is used when it
#            is not defined.  The `ev` entry is removed from this hashref
#            (so the caller's hash is modified) and used as the event loop;
#            when it is missing or false, EV::default_loop() is used.
# The remaining options go to the parent class constructor, and the chosen
# loop is stored on the resulting object under the `loop` key.
# Returns the new object.
sub new {
    my $class = shift;
    my $args  = shift // {};

    # Take `ev` out before delegating so the parent never sees it.
    my $loop = delete $args->{ev};
    $loop ||= EV::default_loop();

    my $self = $class->SUPER::new($args);
    $self->{loop} = $loop;

    return $self;
}
# Translates a MariaDB wait bitmask into an EV I/O event mask.
#   $_[0] - bitmask tested against MYSQL_WAIT_READ and MYSQL_WAIT_WRITE.
# Returns EV::READ and/or EV::WRITE OR-ed together, or 0 when neither of
# the two bits is set.  Any other bits in the input are ignored.
sub _mysql_watchers_to_ev_watchers {
    my ($mysql_status) = @_;

    my $ev_mask = 0;
    if ( $mysql_status & MYSQL_WAIT_READ ) {
        $ev_mask |= EV::READ;
    }
    if ( $mysql_status & MYSQL_WAIT_WRITE ) {
        $ev_mask |= EV::WRITE;
    }

    return $ev_mask;
}
# Translates the event mask EV hands to a watcher callback into the
# corresponding MariaDB wait bitmask.
#   $_[0] - EV event mask.
# When the EV::TIMER bit is set the result is exactly MYSQL_WAIT_TIMEOUT,
# regardless of any other bits.  Otherwise EV::READ / EV::WRITE map to
# MYSQL_WAIT_READ / MYSQL_WAIT_WRITE, and 0 is returned if neither is set.
sub _ev_event_to_mysql_event {
    my ($ev_event) = @_;

    # A timer event short-circuits: it is never combined with I/O bits.
    return MYSQL_WAIT_TIMEOUT if $ev_event & EV::TIMER;

    my $mysql_events = 0;
    if ( $ev_event & EV::READ ) {
        $mysql_events |= MYSQL_WAIT_READ;
    }
    if ( $ev_event & EV::WRITE ) {
        $mysql_events |= MYSQL_WAIT_WRITE;
    }

    return $mysql_events;
}
# Placeholder callback installed on stopped watchers (see __stop_watcher).
# It deliberately does nothing.  It needs a real body rather than a bare
# forward declaration: a reference to a declared-but-undefined sub dies with
# "Undefined subroutine" if it is ever invoked.
sub empty { return }
# Deactivates a watcher while keeping the object around for later reuse.
#   $watcher_type - first positional argument; not used by this sub.
#   $watcher      - watcher object; nothing happens when it is false.
# The watcher is stopped, its keepalive flag is cleared, and its callback is
# replaced with the `empty` placeholder (presumably so the watcher no longer
# holds on to the previous callback -- confirm against EV's semantics).
sub __stop_watcher {
    my ( undef, $watcher ) = @_;

    if ( !$watcher ) {
        return;
    }

    $watcher->stop;
    $watcher->keepalive(0);
    $watcher->cb( \&empty );
}
# Upper bound on how many idle watchers are kept per pool;
# __return_watcher_to_pool does not add to a pool that has reached it.
our $WATCHER_POOL_MAX = 2;

# Package-wide cache of stopped watchers available for reuse.  Keys are pool
# names (this file uses 'io' and 'timer'); values are arrayrefs of watchers.
our %WATCHER_POOL;
# Retires a watcher and, if there is room, keeps it for later reuse.
#   $watcher_type - name of the pool in %WATCHER_POOL to file it under.
#   $watcher      - watcher object; nothing happens when it is false.
# The watcher is always deactivated via __stop_watcher.  It is then appended
# to the pool unless the pool already holds $WATCHER_POOL_MAX entries, in
# which case it is simply not stored.
sub __return_watcher_to_pool {
    my ( $watcher_type, $watcher ) = @_;
    return unless $watcher;

    # Deactivate unconditionally, even if the pool turns out to be full.
    __stop_watcher( $watcher_type, $watcher );

    my $pool = ( $WATCHER_POOL{$watcher_type} //= [] );
    if ( scalar(@$pool) < $WATCHER_POOL_MAX ) {
        return push @$pool, $watcher;
    }

    return;
}
# Stops the timer watcher stored under the `timer` key of the object's
# watcher storage, if there is one.
#   $maria - the connection object (hashref) owning `watcher_storage`.
sub _disarm_timer {
    my ($maria) = @_;

    # __stop_watcher unpacks ($watcher_type, $watcher).  The type must be
    # passed explicitly; with a single argument the watcher would land in
    # the type slot and the call would return without stopping anything.
    __stop_watcher( 'timer', $maria->{watcher_storage}{timer} );
}
# Detaches every watcher from the object and hands each one to
# __return_watcher_to_pool.
#   $maria - the connection object (hashref); its `watcher_storage` entry is
#            removed.  Nothing else happens when it is missing or empty.
#
# The pool name is derived from the storage key:
#   - a key containing 'timer' goes to the 'timer' pool,
#   - otherwise a key containing 'io' goes to the 'io' pool,
#   - any other key is used as the pool name unchanged.
# 'timer' is tested first on purpose.  In this file I/O watchers are stored
# only under the literal key 'io', while timers are stored under a
# caller-chosen key, so a timer key that happens to contain "io" (for
# example 'connection_timer') must not end up in the io pool, where it
# would later be reused as an I/O watcher.
sub _clean_object {
    my ($maria) = @_;

    my $watchers = delete $maria->{watcher_storage} // {};
    return unless %$watchers;

    foreach my $storage_key ( keys %$watchers ) {
        my $watcher = delete $watchers->{$storage_key};

        my $pool_type
            = index( $storage_key, 'timer' ) != -1 ? 'timer'
            : index( $storage_key, 'io' )    != -1 ? 'io'
            :                                        $storage_key;

        __return_watcher_to_pool( $pool_type, $watcher );
    }

    return;
}
# Adapts a callback that expects a MariaDB wait bitmask so it can be used as
# a watcher callback.
#   $cb - code reference invoked with a single argument, the translated mask.
# Returns a code reference that ignores its first argument, converts its
# second argument with _ev_event_to_mysql_event, and calls $cb with the
# result.
sub __wrap_ev_cb {
    my ($cb) = @_;

    my $adapter = sub {
        # Only the second positional argument (the event mask) is used.
        my $ev_event     = $_[1];
        my $mysql_events = _ev_event_to_mysql_event($ev_event);
        return $cb->($mysql_events);
    };

    return $adapter;
}
# Re-arms an already constructed watcher.
#   $existing_watcher - watcher object to reactivate.
#   $cb               - callback to install on it.
# Installs the callback, sets the keepalive flag back to 1 and starts the
# watcher, in that order.  Returns nothing.
sub __restart_watcher {
    my ( $watcher, $callback ) = @_;

    $watcher->cb($callback);
    $watcher->keepalive(1);
    $watcher->start;

    return;
}
# Arms the object's I/O watcher for a file descriptor.
#   $maria       - connection object (hashref) holding `loop` and
#                  `watcher_storage`.
#   $fd          - file descriptor to watch.
#   $wait_for    - MariaDB wait bitmask; translated to an EV mask with
#                  _mysql_watchers_to_ev_watchers.
#   $original_cb - callback taking the MariaDB event mask; wrapped with
#                  __wrap_ev_cb before being installed.
# A watcher already stored under `io` is reused; failing that, one is taken
# from the shared 'io' pool; failing that, a new one is created on the
# object's loop.  Returns nothing.
sub _set_io_watcher {
    my ( $maria, $fd, $wait_for, $original_cb ) = @_;

    my $ev_mask    = _mysql_watchers_to_ev_watchers($wait_for);
    my $wrapped_cb = __wrap_ev_cb($original_cb);
    my $storage    = ( $maria->{watcher_storage} //= {} );

    # The pool is consulted only when the object has no io watcher of its own.
    my $watcher = ( $storage->{io} ||= pop @{ $WATCHER_POOL{io} //= [] } );

    if ($watcher) {
        DEBUG && TELL "Reusing existing io watcher ($ev_mask)";
        $watcher->set( $fd, $ev_mask );
        __restart_watcher( $watcher, $wrapped_cb );
        return;
    }

    DEBUG && TELL "Started new io watcher ($ev_mask)";
    $storage->{io} = $maria->{loop}->io( $fd, $ev_mask, $wrapped_cb );
    return;
}
# Arms a one-shot timer for the object.
#   $maria        - connection object (hashref) holding `loop` and
#                   `watcher_storage`.
#   $watcher_type - key under which the timer is kept in `watcher_storage`.
#   $timeout_s    - delay passed to the timer as its `after` value; the
#                   repeat interval is always 0.
#   $cb           - callback taking the MariaDB event mask; wrapped with
#                   __wrap_ev_cb before being installed.
# A timer already stored under $watcher_type is reused; failing that, one is
# taken from the shared 'timer' pool; failing that, a new one is created on
# the object's loop.  Returns nothing.
sub _set_timer {
    my ( $maria, $watcher_type, $timeout_s, $cb ) = @_;

    my $storage    = $maria->{watcher_storage} //= {};
    my $wrapped_cb = __wrap_ev_cb($cb);

    # The pool is consulted only when the object has no such timer yet.
    my $existing_watcher = $storage->{$watcher_type}
        ||= pop @{ $WATCHER_POOL{timer} //= [] };

    # Refresh the loop's notion of "now" before (re)arming the timer.
    $maria->{loop}->now_update();

    if ( !$existing_watcher ) {
        DEBUG && TELL "Started new $watcher_type watcher";
        $storage->{$watcher_type} = $maria->{loop}->timer(
            $timeout_s,
            0,
            $wrapped_cb,
        );
        return;
    }

    DEBUG && TELL "Reusing existing $watcher_type watcher";
    $existing_watcher->set( $timeout_s, 0 );

    # Install the wrapped callback, exactly as the new-timer path does.
    # With the raw $cb a reused timer would call it with the arguments of
    # the watcher callback instead of the translated MariaDB event mask.
    __restart_watcher( $existing_watcher, $wrapped_cb );
    return;
}
1;