NAME
EV::Nats - High-performance asynchronous NATS client using EV
SYNOPSIS
use EV;
use EV::Nats;

my $nats = EV::Nats->new(
    host       => '127.0.0.1',
    port       => 4222,
    reconnect  => 1,
    on_error   => sub { warn "nats: $_[0]\n" },
    on_connect => sub { warn "connected\n" },
);

# Subscribe (plain or queue group)
my $sid = $nats->subscribe('foo.>', sub {
    my ($subject, $payload, $reply, $headers) = @_;
    print "[$subject] $payload\n";
});
$nats->subscribe('work.>', sub { ... }, 'workers');

# Publish (fire-and-forget) and headered publish
$nats->publish('foo.bar', 'hello world');
$nats->hpublish('foo.bar', "NATS/1.0\r\nX-Trace: 42\r\n\r\n", 'body');

# Request / reply
$nats->request('service.echo', 'ping', sub {
    my ($response, $err) = @_;
    die $err if $err;
    print "reply: $response\n";
}, 5000); # 5s timeout

$nats->unsubscribe($sid);
EV::run;
DESCRIPTION
EV::Nats is an async NATS client that implements the protocol directly in XS on top of EV. There is no external C library dependency.
Protocol
Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG, PING/PONG), including headered publish/receive, wildcard subjects (*, >), queue groups, and request/reply with an automatic shared inbox subscription.
Connectivity
TCP and Unix-domain sockets; TCP keepalive; connect timeout; auto reconnect with exponential backoff and jitter; subscription and auto-unsub state restored on reconnect; cluster failover from INFO connect_urls; lame-duck-mode (leaf node graceful shutdown) callback; graceful drain.
Auth
Token, user/pass, NKey/JWT (Ed25519 via OpenSSL).
TLS
Optional, auto-detected at build time. STARTTLS-style upgrade after INFO; full hostname verification (DNS or IP literal) by default; opt-out tls_skip_verify; custom CA via tls_ca_file.
Performance
Write coalescing via ev_prepare (one write() per loop iteration); O(1) subscription lookup; per-publish allocation-free fast path; explicit batch mode for tight loops; per-connection stats counters.
Higher-level APIs
EV::Nats::JetStream, EV::Nats::KV, EV::Nats::ObjectStore.
Note: DNS resolution via getaddrinfo is blocking. Use numeric IP addresses for latency-sensitive applications.
METHODS
new(%options)
Create an EV::Nats instance. If host or path is supplied, connection is initiated immediately and the on_connect callback fires once the CONNECT/PONG handshake completes.
my $nats = EV::Nats->new(
    host       => '127.0.0.1',
    port       => 4222,
    reconnect  => 1,
    on_error   => sub { warn "nats: $_[0]\n" },
    on_connect => sub { warn "ready\n" },
);
Connection options
- host => Str
-
Server hostname (numeric IP recommended; see "CAVEATS"). When set, connection starts immediately.
- port => Int (default 4222)
-
Server port.
- path => Str
-
Unix-domain socket path. Mutually exclusive with host.
- connect_timeout => Int (ms; 0 = none)
-
How long to wait for the TCP/TLS handshake before giving up.
- keepalive => Int (seconds)
-
If set, enables SO_KEEPALIVE with this idle interval.
- priority => Int (-2 .. +2)
-
EV watcher priority for the I/O watchers on this connection.
- loop => EV::Loop (default EV::default_loop)
-
The EV loop to attach watchers to.
- name => Str
-
Client name advertised in CONNECT.
Auth options
- user => Str / pass => Str
-
Username/password authentication. JSON-escaped in CONNECT.
- token => Str
-
Token authentication.
- nkey_seed => Str
-
NATS NKey seed (the SU... form). Requires the build to have OpenSSL (EV::Nats::HAS_NKEY).
- jwt => Str
-
User JWT, paired with nkey_seed for decentralized auth. See also "creds_file".
- tls => Bool / tls_ca_file => Str / tls_skip_verify => Bool
-
See "tls" for details.
Protocol options
- verbose => Bool (default 0)
-
Request +OK acknowledgments after each command.
- pedantic => Bool (default 0)
-
Server-side strict subject checking.
- echo => Bool (default 1)
-
Receive messages this client itself publishes.
- no_responders => Bool (default 0)
-
Ask the server to send a 503 status reply when a request has no responders, surfaced as the "no responders" error in request.
- ping_interval => Int (ms, default 120000; 0 = disabled)
-
Client-initiated PING interval for keep-alive.
- max_pings_outstanding => Int (default 2)
-
Maximum unacked PINGs before the connection is declared stale.
Reconnect options
- reconnect => Bool (default 0)
-
Enable automatic reconnection.
- reconnect_delay => Int (ms, default 2000)
-
Initial delay between reconnect attempts; subsequent attempts use exponential backoff with jitter, capped by max_reconnect_delay.
- max_reconnect_delay => Int (ms, default 30000)
-
Upper bound on the backoff delay.
- max_reconnect_attempts => Int (default 60; 0 = unlimited)
-
Give up after this many consecutive failures.
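The interaction between reconnect_delay and max_reconnect_delay can be pictured with a small sketch. This is not the module's actual formula, which is not specified here: the doubling factor, the 25% jitter range, and the helper name backoff_delay_ms are all assumptions made for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of EV::Nats: delay in ms before reconnect
# attempt number $attempt (0-based). Doubling per attempt and a jitter of
# up to 25% either way are assumptions, not the module's documented behavior.
sub backoff_delay_ms {
    my ($attempt, $initial_ms, $max_ms) = @_;
    my $delay = $initial_ms;
    for (1 .. $attempt) {
        $delay *= 2;
        if ($delay >= $max_ms) { $delay = $max_ms; last }
    }
    my $jitter      = $delay * 0.25 * (2 * rand() - 1);   # within [-25%, +25%)
    my $with_jitter = $delay + $jitter;
    $with_jitter = $max_ms if $with_jitter > $max_ms;     # never exceed the cap
    return int($with_jitter);
}

# Defaults from the option list: 2000 ms initial delay, 30000 ms cap.
printf "attempt %d: ~%d ms\n", $_, backoff_delay_ms($_, 2000, 30000) for 0 .. 5;
```

Whatever the exact formula, the point of the jitter is that many clients dropped by the same server do not all retry at the same instant.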
Callback options
All callbacks fire on the EV loop, never inline.
- on_connect => sub { }
-
Called after the CONNECT/PONG handshake completes.
- on_disconnect => sub { }
-
Called when the connection drops, before any auto-reconnect attempt.
- on_error => sub { my ($err) = @_ }
-
Receives a string. If unset, errors croak.
- on_lame_duck => sub { }
-
Called once when the server signals lame-duck-mode shutdown via INFO ldm:true.
- on_slow_consumer => sub { my ($pending_bytes) = @_ }
-
See "slow_consumer".
connect($host, [$port])
Initiate a TCP connection. Port defaults to 4222. Croaks if already connected or in the middle of connecting; otherwise returns immediately and signals completion via on_connect.
connect_unix($path)
Initiate a Unix-domain-socket connection. Same async semantics as "connect".
disconnect
Cancel any pending reconnect, drop queued writes, close the socket, and fire on_disconnect. The disconnect is flagged as intentional, so no auto-reconnect is scheduled. For a clean shutdown that flushes pending writes first, see "drain".
is_connected
True if the CONNECT/PONG handshake has completed and no disconnect or reconnect is in progress.
publish($subject, [$payload], [$reply_to])
Publish a message. Alias: pub.
$nats->publish('foo', 'hello');
$nats->publish('foo', 'hello', 'reply.subject');
hpublish($subject, $headers, [$payload], [$reply_to])
Publish with headers. Alias: hpub.
$nats->hpublish('foo', "NATS/1.0\r\nX-Key: val\r\n\r\n", 'body');
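Writing the header block by hand is error-prone because every line, including the final empty one, must end in CRLF. A minimal sketch of a builder follows; build_nats_headers is a hypothetical helper, not part of EV::Nats, and it only produces the string that hpublish expects as its second argument.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of EV::Nats: build a raw header block from
# name/value pairs. A flat list is used so that order and repeated names
# are preserved.
sub build_nats_headers {
    my @pairs = @_;
    die "odd number of header arguments\n" if @pairs % 2;
    my $block = "NATS/1.0\r\n";
    while (my ($name, $value) = splice(@pairs, 0, 2)) {
        die "invalid header name\n"              if $name eq '' || $name =~ /[:\r\n]/;
        die "invalid header value for '$name'\n" if $value =~ /[\r\n]/;
        $block .= "$name: $value\r\n";
    }
    return $block . "\r\n";   # an empty line terminates the block
}

my $headers = build_nats_headers('X-Trace' => 42, 'X-Key' => 'val');
# $headers can now be passed as the second argument to hpublish.
```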
subscribe($subject, $cb, [$queue_group])
Subscribe to a subject. Returns subscription ID. Alias: sub.
my $sid = $nats->subscribe('foo.*', sub {
    my ($subject, $payload, $reply, $headers) = @_;
});
Queue groups are preserved across reconnects.
Callback receives:
$subject - actual subject the message was published to
$payload - message body
$reply - reply-to subject (undef if none)
$headers - raw headers string (only for HMSG)
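Because $headers arrives as a raw string, a callback that needs individual header values has to split it itself. The sketch below assumes the string has the same shape as the block passed to hpublish (a NATS/1.0 version line, optionally followed by a status code, then Name: value lines); parse_nats_headers is a hypothetical helper, not part of EV::Nats.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of EV::Nats: split a raw header block into
# an optional three-digit status code and a hash of name => [values].
sub parse_nats_headers {
    my ($raw) = @_;
    return (undef, {}) unless defined $raw && length $raw;
    my ($first, @lines) = split /\r\n/, $raw;
    my ($status) = $first =~ m{^NATS/1\.0\s+(\d{3})};   # e.g. 503; undef if absent
    my %h;
    for my $line (@lines) {
        next unless $line =~ /^([^:]+):\s*(.*)$/;
        push @{ $h{$1} }, $2;                           # keep repeated headers
    }
    return ($status, \%h);
}

my ($status, $h) = parse_nats_headers("NATS/1.0\r\nX-Trace: 42\r\n\r\n");
print "X-Trace = $h->{'X-Trace'}[0]\n";   # prints "X-Trace = 42"
```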
subscribe_max($subject, $cb, $max_msgs, [$queue_group])
Convenience: "subscribe" followed by an auto-unsubscribe after $max_msgs messages have been delivered.
unsubscribe($sid, [$max_msgs])
Unsubscribe. With $max_msgs, the server is told to deliver that many more messages and then drop the subscription. The auto-unsub state is restored on reconnect (so the partial count survives a disconnect). Alias: unsub.
request($subject, $payload, $cb, [$timeout_ms])
Request/reply. Uses automatic inbox subscription. Alias: req.
$nats->request('service', 'data', sub {
    my ($response, $err) = @_;
    die $err if $err;
    print "got: $response\n";
}, 5000);
Callback receives ($response, $error). For replies that include NATS message headers (HMSG), a third argument $headers with the raw header block is also passed. Error is set on timeout ("request timeout") or no responders ("no responders").
drain([$cb])
Graceful shutdown: sends UNSUB for all subscriptions, flushes pending writes with a PING fence, fires $cb when the server confirms with PONG, then disconnects. No new messages will be received after drain is initiated.
$cb receives a single argument: undef on clean drain, or an error string (e.g. "disconnected") if the connection dropped before the PONG arrived.
$nats->drain(sub {
    my ($err) = @_;
    die "drain failed: $err" if $err;
    print "drained, safe to exit\n";
});
ping
Send PING to server.
flush([$cb])
Send PING as a write fence; the subsequent PONG guarantees all prior messages were processed by the server. If $cb is given, it is invoked when the PONG arrives. The callback receives a single argument: undef on success, or an error string (e.g. "disconnected") if the connection dropped before the PONG arrived.
creds_file($path)
Read a NATS .creds file and apply the embedded JWT and NKey seed via "jwt" and "nkey_seed". Apply this BEFORE connect so the credentials are available during the CONNECT handshake. Dies if the file is unreadable or missing either the USER JWT or USER NKEY SEED block.
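To make the two required blocks concrete, here is a rough sketch of how they could be extracted from the text of a .creds file. It is not the module's parser: parse_creds_text is a hypothetical helper, the delimiter patterns follow the usual layout written by NATS tooling, and the sample values are placeholders rather than real credentials.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of EV::Nats: pull the JWT and the NKey seed
# out of the text of a .creds file. Dies if either block is missing.
sub parse_creds_text {
    my ($text) = @_;
    my ($jwt)  = $text =~ /-{3,}BEGIN NATS USER JWT-{3,}\s+(\S+)\s+-{3,}END NATS USER JWT-{3,}/;
    my ($seed) = $text =~ /-{3,}BEGIN USER NKEY SEED-{3,}\s+(\S+)\s+-{3,}END USER NKEY SEED-{3,}/;
    die "missing USER JWT block\n"       unless defined $jwt;
    die "missing USER NKEY SEED block\n" unless defined $seed;
    return ($jwt, $seed);
}

# Placeholder content for illustration only.
my $sample = join "\n",
    '-----BEGIN NATS USER JWT-----',
    'JWT_PLACEHOLDER',
    '------END NATS USER JWT------',
    '',
    '-----BEGIN USER NKEY SEED-----',
    'SEED_PLACEHOLDER',
    '------END USER NKEY SEED------',
    '';

my ($jwt, $seed) = parse_creds_text($sample);
print "jwt=$jwt seed=$seed\n";
```

In normal use creds_file does all of this for you; the sketch is only meant to show what the file is expected to contain.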
new_inbox
Returns a fresh subject suitable for use as a private reply target (_INBOX.<rand>.<n>). Each call burns a slot from the same counter that "request" uses, so manual subscribers must treat the returned subject as opaque.
subscription_count
Returns the number of currently-registered subscriptions, including the implicit _INBOX.> subscription used by "request".
server_info
Returns the raw JSON string of the most recent INFO frame received from the server (or undef before the first INFO). Useful for inspecting server_id, version, cluster, connect_urls, etc.
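Since server_info returns a JSON string rather than a data structure, it has to be decoded before individual fields can be read. A minimal sketch using the core JSON::PP module is shown below; decode_server_info is a hypothetical helper, and the sample INFO payload is invented for illustration (real frames carry more fields).

```perl
use strict;
use warnings;
use JSON::PP ();   # core module

# Hypothetical helper: decode the raw INFO JSON into a hash ref, or return
# undef when no INFO frame has been received yet.
sub decode_server_info {
    my ($raw) = @_;
    return unless defined $raw;
    return JSON::PP->new->utf8->decode($raw);
}

# Invented sample payload; in practice pass $nats->server_info instead.
my $raw  = '{"server_id":"EXAMPLE","version":"2.12.0","max_payload":1048576,"connect_urls":["10.0.0.2:4222"]}';
my $info = decode_server_info($raw);
print "server $info->{server_id} v$info->{version}\n";
print "failover candidates: @{ $info->{connect_urls} || [] }\n";
```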
max_payload([$limit])
Server-advertised maximum payload size in bytes. Returns the current value; with an argument, overrides it (publishes above this croak locally before reaching the wire).
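If a croak on an oversized publish is not what you want, the limit can be checked up front. The following is a sketch only: try_publish is a hypothetical wrapper, and it relies solely on the max_payload getter and publish method described in this document.

```perl
use strict;
use warnings;

# Hypothetical wrapper, not part of EV::Nats: publish only if the payload
# fits within max_payload. Returns undef on success or an error string.
# $payload must already be a byte string, so length() counts bytes.
sub try_publish {
    my ($nats, $subject, $payload) = @_;
    my $limit = $nats->max_payload;
    my $size  = length $payload;
    if (defined $limit && $limit > 0 && $size > $limit) {
        return "payload of $size bytes exceeds max_payload ($limit)";
    }
    $nats->publish($subject, $payload);
    return;
}

# Usage, assuming $nats is a connected EV::Nats instance:
#   if (my $err = try_publish($nats, 'foo.bar', $bytes)) { warn "$err\n" }
```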
waiting_count
Number of writes queued locally during connect or reconnect (i.e. publish/request calls made while the connection is not yet ready). They flush when the handshake completes.
skip_waiting
Drop all queued writes without sending them. Useful before disconnect if reconnect is enabled and you don't want stale publishes replayed.
reconnect($enable, [$delay_ms], [$max_attempts])
Configure reconnection. $delay_ms and $max_attempts are only written when supplied; omitted args leave the existing value unchanged.
reconnect_enabled
Returns true if reconnect is enabled.
connect_timeout([$ms])
Get/set connect timeout.
ping_interval([$ms])
Get/set PING interval.
max_pings_outstanding([$num])
Get/set max outstanding PINGs.
priority([$num])
Get/set EV watcher priority.
keepalive([$seconds])
Get/set TCP keepalive.
batch($coderef)
Batch multiple publishes into a single write. Suppresses per-publish write scheduling; all buffered data is flushed after the coderef returns.
$nats->batch(sub {
    $nats->publish("foo.$_", "msg-$_") for 1..1000;
});
slow_consumer($bytes_threshold, [$cb])
Enable slow consumer detection. When the write buffer exceeds $bytes_threshold bytes, $cb is called with the current buffer size.
$nats->slow_consumer(1024*1024, sub {
    my ($pending_bytes) = @_;
    warn "slow consumer: ${pending_bytes}B pending\n";
});
on_lame_duck([$cb])
Get/set the lame-duck callback. Fires once when the server signals shutdown (leaf node, rolling restart) via INFO ldm:true. Use this to migrate work to another server before the grace period elapses.
nkey_seed($seed)
Set the NKey seed (the SU... base32-encoded form) for Ed25519 authentication. Requires the build to have OpenSSL (see "HAS_NKEY" in EV::Nats). The server nonce from INFO is automatically signed during CONNECT. May also be passed to "new" as nkey_seed => ....
jwt($token)
Set the user JWT. Combine with "nkey_seed" for NATS decentralized auth. May also be passed to "new". See "creds_file" for the common case of loading both from a .creds file.
EV::Nats->nkey_generate_user_seed
Class method. Returns a fresh, valid NATS User NKey seed (the SU... form). Useful for tests and provisioning scripts that don't have the nk CLI available. Requires HAS_NKEY; croaks otherwise.
EV::Nats->nkey_public_from_seed($seed)
Class method. Derives the matching public key (the U... form) from a User NKey seed. Croaks on an invalid seed. Pair with "nkey_generate_user_seed" to provision the server with the public key while the client keeps the seed.
tls($enable, [$ca_file], [$skip_verify])
Configure TLS. Requires OpenSSL at build time (see "HAS_TLS" in EV::Nats).
$nats->tls(1); # system CA
$nats->tls(1, '/path/to/ca.pem'); # custom CA
$nats->tls(1, undef, 1); # skip verification
When verification is enabled (the default), the server certificate's SAN must match either the resolved IP literal or the DNS hostname passed to "connect". May also be passed to "new" as tls => 1, tls_ca_file => $path.
stats
Returns a hash of connection counters:
my %s = $nats->stats;
# ( msgs_in, msgs_out, bytes_in, bytes_out )
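The counters are cumulative, so throughput comes from the difference between two snapshots. A small sketch, assuming the four keys listed above; stats_rate is a hypothetical helper and the snapshot numbers are invented.

```perl
use strict;
use warnings;

# Hypothetical helper: per-second rates between two stats snapshots taken
# $elapsed seconds apart. Each snapshot is a hash ref such as { $nats->stats }.
sub stats_rate {
    my ($before, $after, $elapsed) = @_;
    die "elapsed must be positive\n" unless $elapsed > 0;
    return map { ($_ => ($after->{$_} - $before->{$_}) / $elapsed) }
           qw(msgs_in msgs_out bytes_in bytes_out);
}

# Invented snapshots, two seconds apart.
my %rate = stats_rate(
    { msgs_in => 0,   msgs_out => 0,    bytes_in => 0,     bytes_out => 0 },
    { msgs_in => 500, msgs_out => 1000, bytes_in => 50000, bytes_out => 100000 },
    2,
);
printf "%.0f msgs/s out, %.0f bytes/s out\n", $rate{msgs_out}, $rate{bytes_out};
```

Calling reset_stats between measurements is an alternative to keeping the earlier snapshot around.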
reset_stats
Zero all counters returned by "stats".
on_error([$cb])
on_connect([$cb])
on_disconnect([$cb])
Get/set the corresponding callback at runtime. With no argument, returns the current value (or undef). With an argument, replaces it; pass undef to clear.
BUILD-TIME FEATURES
- EV::Nats::HAS_TLS
-
True if compiled with OpenSSL (TLS supported).
- EV::Nats::HAS_NKEY
-
True if NKey/JWT signing is available (also requires OpenSSL).
BENCHMARKS
Measured on Linux with TCP loopback, Perl 5.40, nats-server 2.12, 100-byte payloads (bench/benchmark.pl):
                           100K msgs   200K msgs
PUB fire-and-forget        4.7M        5.0M  msgs/sec
PUB + SUB (loopback)       1.8M        1.6M  msgs/sec
PUB + SUB (8B payload)     2.2M        1.9M  msgs/sec
REQ/REP (pipelined, 64)    334K msgs/sec
Connected-path publish appends directly to the write buffer with no per-message allocation. Write coalescing via ev_prepare batches all publishes per event-loop iteration into a single write() syscall.
Run perl bench/benchmark.pl for full results. Set BENCH_MESSAGES, BENCH_PAYLOAD, BENCH_HOST, BENCH_PORT to customize.
NATS PROTOCOL
This module implements the NATS client protocol directly in XS. The protocol is text-based with CRLF-delimited control lines and binary payloads.
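As an illustration of the framing, a PUB command consists of a control line carrying the subject, an optional reply subject, and the payload size in bytes, followed by the payload itself, each terminated by CRLF. The sketch below formats such a frame in plain Perl; format_pub_frame is a hypothetical helper, and EV::Nats performs this step internally in XS.

```perl
use strict;
use warnings;

# Illustrative only, not part of EV::Nats: format a PUB frame as it appears
# on the wire. $payload must be a byte string so length() is the byte count.
sub format_pub_frame {
    my ($subject, $payload, $reply_to) = @_;
    $payload = '' unless defined $payload;
    my @control = ('PUB', $subject);
    push @control, $reply_to if defined $reply_to;
    push @control, length($payload);
    return join(' ', @control) . "\r\n" . $payload . "\r\n";
}

my $frame = format_pub_frame('foo.bar', 'hello world');
(my $visible = $frame) =~ s/\r\n/\\r\\n/g;   # make the CRLFs visible
print "$visible\n";                          # PUB foo.bar 11\r\nhello world\r\n
```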
Connection flow: server sends INFO, client sends CONNECT + PING, server responds with PONG to confirm. All subscriptions (including queue groups and auto-unsub state) are automatically restored on reconnect.
Request/reply uses a single wildcard inbox subscription (_INBOX.<random>.*) for all requests, with unique suffixes per request.
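The reason one inbox subscription can serve every request is the wildcard matching the server applies to subjects: * matches exactly one token and > matches one or more trailing tokens. The following sketch re-implements that rule client-side purely for illustration; subject_matches is a hypothetical helper (matching is done by the server, not by this module's API), and it assumes a well-formed pattern with > only in the last position.

```perl
use strict;
use warnings;

# Hypothetical helper: return 1 if $subject matches $pattern under NATS
# wildcard rules ('*' = exactly one token, '>' = one or more trailing tokens).
sub subject_matches {
    my ($pattern, $subject) = @_;
    my @p = split /\./, $pattern, -1;
    my @s = split /\./, $subject, -1;
    for my $i (0 .. $#p) {
        if ($p[$i] eq '>') {
            return @s > $i ? 1 : 0;      # needs at least one remaining token
        }
        return 0 if $i > $#s;            # subject ran out of tokens
        next     if $p[$i] eq '*';       # any single token
        return 0 if $p[$i] ne $s[$i];    # literal token must match exactly
    }
    return @p == @s ? 1 : 0;             # no leftover subject tokens allowed
}

for my $case (['_INBOX.abc.*', '_INBOX.abc.17'], ['foo.>', 'foo.bar.baz'], ['foo.*', 'foo.bar.baz']) {
    my ($pattern, $subject) = @$case;
    printf "%-14s %-14s %s\n", $pattern, $subject,
        subject_matches($pattern, $subject) ? 'match' : 'no match';
}
```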
CAVEATS
DNS resolution via getaddrinfo is blocking. Use numeric IP addresses for latency-sensitive applications.
TLS requires OpenSSL headers at build time (auto-detected).
NKey auth requires OpenSSL with Ed25519 support (1.1.1+).
The module handles all data as bytes. Encode UTF-8 strings before passing them.
Do not let the EV::Nats instance go out of scope (or be explicitly undef-ed) from inside a callback while that callback is still executing. The callback closure normally references $nats (via $nats->publish(...) etc.), which keeps it alive; if you write a callback that does not capture $nats and you undef the last outer reference inside that callback, Perl will run DESTROY mid-callback and free the underlying state. Any subsequent operation on $nats in that callback is undefined behavior.
Cluster URL discovery (the connect_urls field of INFO) is trusted by default. On failover the client connects to whatever hostnames the previous server advertised, and TLS hostname verification is performed against those names. Use a private CA (tls_ca_file) to restrict which certificates are acceptable, or do not enable tls on public-CA topologies where any holder of a valid cert could redirect clients.
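For the byte-handling caveat, the usual pattern is to encode character strings just before publishing and decode payloads just after receiving them. A minimal sketch with the core Encode module follows; the sample text is arbitrary.

```perl
use strict;
use warnings;
use Encode qw(encode_utf8 decode_utf8);

# A character string containing two non-ASCII characters (e-acute, o-umlaut),
# written with escapes so the example does not depend on source encoding.
my $text  = "h\x{e9}llo w\x{f6}rld";
my $bytes = encode_utf8($text);          # what should be passed to publish

printf "%d characters, %d bytes\n", length($text), length($bytes);
# prints "11 characters, 13 bytes"

# On the receiving side, a callback would turn the payload back into text:
my $round_trip = decode_utf8($bytes);
print "round trip ok\n" if $round_trip eq $text;
```

Lengths reported by the server, and the max_payload limit, refer to the encoded byte count, not the character count.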
ENVIRONMENT
- TEST_NATS_HOST, TEST_NATS_PORT
-
Set these to run the test suite against a NATS server (default: 127.0.0.1:4222).
SEE ALSO
EV::Nats::JetStream, EV::Nats::KV, EV::Nats::ObjectStore, EV, NATS protocol, nats-server.
AUTHOR
vividsnow
LICENSE
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.