NAME

Kafka::Producer::Avro - Avro message producer for Apache Kafka.

SYNOPSIS

  use Kafka::Connection;
  use Kafka::Producer::Avro;
  use Confluent::SchemaRegistry;
  
  my $connection = Kafka::Connection->new( host => 'localhost' );
  
  my $producer = Kafka::Producer::Avro->new( Connection => $connection , SchemaRegistry => Confluent::SchemaRegistry->new() );
  
  # Set Avro schema for message key (valid JSON-string)
  my $key_schema = <<KEY_SCHEMA;
        {
                "type": "long",
                "name": "_id"
        }
  KEY_SCHEMA
  # Set Avro schema for message value (payload) (valid JSON-string)
  my $value_schema = <<VALUE_SCHEMA;
        {
                "type": "record",
                "name": "myrecord",
                "fields": [
                        {
                                "name": "f1",
                                "type": "string"
                        }
                ]
        }
  VALUE_SCHEMA
  
  # Sending a single message
  my $response = $producer->send(
        'mytopic',          # topic
        0,                  # partition
        'Single message',   # message
        undef,              # key
        undef,              # compression_codec
        undef,              # timestamps
        $key_schema,        # key_schema
        $value_schema       # value_schema
  );
  
  # Sending a series of messages
  $response = $producer->send(
        'mytopic',          # topic
        0,                  # partition
        [                   # messages
                'The first message',
                'The second message',
                'The third message',
        ],
        undef,              # key(s)
        undef,              # compression_codec
        undef,              # timestamp(s)
        $key_schema,        # key_schema
        $value_schema       # value_schema
  );
  
  # ...or use named parameters
  
  $producer->send(
        topic             => $topic,
        partition         => $partition,
        messages          => $messages,
        keys              => $keys,
        compression_codec => $compression_codec,
        timestamps        => $timestamps,
        key_schema        => $key_schema,
        value_schema      => $value_schema
  );    
  
  # Closes the producer and cleans up
  undef $producer;
  $connection->close;
  undef $connection;

DESCRIPTION

The main feature of Kafka::Producer::Avro is an object-oriented API for producing messages that are serialized with Avro according to schemas managed by Confluent Schema Registry.

Kafka::Producer::Avro inherits from and extends Kafka::Producer.

INSTALL

Installation of Kafka::Producer::Avro follows the canonical procedure:

  perl Makefile.PL
  make
  make test
  make install

TEST NOTES

Tests focus on verifying Avro-formatted messages and their interaction with Confluent Schema Registry, and are intended to extend the Kafka::Producer test suite.

They expect Apache Kafka and Schema Registry services to be running on the target host, listening on localhost:9092 and http://localhost:8081 respectively.

Alternatively, you can point the tests at different endpoints by exporting the following environment variables:

KAFKA_HOST
KAFKA_PORT
CONFLUENT_SCHEMA_REGISTY_URL

For example:

  export KAFKA_HOST=my-kafka-host.my-domain.org
  export KAFKA_PORT=9092
  export CONFLUENT_SCHEMA_REGISTY_URL=http://my-schema-registry-host.my-domain.org
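
As a rough illustration of how a test script might pick up these settings, the sketch below reads the three variables and falls back to the defaults stated above. The helper name resolve_test_endpoints is hypothetical and is not part of the distribution.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Kafka::Producer::Avro): resolve the
# service endpoints from an environment hash, using the documented defaults.
sub resolve_test_endpoints {
    my (%env) = @_;
    return {
        kafka_host          => $env{KAFKA_HOST} // 'localhost',
        kafka_port          => $env{KAFKA_PORT} // 9092,
        schema_registry_url => $env{CONFLUENT_SCHEMA_REGISTY_URL}
            // 'http://localhost:8081',
    };
}

my $endpoints = resolve_test_endpoints(%ENV);
printf "Kafka: %s:%s, Schema Registry: %s\n",
    @{$endpoints}{qw(kafka_host kafka_port schema_registry_url)};
```

Passing the environment in as an argument, rather than reading %ENV inside the helper, keeps the fallback logic easy to check in isolation.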

USAGE

CONSTRUCTOR

new

Creates a new producer client object.

new() takes arguments in key-value pairs as described in Kafka::Producer from which it inherits.

In addition, it takes the following argument:

SchemaRegistry => $schema_registry (mandatory)

A Confluent::SchemaRegistry instance.

METHODS

The following methods are defined for the Kafka::Producer::Avro class:

schema_registry()

Returns the Confluent::SchemaRegistry instance supplied to the constructor.

get_error()

Returns a string containing the last error message.

send( $topic, $partition, $messages, $keys, $compression_codec, $timestamps, $key_schema, $value_schema )

send( %named_params )

Sends Avro-formatted messages on a Kafka::Connection object.

Returns a non-blank value (a reference to a hash with server response description) if the message is successfully sent.

To handle the Avro format, the send() method of Kafka::Producer is extended with two more positional arguments, $key_schema and $value_schema:

  $producer->send(
        $topic,             # scalar 
        $partition,         # scalar
        $messages,          # scalar | array
        $keys,              # (optional) undef | scalar | array
        $compression_codec, # (optional) undef | scalar
        $timestamps,        # (optional) undef | scalar | array
        $key_schema,        # (optional) undef | JSON-string
        $value_schema       # (optional) undef | JSON-string
  );

Both $key_schema and $value_schema are optional. When provided, each must be a JSON string representing the Avro schema used to validate and serialize the key(s) or value(s).

These schemas are validated against $schema_registry and, if compliant, are added to the registry under the $topic+'key' or $topic+'value' Schema Registry subjects.

If a schema isn't provided, the latest version registered in Schema Registry under the corresponding (topic + key/value) subject is used.
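
Because both schemas must be valid JSON strings, one option is to build them from Perl data structures rather than writing the JSON by hand. The sketch below uses the core JSON::PP module; the schema content mirrors the SYNOPSIS, and the variable names are only illustrative.

```perl
use strict;
use warnings;
use JSON::PP ();

# canonical(1) sorts object keys, so the generated strings are stable
my $json = JSON::PP->new->canonical(1);

# Avro schema for the message key, as in the SYNOPSIS
my $key_schema = $json->encode( { type => 'long', name => '_id' } );

# Avro schema for the message value: a record with one string field
my $value_schema = $json->encode(
    {
        type   => 'record',
        name   => 'myrecord',
        fields => [ { name => 'f1', type => 'string' } ],
    }
);

print "$key_schema\n$value_schema\n";
```

The resulting strings can then be passed as the key_schema and value_schema arguments of send().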

Alternatively, for ease of use, the send() method may also be called with named parameters:

  $producer->send(
        topic             => $topic,             # scalar 
        partition         => $partition,         # scalar
        messages          => $messages,          # scalar | array
        keys              => $keys,              # (optional) undef | scalar | array
        compression_codec => $compression_codec, # (optional) undef | scalar
        timestamps        => $timestamps,        # (optional) undef | scalar | array
        key_schema        => $key_schema,        # (optional) undef | JSON-string
        value_schema      => $value_schema       # (optional) undef | JSON-string
  );    
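
The return value of send() and get_error() can be combined in a small wrapper. The following is a minimal sketch, assuming that send() returns a false value on failure and may also throw an exception (as the underlying Kafka::Producer can); send_or_warn is a hypothetical helper and not part of the module.

```perl
use strict;
use warnings;

# Hypothetical helper: send messages and report failures in one place.
# $producer is expected to be a Kafka::Producer::Avro instance; for this
# sketch, any object offering send() and get_error() will do.
sub send_or_warn {
    my ( $producer, %args ) = @_;

    # Assumption: send() either dies or returns a false value and records
    # the reason so that get_error() can report it.
    my $response = eval { $producer->send(%args) };
    if ( my $exception = $@ ) {
        warn "send() died: $exception\n";
        return;
    }
    unless ($response) {
        my $reason = $producer->get_error() // 'unknown error';
        warn "send() failed: $reason\n";
        return;
    }
    return $response;    # hash reference describing the server response
}

# Possible call, using the named-parameter form:
#   my $response = send_or_warn( $producer,
#       topic => 'mytopic', partition => 0,
#       messages => $messages, value_schema => $value_schema );
```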

bulk_send( %params )

Similar to send(), but splits the messages into bulks of a fixed size and sends them one bulk at a time, to avoid memory leaks.

Extra named parameters are expected:

size => $size

The size of each bulk.

on_before_send_bulk => sub {...} (optional)

A code block that will be executed before the sending of each bulk.

The block will receive the following positional parameters:

$bulk_num - the number of the bulk
$bulk_messages - the number of messages in the bulk
$bulk_keys - the number of keys in the bulk
$index_from - the absolute index of the first message in the bulk
$index_to - the absolute index of the last message in the bulk

on_after_send_bulk => sub {...} (optional)

A code block that will be executed after the sending of each bulk.

The block will receive the following positional parameters:

$sent - the number of sent messages in the bulk
$total_sent - the total number of messages sent

on_init => sub {...} (optional)

A code block that will be executed only once, at the beginning of the cycle.

The block will receive the following positional parameters:

$to_send - the total number of messages to send
$bulk_size - the size of the bulk

on_complete => sub {...} (optional)

A code block that will be executed only once after the end of the cycle.

The block will receive the following positional parameters:

$to_send - the total number of messages to send
$total_sent - the total number of messages sent
$errors - the number of bulks sent with errors

on_send_error => sub {...} (optional)

A code block that will be executed when a bulk registers an error.
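
To make the callback contract above concrete, the sketch below walks a message list in bulks and invokes each hook with the positional parameters listed above. It is a standalone simulation, not the module's implementation: walk_bulks is a hypothetical name, indexes are assumed to be zero-based, bulk numbers are assumed to start at 1, and the actual send is replaced by a placeholder that always succeeds (so on_send_error is never exercised).

```perl
use strict;
use warnings;

# Hypothetical simulation of the bulk cycle; nothing is sent to Kafka.
sub walk_bulks {
    my (%p) = @_;
    my @messages = @{ $p{messages} };
    my @keys     = @{ $p{keys} || [] };
    my $size     = $p{size};
    die "size must be a positive integer\n" unless $size && $size > 0;

    my $noop = sub { };
    my ( $total_sent, $errors, $bulk_num ) = ( 0, 0, 0 );

    ( $p{on_init} || $noop )->( scalar @messages, $size );

    for ( my $from = 0 ; $from < @messages ; $from += $size ) {
        my $to = $from + $size - 1;
        $to = $#messages if $to > $#messages;    # last bulk may be shorter
        $bulk_num++;

        my $bulk_messages = $to - $from + 1;
        my $bulk_keys     = grep { $_ <= $#keys } $from .. $to;

        ( $p{on_before_send_bulk} || $noop )
            ->( $bulk_num, $bulk_messages, $bulk_keys, $from, $to );

        # Placeholder for the real send of @messages[ $from .. $to ]
        my $sent = $bulk_messages;
        $total_sent += $sent;

        ( $p{on_after_send_bulk} || $noop )->( $sent, $total_sent );
    }

    ( $p{on_complete} || $noop )->( scalar @messages, $total_sent, $errors );
    return $total_sent;
}

# Usage: seven messages with a bulk size of three give bulks of 3, 3 and 1
walk_bulks(
    messages            => [ map { "message $_" } 1 .. 7 ],
    size                => 3,
    on_before_send_bulk => sub {
        my ( $bulk_num, $bulk_messages, $bulk_keys, $index_from, $index_to ) = @_;
        print "bulk $bulk_num: $bulk_messages message(s), "
            . "indexes $index_from to $index_to\n";
    },
    on_complete => sub {
        my ( $to_send, $total_sent, $errors ) = @_;
        print "sent $total_sent of $to_send, $errors bulk(s) with errors\n";
    },
);
```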

AUTHOR

Alvaro Livraghi, <alvarol@cpan.org>

CONTRIBUTE

https://github.com/alivraghi/Kafka-Producer-Avro

BUGS

Please use the GitHub project link above to report problems or to contact the author.

COPYRIGHT AND LICENSE

Copyright 2018 by Alvaro Livraghi

This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.