no
warnings
qw( threads recursion uninitialized once )
;
our
$VERSION
=
'1.901'
;
use
if
$^O eq
'MSWin32'
,
'threads'
;
use
if
$^O eq
'MSWin32'
,
'threads::shared'
;
# 1 when running on native Windows Perl ($^O is exactly 'MSWin32'), else 0.
my $is_MSWin32 = $^O eq 'MSWin32' ? 1 : 0;

# True only when the Perl version string sorts after '5.010000' AND the
# platform name does not contain mswin, mingw, msys or cygwin. The methods
# below use this flag to choose between the pipe and socket helpers.
my $use_pipe = ( $] gt '5.010000' && $^O !~ /mswin|mingw|msys|cygwin/i );

# Thread id of the interpreter that loaded this file; stays 0 unless
# threads.pm has already been loaded.
my $tid = 0;
$tid = threads->tid if $INC{'threads.pm'};
# Thread-clone hook: refresh the cached thread id, but only when threads.pm
# is loaded.
sub CLONE {
    $INC{'threads.pm'} and $tid = threads->tid;
}
# Destructor for guard objects, which are blessed array refs of the form
# [ $owner_key, $mutex_hash ].
#
# If the mutex hash still has a true value stored under the owner key, write
# a single '0' to its _w_sock handle and reset that flag to 0. Otherwise do
# nothing. Always returns nothing.
sub MCE::Mutex::Channel::_guard::DESTROY {
    my ($guard) = @_;
    my ($owner_key, $lock_obj) = @{$guard};

    if ($lock_obj->{$owner_key}) {
        CORE::syswrite($lock_obj->{_w_sock}, '0');
        $lock_obj->{$owner_key} = 0;
    }

    return;
}
# Object destructor.
#
# The owner key is "$$.$tid" when a thread id is set, otherwise just $$.
#   1. If the flag stored under the owner key is true, write '0' to _w_sock
#      and reset the flag.
#   2. If the flag stored under owner key . 'b' is true, write '0' to
#      _r_sock and reset that flag.
#   3. If this process/thread is the one recorded in _init_pid, tear down
#      the channel handles through MCE::Util.
# Always returns nothing.
sub DESTROY {
    my ($obj) = @_;

    my $pid   = $tid ? join('.', $$, $tid) : $$;
    my $pid_b = $pid . 'b';

    if ($obj->{$pid}) {
        CORE::syswrite($obj->{_w_sock}, '0');
        $obj->{$pid} = 0;
    }

    if ($obj->{$pid_b}) {
        CORE::syswrite($obj->{_r_sock}, '0');
        $obj->{$pid_b} = 0;
    }

    if ($obj->{_init_pid} eq $pid) {
        # The pipe teardown is used only when pipes are enabled and the
        # implementation is not 'Channel2'; everything else goes through
        # the socket teardown.
        if ($use_pipe && $obj->{impl} ne 'Channel2') {
            MCE::Util::_destroy_pipes($obj, qw(_w_sock _r_sock));
        }
        else {
            MCE::Util::_destroy_socks($obj, qw(_w_sock _r_sock));
        }
    }

    return;
}
# Mutex objects registered by _save_for_global_cleanup.
my @mutex;

# Walk every registered mutex. For each one, if the calling process/thread
# has a true flag under its owner key, write '0' to _w_sock and clear it;
# then do the same for the owner key . 'b' flag using _r_sock.
sub _destroy {
    my $pid   = $tid ? join('.', $$, $tid) : $$;
    my $pid_b = $pid . 'b';

    for my $entry (@mutex) {
        if ($entry->{$pid}) {
            CORE::syswrite($entry->{_w_sock}, '0');
            $entry->{$pid} = 0;
        }
        if ($entry->{$pid_b}) {
            CORE::syswrite($entry->{_r_sock}, '0');
            $entry->{$pid_b} = 0;
        }
    }
}
# Append the given mutex reference to @mutex and weaken the stored copy, so
# the registry itself does not keep the object alive.
sub _save_for_global_cleanup {
    push @mutex, $_[0];
    weaken($mutex[$#mutex]);
}
# Constructor.
#
# Arguments: class name followed by key/value options. The 'impl' key is
# always forced to 'Channel' because it is appended after the caller's
# options.
#
# Steps:
#   - record the creating process/thread in _init_pid;
#   - on MSWin32, store a thread-shared scalar in _t_lock;
#   - create the _r_sock/_w_sock pair via MCE::Util (pipe or socket helper,
#     depending on $use_pipe);
#   - write one '0' to _w_sock;
#   - bless the hash, and register it for global cleanup when the calling
#     package, or the package one frame further up, does not match
#     /^MCE:?/.
#
# Returns the blessed hash reference.
sub new {
    my $class = shift;
    my %obj   = (@_, impl => 'Channel');
    my $self  = \%obj;
    my $t_lock;

    $self->{_init_pid} = $tid ? join('.', $$, $tid) : $$;

    if ($is_MSWin32) {
        $self->{_t_lock} = threads::shared::share($t_lock);
    }

    if ($use_pipe) {
        MCE::Util::_pipe_pair($self, qw(_r_sock _w_sock));
    }
    else {
        MCE::Util::_sock_pair($self, qw(_r_sock _w_sock));
    }

    CORE::syswrite($self->{_w_sock}, '0');

    bless $self, $class;

    if ( caller !~ /^MCE:?/ || caller(1) !~ /^MCE:?/ ) {
        MCE::Mutex::Channel::_save_for_global_cleanup($self);
    }

    return $self;
}
# Acquire the mutex for the calling process/thread.
#
# Does nothing if the owner-key flag is already true. Otherwise reads one
# byte from _r_sock via MCE::Util::_sysread and sets the flag to 1. On
# MSWin32 it first takes the thread lock on _t_lock and calls
# MCE::Util::_sock_ready on _r_sock. Always returns nothing.
sub lock {
    my $obj = shift;
    my $pid = $tid ? join('.', $$, $tid) : $$;

    return if $obj->{$pid};

    # Kept as a statement modifier on purpose: CORE::lock is scope-bound,
    # and wrapping it in its own block would release it before the read.
    CORE::lock($obj->{_t_lock}), MCE::Util::_sock_ready($obj->{_r_sock})
        if $is_MSWin32;

    my $token;
    MCE::Util::_sysread($obj->{_r_sock}, $token, 1);
    $obj->{$pid} = 1;

    return;
}
# Acquire the mutex, then return a guard object blessed into
# MCE::Mutex::Channel::_guard that holds [ owner key, mutex ]. The guard's
# destructor writes the token back if the flag is still set.
sub guard_lock {
    my $obj = $_[0];

    # The &-sigil is required here: a bare "lock" is Perl's built-in.
    &lock(@_);

    my $pid = $tid ? join('.', $$, $tid) : $$;

    return bless [ $pid, $obj ], 'MCE::Mutex::Channel::_guard';
}
# lock_shared and lock_exclusive are plain aliases of lock.
*lock_shared    = \&lock;
*lock_exclusive = \&lock;
# Release the mutex if the calling process/thread currently has its
# owner-key flag set: write '0' to _w_sock and reset the flag to 0.
# Does nothing otherwise. Always returns nothing.
sub unlock {
    my $obj = shift;
    my $pid = $tid ? join('.', $$, $tid) : $$;

    return unless $obj->{$pid};

    CORE::syswrite($obj->{_w_sock}, '0');
    $obj->{$pid} = 0;

    return;
}
# Run a code reference while holding the mutex.
#
# Arguments: the mutex object, a code reference, then any arguments to pass
# through to the code. Returns nothing when the second argument is not a
# plain CODE reference.
#
# A guard object is created before the mutex is taken; when it goes out of
# scope its destructor writes the token back if the owner-key flag is set.
# The code is called in the caller's context (list, scalar or void). The
# result is returned as a list in list context, otherwise as the last
# stored element.
sub synchronize {
    my $obj  = shift;
    my $code = shift;
    my $pid  = $tid ? join('.', $$, $tid) : $$;
    my (@ret, $token);

    return unless ref($code) eq 'CODE';

    my $guard = bless [ $pid, $obj ], 'MCE::Mutex::Channel::_guard';

    if (!$obj->{$pid}) {
        # Keep this inside the block: the CORE::lock thread lock must end
        # with the block, before the user code runs.
        CORE::lock($obj->{_t_lock}), MCE::Util::_sock_ready($obj->{_r_sock})
            if $is_MSWin32;

        MCE::Util::_sysread($obj->{_r_sock}, $token, 1);
        $obj->{$pid} = 1;
    }

    if (!defined wantarray) {
        $code->(@_);
    }
    elsif (wantarray) {
        @ret = $code->(@_);
    }
    else {
        @ret = scalar $code->(@_);
    }

    return wantarray ? @ret : $ret[-1];
}

# enter is an alias of synchronize.
*enter = \&synchronize;
# Try to acquire the mutex, giving up after $timeout seconds.
#
# $timeout defaults to 1 when undefined and is raised to 0.0003 if smaller.
# Croaks when it is not numeric or is negative.
#
# Returns 1 when lock_exclusive completed, otherwise the empty string.
#
# Off MSWin32 the wait is bounded by alarm() with a local SIGALRM handler
# that dies out of the eval. On MSWin32 it asks MCE::Util::_sock_ready
# (given the timeout) first and aborts when that returns true -- presumably
# meaning the wait timed out; confirm against MCE::Util.
sub timedwait {
    my ($obj, $timeout) = @_;

    defined $timeout or $timeout = 1;

    if (!looks_like_number($timeout) || $timeout < 0) {
        Carp::croak('MCE::Mutex::Channel: timedwait (timeout) is not valid');
    }

    $timeout = 0.0003 if $timeout < 0.0003;

    local $@;
    my $ret = '';

    eval {
        local $SIG{ALRM} = sub { alarm(0); die "alarm clock restart\n" };

        if ($is_MSWin32) {
            die "alarm clock restart\n"
                if MCE::Util::_sock_ready($obj->{_r_sock}, $timeout);

            $obj->lock_exclusive;
            $ret = 1;
        }
        else {
            alarm($timeout);
            $obj->lock_exclusive;
            $ret = 1;
            alarm(0);
        }
    };

    # Make sure no alarm is left pending after the eval.
    alarm(0) unless $is_MSWin32;

    return $ret;
}
1;