Kafka::Producer::Avro - Avro message producer for Apache Kafka.
use Kafka::Connection; use Kafka::Producer::Avro; my $connection = Kafka::Connection->new( host => 'localhost' ); my $producer = Kafka::Producer::Avro->new( Connection => $connection , SchemaRegistry => Confluent::SchemaRegistry->new() ); # Set Avro schema for message key (valid JSON-string) my $key_schema = <<KEY_SCHEMA; { "type": "long", "name": "_id" } KEY_SCHEMA # Set Avro schema for message value (payload) (valid JSON-string) my $value_schema = <<VALUE_SCHEMA; { "type": "record", "name": "myrecord", "fields": [ { "name": "f1", "type": "string" } ] } VALUE_SCHEMA # Sending a single message my $response = $producer->send( 'mytopic', # topic 0, # partition 'Single message', # message undef, # key undef, # compression_codec undef, # timestamps $key_schema, # key_schema $value_schema # value_schema ); # Sending a series of messages $response = $producer->send( 'mytopic', # topic 0, # partition [ # messages 'The first message', 'The second message', 'The third message', ], undef, # key(s) undef, # compression_codec undef, # timestamp(s) $key_schema, # key_schema $value_schema # value_schema ); # ...or use named parameters $producer->send( topic => $topic, partition => $partition, messages => $messages, keys => $keys, compression_codec => $compression_codec, timestamps => $timestamps, key_schema => $key_schema, value_schema => $value_schema ); # Closes the producer and cleans up undef $producer; $connection->close; undef $connection;
Kafka::Producer::Avro main feature is to provide object-oriented API to produce messages according to Confluent SchemaRegistry and Avro serialization.
Kafka::Producer::Avro
Kafka::Producer::Avro inerhits from and extends Kafka::Producer.
Installation of Kafka::Producer::Avro is a canonical:
perl Makefile.PL make make test make install
Tests are focused on verifying Avro-formatted messages and theirs interactions with Confluent Schema Registry and are intended to extend Kafka::Producer test suite.
Kafka::Producer
They expect that in the target are listening Apache Kafka and Schema Registry services, respectively listening on localhost:9092 and http://localhost:8081.
localhost:9092
http://localhost:8081
You can alternatively set a different URLs by exporting the following environment variable:
KAFKA_HOST
KAFKA_PORT
CONFLUENT_SCHEMA_REGISTY_URL
For example:
export KAFKA_HOST=my-kafka-host.my-domain.org export FALFA_PORT=9092 export CONFLUENT_SCHEMA_REGISTY_URL=http://my-schema-registry-host.my-domain.org
new
Creates new producer client object.
new() takes arguments in key-value pairs as described in Kafka::Producer from which it inherits.
new()
In addition, takes in the following arguments:
SchemaRegistry => $schema_registry
Is a Confluent::SchemaRegistry instance.
The following methods are defined for the Kafka::Avro::Producer class:
Kafka::Avro::Producer
schema_registry
Returns the Confluent::SchemaRegistry instance supplied to the construcor.
get_error
Returns a string containing last error message.
send( $topic, $partition, $messages, $keys, $compression_codec, $timestamps, $key_schema, $value_schema )
send( %named_params )
Sends Avro-formatted messages on a Kafka::Connection object.
Returns a non-blank value (a reference to a hash with server response description) if the message is successfully sent.
In order to handle Avro format, Kafka::Producer|Kafka::Producer send() method is extended with two more positional arguments, $key_schema and $value_schema:
Kafka::Producer|Kafka::Producer
send()
$key_schema
$value_schema
$producer->send( $topic, # scalar $partition, # scalar $messages, # scalar | array $keys, # (optional) undef | scalar | array $compression_codec, # (optional) undef | scalar $timestamps, # (optional) undef | scalar | array $key_schema, # (optional) undef | JSON-string $value_schema # (optional) undef | JSON-string );
Both $key_schema and $value_schema parameters are optional and must provide JSON strings that represent Avro schemas to use to validate and serialize key(s) and value(s).
These schemas are validated against $schema_registry and, if compliant, they are added to the registry under the $topic+'key' or $topic+'value' Schema Registry's subjects.
$schema_registry
$topic+'key'
$topic+'value'
If an expected schema isn't provided, latest version from Schema Registry is used accordingly to the (topic + key/value) subject.
Alternatively, for ease of use, the send() method may be also used by suggesting named parameters:
$producer->send( topic => $topic, # scalar partition => $partition, # scalar messages => $messages, # scalar | array keys => $keys, # (optional) undef | scalar | array compression_codec => $compression_codec, # (optional) undef | scalar timestamps => $timestamps, # (optional) undef | scalar | array key_schema => $key_schema, # (optional) undef | JSON-string value_schema => $value_schema # (optional) undef | JSON-string );
bulk_send( %params )
Similar to send but uses bulks to avoid memory leaking.
send
Extra named parameters are expected:
size => $size
The size of the bulk
on_before_send_bulk => sub {...}
A code block that will be executed before the sending of each bulk.
The block will receive the following positional parameters:
$bulk_num
$bulk_messages
$bulk_keys
$index_from
$index_to
on_after_send_bulk => sub {...}
A code block that will be executed after the sending of each bulk.
$sent
$total_sent
on_init => sub {...}
A code block that will be executed only once before at the beginning of the cycle.
$to_send
$bulk_size
on_complete => sub {...}
A code block that will be executed only once after the end of the cycle.
$errors
on_send_error => sub {...}
A code block that will be executed when a bulk registers an error.
Alvaro Livraghi, <alvarol@cpan.org>
https://github.com/alivraghi/Kafka-Producer-Avro
Please use GitHub project link above to report problems or contact authors.
Copyright 2018 by Alvaro Livraghi
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
To install Kafka::Producer::Avro, copy and paste the appropriate command in to your terminal.
cpanm
cpanm Kafka::Producer::Avro
CPAN shell
perl -MCPAN -e shell install Kafka::Producer::Avro
For more information on module installation, please visit the detailed CPAN module installation guide.