NAME

DBIx::Class::Async - Asynchronous database operations for DBIx::Class

VERSION

Version 0.40

DISCLAIMER

This module is currently experimental.

You are encouraged to try it and share your suggestions.

SYNOPSIS

use strict;
use warnings;
use feature 'say';

use IO::Async::Loop;
use DBIx::Class::Async;

my $loop = IO::Async::Loop->new;
my $db   = DBIx::Class::Async->new(
    schema_class => 'MyApp::Schema',
    connect_info => [
        'dbi:SQLite:dbname=my.db',
        undef,
        undef,
        { sqlite_unicode => 1 },
    ],
    workers   => 2,
    cache_ttl => 60,
    loop      => $loop,
);

my $f = $db->search('User', { active => 1 });

$f->on_done(sub {
    my ($rows) = @_;
    for my $row (@$rows) {
        say $row->{name};
    }
    $loop->stop;
});

$f->on_fail(sub {
    warn "Query failed: @_";
    $loop->stop;
});

$loop->run;

$db->disconnect;

DESCRIPTION

DBIx::Class::Async provides asynchronous access to DBIx::Class using a process-based worker pool built on IO::Async::Function.

Each worker maintains a persistent database connection and executes blocking DBIx::Class operations outside the main event loop, returning results via Future objects.

Returned rows are plain Perl data structures (hashrefs), making results safe to pass across process boundaries.
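
As a rough illustration of the worker model described above, the following sketch uses IO::Async::Function directly to run a blocking routine in a worker process and collect its result through a Future. The routine body and the shape of its return value are placeholders chosen for this example; this is not the module's internal code.

```perl
use strict;
use warnings;
use IO::Async::Loop;
use IO::Async::Function;

my $loop = IO::Async::Loop->new;

# The code block runs inside a worker process, so blocking work here
# does not stall the parent's event loop.
my $function = IO::Async::Function->new(
    min_workers => 1,
    max_workers => 2,
    code        => sub {
        my ($sql) = @_;
        # Placeholder for a blocking database call (assumption).
        return { sql => $sql, worker_pid => $$ };
    },
);
$loop->add($function);

# call() returns a Future; get() runs the loop until it resolves.
my $result = $function->call( args => ['SELECT 1'] )->get;

print "Worker $result->{worker_pid} ran: $result->{sql}\n";

$function->stop;
```

The result crosses the process boundary as a plain hashref, which is why rows are returned as plain data rather than live objects.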

Features include:

  • Process-based worker pool using IO::Async

  • Persistent DBIx::Class connections per worker

  • Non-blocking CRUD operations via Future

  • Optional result caching via CHI

  • Transaction support (single worker only)

  • Optional retry with exponential backoff

  • Health checks and graceful shutdown

CONSTRUCTOR

new

Creates a new DBIx::Class::Async instance.

my $async_db = DBIx::Class::Async->new(
    schema_class   => 'MyApp::Schema', # Required
    connect_info   => $connect_info,   # Required
    workers        => 4,               # Optional, default 4
    loop           => $loop,           # Optional IO::Async::Loop
    cache_ttl      => 300,             # Optional cache TTL in secs
    cache          => $chi_object,     # Optional custom cache
    enable_retry   => 1,               # Optional, default 0
    max_retries    => 3,               # Optional, default 3
    retry_delay    => 1,               # Optional, default 1 sec
    query_timeout  => 30,              # Optional, default 30 secs
    enable_metrics => 1,               # Optional, default 0
    health_check   => 300,             # Optional health check interval
    on_connect_do  => $sql_commands,   # Optional SQL to run on connect
);

Parameters:

  • schema_class (Required)

    The DBIx::Class::Schema class name.

  • connect_info (Required)

    Arrayref of connection parameters passed to $schema_class->connect().

  • workers (Optional, default: 4)

    Number of worker processes for the connection pool.

  • loop (Optional)

    IO::Async::Loop instance. A new loop will be created if not provided.

  • cache_ttl (Optional, default: 300)

    Cache time-to-live in seconds. Set to 0 to disable caching.

  • cache (Optional)

    CHI cache object for custom cache configuration.

  • enable_retry (Optional, default: 0)

    Enable automatic retry for deadlocks and timeouts.

  • max_retries (Optional, default: 3)

    Maximum number of retry attempts.

  • retry_delay (Optional, default: 1)

    Initial delay between retries in seconds (uses exponential backoff).

  • query_timeout (Optional, default: 30)

    Query timeout in seconds.

  • enable_metrics (Optional, default: 0)

    Enable metrics collection (requires Metrics::Any).

  • health_check (Optional, default: 300)

    Health check interval in seconds. Set to 0 to disable health checks.

  • on_connect_do (Optional)

    Arrayref of SQL statements to execute after connecting.
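
To make the interaction between retry_delay and max_retries concrete, here is a minimal sketch of an exponential backoff schedule. It assumes the delay doubles on each attempt; the actual growth factor used by the module is not stated in this document, and retry_delays is a hypothetical helper, not part of the API.

```perl
use strict;
use warnings;

# Hypothetical helper: list the wait (in seconds) before each retry,
# assuming the delay doubles after every failed attempt.
sub retry_delays {
    my (%args) = @_;
    my $delay = $args{retry_delay} // 1;
    my $max   = $args{max_retries} // 3;
    return map { $delay * 2**$_ } 0 .. $max - 1;
}

my @delays = retry_delays( retry_delay => 1, max_retries => 3 );

for my $attempt ( 1 .. @delays ) {
    print "Retry $attempt after $delays[ $attempt - 1 ]s\n";
}
```

Under that assumption, the defaults give waits of 1, 2 and 4 seconds, so a query that keeps failing is abandoned after roughly 7 seconds of waiting plus execution time.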

METHODS

count

Counts rows matching conditions.

my $count = await $async_db->count(
    $resultset_name,
    { active => 1, status => 'pending' }  # Optional
);

Returns: Integer count.

create

Creates a new row.

my $new_row = await $async_db->create(
    $resultset_name,
    { name => 'John', email => 'john@example.com' }
);

Returns: Hashref of created row data.

delete

Deletes a row.

my $success = await $async_db->delete($resultset_name, $id);

Returns: 1 if deleted, 0 if row not found.

deploy

my $future = $schema->deploy(\%sqlt_args?, $dir?);

$future->on_done(sub {
    my $self = shift;
    print "Database schema deployed successfully.\n";
});

Arguments: \%sqlt_args?, $dir?

Return Value: Future resolving to $self

Asynchronously deploys the schema to the database.

This method dispatches the deployment task to a background worker process via the internal worker pool. This ensures that the often-slow process of generating SQL (via SQL::Translator) and executing DDL statements does not block the main application thread or event loop.

The arguments are passed directly to the underlying deploy method of DBIx::Class::Schema in the worker. Common %sqlt_args include:

  • add_drop_table - Add a DROP TABLE before each CREATE.

  • quote_identifiers - Toggle database-specific identifier quoting.

If the deployment fails (e.g., due to permission issues or missing dependencies like SQL::Translator), the returned Future will fail with the error string returned by the worker.

disconnect

Gracefully disconnects all workers and cleans up resources.

$async_db->disconnect;

find

Finds a single row by primary key.

my $row = await $async_db->find($resultset_name, $id);

Returns: Hashref of row data or undef if not found.

health_check

Performs a health check on all workers.

my $healthy_workers = await $async_db->health_check;

Returns: Number of healthy workers.

loop

Returns the IO::Async::Loop instance.

my $loop = $async_db->loop;

raw_query

Executes a raw SQL query.

my $results = await $async_db->raw_query(
    'SELECT * FROM users WHERE age > ? AND status = ?',
    [25, 'active']  # Optional bind values
);

Returns: Arrayref of hashrefs.

schema_class

Returns the schema class name.

my $class = $async_db->schema_class;

search

Performs a search query.

my $results = await $async_db->search(
    $resultset_name,
    $search_conditions,    # Optional hashref
    $attributes,           # Optional hashref
);

Attributes may include:

{
    order_by  => 'name DESC',
    rows      => 50,
    page      => 2,
    columns   => [qw/id name/],
    prefetch  => 'relation',
    cache     => 1,
    cache_key => 'custom_key',
}

All results are returned as arrayrefs of hashrefs.

search_multi

Executes multiple search queries concurrently.

my @results = await $async_db->search_multi(
    ['User',    { active => 1 }, { rows => 10 }],
    ['Product', { category => 'books' }],
    ['Order',   undef, { order_by => 'created_at DESC', rows => 5 }],
);

Returns: Array of results in the same order as queries.
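
The ordering guarantee can be pictured with plain Future objects. This sketch uses Future->needs_all as one plausible way to wait on several searches at once; whether the module uses it internally is not stated here, and fake_search is a hypothetical stand-in that resolves immediately instead of querying a database.

```perl
use strict;
use warnings;
use Future;

my @queries = (
    [ 'User',    { active   => 1 } ],
    [ 'Product', { category => 'books' } ],
    [ 'Order',   undef ],
);

# Hypothetical stand-in for a single asynchronous search: it resolves
# immediately with one fake row naming the source it was asked for.
sub fake_search {
    my ($source) = @_;
    return Future->done( [ { source => $source } ] );
}

my @futures = map { fake_search( $_->[0] ) } @queries;

# needs_all resolves once every component is done and yields their
# results in the order the futures were given.
my @results = Future->needs_all(@futures)->get;

for my $i ( 0 .. $#queries ) {
    print "$queries[$i][0] => $results[$i][0]{source}\n";
}
```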

search_with_prefetch

my $future = $async_db->search_with_prefetch(
    $source_name,
    \%condition,
    $prefetch_spec,
    \%extra_attributes
);

Performs an asynchronous search while eager-loading related data. This method is specifically designed to solve the "N+1 query" problem in an asynchronous environment.

Arguments:

  • $source_name: The name of the ResultSource (e.g., 'User').

  • \%condition: A standard DBIx::Class search condition.

  • $prefetch_spec: A standard prefetch attribute (string, arrayref, or hashref).

  • \%extra_attributes: Optional search attributes (order_by, rows, etc.).

This method performs "deep serialisation". Since DBIx::Class row objects contain live database handles that cannot be sent across process boundaries, this method ensures that the background worker:

1. Executes the join and collapses the result set to avoid duplicate parent rows.
2. Recursively converts the nested object tree into a transportable data structure.
3. Transports the data to the parent process where it is re-inflated into DBIx::Class::Async::Row objects, with the relationship data accessible via standard accessors.

Note: Unlike standard DBIx::Class, accessing a relationship that was not prefetched will fail, as the result row does not have a persistent connection to the database in the parent process.
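
Step 2 above, the recursive conversion, can be sketched in a few lines. Local::Row, its related accessor and flatten_row are hypothetical names invented for this example; they only imitate the shape of a row with prefetched relations and are not the module's classes.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed);

# Hypothetical stand-in for a row object with prefetched relations.
package Local::Row;

sub new         { my ( $class, %args ) = @_; return bless {%args}, $class }
sub get_columns { my ($self) = @_; return %{ $self->{columns} } }
sub related     { my ($self) = @_; return $self->{related} || {} }

package main;

# Recursively turn a row and its prefetched relations into plain data
# that can be sent across a process boundary.
sub flatten_row {
    my ($row) = @_;
    my %plain   = $row->get_columns;
    my $related = $row->related;

    for my $name ( keys %$related ) {
        my $value = $related->{$name};
        $plain{$name}
            = ref($value) eq 'ARRAY' ? [ map { flatten_row($_) } @$value ]
            : blessed($value)        ? flatten_row($value)
            :                          $value;
    }
    return \%plain;
}

my $user = Local::Row->new(
    columns => { id => 1, name => 'Alice' },
    related => {
        posts => [
            Local::Row->new( columns => { id => 10, title => 'First' } ),
            Local::Row->new( columns => { id => 11, title => 'Second' } ),
        ],
    },
);

my $plain = flatten_row($user);
print "$plain->{name} wrote $plain->{posts}[1]{title}\n";
```

Only relations present in the flattened structure survive the trip, which matches the note above: anything that was not prefetched is simply absent on the parent side.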

stats

Returns statistics about database operations.

my $stats = $async_db->stats;

Returns: Hashref with query counts, cache hits, errors, etc.

txn_batch

Executes a batch of operations within a transaction. This is the recommended alternative to txn_do as it avoids CODE reference serialisation issues.

my $result = await $async_db->txn_batch(
    # Update operations
    { type => 'update', resultset => 'Account', id => 1,
      data => { balance => \'balance - 100' } },
    { type => 'update', resultset => 'Account', id => 2,
      data => { balance => \'balance + 100' } },

    # Create operation
    { type => 'create', resultset => 'Log',
      data => { event => 'transfer', amount => 100, timestamp => \'NOW()' } },
);

# Returns count of successful operations
say "Executed $result operations in transaction";

Supported operation types:

  • update - Update an existing record

    {
        type      => 'update',
        resultset => 'User',       # ResultSet name
        id        => 123,          # Primary key value
        data      => { name => 'New Name', status => 'active' }
    }

  • create - Create a new record

    {
        type      => 'create',
        resultset => 'Order',
        data      => { user_id => 1, amount => 99.99, status => 'pending' }
    }

  • delete - Delete a record

    {
        type      => 'delete',
        resultset => 'Session',
        id        => 456
    }

  • raw - Execute raw SQL (Atomic)

    {
        type => 'raw',
        sql  => 'UPDATE accounts SET balance = balance - ? WHERE id = ?',
        bind => [100, 1]
    }

    Executes a raw SQL statement via the worker's database handle. Note: Always use placeholders (? and the bind attribute) to prevent SQL injection.

Literal SQL Support

All data hashes support standard DBIx::Class literal SQL via scalar references, for example: data => { updated_at => \'NOW()' }. These are safely serialised and executed within the worker transaction.

Atomicity and Error Handling

All operations within the batch are wrapped in a single database transaction. If any operation fails (e.g., a constraint violation or a missing record), the worker will immediately:

1. Roll back all changes made within that batch.
2. Fail the Future in the parent process with the specific error message.
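
The all-or-nothing behaviour can be simulated without a database. The sketch below applies operations to a working copy of an in-memory hash and commits only if every operation succeeds. The run_batch function and the delta field are hypothetical and exist only for this illustration; the real worker relies on a database transaction instead.

```perl
use strict;
use warnings;

# In-memory stand-in for a table of account balances (assumption).
my %accounts = ( 1 => 500, 2 => 100 );

# Apply every operation to a working copy and commit only if all succeed.
sub run_batch {
    my ( $store, @ops ) = @_;
    my %working = %$store;
    my $count   = 0;

    my $ok = eval {
        for my $op (@ops) {
            die "Unsupported type '$op->{type}'\n"
                unless $op->{type} eq 'update';
            die "Record $op->{id} not found\n"
                unless exists $working{ $op->{id} };
            $working{ $op->{id} } += $op->{delta};
            $count++;
        }
        1;
    };
    return ( undef, $@ ) unless $ok;    # "rollback": discard the copy

    %$store = %working;                 # "commit"
    return ( $count, undef );
}

my ( $done, $error ) = run_batch(
    \%accounts,
    { type => 'update', id => 1, delta => -100 },
    { type => 'update', id => 2, delta => 100 },
);
print "Executed $done operations\n";

my ( $failed_count, $failure ) = run_batch(
    \%accounts,
    { type => 'update', id => 1,  delta => -50 },
    { type => 'update', id => 99, delta => 50 },    # no such record
);
print "Batch failed: $failure";
```

The second batch fails on its second operation, so the debit applied by its first operation is discarded along with it.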

txn_do

Executes a transaction.

my $result = await $async_db->txn_do(sub {
    my $schema = shift;

    # Multiple operations that should succeed or fail together
    $schema->resultset('Account')->find(1)->update({ balance => \'balance - 100' });
    $schema->resultset('Account')->find(2)->update({ balance => \'balance + 100' });

    return 'transfer_complete';
});

The callback receives a DBIx::Class::Schema instance and should return the transaction result.

IMPORTANT: This method has limitations due to serialisation constraints. The CODE reference passed to txn_do must be serialisable by Sereal, which may not support anonymous subroutines or CODE references with closed-over variables in all configurations.

If you encounter serialisation errors, consider:

  • Using named subroutines instead of anonymous ones

  • Recompiling Sereal with ENABLE_SRL_CODEREF support

  • Using individual async operations instead of transactions

  • Using the txn_batch method for predefined operations

Common error: Found type 13 CODE(...), but it is not representable by the Sereal encoding format

update

Updates an existing row.

my $updated_row = await $async_db->update(
    $resultset_name,
    $id,
    { name => 'Jane', status => 'active' }
);

Returns: Hashref of updated row data or undef if row not found.

update_bulk

$db->update_bulk($table, $condition, $data);

Performs a bulk update operation on multiple rows in the specified table.

This method updates all rows in the given table that match the specified conditions with the provided data values. It is particularly useful for batch operations where multiple records need to be modified with the same set of changes.

Parameters

$table

The name of the table to update (String, required).

$condition

A hash reference specifying the WHERE conditions for selecting rows to update. Each key-value pair in the hash represents a column and its required value. Rows matching ALL conditions will be updated (HashRef, required).

Example: { status => 'pending', active => 1 }

$data

A hash reference containing the column-value pairs to update. Each key-value pair specifies a column and its new value (HashRef, required).

Example: { status => 'processed', updated_at => '2024-01-01 10:00:00' }

Returns

Returns the result of the update operation from the worker. Typically this would be the number of rows affected or a success indicator, depending on your worker implementation.

Exceptions

  • Throws a validation error if any parameter does not match the expected type.

  • Throws an exception if the underlying worker call fails.

Examples

# Update all pending orders from a specific customer
my $result = $db->update_bulk(
    'orders',
    { customer_id => 123, status => 'pending' },
    { status      => 'processed', processed_at => \'NOW()' }
);

print "Updated $result rows\n";

# Deactivate all users who haven't logged in since 2023
$db->update_bulk(
    'users',
    { last_login => { '<' => '2023-01-01' } },
    { active     => 0, deactivation_date => \'CURRENT_DATE' }
);

PERFORMANCE TIPS

  • Worker Count

    Adjust the workers parameter based on your database connection limits and expected concurrency. Typically 2-4 workers per CPU core works well.

  • Caching

    Use caching for read-heavy workloads. Set cache_ttl appropriately for your data volatility.

  • Batch Operations

    Use search_multi for fetching unrelated data concurrently rather than sequential await calls.

  • Connection Pooling

    Each worker maintains its own persistent connection. Monitor database connection counts if using many instances.

  • Timeouts

    Set appropriate query_timeout values to prevent hung queries from blocking workers.

ERROR HANDLING

All methods throw exceptions on failure. Common error scenarios:

  • Database connection failures

    Thrown during initial connection or health checks.

  • Query timeouts

    Thrown when queries exceed query_timeout.

  • Deadlocks

    Automatically retried if enable_retry is true.

  • Invalid SQL/schema errors

    Passed through from DBIx::Class.

Use try/catch blocks or ->catch on futures to handle errors.
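
Both styles can be tried with plain Future objects. In this sketch the "timeout" failure category is a hypothetical label chosen for the example; this document does not state which categories, if any, the module attaches to its failures.

```perl
use strict;
use warnings;
use Future;

# A failed Future carrying a hypothetical "timeout" category.
my $timed_out = Future->fail( 'Query exceeded query_timeout', 'timeout' );

# catch() dispatches on the failure category and must return a Future.
my $recovered = $timed_out->catch(
    timeout => sub {
        my ($message) = @_;
        warn "Recovering from: $message\n";
        return Future->done( [] );    # fall back to an empty result
    },
);
my $rows = $recovered->get;
print "Rows after fallback: ", scalar(@$rows), "\n";

# Without a handler, get() rethrows the failure as an exception.
my $error;
eval { Future->fail("Connection lost\n")->get; 1 } or $error = $@;
print "Caught: $error";
```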

METRICS

When enable_metrics is true and Metrics::Any is installed, the module collects:

  • db_async_queries_total - Total query count

  • db_async_cache_hits_total - Cache hit count

  • db_async_cache_misses_total - Cache miss count

  • db_async_query_duration_seconds - Query duration histogram

  • db_async_workers_active - Active worker count

LIMITATIONS

  • Result objects

    Returned rows are plain hashrefs, not DBIx::Class row objects.

  • Transactions

    Transactions execute on a single worker only.

  • Large result sets

    All rows are loaded into memory. Use pagination for large datasets.
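
A paging loop for large datasets might look like the sketch below. Here fetch_page is a hypothetical stand-in for a search call with the rows and page attributes; it slices an in-memory list so the example runs without a database.

```perl
use strict;
use warnings;

# Seven fake rows standing in for a large table (assumption).
my @table = map { +{ id => $_ } } 1 .. 7;

# Hypothetical stand-in for a search using { rows => $rows, page => $page }.
sub fetch_page {
    my ( $page, $rows ) = @_;
    my $start = ( $page - 1 ) * $rows;
    return [] if $start > $#table;
    my $end = $start + $rows - 1;
    $end = $#table if $end > $#table;
    return [ @table[ $start .. $end ] ];
}

my ( $page, $rows, $seen ) = ( 1, 3, 0 );
while (1) {
    my $batch = fetch_page( $page++, $rows );
    $seen += @$batch;              # process the batch here
    last if @$batch < $rows;       # a short page means no more data
}

print "Processed $seen rows\n";
```

Only one page of rows is held in memory at a time, at the cost of one query per page.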

DEDICATION

This module is dedicated to the memory of Matt S. Trout (mst), a brilliant contributor to the Perl community, DBIx::Class core developer, and friend who is deeply missed.

AUTHOR

Mohammad Sajid Anwar, <mohammad.anwar at yahoo.com>

REPOSITORY

https://github.com/manwar/DBIx-Class-Async

BUGS

Please report any bugs or feature requests through the web interface at https://github.com/manwar/DBIx-Class-Async/issues. I will be notified and then you'll automatically be notified of progress on your bug as I make changes.

SUPPORT

You can find documentation for this module with the perldoc command.

perldoc DBIx::Class::Async

LICENSE AND COPYRIGHT

Copyright (C) 2026 Mohammad Sajid Anwar.

This program is free software; you can redistribute it and/or modify it under the terms of the Artistic License (2.0). You may obtain a copy of the full license at:

http://www.perlfoundation.org/artistic_license_2_0

Any use, modification, and distribution of the Standard or Modified Versions is governed by this Artistic License. By using, modifying or distributing the Package, you accept this license. Do not use, modify, or distribute the Package, if you do not accept this license.

If your Modified Version has been derived from a Modified Version made by someone other than you, you are nevertheless required to ensure that your Modified Version complies with the requirements of this license.

This license does not grant you the right to use any trademark, service mark, tradename, or logo of the Copyright Holder.

This license includes the non-exclusive, worldwide, free-of-charge patent license to make, have made, use, offer to sell, sell, import and otherwise transfer the Package with respect to any patent claims licensable by the Copyright Holder that are necessarily infringed by the Package. If you institute patent litigation (including a cross-claim or counterclaim) against any party alleging that the Package constitutes direct or contributory patent infringement, then this Artistic License to you shall terminate on the date that such litigation is filed.

Disclaimer of Warranty: THE PACKAGE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES. THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, OR NON-INFRINGEMENT ARE DISCLAIMED TO THE EXTENT PERMITTED BY YOUR LOCAL LAW. UNLESS REQUIRED BY LAW, NO COPYRIGHT HOLDER OR CONTRIBUTOR WILL BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING IN ANY WAY OUT OF THE USE OF THE PACKAGE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.