package AnyEvent::WebSocket::Connection;

use strict;
use warnings;
use Moo;
use Protocol::WebSocket::Frame;
use Scalar::Util ();
use Encode ();
use AE;
use AnyEvent::WebSocket::Message;
use PerlX::Maybe qw( maybe provided );
use Carp ();
# ABSTRACT: WebSocket connection for AnyEvent
our $VERSION = '0.55'; # VERSION

# ---------------------------------------------------------------------
# Public attributes
# ---------------------------------------------------------------------

# The underlying handle object; mandatory constructor argument.  All
# reads, writes and shutdowns in this class go through it.
has handle => (
  is       => 'ro',
  required => 1,
);

# When true, frames built by send() are masked.  Off by default.
has masked => (
  is      => 'ro',
  default => sub { 0 },
);

# Subprotocol associated with this connection, if any.
has subprotocol => (
  is => 'ro',
);

# Optional limit handed to the incoming frame parser in BUILD.
has max_payload_size => (
  is => 'ro',
);

# Optional limit handed to the incoming frame parser in BUILD
# (as its max_fragments_amount option).
has max_fragments => (
  is => 'ro',
);

# Status code taken from a close frame received from the other side.
has close_code => (
  is => 'rw',
);

# Decoded reason text taken from a close frame received from the other side.
has close_reason => (
  is => 'rw',
);

# Error message recorded by the finish handler when the handle reports one.
has close_error => (
  is => 'rw',
);

# ---------------------------------------------------------------------
# Private state (none of these can be set through the constructor)
# ---------------------------------------------------------------------

# One listener list per event name understood by on().
for my $event (qw( each_message next_message finish parse_error ))
{
  has "_${event}_cb" => (
    is       => 'ro',
    init_arg => undef,
    default  => sub { [] },
  );
}

# Both directions of the connection start out open.
for my $direction (qw( _is_read_open _is_write_open ))
{
  has $direction => (
    is       => 'rw',
    init_arg => undef,
    default  => sub { 1 },
  );
}

# Set once the finish handler has run, so that it only runs one time.
has _is_finished => (
  is       => 'rw',
  init_arg => undef,
  default  => sub { 0 },
);
# Moo BUILD hook, run once after the object has been constructed.
#
# Installs the on_error / on_eof / on_read callbacks on the underlying
# handle and schedules an idle watcher which flushes the messages that were
# parsed before the user had a chance to register any listener.
sub BUILD
{
  my $self = shift;

  # The closures below capture $self and are stored on the handle, which is
  # itself an attribute of $self.  Weaken the captured variable so that the
  # callbacks do not keep the connection object alive.
  Scalar::Util::weaken $self;

  my @temp_messages = ();
  my $are_callbacks_supposed_to_be_ready = 0;

  # Shared handler for on_error and on_eof.  Only the third argument (the
  # error message, when there is one) is used.
  my $finish = sub {
    my(undef, undef, $message) = @_;
    my $strong_self = $self; # preserve $self because otherwise $self can be destroyed in the callbacks.
    return if !defined $strong_self; # the connection object is already gone
    return if $self->_is_finished;
    {
      # Best-effort delivery of whatever is still queued.  $@ is localized
      # because this handler can be entered while a caller still needs its
      # own error in $@.
      local $@;
      eval
      {
        $self->_process_message($_) foreach @temp_messages;
      };
    }
    @temp_messages = ();
    $self->_is_finished(1);
    $self->handle->push_shutdown;
    $self->_is_read_open(0);
    $self->_is_write_open(0);
    $self->close_error($message) if defined $message;
    # Iterate over a snapshot so that a listener which unregisters a finish
    # callback while it runs cannot disturb the loop (same approach as the
    # each_message loop in _process_message).
    my @finish_callbacks = @{ $self->_finish_cb };
    $_->($self, $message) for @finish_callbacks;
  };
  $self->handle->on_error($finish);
  $self->handle->on_eof($finish);

  # Parser for incoming frames.  The limits are passed through
  # PerlX::Maybe's "maybe", using the names the parser expects.
  my $frame = Protocol::WebSocket::Frame->new(
    maybe max_payload_size     => $self->max_payload_size,
    maybe max_fragments_amount => $self->max_fragments,
  );

  my $read_cb = sub {
    my ($handle) = @_;
    local $@;
    my $strong_self = $self; # preserve $self because otherwise $self can be destroyed in the callbacks
    return if !defined $strong_self; # the connection object is already gone
    my $success = eval
    {
      $frame->append($handle->{rbuf});
      while(defined(my $body = $frame->next_bytes))
      {
        next if !$self->_is_read_open; # not 'last' but 'next' in order to consume data in $frame buffer.
        my $message = AnyEvent::WebSocket::Message->new(
          body   => $body,
          opcode => $frame->opcode,
        );
        if($are_callbacks_supposed_to_be_ready)
        {
          $self->_process_message($message);
        }
        else
        {
          push(@temp_messages, $message);
        }
      }
      1; # succeed to parse.
    };
    if(!$success)
    {
      # Save the error right away: the shutdown below may run other code
      # that resets $@ before the listeners get to see it.
      my $error = $@;
      $self->_force_shutdown();
      my @parse_error_callbacks = @{ $self->_parse_error_cb };
      $_->($self, $error) for @parse_error_callbacks;
    }
  };

  # Message processing (calling _process_message) is delayed by
  # $are_callbacks_supposed_to_be_ready flag. This is necessary to
  # make sure all received messages are delivered to each_message and
  # next_message callbacks. If there is some data in rbuf, changing
  # the on_read callback makes the callback fire, but there is of
  # course no each_message/next_message callback to receive the
  # message yet. So we put messages to @temp_messages for a
  # while. After the control is returned to the user, who sets up
  # each_message/next_message callbacks, @temp_messages are processed.
  # An alternative approach would be temporarily disabling on_read by
  # $self->handle->on_read(undef). However, this can cause a weird
  # situation in TLS mode, because on_eof can fire even if we don't
  # have any on_read (see the AnyEvent::Handle documentation about
  # callback invocations in TLS mode).
  $self->handle->on_read($read_cb);

  # Runs once, as soon as the event loop is idle, then drops the watcher.
  my $idle_w; $idle_w = AE::idle sub {
    undef $idle_w;
    if(defined($self))
    {
      my $strong_self = $self;
      $are_callbacks_supposed_to_be_ready = 1;
      local $@;
      my $success = eval
      {
        $self->_process_message($_) foreach @temp_messages;
        1;
      };
      @temp_messages = ();
      if(!$success)
      {
        $self->_force_shutdown();
      }
    }
  };
}
# Act on a single decoded message.
#
#   text / binary : handed to the next_message listeners (which are used
#                   up by this) and then to the each_message listeners
#   close         : close code and reason are recorded, the read side is
#                   marked closed and close() is called in reply
#   ping          : answered with a message of opcode 10 carrying the
#                   same body
#
# Nothing happens once the read side has been marked closed.
sub _process_message
{
  my ($self, $received_message) = @_;
  return if !$self->_is_read_open;

  if($received_message->is_text || $received_message->is_binary)
  {
    # Empty the one-shot list *before* calling anything in it.  A listener
    # that registers a new next_message callback while it runs therefore
    # queues it for the following message rather than for this one.
    my $one_shot_list = $self->_next_message_cb;
    my @one_shot = splice @$one_shot_list;
    foreach (@one_shot)
    {
      $_->($self, $received_message);
    }

    # Work on a snapshot, since a listener may get unregistered while the
    # loop is running.  Each listener receives its own unregister coderef
    # as third argument.
    my @persistent = @{ $self->_each_message_cb };
    foreach (@persistent)
    {
      $_->($self, $received_message, $self->_cancel_for(each_message => $_) );
    }
  }
  elsif($received_message->is_close)
  {
    my $payload = $received_message->body;
    if($payload)
    {
      # 16-bit big-endian code first, the rest is the reason text
      my($code, $reason) = unpack 'na*', $payload;
      $self->close_code($code);
      $self->close_reason(Encode::decode('UTF-8', $reason));
    }
    $self->_is_read_open(0);
    $self->close();
  }
  elsif($received_message->is_ping)
  {
    my $pong = AnyEvent::WebSocket::Message->new(
      opcode => 10,
      body   => $received_message->body,
    );
    $self->send($pong);
  }
}
# Queue a shutdown on the handle and mark both directions as closed.
# No close message is sent by this method itself.
sub _force_shutdown
{
  my $self = shift;
  my $handle = $self->handle;
  $handle->push_shutdown;
  $self->_is_write_open(0);
  return $self->_is_read_open(0);
}
# Write one message to the handle.
#
# $message is either a plain string or a message object providing opcode
# and body methods; in the latter case the opcode is carried over to the
# frame.  Returns $self.  Does nothing (besides returning $self) when the
# write side has already been marked closed.
sub send
{
  my($self, $message) = @_;
  return $self if !$self->_is_write_open;

  # Only the message-specific part differs between the two input kinds.
  my @payload_args = ref $message
    ? (opcode => $message->opcode, buffer => $message->body)
    : (buffer => $message);

  my $frame = Protocol::WebSocket::Frame->new(
    @payload_args,
    masked           => $self->masked,
    max_payload_size => 0,
  );

  $self->handle->push_write($frame->to_bytes);
  return $self;
}
# Build the "unregister" coderef for one listener.
#
#   $event   - event name; the listener list is looked up through the
#              "_<event>_cb" accessor
#   $handler - the callback to remove, identified by its reference address
#
# Calling the returned coderef removes every entry with that address from
# the list, in place.
sub _cancel_for
{
  my( $self, $event, $handler ) = @_;
  my $accessor   = "_${event}_cb";
  my $handler_id = Scalar::Util::refaddr($handler);
  return sub {
    my $registered = $self->$accessor;
    @$registered = grep { Scalar::Util::refaddr($_) != $handler_id } @$registered;
  };
}
# Register $cb as a listener for $event.
#
# Known events are next_message, each_message, finish and parse_error;
# anything else croaks.  Returns a coderef that unregisters the callback
# when invoked.
sub on
{
  my($self, $event, $cb) = @_;

  my ($known) = grep { $event eq $_ }
                qw( next_message each_message finish parse_error );
  Carp::croak "unrecongized event: $event" if !defined $known;

  # every known event keeps its listeners behind a "_<event>_cb" accessor
  my $accessor = "_${known}_cb";
  push @{ $self->$accessor }, $cb;

  return $self->_cancel_for($event,$cb);
}
# Send a close message, queue a shutdown on the handle and mark the write
# side as closed.
#
#   $code   - optional status code; 1000 is used when it is missing or false
#   $reason - optional character string, appended UTF-8 encoded
#
# Returns $self.
sub close
{
  my($self, $code, $reason) = @_;

  # payload layout: 16-bit big-endian status code, then the encoded reason
  my $status  = $code || '1000';
  my $payload = pack 'n', $status;
  if(defined $reason)
  {
    $payload .= Encode::encode('UTF-8', $reason);
  }

  my $close_message = AnyEvent::WebSocket::Message->new(
    opcode => 8,
    body   => $payload,
  );
  $self->send($close_message);

  $self->handle->push_shutdown;
  $self->_is_write_open(0);
  return $self;
}
# Only installed on perls older than 5.10.
if($] < 5.010)
{
  # Workaround for GH#19.  The author is not 100% sure about it, but
  # considers it an acceptable trade off.  The previous workaround was to
  # downgrade to an AE 6.x release, which is no longer an option because
  # an AE 7.x release is now required for SSL bug fixes.
  *DEMOLISH = sub
  {
    my $self = shift;
    return if !$self->_is_write_open;
    # best effort: a failure to queue the shutdown is deliberately ignored
    eval { $self->handle->push_shutdown };
  };
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
AnyEvent::WebSocket::Connection - WebSocket connection for AnyEvent
=head1 VERSION
version 0.55
=head1 SYNOPSIS
# send a message through the websocket...
$connection->send('a message');
# receive message from the websocket...
$connection->on(each_message => sub {
# $connection is the same connection object
# $message isa AnyEvent::WebSocket::Message
my($connection, $message) = @_;
...
});
# handle a closed connection...
$connection->on(finish => sub {
# $connection is the same connection object
my($connection) = @_;
...
});
# close an opened connection
# (can do this either inside or outside of
# a callback)
$connection->close;
(See L<AnyEvent::WebSocket::Client> or L<AnyEvent::WebSocket::Server> on
how to create a connection)
=head1 DESCRIPTION
This class represents a WebSocket connection with a remote server or a
client.
If the connection object falls out of scope then the connection will be
closed gracefully.
This class was created for a client to connect to a server via
L<AnyEvent::WebSocket::Client>, and was later extended to work on the
server side via L<AnyEvent::WebSocket::Server>. Once a WebSocket
connection is established, the API for both client and server is
identical.
=head1 ATTRIBUTES
=head2 handle
The underlying L<AnyEvent::Handle> object used for the connection.
WebSocket handshake MUST be already completed using this handle.
You should not use the handle directly after creating L<AnyEvent::WebSocket::Connection> object.
Usually only useful for creating server connections, see below.
=head2 masked
If set to true, it masks outgoing frames. The default is false.
=head2 subprotocol
The subprotocol returned by the server. If no subprotocol was requested, it
may be C<undef>.
=head2 max_payload_size
The maximum payload size for received frames. Currently defaults to whatever
L<Protocol::WebSocket> defaults to.
=head2 max_fragments
The maximum number of fragments for received frames. Currently defaults to whatever
L<Protocol::WebSocket> defaults to.
=head2 close_code
If provided by the other side, the code that was provided when the
connection was closed.
=head2 close_reason
If provided by the other side, the reason for closing the connection.
=head2 close_error
If the connection is closed due to a network error, this will hold the
message.
=head1 METHODS
=head2 send
$connection->send($message);
Send a message to the other side. C<$message> may either be a string
(in which case a text message will be sent), or an instance of
L<AnyEvent::WebSocket::Message>.
=head2 on
$connection->on(each_message => $cb);
$connection->on(next_message => $cb);
$connection->on(finish => $cb);
Register a callback to a particular event.
For each event C<$connection> is the L<AnyEvent::WebSocket::Connection> and
C<$message> is an L<AnyEvent::WebSocket::Message> (if available).
Returns a coderef that unregisters the callback when invoked.
my $cancel = $connection->on( each_message => sub { ... });
# later on...
$cancel->();
=head3 each_message
$cb->($connection, $message, $unregister)
Called each time a message is received from the WebSocket.
C<$unregister> is a coderef that removes this callback from
the active listeners when invoked.
=head3 next_message
$cb->($connection, $message)
Called only for the next message received from the WebSocket.
[0.49]
Adding a next_message callback from within a next_message callback will
result in a callback called on the next message instead of the current
one. There was a bug in previous versions where the callback would be
called immediately after current set of callbacks with the same message.
=head3 parse_error
$cb->($connection, $text_error_message)
Called if there is an error parsing a message sent from the remote end.
After this callback is called, the connection will be closed.
Among other possible errors, this event will trigger if a frame has a
payload which is larger than C<max_payload_size>.
=head3 finish
$cb->($connection, $message)
Called when the connection is terminated. If the connection is terminated
due to an error, the message will be provided as the second argument.
On a cleanly closed connection this will be C<undef>.
=head2 close
$connection->close;
$connection->close($code);
$connection->close($code, $reason);
Close the connection. You may optionally provide a code and a reason.
The code is a 16-bit unsigned integer value that indicates why you close the connection. By default the code is 1000.
The reason is a character string (not an octet string) that further describes why you close the connection. By default the reason is an empty string.
=head1 SERVER CONNECTIONS
Although written originally to work with L<AnyEvent::WebSocket::Client>,
this class was designed to be used for either client or server WebSocket
connections. For details, contact the author and/or take a look at the
source for L<AnyEvent::WebSocket::Client> and the examples that come with
L<Protocol::WebSocket>.
=head1 SEE ALSO
=over 4
=item *
L<AnyEvent::WebSocket::Client>
=item *
L<AnyEvent::WebSocket::Message>
=item *
L<AnyEvent::WebSocket::Server>
=item *
L<AnyEvent>
=item *
L<RFC 6455 The WebSocket Protocol|http://tools.ietf.org/html/rfc6455>
=back
=for stopwords Joaquín José
=head1 AUTHOR
Author: Graham Ollis E<lt>plicease@cpan.orgE<gt>
Contributors:
Toshio Ito (debug-ito, TOSHIOITO)
José Joaquín Atria (JJATRIA)
Kivanc Yazan (KYZN)
Yanick Champoux (YANICK)
Fayland Lam (FAYLAND)
Daniel Kamil Kozar (xavery)
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2013-2022 by Graham Ollis.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut