Kafka::Protocol - Functions to process messages in the Apache Kafka protocol.
This documentation refers to Kafka::Protocol version 1.07 .
Kafka::Protocol
use 5.010; use strict; use warnings; use Data::Compare; use Kafka qw( $COMPRESSION_NONE $ERROR_NO_ERROR $REQUEST_TIMEOUT $WAIT_WRITTEN_TO_LOCAL_LOG ); use Kafka::Internals qw( $PRODUCER_ANY_OFFSET ); use Kafka::Protocol qw( decode_produce_response encode_produce_request ); # a encoded produce request hex stream my $encoded = pack( q{H*}, '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' ); # a decoded produce request my $decoded = { CorrelationId => 4, ClientId => q{}, RequiredAcks => $WAIT_WRITTEN_TO_LOCAL_LOG, Timeout => $REQUEST_TIMEOUT * 100, # ms topics => [ { TopicName => 'mytopic', partitions => [ { Partition => 0, MessageSet => [ { Offset => $PRODUCER_ANY_OFFSET, MagicByte => 0, Attributes => $COMPRESSION_NONE, Key => q{}, Value => 'Hello!', }, ], }, ], }, ], }; my $encoded_request = encode_produce_request( $decoded ); say 'encoded correctly' if $encoded_request eq $encoded; # a encoded produce response hex stream $encoded = pack( q{H*}, '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' ); # a decoded produce response $decoded = { CorrelationId => 4, topics => [ { TopicName => 'mytopic', partitions => [ { Partition => 0, ErrorCode => $ERROR_NO_ERROR, Offset => 0, }, ], }, ], }; my $decoded_response = decode_produce_response( \$encoded ); say 'decoded correctly' if Compare( $decoded_response, $decoded ); # more examples, see t/*_decode_encode.t
This module is not a user module.
In order to achieve better performance, functions of this module do not perform arguments validation.
The main features of the Kafka::Protocol module are:
Supports parsing the Apache Kafka protocol.
Supports Apache Kafka Requests and Responses (PRODUCE and FETCH). Within this package we currently support access to PRODUCE, FETCH, OFFSET, METADATA Requests and Responses.
Support for working with 64 bit elements of the Kafka protocol on 32 bit systems.
The following constants are available for export
$DEFAULT_APIVERSION
The default API version that will be used as fallback, if it's not possible to detect what the Kafka server supports. Only Kafka servers > 0.10.0.0 can be queried to get which API version they implements. On Kafka servers 0.8.x and 0.9.x, the protocol will default to use $DEFAULT_APIVERSION. Currently its value is '0'
$CONSUMERS_REPLICAID
According to Apache Kafka documentation: 'ReplicaId - Normal client consumers should always specify this as -1 as they have no node id.'
$NULL_BYTES_LENGTH
According to Apache Kafka documentation: 'Protocol Primitive Types: ... bytes, string - A length of -1 indicates null.'
$BAD_OFFSET
According to Apache Kafka documentation: 'Offset - When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes.'
The following functions are available for Kafka::MockProtocol module.
Kafka::MockProtocol
encode_api_versions_request( $ApiVersions_Request )
Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.
This function takes the following arguments:
$ApiVersions_Request
$ApiVersions_Request is a reference to the hash representing the structure of the APIVERSIONS Request. it contains CorrelationId, ClientId (can be empty string), and ApiVersion (must be 0)
decode_api_versions_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the APIVERSIONS Response.
$bin_stream_ref
$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.
encode_find_coordinator_request( $FindCoordinator_Request )
$FindCoordinator_Request
$FindCoordinator_Request is a reference to the hash representing the structure of the FINDCOORDINATOR Request. it contains CorrelationId, ClientId (can be empty string), CoordinatorKey and CoordinatorType (for version 1 of protocol)
decode_find_coordinator_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the FINDCOORDINATOR Response.
encode_produce_request( $Produce_Request, $compression_codec )
$Produce_Request
$Produce_Request is a reference to the hash representing the structure of the PRODUCE Request (examples see t/*_decode_encode.t).
t/*_decode_encode.t
$compression_codec
Optional.
$compression_codec sets the required type of $messages compression, if the compression is desirable.
$messages
Supported codecs: $COMPRESSION_NONE, $COMPRESSION_GZIP, $COMPRESSION_SNAPPY, $COMPRESSION_LZ4.
NOTE: $COMPRESSION_LZ4 requires Kafka 0.10 or higher, as initial implementation of LZ4 in Kafka did not follow the standard LZ4 framing specification.
decode_produce_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the PRODUCE Response (examples see t/*_decode_encode.t).
encode_fetch_request( $Fetch_Request )
$Fetch_Request
$Fetch_Request is a reference to the hash representing the structure of the FETCH Request (examples see t/*_decode_encode.t).
decode_fetch_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the FETCH Response (examples see t/*_decode_encode.t).
encode_offset_request( $Offset_Request )
$Offset_Request
$Offset_Request is a reference to the hash representing the structure of the OFFSET Request (examples see t/*_decode_encode.t).
decode_offset_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the OFFSET Response (examples see t/*_decode_encode.t).
encode_metadata_request( $Metadata_Request )
$Metadata_Request
$Metadata_Request is a reference to the hash representing the structure of the METADATA Request (examples see t/*_decode_encode.t).
decode_metadata_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the METADATA Response (examples see t/*_decode_encode.t).
encode_offsetcommit_request( $OffsetCommit_Request )
$OffsetCommit_Request
$OffsetCommit_Request is a reference to the hash representing the structure of the OffsetCommit Request (examples see t/*_decode_encode.t).
decode_offsetcommit_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the OFFSETCOMMIT Response (examples see t/*_decode_encode.t).
encode_offsetfetch_request( $OffsetFetch_Request )
$OffsetFetch_Request
$OffsetFetch_Request is a reference to the hash representing the structure of the OffsetFetch Request (examples see t/*_decode_encode.t).
decode_offsetfetch_response( $bin_stream_ref )
Decodes the argument and returns a reference to the hash representing the structure of the OFFSETFETCH Response (examples see t/*_decode_encode.t).
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules.
Kafka::Connection - interface to connect to a Kafka cluster.
Kafka::Producer - interface for producing client.
Kafka::Consumer - interface for consuming client.
Kafka::Message - interface to access Kafka message properties.
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.
Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.
Kafka::IO - low-level interface for communication with Kafka server.
Kafka::Exceptions - module designated to handle Kafka exceptions.
Kafka::Internals - internal constants and functions used by several package modules.
A wealth of detail about the Apache Kafka and the Kafka Protocol:
Main page at http://kafka.apache.org/
Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
Kafka package is hosted on GitHub: https://github.com/TrackingSoft/Kafka
Sergey Gladkov
Please use GitHub project link above to report problems or contact authors.
Alexander Solovey
Jeremy Jordan
Sergiy Zuban
Vlad Marchenko
Damien Krotkine
Greg Franklin
Copyright (C) 2012-2017 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
To install Kafka, copy and paste the appropriate command in to your terminal.
cpanm
cpanm Kafka
CPAN shell
perl -MCPAN -e shell install Kafka
For more information on module installation, please visit the detailed CPAN module installation guide.