The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

NAME

Kafka::IO::Async - Pseudo async interface to nonblocking network communication with the Apache Kafka server with Coro. This module implements the same interface that usual Kafka::IO module

VERSION

Read documentation for Kafka::IO version 1.08 .

SYNOPSIS

    use 5.010;
    use strict;
    use warnings;

    use Scalar::Util qw(
        blessed
    );
    use Try::Tiny;

    use Kafka::IO::Async;

    my $io;
    try {
        $io = Kafka::IO::Async->new( host => 'localhost' );
    } catch {
        my $error = $_;
        if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
            warn 'Error: (', $error->code, ') ',  $error->message, "\n";
            exit;
        } else {
            die $error;
        }
    };

    # Closes and cleans up
    $io->close;
    undef $io;

DESCRIPTION

This module is private and should not be used directly.

In order to achieve better performance, methods of this module do not perform arguments validation.

The main features of the Kafka::IO::Async class are:

  • Provides an object oriented API for communication with Kafka.

  • This class allows you to create Kafka 0.9+ clients.

CONSTRUCTOR

new

Establishes TCP connection to given host and port, creates and returns Kafka::IO::Async IO object.

new() takes arguments in key-value pairs. The following arguments are currently recognized:

host => $host

$host is Kafka host to connect to. It can be a host name or an IP-address in IPv4 or IPv6 form (for example '127.0.0.1', '0:0:0:0:0:0:0:1' or '::1').

port => $port

Optional, default = $KAFKA_SERVER_PORT.

$port is integer attribute denoting the port number of to access Apache Kafka.

$KAFKA_SERVER_PORT is the default Apache Kafka server port that can be imported from the Kafka module.

timeout => $timeout

$REQUEST_TIMEOUT is the default timeout that can be imported from the Kafka module.

Special behavior when timeout is set to undef:

  • Alarms are not used internally (namely when performing gethostbyname).

  • Default $REQUEST_TIMEOUT is used for the rest of IO operations.

ip_version => $ip_version

Force version of IP protocol for resolving host name (or interpretation of passed address).

Optional, undefined by default, which works in the following way: version of IP address is detected automatically, host name is resolved into IPv4 address.

See description of $IP_V4, $IP_V6 in Kafka EXPORT.

METHODS

The following methods are provided by Kafka::IO::Async class:

send( $message <, $timeout> )

Sends a $message to Kafka.

The argument must be a bytes string.

Use optional $timeout argument to override default timeout for this request only.

Returns the number of characters sent.

receive( $length <, $timeout> )

Receives a message up to $length size from Kafka.

$length argument must be a positive number.

Use optional $timeout argument to override default timeout for this call only.

Returns a reference to the received message.

try_receive( $length <, $timeout> ) Receives a message up to $length size from Kafka.

$length argument must be a positive number.

Use optional $timeout argument to override default timeout for this call only.

Returns a reference to the received message.

close

Closes connection to Kafka server. Returns true if those operations succeed and if no error was reported by any PerlIO layer.

DIAGNOSTICS

When error is detected, an exception, represented by object of Kafka::Exception::IO class, is thrown (see Kafka::Exceptions).

code and a more descriptive message provide information about thrown exception. Consult documentation of the Kafka::Exceptions for the list of all available methods.

Authors suggest using of Try::Tiny's try and catch to handle exceptions while working with Kafka package.

Here is the list of possible error messages that Kafka::IO::Async may produce:

Invalid argument

Invalid arguments were passed to a method.

Cannot send

Message cannot be sent on a Kafka::IO::Async object socket.

Cannot receive

Message cannot be received.

Cannot bind

TCP connection cannot be established on given host and port.

Debug mode

Debug output can be enabled by passing desired level via environment variable using one of the following ways:

PERL_KAFKA_DEBUG=1 - debug is enabled for the whole Kafka package.

PERL_KAFKA_DEBUG=IO:1 - enable debug for Kafka::IO::Async only.

Kafka::IO::Async supports two debug levels (level 2 includes debug output of 1):

  1. Additional information about processing events/alarms.

  2. Dump of binary messages exchange with Kafka server.

SEE ALSO

The basic operation of the Kafka package modules:

Kafka - constants and messages used by the Kafka package modules.

Kafka::Connection - interface to connect to a Kafka cluster.

Kafka::Producer - interface for producing client.

Kafka::Consumer - interface for consuming client.

Kafka::Message - interface to access Kafka message properties.

Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.

Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.

Kafka::IO::Async - low-level interface for communication with Kafka server.

Kafka::Exceptions - module designated to handle Kafka exceptions.

Kafka::Internals - internal constants and functions used by several package modules.

A wealth of detail about the Apache Kafka and the Kafka Protocol:

Main page at http://kafka.apache.org/

Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

SOURCE CODE

Kafka package is hosted on GitHub: https://github.com/TrackingSoft/Kafka

AUTHOR

Sergey Gladkov

Please use GitHub project link above to report problems or contact authors.

CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Sergiy Zuban

Vlad Marchenko

Damien Krotkine

COPYRIGHT AND LICENSE

Copyright (C) 2012-2017 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.