—package
threads::lite::list;
use
strict;
use
warnings;
use
5.010;
our
@EXPORT_OK
=
qw/parallel_map parallel_grep/
;
our
$VERSION
= 0.029_001;
our
$THREADS
||= DEFAULT_THREADS;
sub
_mapper {
my
(
undef
,
$filter
) = receive(
'filter'
,
qr//
);
my
$continue
= 1;
while
(
$continue
) {
receive_match {
when
(
@$_
== 4) {
my
(
$manager
,
undef
,
$index
,
$value
) =
@$_
;
local
$_
=
$value
;
$manager
->
send
(self,
'map'
,
$index
,
$filter
->());
}
when
([
'kill'
]) {
$continue
= 0;
}
default
{
carp
sprintf
"Received something unknown: (%s)\n"
,
join
','
,
@$_
;
}
};
}
return
;
}
sub
_receive_next {
my
$threads
=
shift
;
my
(
$thread
,
undef
,
$index
,
@value
) = receive(
$threads
,
'map'
,
qr//
,
qr//
);
return
(
$thread
,
$index
,
@value
);
}
sub
new {
my
$class
=
shift
;
my
%options
= (
modules
=> [],
threads
=>
$THREADS
,
@_
,
);
my
@modules
= (
'threads::lite::list'
, @{
$options
{modules} });
my
%threads
=
map
{ (
$_
->
id
=>
$_
) } spawn({
modules
=> \
@modules
,
monitor
=> 1,
pool_size
=>
$options
{threads} },
'threads::lite::list::_mapper'
);
$_
->
send
(
filter
=>
$options
{code})
for
values
%threads
;
return
bless
\
%threads
,
$class
;
}
sub
map
{
my
(
$self
,
@args
) =
@_
;
my
$i
= 0;
my
@ret
;
my
$id
= self;
my
%threads
= %{
$self
};
for
my
$thread
(
values
%threads
) {
last
if
$i
==
@args
;
$thread
->
send
(
$id
,
'map'
,
$i
,
$args
[
$i
]);
$i
++;
}
while
(
$i
<
@args
) {
my
(
$thread
,
$index
,
@value
) = _receive_next([
values
%threads
]);
$ret
[
$index
] = \
@value
;
$thread
->
send
(
$id
,
'map'
,
$i
,
$args
[
$i
]);
$i
++;
}
while
(
%threads
) {
my
(
$thread
,
$index
,
@value
) = _receive_next([
values
%threads
]);
$ret
[
$index
] = \
@value
;
delete
$threads
{
$thread
->id };
}
return
map
{ @{
$_
} }
@ret
;
}
sub
grep
{
my
(
$self
,
@args
) =
@_
;
my
@values
=
$self
->
map
(
@args
);
my
@ret
;
for
my
$i
(0..
$#args
) {
push
@ret
,
$args
[
$i
]
if
$values
[
$i
];
}
return
@ret
;
}
## no critic (Subroutines::ProhibitSubroutinePrototypes)
sub
parallel_map(&@) {
my
(
$code
,
$options
,
@args
) =
@_
;
my
$object
= __PACKAGE__->new((
$options
? %{
$options
} : ()),
code
=>
$code
);
return
$object
->
map
(
@args
);
}
sub
parallel_grep(&@) {
my
(
$code
,
$options
,
@args
) =
@_
;
my
$object
= __PACKAGE__->new((
$options
? %{
$options
} : ()),
code
=>
$code
);
return
$object
->
grep
(
@args
);
}
sub
DESTROY {
my
$self
=
shift
;
for
my
$thread
(
values
%{
$self
}) {
$thread
->
send
(
'kill'
);
receive(
'exit'
,
qr//
,
$thread
->id);
delete
$self
->{
$thread
->id };
}
return
;
}
1;
=head1 NAME
threads::lite::list - Threaded list utilities
=head1 VERSION
Version 0.029_001
=head1 SYNOPSIS
This module implements threads for perl. One crucial difference with normal threads is that the threads are B<entirely> disconnected, except by message queues (channel). It thus facilitates a message passing style of multi-threading.
=head1 FUNCTIONS
=head2 parallel_map { block } $options, @elements
map a list using multiple threads. $options is a hashref whose keys are like in C<new>.
=head2 parallel_grep { block } $options, @elements
grep a list using multiple threads. $options is a hashref whose keys are like in C<new>.
=head1 CLASS METHODS
A parallel list processing object can be created if you want to reuse your filter with other arguments.
=head2 new(%options)
Create a new parallel list processing object. It takes three named arguments.
=over 2
=item * code
A reference to the piece of code that should be executed, or it's name. Note that if a name is given, it's containing module must be loaded using C<modules>.
=item * modules
Modules that must be loaded be for the mapping or grepping.
=item * threads
The number of threads you want to use to do the mapping. The default is currently 4, an arbitrary number that may change in the future.
=back
=head1 INSTANCE METHODS
=head2 map(@elements)
Map elements in a parallel manner.
=head2 grep(@elements)
Grep elements in a parallel manner.
=head1 AUTHOR
Leon Timmermans, C<< <leont at cpan.org> >>
=head1 BUGS
This is an early development release, and is expected to be buggy and incomplete.
Please report any bugs or feature requests to C<bug-threads-lite at rt.cpan.org>, or through
the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=threads-lite>. I will be notified, and then you'll
automatically be notified of progress on your bug as I make changes.
=head1 SUPPORT
You can find documentation for this module with the perldoc command.
perldoc threads::lite::list
You can also look for information at:
=over 4
=item * RT: CPAN's request tracker
=item * AnnoCPAN: Annotated CPAN documentation
=item * CPAN Ratings
=item * Search CPAN
=back
=head1 COPYRIGHT & LICENSE
Copyright 2009, 2010 Leon Timmermans, all rights reserved.
This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.
=cut