package Thread::Conveyor::Array;

# Make sure we have version info for this module
# Make sure we are a belt
# Make sure we do everything by the book from now on

$VERSION = '0.21';
@ISA = qw(Thread::Conveyor);
use strict;

# Make sure we only load stuff when we actually need it

use load;

# Satisfy -require-

1;

#---------------------------------------------------------------------------

# The following subroutines are loaded only on demand

__END__

#---------------------------------------------------------------------------

# Class methods

#---------------------------------------------------------------------------
#  IN: 1 class with which to bless the object
# OUT: 1 instantiated object

sub new {

# Obtain the class
# Create the conveyor belt
# And bless it as an object

    my $class = shift;
    my @belt : shared;
    bless \@belt,$class;
} #new

#---------------------------------------------------------------------------

# object methods

#---------------------------------------------------------------------------
#  IN: 1 instantiated object
# OUT: 1 shared item on which you can lock

sub semaphore { shift } # semaphore

#---------------------------------------------------------------------------

# Object methods

#---------------------------------------------------------------------------
#  IN: 1 instantiated object
#      2..N parameters to be passed as a box onto the belt

sub put {

# Obtain the object
# Return now if nothing to do

    my $belt = shift;
    return unless @_;

# Make sure we're the only one putting things on the belt
# Freeze the parameters and put it in a box on the belt
# Signal the other worker threads that there is a new box on the belt

    lock( @$belt );
    push( @$belt,Thread::Serialize::freeze( @_ ) );
    threads::shared::cond_signal( @$belt );
} #put

#---------------------------------------------------------------------------
#  IN: 1 instantiated object
# OUT: 1..N parameters returned from a box on the belt

sub take {

# Obtain the object
# Create an empty box

    my $belt = shift;
    my $box;

# Make sure we're the only one working on the belt
# Wait until someone else puts something on the belt
# Take the box off the belt
# Wake up other worker threads if there are still boxes now

    {lock( @$belt );
     threads::shared::cond_wait( @$belt ) until @$belt;
     $box = shift( @$belt );
     threads::shared::cond_signal( @$belt ) if @$belt;
    } #@$belt

# Thaw the contents of the box and return the result

    Thread::Serialize::thaw( $box );
} #take

#---------------------------------------------------------------------------
#  IN: 1 instantiated object
# OUT: 1..N parameters returned from a box on the belt

sub take_dontwait {

# Obtain the object
# Make sure we're the only one handling the belt
# Return the result of taking of a box if there is one, or an empty list

    my $belt = shift;
    lock( @$belt );
    return @$belt ? $belt->take : ();
} #take_dontwait

#---------------------------------------------------------------------------
#  IN: 1 instantiated object
# OUT: 1..N references to data-structures in boxes

sub clean {

# Obtain the belt
# Return now after cleaning if we're not interested in the result
# Clean the belt and turn the boxes into references

    my $belt = shift;
    return $belt->_clean unless wantarray;
    map {[Thread::Serialize::thaw( $_ )]} $belt->_clean;
} #clean

#---------------------------------------------------------------------------
#  IN: 1 instantiated object
# OUT: 1..N references to data-structures in boxes

sub clean_dontwait {

# Obtain the belt
# Make sure we're the only one handling the belt
# Return the result of cleaning the belt if there are boxes, or an empty list

    my $belt = shift;
    lock( @$belt );
    return @$belt ? $belt->clean : ();
} #clean_dontwait

#---------------------------------------------------------------------------
#  IN: 1 instantiated object
#      2 ordinal number in array to return (default: 0)
# OUT: 1..N parameters returned from a box on the belt

sub peek {

# Obtain the object
# Create an empty box

    my $belt = shift;
    my $box;

# Make sure we're the only one working on the belt
# Wait until someone else puts something on the belt
# Copy the box off the belt
# Wake up other worker threads again

    {lock( @$belt );
     threads::shared::cond_wait( @$belt ) until @$belt;
     $box = $belt->[shift || 0];
     threads::shared::cond_signal( @$belt );
    } #@$belt

# Thaw the contents of the box and return the result

    Thread::Serialize::thaw( $box );
} #peek

#---------------------------------------------------------------------------
#  IN: 1 instantiated object
#      2 ordinal number in array to return (default: 0)
# OUT: 1..N parameters returned from a box on the belt

sub peek_dontwait {

# Obtain the object
# Make sure we're the only one handling the belt
# Return the result of taking of a box if there is one, or an empty list

    my $belt = shift;
    lock( @$belt );
    return @$belt ? $belt->peek( @_ ) : ();
} #peek_dontwait

#---------------------------------------------------------------------------
#  IN: 1 instantiated object
# OUT: 1 number of boxes still on the belt

sub onbelt { scalar(@{$_[0]}) } #onbelt

#---------------------------------------------------------------------------
#  IN: 1 instantiated object (ignored)

sub maxboxes {
    die "Cannot change throttling on a belt that was created unthrottled";
} #maxboxes

#---------------------------------------------------------------------------
#  IN: 1 instantiated object (ignored)

sub minboxes {
    die "Cannot change throttling on a belt that was created unthrottled";
} #minboxes

#---------------------------------------------------------------------------
#  IN: 1 instantiated object

sub shutdown { undef } #shutdown

#---------------------------------------------------------------------------
#  IN: 1 instantiated object
# OUT: 1 thread object associated with belt (always undef)

sub thread { undef } #thread

#---------------------------------------------------------------------------
#  IN: 1 instantiated object
# OUT: 1 thread id of thread object associated with belt (always undef)

sub tid { undef } #tid

#---------------------------------------------------------------------------

# Internal subroutines

#---------------------------------------------------------------------------
#  IN: 1 instantiated object
# OUT: 1..N all frozen boxes on the belt

sub _clean {

# Obtain the belt
# Initialize the list of frozen boxes

    my $belt = shift;
    my @frozen;

# Make sure we're the only one accessing the belt
# Wait until there is something on the belt
# Obtain the entire contents of the belt of we want it
# Clean the belt
# Notify the world again

    {lock( @$belt );
     threads::shared::cond_wait( @$belt ) until @$belt;
     @frozen = @$belt if wantarray;
     @$belt = ();
     threads::shared::cond_broadcast( @$belt );
    } #@$belt

# Return the frozen goods

    @frozen;
} #_clean

#---------------------------------------------------------------------------

__END__

=head1 NAME

Thread::Conveyor::Array - array implementation of Thread::Conveyor

=head1 DESCRIPTION

This class should not be called by itself, but only with a call to
L<Thread::Conveyor>.

=head1 AUTHOR

Elizabeth Mattijsen, <liz@dijkmat.nl>.

Please report bugs to <perlbugs@dijkmat.nl>.

=head1 COPYRIGHT

Copyright (c) 2002, 2003, 2004, 2007, 2010 Elizabeth Mattijsen <liz@dijkmat.nl>.
All rights reserved.  This program is free software; you can redistribute it
and/or modify it under the same terms as Perl itself.

=head1 SEE ALSO

L<Thread::Conveyor>.

=cut