NAME
DBIx::Class::Async - Asynchronous database operations for DBIx::Class
VERSION
Version 0.40
DISCLAIMER
This is pure experimental currently.
You are encouraged to try and share your suggestions.
SYNOPSIS
use IO::Async::Loop;
use DBIx::Class::Async;
my $loop = IO::Async::Loop->new;
my $db = DBIx::Class::Async->new(
schema_class => 'MyApp::Schema',
connect_info => [
'dbi:SQLite:dbname=my.db',
undef,
undef,
{ sqlite_unicode => 1 },
],
workers => 2,
cache_ttl => 60,
loop => $loop,
);
my $f = $db->search('User', { active => 1 });
$f->on_done(sub {
my ($rows) = @_;
for my $row (@$rows) {
say $row->{name};
}
$loop->stop;
});
$f->on_fail(sub {
warn "Query failed: @_";
$loop->stop;
});
$loop->run;
$db->disconnect;
DESCRIPTION
DBIx::Class::Async provides asynchronous access to DBIx::Class using a process-based worker pool built on IO::Async::Function.
Each worker maintains a persistent database connection and executes blocking DBIx::Class operations outside the main event loop, returning results via Future objects.
Returned rows are plain Perl data structures (hashrefs), making results safe to pass across process boundaries.
Features include:
Process-based worker pool using IO::Async
Persistent DBIx::Class connections per worker
Non-blocking CRUD operations via Future
Optional result caching via CHI
Transaction support (single worker only)
Optional retry with exponential backoff
Health checks and graceful shutdown
CONSTRUCTOR
new
Creates a new DBIx::Class::Async instance.
my $async_db = DBIx::Class::Async->new(
schema_class => 'MyApp::Schema', # Required
connect_info => $connect_info, # Required
workers => 4, # Optional, default 4
loop => $loop, # Optional IO::Async::Loop
cache_ttl => 300, # Optional cache TTL in secs
cache => $chi_object, # Optional custom cache
enable_retry => 1, # Optional, default 0
max_retries => 3, # Optional, default 3
retry_delay => 1, # Optional, default 1 sec
query_timeout => 30, # Optional, default 30 secs
enable_metrics => 1, # Optional, default 0
health_check => 300, # Optional health check interval
on_connect_do => $sql_commands, # Optional SQL to run on connect
);
Parameters:
schema_class (Required)
The DBIx::Class::Schema class name.
connect_info (Required)
Arrayref of connection parameters passed to
$schema_class->connect().workers (Optional, default: 4)
Number of worker processes for the connection pool.
loop (Optional)
IO::Async::Loop instance. A new loop will be created if not provided.
cache_ttl (Optional, default: 300)
Cache time-to-live in seconds. Set to 0 to disable caching.
cache (Optional)
CHI cache object for custom cache configuration.
enable_retry (Optional, default: 0)
Enable automatic retry for deadlocks and timeouts.
max_retries (Optional, default: 3)
Maximum number of retry attempts.
retry_delay (Optional, default: 1)
Initial delay between retries in seconds (uses exponential backoff).
query_timeout (Optional, default: 30)
Query timeout in seconds.
enable_metrics (Optional, default: 0)
Enable metrics collection (requires Metrics::Any).
health_check (Optional, default: 300)
Health check interval in seconds. Set to 0 to disable health checks.
on_connect_do (Optional)
Arrayref of SQL statements to execute after connecting.
METHODS
count
Counts rows matching conditions.
my $count = await $async_db->count(
$resultset_name,
{ active => 1, status => 'pending' } # Optional
);
Returns: Integer count.
create
Creates a new row.
my $new_row = await $async_db->create(
$resultset_name,
{ name => 'John', email => 'john@example.com' }
);
Returns: Hashref of created row data.
delete
Deletes a row.
my $success = await $async_db->delete($resultset_name, $id);
Returns: 1 if deleted, 0 if row not found.
deploy
my $future = $schema->deploy(\%sqlt_args?, $dir?);
$future->on_done(sub {
my $self = shift;
print "Database schema deployed successfully.\n";
});
- Arguments:
\%sqlt_args?,$dir? - Return Value: Future resolving to
$self
Asynchronously deploys the schema to the database.
This method dispatches the deployment task to a background worker process via the internal worker pool. This ensures that the often-slow process of generating SQL (via SQL::Translator) and executing DDL statements does not block the main application thread or event loop.
The arguments are passed directly to the underlying "deploy" in DBIx::Class::Schema method in the worker. Common %sqlt_args include:
add_drop_table- Add aDROP TABLEbefore eachCREATE.quote_identifiers- Toggle database-specific identifier quoting.
If the deployment fails (e.g., due to permission issues or missing dependencies like SQL::Translator), the returned Future will fail with the error string returned by the worker.
disconnect
Gracefully disconnects all workers and cleans up resources.
$async_db->disconnect;
find
Finds a single row by primary key.
my $row = await $async_db->find($resultset_name, $id);
Returns: Hashref of row data or undef if not found.
health_check
Performs health check on all workers.
my $healthy_workers = await $async_db->health_check;
Returns: Number of healthy workers.
loop
Returns the IO::Async::Loop instance.
my $loop = $async_db->loop;
raw_query
Executes raw SQL query.
my $results = await $async_db->raw_query(
'SELECT * FROM users WHERE age > ? AND status = ?',
[25, 'active'] # Optional bind values
);
Returns: Arrayref of hashrefs.
schema_class
Returns the schema class name.
my $class = $async_db->schema_class;
search
Performs a search query.
my $results = await $async_db->search(
$resultset_name,
$search_conditions, # Optional hashref
$attributes, # Optional hashref
);
Attributes may include:
{
order_by => 'name DESC',
rows => 50,
page => 2,
columns => [qw/id name/],
prefetch => 'relation',
cache => 1,
cache_key => 'custom_key',
}
All results are returned as arrayrefs of hashrefs.
search_multi
Executes multiple search queries concurrently.
my @results = await $async_db->search_multi(
['User', { active => 1 }, { rows => 10 }],
['Product', { category => 'books' }],
['Order', undef, { order_by => 'created_at DESC', rows => 5 }],
);
Returns: Array of results in the same order as queries.
search_with_prefetch
my $future = $async_db->search_with_prefetch(
$source_name,
\%condition,
$prefetch_spec,
\%extra_attributes
);
Performs an asynchronous search while eager-loading related data. This method is specifically designed to solve the "N+1 query" problem in an asynchronous environment.
Arguments:
$source_name: The name of the ResultSource (e.g., 'User').\%condition: A standardDBIx::Classsearch condition.$prefetch_spec: A standardprefetchattribute (string, arrayref, or hashref).\%extra_attributes: Optional search attributes (order_by, rows, etc.).
This method performs "Deep Serialiisation." Since DBIx::Class row objects contain live database handles that cannot be sent across process boundaries, this method ensures that the background worker:
- 1. Executes the join and collapses the result set to avoid duplicate parent rows.
- 2. Recursively converts the nested object tree into a transportable data structure.
- 3. Transports the data to the parent process where it is re-inflated into DBIx::Class::Async::Row objects, with the relationship data accessible via standard accessors.
Note: Unlike standard DBIx::Class, accessing a relationship that was not prefetched will fail, as the result row does not have a persistent connection to the database in the parent process.
stats
Returns statistics about database operations.
my $stats = $async_db->stats;
Returns: Hashref with query counts, cache hits, errors, etc.
txn_batch
Executes a batch of operations within a transaction. This is the recommended alternative to txn_do as it avoids CODE reference serialisation issues.
my $result = await $async_db->txn_batch(
# Update operations
{ type => 'update', resultset => 'Account', id => 1,
data => { balance => \'balance - 100' } },
{ type => 'update', resultset => 'Account', id => 2,
data => { balance => \'balance + 100' } },
# Create operation
{ type => 'create', resultset => 'Log',
data => { event => 'transfer', amount => 100, timestamp => \'NOW()' } },
);
# Returns count of successful operations
say "Executed $result operations in transaction";
Supported operation types:
update - Update an existing record
{ type => 'update', resultset => 'User', # ResultSet name id => 123, # Primary key value data => { name => 'New Name', status => 'active' } }create - Create a new record
{ type => 'create', resultset => 'Order', data => { user_id => 1, amount => 99.99, status => 'pending' } }delete - Delete a record
{ type => 'delete', resultset => 'Session', id => 456 }raw - Execute raw SQL (Atomic)
{ type => 'raw', sql => 'UPDATE accounts SET balance = balance - ? WHERE id = ?', bind => [100, 1] }Executes a raw SQL statement via the worker's database handle. Note: Always use placeholders (
?and thebindattribute) to prevent SQL injection.
Literal SQL Support
All data hashes support standard DBIx::Class literal SQL via scalar references, for example: data => { updated_at => \'NOW()' }. These are safely serialized and executed within the worker transaction.
Atomicity and Error Handling
All operations within the batch are wrapped in a single database transaction. If any operation fails (e.g., a constraint violation or a missing record), the worker will immediately:
- 1. Roll back all changes made within that batch.
- 2. Fail the Future in the parent process with the specific error message.
txn_do
Executes a transaction.
my $result = await $async_db->txn_do(sub {
my $schema = shift;
# Multiple operations that should succeed or fail together
$schema->resultset('Account')->find(1)->update({ balance => \'balance - 100' });
$schema->resultset('Account')->find(2)->update({ balance => \'balance + 100' });
return 'transfer_complete';
});
The callback receives a DBIx::Class::Schema instance and should return the transaction result.
IMPORTANT: This method has limitations due to serialisation constraints. The CODE reference passed to txn_do must be serialisable by Sereal, which may not support anonymous subroutines or CODE references with closed over variables in all configurations.
If you encounter serialisation errors, consider:
Using named subroutines instead of anonymous ones
Recompiling Sereal with
ENABLE_SRL_CODEREFsupportUsing individual async operations instead of transactions
Using the
txn_batchmethod for predefined operations
Common error: Found type 13 CODE(...), but it is not representable by the Sereal encoding format
update
Updates an existing row.
my $updated_row = await $async_db->update(
$resultset_name,
$id,
{ name => 'Jane', status => 'active' }
);
Returns: Hashref of updated row data or undef if row not found.
update_bulk
$db->update_bulk($table, $condition, $data);
Performs a bulk update operation on multiple rows in the specified table.
This method updates all rows in the given table that match the specified conditions with the provided data values. It is particularly useful for batch operations where multiple records need to be modified with the same set of changes.
- Parameters
-
$table-
The name of the table to update (String, required).
$condition-
A hash reference specifying the WHERE conditions for selecting rows to update. Each key-value pair in the hash represents a column and its required value. Rows matching ALL conditions will be updated (HashRef, required).
Example:
{ status => 'pending', active => 1 } $data-
A hash reference containing the column-value pairs to update. Each key-value pair specifies a column and its new value (HashRef, required).
Example:
{ status => 'processed', updated_at => '2024-01-01 10:00:00' }
- Returns
-
Returns the result of the update operation from the worker. Typically this would be the number of rows affected or a success indicator, depending on your worker implementation.
- Exceptions
-
Throws a validation error if any parameter does not match the expected type.
Throws an exception if the underlying worker call fails.
- Examples
-
# Update all pending orders from a specific customer my $result = $db->update_bulk( 'orders', { customer_id => 123, status => 'pending' }, { status => 'processed', processed_at => \'NOW()' } ); print "Updated $result rows\n"; # Deactivate all users who haven't logged in since 2023 $db->update_bulk( 'users', { last_login => { '<' => '2023-01-01' } }, { active => 0, deactivation_date => \'CURRENT_DATE' } );
PERFORMANCE TIPS
Worker Count
Adjust the
workersparameter based on your database connection limits and expected concurrency. Typically 2-4 workers per CPU core works well.Caching
Use caching for read-heavy workloads. Set
cache_ttlappropriately for your data volatility.Batch Operations
Use
search_multifor fetching unrelated data concurrently rather than sequentialawaitcalls.Connection Pooling
Each worker maintains its own persistent connection. Monitor database connection counts if using many instances.
Timeouts
Set appropriate
query_timeoutvalues to prevent hung queries from blocking workers.
ERROR HANDLING
All methods throw exceptions on failure. Common error scenarios:
Database connection failures
Thrown during initial connection or health checks.
Query timeouts
Thrown when queries exceed
query_timeout.Deadlocks
Automatically retried if
enable_retryis true.Invalid SQL/schema errors
Passed through from DBIx::Class.
Use try/catch blocks or ->catch on futures to handle errors.
METRICS
When enable_metrics is true and Metrics::Any is installed, the module collects:
db_async_queries_total- Total query countdb_async_cache_hits_total- Cache hit countdb_async_cache_misses_total- Cache miss countdb_async_query_duration_seconds- Query duration histogramdb_async_workers_active- Active worker count
LIMITATIONS
Result objects
Returned rows are plain hashrefs, not DBIx::Class row objects.
Transactions
Transactions execute on a single worker only.
Large result sets
All rows are loaded into memory. Use pagination for large datasets.
DEDICATION
This module is dedicated to the memory of Matt S. Trout (mst), a brilliant contributor to the Perl community, DBIx::Class core developer, and friend who is deeply missed.
AUTHOR
Mohammad Sajid Anwar, <mohammad.anwar at yahoo.com>
REPOSITORY
https://github.com/manwar/DBIx-Class-Async
BUGS
Please report any bugs or feature requests through the web interface at https://github.com/manwar/DBIx-Class-Async/issues. I will be notified and then you'll automatically be notified of progress on your bug as I make changes.
SUPPORT
You can find documentation for this module with the perldoc command.
perldoc DBIx::Class::Async
You can also look for information at:
BUG Report
CPAN Ratings
Search MetaCPAN
LICENSE AND COPYRIGHT
Copyright (C) 2026 Mohammad Sajid Anwar.
This program is free software; you can redistribute it and / or modify it under the terms of the the Artistic License (2.0). You may obtain a copy of the full license at:
http://www.perlfoundation.org/artistic_license_2_0
Any use, modification, and distribution of the Standard or Modified Versions is governed by this Artistic License.By using, modifying or distributing the Package, you accept this license. Do not use, modify, or distribute the Package, if you do not accept this license.
If your Modified Version has been derived from a Modified Version made by someone other than you,you are nevertheless required to ensure that your Modified Version complies with the requirements of this license.
This license does not grant you the right to use any trademark, service mark, tradename, or logo of the Copyright Holder.
This license includes the non-exclusive, worldwide, free-of-charge patent license to make, have made, use, offer to sell, sell, import and otherwise transfer the Package with respect to any patent claims licensable by the Copyright Holder that are necessarily infringed by the Package. If you institute patent litigation (including a cross-claim or counterclaim) against any party alleging that the Package constitutes direct or contributory patent infringement,then this Artistic License to you shall terminate on the date that such litigation is filed.
Disclaimer of Warranty: THE PACKAGE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS' AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES. THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, OR NON-INFRINGEMENT ARE DISCLAIMED TO THE EXTENT PERMITTED BY YOUR LOCAL LAW. UNLESS REQUIRED BY LAW, NO COPYRIGHT HOLDER OR CONTRIBUTOR WILL BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING IN ANY WAY OUT OF THE USE OF THE PACKAGE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.