
NAME

Kafka::Producer::Avro - Avro message producer for Apache Kafka.

SYNOPSIS

use Kafka::Connection;
use Kafka::Producer::Avro;
use Confluent::SchemaRegistry;

my $connection = Kafka::Connection->new( host => 'localhost' );

my $producer = Kafka::Producer::Avro->new( Connection => $connection , SchemaRegistry => Confluent::SchemaRegistry->new() );

# Set Avro schema for message key (valid JSON-string)
my $key_schema = <<KEY_SCHEMA;
	{
		"type": "long",
		"name": "_id"
	}
KEY_SCHEMA
# Set Avro schema for message value (payload) (valid JSON-string)
my $value_schema = <<VALUE_SCHEMA;
	{
		"type": "record",
		"name": "myrecord",
		"fields": [
			{
				"name": "f1",
				"type": "string"
			}
		]
	}
VALUE_SCHEMA

# Sending a single message
my $response = $producer->send(
	'mytopic',          # topic
	0,                  # partition
	'Single message',   # message
	undef,              # key
	undef,              # compression_codec
	undef,              # timestamps
	$key_schema,        # key_schema
	$value_schema       # value_schema
);

# Sending a series of messages
$response = $producer->send(
	'mytopic',          # topic
	0,                  # partition
	[                   # messages
		'The first message',
		'The second message',
		'The third message',
	],
	undef,              # key(s)
	undef,              # compression_codec
	undef,              # timestamp(s)
	$key_schema,        # key_schema
	$value_schema       # value_schema
);

# ...or use named parameters

$producer->send(
	topic             => $topic,
	partition         => $partition,
	messages          => $messages,
	keys              => $keys,
	compression_codec => $compression_codec,
	timestamps        => $timestamps,
	key_schema        => $key_schema,
	value_schema      => $value_schema
);    

# Closes the producer and cleans up
undef $producer;
$connection->close;
undef $connection;

DESCRIPTION

The main feature of Kafka::Producer::Avro is an object-oriented API for producing messages serialized with Avro, using schemas managed by a Confluent Schema Registry.

Kafka::Producer::Avro inherits from and extends Kafka::Producer.

INSTALL

Kafka::Producer::Avro is installed in the canonical way:

perl Makefile.PL
make
make test
make install

TEST NOTES

The tests focus on verifying Avro-formatted messages and their interaction with the Confluent Schema Registry, and are intended to extend the Kafka::Producer test suite.

They expect Apache Kafka and Schema Registry services to be listening on localhost:9092 and http://localhost:8081 respectively.

You can point the tests at different endpoints by exporting the following environment variables:

KAFKA_HOST
KAFKA_PORT
CONFLUENT_SCHEMA_REGISTY_URL

For example:

export KAFKA_HOST=my-kafka-host.my-domain.org
export KAFKA_PORT=9092
export CONFLUENT_SCHEMA_REGISTY_URL=http://my-schema-registry-host.my-domain.org
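
As an illustration of how these variables and the documented defaults fit together, here is a minimal sketch of a lookup helper. The name test_endpoints is hypothetical and is not part of the module or its test suite; only the variable names and default values come from the notes above.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Kafka::Producer::Avro): resolve the
# service endpoints, falling back to the documented defaults when a
# variable is unset. Pass key/value pairs to override %ENV.
sub test_endpoints {
    my %env = @_ ? @_ : %ENV;
    return (
        kafka_host          => $env{KAFKA_HOST} // 'localhost',
        kafka_port          => $env{KAFKA_PORT} // 9092,
        schema_registry_url => $env{CONFLUENT_SCHEMA_REGISTY_URL}
            // 'http://localhost:8081',
    );
}

my %endpoints = test_endpoints();
print "Kafka: $endpoints{kafka_host}:$endpoints{kafka_port}\n";
print "Schema Registry: $endpoints{schema_registry_url}\n";
```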

USAGE

CONSTRUCTOR

new

Creates new producer client object.

new() takes arguments as key-value pairs, as described in Kafka::Producer, from which it inherits.

In addition, it takes the following argument:

SchemaRegistry => $schema_registry (mandatory)

A Confluent::SchemaRegistry instance.

METHODS

The following methods are defined for the Kafka::Producer::Avro class:

schema_registry()

Returns the Confluent::SchemaRegistry instance supplied to the constructor.

get_error()

Returns a string containing the last error message.

send( $topic, $partition, $messages, $keys, $compression_codec, $timestamps, $key_schema, $value_schema )

send( %named_params )

Sends Avro-formatted messages over the producer's Kafka::Connection object.

Returns a true value (a reference to a hash describing the server response) if the message is sent successfully.

To handle the Avro format, the Kafka::Producer send() method is extended with two more positional arguments, $key_schema and $value_schema:

$producer->send(
	$topic,             # scalar 
	$partition,         # scalar
	$messages,          # scalar | array
	$keys,              # (optional) undef | scalar | array
	$compression_codec, # (optional) undef | scalar
	$timestamps,        # (optional) undef | scalar | array
	$key_schema,        # (optional) undef | JSON-string
	$value_schema       # (optional) undef | JSON-string
);

Both $key_schema and $value_schema are optional. When supplied, each must be a JSON string representing the Avro schema used to validate and serialize the key(s) or value(s).

These schemas are validated against $schema_registry and, if compliant, added to the registry under the Schema Registry subject $topic+'key' or $topic+'value'.

If an expected schema isn't provided, the latest version registered in Schema Registry for the corresponding (topic + key/value) subject is used.
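
Because the schemas must be valid JSON strings, one option is to build them from Perl data structures rather than writing the JSON by hand. The following is a minimal sketch using the core JSON::PP module; it reproduces the two schemas from the SYNOPSIS and is only one possible approach, not something the module requires.

```perl
use strict;
use warnings;
use JSON::PP ();

# Build the schemas as Perl data and let JSON::PP produce valid JSON.
# canonical() sorts object keys, so the same schema always yields the
# same string.
my $json = JSON::PP->new->canonical;

my $key_schema = $json->encode( { type => 'long', name => '_id' } );

my $value_schema = $json->encode( {
    type   => 'record',
    name   => 'myrecord',
    fields => [ { name => 'f1', type => 'string' } ],
} );

print "$key_schema\n";
print "$value_schema\n";
```

The resulting strings can be passed as the key_schema and value_schema arguments of send().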

Alternatively, for ease of use, send() may also be called with named parameters:

$producer->send(
	topic             => $topic,             # scalar 
	partition         => $partition,         # scalar
	messages          => $messages,          # scalar | array
	keys              => $keys,              # (optional) undef | scalar | array
	compression_codec => $compression_codec, # (optional) undef | scalar
	timestamps        => $timestamps,        # (optional) undef | scalar | array
	key_schema        => $key_schema,        # (optional) undef | JSON-string
	value_schema      => $value_schema       # (optional) undef | JSON-string
);    
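
The return value of send() and the get_error() method can be combined into a simple error check. The sketch below is an assumption about how failures surface: it treats both a false return value and a thrown exception as errors, since this documentation does not say which one to expect in each case. The send_checked wrapper is hypothetical and works with any object that provides send() and get_error().

```perl
use strict;
use warnings;

# Hypothetical wrapper (not part of the module): call send() with named
# parameters and return ( $response, $error ). Exactly one is defined.
sub send_checked {
    my ( $producer, %params ) = @_;

    # eval traps exceptions; on success $@ is left empty.
    my $response = eval { $producer->send(%params) };
    return ( $response, undef ) if $response;

    # Assumption: a failure shows up either as an exception in $@ or as
    # a false return value whose reason get_error() can report.
    my $error = $@ || $producer->get_error() || 'unknown error';
    return ( undef, $error );
}

# Usage, given a $producer built as in the SYNOPSIS:
#   my ( $response, $error ) = send_checked(
#       $producer,
#       topic     => 'mytopic',
#       partition => 0,
#       messages  => $messages,
#   );
#   warn "send failed: $error\n" unless $response;
```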

bulk_send( %params )

Similar to send(), but sends the messages in bulks to avoid memory leaks.

The following extra named parameters are accepted:

size => $size

The size of each bulk.

on_before_send_bulk => sub {...} (optional)

A code block that will be executed before the sending of each bulk.

The block will receive the following positional parameters:

$bulk_num - the number of the bulk
$bulk_messages - the number of messages in the bulk
$bulk_keys - the number of keys in the bulk
$index_from - the absolute index of the first message in the bulk
$index_to - the absolute index of the last message in the bulk

on_after_send_bulk => sub {...} (optional)

A code block that will be executed after the sending of each bulk.

The block will receive the following positional parameters:

$sent - the number of sent messages in the bulk
$total_sent - the total number of messages sent

on_init => sub {...} (optional)

A code block that will be executed only once, at the beginning of the cycle.

The block will receive the following positional parameters:

$to_send - the total number of messages to send
$bulk_size - the size of the bulk

on_complete => sub {...} (optional)

A code block that will be executed only once after the end of the cycle.

The block will receive the following positional parameters:

$to_send - the total number of messages to send
$total_sent - the total number of messages sent
$errors - the number of bulks sent with errors

on_send_error => sub {...} (optional)

A code block that will be executed when a bulk registers an error.
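
To show how the callbacks and their positional parameters fit together, here is a minimal sketch of a helper that builds a set of progress-logging callbacks. The helper name progress_callbacks is hypothetical. The parameter order follows the lists above; the arguments passed to on_send_error are not described here, so that callback ignores them.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of the module): build the optional
# bulk_send() callbacks so that each one appends a progress line to
# the array referenced by $log.
sub progress_callbacks {
    my ($log) = @_;
    return (
        on_init => sub {
            my ( $to_send, $bulk_size ) = @_;
            push @$log, "sending $to_send messages in bulks of $bulk_size";
        },
        on_before_send_bulk => sub {
            my ( $bulk_num, $bulk_messages, $bulk_keys, $index_from, $index_to ) = @_;
            push @$log, "bulk $bulk_num: $bulk_messages messages ($index_from..$index_to)";
        },
        on_after_send_bulk => sub {
            my ( $sent, $total_sent ) = @_;
            push @$log, "sent $sent, $total_sent so far";
        },
        on_complete => sub {
            my ( $to_send, $total_sent, $errors ) = @_;
            push @$log, "done: $total_sent/$to_send sent, $errors bulks with errors";
        },
        # The on_send_error arguments are not documented above, so this
        # callback only records that a bulk failed.
        on_send_error => sub { push @$log, 'bulk failed' },
    );
}
```

The returned list can be passed alongside the other named parameters, for example bulk_send( topic => $topic, partition => $partition, messages => $messages, size => 100, progress_callbacks( \@log ) ), assuming bulk_send() accepts the same topic, partition and messages parameters as send().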

AUTHOR

Alvaro Livraghi, <alvarol@cpan.org>

CONTRIBUTE

https://github.com/alivraghi/Kafka-Producer-Avro

BUGS

Please use the GitHub project link above to report problems or to contact the author.

COPYRIGHT AND LICENSE

Copyright 2018 by Alvaro Livraghi

This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.