NAME

EV::Gearman - asynchronous Gearman client and worker on libev

SYNOPSIS

use EV;
use EV::Gearman;

# ----- client -----
my $cli = EV::Gearman->new(host => '127.0.0.1', port => 4730);

# foreground job: callback fires once on WORK_COMPLETE / WORK_FAIL,
# after any number of intermediate WORK_DATA / WORK_STATUS events
$cli->submit_job(reverse => 'hello', sub {
    my ($result, $err) = @_;
    die "job failed: $err" if $err;
    print "got: $result\n";          # "olleh"
    EV::break;
});

# foreground job with progress events
$cli->submit_job(crunch => $payload, {
    unique     => 'job-key',         # de-duplicate identical jobs
    on_data    => sub { print "partial: $_[0]\n"     },
    on_status  => sub { printf "%d/%d\n", @_         },
    on_warning => sub { warn "worker warning: $_[0]" },
}, sub {
    my ($result, $err) = @_;
    ...
});

# background job: callback fires once on JOB_CREATED with handle
$cli->submit_job_bg('mail::send', $payload, sub {
    my ($handle, $err) = @_;
    warn "queued: $handle\n";
});

# ----- worker -----
my $w = EV::Gearman->new(
    host       => '127.0.0.1',
    port       => 4730,
    client_id  => "worker-$$",
    reconnect  => 1,
);

# synchronous worker: return value -> WORK_COMPLETE; die -> WORK_FAIL
$w->register_function(reverse => sub {
    my ($job) = @_;                  # EV::Gearman::Job
    return scalar reverse $job->workload;
});

# asynchronous worker: defer completion via a timer / external IO
$w->register_function(slow => { async => 1 }, sub {
    my $job = shift;
    my $t; $t = EV::timer 5, 0, sub { $job->complete("done"); undef $t };
});

$w->work;                            # GRAB → JOB_ASSIGN → WORK_COMPLETE
EV::run;

DESCRIPTION

A pure-XS Gearman client and worker built on the EV event loop. The binary protocol is implemented directly — no libgearman build dependency, no glue layer between Perl and a third-party C client.

A single EV::Gearman instance can act as a client, a worker, or both: foreground submissions and worker GRAB_JOB packets multiplex over the same connection, and responses are routed by their request type (head of FIFO) or by the job handle (work events). Pipelining is the default, so submitting N jobs in a tight loop ships them in one batched write and lets the server stream replies back at full throughput.

The text/admin protocol (status, workers, version, maxqueue, shutdown) shares the connection too; multi-line replies are buffered to the ".\n" terminator and delivered as a single string.

AnyEvent applications work unchanged when EV is the active backend.

PROTOCOL OVERVIEW

The Gearman wire format is a 12-byte header followed by a payload:

+--------+---------------+----------------+----------------+
| magic  |  command (BE) |  data len (BE) |     data       |
| 4 byte |    4 bytes    |    4 bytes     |   data len B   |
+--------+---------------+----------------+----------------+
  "\0REQ"     uint32         uint32        NUL-separated
    or                                     args (last arg
  "\0RES"                                  unterminated)

Foreground job lifecycle (client perspective):

SUBMIT_JOB           -->                              # request
                     <--   JOB_CREATED(handle)        # ack
                     <--   WORK_DATA(handle, ...)*    # 0..N
                     <--   WORK_WARNING(handle, ...)*
                     <--   WORK_STATUS(handle, n,d)*
                     <--   WORK_EXCEPTION(handle,..)* # if option set
                     <--   WORK_COMPLETE(handle, r)   # terminal
                             -- or --
                     <--   WORK_FAIL(handle)          # terminal

The handle binds the per-job event callbacks to the right submission even when many submissions are in flight at once.

Worker lifecycle:

CAN_DO(func)         -->                              # advertise
                     repeat ----------------------+
GRAB_JOB             -->                          |
                     <--   JOB_ASSIGN(h,fn,wl)    |   # got work
                             ... user callback   |
WORK_COMPLETE(h, r)  -->                          |
                     -------------- or -----------+
                     <--   NO_JOB                 |
PRE_SLEEP            -->                          |
                     <--   NOOP                   |   # wake-up
                     ----------------------------+

EV::Gearman drives that state machine in C; the per-function callback only sees a ready-to-process EV::Gearman::Job.

ENCODING

All function names, payloads, results, and handles are byte strings on the wire. Encode UTF-8 yourself before passing data in:

use Encode;
$cli->submit_job(reverse => encode_utf8($str), sub {
    my $result = decode_utf8($_[0] // '');
    ...
});

Workload and result values can contain arbitrary bytes including embedded NULs — Gearman's framing puts the payload last in the packet and uses the header's length field, so it is not NUL-bounded.

CALLBACK CONVENTIONS

Every command callback receives ($result, $err). On success $err is undef; on failure $err is a string like "disconnected", "job failed", "command timeout", or text forwarded from the server (e.g. "INVALID_FUNCTION_NAME: ...").

Callback exceptions are caught with G_EVAL and surfaced via warn so a stray die from your code never unwinds the libev event loop. Use EV::break to abort the loop deliberately.

CONSTRUCTOR

new(%options)

my $g = EV::Gearman->new(
    host             => '127.0.0.1',
    port             => 4730,
    on_error         => sub { warn "@_" },
    on_connect       => sub { ... },
    on_disconnect    => sub { ... },
    connect_timeout  => 5_000,    # ms
    command_timeout  => 30_000,   # ms
    reconnect        => 1,
    reconnect_delay  => 1000,     # ms
    keepalive        => 60,       # seconds (TCP only)
    exceptions       => 1,        # request "exceptions" option
    client_id        => "worker-$$",
    grab_unique      => 1,        # use GRAB_JOB_UNIQ
);

If host (or path) is given, a non-blocking connect starts immediately. With neither, the object is unconfigured; call $g->connect / $g->connect_unix later.

All keys default to undef unless noted. Booleans accept any Perl truthy value.

Connection

host => $str
port => $int

TCP host and port. Default port: 4730. Mutually exclusive with path.

Name resolution is currently synchronous: a non-numeric host is passed straight to getaddrinfo, which can block the event loop for the system resolver timeout. Pass an IP literal (or pre-resolve once) to keep reconnect cycles fully non-blocking.

path => $str

Unix-domain socket path. Mutually exclusive with host.

loop => $ev_loop

EV loop to attach to. Default: EV::default_loop.

priority => $num

EV watcher priority in -2 .. +2. Higher = serviced before other EV watchers in the same iteration. Default 0.

keepalive => $seconds

TCP keepalive idle interval. 0 disables. Ignored on Unix sockets.

Timeouts

connect_timeout => $ms

Abort an in-progress non-blocking connect after this many ms. 0 = no timeout (default).

command_timeout => $ms

Disconnect with "command timeout" if no response arrives within this interval. The timer resets on every byte received. 0 = no timeout (default).

Reconnect

reconnect => $bool

Enable automatic reconnect on transport errors.

reconnect_delay => $ms

Wait this many ms before each reconnect attempt. Default 1000. The delay is always honored via a timer, so even 0 defers through the event loop (no synchronous retry recursion).

max_reconnect_attempts => $num

Give up after this many consecutive failures and emit "max reconnect attempts reached". 0 = unlimited (default).

After a reconnect, all worker CAN_DO/CAN_DO_TIMEOUT registrations and the exceptions option are re-sent automatically.

Worker / option flags

exceptions => $bool

If true, the exceptions option is sent on every connect, so foreground clients receive WORK_EXCEPTION packets. For workers, this also enables forwarding die messages from sync callbacks as exceptions before the WORK_FAIL.

client_id => $str

Sent as SET_CLIENT_ID on every connect. Visible in the admin workers output.

grab_unique => $bool

If true, the worker GRAB loop uses GRAB_JOB_UNIQ, so the job object exposes the unique key supplied by the submitter.

Event handlers

on_error => $cb->($errstr)

Connection-level error callback. Default: warn. User callbacks are run under G_EVAL.

on_connect => $cb->()

Fires once the TCP/Unix connection is fully established and the client has enqueued its options and worker-function CAN_DOs. Those packets sit ahead of any user submissions made from inside the callback — so submitting a job here is safe even though the ability registrations haven't yet hit the socket.

on_disconnect => $cb->()

Fires after a disconnect, after pending callbacks have been cancelled with the disconnect error. For server-initiated close, this fires before on_error.

CONNECTION

connect($host, [$port])

Connect to a TCP host. Port defaults to 4730. Cancels any pending auto-reconnect timer and clears any prior path.

connect_unix($path)

Connect via Unix socket.

disconnect

Disconnect cleanly. Cancels reconnect, drains pending callbacks with (undef, "disconnected"), fires on_disconnect. on_error does not fire on user-initiated disconnect — this distinguishes it from server-initiated close.

is_connected

Returns true while a session is established or connection is in progress.

CLIENT API

echo($data, [$cb->($echoed, $err)])

Round-trip ECHO_REQ. Useful as a ping or to verify that all prior pipelined requests have been consumed.

submit_job($func, $workload, [\%opts], [$cb])

submit_job_high($func, $workload, [\%opts], [$cb])

submit_job_low($func, $workload, [\%opts], [$cb])

Foreground submission with normal / high / low priority. The callback fires once on the terminal event:

$cb->($result, undef)        # WORK_COMPLETE
$cb->(undef, "job failed")   # WORK_FAIL
$cb->(undef, "exception")    # WORK_EXCEPTION (data via on_exception)
$cb->(undef, $errstr)        # ERROR / disconnect

Optional opts:

unique       => $key                   # de-dup / coalesce
on_data      => $cb->($partial)        # WORK_DATA
on_warning   => $cb->($w)              # WORK_WARNING
on_status    => $cb->($num, $denom)    # WORK_STATUS
on_exception => $cb->($exc)            # WORK_EXCEPTION

The status callback receives $numerator and $denominator as strings, matching the wire format (Gearman doesn't constrain them to integers).

A single packet (function name + unique key + workload) is capped at 256 MiB; a larger submission croaks before anything is sent. This applies to every submit_job* variant.

submit_job_bg($func, $workload, [\%opts], [$cb])

submit_job_high_bg($func, $workload, [\%opts], [$cb])

submit_job_low_bg($func, $workload, [\%opts], [$cb])

Background submission. Callback fires once on JOB_CREATED with ($handle, $err); subsequent work events are not delivered to this client. unique in %opts is honored; per-event handlers (on_data, on_warning, ...) are ignored because the server emits no work events to a background submitter.

submit_job_epoch($func, $workload, $epoch, [\%opts], [$cb])

Schedule a background job for absolute epoch time $epoch (seconds since 1970-01-01 UTC). Same callback shape as submit_job_bg: ($handle, $err) on JOB_CREATED. Of %opts, only unique is meaningful — the per-event handlers (on_data, on_warning, etc.) are silently ignored because the server delivers no work events to the submitting client for scheduled / background jobs. Server must be built with persistent queue support for scheduled jobs to survive a restart.

get_status($handle, $cb->($info, $err))

Query the server about a known handle:

$info = {
    handle      => 'H:host:1',
    known       => 0|1,           # server has the job
    running     => 0|1,           # a worker grabbed it
    numerator   => '42',          # last reported progress
    denominator => '100',
}

get_status_unique($unique, $cb->($info, $err))

Status by unique key; $info additionally has unique and client_count (how many clients are listening).

option($name, [$cb->($ok, $err)])

Send an OPTION_REQ. The exceptions option is also tracked client-side so reconnects re-enable it without your help.

WORKER API

register_function($name, [\%opts], $cb->($job))

Register a worker handler for function $name. Sends CAN_DO (or CAN_DO_TIMEOUT) on the wire if connected; otherwise the ability is queued and sent on connect.

%opts:

timeout => $seconds   # CAN_DO_TIMEOUT instead of CAN_DO
async   => $bool      # see below

Sync mode (default): the callback's return value is sent as the WORK_COMPLETE body. die becomes WORK_FAIL (and WORK_EXCEPTION when the exceptions option is on).

Async mode: the callback's return value is ignored; you must explicitly call $job->complete($result), $job->fail, or $job->exception($data) from a later event. The worker loop does immediately grab the next job after dispatching to your async callback, so async workers process jobs concurrently — bounded only by what the server has queued. To cap concurrency, call $g->work_stop in the callback when you reach the cap and $g->work again from complete/fail when a slot frees up. See eg/worker_pool.pl for a worked example.

unregister_function($name)

Alias for cant_do.

can_do($name, [$timeout])

Lower-level: announce ability without a Perl handler. Combine with grab_job to build a custom worker loop.

cant_do($name)

Withdraw an ability.

reset_abilities

Withdraw all abilities (sends RESET_ABILITIES).

set_client_id($id)

Send SET_CLIENT_ID. This is what shows up in the admin workers command.

work([$on_idle])

Activate the worker loop. Issues a GRAB_JOB[_UNIQ] on the wire as soon as the connection is established (deferred until then if called pre-connect); on JOB_ASSIGN the registered function callback runs; on NO_JOB the connection sleeps via PRE_SLEEP until the server sends NOOP, then resumes grabbing. $on_idle fires every time the loop enters the sleep state.

work_one([$cb])

Dispatch exactly one job, then stop the loop. Issues a GRAB_JOB; on NO_JOB the worker enters PRE_SLEEP and waits for the server's NOOP wake-up just like work, so this is a "wait for one job" — not "try once and bail" (use grab_job for that).

In sync mode the loop stops after the user callback returns and the WORK_COMPLETE/WORK_FAIL packet has been queued. In async mode it stops as soon as the user callback has been dispatched — the job is still in flight at the server until the user explicitly calls $job->complete(...) or $job->fail.

The optional $cb is invoked when the worker enters the sleep state, and shares storage with work's $on_idle — calling work_one after work overwrites whatever idle handler work set, and vice versa.

work_stop

Drop out of the worker loop. The connection stays up; in-flight jobs continue to deliver their results, but no new GRAB_JOB will be sent.

grab_job($cb)

Lower-level: request exactly one GRAB_JOB[_UNIQ]. The callback gets a job object on JOB_ASSIGN, or (undef, "no job") on NO_JOB. The user is responsible for completing the job. Requires a prior can_do (or register_function) so the server knows this connection serves the function.

Unlike the managed work loop, grab_job does not de-duplicate: calling it again before the previous grab has been answered puts a second GRAB_JOB on the wire. Drive one grab at a time (issue the next from the previous callback) as in eg/cron_consumer.pl.

all_yours

Send ALL_YOURS. Hint to the server that this worker handles all known abilities and should be preferred for new jobs.

ADMIN / TEXT PROTOCOL

The Gearman text protocol shares the same TCP/Unix connection. We dispatch each incoming byte stream to either the binary parser or the text parser based on whether the first byte is \0 (binary) or printable (admin). Multi-line replies are accumulated to the ".\n" terminator and delivered as a single string.

admin($command, [$cb->($text, $err)])

Send a raw text command (newline appended automatically). Replies are accumulated until the ".\n" terminator for the multi-line commands status, workers, and prioritystatus; everything else is treated as single-line.

server_status([$cb])

Tab-separated lines: FUNC \t TOTAL_JOBS \t RUNNING_JOBS \t WORKERS.

server_workers([$cb])

One line per connected worker.

server_version([$cb])

Single-line reply (e.g. "OK 1.1.21+ds").

maxqueue($func, $size, [$cb])

Set the per-function queue size cap. Reply is "OK\n".

shutdown_server(graceful => $bool, cb => $cb)

Send shutdown or shutdown graceful. The server replies with "OK\n" and then closes the connection.

INTROSPECTION

pending_count

Number of binary requests sent and awaiting a response.

waiting_count

Number of requests held in the local pre-connect queue.

active_count

Number of foreground jobs whose handle has been received but which haven't yet completed.

ACCESSORS

These tunables have a getter / setter of the same name. Calling without arguments reads the current value; with one argument, writes and (where meaningful) takes effect immediately:

$g->connect_timeout($ms);
$g->command_timeout($ms);
$g->priority($num);
$g->keepalive($seconds);
$g->on_error($cb);         # set; pass undef to clear
$g->on_connect($cb);
$g->on_disconnect($cb);

The remaining new options (host, port, path, exceptions, client_id, grab_unique, ...) are set once at construction and have no accessor.

reconnect is the exception — it is a setter only; pass 0/1 plus optional new delay and attempt cap:

$g->reconnect($enable, [$delay_ms], [$max_attempts]);

Omitting $delay_ms / $max_attempts leaves the previously configured values unchanged.

reconnect_enabled

The getter that reconnect lacks: returns true while automatic reconnect is enabled, false otherwise. Takes no arguments.

LIFECYCLE AND DESTRUCTION

When a connection drops, the FIFO of pending requests is drained with (undef, "disconnected"); foreground active jobs are drained with the same error. Reconnect (if enabled) re-runs the connect sequence and re-registers worker abilities.

When the EV::Gearman object goes out of scope, every pending and active callback fires once with (undef, "disconnected"), then the FD is closed. The clean-shutdown idiom is:

$g->disconnect;            # drains queues, fires on_disconnect
undef $g;

If callbacks close over $g (a common mistake — every reference inside a closure keeps the object alive), break the cycle first:

$g->on_error(undef);
$g->on_connect(undef);
$g->on_disconnect(undef);
undef $g;

DESTROY is reentrancy-safe: if a callback fired during teardown drops the last external reference to a separate EV::Gearman, that object's DESTROY is correctly deferred and run once unwound.

PERFORMANCE

Loopback benchmark on Linux, Perl 5.40, gearmand 1.1.21, single worker (always EV::Gearman so the worker isn't the bottleneck).

bench/benchmark.pl measures one client by itself:

                                 ops/sec
Pipelined foreground jobs        ~53,000
Sequential round-trip            ~19,000
Background submissions          ~280,000

bench/vs.pl compares against the existing CPAN clients (Gearman::Client 2.004.015 sync, AnyEvent::Gearman 0.10 async). Numbers in operations / second:

                      EV::Gearman   AnyEvent::Gearman   Gearman::Client
pipelined foreground   ~51,000          ~5,200              n/a (1)
sequential round-trip  ~19,000          ~5,900             ~5,400
background submits    ~248,000          ~5,400              n/a (1)

(1) Gearman::Client is synchronous — it has neither pipelining
    nor concurrent background submits.

EV::Gearman is roughly 10x the foreground throughput of AnyEvent::Gearman, 45x the background submission rate, and 3x the sequential round-trip rate. The gap comes from three places:

  • Pipelining is the default. Submitting N jobs in a tight loop ships them in batched writes; responses are demultiplexed by handle as they stream back. AnyEvent::Gearman is async but serializes one request per round-trip, so it pays a full RTT per job.

  • The protocol implementation is C/XS — packet encode/decode, buffer growth, and FIFO bookkeeping run without per-call Perl allocations.

  • The IO layer is direct ev_io on the gearmand socket, so each read/write involves no AnyEvent guard-object construction or backend-dispatch overhead.

Background submissions are particularly fast because the JOB_CREATED reply is the only round-trip — no work events to demultiplex — so the limit is just network latency and parser throughput.

Sequential round-trip throughput is the worst case: each job waits for its own reply before the next is built, so pipelining buys nothing. EV::Gearman is still ~3x faster here purely from the C-side protocol parser.

Numbers reproduce with bench/vs.pl (run with --help for options).

Memory

Each connection keeps one read and one write buffer that grow on demand to fit the largest packet seen. After a buffer fully drains, anything that grew past ~1 MiB is released back to the initial 16 KiB, so a one-off large job (or status reply) does not pin its high-water-mark allocation for the life of the connection. Steady small-packet traffic never reallocates.

Pass byte strings, not Perl lists: building an N-byte payload with join '', map chr(...), 1..$n materializes N scalars first. For large payloads use the repeat operator ($block x $count) or pack.

EXAMPLES

The eg/ directory has runnable scripts (point them at a local gearmand on 127.0.0.1:4730):

  • Clientsclient.pl (one foreground job), pipeline.pl (concurrent submissions), background.pl (fire-and-forget), scheduled.pl (submit_job_epoch delayed jobs), unique.pl (job coalescing + get_status_unique), event_client.pl (consume WORK_DATA/WORK_STATUS), fanout.pl (scatter/gather), priority_queue.pl, json_payload.pl (structured payloads), retry.pl (client-side backoff).

  • Workersworker.pl (serve forever), async_worker.pl (timer-driven completion), worker_pool.pl (concurrency cap), graceful_worker.pl (drain on SIGTERM), cron_consumer.pl (grab_job drain-and-exit batch), exceptions.pl (WORK_EXCEPTION round-trip).

  • Operationsadmin.pl, monitoring.pl (poll metrics), reconnect.pl (survive a server bounce), multi_server.pl (round-robin across a farm), unix_socket.pl (connect_unix), error_handling.pl, anyevent.pl (run under AnyEvent).

SEE ALSO

EV, AnyEvent.

Gearman::Client, Gearman::Worker — the synchronous reference client and worker on CPAN.

AnyEvent::Gearman — older AE-based async client.

https://gearman.org/protocol/ — the upstream protocol spec.

This module is one of the EV-* family on CPAN by the same author: EV::Memcached, EV::Redis, EV::Nats, EV::Pg, EV::ClickHouse, EV::Kafka, EV::MariaDB, EV::cares, EV::Etcd, EV::Websockets.

AUTHOR

vividsnow

LICENSE

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.