Kafka::Connection - Object interface to connect to a kafka cluster.
This documentation refers to Kafka::Connection version 0.8008 .
Kafka::Connection
use 5.010; use strict; use warnings; use Scalar::Util qw( blessed ); use Try::Tiny; # A simple example of Kafka::Connection usage: use Kafka::Connection; # connect to local cluster with the defaults my $connection; try { $connection = Kafka::Connection->new( host => 'localhost' ); } catch { if ( blessed( $_ ) && $_->isa( 'Kafka::Exception' ) ) { warn $_->message, "\n", $_->trace->as_string, "\n"; exit; } else { die $_; } }; # Closes the connection and cleans up undef $connection;
The main features of the Kafka::Connection class are:
Provides API for communication with Kafka 0.8 cluster.
Performs requests encoding and responses decoding, provides automatic selection or promotion of a leader server from Kafka cluster.
Provides information about Kafka cluster.
The following constants are available for export
%RETRY_ON_ERRORS
These are non-fatal errors, which when happen causes refreshing of meta-data from Kafka followed by another attempt to fetch data.
new
Creates Kafka::Connection object for interaction with Kafka cluster. Returns created Kafka::Connection object.
new() takes arguments in key-value pairs. The following arguments are currently recognized:
new()
host => $host
$host is any Apache Kafka cluster host to connect to. It can be a hostname or the IP-address in the "xx.xx.xx.xx" form.
$host
Optional. Either host or broker_list must be supplied.
host
broker_list
port => $port
Optional, default = $KAFKA_SERVER_PORT.
$KAFKA_SERVER_PORT
$port is the attribute denoting the port number of the service we want to access (Apache Kafka service). $port should be an integer number.
$port
$KAFKA_SERVER_PORT is the default Apache Kafka server port constant (9092) that can be imported from the Kafka module.
9092
broker_list => $broker_list
Optional, $broker_list is a reference to array of the host:port strings, defining the list of Kafka servers. This list will be used to locate the new leader if the server specified via host => $host and port => $port arguments becomes unavailable. Either host or broker_list must be supplied.
$broker_list
timeout => $timeout
Optional, default = $Kafka::REQUEST_TIMEOUT.
$Kafka::REQUEST_TIMEOUT
$timeout specifies how long we wait for the remote server to respond. $timeout is in seconds, could be a positive integer or a floating-point number not bigger than int32 positive integer.
$timeout
Special behavior when timeout is set to undef:
timeout
undef
Alarms are not used internally (namely when performing gethostbyname).
gethostbyname
Default $REQUEST_TIMEOUT is used for the rest of IO operations.
$REQUEST_TIMEOUT
CorrelationId => $correlation_id
Optional, default = undef .
Correlation is a user-supplied integer. It will be passed back with the response by the server, unmodified. The $correlation_id should be an integer number.
Correlation
$correlation_id
An exception is thrown if CorrelationId in response does not match the one supplied in request.
CorrelationId
If CorrelationId is not provided, it is set to a random negative integer.
SEND_MAX_RETRIES => $retries
Optional, int32 signed integer, default = $Kafka::SEND_MAX_RETRIES .
$Kafka::SEND_MAX_RETRIES
In some circumstances (leader is temporarily unavailable, outdated metadata, etc) we may fail to send a message. This property specifies the maximum number of attempts to send a message. The $retries should be an integer number.
$retries
RECEIVE_MAX_RETRIES => $retries
Optional, int32 signed integer, default = $Kafka::RECEIVE_MAX_RETRIES .
$Kafka::RECEIVE_MAX_RETRIES
In some circumstances (temporarily network issues, server high load, socket error, etc) we may fail to receive a response. This property specifies the maximum number of attempts to receive a message. The $retries should be an integer number.
RETRY_BACKOFF => $backoff
Optional, default = $Kafka::RETRY_BACKOFF .
$Kafka::RETRY_BACKOFF
Since leader election takes a bit of time, this property specifies the amount of time, in milliseconds, that the producer waits before refreshing the metadata. The $backoff should be an integer number.
$backoff
AutoCreateTopicsEnable => $mode
Optional, default value is 0 (false).
AutoCreateTopicsEnable controls how this module handles the first access to non-existent topic when auto.create.topics.enable in server configuration is true. If AutoCreateTopicsEnable is false (default), the first access to non-existent topic produces an exception; however, the topic is created and next attempts to access it will succeed.
auto.create.topics.enable
true
If AutoCreateTopicsEnable is true, this module waits (according to the SEND_MAX_RETRIES and RETRY_BACKOFF properties) until the topic is created, to avoid errors on the first access to non-existent topic.
SEND_MAX_RETRIES
RETRY_BACKOFF
If auto.create.topics.enable in server configuration is false, this setting has no effect.
false
MaxLoggedErrors => $number
Optional, default value is 100.
Defines maximum number of last non-fatal errors that we keep in log. Use method "nonfatal_errors" to access those errors.
The following methods are defined for the Kafka::Producer class:
Kafka::Producer
get_known_servers
Returns the list of known Kafka servers (in host:port format).
is_server_known( $server )
Returns true, if $server (host:port) is known in cluster.
$server
is_server_alive( $server )
Returns true, if successful connection is established with $server (host:port).
receive_response_to_request( $request, $compression_codec )
$request
$request is a reference to the hash representing the structure of the request.
This method encodes $request, passes it to the leader of cluster, receives reply, decodes and returns it in a form of hash reference.
WARNING:
This method should be considered private and should not be called by an end user.
In order to achieve better performance, this method does not perform arguments validation.
$compression_codec
Optional.
$compression_codec sets the required type of $messages compression, if the compression is desirable.
$messages
Supported codecs: $COMPRESSION_NONE, $COMPRESSION_GZIP, $COMPRESSION_SNAPPY.
close_connection( $server )
Closes connection with $server (defined as host:port).
close
Closes connection with all known Kafka servers.
cluster_errors
Returns a reference to a hash.
Each hash key is the identifier of the server (host:port), and the value is the last communication error with that server.
An empty hash is returned if there were no communication errors.
nonfatal_errors
Returns a reference to an array of the last non-fatal errors.
Maximum number of entries is set using MaxLoggedErrors parameter of constructor.
MaxLoggedErrors
A reference to the empty array is returned if there were no non-fatal errors or parameter MaxLoggedErrors is set to 0.
clear_nonfatals
Clears an array of the last non-fatal errors.
A reference to the empty array is returned because there are no non-fatal errors now.
When error is detected, an exception, represented by object of Kafka::Exception::Connection class, is thrown (see Kafka::Exceptions).
code and a more descriptive message provide information about exception. Consult documentation of the Kafka::Exceptions for the list of all available methods.
Here is the list of possible error messages that Kafka::Connection may produce:
Invalid argument
Invalid argument was provided to new constructor or to other method.
Can't send
Request cannot be sent to Kafka.
Can't recv
Response cannot be received from Kafka.
Can't bind
A successful TCP connection can't be established on given host and port.
Can't get metadata
Error detected during parsing of response from Kafka.
Leader not found
Failed to locate leader of Kafka cluster.
Mismatch CorrelationId
Mismatch of CorrelationId of request and response.
There are no known brokers
Failed to locate cluster broker.
Received meta data is incorrect or missing.
Debug output can be enabled by passing desired level via environment variable using one of the following ways:
PERL_KAFKA_DEBUG=1 - debug is enabled for the whole Kafka package.
PERL_KAFKA_DEBUG=1
PERL_KAFKA_DEBUG=Connection:1 - enable debug for Kafka::Connection only.
PERL_KAFKA_DEBUG=Connection:1
Kafka::Connection prints to STDERR information about non-fatal errors, re-connection attempts and such when debug level is set to 1 or higher.
STDERR
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules.
Kafka::Connection - interface to connect to a Kafka cluster.
Kafka::Producer - interface for producing client.
Kafka::Consumer - interface for consuming client.
Kafka::Message - interface to access Kafka message properties.
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.
Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.
Kafka::IO - low-level interface for communication with Kafka server.
Kafka::Exceptions - module designated to handle Kafka exceptions.
Kafka::Internals - internal constants and functions used by several package modules.
A wealth of detail about the Apache Kafka and the Kafka Protocol:
Main page at http://kafka.apache.org/
Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
Kafka package is hosted on GitHub: https://github.com/TrackingSoft/Kafka
Sergey Gladkov, <sgladkov@trackingsoft.com>
Alexander Solovey
Jeremy Jordan
Sergiy Zuban
Vlad Marchenko
Copyright (C) 2012-2013 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
To install Kafka, copy and paste the appropriate command in to your terminal.
cpanm
cpanm Kafka
CPAN shell
perl -MCPAN -e shell install Kafka
For more information on module installation, please visit the detailed CPAN module installation guide.