NAME

DBIx::Class::Async::ResultSet - Non-blocking resultset proxy with Future-based execution

VERSION

Version 0.55

SYNOPSIS

my $rs = $schema->resultset('User')->search({ active => 1 });

# Async execution using Futures
$rs->all->then(sub {
    my $users = shift; # Arrayref of DBIx::Class::Async::Row objects
    foreach my $user (@$users) {
        print "Found: " . $user->username . "\n";
    }
})->retain;

# Using the await helper
my $count = $schema->await( $rs->count );

DESCRIPTION

This class provides an asynchronous interface to DBIx::Class::ResultSet. Most methods that would normally perform I/O (like all, count, or create) return a Future object instead of raw data.

METHODS

new

my $rs = DBIx::Class::Async::ResultSet->new(
    schema_instance => $schema,
    source_name     => 'User',
    cond            => { is_active => 1 },
    attrs           => { order_by => 'created_at' },
);

Instantiates a new asynchronous ResultSet. While typically called internally by $schema->resultset('Source'), it can be used to manually construct a result set with a specific state.

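  • Required Arguments

    • schema_instance: The schema object the ResultSet belongs to.

    • source_name: The string identifier of the result source (e.g., 'User').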

  • Optional Arguments

    • async_db: The worker bridge. If omitted, it defaults to the bridge associated with the schema_instance.

    • cond: A hashref of search conditions (the WHERE clause).

    • attrs: A hashref of query attributes (join, prefetch, rows, etc.).

    • result_class: The class used to inflate rows. Defaults to DBIx::Class::Core.

  • Internal State

    The constructor initialises buffers for rows (_rows) and iteration cursors (_pos), ensuring that newly created ResultSets are always in a "clean" state ready for fresh execution.

new_result

Internal method used to turn a raw database hashref into a DBIx::Class::Async::Row object. It handles:

  • Dotted Key Expansion

    Expands {'user.name' => 'Bob'} into nested hashes for prefetched relationships.

  • Relationship Inflation

    Automatically turns prefetched relationship data into nested Row objects.

  • Class Hijacking

    If a custom result_class is used, it dynamically creates an anonymous class inheriting from both the async row and your custom class.
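
The dotted key expansion described above can be sketched in plain Perl. This is a minimal illustration rather than the module's own code: expand_dotted_keys is a hypothetical helper name, and it assumes that no key is both a plain column and a relationship prefix.

```perl
use strict;
use warnings;

# Hypothetical helper: expand flat "rel.column" keys into nested hashrefs.
sub expand_dotted_keys {
    my ($flat) = @_;
    my %nested;
    for my $key (sort keys %$flat) {
        my @path = split /\./, $key;
        my $leaf = pop @path;
        my $node = \%nested;
        # Walk (and create) one hash level per relationship name.
        $node = $node->{$_} //= {} for @path;
        $node->{$leaf} = $flat->{$key};
    }
    return \%nested;
}

my $row = expand_dotted_keys({
    id                  => 1,
    'user.name'         => 'Bob',
    'user.profile.city' => 'Leeds',
});
print $row->{user}{name}, "\n";          # Bob
print $row->{user}{profile}{city}, "\n"; # Leeds
```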

new_result_set

my $new_rs = $rs->new_result_set({ cond => { active => 1 } });

An internal but critical factory method used to spawn new instances of the ResultSet while preserving the asynchronous execution context.

This method performs a "smart clone" of the current object's state, ensuring that the background worker pool and metadata links are carried over to the derived ResultSet.

  • State Mapping

    It automatically maps internal underscored attributes (e.g., _source_name) to clean constructor arguments (source_name).

  • Infrastructure Persistence

    Explicitly carries over the async_db (the worker bridge) and the schema_instance.

  • Override Injection

    Accepts a hashref of overrides to modify the state of the new instance (commonly used for merging new search conditions or attributes).
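
As a rough sketch of the state mapping and override injection listed above: the helper name clone_args and the list of carried slots are assumptions made for illustration, not the module's internals.

```perl
use strict;
use warnings;

# Assumed list of slots worth carrying over; buffers such as _rows and
# _pos are deliberately left out so the clone starts clean.
my @CARRIED = qw(_source_name _cond _attrs _async_db _schema_instance);

# Hypothetical helper: map "_name" slots to "name" constructor arguments.
sub clone_args {
    my ($state, $overrides) = @_;
    my %args;
    for my $slot (grep { exists $state->{$_} } @CARRIED) {
        (my $name = $slot) =~ s/^_//;     # _source_name -> source_name
        $args{$name} = $state->{$slot};
    }
    return +{ %args, %{ $overrides || {} } };   # overrides win
}

my $state = {
    _source_name => 'User',
    _cond        => { active => 1 },
    _rows        => [ 'stale buffer' ],
};
my $args = clone_args($state, { cond => { active => 1, role => 'admin' } });
print join(', ', sort keys %$args), "\n";   # cond, source_name
print $args->{cond}{role}, "\n";            # admin
```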

Example: Manual ResultSet Cloning

If you were extending this library and needed to create a specialised ResultSet that shares the same worker pool:

sub specialised_search {
    my ($self, $extra_logic) = @_;

    # This ensures the new RS knows how to talk to the workers
    return $self->new_result_set({
        cond  => { %{$self->{_cond}},  %$extra_logic },
        attrs => { %{$self->{_attrs}}, cache_for => '1 hour' }
    });
}

all

my $future = $rs->all;
$future->on_done(sub {
    my $rows = shift; # Arrayref of row objects
    say $_->name for @$rows;
});

The primary method for retrieving all results from the ResultSet. It returns a Future that resolves to an arrayref of DBIx::Class::Async::Row objects.

  • Tier 1: Prefetched/Injected Data

    If data was manually injected via set_cache or arrived as raw hashrefs, all automatically inflates them into full Row objects, injecting the necessary async_db bridge and metadata into each.

  • Tier 2: Local Buffer

    If the ResultSet has already been executed and the rows are held in memory (_rows), it returns them immediately wrapped in a resolved Future.

  • Tier 3: Shared Query Cache

    Before hitting the database, it consults the _query_cache using a surgical lookup based on the source name and query signature.

  • Tier 4: Non-Blocking Fetch

    On a cache miss, it dispatches the request to the worker pool via all_future. Once results return, they are indexed in the cache for future requests.

Efficiency Note: This method avoids redundant IPC (Inter-Process Communication). Multiple calls to all on the same ResultSet trigger at most one round-trip to the worker pool.
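
The lookup order can be illustrated with a simplified, synchronous sketch. It covers only the local buffer, the shared cache and the fetch (Tiers 2 to 4); the real method returns Futures, and fetch_from_worker, the signature field and the cache layout here are stand-ins invented for the example.

```perl
use strict;
use warnings;

my %query_cache;        # shared cache: source name -> query signature -> rows
my $worker_calls = 0;   # counts simulated round-trips

# Stand-in for the worker round-trip (the real one is asynchronous).
sub fetch_from_worker { $worker_calls++; return [ { id => 1 }, { id => 2 } ] }

sub all_rows {
    my ($rs) = @_;
    # Local buffer: this ResultSet has already been executed.
    return $rs->{_rows} if $rs->{_rows};
    # Shared cache, looked up by source bucket and then by signature.
    my $bucket = $query_cache{ $rs->{source_name} } //= {};
    my $rows   = $bucket->{ $rs->{signature} } //= fetch_from_worker();
    return $rs->{_rows} = $rows;
}

my $rs_a = { source_name => 'User', signature => 'active=1' };
my $rs_b = { source_name => 'User', signature => 'active=1' };
all_rows($rs_a) for 1 .. 3;
all_rows($rs_b);
print "worker calls: $worker_calls\n"; # worker calls: 1
```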

all_future

my $future = $rs->all_future($cond?, \%attrs?);

The low-level execution engine for searches. Dispatches a search request to the background worker and returns a Future.

  • Payload Construction

    Merges the ResultSet's internal state (cond and attrs) with any temporary overrides passed to the method. It uses _build_payload to ensure the data is serialisable for IPC.

  • Worker Dispatch

    Calls the search method on the background worker pool via the internal _call_worker bridge.

  • Post-Fetch Inflation (Tier 1)

    Before creating objects, it runs custom column inflators. This is where database strings (like JSON or custom types) are converted back into Perl structures.

  • Inflation Logic (Tier 2)

    • If HashRefInflator is requested, it returns the raw data structures for maximum performance.

    • Otherwise, it maps the data through _inflate_row to produce fully functional DBIx::Class::Async::Row objects.

as_query

my ($sql, @bind) = @{ $rs->as_query };

Returns a representation of the SQL query and its bind parameters that would be executed by this ResultSet.

Unlike most other methods in this package, as_query is synchronous and returns a standard DBIC query arrayref immediately. It does not communicate with the worker pool.

  • Shadow Schema Execution

    Internally, it maintains a dbi:NullP: (Null Proxy) connection. This allows the DBIx::Class SQL generator to function in the parent process without requiring a real database socket.

  • Metadata Awareness

    It automatically loads your schema_class if it isn't already in memory to ensure relationships and column types are correctly mapped to SQL.

  • Warning Suppression

    It intelligently silences "Generic Driver" warnings (like undetermined_driver) that typically occur when generating SQL without an active database handle, keeping your logs clean while preserving actual errors if ASYNC_TRACE is enabled.

Example: Debugging a complex join

my $rs = $schema->resultset('User')->search(
    { 'orders.status' => 'shipped' },
    { join => 'orders' }
);

my ($sql, @bind) = @{ $rs->as_query };
print "Generated SQL: $sql\n";

clear_cache

Clears both the local ResultSet buffer and the central query cache for this specific source.

cursor

my $cursor = $rs->cursor;

Returns a storage-level cursor object for the current ResultSet.

This is a low-level method typically used when you need to handle extremely large result sets that would exceed memory limits if fetched all at once via all.

  • Async Integration

    The returned cursor is not a standard DBI cursor; it is wrapped by DBIx::Class::Async::Storage, ensuring that calls to next or all on the cursor itself are still routed through the non-blocking worker pool.

  • Efficiency

    Use this when you intend to process thousands of rows and want to maintain a constant memory footprint.

Example: Manual Cursor Iteration

my $cursor = $rs->search({ type => 'heavy_report' })->cursor;

# While this looks synchronous, the underlying storage handle
# manages the async state transitions.
while (my @row = $cursor->next) {
    process_data(@row);
}

create

my $future = $rs->create({
    username => 'jdoe',
    profile  => { theme => 'dark' } # Automatically deflated if configured
});

Asynchronously inserts a new record into the database. Returns a Future that resolves to a DBIx::Class::Async::Row object representing the persisted data (including database-generated defaults and IDs).

  • Auto-Deflation

    Before sending data to the worker pool, the method strips table aliases (me., self.) and applies custom deflation logic (e.g., serialising a hashref to a JSON string).

  • Cache Invalidation

    Calling create automatically clears the ResultSet's internal cache via "clear_cache" to ensure subsequent queries reflect the new state of the database.

  • Lifecycle Management

    Upon a successful insert, the worker returns the raw record data. The parent process then:

    1. Re-inflates complex types (e.g., strings back to objects).
    2. Instantiates a Row object via "new_result".
    3. Marks the object as in_storage, making it ready for immediate updates or deletions.
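
A minimal sketch of the alias stripping and deflation step, using the core JSON::PP module. The helper prepare_insert and the way JSON columns are declared (a plain hashref of column names) are assumptions for illustration; the module's own deflation hooks may look different.

```perl
use strict;
use warnings;
use JSON::PP ();

my $json = JSON::PP->new->canonical;

# Hypothetical helper: strip "me."/"self." prefixes and serialise
# reference values for the columns listed in $json_cols.
sub prepare_insert {
    my ($data, $json_cols) = @_;
    my %out;
    for my $key (keys %$data) {
        (my $col = $key) =~ s/^(?:me|self)\.//;
        my $value = $data->{$key};
        $value = $json->encode($value) if $json_cols->{$col} && ref $value;
        $out{$col} = $value;
    }
    return \%out;
}

my $payload = prepare_insert(
    { 'me.username' => 'jdoe', profile => { theme => 'dark' } },
    { profile => 1 },
);
print "$payload->{username} $payload->{profile}\n"; # jdoe {"theme":"dark"}
```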

Example: Handling a New User Signup

$schema->resultset('User')->create({ email => 'new@user.com' })
    ->then(sub {
        my $user = shift;
        say "Created user with ID: " . $user->id;
    })
    ->catch(sub {
        warn "Failed to create user: " . shift;
    });

count

my $future = $rs->count({ status => 'active' });
$future->on_done(sub {
    my $count = shift;
    say "Found $count active records.";
});

Executes a SELECT COUNT(*) query against the database asynchronously. Returns a Future resolving to the integer count.

  • Optimisation - Row Limit

    If the ResultSet has a fixed number of rows defined in its attributes and no additional conditions are passed to count, the method returns the rows value immediately without hitting the database.

  • Caching

    Uses a specialised cache key (suffix :count) to store results in the async_db cache. Aggregate queries are often expensive; this ensures that multiple count requests for the same criteria are served from memory.

  • Worker Dispatch

    Dispatches the count command to the background worker pool, keeping the parent process non-blocking.

Example: Conditional Counting

$rs->count({ category_id => 5 })->then(sub {
    my $count = shift;
    return $count > 0 ? $rs->all : Future->done([]);
});

count_future

my $future = $rs->count_future;

Returns a Future that resolves to the row count, bypassing all parent-process caching mechanisms.

Unlike the standard "count" method, which may return a cached result or an in-memory attribute value, count_future always dispatches a SELECT COUNT(*) request directly to the worker pool.

  • Bypasses Cache

    Ignores any data stored in the _query_cache or the shared _cache bucket.

  • Stateless

    Does not inspect $rs->{_attrs}->{rows}. It forces the database to perform the calculation.

  • Direct Dispatch

    Uses the low-level _call_worker interface to ensure the smallest possible overhead between the method call and the IPC (Inter-Process Communication) layer.

Example: Forcing a fresh count in a high-concurrency environment

# count() might return a cached value from 2 seconds ago
# count_future() hits the metal

$rs->count_future->on_done(sub {
    my $exact_count = shift;
    print "Live DB Count: $exact_count\n";
});

count_literal

my $future = $rs->count_literal(q{age > ? AND status = 'active'}, 21);

Performs a count operation using a raw SQL fragment. This is useful for complex filtering that is difficult to express via standard SQL::Abstract syntax.

Returns a Future resolving to the integer count.

  • Fluent Chaining

    Internally, this method calls search_literal to generate a transient, specialised ResultSet and then immediately calls count on it.

  • Infrastructure Inheritance

    The transient ResultSet automatically inherits the _async_db worker bridge and _schema_instance from the parent, ensuring the raw SQL is executed in the correct worker process.

  • Security

    Always pass parameters as a bind list (e.g., @bind) rather than interpolating variables directly into the SQL string to prevent SQL injection.

Example: Using Database-Specific Functions

# Using Postgres-specific ILIKE for case-insensitive counting
my $future = $rs->count_literal('email ILIKE ?', '%@GMAIL.COM');

$future->on_done(sub {
    my $gmail_users = shift;
    print "Found $gmail_users Gmail accounts.\n";
});

count_rs

my $count_rs = $rs->count_rs($cond?, \%attrs?);

Returns a new DBIx::Class::Async::ResultSet specifically configured to perform a COUNT(*) operation.

Unlike "count", which returns a Future that executes immediately, count_rs is synchronous and returns a ResultSet. This allows you to pre-define a count query and pass it around your application or combine it with further search modifiers before execution.

  • Chainability

    Because it returns a ResultSet, you can continue chaining methods like search or attr before eventually calling all or next to get the value.

  • Infrastructure Guarantee

    Internally calls search, ensuring the newly created ResultSet remains pinned to the same async_db worker bridge and parent schema_instance.

  • Automatic Projection

    Automatically sets the select attribute to { count => '*' } and the as alias to 'count'.

Example: Defining a count query for later execution

# Create the count-specific ResultSet
my $pending_count_rs = $rs->count_rs({ status => 'pending' });

# ... later in the code ...

# Execute it via the standard async 'all' or 'next'
$pending_count_rs->next->then(sub {
    my $row = shift;
    print "Pending count: " . $row->get_column('count') . "\n";
});

count_total

my $total_future = $rs->count_total($extra_cond?, \%extra_attrs?);

Returns a Future resolving to the total number of records matching the current filter, specifically ignoring any pagination or sorting attributes.

This is primarily used to calculate the "Total Pages" in a paginated UI, where the current ResultSet might be sliced to only 20 rows, but you need to know that 5,000 total records match the criteria.

  • Attribute Stripping

    Automatically removes rows, offset, page, and order_by from the query. This ensures the database doesn't perform unnecessary sorting or slicing, resulting in a much faster count.

  • Parameter Merging

    Allows passing in $cond and $attrs which are merged with the existing ResultSet state before execution.

  • Direct Execution

    Like "count_future", this bypasses any local result buffers and asks the worker to perform a fresh COUNT(*) on the un-sliced dataset.
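
The attribute stripping can be sketched with a hash-slice delete. strip_paging is a hypothetical helper; the list of attributes is the one named in the text above.

```perl
use strict;
use warnings;

# Attributes named above as irrelevant to a total count.
my @PAGING_ATTRS = qw(rows offset page order_by);

# Hypothetical helper: return a copy of the attrs without paging/sorting keys.
sub strip_paging {
    my ($attrs) = @_;
    my %copy = %$attrs;          # shallow copy; the original stays untouched
    delete @copy{@PAGING_ATTRS}; # hash-slice delete
    return \%copy;
}

my $attrs       = { join => 'orders', rows => 20, page => 3, order_by => 'name' };
my $count_attrs = strip_paging($attrs);
print join(', ', sort keys %$count_attrs), "\n"; # join
print scalar(keys %$attrs), "\n";                # 4
```

Working on a copy matters because the paged ResultSet still needs its own rows and page values after the total has been requested.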

Example: Implementing Pagination Metadata

my $paged_rs = $rs->search({ category => 'electronics' })
                  ->page(1)
                  ->rows(10);

# This resolves to 10 (the current page size)
my $current_count_f = $paged_rs->count;

# This resolves to the absolute total (e.g., 450)
my $grand_total_f = $paged_rs->count_total;

Future->needs_all($current_count_f, $grand_total_f)->then(sub {
    my ($current, $total) = @_;
    print "Showing $current of $total total items.\n";
});

delete

my $future = $rs->search({ status => 'obsolete' })->delete;

Asynchronously removes records matching the ResultSet's criteria from the database. Returns a Future resolving to the number of rows deleted.

  • Auto-Routing Logic

    • Direct Path

      If the query is a simple attribute-free HASH (e.g., { id => 5 }), it dispatches a single DELETE command directly to the worker.

    • Safe Path (delete_all)

      If the ResultSet contains complex logic (joins, limits, or offsets), it automatically delegates to "delete_all" to avoid database-specific restrictions on multi-table deletes.

  • Cache Invalidation

    Immediately calls "clear_cache" on the ResultSet to prevent the application from reading stale data that has been removed from the physical storage.

  • Non-Blocking

    Like all write operations in this library, the process is handed off to the worker pool, allowing your main application to continue processing other requests.
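
The routing decision can be pictured as a small predicate. This is a sketch of the rule as described above, not the module's actual check; delete_route is a hypothetical name and the test for "simple" is an assumption (a hashref condition with no attributes at all).

```perl
use strict;
use warnings;

# Hypothetical helper: pick the delete strategy for a condition/attrs pair.
sub delete_route {
    my ($cond, $attrs) = @_;
    my $simple = ref($cond) eq 'HASH' && !keys %{ $attrs || {} };
    return $simple ? 'direct' : 'delete_all';
}

print delete_route({ id => 5 }, {}), "\n";                    # direct
print delete_route({ type => 'log' }, { rows => 100 }), "\n"; # delete_all
```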

Example: Safe Deletion of Limited Sets

# This uses the 'Safe Path' because of the 'rows' attribute
$rs->search({ type => 'log' }, { rows => 100 })->delete
   ->on_done(sub {
       my $count = shift;
       say "Cleaned up $count log entries.";
   });

delete_all

my $future = $rs->search({ status => 'expired' })->delete_all;

Performs a two-stage asynchronous deletion. First, it retrieves the records matching the criteria to identify their unique Primary Keys, then it issues a targeted bulk delete for those specific records.

  • Precision

    This method is safer than a direct delete when dealing with ResultSets that use rows, offset, or complex join attributes, as it ensures only the specific records visible to the ResultSet are removed.

  • Composite Key Support

    Automatically detects and handles tables with composite (multi-column) primary keys, constructing an -or condition with one key tuple per row to ensure the correct rows are targeted.

  • Short-Circuiting

    If the initial search yields no results, the method returns a resolved Future with a value of 0, saving an unnecessary round-trip to the database worker.

  • Return Value

    Resolves to the number of rows that were successfully identified and sent for deletion.
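
One way to build the second-stage condition is shown below in SQL::Abstract-style syntax. pk_condition is a hypothetical helper and the -in shortcut for single-column keys is an assumption; only the -or form for composite keys is described above.

```perl
use strict;
use warnings;

# Hypothetical helper: build a condition that targets exactly the fetched
# rows, given the primary key column names.
sub pk_condition {
    my ($rows, @pk) = @_;
    return unless @$rows;                      # nothing to delete
    # Single-column key: a simple IN list is enough.
    return +{ $pk[0] => { -in => [ map { $_->{ $pk[0] } } @$rows ] } }
        if @pk == 1;
    # Composite key: one { col => val, ... } hash per row, OR-ed together.
    return +{
        -or => [ map { my $r = $_; +{ map { ($_ => $r->{$_}) } @pk } } @$rows ],
    };
}

my @rows = (
    { order_id => 1, line_no => 1, note => 'a' },
    { order_id => 1, line_no => 2, note => 'b' },
);
my $cond = pk_condition(\@rows, qw(order_id line_no));
print scalar @{ $cond->{-or} }, "\n"; # 2
```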

Example: Safely Deleting with Relationships

# Delete orders for inactive users (complex join)
$schema->resultset('Order')->search({ 'user.is_active' => 0 }, { join => 'user' })
       ->delete_all
       ->on_done(sub {
           my $deleted = shift;
           print "Purged $deleted orders from inactive accounts.\n";
       });

first_future

An alias for "first". This naming convention is provided for consistency with other *_future methods in the library, signalling that the return value is an asynchronous Future.

Example: Retrieving the most recent login

my $future = $schema->resultset('UserLog')->search(
    { user_id => 42 },
    { order_by => { -desc => 'login_at' } }
)->first;

$future->on_done(sub {
    my $log = shift;
    print "Last seen: " . $log->login_at if $log;
});

first

my $future = $rs->first;

Returns a Future resolving to the first DBIx::Class::Async::Row object in the ResultSet, or undef if the set is empty.

  • Memory First

    If the ResultSet has already been populated (e.g. via a previous call to all), this method returns the first element from the internal _rows buffer immediately without a database hit.

  • Auto-Inflation

    If the internal buffer contains raw data (hashrefs), it is automatically inflated into a proper Row object before the Future resolves.

  • Optimised Fetch

    If no data is in memory, it executes a targeted query with rows => 1 (SQL LIMIT 1). This is significantly faster and more memory-efficient than fetching the entire result set.

find

# Find by Primary Key scalar
my $user_f = $rs->find(123);

# Find by unique constraint hashref
my $user_f = $rs->find({ email => 'gemini@example.com' });

Retrieves a single row from the database based on a unique identifier. Returns a Future resolving to a single DBIx::Class::Async::Row object or undef if no match is found.

  • Scalar Lookup

    If a single value is provided, it is automatically mapped to the table's primary key column. Note: This only works for tables with a single-column primary key.

  • HashRef Lookup

    If a hashref is provided, it is used as a specific search condition. This is useful for finding records by unique columns other than the primary key (e.g., username or slug).

  • Short-Circuiting

    If undef is passed as the identifier, the method immediately returns a resolved Future containing undef, avoiding a pointless database round-trip.

  • Under the Hood

    This method is a wrapper around "search" and "single", ensuring that a LIMIT 1 is always applied to the query for maximum performance.
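
The argument handling described in the bullets above can be sketched as a small normalisation step. find_condition is a hypothetical helper, and raising an error for a scalar lookup on a composite key is an assumption about how such a case might be handled.

```perl
use strict;
use warnings;

# Hypothetical helper: turn a find() argument into a search condition.
sub find_condition {
    my ($arg, @pk) = @_;
    return unless defined $arg;               # undef: nothing to look up
    return $arg if ref $arg eq 'HASH';        # hashref: use as given
    die "scalar find needs a single-column primary key\n" unless @pk == 1;
    return +{ $pk[0] => $arg };               # scalar: map to the key column
}

my $by_id    = find_condition(123, 'id');
my $by_email = find_condition({ email => 'user@example.com' }, 'id');
print "$by_id->{id} $by_email->{email}\n"; # 123 user@example.com
```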

find_or_new

my $future = $rs->find_or_new({ email => 'user@example.com' }, \%attrs?);

Attempts to find a record in the database using unique constraints. If found, it returns the existing row. If not, it returns a new, **in-memory** row object populated with the provided data.

Returns a Future resolving to a DBIx::Class::Async::Row object.

  • Unique Lookup

    Uses an internal helper to extract unique identifiers (Primary Keys or Unique Constraints) from the provided data for the initial find call.

  • Data Merging

    If the record is not found, the new object is created by merging the provided $data with any existing constraints (where clauses) currently on the ResultSet.

  • Namespace Cleaning

    Automatically strips DBIC aliases like me., foreign., or self. from column names to ensure the new object has clean, accessible attributes.

  • Non-Blocking

    Even though it may return a "new" object that hasn't hit the DB yet, it still returns a Future to maintain API consistency with the asynchronous find operation.

Example: Preparing a record for a form

$schema->resultset('User')->find_or_new({
    username => 'jdoe'
})->then(sub {
    my $user = shift;

    # If $user->in_storage is false, we know this is a fresh object
    say "Welcome back, " . $user->username if $user->in_storage;
    say "Sign up now, "  . $user->username unless $user->in_storage;
});

find_or_create

my $future = $rs->find_or_create({
    email => 'user@example.com',
    name  => 'John Doe'
});

Ensures a record exists in the database. Returns a Future resolving to a DBIx::Class::Async::Row object.

  • Optimistic Strategy

    First attempts to locate the record using "find". If the record is found, it is returned immediately.

  • Atomic Creation

    If the record is missing, it attempts to "create" it.

  • Race Condition Recovery

    In distributed systems, a "Time-of-Check to Time-of-Use" (TOCTOU) race can occur. If another process inserts the record after our find but before our create, the database will throw a Unique Constraint error. This method catches that error and performs one final find to retrieve the "winning" record.

  • Payload Handling

    Automatically extracts unique constraints from the provided data to build the lookup criteria.
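
The race recovery can be illustrated with a synchronous, in-memory simulation. Everything here is a stand-in: %table plays the database, the error text is invented, and the $before_create hook exists only to simulate a rival insert. The real method performs the same find, create, find sequence with Futures.

```perl
use strict;
use warnings;

my %table;   # in-memory stand-in for a table with a unique "name" column

sub find_row { return $table{ $_[0] } }

sub create_row {
    my ($name) = @_;
    die "unique constraint violated\n" if exists $table{$name};
    return $table{$name} = { name => $name, by => 'us' };
}

# find, then create; if create loses a race, find once more.
sub find_or_create {
    my ($name, $before_create) = @_;
    my $row = find_row($name);
    return $row if $row;
    $before_create->() if $before_create;      # simulates a rival insert
    $row = eval { create_row($name) };
    return $row if $row;
    die $@ unless $@ =~ /unique constraint/;   # unrelated errors still propagate
    return find_row($name);
}

my $raced = find_or_create('perl', sub { $table{perl} = { name => 'perl', by => 'rival' } });
my $clean = find_or_create('raku');
print "$raced->{by} $clean->{by}\n"; # rival us
```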

Example: Safe Tagging System

# Multiple workers might try to create the 'perl' tag at once
$schema->resultset('Tag')->find_or_create({ name => 'perl' })
       ->then(sub {
           my $tag = shift;
           return $post->add_to_tags($tag);
       });

get_attribute

my $rows_limit = $rs->get_attribute('rows');

Returns the value of a specific attribute (e.g., rows, offset, join) currently set on the ResultSet. This is a synchronous read from the internal _attrs hashref.

get

my $data = $rs->get;

Returns the current "raw" state of the ResultSet's data buffer.

  • Returns the arrayref of inflated Row objects if they exist.

  • Falls back to returning raw hashrefs (_entries) if they have been fetched from a worker but not yet inflated.

  • Returns an empty arrayref [] if no data has been fetched.

get_cache

my $rows = $rs->get_cache;

Specifically returns the internal buffer of inflated Row objects. Unlike get, this will return undef if the objects haven't been created yet, adhering to the standard DBIC behaviour where "cache" implies fully realised results.

get_column

my $col_obj = $rs->get_column('price');

Returns a DBIx::Class::Async::ResultSetColumn object for the specified column.

This pivots the API from row-based operations to column-based aggregate functions. The returned object allows you to perform asynchronous math directly on the database:

$rs->get_column('age')->func_future('AVG')->on_done(sub {
    my $avg = shift;
    print "Average Age: $avg\n";
});

  • Context Preservation

    The column object inherits the ResultSet's current filters (where clause) and the async_db worker bridge.

is_cache_found

my $cached_rows = $rs->is_cache_found($cache_key);

Queries the shared async infrastructure to see if a specific query result is already available in the _query_cache.

This is a "surgical" cache lookup. Instead of searching a flat global cache, it first identifies the "bucket" for the current source_name and then looks for the specific key.

  • Returns

    The cached arrayref of rows if found, or undef if the cache is empty or expired.

  • Scope

    This lookup is scoped to the _async_db instance, allowing multiple ResultSets to benefit from the same background fetch.

is_ordered

if ($rs->is_ordered) { ... }

Returns a boolean indicating whether the ResultSet has an order_by attribute defined.

This is used internally by methods like "pager" to issue warnings when pagination is attempted on unordered data, which can lead to non-deterministic results in distributed systems.

is_paged

say "This is a subset" if $rs->is_paged;

Returns a boolean indicating whether the ResultSet has a page attribute defined. This is a reliable way to check if the current ResultSet represents a "slice" of data rather than the full set.

next

$rs->next->then(sub {
    my $row = shift;
    return unless $row;
    # Process row...
});

Iterates through the resultset. If the buffer is empty, it triggers an all call internally to populate the local cache and then returns the first element.

page / pager

my $paged_rs = $rs->page(2);
my $pager    = $paged_rs->pager; # Returns DBIx::Class::Async::ResultSet::Pager

page returns a cloned ResultSet with paging attributes applied. pager provides an object for UI pagination logic (e.g., last_page, entries_per_page).

populate / populate_bulk

$rs->populate([
    { name => 'Alice', age => 30 },
    { name => 'Bob',   age => 25 },
]);

High-speed bulk insertion. Deflates all rows in the parent process before sending a single batch request to the worker. Returns a Future resolving to an arrayref of created objects.

prefetch

my $prefetched_rs = $rs->prefetch({ 'orders' => 'order_items' });

Informs the ResultSet to fetch related data alongside the primary records in a single query.

This is an alias for calling $rs->search(undef, { prefetch => $prefetch }).

  • Efficiency

    Reduces the number of round-trips to the worker pool. Without prefetch, accessing a relationship on a row might trigger a new asynchronous request; with it, the data is already present in the row's internal buffer.

  • Deep Nesting

    Supports the standard DBIx::Class syntax for nested relationships (hashes for single dependencies, arrays for multiple).

  • Async Inflation

    When the worker returns the data, the parent process's new_result method automatically identifies the prefetched columns and inflates them into nested DBIx::Class::Async::Row objects.

Example: Eager Loading for an API Response

# Fetch users and their profiles in one go
$schema->resultset('User')->prefetch('profile')->all->then(sub {
    my $users = shift;
    foreach my $user (@$users) {
        # This is now a synchronous, in-memory call because of prefetch
        print $user->username . " lives in " . $user->profile->city . "\n";
    }
});

result_class

$rs->result_class('DBIx::Class::ResultClass::HashRefInflator');

Gets or sets the class used to inflate rows. If set to HashRefInflator, all will return raw hashrefs instead of Row objects, which is significantly faster for read-only APIs.

related_resultset

my $orders_rs = $user_rs->related_resultset('orders');

Returns a new ResultSet representing a relationship. It automatically handles the JOIN logic and prefixes existing conditions (e.g., turning { id => 5 } into { 'user.id' => 5 }).
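
The condition prefixing can be sketched as follows. prefix_condition is a hypothetical helper that handles only top-level keys; it does not recurse into -and/-or structures, which a complete implementation would need to do.

```perl
use strict;
use warnings;

# Hypothetical helper: qualify bare column names with a relationship alias.
sub prefix_condition {
    my ($cond, $alias) = @_;
    my %out;
    for my $key (keys %$cond) {
        # Leave operators (-and, -or) and already-qualified columns alone.
        my $new = ($key =~ /^-/ || $key =~ /\./) ? $key : "$alias.$key";
        $out{$new} = $cond->{$key};
    }
    return \%out;
}

my $prefixed = prefix_condition({ id => 5, 'orders.status' => 'shipped' }, 'user');
print join(', ', sort keys %$prefixed), "\n"; # orders.status, user.id
```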

result_source

my $source = $rs->result_source;

Returns the DBIx::Class::ResultSource object for the current ResultSet.

This object contains the structural definition of the data source, including column names, data types, relationships, and the primary key configuration.

  • Metadata Access

    Use this to introspect the schema at runtime (e.g., checking if a column exists or retrieving relationship metadata).

  • Internal Proxy

    This is a wrapper around the internal _get_source method, which ensures the metadata is lazily loaded from the schema definition if it isn't already present in the parent process.

  • Consistency

    While the ResultSet handles the *query logic*, the ResultSource handles the *data contract*.

Example: Introspecting Column Types

my $source = $rs->result_source;
my $info   = $source->column_info('created_at');

print "Column Type: " . $info->{data_type} . "\n";

reset_stats

$rs->reset_stats;

Clears all performance and telemetry counters within the associated asynchronous bridge (_async_db).

This resets counters such as query execution counts, worker round-trip times, and cache hit/miss ratios to zero. It is typically used at the start of a profiling block to measure the impact of a specific set of operations.

  • Global Scope

    Note that because stats are stored on the async_db object, calling this on one ResultSet will reset the statistics for all ResultSets sharing that same worker bridge.

  • Chainable

    Returns the ResultSet object to allow for fluent calling styles.

Example: Measuring query impact

$rs->reset_stats;

$rs->search({ type => 'critical' })->all->then(sub {
    my $stats = $rs->{_async_db}->{_stats};
    print "Queries executed: " . $stats->{query_count} . "\n";
});

reset

$rs->reset;

Resets the internal cursor position (_pos) of the ResultSet to the beginning.

After calling reset, the next call to next or next_future will return the first row in the result set again. This is useful for re-iterating over results already stored in the local buffer without re-fetching them from the database.

  • Local Operation

    This only affects the iteration state of the current ResultSet instance.

source

my $source = $rs->source;

A convenient alias for "result_source".

Returns the DBIx::Class::ResultSource object for the current ResultSet. This object is the "source of truth" for the table's structure, including column definitions and relationship mappings.

source_name

my $name = $rs->source_name;

Returns the string identifier of the ResultSource (e.g., 'User' or 'Order').

This is a high-performance, synchronous accessor. In an asynchronous architecture, the source_name is the primary key used to tell the background worker which table logic to load before executing a query.

  • Immutable

    This value is set when the ResultSet is first instantiated and persists through searches and clones.

  • Lightweight

    Unlike source, this does not trigger any lazy-loading of metadata; it simply returns the stored string from the internal state.

Example: Dynamic Dispatch based on Source

my $rs = get_some_resultset();

if ($rs->source_name eq 'User') {
    # Perform user-specific async logic
    $rs->search({ is_active => 1 })->all->...
}

search

my $new_rs = $rs->search(
    { age      => { '>' => 21 } },
    { order_by => 'name'        }
);

Returns a new ResultSet object with the added conditions and attributes. This is a synchronous, non-I/O operation that clones the current state.

search_future

An alias for "all_future". Returns a Future that resolves to the full list of results matching the current ResultSet.

search_literal

my $new_rs = $rs->search_literal('price > ?', 100);

Adds a raw SQL fragment to the query criteria.

  • Safety

    Parameters are passed as a bind list, preventing SQL injection.

  • Chainability

    Returns a new ResultSet, allowing you to chain further search or search_related calls.

search_rs

my $new_rs = $rs->search_rs({ status => 'active' });

A synchronous method that returns a new ResultSet object with the added search constraints. This is the standard way to chain query builders.

search_related

my $future = $user_rs->search_related('orders');

Like search_related_rs, but immediately triggers the database fetch. Returns a Future resolving to an arrayref of related Row objects.

search_related_rs

my $orders_rs = $user_rs->search_related_rs('orders', { status => 'shipped' });

Pivots from the current ResultSet to a related data source. This is the core of DBIx::Class relational power.

  • Shadow Translation

    Internally uses a temporary DBIx::Class::ResultSet to calculate the complex join logic and foreign key mappings.

  • State Persistence

    The resulting ResultSet inherits the async_db and schema_instance, allowing it to execute the related query on the background worker.

  • Namespace Resolution

    Automatically resolves the target source_name based on the relationship mapping.

search_with_pager

my $future = $rs->search_with_pager({ status => 'active' }, { rows => 20 });

Executes a paginated search and returns a Future that resolves to both the retrieved rows and a populated Data::Page (or Async Pager) object.

This is the most efficient way to handle UI pagination because it dispatches the data fetch and the total count calculation simultaneously to the background workers.

  • Auto-Paging

    If no page or rows attributes are provided in $attrs, the method defaults to page(1) to ensure a pager can be instantiated.

  • Parallel Execution

    Unlike standard synchronous code, which fetches rows and *then* counts them, this method uses Future->needs_all to maximise throughput by running both queries at once.

  • Pager Syncing

    The returned pager object is fully "hydrated" with the total entry count, meaning methods like last_page, entries_on_this_page, and next_page are immediately available for use in your templates or API responses.

Example: Implementing a Paginated API Endpoint

$rs->search_with_pager({ category => 'books' }, { page => 2, rows => 50 })
   ->then(sub {
       my ($rows, $pager) = @_;

       return Future->done({
           data => [ map { $_->TO_JSON } @$rows ],
           meta => {
               total_records => $pager->total_entries,
               current_page  => $pager->current_page,
               total_pages   => $pager->last_page,
           }
       });
   });
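
The pager values mentioned above follow directly from the total count, the current page, and the page size. The arithmetic can be sketched in plain Perl as follows. This is only an illustration, not the pager class itself, and the helper name pager_summary is hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper: derive pager values from the total entry count,
# the current page (1-based) and the number of rows per page.
sub pager_summary {
    my ($total, $page, $rows) = @_;

    # Round up; an empty result still has one (empty) page.
    my $last_page = int(($total + $rows - 1) / $rows) || 1;

    # 1-based positions of the first and last entry on this page.
    my $first = ($page - 1) * $rows + 1;
    my $last  = $page * $rows < $total ? $page * $rows : $total;

    my $on_page = $total && $first <= $last ? $last - $first + 1 : 0;

    return {
        last_page            => $last_page,
        entries_on_this_page => $on_page,
        next_page            => $page < $last_page ? $page + 1 : undef,
    };
}

my $summary = pager_summary(120, 2, 50);
printf "last_page=%d entries_on_this_page=%d next_page=%d\n",
    @{$summary}{qw(last_page entries_on_this_page next_page)};
# prints "last_page=3 entries_on_this_page=50 next_page=3"
```

With 120 matching rows and 50 rows per page, page 3 would hold the remaining 20 entries and have no next page.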

single

my $future = $rs->single($cond?);

An alias for "first". Retrieves the first row matching the ResultSet's criteria.

single_future

my $user_f = $rs->single_future({ username => 'm_smith' });

A convenience method that performs a search for a specific condition and immediately calls "first". It returns a Future resolving to a single Row object or undef.

  • Contextual Search

    If a $cond (hashref) is provided, it creates a temporary narrowed ResultSet before fetching.

  • Efficiency

    Like first, this automatically applies a LIMIT 1 if no data is already cached, ensuring the worker doesn't fetch unnecessary rows.

stats

my $all_stats = $rs->stats;
my $q_count   = $rs->stats('queries');

Returns performance metrics from the underlying asynchronous bridge.

  • Key Mapping

    Automatically handles both "clean" keys (queries) and internal keys (_queries).

  • Metrics included

    Usually contains queries (execution count), cache_hits, and errors.
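
The key mapping can be pictured with a small lookup routine. This is a sketch under assumptions: the helper name lookup_stat and the layout of the stats hash are hypothetical and only illustrate accepting both key styles.

```perl
use strict;
use warnings;

# Hypothetical helper: accept either "queries" or "_queries" and find
# whichever form is actually stored in the stats hash.
sub lookup_stat {
    my ($stats, $key) = @_;

    # No key requested: hand back a shallow copy of everything.
    return { %$stats } unless defined $key;

    (my $bare = $key) =~ s/^_//;
    for my $candidate ($bare, "_$bare") {
        return $stats->{$candidate} if exists $stats->{$candidate};
    }
    return undef;
}

my $stats = { _queries => 12, cache_hits => 3 };
print lookup_stat($stats, 'queries'),     "\n";   # 12
print lookup_stat($stats, '_cache_hits'), "\n";   # 3
```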

schema

my $schema = $rs->schema;

Returns the DBIx::Class::Async::Schema instance that originally spawned this ResultSet. This is useful for pivoting to other ResultSets from within a row-processing callback.

slice

# Scalar context: returns a new ResultSet
my $sub_rs = $rs->slice(10, 19);

# List context: triggers execution (note the parentheses)
my ($future) = $rs->slice(0, 4);

Returns a subset of the ResultSet based on specific start and end indices.

  • Zero-Indexed

    Unlike page, slice uses 0-based indexing. For example, slice(0, 9) retrieves the first 10 rows.

  • Mathematical Translation

    Automatically converts the indices into offset and rows attributes for the SQL generator.

  • Context Sensitivity

    • In scalar context, it returns a new ResultSet object. This is ideal for further chaining or passing to a view.

    • In list context, it behaves like "all" and initiates an asynchronous fetch, returning a Future.

  • Validation

    Strictly enforces that indices are non-negative and that the first index does not exceed the last.
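
The index translation and validation described above can be sketched in plain Perl. This is a minimal illustration rather than the module's own code, and the helper name slice_to_attrs is hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper: translate zero-based, inclusive slice indices
# into the offset/rows pair used by the SQL generator.
sub slice_to_attrs {
    my ($first, $last) = @_;

    # Both indices must be non-negative integers.
    for my $idx ($first, $last) {
        die "slice indices must be non-negative integers\n"
            unless defined $idx && $idx =~ /^\d+$/;
    }

    # The first index must not exceed the last.
    die "first index must not exceed last index\n" if $first > $last;

    return { offset => $first, rows => $last - $first + 1 };
}

my $attrs = slice_to_attrs(10, 19);
printf "offset=%d rows=%d\n", $attrs->{offset}, $attrs->{rows};
# prints "offset=10 rows=10"
```

Because both indices are inclusive, the row count is the difference plus one.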

Example: Retrieving a specific "chunk" for processing

# Get 50 rows: zero-based indices 100 through 149 (inclusive)
$rs->slice(100, 149)->all->then(sub {
    my $rows = shift;
    process_batch($rows);
});

set_cache

$rs->set_cache(\@data);

Manually populates the internal data buffer of the ResultSet.

  • Input Format

    Requires an arrayref of either raw hashrefs (column data) or already-inflated Row objects.

  • Inflation Trigger

    By setting the internal _is_prefetched flag, this method ensures that the next time "all" or "next" is called, the library will process these entries through the standard inflation logic.

  • State Reset

    Automatically clears any previously cached rows (_rows) and resets the internal cursor position (_pos) to zero. This ensures that iteration starts fresh with the new dataset.

  • Chainable

    Returns the ResultSet object, allowing for fluent initialisation.

Example: Hydrating a ResultSet from an external cache

my $cached_data = $memcached->get("users_batch_1");

if ($cached_data) {
    $rs->set_cache($cached_data);
}

# Now $rs acts like it just hit the DB
$rs->all->then(sub {
    my $rows = shift;
    say $_->email for @$rows;
});

update

# Bulk update all rows in the current ResultSet
$rs->search({ status => 'pending' })->update({ status => 'processing' });

# Single targeted update ignoring current RS filters
$rs->update({ id => 42 }, { status => 'archived' });

Performs an asynchronous UPDATE operation on the database.

  • Set-Based Operation

    Unlike a row-level update, this method acts on the entire scope of the ResultSet in a single database round-trip.

  • Cache Invalidation

    Automatically calls clear_cache on the ResultSet. This prevents the parent process from serving stale data after the update has completed.

  • Deflation Support

    Automatically detects columns that require custom serialisation (e.g., JSON to string, DateTime to ISO string) by consulting the _custom_inflators registry.

  • Flexible Signature

    • update(\%updates): Uses the ResultSet's existing where clause.

    • update(\%cond, \%updates): Overrides the current filters with the provided %cond.
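
The two call shapes can be told apart by argument count. The sketch below shows one way to do that; resolve_update_args is a hypothetical name and does not reflect the module's internals.

```perl
use strict;
use warnings;

# Hypothetical helper: decide which condition and which update data apply.
# $rs_cond stands in for the ResultSet's existing where clause.
sub resolve_update_args {
    my ($rs_cond, @args) = @_;

    if (@args == 2) {
        # update(\%cond, \%updates): the explicit condition wins.
        return @args;
    }
    if (@args == 1) {
        # update(\%updates): fall back to the ResultSet's own condition.
        return ($rs_cond, $args[0]);
    }
    die "update expects (\\%updates) or (\\%cond, \\%updates)\n";
}

my ($cond, $updates) = resolve_update_args(
    { status => 'pending' },
    { id => 42 }, { status => 'archived' },
);
print join(',', keys %$cond), " -> ", join(',', keys %$updates), "\n";
# prints "id -> status"
```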

Example: Atomic Batch Update with Deflation

# Assuming 'metadata' is a column that deflates to JSON
my $future = $rs->search({ active => 1 })
                ->update({
                    last_updated => \'NOW()',
                    metadata     => { version => '2.0', source => 'api' }
                });

$future->on_done(sub { say "Batch update complete." });

update_all

my $future = $rs->search({ type => 'temporary' })->update_all({ type => 'permanent' });

Performs a two-step asynchronous update. First, it retrieves all rows matching the current criteria to identify their primary keys; then it issues a bulk update targeting those specific IDs.

  • Precision

    By fetching the IDs first, this method ensures that triggers or logic dependent on the primary key are correctly handled.

  • Safety

    If no rows match the initial search, the method short-circuits and returns a resolved Future with a value of 0, avoiding an unnecessary database trip for the update phase.

  • Traceability

    Supports ASYNC_TRACE logging to help debug empty sets or unexpected data types during the fetch phase.

  • Atomicity Note

    Unlike update, this involves two distinct database interactions. If the data changes between the fetch and the update phase, only the rows identified in the first phase will be updated.
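
The two phases and the short-circuit can be illustrated with a synchronous, in-memory stand-in. This is a sketch only: the real method works through Futures and the database, and update_all_sketch and the sample rows are hypothetical.

```perl
use strict;
use warnings;

# In-memory stand-in for a table.
my @rows = (
    { id => 1, type => 'temporary' },
    { id => 2, type => 'permanent' },
    { id => 3, type => 'temporary' },
);

sub update_all_sketch {
    my ($table, $filter, $updates) = @_;

    # Phase 1: collect the primary keys of the matching rows.
    my @ids = map { $_->{id} } grep { $filter->($_) } @$table;
    return 0 unless @ids;    # short-circuit: nothing to update

    # Phase 2: update only the rows identified in phase 1.
    my %wanted = map { $_ => 1 } @ids;
    my $count  = 0;
    for my $row (grep { $wanted{ $_->{id} } } @$table) {
        @{$row}{ keys %$updates } = values %$updates;
        $count++;
    }
    return $count;
}

my $updated = update_all_sketch(
    \@rows,
    sub { $_[0]{type} eq 'temporary' },
    { type => 'permanent' },
);
print "updated $updated rows\n";    # prints "updated 2 rows"
```

A second call with the same filter would return 0 without reaching phase 2, because no 'temporary' rows remain.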

Example: Safe Batch Update

$rs->update_all({ last_processed => \'NOW()' })->on_done(sub {
    my $count = shift;
    print "Successfully updated $count specific records.\n";
});

update_or_new

my $future = $rs->update_or_new({
    email => 'dev@example.com',
    name  => 'Gemini'
});

Attempts to locate a record using its unique constraints or primary key.

  • Action on Success

    If the record is found, it immediately triggers an asynchronous update with the provided data and returns a Future resolving to the updated Row object.

  • Action on Failure

    If no record is found, it creates a new in-memory row object. This object is not yet saved to the database (in_storage will be false).

  • Data Sanitisation

    Automatically strips table aliases (me., foreign.) from the data keys to ensure the Row object constructor receives clean column names.

  • Consistency

    This method always returns a Future, regardless of whether it performed a database update or a local object instantiation.
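
The data sanitisation step can be sketched as a simple key rewrite. The helper name strip_aliases and the exact pattern are assumptions made for illustration; the module may handle more alias forms.

```perl
use strict;
use warnings;

# Hypothetical helper: drop a leading "me." or "foreign." alias
# from every key, leaving plain column names.
sub strip_aliases {
    my ($data) = @_;
    my %clean;
    for my $key (keys %$data) {
        (my $column = $key) =~ s/^(?:me|foreign)\.//;
        $clean{$column} = $data->{$key};
    }
    return \%clean;
}

my $clean = strip_aliases({
    'me.email'     => 'dev@example.com',
    'foreign.name' => 'Gemini',
    id             => 7,
});
print join(', ', sort keys %$clean), "\n";    # prints "email, id, name"
```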

Example: Syncing User Profiles

$rs->update_or_new({
    external_id => $id,
    last_login  => \'NOW()'
})->then(sub {
    my $user = shift;
    if ($user->in_storage) {
        say "Updated existing user: " . $user->id;
        # A then() callback must return a Future
        return Future->done($user);
    } else {
        say "Prepared new user for registration.";
        # You must call ->insert on the new object to persist it
        return $user->insert;
    }
});

update_or_create

my $future = $rs->update_or_create({
    username => 'coder123',
    last_seen => \'NOW()'
});

Attempts to find a record by its unique constraints. If found, it updates it. If not, it creates a new record in the database.

  • Atomic Strategy

    This method manages the "Check-then-Act" pattern safely across asynchronous workers.

  • Race Condition Recovery

    In highly concurrent systems, a record might be inserted by another process between this method's find and create steps. This method detects that specific database conflict (Unique Constraint Violation) and automatically recovers by re-finding the newly created record and updating it instead.

  • Error Handling

    While it handles "Already Exists" conflicts gracefully, other database errors (like type mismatches or connection issues) will still trigger a fail state in the returned Future.
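
The find, create, and recover sequence can be simulated synchronously with an in-memory table. This sketch makes several assumptions: the helper names are hypothetical, the error text mimics a unique-constraint message (real messages vary by database driver), and the optional hook exists only to simulate another worker inserting the row at the critical moment.

```perl
use strict;
use warnings;

my %table;    # in-memory stand-in for a table with a unique key

sub find_row { my ($key) = @_; return $table{$key} }

sub create_row {
    my ($key, $data) = @_;
    die "UNIQUE constraint failed\n" if exists $table{$key};
    return $table{$key} = { %$data };
}

sub update_row {
    my ($row, $data) = @_;
    @{$row}{ keys %$data } = values %$data;
    return $row;
}

sub update_or_create_sketch {
    my ($key, $data, $race_hook) = @_;

    # Step 1: the record already exists, so update it.
    if (my $row = find_row($key)) {
        return update_row($row, $data);
    }

    $race_hook->() if $race_hook;    # simulate a concurrent insert

    # Step 2: try to create; a unique violation means we lost the race.
    my $created = eval { create_row($key, $data) };
    return $created if $created;

    my $err = $@;
    die $err unless $err =~ /UNIQUE constraint/i;    # re-throw other errors

    # Step 3: recover by re-finding the row and updating it instead.
    my $row = find_row($key) or die "row vanished after conflict\n";
    return update_row($row, $data);
}

my $synced = update_or_create_sketch(
    'worker_node_1',
    { token => 'new' },
    sub { $table{worker_node_1} = { token => 'stale' } },
);
print "token is $synced->{token}\n";    # prints "token is new"
```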

Example: Distributed Token Sync

# Multiple workers might run this for the same 'service_key'
$schema->resultset('AuthToken')->update_or_create({
    service_key => 'worker_node_1',
    token       => $new_secure_token
})->on_done(sub {
    my $row = shift;
    say "Token synced for ID: " . $row->id;
})->on_fail(sub {
    die "Sync failed: " . shift;
});

AUTHOR

Mohammad Sajid Anwar, <mohammad.anwar at yahoo.com>

REPOSITORY

https://github.com/manwar/DBIx-Class-Async

BUGS

Please report any bugs or feature requests through the web interface at https://github.com/manwar/DBIx-Class-Async/issues. I will be notified and then you'll automatically be notified of progress on your bug as I make changes.

SUPPORT

You can find documentation for this module with the perldoc command.

perldoc DBIx::Class::Async::ResultSet

LICENSE AND COPYRIGHT

Copyright (C) 2026 Mohammad Sajid Anwar.

This program is free software; you can redistribute it and/or modify it under the terms of the Artistic License (2.0). You may obtain a copy of the full license at:

http://www.perlfoundation.org/artistic_license_2_0

Any use, modification, and distribution of the Standard or Modified Versions is governed by this Artistic License. By using, modifying or distributing the Package, you accept this license. Do not use, modify, or distribute the Package, if you do not accept this license.

If your Modified Version has been derived from a Modified Version made by someone other than you, you are nevertheless required to ensure that your Modified Version complies with the requirements of this license.

This license does not grant you the right to use any trademark, service mark, tradename, or logo of the Copyright Holder.

This license includes the non-exclusive, worldwide, free-of-charge patent license to make, have made, use, offer to sell, sell, import and otherwise transfer the Package with respect to any patent claims licensable by the Copyright Holder that are necessarily infringed by the Package. If you institute patent litigation (including a cross-claim or counterclaim) against any party alleging that the Package constitutes direct or contributory patent infringement, then this Artistic License to you shall terminate on the date that such litigation is filed.

Disclaimer of Warranty: THE PACKAGE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES. THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, OR NON-INFRINGEMENT ARE DISCLAIMED TO THE EXTENT PERMITTED BY YOUR LOCAL LAW. UNLESS REQUIRED BY LAW, NO COPYRIGHT HOLDER OR CONTRIBUTOR WILL BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING IN ANY WAY OUT OF THE USE OF THE PACKAGE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.