use strict;
use warnings;
use vars qw($testno $loaded $srvtype);
BEGIN {
	my $tests = 80;
	print STDERR "Note: some tests have significant delays...\n";
	$^W= 1;
	$| = 1;
	print "1..$tests\n";
}

END {print "not ok $testno\n" unless $loaded;}

package Qable;
use Thread::Queue::Queueable;

use base qw(Thread::Queue::Queueable);

sub new {
	return bless {}, shift;
}

sub onEnqueue {
	my $obj = shift;
	my $class = ref $obj;
#	print STDERR "$class object enqueued\n";
	return $obj->SUPER::onEnqueue;
}

sub onDequeue {
	my ($class, $obj) = @_;
#	print STDERR "$class object dequeued\n";
	return $class->SUPER::onDequeue($obj);
}

sub onCancel {
	my $obj = shift;
#	print STDERR "Item cancelled.\n";
	1;
}

sub curse {
	my $obj = shift;
	return $obj->SUPER::curse;
}

sub redeem {
	my ($class, $obj) = @_;
	return $class->SUPER::redeem($obj);
}

1;

package SharedQable;
use Thread::Queue::Queueable;

use base qw(Thread::Queue::Queueable);

sub new {
	my %obj : shared = ( Value => 1);
	return bless \%obj, shift;
}

sub set_value {
	my $obj = shift;
	$obj->{Value}++;
	return 1;
}

sub get_value { return shift->{Value}; }

sub redeem {
	my ($class, $obj) = @_;
	return bless $obj, $class;
}

1;

package main;

use threads;
use threads::shared;
use Thread::Queue::Duplex;

$srvtype = 'init';

sub report_ok {

	print "ok $testno # ", shift, " for $srvtype\n";
	$testno++;
}

sub report_fail {

	print "not ok $testno # ", shift, " for $srvtype\n";
	$testno++;
}
#
#	test normal dequeue method
#
sub run_dq {
	my $q = shift;

	$q->listen;
	while (1) {
		my $left = $q->pending;

		my $req = $q->dequeue;
#print threads->self()->tid(), " run_dq dq'd\n";
		my $id = shift @$req;

		if ($req->[0] eq 'stop') {
			$q->respond($id, 'stopped');
			$q->ignore();
			last;
		}

		if ($req->[0] eq 'wait') {
			sleep($req->[1]);
		}

		if ($req->[1] && ref $req->[1] && (ref $req->[1] eq 'SharedQable')) {
			$req->[1]->set_value();
		}
#
#	ignore simplex msgs
#
		next
			unless $id;

		$q->marked($id) ?
			$q->respond($id, $q->get_mark($id)) :
			$q->respond($id, @$req);
	}
}
#
#	test nonblocking dequeue method
#
sub run_nb {
	my $q = shift;

	$q->listen;
	while (1) {
		my $req = $q->dequeue_nb;

		sleep 1, next
			unless $req;

#print "run_nb dq'd\n";
		my $id = shift @$req;

		$q->ignore(),
		$q->respond($id, 'stopped'),
		last
			if ($req->[0] eq 'stop');

#print STDERR join(', ', @$req), "\n";
		sleep($req->[1])
			if ($req->[0] eq 'wait');
#
#	ignore simplex msgs
#
		$q->respond($id, @$req)
			if $id;
	}
}
#
#	test timed dequeue method
#
sub run_until {
	my $q = shift;

	my $timeout = 2;
	$q->listen;
	while (1) {

		my $req = $q->dequeue_until($timeout);
		sleep 1, next
			unless $req;

#print "run_until dq'd\n";
		my $id = shift @$req;

		$q->ignore(),
		$q->respond($id, 'stopped'),
		last
			if ($req->[0] eq 'stop');

		sleep($req->[1])
			if ($req->[0] eq 'wait');
#
#	ignore simplex msgs
#
		$q->respond($id, @$req)
			if $id;
	}
}
#
#	acts as a requestor thread for class-level
#	wait tests
#
sub run_requestor {
	my $q = shift;

	$q->wait_for_listener;

	while (1) {
		my $id = $q->enqueue('request');
		my $resp = $q->wait($id);

		last
			if ($resp->[0] eq 'stop');
	}
	return 1;
}
#
#	test urgent dequeue method
#
sub run_urgent {
	my $q = shift;

	$q->listen;
	while (1) {
		my $req = $q->dequeue_urgent;

		sleep 1, next
			unless $req;

#print "run_urgent dq'd\n";
		my $id = shift @$req;

		$q->ignore(),
		$q->respond($id, 'stopped'),
		last
			if ($req->[0] eq 'stop');

		sleep($req->[1])
			if ($req->[0] eq 'wait');
#
#	ignore simplex msgs
#
		$q->respond($id, @$req)
			if $id;
	}
}

$testno = 1;

report_ok('load module');
#
#	create queue
#	spawn server thread
#	execute various requests
#	verify responses
#
#	test constructor
#
my $q = Thread::Queue::Duplex->new(ListenerRequired => 1);

report_ok('create queue');
#
#	test different kinds of dequeue
#
my @servers = (\&run_dq, \&run_nb, \&run_until);
my @types = ('normal', 'nonblock', 'timed');

my ($result, $id, $server);

my $start = $ARGV[0] || 0;
my $qable = Qable->new();
my $sharedqable = SharedQable->new();

foreach ($start..$#servers) {
	$server = threads->new($servers[$_], $q);
	$srvtype = $types[$_];
#
#	wait for a listener
#
	$q->wait_for_listener() ?
		report_ok('wait_for_listener()') :
		report_fail('wait_for_listener()');
#
#	test enqueue_simplex
#
	$id = $q->enqueue_simplex('foo', 'bar');
	defined($id) ?
		report_ok('enqueue_simplex()') :
		report_fail('enqueue_simplex()');
#
#	test enqueue
#
	$id = $q->enqueue('foo', 'bar');
	defined($id) ?
		report_ok('enqueue()') :
		report_fail('enqueue()');
#
#	test ready(); don't care about outcome
#	(prolly need eval here)
#
	$result = $q->ready($id);
	report_ok('ready()');
#
#	test wait()
#
	$result = $q->wait($id);

	(defined($result) &&
		($result->[0] eq 'foo') &&
		($result->[1] eq 'bar')) ?
		report_ok('wait()') :
		report_fail('wait()');
#
#	test dequeue_response
#
	$id = $q->enqueue('foo', 'bar');
	$result = $q->dequeue_response($id);
	(defined($result) &&
		($result->[0] eq 'foo') &&
		($result->[1] eq 'bar')) ?
		report_ok('dequeue_response()') :
		report_fail('dequeue_response()');
#
#	test Queueable enqueue
#
	$id = $q->enqueue('foo', $qable);
	defined($id) ?
		report_ok('enqueue() Queueable') :
		report_fail('enqueue() Queueable');

	$result = $q->wait($id);

	(defined($result) &&
		($result->[0] eq 'foo') &&
		(ref $result->[1]) &&
		(ref $result->[1] eq 'Qable')) ?
		report_ok('wait() Queueable') :
		report_fail('wait() Queueable');
#
#
#	test wait_until, enqueue_urgent
#
	$id = $q->enqueue('wait', 3);
	my $id1 = $q->enqueue('foo', 'bar');
	$result = $q->wait_until($id, 1);
	defined($result) ?
		report_fail('wait_until() expires') :
		report_ok('wait_until() expires');

	my $id2 = $q->enqueue_urgent('urgent', 'entry');
#
#	should get wait reply here
#
	$result = $q->wait_until($id, 5);
	defined($result) &&
		($result->[0] eq 'wait') ?
		report_ok('wait_until()') :
		report_fail('wait_until()');
#
#	should get urgent reply here
#
	$result = $q->wait($id2);
	defined($result) &&
		($result->[0] eq 'urgent') ?
		report_ok('enqueue_urgent()') :
		report_fail('enqueue_urgent()');
#
#	should get normal reply here
#
	$result = $q->wait($id1);
	defined($result) && ($result->[0] eq 'foo') ?
		report_ok('enqueue()') :
		report_fail('enqueue()');
#
#	test wait_any: need to queue up several
#
	my %ids = ();

	map { $ids{$q->enqueue('foo', 'bar')} = 1; } (1..10);
#
#	repeat here until all ids respond
#
	my $failed;
	while (keys %ids) {
		$result = $q->wait_any(keys %ids);
		$failed = 1,
		last
			unless defined($result) &&
				(ref $result) &&
				(ref $result eq 'HASH');
		map {
			$failed = 1
				unless delete $ids{$_};
		} keys %$result;
		last
			if $failed;
	}
	$failed ?
		report_fail('wait_any()') :
		report_ok('wait_any()');
#
#	test wait_any_until
#
	%ids = ();

	$ids{$q->enqueue('wait', '3')} = 1;
	map { $ids{$q->enqueue('foo', 'bar')} = 1; } (2..10);
	$failed = undef;

	$result = $q->wait_any_until(1, keys %ids);
	if ($result) {
		report_fail('wait_any_until()');
	}
	else {
		while (keys %ids) {
			$result = $q->wait_any_until(5, keys %ids);
			$failed = 1,
			last
				unless defined($result) &&
					(ref $result) &&
					(ref $result eq 'HASH');
			map {
				$failed = 1
					unless delete $ids{$_};
			} keys %$result;
			last
				if $failed;
		}
		$failed ?
			report_fail('wait_any_until()') :
			report_ok('wait_any_until()');
	}
#
#	test wait_all
#
	%ids = ();
	map { $ids{$q->enqueue('foo', 'bar')} = 1; } (1..10);
#
#	test available()
#
	sleep 1;
	my @avail = $q->available;
	scalar @avail ?
		report_ok('available (array)') :
		report_fail('available (array)');

	$id = $q->available;
	$id ?
		report_ok('available (scalar)') :
		report_fail('available (scalar)');

	$id = keys %ids;
	@avail = $q->available($id);
	scalar @avail ?
		report_ok('available (id)') :
		report_fail('available (id)');
#
#	make sure all ids respond
#
	$result = $q->wait_all(keys %ids);
	unless (defined($result) &&
		(ref $result) &&
		(ref $result eq 'HASH') &&
		(scalar keys %ids == scalar keys %$result)) {
		report_fail('wait_all()');
	}
	else {
		map { $failed = 1 unless delete $ids{$_}; } keys %$result;
		($failed || scalar %ids) ?
			report_fail('wait_all()') :
			report_ok('wait_all()');
	}
#
#	test wait_all_until
#
	%ids = ();
	map { $ids{$q->enqueue('wait', '1')} = 1; } (1..10);
#
#	make sure all ids respond
#
	$result = $q->wait_all_until(1, keys %ids);
	if (defined($result)) {
		report_fail('wait_all_until()');
	}
	else {
	# may need a warning print here...
		$result = $q->wait_all_until(20, keys %ids);
		map { $failed = 1 unless delete $ids{$_}; } keys %$result;
		($failed || scalar keys %ids) ?
			report_fail('wait_all_until()') :
			report_ok('wait_all_until()');
	}
#
#	test cancel()/cancel_all():
#	post a waitop
# 	post a no wait
#	wait a bit for server to pick up the first
#	cancel the nowait
#	check the pending count for zero
#	wait for waitop to finish
#
	$id = $q->enqueue('wait', 5);
	$id1 = $q->enqueue('foo', 'bar');
	$result = $q->wait_until($id, 3);
	$q->cancel($id1);
#print "Cancel: pending :", $q->pending, "\n";
	$q->pending ? report_fail('cancel()') : report_ok('cancel()');
	$result = $q->wait($id);
#
#	do same, but add multiple and cancel all
#
	$id = $q->enqueue('wait', 5);
	$id1 = $q->enqueue('first', 'bar');
	$id2 = $q->enqueue('second', 'bar');
	$result = $q->wait_until($id, 1);
	$q->cancel_all();
#print "Cancel all: pending :", $q->pending, " avail ", $q->available, "\n";
	$q->pending || $q->available ?
		report_fail('cancel_all()') : report_ok('cancel_all()');
#
#	kill the thread; also tests urgent i/f
#
	$id = $q->enqueue_urgent('stop');
	$id ? report_ok('enqueue_urgent()') :
		report_fail('enqueue_urgent()');
	$server->join;
#
#	wait for response, then test enqueue wo/ a listener
#
	$result = $q->wait($id);
	$q->enqueue('no listener') ?
		report_fail('enqueue() wo/ listener') :
		report_ok('enqueue() wo/ listener');

}	#end foreach server method
#
#	now test the class-level waits:
#	create an add'l queue
#	create a listener thread w/ old queue
#	create requestor thread w/ new queue
#
	my $newq = Thread::Queue::Duplex->new(ListenerRequired => 1);
	$server = threads->new($servers[0], $q);
	$srvtype = $types[0];
	$q->wait_for_listener();
#
#	test shared Queueable enqueue
#
	$id = $q->enqueue('foo', $sharedqable);
	defined($id) ?
		report_ok('enqueue() shared Queueable') :
		report_fail('enqueue() shared Queueable');

	$result = $q->wait($id);

	(defined($result) &&
		($result->[0] eq 'foo') &&
		(ref $result->[1]) &&
		(ref $result->[1] eq 'SharedQable') &&
		($result->[1]->get_value == 2)) ?
		report_ok('wait() Queueable') :
		report_fail('wait() Queueable');


	my $requestor = threads->new(\&run_requestor, $newq);
	$newq->listen();
	my @qs = ();
#
#	post request to listener
#	wait on both queues
#
	$id = $q->enqueue('wait', 3);
print "ID is undef!!!\n" unless defined($id);
#	@qs = Thread::Queue::Duplex->wait_any([$q, $id], [$newq]);
	@qs = Thread::Queue::Duplex->wait_any([$q, $id], $newq);
	unless (scalar @qs) {
		report_fail('class-level wait_any()');
	}
	else {
#
#	should get the newq only here...
#
		unless ((scalar @qs == 1) && ($qs[0] eq $newq)) {
			report_fail('class-level wait_any()');
		}
		else {
			my $req = $newq->dequeue();
			$newq->respond(shift @$req, 'ok');
		}
#
#	wait for other queue
#
		my $resp = $q->wait($id);
		report_ok('class-level wait_any()');
	}
#
#	now timed wait_any
#
	$id = $q->enqueue('wait', 5);
	@qs = Thread::Queue::Duplex->wait_any_until(3, [$q, $id], [$newq]);
	unless (scalar @qs) {
		report_fail('class-level wait_any_until()');
	}
	else {
#
#	should get the newq only here...
#
		unless ((scalar @qs == 1) && ($qs[0] eq $newq)) {
			report_fail('class-level wait_any_until()');
		}
		else {
			my $req = $newq->dequeue();
			$newq->respond(shift @$req, 'ok');
		}
#
#	wait for other queue
#
		my $resp = $q->wait($id);
		report_ok('class-level wait_any_until()');
	}
#
#	now wait_all
#
	$id = $q->enqueue('wait', 3);
	@qs = Thread::Queue::Duplex->wait_all([$q, $id], [$newq]);
	unless (scalar @qs == 2) {
		report_fail('class-level wait_all()');
	}
	else {
		foreach (@qs) {
			if ($_ eq $newq) {
				my $req = $newq->dequeue();
				$newq->respond(shift @$req, 'ok');
			}
			else {
#
#	wait for other queue
#
				my $resp = $q->wait($id);
			}
		}
		report_ok('class-level wait_all()');
	}
#
#	now timed wait_all
#
	$id = $q->enqueue('wait', 3);
	@qs = Thread::Queue::Duplex->wait_all_until(1, [$q, $id], [$newq]);
#
#	shouldn't get anything first time thru
#
	if (@qs) {
		report_fail('class-level wait_all_until()');
	}
	else {
		@qs = Thread::Queue::Duplex->wait_all_until(5, [$q, $id], [$newq]);
		unless (scalar @qs == 2) {
			report_fail('class-level wait_all_until()');
		}
		else {
			foreach (@qs) {
				if ($_ eq $newq) {
					my $req = $newq->dequeue();
					$newq->respond(shift @$req, 'stop');
				}
				else {
#
#	wait for other queue
#
					my $resp = $q->wait($id);
				}
			}
			report_ok('class-level wait_all_until()');
		}
	}
	$q->enqueue_simplex('stop');
	$server->join;
	$requestor->join;
#
#	make sure no one else is listening on our queue
#
	$q = Thread::Queue::Duplex->new(ListenerRequired => 1);
#
#	test max pending
#
	$q->set_max_pending(5);
	$server = threads->new(\&run_dq, $q);
	$q->wait_for_listener();
	my @ids = ();
	push @ids, $q->enqueue('wait', 5);
	sleep 1;
#
#	queue up several, then see if we block
#
	push @ids, $q->enqueue('foo', 'bar');
	push @ids, $q->enqueue_urgent('foo', 'bar');
	$q->enqueue_simplex_urgent('foo', 'bar');
	$q->enqueue_simplex('foo', 'bar');
	push @ids, $q->enqueue_urgent('foo', 'bar');
#
#	keep time, we should block at this point
#
	my $started = time();
	push @ids, $q->enqueue('foo', 'bar');
	(time() - $started > 1) ?
		report_ok('max_pending') :
		report_fail('max_pending');
#
#	consume all our responses
#
	$q->wait_all(@ids);
#
#	test mark
#
	my $failed = undef;
	my $id1 = $q->enqueue('wait', 3);
	$q->mark($id1, 'CANCEL');
	unless ($q->get_mark($id1) eq 'CANCEL') {
		$failed = 1;
	}
	else {
		sleep 3;	# give thread time to process both
		$result = $q->wait($id1);
		$failed = 1
			unless ($result->[0] eq 'CANCEL');
	}
	$failed ?
		report_fail('mark') :
		report_ok('mark');

	$q->enqueue_simplex('stop');
	$server->join;
#
#	test dequeue_urgent
#
	$server = threads->new(\&run_urgent, $q);
	$q->wait_for_listener();
	$id1 = $q->enqueue('bar', 'foo');
	my $id2 = $q->enqueue_urgent('foo', 'bar');
	sleep 3;	# give thread time to process both
	$result = $q->wait_any($id1, $id2);
	$failed = undef;
	foreach (keys %$result) {
		$failed = 1, last unless ($_ == $id2);
	}
	$failed ?
		report_fail('dequeue_urgent') :
		report_ok('dequeue_urgent');

	$q->enqueue_simplex_urgent('stop');
	$server->join;

$testno--;
$loaded = 1;