Thread::Subs

Execute selected Perl subroutines concurrently in worker threads with minimal cognitive overhead -- structured concurrency done right.

Synopsis

use threads;
use Thread::Subs;

# Mark a subroutine for threaded execution
sub expensive_operation :Thread {
    my ($data) = @_;
    # ... complex processing ...
    return $value;
}

# Start worker threads (uses reasonable defaults)
Thread::Subs::startup();

# Call returns immediately with a result object
my $result = expensive_operation($data);

# Do other work while expensive_operation runs in parallel
do_something_else();

# Block until result is ready
my $value = $result->recv;

Description

Thread::Subs provides a practical way to execute subroutines in concurrent worker threads while maintaining a simple programming model. It aims for very low cognitive overhead: you declare which subs should run in threads, start the workers, and then call those subs like normal functions. The only visible difference is that the sub immediately returns a lightweight "result" object, very similar to AnyEvent's condition variables.

It is a higher-level abstraction of parallel execution than most threading libraries offer, and facilitates multiple common use patterns like the following through a single API.

This module can be used stand-alone, but it also has ready-made integrations for AnyEvent, Mojolicious (Mojo::Promise), and Future. With Future::AsyncAwait, you can await a result from a worker thread inside an async sub. Mix classic event-driven code with genuine parallel execution at will.

Key Features

Caveats and Limitations

Example

The following program is a CPU-intensive search for partial MD5 hash matches. It finds and prints ten distinct strings which have a hex MD5 hash starting with five zeros. There is no parallelism.

use 5.014;
use warnings;
use Digest::MD5 qw(md5_hex);

sub find_partial_md5 {
    my ($string, $target) = @_;
    my $x = 0;
    ++$x until substr(md5_hex("$string $x"), 0, length($target)) eq $target;
    return "$string $x";
}

say $_ for map { find_partial_md5("blah $_", '00000') } (1..10);

This is an easily parallelised problem: if we have the resources, we can execute all ten calls in parallel. Let's assume we have resources to run five simultaneously. The parallelised code is as follows, with attention drawn to the changes.

use 5.014;
use warnings;
use threads;                   # added
use Digest::MD5 qw(md5_hex);
use Thread::Subs;              # added

sub find_partial_md5 :Thread { # added attribute
    my ($string, $target) = @_;
    my $x = 0;
    ++$x until substr(md5_hex("$string $x"), 0, length($target)) eq $target;
    return "$string $x";
}

Thread::Subs::startup(5); # added, start 5 workers
# Same map, but we store it in an array, then process the results.
my @work = map { find_partial_md5("blah $_", '00000') } (1..10);
say $_->recv for @work;

This produces the same output, but will execute in considerably less time if you have available CPU resources.

Installation

cpan Thread::Subs

Or manually:

perl Makefile.PL
make
make test
make install

Quick Start

Basic Usage

use threads;
use Thread::Subs;

sub compute :Thread {
    my ($n) = @_;
    # CPU-intensive work
    return $n * $n;
}

Thread::Subs::startup(5);  # 5 workers in default pool

my @results = map { compute($_) } 1..10;
print $_->recv, "\n" for @results;

With Callbacks

sub fetch_url :Thread {
    my ($url) = @_;
    # Network I/O
    return get($url);
}

Thread::Subs::startup();

my $result = fetch_url($url);
$result->cb(sub {
    my ($r) = @_;
    if ($r->failed) { warn "Failed: ", $r->data }
    else { process_data($r->data) }
});
Thread::Subs::stop_and_wait(); # ensure callbacks happen before exit

Concurrency Control

# Only one instance can run at a time
sub write_log :Thread(clim=1) {
    my ($message) = @_;
    # Safe concurrent access to shared resource
}

# Limit queue depth to prevent memory issues
sub batch_process :Thread(qlim=100) {
    my ($item) = @_;
    # Process item
}

# Dedicated pool for database operations
sub db_query :Thread(pool=DB) {
    my ($sql) = @_;
    # Each worker maintains its own DB connection
}

Integration with Event Loops

# AnyEvent
use AnyEvent;
my $cv = compute($n)->ae_cv;
$cv->cb(sub { say "Result: ", shift->recv });

# Mojolicious
use Mojo::Promise;
compute($n)->mojo_promise->then(
    sub { say "Success: @_" },
    sub { warn "Error: @_" }
)->wait;

# Future (with Future::AsyncAwait)
use Future::AsyncAwait;
async sub process {
    my $result = await compute($n)->future;
    return $result * 2;
}

Common Use Cases

CPU-Intensive Parallelism

Distribute CPU-bound work across multiple cores:

sub crunch_numbers :Thread {
    # Expensive calculation
}

Thread::Subs::startup(7);  # Leave one core free (assumes 8)

my @jobs = map { crunch_numbers($_) } @data;
my @results = map { $_->recv } @jobs;

Resource Pool Management

Manage limited resources (database connections, API clients):

use DBI;
use Thread::Subs;

my $dbh;  # Each worker gets its own connection

sub query :Thread(pool=DB) {
    my ($sql, @bind) = @_;
    $dbh //= DBI->connect(...);  # Lazy connection per worker
    return $dbh->selectall_arrayref($sql, {}, @bind);
}

Thread::Subs::startup(DB => 10);  # 10 DB workers

Sequential Operations with Parallelism

Serialize access to shared resources without blocking:

sub append_file :Thread(clim=1, pool=SUB) {
    state $fh;
    $fh //= IO::File->new(">>log.txt");
    print $fh @_;
}

# Multiple callers won't block or interleave writes.
append_file("Entry 1\n");
append_file("Entry 2\n");

Requirements

Optional:

Documentation

Full documentation is available via perldoc:

perldoc Thread::Subs

Or view online at MetaCPAN.

Testing

make test

Tests cover:

Author

Brett Watson brett.watson@gmail.com

License

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.

See Also