NAME
Net::Kafka::Producer::Avro - Apache Kafka message producer based on librdkafka, Avro serialization and Confluent Schema Registry validation.
SYNOPSIS
use Net::Kafka::Producer::Avro;
use Confluent::SchemaRegistry;
use AnyEvent;
use JSON;
my $producer = Net::Kafka::Producer::Avro->new(
'bootstrap.servers' => 'localhost:9092',
'schema-registry' => Confluent::SchemaRegistry->new(), # defaults to http://localhost:8081
'compression.codec' => 'gzip', # optional, one of: 'none' (def.), 'gzip', 'snappy', 'lz4', 'zstd'
'log_level' => 0, # suppress librdkafka internal logging
'error_cb' => sub {
my ($self, $err, $msg) = @_;
die "Connection error:\n\t- err: " . $err . "\n\t- msg: " . $msg . "\n";
}
);
# creates the header object (if you need to add headers to the message)
my $headers = Net::Kafka::Headers->new();
$headers->add('my-header-1', 'foo');
$headers->add('my-header-2', 'bar');
my $condvar = AnyEvent->condvar;
my $promise = $producer->produce(
topic => 'mytopic',
partition => 0,
key => 1000,
key_schema => to_json(
{
name => 'id',
type => 'long'
}
),
payload => {
id => 1210120,
f1 => 'text message'
},
payload_schema => to_json(
{
type => 'record',
name => 'myrecord',
fields => [
{
name => 'id',
type => 'long'
},
{
name => 'f1',
type => 'string'
}
]
}
),
headers => $headers
);
die "Error requesting message production: " . $producer->get_error() . "\n"
unless $promise;
$promise->then(
sub {
my $delivery_report = shift;
$condvar->send; # resolve the promise
print "Message delivered with offset " . $delivery_report->{offset};
},
sub {
my $error = shift;
$condvar->send; # resolve the promise
die "Unable to produce message: " . $error->{error} . ", code: " . $error->{code};
}
);
$condvar->recv; # wait for the promise resolution
print "Message produced", "\n";
DESCRIPTION
The main goal of Net::Kafka::Producer::Avro is to provide an object-oriented API for producing Avro-serialized messages validated against a Confluent Schema Registry.
Net::Kafka::Producer::Avro inherits from and extends the Net::Kafka::Producer module.
INSTALL
Installation of Net::Kafka::Producer::Avro is a canonical:
perl Makefile.PL
make
make test
make install
TESTING TROUBLESHOOTING
Tests focus on verifying Avro-formatted messages and their interaction with Confluent Schema Registry, and are intended to extend Net::Kafka::Producer's test suite.
They expect local Apache Kafka and Schema Registry services listening on localhost:9092 and http://localhost:8081 respectively.
You can point the tests at different endpoints by exporting the following environment variables:
KAFKA_HOST
KAFKA_PORT
CONFLUENT_SCHEMA_REGISTY_URL
For example:
export KAFKA_HOST=my-kafka-host.my-domain.org
export KAFKA_PORT=9092
export CONFLUENT_SCHEMA_REGISTY_URL=http://my-schema-registry-host.my-domain.org
USAGE
CONSTRUCTOR
new
Creates a message producer.
The new() method accepts the same arguments as the Net::Kafka::Producer parent constructor.
In addition, it takes the following mandatory argument:
schema-registry => $schema_registry (mandatory)
A Confluent::SchemaRegistry instance.
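A minimal constructor sketch, assuming the registry endpoint shown here is hypothetical and that Confluent::SchemaRegistry accepts a host argument as in its own documentation:

```perl
use Net::Kafka::Producer::Avro;
use Confluent::SchemaRegistry;

# Sketch: point the producer at a non-default Schema Registry.
# 'my-registry:8081' is a placeholder endpoint, not a real host.
my $producer = Net::Kafka::Producer::Avro->new(
    'bootstrap.servers' => 'localhost:9092',
    'schema-registry'   => Confluent::SchemaRegistry->new(
        host => 'http://my-registry:8081'
    ),
);
```

Omitting the host argument falls back to the default registry at http://localhost:8081, as shown in the SYNOPSIS.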
METHODS
The following methods are defined for the Net::Kafka::Producer::Avro class:
schema_registry()
Returns the Confluent::SchemaRegistry instance supplied to the constructor.
get_error()
Returns a string containing the last error message.
produce( %named_params )
Sends Avro-formatted key/message pairs.
As with Net::Kafka::Producer, returns a promise if the message was successfully queued for sending.
In order to handle the Avro format, Net::Kafka::Producer's produce() method has been extended with two more arguments, key_schema and payload_schema:
$producer->produce(
topic => $topic, # scalar
partition => $partition, # scalar
key_schema => $key_schema, # (optional) scalar representing a JSON string of the Avro schema to use for the key
key => $key, # (optional) scalar | hashref
payload_schema => $payload_schema, # (optional) scalar representing a JSON string of the Avro schema to use for the payload
payload => $payload, # scalar | hashref
timestamp => $timestamp, # (optional) scalar representing milliseconds since epoch
headers => $headers, # (optional) Net::Kafka::Headers object
# ...other params accepted by Net::Kafka::Producer's produce() method
);
Both the $key_schema and $payload_schema parameters are optional; when supplied, each must be a JSON string representing the Avro schema used to validate and serialize the key or the payload.
These schemas are validated against the $schema_registry supplied to the new() method and, if compliant, are registered under the "$topic-key" or "$topic-value" Schema Registry subjects.
If a schema isn't provided, the latest version registered under the corresponding (topic plus key/value) subject is used.
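The schema-fallback behaviour described above can be sketched as follows; this assumes schemas for 'mytopic' were previously registered (for instance by an earlier call such as the one in the SYNOPSIS), since otherwise there is no latest version to fall back to:

```perl
# Sketch: omit key_schema/payload_schema to reuse the latest schemas
# registered under the "mytopic-key" and "mytopic-value" subjects.
my $promise = $producer->produce(
    topic   => 'mytopic',
    key     => 1000,
    payload => {
        id => 1210120,
        f1 => 'text message',
    },
);
die "Error requesting message production: " . $producer->get_error() . "\n"
    unless $promise;
```

The promise is then handled exactly as in the SYNOPSIS, via then() with success and failure callbacks.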
AUTHOR
Alvaro Livraghi, <alvarol@cpan.org>
CONTRIBUTE
https://github.com/alivraghi/Net-Kafka-Producer-Avro
BUGS
Please use the GitHub project link above to report problems or contact the author.
COPYRIGHT AND LICENSE
Copyright 2026 by Alvaro Livraghi
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.