The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

NAME

Net::WebSocket::EVx - Perl wrapper around Wslay websocket library

DESCRIPTION

Net::WebSocket::EVx - websocket module based on EV and Alien::Wslay. This is fork of Net::WebSocket::EV which looks abandoned. The main differences are usage of Alien::Wslay and rsv bit support (eg. for compressed tranfers)

SYNOPSIS

app.psgi for websocket echo server with compression support:

    use strict; use experimental 'signatures';
    use Net::WebSocket::EVx;
    use Compress::Raw::Zlib qw'Z_SYNC_FLUSH Z_OK MAX_WBITS';
    use Digest::SHA1 'sha1_base64';

    use constant {
        ws_max_size => 1<<31-1,
        ws_guid => '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
        ws_inflate_tail => pack(C4 => 0, 0, 255, 255),
        crlf => "\015\012"
    };

    sub ($env) {
        return [200, ['access-control-allow-origin', $env->{uc'http_origin'} // '*'], []] unless
            ($env->{uc'http_connection'}//'') eq 'upgrade' && ($env->{uc'http_upgrade'}//'') eq 'websocket';
        return [400, [], ['expecting ws v13 handshake']] unless
            ($env->{uc'http_sec_websocket_version'}//'') eq '13' && $env->{uc'http_sec_websocket_key'};
        return [500, [], []] unless exists $env->{'psgix.io'};
        my ($deflate, $inflate);
        if (($env->{uc'http_sec_websocket_extensions'} // '') =~ /permessage-deflate/) {
            $deflate = Compress::Raw::Zlib::Deflate->new(WindowBits => -MAX_WBITS);
            $inflate = Compress::Raw::Zlib::Inflate->new(WindowBits => -MAX_WBITS, Bufsize => ws_max_size, LimitOutput => 1);
        }
        sub {
            my $io = $env->{'psgix.io'};
            my $key = sha1_base64($env->{uc'http_sec_websocket_key'}.ws_guid);
            my $got = syswrite $io, my $handshake = join crlf,
                'HTTP/1.1 101 Switching Protocols', 'connection: upgrade', 'upgrade: websocket',
                $deflate ? 'sec-websocket-extensions: permessage-deflate' : (),
                "sec-websocket-accept: $key=", crlf;
            die "failed to write ws handshake in one go $len/$got: $!" unless $got and $got == length $handshake;
            open(my $fh, '+<&', $io) or die $!;
            my $srv; $srv = Net::WebSocket::EVx->new({
                fh => $fh, max_recv_size => ws_max_size,
                on_msg_recv => sub ($rsv, $opcode, $msg, $status_code) {
                    $srv->queue_msg($msg), return unless $rsv && $inflate; # plain echo
                    return unless $inflate->inflate(($msg .= ws_inflate_tail), my $out) == Z_OK;
                    return unless $deflate->deflate($out, $msg) == Z_OK && $deflate->flush($msg, Z_SYNC_FLUSH) == Z_OK;
                    substr $msg, -4, 4, ''; # cut deflated tail
                    $srv->queue_msg_ex($msg);
                },
                on_close => sub ($code) { undef $_ for $io, $fh, $srv, $inflate, $deflate } });
            return
        }
    }

run it via Twiggy/Feersum which support "psgix.io":

    plackup -l $(realpath app.sock) -s Feersum app.psgi

put nginx in front:

    http {
      upstream app { server unix:app.sock; }
      map $http_upgrade $connection_upgrade { default upgrade; '' ''; }
      server {
        listen 127.0.0.1:5000;
        location / {
          proxy_pass http://app;
          proxy_ignore_client_abort on;
          proxy_set_header Host $http_host;
          proxy_set_header X-Forwarded-For $http_x_forwarded_for;
          proxy_set_header X-Forwarded-Proto $http_x_forwarded_proto;
          proxy_set_header Upgrade $http_upgrade;
          proxy_set_header Connection $connection_upgrade;
        }
      }
    }

run it:

    /usr/sbin/nginx -p . -e nginx.err -c nginx.conf

METHODS

new( { params } )

Params:

fh or fd

Filehandle or numeric file descriptor of socket to use. Socket must be set in non blocking mode.

Net::WebSocket::EVx doesn't do handshake, you must do it before calling new().

type

Either "client" or "server"

default: server

buffering

If set to 0 - disables buffering. on_msg_recv is always called with empty $msg, use on_frame_recv_* to handle messages. Useful for handling big binary data without buffering it in memory.

Default if not defined : 1

max_recv_size

Max message or frame size, see https://tatsuhiro-t.github.io/wslay/man/wslay_event_config_set_max_recv_msg_length.html

on_msg_recv

This callback is called when library receives complete message. Close messages aren't handled by this callback. When buffering is disabled $msg is always empty

Callback arguments: my ($rsv, $opcode, $msg, $status_code) = @_;

on_msg_recv

This callback is called when library receives complete message. Close messages aren't handled by this callback. When buffering is disabled $msg is always empty

Callback arguments: my ($rsv, $opcode, $msg, $status_code) = @_;

on_close

Called when connection is closed.

Callback arguments: my ($close_code) = @_;

genmask

Used only by Net::WebSocket::EVx type=client mode. Must return $len bytes scalar to mask message. If not specified, then simple rand() mask generator is used.

Callback arguments: my ($len) = @_;

on_frame_recv_start

Called when frame header is received.

Callback arguments: my ($fin, $rsv,$opcode,$payload_length) = @_;

on_frame_recv_chunk

Called when next data portion is received.

Callback arguments: my ($data) = @_;

on_frame_recv_end

Called when message is received. No arguments

queue_msg( message, opcode )

Queue message, opcode is optional default is 1 (text message)

queue_msg_ex( message, opcode, rsv )

Queue message, opcode is optional default is 1 (text message) rsv is optional default is WSLAY_RSV1_BIT

queue_fragmented ( callback, opcode )

Queue fragmented message, opcode is optional, default is 2 (binary message)

Callback arguments: my ($len) = @_;

Callback must return array of two elements ( "scalar $len or less(can be 0) bytes length", status )

Status can be:

WS_FRAGMENTED_DATA - Data chunk, optional status value, you can just return one scalar with data. Wslay will constantly re-invoke callback when it returns WS_FRAGMENTED_DATA. It will let other events run, but you will get 100% CPU load if there is no data to send and your callback always returns WS_FRAGMENTED_DATA with empty scalar while waiting for data. To prevent this use ->stop_write to suspend all IO when there is no more data to send and ->start_write when new portion of data is ready.

WS_FRAGMENTED_ERROR - Error. Don't call callback anymore

WS_FRAGMENTED_EOF - End of message.

queue_fragmented_ex ( callback, opcode, rsv )

Queue fragmented message, opcode is optional, default is 2 (binary message) rsv is optional default is WSLAY_RSV1_BIT

wait(cb)

Callback called when send queue becomes empty.

queued_count()

Returns number of messages in send queue

start() and stop()

Start or stop all websocket IO

start_read() and stop_write()

start_read() and stop_read()

shutdown_read() and shutdown_write()

Disable read or write. There is no way to enable it again, use start_* and stop_* instead

close( status_code, reason_data)

Queue close frame. Status and reason are optional.

Possible attack vector: client can hold connection after receiving close frame and make a lot of connections. So if you want to guaranteed close connection, then call $ws->close() and ->wait until close frame will be sent, then close $ws->{fh}.