# $Id: Signal.pm,v 1.24 2010/03/25 12:52:36 dk Exp $
package IO::Lambda::Signal;
use vars qw(@ISA %SIGDATA);
@ISA = qw(Exporter);
@EXPORT_OK = qw(signal pid spawn new_signal new_pid new_process);
%EXPORT_TAGS = ( all => \@EXPORT_OK);

our $DEBUG = $IO::Lambda::DEBUG{signal} || 0;

use strict;
use Carp;
use IO::Handle;
use POSIX ":sys_wait_h";
use IO::Lambda qw(:all :dev);

# Single object of this package. It is handed to IO::Lambda::add_loop()
# below; the empty(), remove() and yield() subs defined in this file are
# its methods.
my $MASTER = bless {}, __PACKAGE__;

# register yield handler
IO::Lambda::add_loop($MASTER);
# ... and unregister it again when the program ends
END { IO::Lambda::remove_loop($MASTER) };

# Loop API: true when no signal is being watched, i.e. %SIGDATA has no keys.
sub empty
{
	my $watched = keys %SIGDATA;
	return $watched == 0;
}

# Loop API: drop all signal watchers that belong to one lambda.
#
# Called as a method; the lambda to be removed is the second argument
# ($_[1]). Only watchers registered for that lambda are removed - watchers
# of other lambdas on the same or on other signals are left untouched.
# Does nothing when no lambda is given.
sub remove
{
	my $lambda = $_[1];
	return unless defined $lambda;

	# First collect the signal ids the lambda is registered on, and only
	# then unwatch: unwatch_signal() deletes %SIGDATA entries when the last
	# watcher of a signal goes away.
	my @ids;
	for my $id ( keys %SIGDATA) {
		push @ids, $id
			if grep { $_-> [0] == $lambda } @{ $SIGDATA{$id}-> {lambdas} };
	}

	# one call per id is enough, unwatch_signal() drops every entry
	# of the lambda for that signal
	unwatch_signal( $_, $lambda) for @ids;
}

# Loop API: dispatch caught signals to their watchers.
#
# Iterates over a shallow copy of %SIGDATA. For every signal id whose
# {signal} counter is non-zero, each registered [ lambda, callback, params ]
# entry is invoked as callback( lambda, params ).
sub yield
{
	my %v = %SIGDATA;
	for my $id ( keys %v) {
		my $v = $v{$id};
		# use mutex in case signal happens right here during handling:
		# signal_handler() increments {mutex} on every signal, so after
		# zeroing it here it counts the signals caught while callbacks run
		$v-> {mutex} = 0;
		warn "  yield sig $id\n" if $DEBUG > 1;
	AGAIN:  
		next unless $v-> {signal};

		# work on a snapshot of the watcher list, unwatch_signal() called
		# from within a callback rewrites {lambdas} in place
		my @r = @{$v-> {lambdas}};
		warn "  calling ", scalar(@r), " sig handlers\n" if $DEBUG > 1;
		for my $r ( @r) {
			my ( $lambda, $callback, @param) = @$r;
			$callback-> ( $lambda, @param);
		}

		# signals were caught while the callbacks were running: make them
		# the new pending count, take them off {mutex}, and go another round
		my $sigs = $v-> {mutex};
		if ( $sigs) {
			warn "  caught $sigs signals during yield\n" if $DEBUG > 1;
			$v-> {signal} = $sigs;
			$v-> {mutex}  -= $sigs;
			goto AGAIN;
		}
		# NOTE(review): {signal} is not reset on this path, so once a signal
		# was caught the callbacks are invoked again on every later yield()
		# for as long as the record stays in %SIGDATA - confirm this is intended.
	}
}

# Invoked from the %SIG hook installed by watch_signal() with the signal
# name as its only argument. Counts the signal in both the {signal} and
# {mutex} counters of its %SIGDATA record, then notifies the current
# IO::Lambda loop when that loop implements a signal() method.
# Signals without a %SIGDATA record are ignored.
sub signal_handler
{
	my ( $signame) = @_;
	warn "SIG{$signame}\n" if $DEBUG;
	return if !exists $SIGDATA{$signame};

	my $record = $SIGDATA{$signame};
	$record-> {signal}++;
	$record-> {mutex}++;

	my $loop = $IO::Lambda::LOOP;
	$loop-> signal($signame) if $loop-> can('signal');
}

# Register $callback (with optional @param) to be run for $lambda whenever
# signal $id is caught. The first watcher of a signal creates its %SIGDATA
# record, remembers the handler that was in $SIG{$id}, and installs a hook
# that forwards to signal_handler(); further watchers are appended to the
# record's {lambdas} list.
sub watch_signal
{
	my ($id, $lambda, $callback, @param) = @_;

	my $entry = [ $lambda, $callback, @param ];
	if ( exists $SIGDATA{$id}) {
		my $watchers = $SIGDATA{$id}-> {lambdas};
		push @$watchers, $entry;
		warn "push signal handler for $id ", _o($lambda), "\n" if $DEBUG > 2;
	} else {
		# remember the current handler before replacing it
		my $previous = $SIG{$id};
		my %record   = (
			mutex   => 0,
			signal  => 0,
			save    => $previous,
			lambdas => [$entry],
		);
		$SIGDATA{$id} = \%record;
		$SIG{$id}     = sub { signal_handler($id) };
		warn "install signal handler for $id ", _o($lambda), "\n" if $DEBUG > 1;
	}
}

# Remove every watcher that $lambda has on signal $id. When that leaves the
# signal without watchers, the handler saved by watch_signal() is put back
# into $SIG{$id} (or the %SIG slot is deleted if none was saved) and the
# %SIGDATA record is dropped. Unknown signal ids are ignored.
sub unwatch_signal
{
	my ( $id, $lambda) = @_;

	return unless exists $SIGDATA{$id};

	warn "remove signal handler for $id ", _o($lambda), "\n" if $DEBUG > 2;

	# filter the list in place, other code may hold on to the same array
	my $record   = $SIGDATA{$id};
	my $watchers = $record-> {lambdas};
	@$watchers = grep { $_-> [0] != $lambda } @$watchers;
	return if scalar @$watchers;

	warn "uninstall signal handler for $id\n" if $DEBUG > 1;

	my $previous = $record-> {save};
	if ( !defined $previous) {
		delete $SIG{$id};
	} else {
		$SIG{$id} = $previous;
	}
	delete $SIGDATA{$id};
}

# Build a lambda that finishes on whichever of these happens first:
#  - signal $id is caught and $condition-> () returns a non-empty list;
#    that list is passed to terminate() as the result;
#  - $deadline, if true, expires; the timer callback then returns undef.
# As long as $condition returns an empty list the lambda keeps waiting.
sub signal_or_timeout_lambda
{
	my ( $id, $deadline, $condition) = @_;

	my $timer;
	my $waiter = IO::Lambda-> new;

	# wait for signal
	my $bound     = $waiter-> bind;
	my $on_signal = sub {
		my @result = $condition-> ();
		@result or return;

		unwatch_signal( $id, $waiter);
		$waiter-> cancel_event($timer) if $timer;
		$waiter-> resolve($bound);
		$waiter-> terminate(@result); # result
		# break the references this closure holds
		undef $bound;
		undef $waiter;
	};
	watch_signal( $id, $waiter, $on_signal);

	# or wait for timeout
	if ( $deadline) {
		$timer = $waiter-> watch_timer( $deadline, sub {
			unwatch_signal( $id, $waiter);
			$waiter-> resolve($bound);
			undef $bound;
			undef $waiter;
			return undef; #result
		});
	}

	return $waiter;
}

# Forward declaration: new_process is only assigned (as a glob) near the end
# of this file, but spawn() below uses it as a bareword call.
sub new_process;
# Conditions. Each one takes a block (the & prototype), builds the matching
# lambda - new_signal / new_pid with the current context as arguments,
# new_process called with the current context - and hands the block to its
# condition() method together with a reference to the condition itself and
# its name.
sub signal (&) { new_signal (context)-> condition(shift, \&signal, 'signal') }
sub pid    (&) { new_pid    (context)-> condition(shift, \&pid,    'pid') }
sub spawn  (&) { new_process-> call(context)-> condition(shift, \&spawn,  'spawn') }

# Create a lambda that waits for signal $id, optionally giving up after
# $deadline. The condition handed to signal_or_timeout_lambda() always
# returns 1, so the first caught signal completes the lambda with 1.
sub new_signal
{
	my ( $id, $deadline) = @_;
	my $always = sub { 1 };
	return signal_or_timeout_lambda( $id, $deadline, $always);
}

# Create a lambda that waits for the child process $pid to finish.
#   $pid      - process id; croaks with 'bad pid' unless it matches /^-?\d+$/
#   $deadline - optional timeout, passed on to signal_or_timeout_lambda()
# If waitpid() reports right away that the process is not running any more,
# a fresh lambda primed with call($?) is returned. Otherwise the returned
# lambda watches SIGCHLD and finishes with $? once waitpid($pid) returns
# non-zero; on timeout see signal_or_timeout_lambda().
sub new_pid
{
	my ( $pid, $deadline) = @_;

	croak 'bad pid' unless $pid =~ /^\-?\d+$/;
	warn "new_pid($pid) ", _t($deadline), "\n" if $DEBUG;
	
	# avoid race conditions: if nobody watches SIGCHLD yet, install a
	# temporary handler that merely counts the SIGCHLDs caught between now
	# and the moment the real watcher is in place.
	# $early_sigchld stays undef when no temporary handler was installed.
	my ( $savesig, $early_sigchld);
	unless ( defined $SIGDATA{CHLD}) {
		warn "new_pid: install early SIGCHLD detector\n" if $DEBUG > 1;
		$savesig       = $SIG{CHLD};
		$early_sigchld = 0;
		$SIG{CHLD} = sub {
			warn "new_pid: early SIGCHLD caught\n" if $DEBUG > 1;
			$early_sigchld++
		};
	}

	# finished already
	if ( waitpid( $pid, WNOHANG) != 0) {
		# take the temporary detector out again, restoring what was there
		if ( defined $early_sigchld) {
			if ( defined( $savesig)) {
				$SIG{CHLD} = $savesig;
			} else {
				delete $SIG{CHLD};
			}
		}
		warn "new_pid($pid): finished already with $?\n" if $DEBUG > 1;
		return IO::Lambda-> new-> call($?) 
	}

	# wait: the condition is re-checked on every SIGCHLD; an empty return
	# (process still running) keeps the lambda waiting
	my $p = signal_or_timeout_lambda( 'CHLD', $deadline, sub {
		my $wp = waitpid($pid, WNOHANG);
		warn "waitpid($pid) = $wp\n" if $DEBUG > 1;
		return if $wp == 0;
		return $?;
	});

	warn "new_pid: new lambda(", _o($p), ")\n" if $DEBUG > 1;

	# watch_signal() has recorded our temporary detector as the handler to
	# restore; replace it with the handler that was there before us, so that
	# unwatch_signal() does not put the detector back
	$SIGDATA{CHLD}-> {save} = $savesig if defined $early_sigchld;

	# possibly have a race? gracefully remove the lambda
	if ( $early_sigchld) {

		# A SIGCHLD was caught by the temporary detector, but waitpid()
		# says $pid is still running, so the signal was not about our
		# process - keep waiting.
		return $p if waitpid( $pid, WNOHANG) == 0;

		# Our pid is finished. Unwatch the signal.
		unwatch_signal( 'CHLD', $p);
		# Lambda will also never get executed - cancel it
		$p-> terminate;
		
		warn "new_pid($pid): finished with race: $?, ", _o($p), " killed\n" if $DEBUG > 1;
	
		return IO::Lambda-> new-> call($?); 
	}

	return $p;
}

# new_process for systems with fork/SIGCHLD. Returns a lambda that, when
# called with a command list, runs it through a pipe open ('-|'), collects
# everything the child prints until EOF, and then waits for the child.
# The lambda returns ( output, exit status, error ); when the pipe cannot
# be opened it returns ( undef, undef, $! ). The child's pid is stored
# in the lambda's {pid} field.
sub new_process_posix
{
	lambda {
		my $pipe = IO::Handle-> new;
		my $pid  = open( $pipe, '-|', @_);

		return undef, undef, $! unless $pid;

		this-> {pid} = $pid;
		$pipe-> blocking(0);

		my $output;
		context readbuf, $pipe, \$output, undef; # wait for EOF
	tail {
		my ( undef, $read_error) = @_;
		if ( defined $read_error) {
			close $pipe;
			return ( $output, $?, $read_error);
		}

		# finished already; grab $? and $! before close() can touch them
		if ( waitpid( $pid, WNOHANG) != 0) {
			my ( $status, $errno) = ( $?, $! );
			close $pipe;
			return ( $output, $status, $errno);
		}

		# wait for it
		context $pid;
	pid {
		close $pipe;
		return ( $output, shift);
	}}}
}

# new_process for win32. Returns a lambda that runs the command given as
# its arguments with backticks inside IO::Lambda::Thread::threaded() and
# waits for that to finish. The threaded code returns ( output, 0, undef )
# when $? is zero and ( undef, $?, $! ) otherwise.
# NOTE(review): the arguments are interpolated into a backtick command
# string, so they must not contain untrusted input.
sub new_process_win32
{
	lambda {
		my @cmd    = @_;
		my $runner = sub {
			my $output = `@cmd`;
			return ( $output, 0, undef ) unless $?;
			return ( undef, $?, $! );
		};
		context IO::Lambda::Thread::threaded( $runner);
		&tail();
	}
}


# Select the new_process implementation for this platform. On win32 the
# threaded variant is used unless IO::Lambda::Thread reports itself as
# disabled; in that case new_process yields a lambda that only returns
# the reason as its error value. Everywhere else the posix variant is used.
if ( $^O =~ /win32/i) {
	require IO::Lambda::Thread;
	*new_process = $IO::Lambda::Thread::DISABLED
		? sub { lambda { undef, undef, $IO::Lambda::Thread::DISABLED } }
		: \&new_process_win32;
} else {
	*new_process = \&new_process_posix;
}


1;

__DATA__

=pod

=head1 NAME

IO::Lambda::Signal - wait for pids and signals

=head1 DESCRIPTION

The module provides access to signal-based callbacks: the generic signal listener
C<signal>, the process ID listener C<pid>, and C<spawn>, an asynchronous version
of the I<system> call.

=head1 SYNOPSIS

   use strict;
   use IO::Lambda qw(:all);
   use IO::Lambda::Signal qw(pid spawn);

   # pid
   my $pid = fork;
   exec "/bin/ls" unless $pid;
   lambda {
       context $pid, 5;
       pid {
          my $ret = shift;
	  print defined($ret) ? ("exitcode(", $ret>>8, ")\n") : "timeout\n";
       }
   }-> wait;

   # spawn
   this lambda {
      context "perl -v";
      spawn {
      	  my ( $buf, $exitcode, $error) = @_;
   	  print "buf=[$buf], exitcode=$exitcode, error=$error\n";
      }
   }-> wait;

=head2 USAGE

=over

=item pid ($PID, $TIMEOUT) -> $?|undef

Accepts PID and an optional deadline/timeout, returns either the process' exit status,
or undef on timeout.  The corresponding lambda is C<new_pid> :

   new_pid ($PID, $TIMEOUT) :: () -> $?|undef

=item signal ($SIG, $TIMEOUT) -> boolean

Accepts signal name and optional deadline/timeout, returns 1 if the signal was caught,
or C<undef> on timeout.  The corresponding lambda is C<new_signal> :

   new_signal ($SIG, $TIMEOUT) :: () -> boolean

=item spawn (@LIST) -> ( output, $?, $!)

Calls pipe open on C<@LIST>, reads all data printed by the child process,
and waits for the process to finish. Returns three scalars - collected output,
process exitcode C<$?>, and an error string (usually C<$!>). The corresponding
lambda is C<new_process> :

   new_process (@LIST) :: () -> ( output, $?, $!)

Lambda objects created by C<new_process> have an additional field C<'pid'> 
initialized with the process pid value.

=back

=head1 LIMITATION

C<pid> and C<new_pid> don't work on win32 because win32 doesn't use
SIGCHLD/waitpid.  The native implementations of C<spawn> and C<new_process>
don't work on win32 for the same reason; there they are reimplemented
using threads, and require a threaded perl.

=head1 SEE ALSO

L<IO::Lambda>, L<perlipc>, L<IPC::Open2>, L<IPC::Run>

=head1 AUTHOR

Dmitry Karasik, E<lt>dmitry@karasik.eu.orgE<gt>.

=cut