package AnyEvent::RabbitMQ::Fork::Worker; $AnyEvent::RabbitMQ::Fork::Worker::VERSION = '0.5'; =head1 NAME AnyEvent::RabbitMQ::Fork::Worker - Fork side magic =head1 DESCRIPTION No user serviceable parts inside. Venture at your own risk. =cut use Moo; use Types::Standard qw(InstanceOf Bool); use Guard; use Scalar::Util qw(weaken blessed); use namespace::clean; use AnyEvent::RabbitMQ 1.18; has verbose => (is => 'rw', isa => Bool, default => 0); has connection => ( is => 'lazy', isa => InstanceOf['AnyEvent::RabbitMQ'], clearer => 1, handles => ['channels'], ); sub _build_connection { my $self = shift; my $conn = AnyEvent::RabbitMQ->new(verbose => $self->verbose); _cb_hooks($conn); return $conn; } ### RPC Interface ### my $instance; sub init { my $class = shift; $instance = $class->new(@_); return; } sub run { my ($done, $method, $ch_id, @args, %args) = @_; weaken(my $self = $instance); unless (@args % 2) { %args = @args; @args = (); foreach my $event (grep { /^on_/ } keys %args) { # callback signature provided by parent process my $sig = delete $args{$event}; # our callback to be used by AE::RMQ $args{$event} = $self->_generate_callback($method, $event, $sig); } } if (defined $ch_id and my $ch = $self->channels->{ $ch_id }) { $ch->$method(@args ? @args : %args); $done->(); } elsif (defined $ch_id and $ch_id == 0) { if ($method eq 'DEMOLISH') { $self->clear_connection; } else { $self->connection->$method(@args ? @args : %args); } $done->(); } else { $ch_id ||= '<undef>'; $done->("Unknown channel: '$ch_id'"); } return; } my %cb_hooks = ( channel => { _state => 'is_open', _is_active => 'is_active', _is_confirm => 'is_confirm', }, connection => { _state => 'is_open', _login_user => 'login_user', _server_properties => 'server_properties', } ); sub _cb_hooks { weaken(my $obj = shift); my ($type, $hooks) = $obj->isa('AnyEvent::RabbitMQ') ? ('connection', $cb_hooks{connection}) : ($obj->id, $cb_hooks{channel}); foreach my $prop (keys %$hooks) { my $method = $hooks->{$prop}; ## no critic (Miscellanea::ProhibitTies) tie $obj->{$prop}, 'AnyEvent::RabbitMQ::Fork::Worker::TieScalar', $obj->{$prop}, sub { AnyEvent::Fork::RPC::event( i => { $type => { $method => $obj->$method } }); }; } return; } sub _generate_callback { my ($self, $method, $event, $sig) = @_; my $should_clear_connection = ( $sig->[-1] eq 'AnyEvent::RabbitMQ' and ($method eq 'close' or ($method eq 'connect' and $event eq 'on_close')) ) ? 1 : 0; my $guard = guard { # inform parent process that this callback is no longer needed AnyEvent::Fork::RPC::event(cbd => @$sig); }; # our callback to be used by AE::RMQ weaken(my $wself = $self); return sub { $guard if 0; # keepalive $wself->clear_connection if $should_clear_connection; if ((my $isa = blessed $_[0] || q{}) =~ /^AnyEvent::RabbitMQ/) { # we put our sentry value in place later my $obj = shift; if ($method eq 'open_channel' and $event eq 'on_success') { my $id = $obj->id; $obj->{"_$wself\_guard"} ||= guard { # channel was GC'd by AE::RMQ AnyEvent::Fork::RPC::event(chd => $id); }; # needs to be done after parent registers channel AE::postpone { _cb_hooks($obj) }; } if ($isa eq 'AnyEvent::RabbitMQ') { # replace with our own handling $obj->{_handle} ->on_drain(sub { AnyEvent::Fork::RPC::event('cdw') }); } # this is our signal back to the parent as to what kind of object # it was unshift @_, \[$isa, ($isa eq 'AnyEvent::RabbitMQ::Channel' ? $obj->id : ())]; } # these values don't pass muster with Storable delete local @{ $_[0] }{ 'fh', 'on_error', 'on_drain' } if $method eq 'connect' and $event = 'on_failure' and blessed $_[0]; # tell the parent to run the users callback known by $sig AnyEvent::Fork::RPC::event(cb => $sig, @_); }; } =head1 AUTHOR William Cox <mydimension@gmail.com> =head1 COPYRIGHT Copyright (c) 2014, the above named author(s). =head1 LICENSE This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself. =cut package # hide from PAUSE AnyEvent::RabbitMQ::Fork::Worker::TieScalar; use strict; use warnings; sub TIESCALAR { $_[2]->(); return bless [$_[1], $_[2]] => $_[0] } sub FETCH { return $_[0][0] } sub STORE { $_[0][1]->(); return $_[0][0] = $_[1] } sub DESTROY { return @{ $_[0] } = () } 1;