NAME
Beekeeper::Worker - Base class for creating services
VERSION
Version 0.09
SYNOPSIS
    package MyApp::Worker;

    use Beekeeper::Worker ':log';
    use base 'Beekeeper::Worker';

    sub on_startup {
        my $self = shift;

        $self->accept_notifications(
            'myapp.msg' => 'got_message',
        );

        $self->accept_remote_calls(
            'myapp.sum' => 'do_sum',
        );

        log_info 'Ready';
    }

    sub authorize_request {
        my ($self, $req) = @_;
        return BKPR_REQUEST_AUTHORIZED;
    }

    sub got_message {
        my ($self, $params) = @_;
        warn $params->{message};
    }

    sub do_sum {
        my ($self, $params) = @_;
        return $params->[0] + $params->[1];
    }
DESCRIPTION
Base class for creating services.
CONSTRUCTOR
Beekeeper::Worker objects are created automatically by Beekeeper::WorkerPool after spawning new processes.
METHODS
on_startup
This method is executed in a fresh worker process right after it is spawned, once it has connected to the broker and initialized the logger.
The default implementation is just a placeholder, intended to be overridden in subclasses.
This is the place to perform startup tasks (like creating database or cache connections) and declare which calls and notifications the worker will accept.
After this method returns, the worker waits for incoming events to handle.
on_shutdown
This method is executed just before a worker process is stopped.
It can be overridden as needed; the default implementation does nothing.
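The pairing of on_startup and on_shutdown can be sketched as follows. This is only an illustration that assumes Beekeeper is installed; the 'myapp.get' method name and the in-memory cache kept in $self->{cache} are hypothetical and not part of Beekeeper.

```perl
package MyApp::CacheWorker;

use strict;
use warnings;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

sub on_startup {
    my ($self) = @_;

    # Hypothetical per-process resource; a real worker might open
    # a database or cache connection here instead
    $self->{cache} = {};

    $self->accept_remote_calls(
        'myapp.get' => 'get_value',
    );
}

sub authorize_request {
    my ($self, $req) = @_;
    return BKPR_REQUEST_AUTHORIZED;
}

sub get_value {
    my ($self, $params) = @_;
    return $self->{cache}->{ $params->{key} };
}

sub on_shutdown {
    my ($self) = @_;

    # Release whatever on_startup acquired
    delete $self->{cache};
}

1;
```

Keeping acquisition and release in these two hooks means every respawned process starts with a fresh resource.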
authorize_request( $req )
This method must be overridden in worker classes, as the default behavior is to deny the execution of any request.
When a request is received, this method is called before executing the corresponding handler, and it must return the exported constant BKPR_REQUEST_AUTHORIZED in order to authorize it. Returning any other value will result in the request being ignored.
This is the place to handle application authentication and authorization.
The $req parameter is either a Beekeeper::JSONRPC::Notification or a Beekeeper::JSONRPC::Request object.
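As a minimal sketch of an authorization check, the worker below only authorizes requests that carry authentication data. It assumes Beekeeper is installed, and it assumes that get_authentication_data (one of the imported Beekeeper::Client methods) returns whatever the caller set with set_authentication_data, or an undefined value when nothing was set; verify this against Beekeeper::Client before relying on it.

```perl
package MyApp::PrivateWorker;

use strict;
use warnings;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

sub authorize_request {
    my ($self, $req) = @_;

    # Assumption: undefined means the caller did not authenticate
    my $auth_data = $self->get_authentication_data;

    # Any return value other than BKPR_REQUEST_AUTHORIZED denies the request
    return unless defined $auth_data;

    return BKPR_REQUEST_AUTHORIZED;
}

1;
```

A real application would normally also validate the contents of the authentication data rather than just its presence.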
log_handler
By default, all workers use a Beekeeper::Logger logger, which logs errors and warnings to files and also to a topic on the message bus. The command line tool bkpr-log allows inspecting the logs from the message bus in real time.
This method can be overridden in worker classes to replace the default logging mechanism with another one. To do so, the new implementation must return an object implementing a log method (see Beekeeper::Logger::log for reference).
For convenience you can import the ':log' symbols, which expose to your class the functions log_fatal, log_alert, log_critical, log_error, log_warn, log_warning, log_notice, log_info, log_debug, log_trace and log_level.
These will call the underlying log method of the logger class if the severity is equal to or higher than $Beekeeper::Worker::LogLevel, which is LOG_INFO by default and can be set with log_level. The default level can be set globally to LOG_DEBUG with the --debug option of bkpr, or by setting a "debug" option to a true value in the config file pool.config.json.
Using these functions makes it very easy to switch logging backends at a later date.
All warnings and errors generated by the execution of the worker code are logged (unless their severity is below the current log level).
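Replacing the logging backend could look roughly like the sketch below. MyApp::SimpleLogger is a hypothetical class, and the argument list of its log method (a numeric severity level followed by the message parts) is an assumption; check Beekeeper::Logger::log for the actual contract. The example assumes Beekeeper is installed.

```perl
package MyApp::SimpleLogger;

use strict;
use warnings;

sub new {
    my ($class, %args) = @_;
    # Write to the given filehandle, or to STDERR by default
    return bless { fh => $args{fh} || \*STDERR }, $class;
}

# Assumed signature: severity level first, then the message parts
sub log {
    my ($self, $level, @msg) = @_;
    print { $self->{fh} } "[$level] @msg\n";
}

package MyApp::LoggingWorker;

use strict;
use warnings;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

sub log_handler {
    my ($self) = @_;
    # Any object implementing a log method will do
    return MyApp::SimpleLogger->new;
}

sub on_startup {
    my ($self) = @_;
    log_info  'Ready';    # passed to the logger at the default level
    log_debug 'Details';  # skipped unless the log level is lowered to debug
}

sub authorize_request {
    my ($self, $req) = @_;
    return BKPR_REQUEST_AUTHORIZED;
}

1;
```

Because the worker code only calls the log_* functions, swapping MyApp::SimpleLogger for another backend later requires changing log_handler alone.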
RPC call methods
In order to make RPC calls to other services, all methods from Beekeeper::Client are imported automatically. Workers can use send_notification, call_remote, call_remote_async, fire_remote, wait_async_calls, set_authentication_data and get_authentication_data the same way clients do.
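A worker calling another service might look like the sketch below. It assumes Beekeeper is installed, that call_remote and send_notification take named method and params arguments, and that the response object offers a result accessor, as described in Beekeeper::Client. The 'stats.avg' method is hypothetical; 'myapp.sum' and 'myapp.msg' are the ones from the synopsis.

```perl
package MyApp::StatsWorker;

use strict;
use warnings;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

sub on_startup {
    my ($self) = @_;

    $self->accept_remote_calls(
        'stats.avg' => 'do_avg',
    );
}

sub authorize_request {
    my ($self, $req) = @_;
    return BKPR_REQUEST_AUTHORIZED;
}

sub do_avg {
    my ($self, $params) = @_;

    # Delegate the addition to the 'myapp.sum' service (blocking call)
    my $resp = $self->call_remote(
        method => 'myapp.sum',
        params => [ $params->[0], $params->[1] ],
    );

    # Let interested listeners know that an average was computed
    $self->send_notification(
        method => 'myapp.msg',
        params => { message => 'average computed' },
    );

    return $resp->result / 2;
}

1;
```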
accept_notifications ( $method => $callback, ... )
Makes a worker start accepting the specified notifications from the message bus.
$method is a string with the format {service_class}.{method}. A default or fallback handler can be specified using a wildcard like {service_class}.*.
$callback is the method handler (a method name or a coderef) that will be called when a notification is received. When executed, the handler will receive two parameters: $params (which contains the notification data itself) and $req, which is a Beekeeper::JSONRPC::Notification object (usually redundant unless it is necessary to inspect the MQTT properties of the notification).
Notifications are not expected to return a value. Any value returned from notification handlers will be ignored.
The handler is executed within an eval block. If it dies the error will be logged but the worker will continue running.
Example:
    package MyWorker;

    use Beekeeper::Worker ':log';
    use base 'Beekeeper::Worker';

    sub on_startup {
        my ($self) = @_;

        $self->accept_notifications(
            'foo.bar' => 'bar',       # call $self->bar for notifications 'foo.bar'
            'foo.baz' => $coderef,    # call $coderef->() for notifications 'foo.baz'
            'foo.*'   => 'fallback',  # call $self->fallback for any other 'foo.*'
        );
    }

    sub bar {
        my ($self, $params, $req) = @_;

        # $self is a MyWorker object
        # $params is a ref to the notification data
        # $req is a Beekeeper::JSONRPC::Notification object

        log_warn "Got a notification foo.bar";
    }
accept_remote_calls ( $method => $callback, ... )
Makes a worker start accepting the specified RPC requests from the message bus.
$method is a string with the format {service_class}.{method}. A default or fallback handler can be specified using a wildcard like {service_class}.*.
$callback is the method handler (a method name or a coderef) that will be called when a request is received. When executed, the handler will receive two parameters: $params (which contains the parameters of the request) and $req, which is a Beekeeper::JSONRPC::Request object.
The value or reference returned by the handler will be sent back to the caller as the response (unless the response is deferred with $req->async_response).
The handler is executed within an eval block. If it dies the error will be logged and the caller will receive a generic error response, but the worker will continue running.
Example:
    package MyWorker;

    use Beekeeper::Worker ':log';
    use base 'Beekeeper::Worker';

    sub on_startup {
        my ($self) = @_;

        $self->accept_remote_calls(
            'foo.inc' => 'increment',  # call $self->increment for requests to 'foo.inc'
            'foo.baz' => $coderef,     # call $coderef->() for requests to 'foo.baz'
            'foo.*'   => 'fallback',   # call $self->fallback for any other 'foo.*'
        );
    }

    sub increment {
        my ($self, $params, $req) = @_;

        # $self is a MyWorker object
        # $params is a ref to the parameters of the request
        # $req is a Beekeeper::JSONRPC::Request object

        log_warn "Got a call to foo.inc";

        return $params->{number} + 1;
    }
Remote calls can be processed concurrently by calling $req->async_response, which tells Beekeeper that the response for the request will be deferred until it is available, freeing the worker to accept more requests. Once the response is ready, it must be sent back to the caller with $req->send_response.
This handler processes requests concurrently:
    sub increment {
        my ($self, $params, $req) = @_;

        my $number = $params->{number};

        $req->async_response;

        my $t; $t = AnyEvent->timer( after => 1, cb => sub {
            undef $t;
            $req->send_response( $number + 1 );
        });
    }
Note that callback closures will not be executed in Beekeeper scope but in the event loop's, so uncaught exceptions in these closures will cause the worker to die and be respawned.
Asynchronous method handlers use system resources more efficiently, but are significantly harder to write and debug.
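Since exceptions inside event loop callbacks are not trapped by Beekeeper, one defensive option is to wrap the closure body in an eval and always send a response. The sketch below assumes Beekeeper and AnyEvent are installed; the input validation and the choice of sending an undefined result on failure are illustrative decisions, not a Beekeeper convention.

```perl
package MyApp::AsyncWorker;

use strict;
use warnings;

use AnyEvent;
use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

sub on_startup {
    my ($self) = @_;
    $self->accept_remote_calls( 'foo.inc' => 'increment' );
}

sub authorize_request {
    my ($self, $req) = @_;
    return BKPR_REQUEST_AUTHORIZED;
}

sub increment {
    my ($self, $params, $req) = @_;

    my $number = $params->{number};

    $req->async_response;

    my $t; $t = AnyEvent->timer( after => 1, cb => sub {
        undef $t;

        # This closure runs in event loop scope, so trap errors here
        my $result;
        my $ok = eval {
            die "Not an integer\n" unless defined $number && $number =~ m/^-?\d+$/;
            $result = $number + 1;
            1;
        };

        warn "increment failed: $@" unless $ok;

        # Always answer, so the caller is not left waiting for a timeout;
        # $result stays undefined when the computation failed
        $req->send_response( $result );
    });
}

1;
```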
stop_accepting_notifications ( $method, ... )
Makes a worker stop accepting the specified notifications from the message bus.
$method must be one of the strings used previously in accept_notifications.
stop_accepting_calls ( $method, ... )
Makes a worker stop accepting the specified RPC requests from the message bus.
$method must be one of the strings used previously in accept_remote_calls.
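As an illustration of both stop_accepting methods, the worker below stops handling its regular traffic when it receives a control notification. It assumes Beekeeper is installed; the 'myapp.pause' notification is hypothetical, while 'myapp.msg' and 'myapp.sum' are the names from the synopsis.

```perl
package MyApp::PausableWorker;

use strict;
use warnings;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

sub on_startup {
    my ($self) = @_;

    $self->accept_notifications(
        'myapp.msg'   => 'got_message',
        'myapp.pause' => 'pause',
    );

    $self->accept_remote_calls(
        'myapp.sum' => 'do_sum',
    );
}

sub authorize_request {
    my ($self, $req) = @_;
    return BKPR_REQUEST_AUTHORIZED;
}

sub got_message {
    my ($self, $params) = @_;
    warn $params->{message};
}

sub do_sum {
    my ($self, $params) = @_;
    return $params->[0] + $params->[1];
}

sub pause {
    my ($self) = @_;

    # Use exactly the strings that were passed to the accept_* methods
    $self->stop_accepting_notifications('myapp.msg');
    $self->stop_accepting_calls('myapp.sum');
}

1;
```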
stop_working
Makes a worker stop accepting new RPC requests, process all requests already received, execute the on_shutdown method, and then exit.
This is the default signal handler for the TERM signal.
Please note that it is not possible to stop a worker pool by calling this method, as WorkerPool will immediately respawn another worker after the current one exits.
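Because the pool replaces workers that exit, stop_working can be used to recycle a process, for example after it has served a fixed number of requests. The sketch below assumes Beekeeper is installed and that stop_working may safely be called from inside a request handler; the max_requests and handled keys stored in $self are hypothetical.

```perl
package MyApp::RecyclingWorker;

use strict;
use warnings;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

sub on_startup {
    my ($self) = @_;

    # Hypothetical counters kept in the worker object
    $self->{handled}      = 0;
    $self->{max_requests} = 1000;

    $self->accept_remote_calls(
        'myapp.sum' => 'do_sum',
    );
}

sub authorize_request {
    my ($self, $req) = @_;
    return BKPR_REQUEST_AUTHORIZED;
}

sub do_sum {
    my ($self, $params) = @_;

    # Retire this process once the limit is reached; the pool is
    # expected to spawn a fresh worker to take its place
    if (++$self->{handled} == $self->{max_requests}) {
        $self->stop_working;
    }

    return $params->[0] + $params->[1];
}

1;
```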
SEE ALSO
Beekeeper::Client, Beekeeper::Config, Beekeeper::Logger, Beekeeper::WorkerPool.
AUTHOR
José Micó, jose.mico@gmail.com
COPYRIGHT AND LICENSE
Copyright 2015-2023 José Micó.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language itself.
This software is distributed in the hope that it will be useful, but it is provided “as is” and without any express or implied warranties. For details, see the full text of the license in the file LICENSE.