NAME

Net::Kafka::Producer::Avro - Apache Kafka message producer based on librdkafka, Avro serialization and Confluent Schema Registry validation.

SYNOPSIS

use Net::Kafka::Producer::Avro;
use Confluent::SchemaRegistry;
use AnyEvent;
use JSON;


my $producer = Net::Kafka::Producer::Avro->new(
  'bootstrap.servers' => 'localhost:9092',
  'schema-registry'  => Confluent::SchemaRegistry->new(), # defaults to http://localhost:8081
  'compression.codec' => 'gzip',  # optional, one of: 'none' (def.), 'gzip', 'snappy', 'lz4', 'zstd'
  'log_level' => 0, # suppress librdkafka internal logging
  'error_cb' => sub {
    my ($self, $err, $msg) = @_;
    die "Connection error:\n\t- err: " . $err . "\n\t- msg: " . $msg . "\n";
  }
);

# creates the header object (if you need to add headers to the message)
my $headers = Net::Kafka::Headers->new();
$headers->add('my-header-1', 'foo');
$headers->add('my-header-2', 'bar');

my $condvar = AnyEvent->condvar;

my $promise = $producer->produce(
  topic          => 'mytopic',
  partition      => 0,
  key            => 1000,
  key_schema     => to_json(
                      {
                        name => 'id',
                        type => 'long'
                      }
                    ),
  payload        => {
                      id  => 1210120,
                      f1  => 'text message'
                    },
  payload_schema => to_json(
                      {
                        type => 'record',
                        name => 'myrecord',
                        fields => [
                          {
                            name => 'id',
                            type => 'long'
                          },
                          {
                            name => 'f1',
                            type => 'string'
                          }
                        ]
                      }
                    ),
  headers        => $headers
);

die "Error requesting message production: " . $producer->get_error() . "\n"
  unless $promise;

$promise->then(
  sub {
    my $delivery_report = shift;
    $condvar->send; # resolve the promise
    print "Message delivered with offset " . $delivery_report->{offset};
  }, 
  sub {
    my $error = shift;
    $condvar->send; # resolve the promise
    die "Unable to produce message: " . $error->{error} . ", code: " . $error->{code};
  }
);

$condvar->recv; # wait for the promise resolution

print "Message produced", "\n";

DESCRIPTION

Net::Kafka::Producer::Avro main goal is to provide object-oriented API to produce Avro-serialized messages according to Confluent SchemaRegistry.

Net::Kafka::Producer::Avro inerhits from and extends Net::Kafka::Producer module.

INSTALL

Installation of Net::Kafka::Producer::Avro is a canonical:

perl Makefile.PL
make
make test
make install

TESTING TROUBLESHOOTING

Tests are focused on verifying Avro-formatted messages and theirs interactions with Confluent Schema Registry and are intended to extend the Net::Kafka::Producer's test suite.

It's expected that a local Apache Kafka and Schema Registry services are listening on localhost:9092 and http://localhost:8081.

You can either set different endpoints by exporting the following environment variables:

KAFKA_HOST
KAFKA_PORT
CONFLUENT_SCHEMA_REGISTY_URL

For example:

export KAFKA_HOST=my-kafka-host.my-domain.org
export KAFKA_PORT=9092
export CONFLUENT_SCHEMA_REGISTY_URL=http://my-schema-registry-host.my-domain.org

USAGE

CONSTRUCTOR

new

Creates a message producer.

new() method expects the same arguments set as the Net::Kafka::Producer parent constructor.

In addition, takes in the following mandatory argument:

SchemaRegistry => $schema_registry (mandatory)

Is a Confluent::SchemaRegistry instance.

METHODS

The following methods are defined for the Net::Kafka::Producer::Avro class:

schema_registry()

Returns the Confluent::SchemaRegistry instance supplied to the construcor.

get_error()

Returns a string containing last error message.

produce( %named_params )

Sends Avro-formatted key/message pairs.

According to Net::Kafka::Producer, returns a promise value if the message was successfully sent.

In order to handle Avro format, the Net::Kafka::Producer|Net::Kafka::Producer's produce() method has been extended with two more arguments, key_schema and payload_schema:

$producer->produce(
	topic             => $topic,             # scalar 
	partition         => $partition,         # scalar
	key_schema        => $key_schema,        # (optional) scalar representing a JSON string of the Avro schema to use for the key
	key               => $key,               # (optional) scalar | hashref
	payload_schema    => $payload_schema,    # (optional) scalar representing a JSON string of the Avro schema to use for the payload
	payload           => $payload,           # scalar | hashref
	timestamp         => $timestamp,         # (optional) scalar representing milliseconds since epoch
	headers           => $headers,           # (optional) Net::Kafka::Headers object
	# ...other params accepted by Net::Kafka::Producer's produce() method
);    

Both $key_schema and $payload_schema parameters are optional and must provide a JSON strings representing the Avro schemas to use for validating and serializing key and payload.

These schemas will be validated against the $schema_registry supplied to the new method and, if compliant, will be added to the registry under the $topic+'key' or $topic+'value' Schema Registry subjects.

If a schema isn't provided, the latest version from Schema Registry will be used accordingly to the (topic + key/value) subject.

AUTHOR

Alvaro Livraghi, <alvarol@cpan.org>

CONTRIBUTE

https://github.com/alivraghi/Net-Kafka-Producer-Avro

BUGS

Please use GitHub project link above to report problems or contact authors.

COPYRIGHT AND LICENSE

Copyright 2026 by Alvaro Livraghi

This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.