NAME

ClickHouse::Encoder - Fast XS encoder for ClickHouse Native format

SYNOPSIS

use ClickHouse::Encoder;

# 1. Encode rows into a Native-format block --------------------
my $enc = ClickHouse::Encoder->new(columns => [
    ['id',     'UInt64'],
    ['user',   'String'],
    ['tags',   'Array(String)'],
    ['score',  'Nullable(Float64)'],
    ['stamp',  'DateTime'],
]);
my $body = $enc->encode([
    [1, 'alice', ['perl','db'], 0.95, time()],
    [2, 'bob',   [],            undef, time()],
]);
# $body is the request body for `insert into events format native`.

# 2. Decode a select ... format native response ----------------
my $response_body = '...';  # HTTP body of select * format native
my $blocks = ClickHouse::Encoder->decode_blocks($response_body);
for my $blk (@$blocks) {
    for my $r (0 .. $blk->{nrows} - 1) {
        my %row = map { $_->{name} => $_->{values}[$r] }
                      @{ $blk->{columns} };
        ...;
    }
}

# 3. Bulk insert with auto-flush + retries ---------------------
my $bi = ClickHouse::Encoder->bulk_inserter(
    host => 'db', table => 'events',
    columns => [['ev','String'],['ts','DateTime']],
    batch_size => 5000, compress => 'zstd');
my @events = ('login', 'click', 'logout');  # or any list of event names
$bi->push([$_, time()]) for @events;
$bi->finish;

# 4. JSON columns (CH 24.8+) -----------------------------------
use JSON::PP ();  # for ::true / ::false
my $jenc = ClickHouse::Encoder->new(columns => [['j','JSON']]);
my $jbody = $jenc->encode([
    [{ user => { name => 'alice', id => 42 },
       active => JSON::PP::true }],
    [{ user => { name => 'bob' }, tags => ['a','b'] }],
]);

# 5. Streaming select, with response compression ---------------
ClickHouse::Encoder->select_blocks(
    'select id, event, ts from events where date = today()',
    host => 'db', port => 8123,
    decompress => 1,                  # CH wraps each block in LZ4
    keep => { id => 1, event => 1 },  # optional column projection
    on_block => sub {
        my $blk = shift;
        # Optional: coerce DateTime / Date columns to ISO strings.
        ClickHouse::Encoder->coerce_datetimes($blk);
        for my $r (0 .. $blk->{nrows} - 1) {
            print "row $r: ", join(' | ',
                map { $_->{values}[$r] // 'NULL' }
                grep { !$_->{skipped} } @{ $blk->{columns} }), "\n";
        }
    });

DESCRIPTION

ClickHouse::Encoder builds blocks in ClickHouse's Native columnar binary format from a Perl arrayref of rows, and decodes Native blocks back into Perl structures (see "decode_block"). The encoded scalar is the raw body for an insert ... format native request: send it over HTTP, the native TCP protocol, or pipe it into clickhouse-client.

Construct one encoder per schema and reuse it across batches: type parsing happens up front, encoding is pure XS, and failed encodes never leak the partial buffer.
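To make the Native layout concrete, here is a minimal pure-Perl sketch of the bytes for a two-column block, following ClickHouse's Native format as we understand it: a VarUInt column count, a VarUInt row count, then for each column its name and type as length-prefixed strings followed by all of that column's values. The helper names are hypothetical and this is not how the XS encoder is written. The sketch assumes a 64-bit perl (for pack 'Q<') and byte strings.

```perl
use strict;
use warnings;

# Hypothetical helpers for illustration; NOT ClickHouse::Encoder's API.

# LEB128 VarUInt: 7 bits per byte, high bit set on all but the last byte.
sub varuint {
    my ($n) = @_;
    my $out = '';
    while ($n >= 0x80) {
        $out .= chr(($n & 0x7f) | 0x80);
        $n >>= 7;
    }
    return $out . chr($n);
}

# Native String: VarUInt byte length, then the bytes (assumes byte
# strings; encode character data to UTF-8 first).
sub native_string { my ($s) = @_; return varuint(length $s) . $s }

# Block for the schema (id UInt64, user String): header, then each
# column's name, type, and all of its values stored contiguously.
sub encode_block_sketch {
    my ($rows) = @_;
    my $out = varuint(2) . varuint(scalar @$rows);      # ncols, nrows
    $out .= native_string('id') . native_string('UInt64');
    $out .= pack('Q<', $_->[0]) for @$rows;              # little-endian UInt64
    $out .= native_string('user') . native_string('String');
    $out .= native_string($_->[1]) for @$rows;
    return $out;
}
```

The column-major layout is why the encoder has to regroup row-oriented input before writing anything; the XS code also handles the extra framing that types such as Nullable and Array add, which this sketch leaves out.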

METHODS

new

my $enc = ClickHouse::Encoder->new(columns => \@columns);

columns is an arrayref of [$name, $type] pairs. The $type string must match the syntax ClickHouse uses in describe table output (see "TYPES").

Croaks on unknown or malformed types, on Nullable(Nullable(...)), on empty enum names, on enum values out of range, on FixedString(0), and on out-of-range Decimal* / DateTime64 parameters.

encode

my $bytes = $enc->encode(\@rows);

Returns the raw bytes of one Native-format block. \@rows is an arrayref of arrayrefs; each inner arrayref must have exactly as many elements as the encoder's columns. Croaks if a row's shape is wrong, if a value can't be coerced to its column type, or on string parse errors (see "Value coercion").

encode_columns

my $bytes = $enc->encode_columns({
    id   => [1, 2, 3],
    name => ['a', 'b', 'c'],
});

Like "encode" but takes a column-oriented hashref { name => \@values }. It skips the row-to-column permutation step that encode performs, so it is slightly faster when your data already lives in columns. All arrays must have the same length, and every encoder column must be present in the hashref (a missing column name croaks).
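The permutation in question is a plain transpose of rows into per-column arrays. A minimal sketch, using a hypothetical rows_to_columns helper, shows the reshaping only; doing it in Perl yourself gains nothing over calling encode directly, so it is mainly useful for understanding what encode_columns expects.

```perl
use strict;
use warnings;

# Hypothetical helper: reshape row-oriented data into the
# { name => \@values } hashref that encode_columns takes.
sub rows_to_columns {
    my ($names, $rows) = @_;
    my %cols = map { ($_ => []) } @$names;
    for my $row (@$rows) {
        die "row has " . scalar(@$row) . " values, expected "
            . scalar(@$names) . "\n"
            unless @$row == @$names;
        # Append each field to the array of the column at the same index.
        push @{ $cols{ $names->[$_] } }, $row->[$_] for 0 .. $#$names;
    }
    return \%cols;
}
```

With a real encoder, the column order could come from the columns method, for example my @names = map { $_->[0] } @{ $enc->columns };.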

encode_into

$enc->encode_into(\$buffer, \@rows);

Like "encode", but appends the bytes to an existing scalar via reference. Useful for batching multiple blocks into one HTTP request body without copying.

encode_to_handle

$enc->encode_to_handle($fh, \@rows);

Like "encode" but writes the bytes directly to a Perl filehandle via PerlIO_write, skipping a copy. Useful when piping to clickhouse-client or a network socket.

for_query

my $enc = ClickHouse::Encoder->for_query($sql, %opts);

Like "for_table", but for an arbitrary select: it runs describe ($sql) to discover the column shape and returns a configured encoder. Useful when the schema doesn't correspond to a real table (CTEs, joins, computed columns). Options are the same as "for_table". Unlike a table name, $sql is not validated, so the caller is responsible for its safety.

insert_http

my $resp = ClickHouse::Encoder->insert_http(
    host => 'localhost', port => 8123, table => 'events',
    columns => \@cols, rows => \@rows,
    compress => 'zstd',   # optional
    scheme   => 'https',  # optional; needs IO::Socket::SSL + Net::SSLeay
    settings => {         # optional per-query CH settings
        max_execution_time => 30,
        max_memory_usage   => '10G',
    },
    dedup_token => $batch_id,  # optional idempotency token
);
die "insert failed (status $resp->{status}): $resp->{content}"
    unless $resp->{success};
# CH response metadata, when present, is also attached:
my $qid     = $resp->{ch}{'query-id'};
my $written = $resp->{ch}{summary}{written_rows};

Thin convenience wrapper that builds an encoder, encodes rows, and POSTs the bytes to http(s)://host:port/?query=insert into $table format native. Pass encoder => $enc instead of columns to reuse an existing encoder.

compress accepts 'raw' (default), 'zstd', or 'gzip'. scheme => 'https' enables TLS via HTTP::Tiny's SSL support (install IO::Socket::SSL and Net::SSLeay); ssl_options / verify_SSL pass through to HTTP::Tiny. settings applies per-query CH settings via URL params. dedup_token stamps the request with an insert_deduplication_token so the server can deduplicate an identical retry instead of writing it twice.

Returns the HTTP::Tiny response hashref with a ch => { query-id, server, format, exception-code, summary, progress, ... } slot containing the parsed X-ClickHouse-* response headers. There are no automatic retries: implement retry policy in the caller, or use "bulk_inserter".

for_table

my $enc = ClickHouse::Encoder->for_table($table, %opts);

Convenience constructor that introspects a table's schema by running

describe table $table format tabseparated

$table must be a plain identifier or db.table; each component matches [A-Za-z_][A-Za-z0-9_]*. Anything else is rejected to avoid SQL injection through the describe table query.
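The identifier rule above is easy to reproduce in caller code, for example to reject a bad name before any connection is made. A sketch of an equivalent check (the function name is hypothetical):

```perl
use strict;
use warnings;

# One identifier component, per the rule above.
my $IDENT = qr/[A-Za-z_][A-Za-z0-9_]*/;

# Hypothetical check: "table" or "db.table", nothing else.
# \z (not $) so a trailing newline is rejected too.
sub is_safe_table_name {
    my ($name) = @_;
    return defined $name && $name =~ /\A$IDENT(?:\.$IDENT)?\z/ ? 1 : 0;
}
```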

Options (all optional):

via

'client' (default) shells out to clickhouse-client. 'http' talks to host:port directly via HTTP::Tiny, with no external binary dependency; use it on hosts where clickhouse-client is not installed.

host, port, database, user

Connection parameters. Defaults: host localhost; port 9000 for via => 'client' or 8123 for via => 'http'; database default; user default.

password

For client mode, passed via CLICKHOUSE_PASSWORD env var. For http mode, sent as the X-ClickHouse-Key header.

client

Path to the clickhouse-client executable (only for via => 'client'). Default: whatever exec finds on $PATH.

scheme, timeout, ssl_options, verify_SSL, settings

For via => 'http': the URL scheme (default http; pass https for TLS), request timeout in seconds (default 10), optional SSL options / verify_SSL passthrough to HTTP::Tiny, and a hashref of per-query CH settings. Ignored under via => 'client' (the clickhouse-client binary handles its own connection).

stream

$enc->stream(\&iter, \&writer, batch_size => 10_000);

Pulls rows from &iter (a coderef returning the next row, or undef when done) and emits one Native block per batch_size rows by calling &writer with the encoded bytes. The loop driving the iterator runs in XS, so the only per-row Perl overhead is the &iter call itself.

streamer

my $st = $enc->streamer(\&writer, batch_size => 10_000);
my $st = $enc->streamer(\&writer, batch_size => 10_000,
                        compress => 'lz4');    # XS-level compression
$st->push_row($row);
$st->push_row($row);
...
$st->finish;

Returns a ClickHouse::Encoder::Streamer object that buffers rows and flushes a complete Native block to &writer every batch_size rows. Call finish to flush any partial last batch, or reset to discard the buffered rows without flushing (useful for error recovery after an upstream failure). buffered_count and is_empty let producers inspect the current backlog. The streamer keeps the encoder alive for its own lifetime, so dropping the encoder reference after creating the streamer is safe.

Options:

batch_size => N

Flush every N rows. Default 10_000.

compress => 'lz4' | 'zstd' | 'auto' | 'none'

When set (and not 'none'/'raw'), each emitted batch is wrapped in CH's compressed-block framing via "compress_native_block" before being passed to &writer. Done at the XS level, so no per-batch Perl-callback overhead beyond the one compress_native_block call. Pair with "compressed_writer" only when you want a different compression scheme on top (e.g. HTTP Content-Encoding: gzip).

hasher => $coderef

Override the bundled CityHash128 v1.0.2 used by the compression framing. Useful when integrating with a non-default checksum implementation; usually omit.

The returned ClickHouse::Encoder::Streamer object exposes:

$st->push_row(\@row)

Append one row. Triggers an auto-flush (one call to &writer) once the buffer reaches batch_size.

$st->finish

Flush any partial last batch via &writer and return. Safe to call multiple times - subsequent calls on an empty buffer are no-ops, and a fresh push_row after finish reopens the streamer.

$st->reset

Discard buffered rows without flushing. Useful when an upstream producer hits an error mid-batch and the in-flight rows should be dropped rather than emitted with stale data.

$st->buffered_count

Return the integer count of rows currently buffered (not yet flushed). Lets producers inspect the backlog before committing.

$st->is_empty

Return a true value when buffered_count == 0. Convenience for hot-path checks.
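The buffering contract above (auto-flush at batch_size, finish flushes a partial batch and is a no-op when empty, reset discards) can be modelled in a few lines of pure Perl. This is a hypothetical sketch: make_batcher and its hash-of-closures interface are invented for illustration, and $encode stands in for the encoder.

```perl
use strict;
use warnings;

# Hypothetical model of the push_row / finish / reset contract.
sub make_batcher {
    my (%o) = @_;
    my ($encode, $writer, $size) = @o{qw(encode writer batch_size)};
    $size //= 10_000;                       # same default as documented
    my @buf;
    my $flush = sub {
        return unless @buf;                 # empty buffer: no-op
        my @batch = splice @buf;            # take every row, empty the buffer
        $writer->($encode->(\@batch));      # one writer call per batch
    };
    return {
        push_row       => sub { push @buf, $_[0]; $flush->() if @buf >= $size },
        finish         => $flush,
        reset          => sub { @buf = () },
        buffered_count => sub { scalar @buf },
    };
}
```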

columns

my $cols = $enc->columns;
# [ ['id', 'UInt32'], ['name', 'String'], ... ]

Returns a fresh arrayref of [$name, $type] pairs reflecting the encoder's schema. The values are copies, not references into the encoder's internal state.

validate_rows

my $errors = $enc->validate_rows(\@rows);
# [ { row => 7, error => "..." }, ... ]

Trial-encodes each row and collects per-row failures into an arrayref of { row => $idx, error => $msg } hashes (empty arrayref if all rows are valid). Useful in ETL pipelines that want to log the bad rows and continue, instead of croaking on the first failure.

The trial encoding does the full encode work and discards the bytes, so the cost is comparable to a real encode per row. For hot ingestion paths, prefer letting bad rows croak upstream and only call validate_rows on suspect batches.
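The per-row trial encoding follows a familiar eval-and-collect pattern. Here is a generic sketch of it; collect_row_errors is hypothetical, and $encode_one is any coderef that croaks on a bad row (for example sub { $enc->encode([$_[0]]) }). validate_rows gives you the same result shape in one call.

```perl
use strict;
use warnings;

# Hypothetical generic version of "trial-encode each row, collect errors".
# The bytes returned by $encode_one are discarded.
sub collect_row_errors {
    my ($encode_one, $rows) = @_;
    my @errors;
    for my $i (0 .. $#$rows) {
        my $ok = eval { $encode_one->($rows->[$i]); 1 };
        next if $ok;
        my $err = $@ || 'unknown error';
        chomp $err;
        push @errors, { row => $i, error => $err };
    }
    return \@errors;    # empty arrayref when every row is valid
}
```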

encode_to_command

$enc->encode_to_command(\@cmd, \@rows);

Convenience: forks a child running @cmd, opens its stdin as a write pipe, and streams the encoded bytes into it via "encode_to_handle". Croaks if the fork fails, the exec fails, or the child exits non-zero; returns nothing on success.

Typical use is piping into clickhouse-client:

$enc->encode_to_command(
    ['clickhouse-client', '--query', 'insert into events format native'],
    \@rows,
);

flatten_nested

my $cols = ClickHouse::Encoder->flatten_nested(\@cols);

Class method that expands any Nested(field T, ...) entries in a column list into the flat name.field Array(T) columns ClickHouse stores them as on the wire. Non-Nested columns pass through unchanged.

my $cols = ClickHouse::Encoder->flatten_nested([
    ['events', 'Nested(t DateTime, kind String)'],
    ['ts',     'DateTime'],
]);
# => [ ['events.t', 'Array(DateTime)'],
#      ['events.kind', 'Array(String)'],
#      ['ts', 'DateTime'] ]
my $enc = ClickHouse::Encoder->new(columns => $cols);

for_table already returns the flat form because describe table reports Nested columns that way, so this helper is for hand-written schemas that more naturally mirror the create table statement.

decode_block

my $block = ClickHouse::Encoder->decode_block($bytes);

Decode the first Native block in $bytes. Returns a hashref:

{
    ncols    => $n_columns,
    nrows    => $n_rows,
    columns  => [
        { name => 'id',   type => 'UInt64', values => [...] },
        { name => 'name', type => 'String', values => [...] },
        ...
    ],
    consumed => $bytes_used,
}

consumed is the number of bytes the block occupied in $bytes. To walk a stream of concatenated blocks (a multi-block select format native response), prefer "decode_blocks", or pass a starting offset directly:

my $block = ClickHouse::Encoder->decode_block($bytes, $offset);

The 3-arg form avoids the O(N) substr copy per call that substr($bytes, $offset) would entail.
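The offset-passing style reads fields in place and just advances a cursor, instead of slicing off the unread tail each time. A minimal sketch of reading a Native VarUInt (such as a block's column or row count) that way; read_varuint is hypothetical and is not the module's XS code.

```perl
use strict;
use warnings;

# Hypothetical: read a LEB128 VarUInt from $$buf at byte $pos without
# copying the buffer. Returns (value, new_pos), or an empty list if the
# buffer ends mid-number (fetch more bytes and retry from the same $pos).
sub read_varuint {
    my ($buf, $pos) = @_;
    my ($value, $shift) = (0, 0);
    while ($pos < length $$buf) {
        my $byte = ord substr($$buf, $pos++, 1);
        $value |= ($byte & 0x7f) << $shift;
        return ($value, $pos) unless $byte & 0x80;   # high bit clear: last byte
        $shift += 7;
        die "VarUInt too long\n" if $shift > 63;
    }
    return;    # truncated
}
```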

An optional fourth-argument hashref filters which columns to keep:

my $block = ClickHouse::Encoder->decode_block(
    $bytes, 0, { id => 1, ts => 1 });

Columns whose name isn't in the filter still consume their wire bytes (so the cursor stays aligned) but their values array is replaced with N undefs and the column hashref carries a skipped => 1 marker. Skips the SV-allocation cost for unwanted columns; useful on wide select * responses.

XS implementation: walks the Native byte stream using the same type parser the encoder uses, so symmetric round-trips are guaranteed for every type encode handles (BFloat16, alphabetical Variant remapping, LowCardinality dict indirection, SimpleAggregateFunction passthrough, JSON typed paths, etc.).

Decimal128 values come back as [$lo_uint64, $hi_int64]; Decimal256 as a 4-limb arrayref. Use "decimal128_str" / "decimal256_str" to convert to scaled decimal strings.

decode_rows

my $r = ClickHouse::Encoder->decode_rows($bytes);
my $r = ClickHouse::Encoder->decode_rows($bytes, $offset);

Row-oriented convenience. Returns:

{
    ncols => $n_columns,
    nrows => $n_rows,
    names => [...],
    types => [...],
    rows  => [[...], [...], ...],
    consumed => $bytes_used,
}

Calls an XS row-major decoder (decode_block_rows) that walks each column then immediately distributes its values into the per-row arrayrefs and frees the column AV. Peak memory holds one column's AV plus the row AVs (vs both column- and row-major representations fully alive, which a Perl-side transpose would entail). Throughput is similar to "decode_block"; the win is the tighter peak memory on wide blocks.

decode_block_rows

my $r = ClickHouse::Encoder->decode_block_rows($bytes, $offset);

XS row-major decoder; same return shape as "decode_rows". Direct entry point if you want to avoid the "decode_rows" Perl-side trampoline.

decode_blocks

my $blocks = ClickHouse::Encoder->decode_blocks($bytes);
ClickHouse::Encoder->decode_blocks($bytes, sub { my $b = shift; ... });

A select ... format native response is a concatenated stream of blocks (typically up to max_block_size rows each). With no callback, decode_blocks walks the stream and returns an arrayref of hashrefs shaped like the one "decode_block" returns. With a callback, each block is passed to the callback as it's decoded and no list is accumulated - useful for very long selects where the full block list wouldn't fit comfortably in memory.

Uses the 3-arg form of "decode_block" (with explicit offset) to keep total work O(N) regardless of block count. Stops cleanly when bytes are exhausted; partial trailing bytes croak.

The optional keep => \%names hashref forwards a column filter to "decode_block" for every block in the stream, with the same semantics: present keys are decoded, absent ones still have their bytes consumed (to keep the cursor aligned) but their values are not materialized and their column hash carries skipped => 1. Useful for wide select responses where only a few columns matter.

ClickHouse::Encoder->decode_blocks($bytes, $cb,
    keep => { id => 1, event => 1 });

decode_blocks_iter

my $iter = ClickHouse::Encoder->decode_blocks_iter($bytes);
while (my $block = $iter->()) { ... }

my $iter = ClickHouse::Encoder->decode_blocks_iter($bytes,
    keep => { id => 1 });

Returns a coderef that yields one block per call (undef when exhausted). Same per-block payload as "decode_block"; useful when you want pull-style iteration without committing to a callback. Accepts the same keep filter as "decode_blocks".

decode_stream

ClickHouse::Encoder->decode_stream($fh, sub { my $block = shift; ... },
                                   chunk_size => 65536);

ClickHouse::Encoder->decode_stream($fh, $cb,
    keep => { id => 1, event => 1 });

Pull bytes incrementally from a filehandle (or any read-able IO handle), yielding each complete block to the callback as it arrives. Uses a sliding buffer; on a truncated decode it reads more bytes and retries. Memory stays bounded by chunk_size + one block, so this is the right entry point for select responses too large to buffer in full. Croaks on partial trailing bytes.

The keep filter is the same one "decode_block" accepts: unwanted columns still have their bytes consumed (so the cursor stays aligned) but their values are not materialized into an SV array, so peak memory stays bounded by the kept columns.

With decompress => 1, $fh is expected to deliver a stream of compressed-block-framed Native blocks (the format CH's HTTP ?compress=1 response uses, or a captured native-TCP Data stream under compression). decode_stream peels each compressed block via "decompress_native_block" before feeding the resulting raw Native bytes into "decode_block".

$fh must support Perl's read() builtin (any plain filehandle or IO::Handle subclass). Raw socket descriptors that only support sysread need to be wrapped via IO::Socket or read into a buffer that is then fed to "decode_block" directly.

ping

ClickHouse::Encoder->ping(host => 'db', port => 8123);
ClickHouse::Encoder->ping(scheme => 'https', host => 'db', port => 8443);

Liveness check via CH's /ping endpoint. Returns 1 on success; croaks on connection refused, timeout, or any non-2xx HTTP status. Accepts the same scheme/host/port/timeout/ssl_options options as the rest of the HTTP entry points.

server_version

my $v = ClickHouse::Encoder->server_version(
    host => 'db', port => 8123);
if ($v->{major} >= 24) { ... }

Fetches select version() over HTTP. In scalar context returns { major, minor, patch, build, raw }; in list context returns ($major, $minor, $patch, $build, $raw). Useful for capability gating in user code. Accepts the same scheme/host/port/database/user/password/timeout/ssl_options/verify_SSL/settings options as the rest of the HTTP entry points.

types

my @t = ClickHouse::Encoder->types;

Returns the list of supported ClickHouse type names (parametric types as their syntactic prefix, e.g. Decimal, Array). Intended for runtime feature detection and for tooling that wants to introspect supported types without parsing POD.

schema_diff

my $d = ClickHouse::Encoder->schema_diff(\@cols_a, \@cols_b);
# $d = {
#     added   => [[name, type], ...],   # in $b but not $a
#     removed => [[name, type], ...],   # in $a but not $b
#     changed => [[name, type_a, type_b], ...],
# }

Compare two column lists (each an arrayref of [$name, $type] pairs, the shape "new" takes). Useful for migration scripts and detecting schema drift between source and destination in CH-to-CH replication pipelines.
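A hypothetical pure-Perl sketch of the comparison, producing the same hashref shape as the comment above. It is not the module's code: columns are matched by name and types are compared as literal strings, so in this sketch 'Decimal(18,4)' and 'Decimal(18, 4)' would count as changed.

```perl
use strict;
use warnings;

# Hypothetical sketch: diff two [[name, type], ...] lists by column name.
sub schema_diff_sketch {
    my ($cols_a, $cols_b) = @_;
    my %type_a = map { ($_->[0] => $_->[1]) } @$cols_a;
    my %type_b = map { ($_->[0] => $_->[1]) } @$cols_b;
    return {
        added   => [ grep { !exists $type_a{ $_->[0] } } @$cols_b ],
        removed => [ grep { !exists $type_b{ $_->[0] } } @$cols_a ],
        changed => [
            map  { [ $_->[0], $_->[1], $type_b{ $_->[0] } ] }
            grep { exists $type_b{ $_->[0] } && $type_b{ $_->[0] } ne $_->[1] }
            @$cols_a
        ],
    };
}
```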

format_create_table

my $sql = ClickHouse::Encoder->format_create_table(
    table        => 'events',
    columns      => [['id','Int32'], ['msg','String']],
    engine       => 'MergeTree',           # default
    order_by     => '(id)',
    partition_by => 'toYYYYMM(ts)',        # optional
    primary_key  => '(id)',                # optional
    sample_by    => 'id',                  # optional
    ttl          => 'event_date + INTERVAL 90 DAY',  # optional
    settings     => 'index_granularity=8192', # optional
);

Emits a create table statement string from a column list of the same shape "new" takes. The table name is validated by the same regex "for_table" / "insert_http" use, and column names are backtick-quoted with embedded backticks escaped. The engine / partition_by / primary_key / order_by / sample_by / ttl / settings opts are inserted verbatim - the caller is responsible for SQL correctness there. Clauses are emitted in CH's canonical show create table order, so the output round-trips through "parse_create_table" without reordering.

A column entry may be [name, type] or [name, type, \%col], where %col carries per-column modifiers rendered in CH's own order: one of default / materialized / alias (an expression; passing more than one croaks), then codec, then ttl, then comment. The comment is quoted as a string literal (embedded single quotes escaped); every other value is inserted verbatim.

my $sql = ClickHouse::Encoder->format_create_table(
    table   => 'events',
    columns => [
        ['id',   'UInt64'],
        ['ts',   'DateTime', { codec => 'DoubleDelta, LZ4' }],
        ['kind', 'String',   { default => "'unknown'",
                               comment => 'event kind' }],
    ],
    engine => 'MergeTree', order_by => '(id, ts)',
    ttl    => 'ts + INTERVAL 30 DAY',
);

Pair with "schema_diff" + "apply_schema_diff" for end-to-end schema migration tooling.

parse_create_table

my $info = ClickHouse::Encoder->parse_create_table($show_create_sql);
# $info = {
#   database => 'analytics', table => 'events',
#   columns  => [['id','UInt64'], ['name','String'], ...],
#   engine   => 'MergeTree', order_by => '(id, ts)',
#   partition_by => 'toYYYYMM(ts)', primary_key => 'id',
#   sample_by => 'id', ttl => 'ts + INTERVAL 90 DAY',
#   settings => 'index_granularity = 8192',
# }
# Clause keys are present only when the DDL has that clause.

Parses the output of show create table (or any create table DDL) into a structured hashref. columns is in the same [name, type] shape "schema_diff" and "format_create_table" consume, so a round trip - fetch DDL, parse, diff against a desired shape, emit ALTERs - is one call each. Per-column DEFAULT / CODEC / TTL / COMMENT modifiers are stripped from the reported type (the bare type is what describe reports). Nested-comma types (Decimal(18, 4), named Tuple, Map) and backtick-quoted identifiers are handled. database is undef when the name is not schema-qualified. Croaks if no create table header or column block is found.

Trailing clauses (ENGINE, partition by, TTL, ...) are delimited by the next clause keyword: a clause expression that itself embeds a standalone clause keyword (rare - e.g. a TTL expression containing the bare word SETTINGS) would be truncated there. The column list is parsed precisely; clause values are best-effort.

apply_schema_diff

my $diff  = ClickHouse::Encoder->schema_diff(\@before, \@after);
my $stmts = ClickHouse::Encoder->apply_schema_diff(
    $diff, table => 'events');
# $stmts = [ 'alter table `events` drop column `old_col`',
#            'alter table `events` modify column `x` UInt32',
#            'alter table `events` add column `new_col` Int64' ]

Translates a "schema_diff" hashref into a list of alter table statements. Returns an arrayref of SQL strings; the caller decides when and how to run them. Ordering is deterministic: drops first, then modifies, then adds.
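A sketch of turning a diff into statements in the documented order, reproducing the strings in the example above. The function names are hypothetical. To stay short, the sketch rejects identifiers containing backticks instead of escaping them, and quotes the table name as a single identifier, so it does not handle db.table.

```perl
use strict;
use warnings;

# Hypothetical: backtick-quote an identifier (backticks rejected, not escaped).
sub bq {
    my ($id) = @_;
    die "backtick in identifier: $id\n" if $id =~ /`/;
    return "`$id`";
}

# Hypothetical: drops first, then modifies, then adds.
sub diff_to_alters {
    my ($diff, $table) = @_;
    my $t = bq($table);
    return [
        (map { "alter table $t drop column " . bq($_->[0]) }
             @{ $diff->{removed} || [] }),
        (map { "alter table $t modify column " . bq($_->[0]) . " $_->[2]" }
             @{ $diff->{changed} || [] }),
        (map { "alter table $t add column " . bq($_->[0]) . " $_->[1]" }
             @{ $diff->{added} || [] }),
    ];
}
```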

for_native_bytes

my $enc = ClickHouse::Encoder->for_native_bytes($captured_bytes);

Inspect a captured Native block and return a fresh encoder configured for that exact column shape. Zero-row blocks work fine (the column headers are still on the wire); the typical use case is round-tripping captured payloads through a transform / filter step where you need an encoder matching the input's schema but don't have access to the source server.

encode_row_binary

my $body = $enc->encode_row_binary(\@rows);
# POST as: insert into t format RowBinary

Encode rows into ClickHouse's row-major RowBinary format (the request body for insert ... format RowBinary). Native is the preferred format and is what the rest of this module uses; RowBinary is offered for interoperability with producers and pipelines that speak it. Call on an encoder instance - its column types drive serialisation.

Supported column types: every scalar type (Int*/UInt*/Float*/Bool/Date*/DateTime*/Decimal*/UUID/IPv4/IPv6/Enum*), String, FixedString(N), Nullable(...) of any of those, Array(...) nesting, and LowCardinality(...) (encoded as its inner type, as RowBinary represents it). Map, Tuple, Variant, JSON, Dynamic, Geo and Nested columns croak - their Native and RowBinary framings differ. Use Native ("encode") for those.

decode_row_binary

my $rows = $enc->decode_row_binary($bytes);

Decode a RowBinary byte string into an arrayref of row arrayrefs. Call on an encoder instance whose column types match the producer (RowBinary carries no schema, so the types must be known out of band). The supported type surface and per-type value semantics are identical to "encode_row_binary" and to the Native decoder - decode_row_binary yields the same Perl values "decode_block" would for the same data.

coerce_datetimes

my $blk = ClickHouse::Encoder->decode_block($bytes);
ClickHouse::Encoder->coerce_datetimes($blk);             # ISO strings
ClickHouse::Encoder->coerce_datetimes($blk, as => 'datetime');  # Time::Moment

Post-process a decoded block: rewrite every Date / Date32 / DateTime / DateTime(tz) / DateTime64(p) column's values from raw epoch integers into either ISO 8601 strings (the default; UTC with a Z suffix) or Time::Moment instances (as => 'datetime'). The block is mutated in place and also returned, so the call can be chained.

Nullable()-wrapped time columns are handled too; undef values pass through unchanged. Non-time columns are untouched. DateTime64(p) precision is honored: in ISO form the fractional part is emitted as exactly p digits, and in Time::Moment form the p-digit fraction is scaled to nanoseconds.

This is a separate post-decode step (rather than an option on "decode_block") so the cost is only paid when the caller wants formatted values. Raw integer epochs are still cheaper to compare, filter, or pass back into a re-encode.
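A minimal sketch of the DateTime64(p) to ISO 8601 conversion, in pure Perl with POSIX::strftime. The helper is hypothetical, and the exact string layout (the T separator in particular) is an assumption; the module's own output may differ in detail. Integer arithmetic keeps 64-bit tick counts exact, and negative (pre-1970) values borrow a second so the fraction stays non-negative.

```perl
use strict;
use warnings;
use POSIX qw(strftime);

# Hypothetical helper: format a DateTime64(p) value (a count of 10**-p
# second ticks since the Unix epoch) as ISO 8601 UTC with exactly p
# fractional digits. p = 0 formats a plain DateTime (epoch seconds).
sub ticks_to_iso {
    my ($ticks, $p) = @_;
    my $div = int(10 ** $p);
    my ($secs, $frac);
    {
        use integer;                # exact integer division, no float rounding
        $secs = $ticks / $div;
        $frac = $ticks % $div;
        if ($frac < 0) {            # pre-1970: borrow one second
            $frac += $div;
            $secs -= 1;
        }
    }
    my $iso = strftime('%Y-%m-%dT%H:%M:%S', gmtime $secs);
    $iso .= sprintf('.%0*d', $p, $frac) if $p > 0;
    return $iso . 'Z';
}
```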

parse_wkt

my $point = ClickHouse::Encoder->parse_wkt('POINT(1.5 2.5)');
my $poly  = ClickHouse::Encoder->parse_wkt(
    'POLYGON((0 0, 4 0, 4 4, 0 4, 0 0))');
# round-trip into a CH Geo column:
my $enc = ClickHouse::Encoder->new(columns => [
    ['p', 'Point'], ['poly', 'Polygon']]);
$enc->encode([[ $point, $poly ]]);

Parse a Well-Known Text (WKT) geometry string into the nested-arrayref shape that the Geo column encoders accept. Supports POINT, LINESTRING, MULTILINESTRING, POLYGON, and MULTIPOLYGON. The CH Ring type has no WKT name; feed a LINESTRING result into a Ring column directly. Geometry names are case-insensitive and surrounding whitespace is tolerated. Malformed input croaks with a message identifying the offending geometry.
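The nesting follows the parentheses: a coordinate "x y" becomes [x, y] and each parenthesised list becomes an arrayref of its members, so a POLYGON ends up as an arrayref of rings. A hypothetical sketch of such a parser follows (wkt_sketch is not the module's parser, and we assume its output shape matches what the Geo encoders accept). Unlike a real implementation, it does not check the nesting depth per geometry type.

```perl
use strict;
use warnings;

my $NUM = qr/[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?/;

# Hypothetical sketch: WKT string -> nested arrayrefs.
sub wkt_sketch {
    my ($wkt) = @_;
    $wkt =~ /\G\s*([A-Za-z]+)/gc or die "missing geometry name\n";
    my $name = uc $1;
    die "unsupported geometry $name\n"
        unless $name =~ /\A(?:POINT|LINESTRING|MULTILINESTRING|POLYGON|MULTIPOLYGON)\z/;
    my $list = _wkt_list(\$wkt);
    $wkt =~ /\G\s*\z/gc or die "trailing input after $name\n";
    return $list unless $name eq 'POINT';
    # POINT(x y) parses as a one-member list; unwrap it to a bare [x, y].
    die "POINT needs exactly one coordinate\n"
        unless @$list == 1 && !ref $list->[0][0];
    return $list->[0];
}

# Parse "( item, item, ... )" at pos($$s); items are coordinates or lists.
sub _wkt_list {
    my ($s) = @_;
    $$s =~ /\G\s*\(/gc or die "expected '('\n";
    my @items;
    while (1) {
        my $here = pos($$s);
        if ($$s =~ /\G\s*\(/gc) {            # nested list: rewind, recurse
            pos($$s) = $here;
            push @items, _wkt_list($s);
        }
        elsif ($$s =~ /\G\s*($NUM)\s+($NUM)/gc) {
            push @items, [ $1 + 0, $2 + 0 ];
        }
        else {
            die "bad WKT near offset $here\n";
        }
        next if $$s =~ /\G\s*,/gc;
        $$s =~ /\G\s*\)/gc or die "expected ',' or ')'\n";
        return \@items;
    }
}
```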

estimate_size

my $bytes = $enc->estimate_size(\@rows);
my $bytes = $enc->estimate_size($n_rows,
                                avg_string_size => 64);

Returns a coarse byte-size estimate for an encoded block, parameterized on row count (an integer or an arrayref of rows; only the count is used). The figure is meant for batch-split decisions ("should I split this into two POSTs?") without paying the encode cost. Fixed-width types are byte-exact; variable-width types (String, Array, etc.) use an average-size heuristic of 16 bytes per value by default, configurable via avg_string_size. For a byte-exact size, call length($enc->encode(...)).

select_blocks

ClickHouse::Encoder->select_blocks(
    'select id, event, ts from events where date = today()',
    host     => 'db.example', port => 8123,
    database => 'default',    user => 'default',
    on_block => sub { my $block = shift; ... },
    keep     => { id => 1, event => 1 },  # optional projection
);

Streaming counterpart to "insert_http": POSTs $sql to the ClickHouse HTTP endpoint with default_format=Native, feeds the response chunks into a sliding buffer, and invokes on_block for every complete "decode_block"-shaped block as it arrives. Memory stays bounded by one HTTP::Tiny chunk plus one block; this is the right entry point for selects that return more than fits in process memory.

$sql must NOT end with a format ... clause - select_blocks appends format Native at the URL level and croaks when the SQL trails with a different format pin. A FORMAT token inside the query body (for example a column literal) is fine.

The optional keep => \%names hashref forwards a column filter to "decode_block": skipped columns still have their bytes consumed (so the cursor stays aligned) but their values are not materialized into SVs. Useful when you only need a few of many select-list columns.

With decompress => 1 the URL is augmented with ?compress=1 so ClickHouse wraps each response Native block in its compressed-block framing (16-byte CityHash128 + 9-byte header + LZ4 payload). The HTTP body is then a stream of compressed blocks which select_blocks peels and decompresses block-by-block via "decompress_native_block" before feeding the result to "decode_block". Memory stays bounded by one HTTP chunk plus one compressed block plus one decompressed block.

Recognised options (besides on_block / keep / decompress): scheme, host, port, database, user, password, timeout, ssl_options, verify_SSL, and settings (per-query CH settings hashref, useful for max_execution_time and similar). dedup_token is meaningful only on insert and is ignored if passed here.

bulk_inserter

my $bi = ClickHouse::Encoder->bulk_inserter(
    host => 'db.example', port => 8123, table => 'events',
    columns => \@cols, batch_size => 5000, compress => 'zstd',
    retries => 3);
$bi->push($_) for @rows;    # each element of @rows is a row arrayref
$bi->finish;

Holds an HTTP::Tiny instance with keep-alive across batches, accumulates rows, auto-flushes at batch_size, and retries transient HTTP failures (5xx statuses and HTTP::Tiny's 599 network errors) with exponential backoff and jitter. 4xx errors die immediately. Options:

host, port, database, user, password, scheme, timeout

Same as "for_table"; passed to HTTP::Tiny.

table

Required; same identifier rule as "for_table".

encoder or columns

Pass either an existing encoder or a column list (used to build one).

batch_size

Auto-flush threshold (default 10_000).

retries

Max retries on transient failure (default 3). Set to 0 to disable.

retry_wait

Base backoff in seconds (default 0.5). Waits grow exponentially - the window for attempt n is retry_wait * 2 ** n - with equal jitter (a random point in the upper half of the window), so concurrent inserters retrying the same failed server do not resynchronise into a thundering herd.

retry_max_wait

Upper bound in seconds on the backoff window (default 30), capping the exponential growth from retry_wait.

compress

'raw' (default), 'zstd', or 'gzip'. Sets the Content-Encoding header accordingly.

scheme, ssl_options, verify_SSL

scheme => 'https' enables TLS via HTTP::Tiny (install IO::Socket::SSL and Net::SSLeay). ssl_options and verify_SSL pass through to HTTP::Tiny.

settings

Hashref of per-query CH settings (max_execution_time, max_memory_usage, ...) appended to every flush as URL params.

dedup_token

Stamps every POST with insert_deduplication_token, so a retried batch can be deduplicated server-side instead of being written twice.
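The retry_wait / retry_max_wait arithmetic described above can be sketched as follows. The helper name is hypothetical, and counting attempts from 0 is an assumption; the module may number them differently.

```perl
use strict;
use warnings;
use List::Util qw(min);

# Hypothetical helper: wait before retry attempt $n using "equal jitter".
# The window doubles per attempt and is capped at $max_wait; the wait is
# a random point in the upper half of the window.
sub backoff_wait {
    my ($n, $base, $max_wait) = @_;
    my $window = min($max_wait, $base * 2 ** $n);
    return $window / 2 + rand($window / 2);
}
```

With the defaults (0.5 and 30), the window grows 0.5, 1, 2, 4, ... seconds and stops at 30. Because every wait is at least half the window, retries still back off steadily, while the random half spreads concurrent inserters apart.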

Methods on the returned object:

$bi->push($row)

Append a single row. Auto-flushes once the buffer reaches batch_size.

$bi->push_many(\@rows)

Append many rows in one call.

$bi->flush

Flush whatever is in the buffer (idempotent: no-op when empty).

$bi->finish

Flush and return { rows => $sent_total, batches => $batches }.

$bi->buffered_count / $bi->sent_rows / $bi->sent_batches

Instrumentation accessors.

$bi->summary

Hashref of cumulative X-ClickHouse-Summary stats rolled up across batches (written_rows, written_bytes, elapsed_ns, ...).

$bi->last_response

The HTTP::Tiny response hashref from the most recent flush, with the parsed ch => { query-id, server, format, exception-code, summary, progress, ... } slot attached (whichever X-ClickHouse-* headers the server sent). undef until the first flush succeeds.

decimal128_str

my $s = ClickHouse::Encoder->decimal128_str($lo, $hi, $scale);

Convert a decoded Decimal128 low/high uint64 pair (as returned by "decode_block") into a signed decimal string with the given fractional scale. Uses Math::BigInt for the 128-bit arithmetic.
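The 128-bit reconstruction behind decimal128_str can be sketched in pure Perl with core Math::BigInt. This is a hypothetical re-implementation for illustration: the name my_decimal128_str is ours, and it assumes $lo and $hi are the unsigned low and high 64-bit halves of a two's-complement Int128.

```perl
use strict;
use warnings;
use Math::BigInt;

# Hypothetical re-implementation for illustration only (not the module's
# code): combine two unsigned 64-bit halves into a signed 128-bit value,
# then insert the decimal point $scale digits from the right.
sub my_decimal128_str {
    my ($lo, $hi, $scale) = @_;
    my $two64 = Math::BigInt->new(2)->bpow(64);
    my $v = Math::BigInt->new("$hi")->bmul($two64)->badd(Math::BigInt->new("$lo"));
    # two's complement: a set top bit in $hi means the value is negative
    $v->bsub(Math::BigInt->new(2)->bpow(128))
        if Math::BigInt->new("$hi") >= Math::BigInt->new(2)->bpow(63);
    my $sign   = $v->is_neg ? '-' : '';
    my $digits = $v->copy->babs->bstr;
    return $sign . $digits if $scale == 0;
    # left-pad so at least one digit stays before the point
    $digits = ('0' x ($scale + 1 - length($digits))) . $digits
        if length($digits) <= $scale;
    return $sign . substr($digits, 0, -$scale) . '.' . substr($digits, -$scale);
}

print my_decimal128_str(12345, 0, 2), "\n";   # 123.45
```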

decimal256_str

my $s = ClickHouse::Encoder->decimal256_str(\@limbs, $scale);

Convert a decoded Decimal256 4-limb arrayref (low-to-high uint64s, as returned by "decode_block") into a signed decimal string with the given fractional scale. Uses Math::BigInt for the 256-bit arithmetic.

compressed_writer

my $w = ClickHouse::Encoder->compressed_writer('zstd', \&raw_writer);
my $st = $enc->streamer($w, batch_size => 1000);

Class method that wraps a writer coderef so each emitted block is compressed before being forwarded. Modes: 'zstd' (requires Compress::Zstd), 'gzip' (uses core IO::Compress::Gzip), 'raw' / undef (pass-through). Compose with "stream" or "streamer"; the wrapper handles one block at a time so memory stays proportional to a single batch.
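The wrapping pattern itself is small: a closure that compresses each block and forwards only the compressed bytes. The sketch below is hypothetical and is not compressed_writer's code. It uses core IO::Compress::Gzip and assumes that a writer is a coderef called once per encoded block, with the block's bytes as its only argument.

```perl
use strict;
use warnings;
use IO::Compress::Gzip qw(gzip $GzipError);

# Hypothetical sketch of the wrapping pattern, not the module's code.
# Assumes a writer is a coderef called once per encoded block, with the
# block's bytes as its only argument.
sub my_gzip_writer {
    my ($inner) = @_;
    return sub {
        my ($block) = @_;
        my $gz;
        # compress just this block; nothing is buffered across calls
        gzip(\$block => \$gz) or die "gzip failed: $GzipError";
        return $inner->($gz);
    };
}

# A stand-in inner writer that just collects what it receives.
my @sent;
my $w = my_gzip_writer(sub { push @sent, $_[0] });
$w->("block one");
$w->("block two");
```

Each call compresses and forwards one block before returning. That is why memory use tracks a single batch rather than the whole stream.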

compress_native_block

my $framed = ClickHouse::Encoder->compress_native_block(
    $native_bytes,
    mode   => 'lz4',  # 'lz4' | 'zstd' | 'auto' | 'none'
    # hasher => \&my_cityhash128,   # optional; default = bundled
);

Wraps an encoded Native block in ClickHouse's CompressedReadBuffer framing. A frame consists of a 16-byte checksum of everything that follows, then a 9-byte header, then the payload. The header holds a 1-byte method tag, an LE UInt32 compressed_size (which counts the 9-byte header itself) and an LE UInt32 uncompressed_size. The payload is LZ4 (tag 0x82), ZSTD (tag 0x90) or uncompressed (tag 0x02). This is the framing used by the native TCP protocol when compression is negotiated, and by Native-over-HTTP with &compress=1 / &decompress=1.

Modes:

'lz4'

LZ4-compressed via Compress::LZ4's raw form (no length prefix).

'zstd'

ZSTD-compressed via Compress::Zstd.

'auto'

Try LZ4 first; if the result is >= the input, fall back to 'none'. Mirrors CH's own CompressedWriteBuffer behavior for incompressible payloads.

'none'

No compression but still wrapped in the framing (method tag 0x02). Useful when the wire context requires compressed-block framing but the payload doesn't benefit from compression.

The checksum is CityHash128 in the "cityhash102" variant (ClickHouse's namespace fork of Google CityHash v1.0.2). This module bundles a port of that algorithm in cityhash.c, exposed as the XSUB _cityhash128; both compress_native_block and "decompress_native_block" default to it. Pass an explicit hasher => $coderef only if you want to plug in a different implementation.

Compress::LZ4 is required for 'lz4' / 'auto' mode; Compress::Zstd for 'zstd'. Both are listed as runtime recommends.
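The byte layout is easier to see with the uncompressed method (tag 0x02), because no codec is involved. The sketch below is a hypothetical illustration, not the module's implementation. my_frame_none is our name, and the hasher contract (a coderef that returns 16 bytes) is an assumption. The all-zero placeholder hasher exists only so the sketch runs; a real server would reject its checksum.

```perl
use strict;
use warnings;

# Hypothetical sketch of the CompressedReadBuffer framing for the
# uncompressed method (tag 0x02), not the module's implementation.
# $hasher is an assumption: a coderef returning the 16-byte checksum
# (CityHash128 in real use) of the bytes it is given.
sub my_frame_none {
    my ($native, $hasher) = @_;
    my $len    = length $native;
    # method tag, compressed size (counts the 9-byte header), raw size
    my $header = pack('C V V', 0x02, 9 + $len, $len);
    my $body   = $header . $native;
    my $sum    = $hasher->($body);
    die "checksum must be 16 bytes\n" unless length($sum) == 16;
    return $sum . $body;   # the checksum covers header + payload
}

# Placeholder hasher for illustration only; a real server would reject it.
my $fake_hasher = sub { "\0" x 16 };
my $framed = my_frame_none("abc", $fake_hasher);
```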

decompress_native_block

my ($plain, $consumed) = ClickHouse::Encoder->decompress_native_block(
    $framed);                          # default hasher = bundled
my $unchecked = ClickHouse::Encoder->decompress_native_block(
    $framed, hasher => undef);         # skip checksum verification

# walk a stream of back-to-back compressed blocks
my $cursor = 0;
while ($cursor < length $stream) {
    my ($native, $n) = ClickHouse::Encoder->decompress_native_block(
        $stream, offset => $cursor);
    $cursor += $n;                     # advance past this block
}

Inverse of "compress_native_block": verifies the checksum (unless hasher => undef), unpacks the payload according to its method tag, and returns the raw Native bytes. In list context it also returns the number of bytes consumed from the input (16 + 9 + payload length), so the caller can advance an offset cursor through a stream of back-to-back compressed blocks.
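How the consumed-bytes count drives a cursor can be shown without any codec. The sketch below only reads the 16 + 9 byte prefix of each frame, then slices out the payload. It does not verify checksums or decompress, and my_walk_frames is a hypothetical name. It assumes, as in ClickHouse's framing, that the compressed-size field already counts the 9-byte header.

```perl
use strict;
use warnings;

# Hypothetical sketch: split back-to-back compressed blocks into
# [method tag, payload, uncompressed size] triples. No checksum check,
# no decompression; it only shows how the cursor advances.
sub my_walk_frames {
    my ($stream) = @_;
    my ($offset, @frames) = (0);
    while ($offset < length $stream) {
        die "truncated header at $offset\n"
            if length($stream) - $offset < 25;
        my ($tag, $csize, $usize) =
            unpack('C V V', substr($stream, $offset + 16, 9));
        my $consumed = 16 + $csize;   # $csize already counts the 9-byte header
        die "truncated block at $offset\n"
            if $offset + $consumed > length $stream;
        push @frames, [$tag, substr($stream, $offset + 25, $csize - 9), $usize];
        $offset += $consumed;
    }
    return \@frames;
}

# Build two uncompressed (tag 0x02) frames with dummy checksums.
my $frame  = sub { my ($p) = @_;
    ("\0" x 16) . pack('C V V', 0x02, 9 + length($p), length($p)) . $p };
my $frames = my_walk_frames($frame->('abc') . $frame->('de'));
```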

TYPES

Supported

  • Integers: Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64.

  • Floats: Float32, Float64, BFloat16 (CH 24.x; 2-byte truncated Float32). Inf, -Inf, and NaN are preserved.

  • Strings: String (length-prefixed bytes), FixedString(N) (N bytes, null-padded). Both pass the SV's bytes through unchanged: a UTF-8 string encodes its UTF-8 bytes, a binary blob encodes its bytes, and truncation is by byte not codepoint.

  • Dates: Date, Date32, DateTime, DateTime('tz') (timezone is part of the schema, not the value), DateTime64(P) with P in 0..9.

  • Decimals: Decimal32(S) (S in 0..9), Decimal64(S) (0..18), Decimal128(S) (0..38), Decimal256(S) (0..76), and Decimal(P, S) with P in 1..38 (auto-routed to 32/64/128).

  • Enums: Enum8('a' = 1, ...), Enum16(...).

  • Bool / Boolean (1 byte; truthy/falsy in Perl sense).

  • UUID (16 bytes; accepts either the standard xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx string or 16 raw bytes).

  • IPv4 (UInt32 LE; accepts a dotted-quad string or an integer), IPv6 (16 bytes, network order; accepts a colon-hex string or 16 raw bytes).

  • Map(K, V) (wire-equivalent to Array(Tuple(K, V)); accepts either a hashref or an arrayref of pairs).

  • Variant(T1, T2, ...) (CH 24.1+; tagged union). Each row is either undef (null) or [$variant_idx, $value], where $variant_idx is the 0-based position in the declared Variant(...) type list (so Variant(String, UInt32) uses 0 for String and 1 for UInt32). Up to 254 variants. ClickHouse stores Variant arms alphabetically by type name on the wire; the encoder remaps your declared index transparently, so you always use declaration order in your code. describe table returns the arms already alphabetized, and for_table uses that form, so indices match either way.

  • LowCardinality(String), LowCardinality(FixedString(N)), LowCardinality(Nullable(String)), LowCardinality(Nullable(FixedString(N))).

  • Geo: Point, Ring, LineString, MultiLineString, Polygon, MultiPolygon (aliases over Tuple/Array). "parse_wkt" turns a Well-Known-Text string into the nested-arrayref input these columns expect.

  • Composites: Array(T), Tuple(T1, T2, ...), Nullable(T). Arbitrary nesting; Nullable(Nullable(T)) is rejected, matching ClickHouse. Named tuple elements (Tuple(a Int32, b String)) are accepted; the wire format itself ignores names, but for a named tuple the encoder will also accept rows as hashrefs ({ a => 1, b => "x" }) in addition to arrayrefs ([1, "x"]). Missing keys in a hashref encode as the type's null placeholder.

  • SimpleAggregateFunction(func, T): parsed as plain T; the aggregation function name only affects how readers consume the column, not the wire-level binary format. Full AggregateFunction (with function-specific binary state) is not supported; use SimpleAggregateFunction where the inner type matches the on-the-wire representation.

  • JSON / Object('json') (the stable CH 24.8+ JSON type). Each row is a hashref of leaf values; nested hashrefs are auto-flattened into dotted-path subcolumns matching CH's internal storage (so { user => { name => "alice" } } stores under the user.name path on the wire and round-trips back as the same nested structure on decode). Leaf value typing is inferred per-path from Perl SV flags:

    • Numeric SV with the integer flag set but not the float flag (SvIOK && !SvNOK) -> Int64.

    • Numeric SV with the float flag set (SvNOK): a non-integer value -> Float64; an NV that happens to equal an integer (e.g. 1.0) collapses to Int64 on the wire, mirroring CH's own JSONEachRow inference. On round-trip it then decodes back as an integer; to keep an integer-valued float as Float64, use a dedicated Float64 column or a Variant(Float64, ...). NaN and Inf stay Float64.

    • Blessed scalarref into JSON::PP::Boolean, JSON::XS::Boolean, Types::Serialiser::Boolean, Cpanel::JSON::XS::Boolean, or boolean -> Bool. Perl 5.36+ native booleans (SvIsBOOL) also work.

    • Anything else stringy -> String.

    • undef -> null discriminator for that path in that row.

    A path's type may differ from row to row. ClickHouse's Dynamic sub-column representation handles the union, and a path whose rows carry more than one type is encoded as a Variant. Each variant kind is one of Bool, Float64, Int64, String, or an Array(...) of those (one element type per array). Mixed or nested arrays are rejected with a clear error message.

    The JSON(name Type, name Type, ...) form pins specific paths to concrete inner types ("typed paths"). Those paths skip the Dynamic+Variant wrapping and emit as regular columns, which is cheaper on the wire and lets you query them as concrete types server-side without toInt64() / toString() coercion. Path names may be dotted (e.g. JSON(user.id UInt64)), the type list is comma-separated, and typed paths are independent of the dynamic paths (extra keys still go through Variant). Missing typed keys encode the inner type's default (0 for numerics, empty for String / Array / Map, null for Nullable). Inner types whose ClickHouse serialization includes a wire prefix (Variant, LowCardinality, JSON, Dynamic) are rejected at encoder construction.

  • Dynamic as a standalone column type: same wire format as one JSON path's Dynamic sub-column without the Object wrapper. Each row is a scalar leaf (Bool / Float64 / Int64 / String), an Array(...) of those, or undef (null). Hashrefs aren't accepted here - use a JSON column for object-shaped values.
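The dotted-path flattening described for JSON columns amounts to a recursive walk over nested hashrefs. The following is a hypothetical sketch of that walk, not the encoder's XS code, and my_flatten_paths is our name. It covers only the path naming. Type inference and the Dynamic/Variant encoding are left out.

```perl
use strict;
use warnings;

# Hypothetical sketch of dotted-path flattening, not the encoder's code:
# nested hashrefs become "a.b.c" paths; anything else (scalar, arrayref,
# boolean object, undef) is kept as a leaf value.
sub my_flatten_paths {
    my ($h, $prefix, $out) = @_;
    $out //= {};
    for my $k (sort keys %$h) {
        my $path = defined $prefix ? "$prefix.$k" : $k;
        my $v    = $h->{$k};
        if (ref $v eq 'HASH') {
            my_flatten_paths($v, $path, $out);   # recurse into objects
        } else {
            $out->{$path} = $v;
        }
    }
    return $out;
}

my $flat = my_flatten_paths({ user => { name => 'alice', id => 42 }, tags => ['a', 'b'] });
# $flat: { 'tags' => ['a', 'b'], 'user.id' => 42, 'user.name' => 'alice' }
```

Blessed booleans such as JSON::PP::true report their class name from ref, not 'HASH'. The walk therefore treats them as leaves, which matches their role as Bool values.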

Not currently supported

LowCardinality(T) for non-string T, AggregateFunction (per-function binary state).

Heterogeneous arrays as JSON leaves (e.g. [1, "two"]); only homogeneous arrays of Bool / Float64 / Int64 / String are supported. Arrays-of-objects ([{...}, {...}]) and nested arrays ([[1,2],[3,4]]) are likewise rejected. Future work.

Nested(...) at the encoder level. ClickHouse splits a Nested column on the wire into flat name.field columns of type Array(T); this encoder doesn't perform that expansion. Use the flat form directly:

my $enc = ClickHouse::Encoder->new(columns => [
    ['events.time', 'Array(DateTime)'],
    ['events.type', 'Array(String)'],
]);

for_table() introspects this form correctly because describe table returns the flat columns.
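An application may hold Nested data as a list of records per row. In that case it has to be reshaped into the parallel arrays that the flat columns expect. A sketch of that reshaping follows. The helper name my_nested_to_flat and the record layout are assumptions for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: turn an arrayref of records (one Nested value)
# into one arrayref per field, in the order given by @$fields. Each
# resulting arrayref feeds one flat 'events.<field>' Array(T) column.
sub my_nested_to_flat {
    my ($records, $fields) = @_;
    my @columns;
    for my $f (@$fields) {
        push @columns, [ map { $_->{$f} } @$records ];
    }
    return @columns;
}

my @events = (
    { time => 1700000000, type => 'login' },
    { time => 1700000060, type => 'click' },
);
# One row for the columns events.time, events.type
my $row = [ my_nested_to_flat(\@events, ['time', 'type']) ];
# $row: [ [1700000000, 1700000060], ['login', 'click'] ]
```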

Value coercion

  • Numeric types go through SvIV / SvUV / SvNV. Negative inputs to unsigned types are bit-cast (standard Perl behaviour).

  • Date / Date32: an integer (or integer-valued string) is interpreted as days since the epoch; a YYYY-MM-DD string is parsed. The encoder doesn't dispatch through ->epoch itself, so pass DateTime / Time::Piece / Time::Moment objects as int($dt->epoch / 86400). For pre-1970 Date32 values, use POSIX::floor instead, since int truncates toward zero.

  • DateTime: an integer is Unix seconds; a YYYY-MM-DD HH:MM:SS string is parsed. ISO 8601 forms are also accepted: a T may separate the date and time, and an optional trailing timezone marker (Z, +HH:MM, -HH:MM, +HHMM, +HH, -HH) is applied to convert the value to UTC. Pass date objects via their ->epoch.

  • DateTime64(P): integer is in scaled units (i.e. ticks of 10^-P seconds); a float is in seconds and scaled to ticks; a YYYY-MM-DD HH:MM:SS.fff string is parsed. For sub-second-aware objects pass $dt->hires_epoch (or ->epoch if the object is integer-only).

  • Decimal*: a number goes through double (lossy past 2^53); a string matching [+-]?digits[.digits]? is parsed digit-by-digit and scaled exactly. If precision matters, pass strings.

  • Enum8 / Enum16: accept either the declared name or its integer value; mixing the two within a column is fine.

  • Nullable(T): undef writes a null-bitmap entry plus a type-shaped placeholder (zero scalar, empty array, or a recursive zero tuple).
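String input is exact for Decimal because a decimal string can be rescaled to the stored integer (value * 10^S) using only string and big-integer operations. The sketch below shows that idea with core Math::BigInt. The helper name is hypothetical. Truncating extra fractional digits, rather than rounding or rejecting them, is an assumption and not documented encoder behaviour.

```perl
use strict;
use warnings;
use Math::BigInt;

# Hypothetical helper: the exact scaled integer (value * 10**$s) for a
# decimal string, e.g. ('-12.5', 3) -> -12500, with no floating point.
# Extra fractional digits are truncated here; that is an assumption,
# the encoder may round or reject instead.
sub my_decimal_ticks {
    my ($str, $s) = @_;
    my ($sign, $int, $frac) = $str =~ /^([+-]?)(\d+)(?:\.(\d+))?\z/
        or die "not a decimal string: $str\n";
    $frac = substr(($frac // '') . ('0' x $s), 0, $s);   # pad or cut to $s digits
    (my $digits = $int . $frac) =~ s/^0+(?=\d)//;        # drop leading zeros
    my $ticks = Math::BigInt->new($digits);
    return $sign eq '-' ? $ticks->bneg : $ticks;
}

# Well past 2**53, yet every digit survives.
print my_decimal_ticks('12345678901234567.89', 2), "\n";   # 1234567890123456789
```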

EXAMPLES

The eg/ directory ships runnable scripts:

eg/insert_http.pl

End-to-end insert over HTTP via HTTP::Tiny. The shortest path to "insert real data into a real ClickHouse".

eg/insert_streaming.pl

Reuse one encoder across many batches, piping each batch to clickhouse-client. Demonstrates the intended one-encoder-many-batches pattern.

eg/for_table.pl

Schema discovery via for_table.

eg/from_csv.pl

Read a CSV with Text::CSV_XS, map columns to a ClickHouse schema, and insert via HTTP.

eg/insert_clickhouse_local.pl

Server-less ETL: encode rows, pipe Native bytes into clickhouse-local, have it write a Parquet (or ORC, etc.) file.

eg/etl_dbi.pl

Read rows from a source database via DBI, encode to Native, insert into ClickHouse via HTTP. Reuses one encoder across all fetched batches.

eg/insert_compressed.pl

insert with on-the-wire compression (zstd via Compress::Zstd, falling back to gzip via core IO::Compress::Gzip). Sets Content-Encoding so ClickHouse decompresses transparently.

eg/insert_async_ev.pl

Non-blocking concurrent inserts using EV's event loop with raw HTTP sockets, paired with this encoder's "streamer". Demonstrates the "many in-flight inserts without blocking on each round-trip" pattern.

eg/insert_with_lowcardinality.pl

Measures the wire-size reduction (~50% on event/log data) and encoding throughput of LowCardinality(String) versus plain String for the typical case where a column has few distinct values that repeat across many rows.

eg/json_lines_ingest.pl

Reads NDJSON from STDIN or a file, maps each object's fields onto a ClickHouse table's columns (discovered via for_table), and inserts batched blocks over HTTP.

eg/streaming_aggregate.pl

Pre-aggregates an event stream in Perl (per minute, per key) and flushes rolled-up counters to a SummingMergeTree on a wall-clock timer. The classic pattern when the firehose is too high-cardinality to store as raw rows.

eg/postgres_to_clickhouse.pl

Replicates a PostgreSQL table to ClickHouse using DBD::Pg on the source side and this encoder's streamer on the destination side. Memory is bounded by the batch size, so it scales to hundreds of millions of rows.

eg/clickhouse_replication.pl

Replicates one ClickHouse table to another (potentially on a different server) by streaming Native bytes end-to-end via a temp-file spool.

eg/parallel_loader.pl

Forks N worker processes, each ingesting one slice of the input. Workers share nothing; each opens its own HTTP connection, so network-bound ingestion scales roughly linearly with worker count until the server becomes the bottleneck.

eg/redis_to_clickhouse.pl

Drains a Redis stream (XREADGROUP) or list (BRPOP) into a ClickHouse table, with idle-flush so the destination doesn't see arbitrarily delayed batches when the source is quiet.

eg/syslog_ingest.pl

Reads RFC 5424 syslog lines from STDIN, parses lossily (any unparseable line still goes through with the raw text in msg), and inserts into a fixed schema.

eg/json_streaming.pl

NDJSON from STDIN into a JSON column via the encoder's streaming mode; one HTTP request per batch instead of one per line.

eg/json_query.pl

Runs select ... format Native over HTTP and walks the returned blocks via "decode_blocks", demonstrating the symmetric decode path for JSON columns.

eg/json_aggregate.pl

Sketches an aggregation pipeline that bins JSON events by path and emits the aggregates as a second insert.

eg/migrate_table.pl

Copies one CH table into another (possibly on a different host) by discovering the source schema via "for_table" and streaming Native blocks through.

eg/native_to_jsonl.pl

Reads a Native byte stream from STDIN and prints each row as NDJSON on STDOUT; the dual of json_lines_ingest.pl.

eg/replay.pl

Replays a captured Native byte stream against a table, useful for post-hoc reproduction of an ingest bug from a saved request body.

eg/select_blocks_streaming.pl

Streaming select counterpart to insert_streaming.pl: uses "select_blocks" to walk a select response block-by-block, with optional column projection via --keep.

eg/json_path_projection.pl

Demo of keep => {...} projection on top of "select_blocks": decodes only the requested columns and prints one row per line.

eg/csv_export.pl

select to CSV: counterpart to from_csv.pl. Drives a CSV writer from a streaming select, emitting the header row from the first block's column names.

eg/migrate_with_transform.pl

CH-to-CH migration with a row-level transform between read and write. Discovers source schema via "for_table", streams the rows via "select_blocks", applies a user-supplied transform coderef, and forwards survivors through "bulk_inserter".

eg/replay_pcap.pl

Replays a captured Native byte stream (e.g. saved from curl --output of a select ... format native response) and prints a block-by-block summary. An offline debugging tool.

eg/tcp_compressed_pipeline.pl

End-to-end TCP insert pipeline that negotiates compression in pack_query, then wraps every pack_data / pack_data_end in CH's compressed-block framing. Showcases the matched-pair convention against a real ClickHouse server (protocol revision <= 54474; see "CAVEATS" in ClickHouse::Encoder::TCP).

eg/rowbinary_insert.pl

insert using the RowBinary format via "encode_row_binary", with a local "decode_row_binary" round-trip check. For interop with pipelines that speak RowBinary rather than Native.

eg/async_insert.pl

Server-side async insert - async_insert=1 (and optionally wait_for_async_insert) passed through the settings option, so the server buffers and background-flushes the batch.

eg/geo_from_wkt.pl

Ingest geometry given as Well-Known-Text into Point / Polygon columns using "parse_wkt" to convert each WKT string.

eg/insert_with_settings.pl

insert with per-query CH settings and an insert_deduplication_token, showing how an identical retry under the same token is deduplicated server-side.

eg/ping_healthcheck.pl

A wait-for-server readiness gate built on "ping" - retry until the /ping endpoint answers, then proceed.

eg/observability.pl

Reads server-side stats from an insert pipeline: per-batch $bi->last_response->{ch} detail plus the cumulative $bi->summary rollup of X-ClickHouse-Summary counters.

eg/schema_migrate.pl

Fetches show create table, parses it with "parse_create_table", diffs the columns against a desired schema, and emits the alter table migration via "apply_schema_diff".

For a working reference of the ClickHouse Native binary format that this module emits, see doc/wire-format.md.

PERFORMANCE

The encoder is written so that the dominant cost on most workloads is the data-generation Perl code, not the encoding step. Some indicative numbers from bench/local_insert_benchmark.pl (500_000 rows, 5 columns including Array(String), in-process via clickhouse-local):

Native (this module)    encode 0.29s + ingest 0.13s = 0.42s end-to-end
TabSeparated (Perl)     encode 0.79s + ingest 0.11s = 0.90s end-to-end
-> Native ~2x faster end-to-end, payload ~18% smaller

For wide tables with many string columns the gap widens (the XS encoder pulls further ahead of plain-Perl TSV serialization). See bench/ for reproducible scripts and additional scenarios.

CAVEATS

  • A 64-bit Perl is required ($Config{ivsize} >= 8). On a 32-bit perl Makefile.PL exits with OS unsupported (CPAN Testers reports NA, not FAIL) because Int64 / UInt64 would otherwise silently truncate.

  • Output is little-endian; this matches every supported ClickHouse server build. The encoder relies on the host's native float byte order matching its native integer byte order, which holds on every IEEE 754 platform in practice.

  • encode builds the whole block in memory. For very large batches, split the rows across several encode calls and send each block in turn, or use "streamer" so that only one batch-sized block is held at a time.

  • "compress_native_block" and "decompress_native_block" bundle a port of CityHash128 v1.0.2 (the "cityhash102" variant ClickHouse uses internally) in cityhash.c. Wire compatibility with a real CH server is exercised by t/live.t against an installed server.
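Chunking a large batch needs no encoder internals: slice the rows, encode each slice, and hand each block to whatever sends it. In this sketch the encode and send steps are passed in as coderefs so it runs standalone. With the real module, $encode could be sub { $enc->encode($_[0]) } and $send would POST the body. The helper name and its parameters are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper: encode and send rows in slices of $chunk rows so
# only one encoded block exists at a time. $encode receives an arrayref
# of rows and returns bytes; $send receives those bytes.
sub my_send_in_chunks {
    my ($rows, $chunk, $encode, $send) = @_;
    my $blocks = 0;
    for (my $i = 0; $i < @$rows; $i += $chunk) {
        my $end = $i + $chunk - 1;
        $end = $#$rows if $end > $#$rows;
        $send->($encode->([ @$rows[$i .. $end] ]));
        $blocks++;
    }
    return $blocks;
}

# Stand-ins so the sketch runs without a server: the "encoder" returns
# the slice's row count and the "sender" records it.
my @rows = map { [$_, "user$_"] } 1 .. 10;
my @bodies;
my $n = my_send_in_chunks(\@rows, 4,
    sub { scalar @{ $_[0] } },
    sub { push @bodies, $_[0] });
# $n == 3, @bodies == (4, 4, 2)
```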

SEE ALSO

ClickHouse::Encoder::TCP - pack/unpack a subset of ClickHouse's native TCP protocol packets, for driving insert pipelines directly over port 9000 (transport is the caller's choice). Targets protocol revision 54429.

EV::ClickHouse - async ClickHouse client supporting both HTTP and the native TCP protocol with the full handshake (including chunking negotiation that this module skips).

ClickHouse Native format, ClickHouse HTTP interface.

AUTHOR

vividsnow

LICENSE

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.