package Mojo::Transaction::WebSocket;
use Mojo::Base 'Mojo::Transaction';

# "I'm not calling you a liar but...
#  I can't think of a way to finish that sentence."
use Config;
use Mojo::Transaction::HTTP;
use Mojo::Util qw(b64_encode decode encode sha1_bytes);

use constant DEBUG => $ENV{MOJO_WEBSOCKET_DEBUG} || 0;

# 64bit Perl
use constant MODERN => $Config{ivsize} > 4;

# Unique value from the spec
use constant GUID => '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

# Opcodes
use constant {
  CONTINUATION => 0,
  TEXT         => 1,
  BINARY       => 2,
  CLOSE        => 8,
  PING         => 9,
  PONG         => 10
};

has handshake => sub { Mojo::Transaction::HTTP->new };
has 'masked';
has max_websocket_size => sub { $ENV{MOJO_MAX_WEBSOCKET_SIZE} || 262144 };

# Construct the transaction and route every "frame" event through the
# default message handler
sub new {
  my $self = shift->SUPER::new(@_);
  $self->on(frame => sub { shift->_message(@_) });
  return $self;
}

# Serialize a single frame: head byte (FIN, RSV1-3, opcode), payload length
# combined with the mask bit, optional masking key and the payload
sub build_frame {
  my ($self, $fin, $rsv1, $rsv2, $rsv3, $op, $payload) = @_;
  warn "-- Building frame ($fin, $rsv1, $rsv2, $rsv3, $op)\n" if DEBUG;

  # Head (the opcode is always written, not only when the FIN bit is set,
  # and the byte is packed from a number so it never stringifies to "0")
  my $head = $op & 0b00001111;
  $head |= 0b10000000 if $fin;
  $head |= 0b01000000 if $rsv1;
  $head |= 0b00100000 if $rsv2;
  $head |= 0b00010000 if $rsv3;
  my $frame = pack 'C', $head;

  # The mask bit shares a byte with the 7bit payload length
  my $len    = length $payload;
  my $masked = $self->masked;
  my $flag   = $masked ? 0b10000000 : 0;

  # Small payload
  if ($len < 126) {
    warn "-- Small payload ($len)\n$payload\n" if DEBUG;
    $frame .= pack 'C', $len | $flag;
  }

  # Extended payload (16bit)
  elsif ($len < 65536) {
    warn "-- Extended 16bit payload ($len)\n$payload\n" if DEBUG;
    $frame .= pack 'Cn', 126 | $flag, $len;
  }

  # Extended payload (64bit with 32bit fallback)
  else {
    warn "-- Extended 64bit payload ($len)\n$payload\n" if DEBUG;
    $frame .= pack 'C', 127 | $flag;
    $frame .= MODERN ? pack('Q>', $len) : pack('NN', 0, $len & 0xFFFFFFFF);
  }

  # Mask payload (key is drawn from the whole 32bit range)
  if ($masked) {
    my $mask = pack 'N', int(rand 4294967296);
    $payload = $mask . _xor_mask($payload, $mask);
  }

  return $frame . $payload;
}

# Compare the expected challenge solution with the "Sec-WebSocket-Accept"
# response header, a missing header simply fails the check
sub client_challenge {
  my $self     = shift;
  my $solution = $self->_challenge($self->req->headers->sec_websocket_key);
  return $solution eq ($self->res->headers->sec_websocket_accept // '');
}

# Fill in all handshake request headers that have not been set yet
sub client_handshake {
  my $self = shift;

  # Default headers
  my $headers = $self->req->headers;
  $headers->upgrade('websocket')   unless $headers->upgrade;
  $headers->connection('Upgrade')  unless $headers->connection;
  $headers->sec_websocket_protocol('mojo')
    unless $headers->sec_websocket_protocol;
  $headers->sec_websocket_version(13) unless $headers->sec_websocket_version;

  # Generate WebSocket challenge (RFC 6455 asks for a 16 byte nonce)
  $headers->sec_websocket_key(
    b64_encode(pack('N4', map { int(rand 4294967296) } 1 .. 4), ''))
    unless $headers->sec_websocket_key;
}

# Client and server share the same frame reader and writer
sub client_read  { shift->server_read(@_) }
sub client_write { shift->server_write(@_) }

sub connection { shift->handshake->connection }

# Queue a CLOSE frame and remember that the transaction is finishing
sub finish {
  my $self = shift;
  $self->send([1, 0, 0, 0, CLOSE, '']);
  $self->{finished} = 1;
  return $self;
}

sub is_websocket {1}

sub kept_alive    { shift->handshake->kept_alive }
sub local_address { shift->handshake->local_address }
sub local_port    { shift->handshake->local_port }

# Parse one frame from the referenced buffer and remove it from there,
# returns nothing (leaving the buffer untouched) while the frame is incomplete
# or when it exceeds "max_websocket_size"
sub parse_frame {
  my ($self, $buffer) = @_;

  # Head (work on a copy so an incomplete frame does not alter the buffer)
  return unless length(my $clone = $$buffer) >= 2;
  my $head   = substr $clone, 0, 2;
  my $first  = vec $head, 0, 8;
  my $second = vec $head, 1, 8;

  # FIN
  my $fin = $first & 0b10000000 ? 1 : 0;

  # RSV1-3
  my $rsv1 = $first & 0b01000000 ? 1 : 0;
  my $rsv2 = $first & 0b00100000 ? 1 : 0;
  my $rsv3 = $first & 0b00010000 ? 1 : 0;

  # Opcode
  my $op = $first & 0b00001111;
  warn "-- Parsing frame ($fin, $rsv1, $rsv2, $rsv3, $op)\n" if DEBUG;

  # Small payload
  my $len  = $second & 0b01111111;
  my $hlen = 2;
  if ($len < 126) { warn "-- Small payload ($len)\n" if DEBUG }

  # Extended payload (16bit), the header is exactly 4 bytes long
  elsif ($len == 126) {
    return unless length($clone) >= 4;
    $hlen = 4;
    $len = unpack 'n', substr($clone, 2, 2);
    warn "-- Extended 16bit payload ($len)\n" if DEBUG;
  }

  # Extended payload (64bit with 32bit fallback), the header is exactly 10
  # bytes long
  elsif ($len == 127) {
    return unless length($clone) >= 10;
    $hlen = 10;
    my $ext = substr $clone, 2, 8;
    $len = MODERN ? unpack('Q>', $ext) : unpack('N', substr($ext, 4, 4));
    warn "-- Extended 64bit payload ($len)\n" if DEBUG;
  }

  # Check message size
  if ($len > $self->max_websocket_size) {
    $self->finish;
    return;
  }

  # Check if whole packet has arrived (a masked frame carries a 4 byte key
  # in front of the payload)
  my $masked = $second & 0b10000000;
  $len += 4 if $masked;
  return if length($clone) < $hlen + $len;
  substr $clone, 0, $hlen, '';

  # Payload
  my $payload = $len ? substr($clone, 0, $len, '') : '';

  # Unmask payload
  if ($masked) {
    my $mask = substr $payload, 0, 4, '';
    $payload = _xor_mask($payload, $mask);
  }
  warn "$payload\n" if DEBUG;

  # Only now is the frame removed from the caller's buffer
  $$buffer = $clone;

  return [$fin, $rsv1, $rsv2, $rsv3, $op, $payload];
}

sub remote_address { shift->handshake->remote_address }
sub remote_port    { shift->handshake->remote_port }
sub req            { shift->handshake->req }
sub res            { shift->handshake->res }

sub resume {
  my $self = shift;
  $self->handshake->resume;
  return $self;
}

# Queue a message or frame for writing, accepts a hash reference with a
# "text" or "binary" key, a raw frame as array reference or a string of
# characters, plus an optional drain callback
sub send {
  my ($self, $frame, $cb) = @_;

  # Binary or raw text
  if (ref $frame eq 'HASH') {
    $frame
      = exists $frame->{text}
      ? [1, 0, 0, 0, TEXT,   $frame->{text}]
      : [1, 0, 0, 0, BINARY, $frame->{binary}];
  }

  # Text
  elsif (!ref $frame) { $frame = [1, 0, 0, 0, TEXT, encode('UTF-8', $frame)] }

  # Prepare frame
  $self->once(drain => $cb) if $cb;
  $self->{write} .= $self->build_frame(@$frame);
  $self->{state} = 'write';

  # Resume
  return $self->emit('resume');
}

# Prepare the "101" handshake response, picking the first protocol offered
# by the client and answering its challenge
sub server_handshake {
  my $self = shift;

  # WebSocket handshake
  my $res_headers = $self->res->code(101)->headers;
  $res_headers->upgrade('websocket')->connection('Upgrade');
  my $req_headers = $self->req->headers;
  ($req_headers->sec_websocket_protocol || '') =~ /^\s*([^,]+)/
    and $res_headers->sec_websocket_protocol($1);
  $res_headers->sec_websocket_accept(
    $self->_challenge($req_headers->sec_websocket_key));
}

# Buffer incoming bytes and emit a "frame" event for every complete frame
sub server_read {
  my ($self, $chunk) = @_;

  # Parse frames
  $self->{read} .= $chunk // '';
  while (my $frame = $self->parse_frame(\$self->{read})) {
    $self->emit(frame => $frame);
  }

  # Resume
  $self->emit('resume');
}

# Hand out everything queued for writing, an empty queue updates the state
# and emits "drain" first
sub server_write {
  my $self = shift;

  # Drain
  unless (length($self->{write} // '')) {
    $self->{state} = $self->{finished} ? 'finished' : 'read';
    $self->emit('drain');
  }

  # Empty buffer
  return delete $self->{write} // '';
}

# Base64 encoded SHA1 checksum of the key followed by the GUID
sub _challenge { b64_encode(sha1_bytes((pop() || '') . GUID), '') }

# Default "frame" handler, answers PING, reacts to CLOSE and assembles
# fragmented messages before emitting "message"
sub _message {
  my ($self, $frame) = @_;

  # Assume continuation
  my $op = $frame->[4] || CONTINUATION;

  # Ping/Pong
  return $self->send([1, 0, 0, 0, PONG, $frame->[5]]) if $op == PING;
  return if $op == PONG;

  # Close
  return $self->finish if $op == CLOSE;

  # Append chunk and check message size ("return" instead of "last", there
  # is no loop in this subroutine to leave)
  $self->{op} = $op unless exists $self->{op};
  $self->{message} .= $frame->[5];
  return $self->finish
    if length($self->{message}) > $self->max_websocket_size;

  # No FIN bit (Continuation)
  return unless $frame->[0];

  # Message (the opcode is always cleared, even for an empty payload, so it
  # can not leak into the next message)
  my $message = delete $self->{message};
  my $type    = delete $self->{op};
  $message = decode 'UTF-8', $message if $type == TEXT;
  $self->emit(message => $message);
}

# XOR the input with the repeated 4 byte mask, 512 bytes at a time
sub _xor_mask {
  my ($input, $mask) = @_;

  # 512 byte mask
  $mask = $mask x 128;

  # A lexical chunk keeps the caller's "$_" intact, "substr" always yields a
  # string so "^" stays a bitwise string operation
  my $output = '';
  my $chunk;
  $output .= $chunk ^ $mask
    while length($chunk = substr($input, 0, 512, '')) == 512;

  # Remainder (shorter than 512 bytes, possibly empty)
  return $output . ($chunk ^ substr($mask, 0, length $chunk));
}

1;

=head1 NAME

Mojo::Transaction::WebSocket - WebSocket transaction

=head1 SYNOPSIS

  use Mojo::Transaction::WebSocket;

  # Send and receive WebSocket messages
  my $ws = Mojo::Transaction::WebSocket->new;
  $ws->send('Hello World!');
  $ws->on(message => sub {
    my ($ws, $message) = @_;
    say "Message: $message";
  });

=head1 DESCRIPTION

L<Mojo::Transaction::WebSocket> is a container for WebSocket transactions as
described in RFC 6455. Note that 64bit frames require a Perl with 64bit
integer support, or they are limited to 32bit.

=head1 EVENTS

L<Mojo::Transaction::WebSocket> inherits all events from L<Mojo::Transaction>
and can emit the following new ones.

=head2 C<drain>

  $ws->on(drain => sub {
    my $ws = shift;
    ...
  });

Emitted once all data has been sent.

  $ws->on(drain => sub {
    my $ws = shift;
    $ws->send(time);
  });

=head2 C<frame>

  $ws->on(frame => sub {
    my ($ws, $frame) = @_;
    ...
  });

Emitted when a WebSocket frame has been received.

  $ws->unsubscribe('frame');
  $ws->on(frame => sub {
    my ($ws, $frame) = @_;
    say "FIN: $frame->[0]";
    say "RSV1: $frame->[1]";
    say "RSV2: $frame->[2]";
    say "RSV3: $frame->[3]";
    say "Opcode: $frame->[4]";
    say "Payload: $frame->[5]";
  });

=head2 C<message>

  $ws->on(message => sub {
    my ($ws, $message) = @_;
    ...
  });

Emitted when a complete WebSocket message has been received.

  $ws->on(message => sub {
    my ($ws, $message) = @_;
    say "Message: $message";
  });

=head1 ATTRIBUTES

L<Mojo::Transaction::WebSocket> inherits all attributes from
L<Mojo::Transaction> and implements the following new ones.

=head2 C<handshake>

  my $handshake = $ws->handshake;
  $ws = $ws->handshake(Mojo::Transaction::HTTP->new);

The original handshake transaction, defaults to a L<Mojo::Transaction::HTTP>
object.

=head2 C<masked>

  my $masked = $ws->masked;
  $ws = $ws->masked(1);

Mask outgoing frames with XOR cipher and a random 32bit key.

=head2 C<max_websocket_size>

  my $size = $ws->max_websocket_size;
  $ws = $ws->max_websocket_size(1024);

Maximum WebSocket message size in bytes, defaults to the value of the
C<MOJO_MAX_WEBSOCKET_SIZE> environment variable or C<262144>.

=head1 METHODS

L<Mojo::Transaction::WebSocket> inherits all methods from
L<Mojo::Transaction> and implements the following new ones.

=head2 C<new>

  my $ws = Mojo::Transaction::WebSocket->new;

Construct a new L<Mojo::Transaction::WebSocket> object and subscribe to
C<frame> event with default message parser, which also handles C<PING> and
C<CLOSE> frames automatically.

=head2 C<build_frame>

  my $bytes = $ws->build_frame($fin, $rsv1, $rsv2, $rsv3, $op, $payload);

Build WebSocket frame.

  # Continuation frame with FIN bit and payload
  say $ws->build_frame(1, 0, 0, 0, 0, 'World!');

  # Text frame with payload
  say $ws->build_frame(0, 0, 0, 0, 1, 'Hello');

  # Binary frame with FIN bit and payload
  say $ws->build_frame(1, 0, 0, 0, 2, 'Hello World!');

  # Close frame with FIN bit and without payload
  say $ws->build_frame(1, 0, 0, 0, 8, '');

  # Ping frame with FIN bit and payload
  say $ws->build_frame(1, 0, 0, 0, 9, 'Test 123');

  # Pong frame with FIN bit and payload
  say $ws->build_frame(1, 0, 0, 0, 10, 'Test 123');

=head2 C<client_challenge>

  my $success = $ws->client_challenge;

Check WebSocket handshake challenge.

=head2 C<client_handshake>

  $ws->client_handshake;

WebSocket handshake.

=head2 C<client_read>

  $ws->client_read($data);

Read raw WebSocket data.

=head2 C<client_write>

  my $chunk = $ws->client_write;

Raw WebSocket data to write.

=head2 C<connection>

  my $connection = $ws->connection;

Connection identifier or socket.

=head2 C<finish>

  $ws = $ws->finish;

Finish the WebSocket connection gracefully.

=head2 C<is_websocket>

  my $true = $ws->is_websocket;

True.

=head2 C<kept_alive>

  my $kept_alive = $ws->kept_alive;

Connection has been kept alive.

=head2 C<local_address>

  my $local_address = $ws->local_address;

Local interface address.

=head2 C<local_port>

  my $local_port = $ws->local_port;

Local interface port.

=head2 C<parse_frame>

  my $frame = $ws->parse_frame(\$bytes);

Parse WebSocket frame.

  # Parse single frame and remove it from buffer
  my $frame = $ws->parse_frame(\$buffer);
  say "FIN: $frame->[0]";
  say "RSV1: $frame->[1]";
  say "RSV2: $frame->[2]";
  say "RSV3: $frame->[3]";
  say "Opcode: $frame->[4]";
  say "Payload: $frame->[5]";

=head2 C<remote_address>

  my $remote_address = $ws->remote_address;

Remote interface address.

=head2 C<remote_port>

  my $remote_port = $ws->remote_port;

Remote interface port.

=head2 C<req>

  my $req = $ws->req;

Handshake request, usually a L<Mojo::Message::Request> object.

=head2 C<res>

  my $res = $ws->res;

Handshake response, usually a L<Mojo::Message::Response> object.

=head2 C<resume>

  $ws = $ws->resume;

Resume C<handshake> transaction.

=head2 C<send>

  $ws = $ws->send({binary => $bytes});
  $ws = $ws->send({text   => $bytes});
  $ws = $ws->send([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
  $ws = $ws->send('Hi there!');
  $ws = $ws->send('Hi there!' => sub {...});

Send message or frame non-blocking via WebSocket, the optional drain callback
will be invoked once all data has been written.

  # Send "Ping" frame
  $ws->send([1, 0, 0, 0, 9, 'Hello World!']);

=head2 C<server_handshake>

  $ws->server_handshake;

WebSocket handshake.

=head2 C<server_read>

  $ws->server_read($data);

Read raw WebSocket data.

=head2 C<server_write>

  my $chunk = $ws->server_write;

Raw WebSocket data to write.

=head1 DEBUGGING

You can set the C<MOJO_WEBSOCKET_DEBUG> environment variable to get some
advanced diagnostics information printed to C<STDERR>.

  MOJO_WEBSOCKET_DEBUG=1

=head1 SEE ALSO

L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>.

=cut