# vim600: set ts=4 sw=4 tw=80 expandtab nowrap noai cin foldmethod=marker:
# A Multiplex TCP Component designed for performance.
# -----------------------------------------------------------------------------
# "THE BEER-WARE LICENSE" (Revision 43) borrowed from FreeBSD's jail.c:
# <tag@cpan.org> wrote this file. As long as you retain this notice you
# can do whatever you want with this stuff. If we meet some day, and you think
# this stuff is worth it, you can buy me a beer in return. Scott S. McCoy
# -----------------------------------------------------------------------------
# See TCPMulti.otl (TVO format) or TCPMulti.pod (POD format) for documentation
package POE::Component::Client::TCPMulti;
# Settings and Initialization {{{
use strict;
use warnings FATAL => qw( all );
use constant CHEAP => -1;
# POE::Component::Server::TCPMulti can export cheap also
# We're not going to require order from the user.
sub import {
no strict "refs";
my $caller = caller;
unless (defined *{"${caller}::CHEAP"}) {
*{"${caller}::CHEAP"} = \&CHEAP;
}
}
use UNIVERSAL;
use POE qw( Kernel
Session
Driver::SysRW
Filter::Line
Wheel::ReadWrite
Wheel::SocketFactory );
use Carp qw( carp croak );
*VERSION = \0.0524;
our $VERSION;
BEGIN {
unless (defined &DEBUG) {
constant->import(DEBUG => 0);
}
unless (defined &TRACE_EVENTS) {
constant->import(TRACE_EVENTS => 0);
}
unless (defined &TRACE_CONNECT) {
constant->import(TRACE_CONNECT => 0);
}
unless (defined &TRACE_FILENAME) {
constant->import(TRACE_FILENAME => 0);
}
}
if (DEBUG) {
print "TCPMulti: DEBUG MODE ENABLED\n";
}
if (TRACE_FILENAME) {
open TRACE, ">", TRACE_FILENAME;
}
else {
*TRACE = *STDERR;
}
# Heap is now package global. This is fine, each wheel throughout the POE
# Kernel has its own unique identification. So multiple component sessions
# can utilize the same hash for Connection Heaps.
# Note: Explicit lexical was not accessable by the inline states (This seems to
# be a bug in perl >= 5.8.1, although its marked as simply changed behavior in
# the changelog. Its only with strange combinations of lexicals anonymous
# subroutines and anonymous hashrefs (As commonly used in POE
# programming...bastards :P)
our %heap;
# }}}
# new (Depriciated) {{{
sub new { goto &create }
# }}}
# Constructor {{{
sub create {
# Initialization {{{
shift if $_[0] eq __PACKAGE__;
my ($code, %user_code);
%user_code = @_;
$user_code{$_} ||= sub {} for qw( ErrorEvent
InputEvent
Initialize
Disconnected
SuccessEvent
FlushedEvent
FailureEvent
TimeoutEvent );
$user_code{Timeout} ||= 30;
$user_code{ConnectTimeout} ||= $user_code{Timeout};
$user_code{InputTimeout} ||= 300;
$user_code{Filter} ||= "POE::Filter::Line";
$user_code{FilterArgs} ||= undef;
$user_code{options} ||= {};
$user_code{package_states} ||= [];
$user_code{object_states} ||= [];
if (ref $user_code{Filter} eq "ARRAY") {
my @FilterData = @{ delete $user_code{Filter} };
$user_code{Filter} = shift @FilterData;
$user_code{FilterArgs} = \@FilterData;
}
@{ $user_code{UserStates} }{ qw( _start _stop _child ) } =
delete @{ $user_code{inline_states} }{ qw( _start _stop _child ) };
# }}}
# Internal States {{{
$code = {
# Session Events {{{
# _start: Session Start {{{
_start => sub {
$_[KERNEL]->alias_set( delete $user_code{Alias} )
if defined $user_code{Alias};
$user_code{UserStates}->{_start}->(@_)
if ref $user_code{UserStates}->{_start} eq "CODE";
},
# }}}
# _child: Session Child {{{
_child => sub {
$user_code{states}->{_child}->(@_)
if ref $user_code{UserStates}->{_child} eq "CODE";
},
# }}}
# _stop: Session End {{{
_stop => sub {
$user_code{UserStates}->{_stop}->(@_)
if ref $user_code{UserStates}->{_stop} eq "CODE";
},
# }}}
# }}}
# Connection States {{{
# -success: Connection was successful (Internal) {{{
-success => sub {
my ($kernel, $handle, $old_id) = @_[KERNEL, ARG0, ARG3];
my $filter;
# We need 1 filter per Wheel...yeah
if (ref $user_code{Filter} &&
UNIVERSAL::isa($user_code{Filter}, "UNIVERSAL")) {
$filter = $user_code{Filter} = ref $user_code{Filter};
}
$filter = $user_code{Filter}->new( @{ $user_code{FilterArgs} } );
$heap{$old_id}{-SERVER} = POE::Wheel::ReadWrite->new
( Handle => $handle,
Driver => POE::Driver::SysRW->new(BlockSize => 4096),
Filter => $filter,
InputEvent => '-incoming',
ErrorEvent => '-error',
# FlushedEvent => '-flushed',
);
# Transfer entire heap (including wheel), reinstate -ID
my $new_id = $heap{$old_id}{-SERVER}->ID;
my $cheap = $heap{$new_id} = delete $heap{$old_id};
bless $heap{$new_id}, "POE::Component::Client::TCPMulti::CHEAP";
# ARG4 differs from Wheel definition...its our new id.
push @_, $new_id, $cheap;
$cheap->{-ID} = $new_id;
$cheap->{-TIMEOUT} = $user_code{InputTimeout};
if ($user_code{InputTimeout}) {
if ($cheap->{-ALARM}) {
DEBUG && printf "%d << Adjusting alarm %d (%d s)\n",
$new_id, @{ $_[CHEAP] }{qw( -ALARM -TIMEOUT )};
$kernel->delay_adjust
( $cheap->{-ALARM}, $cheap->{-TIMEOUT} );
}
else {
$cheap->{-ALARM} = $kernel->delay_set
( -timeout => $cheap->{-TIMEOUT}, $cheap->{-ID} );
}
}
# We should have an alarm ID -> maybe we're not storing it.
elsif ($cheap->{-ALARM}) {
$kernel->alarm_remove( delete $cheap->{-ALARM} );
}
$user_code{SuccessEvent}->(@_);
printf "%d == Successfull Connection %s:%d\n", $new_id,
@{ $heap{$new_id} }{qw( -ADDR -PORT )} if DEBUG;
},
# }}}
# connect: Open new connection {{{
# Connect to the next available proxy
connect => sub {
my $cheap;
if (ref $_[ARG0] eq "HASH" || ref $_[ARG0] eq "ARRAY") {
$cheap = splice @_, ARG0, 1;
}
my ($address, $port, $bindaddress, $bindport) = @_[ARG0..ARG3];
printf TRACE "connect event invoked (%s, %d) for %s from %s:%d\n",
@_[ ARG0, ARG1 ],
$cheap->{email}, # email para poeml lang
@_[ CALLER_FILE, CALLER_LINE ] if TRACE_CONNECT;
unless (defined $address) {
return printf STDERR
"connect called without address or port, %s: line %d\n",
@_[CALLER_FILE, CALLER_LINE];
}
printf STDERR "!!! !! connect state invoked from %s:%d\n",
@_[CALLER_FILE, CALLER_LINE] if DEBUG;
push @_, POE::Component::Client::TCPMulti->connect
( RemoteAddress => $address,
RemotePort => $port,
BindAddress => $bindaddress,
BindPort => $bindport,
Timeout => $user_code{ConnectTimeout},
Heap => $cheap,
);
$user_code{Initialize}->(@_);
},
# }}}
# }}}
# IO States {{{
# -incoming: Handling recieved data (Internal) {{{
-incoming => sub {
my ($kernel, $id) = @_[ KERNEL, ARG1 ];
push @_, $heap{$id};
my $cheap = $_[ CHEAP ];
return unless $cheap->{-RUNNING};
if (DEBUG) {
print "$_[ARG1] << $_[ARG0]\n";
}
if ($cheap->{-TIMEOUT}) {
$kernel->delay_adjust
( $cheap->{-ALARM}, $cheap->{-TIMEOUT} );
}
$user_code{InputEvent}(@_);
},
# }}}
# send: Send Data {{{
send => sub {
my $cheap = $heap{$_[ARG0]};
unless (defined $_[ARG1]) {
return printf STDERR
"send called without socket or data %s: line %d\n",
@_[CALLER_FILE, CALLER_LINE];
}
elsif (defined $cheap->{-SERVER}) {
if (DEBUG) {
print "$_[ARG0] >> $_[ARG1]\n";
}
$cheap->{-SERVER}->put( @_[ARG1 .. $#_] );
}
},
# }}}
# }}}
# Error States {{{
# -failure: Handle Connection Failure (Internal) {{{
-failure => sub {
printf "%d !! Disconnected - Failed (%s)\n", $_[ARG3], $_[ARG2]
if DEBUG;
push @_, $heap{$_[ARG3]};
# di ko alam kahit needed ito
$user_code{FailureEvent}->(@_) if $_[CHEAP]{-RUNNING};
# Redundant ( This is done in shutdown )
# delete $_[CHEAP];
# delete $heap{$_[ARG3]}{-SERVER};
$_[ARG0] = $_[ARG3];
$code->{shutdown}->(@_);
},
# }}}
# -error: Handle Connection Error (Internal) {{{
-error => sub {
printf "%d !! Disconnected - Error\n", $_[ARG3] if DEBUG;
push @_, $heap{$_[ARG3]};
$user_code{ErrorEvent}->(@_) if $_[CHEAP]{-RUNNING};
# Redundant
# delete $_[CHEAP];
# delete $heap{$_[ARG3]}{-SERVER};
$_[ARG0] = $_[ARG3];
$code->{shutdown}->(@_);
},
# }}}
# -timeout: Handle Connection Timeout (Internal) {{{
# Occsaionally -timeout is being called after the connection errors,
# thats what the extra check on -RUNNING is for, as well as in the
# other error states, just to ensure there is no problem. This doesn't
# really happen anymore but I'm not comfortable with it yet.
-timeout => sub {
# 20050330: timeouts aren't getting cleaned up!
# if ($heap{$_[ARG0]}{-RUNNING}) {
printf "%d ** Disconnected - Timeout\n", $_[ARG0] if DEBUG;
push @_, delete $heap{$_[ARG0]};
$user_code{TimeoutEvent}->(@_);
$user_code{Disconnected}->(@_);
# Just incase the cheap hangs around clean up the wheel
delete $_[CHEAP]->{-SERVER};
delete $_[CHEAP];
# kase sabi ito dalawa ng
# $code->{shutdown}->(@_);
# }
},
# }}}
# }}}
# Closing States {{{
# -flushed: Empty Socket (Internal) {{{
# flush - our socket is empty - Direct call is faster and fits reqs.
# -flushed => sub {
# unless ($heap{$_[ARG0]}{-RUNNING}) {
# $code->{shutdown}->(@_);
# }
# },
# }}}
# shutdown: Handle Socket Shutdown {{{
# Shutdown... push onto queue if not sent, delete driver.
shutdown => sub {
my ($kernel, $id) = @_[ KERNEL, ARG0 ];
unless (defined $id) {
return printf STDERR
"shutdown called without CHEAP id %s: line %d\n",
@_[CALLER_FILE, CALLER_LINE];
}
unless (exists $heap{$id}) {
die "$_[ARG0]: Socket doesn't exist?";
}
push @_, my $cheap = delete $heap{$id};
$cheap->{-RUNNING} = 0;
# Shutdown is now impolite.
# if (defined $heap{$_[ARG0]}{-SERVER}) {
# if ($heap{$_[ARG0]}{-SERVER}->can("get_driver_out_octets")) {
# unless ($heap{$_[ARG0]}{-SERVER}->get_driver_out_octets) {
printf "%d -- Disconnected - Closed\n", $_[ARG0]
if DEBUG;
# Remove Alarm, tanga ko ba!?
$kernel->alarm_remove
( delete $cheap->{-ALARM} );
$user_code{Disconnected}->(@_);
# Blow shit up
delete $_[CHEAP];
# }
# Its either gone and we're out of synch (shouldn't happen),
# or we want to wait for a clean shutdown.
return;
# }
# }
# Our wheel is dead if we didn't return above.
# This is kind of redundant, but much of this module is.
$_[KERNEL]->alarm_remove ( delete $heap{$_[ARG0]}{-ALARM} );
push @_, $heap{$_[ARG0]};
$user_code{Disconnected}->(@_);
delete $_[CHEAP];
delete $heap{$_[ARG0]};
# Don't do this unless we're flushed...
# delete $heap{$_[ARG0]};
},
# }}}
# die: Gracefully close all sockets {{{
# Shutdown quick, clean and gracefull.
die => sub {
$_[KERNEL]->call(shutdown => $_) for keys %heap;
$_[KERNEL]->alias_remove($_) for $_[KERNEL]->alias_list;
$_[KERNEL]->alarm_remove_all;
},
# }}}
# }}}
};
# }}}
# Session Constructor {{{
POE::Session->create
( inline_states => { %{ delete $user_code{inline_states} }, %$code },
object_states => delete $user_code{object_states},
package_states => delete $user_code{package_states},
options => delete $user_code{options},
args => delete $user_code{args},
);
# }}}
}
# }}}
# Connect Method {{{
sub connect {
my %Options = @_[1..$#_];
$Options{Heap} ||= {};
printf STDERR "!!! -> connect method called from %s:%d\n",
(caller)[1,2] if DEBUG;
my $server = POE::Wheel::SocketFactory->new
( RemoteAddress => $Options{RemoteAddress},
RemotePort => $Options{RemotePort},
BindAddress => $Options{BindAddress},
BindPort => $Options{BindPort},
SuccessEvent => '-success',
FailureEvent => '-failure',
Reuse => 'yes',
);
my $id = $server->ID;
printf TRACE "->connect(count %d, id %d, host (%s:%d) %s:%d);\n",
scalar keys %heap, $id, @Options{qw( RemoteAddress RemotePort )},
(caller)[1,2] if TRACE_CONNECT;
$heap{$id} = bless {
%{ $Options{Heap} },
-ID => $server->ID,
-ADDR => $Options{RemoteAddress},
-PORT => $Options{RemotePort},
-BINDA => $Options{BindAddress},
-BINDP => $Options{BindPort},
-RUNNING => 1,
-TIMEOUT => $Options{Timeout},
-SERVER => $server,
-STAMP => time,
}, __PACKAGE__ . "::CHEAP";
if ($heap{$id}{-TIMEOUT}) {
$heap{$id}{-ALARM} = $poe_kernel->delay_set
( -timeout => $heap{$id}{-TIMEOUT}, $id, $heap{$id}{email});
}
else {
$heap{$id}{-ALARM} = 0;
}
printf "%d ++ Connecting %s:%d \n", $id, @{ $heap{$id} }{qw( -ADDR -PORT )}
if DEBUG;
return $heap{$id};
}
# }}}
# CHEAP Package {{{
package POE::Component::Client::TCPMulti::CHEAP;
use POE::Kernel;
# Attribute Accessors {{{
sub ID {
shift->{-ID}
}
sub ADDR {
shift->{-ADDR}
}
sub PORT {
shift->{-PORT}
}
# }}}
# Filter Settings {{{
sub filter {
shift->{-SERVER}->set_filter( shift->new(@_) );
}
sub input_filter {
shift->{-SERVER}->set_input_filter( shift->new(@_) );
}
sub output_filter {
shift->{-SERVER}->set_output_filter( shift->new(@_) );
}
# }}}
# Timeout Setting {{{
sub timeout {
my ($cheap, $timeout) = @_;
$poe_kernel->alarm_remove($cheap->{-ALARM}) if $cheap->{-ALARM};
unless (defined $timeout) {
return $cheap->{-TIMEOUT};
}
if ($timeout) {
$cheap->{-TIMEOUT} = $timeout;
$cheap->{-STAMP} = time;
$cheap->{-ALARM} = $poe_kernel->delay_set
( -timeout => $cheap->{-TIMEOUT}, $cheap->{-ID});
}
else {
$cheap->{-TIMEOUT} = 0;
$cheap->{-ALARM} = 0;
$cheap->{-STAMP} = 0;
}
}
# }}}
# }}}
return "POE Rules";