NAME

DBIx::Class::Async - Asynchronous database operations for DBIx::Class

VERSION

Version 0.01

DISCLAIMER

This module is currently purely experimental.

You are encouraged to try it and share your suggestions.

SYNOPSIS

use strict;
use warnings;
use feature 'say';

use IO::Async::Loop;
use DBIx::Class::Async;

my $loop = IO::Async::Loop->new;

my $db = DBIx::Class::Async->new(
    schema_class => 'MyApp::Schema',
    connect_info => [
        'dbi:SQLite:dbname=my.db',
        undef,
        undef,
        { sqlite_unicode => 1 },
    ],
    workers   => 2,
    cache_ttl => 60,
    loop      => $loop,
);

my $f = $db->search('User', { active => 1 });

$f->on_done(sub {
    my ($rows) = @_;
    for my $row (@$rows) {
        say $row->{name};
    }
    $loop->stop;
});

$f->on_fail(sub {
    warn "Query failed: @_";
    $loop->stop;
});

$loop->run;

$db->disconnect;

DESCRIPTION

DBIx::Class::Async provides asynchronous access to DBIx::Class using a process-based worker pool built on IO::Async::Function.

Each worker maintains a persistent database connection and executes blocking DBIx::Class operations outside the main event loop, returning results via Future objects.

Returned rows are plain Perl data structures (hashrefs), making results safe to pass across process boundaries.

Features include:

  • Process-based worker pool using IO::Async

  • Persistent DBIx::Class connections per worker

  • Non-blocking CRUD operations via Future

  • Optional result caching via CHI

  • Transaction support (single worker only)

  • Optional retry with exponential backoff

  • Health checks and graceful shutdown

CONSTRUCTOR

new

Creates a new DBIx::Class::Async instance.

my $async_db = DBIx::Class::Async->new(
    schema_class   => 'MyApp::Schema', # Required
    connect_info   => $connect_info,   # Required
    workers        => 4,               # Optional, default 4
    loop           => $loop,           # Optional IO::Async::Loop
    cache_ttl      => 300,             # Optional cache TTL in secs
    cache          => $chi_object,     # Optional custom cache
    enable_retry   => 1,               # Optional, default 0
    max_retries    => 3,               # Optional, default 3
    retry_delay    => 1,               # Optional, default 1 sec
    query_timeout  => 30,              # Optional, default 30 secs
    enable_metrics => 1,               # Optional, default 0
    health_check   => 300,             # Optional health check interval
    on_connect_do  => $sql_commands,   # Optional SQL to run on connect
);

Parameters:

  • schema_class (Required)

    The DBIx::Class::Schema class name.

  • connect_info (Required)

    Arrayref of connection parameters passed to $schema_class->connect().

  • workers (Optional, default: 4)

    Number of worker processes for the connection pool.

  • loop (Optional)

    IO::Async::Loop instance. A new loop will be created if not provided.

  • cache_ttl (Optional, default: 300)

    Cache time-to-live in seconds. Set to 0 to disable caching.

  • cache (Optional)

    CHI cache object for custom cache configuration.

  • enable_retry (Optional, default: 0)

    Enable automatic retry for deadlocks and timeouts.

  • max_retries (Optional, default: 3)

    Maximum number of retry attempts.

  • retry_delay (Optional, default: 1)

    Initial delay between retries in seconds (uses exponential backoff).

  • query_timeout (Optional, default: 30)

    Query timeout in seconds.

  • enable_metrics (Optional, default: 0)

    Enable metrics collection (requires Metrics::Any).

  • health_check (Optional, default: 300)

    Health check interval in seconds. Set to 0 to disable health checks.

  • on_connect_do (Optional)

    Arrayref of SQL statements to execute after connecting.
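
The retry parameters above can be pictured as a delay schedule. The following is a minimal sketch, assuming the delay doubles after each failed attempt; the multiplier the module actually uses is not stated here, and backoff_delays is a hypothetical helper, not part of DBIx::Class::Async.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of DBIx::Class::Async): returns the delay,
# in seconds, that precedes each retry attempt.
# ASSUMPTION: the delay doubles after every failed attempt.
sub backoff_delays {
    my (%args) = @_;
    my $delay   = $args{retry_delay} // 1;   # documented default: 1 second
    my $retries = $args{max_retries} // 3;   # documented default: 3 attempts

    return map { $delay * 2 ** $_ } 0 .. $retries - 1;
}

my @delays = backoff_delays(retry_delay => 1, max_retries => 3);
print "Retry delays: @delays\n";   # prints "Retry delays: 1 2 4"
```

Under that assumption, raising max_retries by one doubles the longest wait, so it may be worth keeping query_timeout in mind when choosing both values.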

METHODS

search

Performs a search query.

my $results = await $async_db->search(
    $resultset_name,
    $search_conditions,    # Optional hashref
    $attributes,           # Optional hashref
);

Attributes may include:

{
    order_by  => 'name DESC',
    rows      => 50,
    page      => 2,
    columns   => [qw/id name/],
    prefetch  => 'relation',
    cache     => 1,
    cache_key => 'custom_key',
}

All results are returned as arrayrefs of hashrefs.
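
In DBIx::Class, page is 1-based and rows is the page size, so the rows and page attributes above select a fixed window of the result set. A small sketch of that arithmetic follows; page_window is a hypothetical helper used only for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: which rows does a given rows/page pair cover?
sub page_window {
    my ($rows, $page) = @_;
    my $offset = ($page - 1) * $rows;        # rows skipped before this page
    return ($offset + 1, $offset + $rows);   # 1-based first and last row
}

my ($first, $last) = page_window(50, 2);
print "rows => 50, page => 2 covers rows $first..$last\n";   # 51..100
```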

find

Finds a single row by primary key.

my $row = await $async_db->find($resultset_name, $id);

Returns: Hashref of row data or undef if not found.

create

Creates a new row.

my $new_row = await $async_db->create(
    $resultset_name,
    { name => 'John', email => 'john@example.com' }
);

Returns: Hashref of created row data.

update

Updates an existing row.

my $updated_row = await $async_db->update(
    $resultset_name,
    $id,
    { name => 'Jane', status => 'active' }
);

Returns: Hashref of updated row data or undef if row not found.

delete

Deletes a row.

my $success = await $async_db->delete($resultset_name, $id);

Returns: 1 if deleted, 0 if row not found.

count

Counts rows matching conditions.

my $count = await $async_db->count(
    $resultset_name,
    { active => 1, status => 'pending' }  # Optional
);

Returns: Integer count.

raw_query

Executes a raw SQL query.

my $results = await $async_db->raw_query(
    'SELECT * FROM users WHERE age > ? AND status = ?',
    [25, 'active']  # Optional bind values
);

Returns: Arrayref of hashrefs.

txn_do

Executes a transaction.

my $result = await $async_db->txn_do(sub {
    my $schema = shift;

    # Multiple operations that should succeed or fail together
    $schema->resultset('Account')->find(1)->update({ balance => \'balance - 100' });
    $schema->resultset('Account')->find(2)->update({ balance => \'balance + 100' });

    return 'transfer_complete';
});

The callback receives a DBIx::Class::Schema instance and should return the transaction result.

IMPORTANT: This method has limitations due to serialisation constraints. The CODE reference passed to txn_do must be serialisable by Sereal, which may not support anonymous subroutines or CODE references with closed-over variables in all configurations.

If you encounter serialisation errors, consider:

  • Using named subroutines instead of anonymous ones

  • Recompiling Sereal with ENABLE_SRL_CODEREF support

  • Using individual async operations instead of transactions

  • Using the txn_batch method for predefined operations

Common error: Found type 13 CODE(...), but it is not representable by the Sereal encoding format

This indicates that your Sereal installation does not support CODE reference serialisation. You may need to recompile Sereal with:

perl Makefile.PL --enable-srl-coderef
make
make install

Alternatively, structure your code to avoid passing CODE references through worker boundaries.

txn_batch

Executes a batch of operations within a transaction. This is the recommended alternative to txn_do as it avoids CODE reference serialisation issues.

my $result = $async_db->txn_batch(
    # Update operations
    { type => 'update', resultset => 'Account', id => 1,
      data => { balance => \'balance - 100' } },
    { type => 'update', resultset => 'Account', id => 2,
      data => { balance => \'balance + 100' } },

    # Create operation
    { type => 'create', resultset => 'Log',
      data => { event => 'transfer', amount => 100, timestamp => \'NOW()' } },
)->get;

# Returns count of successful operations
say "Executed $result operations in transaction";

Supported operation types:

  • update - Update an existing record

    {
        type      => 'update',
        resultset => 'User',       # ResultSet name
        id        => 123,          # Primary key value
        data      => { name => 'New Name', status => 'active' }
    }

  • create - Create a new record

    {
        type      => 'create',
        resultset => 'Order',
        data      => { user_id => 1, amount => 99.99, status => 'pending' }
    }

  • delete - Delete a record

    {
        type      => 'delete',
        resultset => 'Session',
        id        => 456
    }

  • raw - Execute raw SQL (use with caution)

    {
        type => 'raw',
        sql  => 'UPDATE accounts SET balance = balance - ? WHERE id = ?',
        bind => [100, 1]
    }

All operations succeed or fail together. If any operation fails, the entire transaction is rolled back.

Returns: Number of successfully executed operations.
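
Because a malformed operation rolls back the whole batch, it may help to check the hashes before submitting them. The sketch below is one possible pre-flight check; check_operations is a hypothetical helper, and the required keys are an assumption inferred from the examples above rather than a statement of what the module enforces.

```perl
use strict;
use warnings;

# ASSUMPTION: required keys are inferred from the documented examples;
# the module itself may accept or require other keys.
my %REQUIRED = (
    update => [qw(resultset id data)],
    create => [qw(resultset data)],
    delete => [qw(resultset id)],
    raw    => [qw(sql)],
);

# Hypothetical helper: returns a list of problems; an empty list means
# every operation has a known type and the keys listed above.
sub check_operations {
    my (@ops) = @_;
    my @errors;
    for my $i (0 .. $#ops) {
        my $op   = $ops[$i];
        my $type = $op->{type} // '';
        my $need = $REQUIRED{$type};
        if (!$need) {
            push @errors, "operation $i: unknown type '$type'";
            next;
        }
        for my $key (@$need) {
            push @errors, "operation $i ($type): missing '$key'"
                unless defined $op->{$key};
        }
    }
    return @errors;
}

my @problems = check_operations(
    { type => 'update', resultset => 'Account', id => 1, data => { balance => 0 } },
    { type => 'delete', resultset => 'Session' },   # no id
    { type => 'upsert', resultset => 'User' },      # not a documented type
);
print "$_\n" for @problems;
```

Run as shown, this reports the missing id on the delete and the unknown upsert type, while the update passes.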

search_multi

Executes multiple search queries concurrently.

my @results = await $async_db->search_multi(
    ['User', { active => 1 }, { rows => 10 }],
    ['Product', { category => 'books' }],
    ['Order', undef, { order_by => 'created_at DESC', rows => 5 }],
);

Returns: Array of results in the same order as queries.

search_with_prefetch

Search with eager loading of relationships.

my $users_with_orders = await $async_db->search_with_prefetch(
    'User',
    { active => 1 },
    'orders',  # Relationship name or arrayref for multiple
    { rows => 10 }
);

Returns: Arrayref of results with prefetched data.

health_check

Performs a health check on all workers.

my $healthy_workers = await $async_db->health_check;

Returns: Number of healthy workers.

stats

Returns statistics about database operations.

my $stats = $async_db->stats;

Returns: Hashref with query counts, cache hits, errors, etc.

disconnect

Gracefully disconnects all workers and cleans up resources.

$async_db->disconnect;

loop

Returns the IO::Async::Loop instance.

my $loop = $async_db->loop;

schema_class

Returns the schema class name.

my $class = $async_db->schema_class;

PERFORMANCE TIPS

  • Worker Count

    Adjust the workers parameter based on your database connection limits and expected concurrency. Typically 2-4 workers per CPU core works well.

  • Caching

    Use caching for read-heavy workloads. Set cache_ttl appropriately for your data volatility.

  • Batch Operations

    Use search_multi for fetching unrelated data concurrently rather than sequential await calls.

  • Connection Pooling

    Each worker maintains its own persistent connection. Monitor database connection counts if using many instances.

  • Timeouts

    Set appropriate query_timeout values to prevent hung queries from blocking workers.
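
Since each worker holds one persistent connection, the worker count and the database connection limit can be balanced with simple arithmetic. The sketch below is illustrative only; suggest_workers and its parameter names are hypothetical, and the default of 2 workers per core is just the lower end of the rule of thumb above.

```perl
use strict;
use warnings;
use List::Util qw(min max);

# Hypothetical helper: pick a workers value from the per-core rule of thumb,
# capped by a connection budget shared among several instances.
sub suggest_workers {
    my (%args) = @_;
    my $by_cpu    = $args{cpu_cores} * ($args{per_core} // 2);
    my $by_budget = int($args{max_connections} / ($args{instances} // 1));
    return max(1, min($by_cpu, $by_budget));   # always at least one worker
}

# 4 cores at 2 workers each is 8; a budget of 20 connections split across
# 2 instances allows 10 each, so the CPU-based figure wins.
print suggest_workers(cpu_cores => 4, max_connections => 20, instances => 2), "\n";
```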

ERROR HANDLING

All methods throw exceptions on failure. Common error scenarios:

  • Database connection failures

    Thrown during initial connection or health checks.

  • Query timeouts

    Thrown when queries exceed query_timeout.

  • Deadlocks

    Automatically retried if enable_retry is true.

  • Invalid SQL/schema errors

    Passed through from DBIx::Class.

Use try/catch blocks or ->catch on futures to handle errors.

METRICS

When enable_metrics is true and Metrics::Any is installed, the module collects:

  • db_async_queries_total - Total query count

  • db_async_cache_hits_total - Cache hit count

  • db_async_cache_misses_total - Cache miss count

  • db_async_query_duration_seconds - Query duration histogram

  • db_async_workers_active - Active worker count

LIMITATIONS

  • Result objects

    Returned rows are plain hashrefs, not DBIx::Class row objects.

  • Transactions

    Transactions execute on a single worker only.

  • Large result sets

    All rows are loaded into memory. Use pagination for large datasets.

AUTHOR

Mohammad Sajid Anwar, <mohammad.anwar at yahoo.com>

REPOSITORY

https://github.com/manwar/DBIx-Class-Async

BUGS

Please report any bugs or feature requests through the web interface at https://github.com/manwar/DBIx-Class-Async/issues. I will be notified and then you'll automatically be notified of progress on your bug as I make changes.

SUPPORT

You can find documentation for this module with the perldoc command.

perldoc DBIx::Class::Async

LICENSE AND COPYRIGHT

Copyright (C) 2026 Mohammad Sajid Anwar.

This program is free software; you can redistribute it and/or modify it under the terms of the Artistic License (2.0). You may obtain a copy of the full license at:

http://www.perlfoundation.org/artistic_license_2_0

Any use, modification, and distribution of the Standard or Modified Versions is governed by this Artistic License. By using, modifying or distributing the Package, you accept this license. Do not use, modify, or distribute the Package, if you do not accept this license.

If your Modified Version has been derived from a Modified Version made by someone other than you, you are nevertheless required to ensure that your Modified Version complies with the requirements of this license.

This license does not grant you the right to use any trademark, service mark, tradename, or logo of the Copyright Holder.

This license includes the non-exclusive, worldwide, free-of-charge patent license to make, have made, use, offer to sell, sell, import and otherwise transfer the Package with respect to any patent claims licensable by the Copyright Holder that are necessarily infringed by the Package. If you institute patent litigation (including a cross-claim or counterclaim) against any party alleging that the Package constitutes direct or contributory patent infringement, then this Artistic License to you shall terminate on the date that such litigation is filed.

Disclaimer of Warranty: THE PACKAGE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES. THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, OR NON-INFRINGEMENT ARE DISCLAIMED TO THE EXTENT PERMITTED BY YOUR LOCAL LAW. UNLESS REQUIRED BY LAW, NO COPYRIGHT HOLDER OR CONTRIBUTOR WILL BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING IN ANY WAY OUT OF THE USE OF THE PACKAGE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.