From Code to Community: Sponsoring The Perl and Raku Conference 2025 Learn more

use strict;
qw(const_name :frame_types :errors :settings :flags :states
:limits :endpoints);
use Protocol::HTTP2::HeaderCompression qw(headers_encode);
use Protocol::HTTP2::Trace qw(tracer);
# Mixin
our @ISA =
qw(Protocol::HTTP2::Frame Protocol::HTTP2::Stream Protocol::HTTP2::Upgrade);
# Default settings
my %default_settings = (
&SETTINGS_HEADER_TABLE_SIZE => DEFAULT_HEADER_TABLE_SIZE,
&SETTINGS_ENABLE_PUSH => DEFAULT_ENABLE_PUSH,
&SETTINGS_MAX_CONCURRENT_STREAMS => DEFAULT_MAX_CONCURRENT_STREAMS,
&SETTINGS_INITIAL_WINDOW_SIZE => DEFAULT_INITIAL_WINDOW_SIZE,
&SETTINGS_MAX_FRAME_SIZE => DEFAULT_MAX_FRAME_SIZE,
&SETTINGS_MAX_HEADER_LIST_SIZE => DEFAULT_MAX_HEADER_LIST_SIZE,
);
sub new {
my ( $class, $type, %opts ) = @_;
my $self = bless {
type => $type,
streams => {},
last_stream => $type == CLIENT ? 1 : 2,
last_peer_stream => 0,
active_peer_streams => 0,
encode_ctx => {
# HPACK. Header Table
header_table => [],
# HPACK. Header Table size
ht_size => 0,
max_ht_size => DEFAULT_HEADER_TABLE_SIZE,
settings => {%default_settings},
},
decode_ctx => {
# HPACK. Header Table
header_table => [],
# HPACK. Header Table size
ht_size => 0,
max_ht_size => DEFAULT_HEADER_TABLE_SIZE,
# HPACK. Emitted headers
emitted_headers => [],
# last frame
frame => {},
settings => {%default_settings},
},
# Current error
error => 0,
# Output frames queue
queue => [],
# Connection must be shutdown
shutdown => 0,
# issued GOAWAY: no new streams on this connection
goaway => 0,
# get preface
preface => 0,
# perform upgrade
upgrade => 0,
# flow control
fcw_send => DEFAULT_INITIAL_WINDOW_SIZE,
fcw_recv => DEFAULT_INITIAL_WINDOW_SIZE,
# stream where expected CONTINUATION frames
pending_stream => undef,
}, $class;
for (qw(on_change_state on_new_peer_stream on_error upgrade)) {
$self->{$_} = $opts{$_} if exists $opts{$_};
}
if ( exists $opts{settings} ) {
for ( keys %{ $opts{settings} } ) {
$self->{decode_ctx}->{settings}->{$_} = $opts{settings}{$_};
}
}
# Sync decode context max_ht_size
$self->{decode_ctx}->{max_ht_size} =
$self->{decode_ctx}->{settings}->{&SETTINGS_HEADER_TABLE_SIZE};
$self;
}
sub decode_context {
shift->{decode_ctx};
}
sub encode_context {
shift->{encode_ctx};
}
sub pending_stream {
shift->{pending_stream};
}
sub dequeue {
my $self = shift;
shift @{ $self->{queue} };
}
sub enqueue_raw {
my $self = shift;
push @{ $self->{queue} }, @_;
}
sub enqueue {
my $self = shift;
while ( my ( $type, $flags, $stream_id, $data_ref ) = splice( @_, 0, 4 ) ) {
push @{ $self->{queue} },
$self->frame_encode( $type, $flags, $stream_id, $data_ref );
$self->state_machine( 'send', $type, $flags, $stream_id );
}
}
sub enqueue_first {
my $self = shift;
my $i = 0;
for ( 0 .. $#{ $self->{queue} } ) {
my $type =
( $self->frame_header_decode( \$self->{queue}->[$_], 0 ) )[1];
last if $type != CONTINUATION && $type != PING;
$i++;
}
while ( my ( $type, $flags, $stream_id, $data_ref ) = splice( @_, 0, 4 ) ) {
splice @{ $self->{queue} }, $i++, 0,
$self->frame_encode( $type, $flags, $stream_id, $data_ref );
$self->state_machine( 'send', $type, $flags, $stream_id );
}
}
sub finish {
my $self = shift;
$self->enqueue( GOAWAY, 0, 0,
[ $self->{last_peer_stream}, $self->{error} ] )
unless $self->shutdown;
$self->shutdown(1);
}
sub shutdown {
my $self = shift;
$self->{shutdown} = shift if @_;
$self->{shutdown};
}
sub goaway {
my $self = shift;
$self->{goaway} = shift if @_;
$self->{goaway};
}
sub preface {
my $self = shift;
$self->{preface} = shift if @_;
$self->{preface};
}
sub upgrade {
my $self = shift;
$self->{upgrade} = shift if @_;
$self->{upgrade};
}
sub state_machine {
my ( $self, $act, $type, $flags, $stream_id ) = @_;
return
if $stream_id == 0
|| $type == SETTINGS
|| $type == GOAWAY
|| $self->upgrade
|| !$self->preface;
my $promised_sid = $self->stream_promised_sid($stream_id);
my $prev_state = $self->{streams}->{ $promised_sid || $stream_id }->{state};
# REFUSED_STREAM error
return if !defined $prev_state && $type == RST_STREAM && $act eq 'send';
# Direction server->client
my $srv2cln = ( $self->{type} == SERVER && $act eq 'send' )
|| ( $self->{type} == CLIENT && $act eq 'recv' );
# Direction client->server
my $cln2srv = ( $self->{type} == SERVER && $act eq 'recv' )
|| ( $self->{type} == CLIENT && $act eq 'send' );
# Do we expect CONTINUATION after this frame?
my $pending = ( $type == HEADERS || $type == PUSH_PROMISE )
&& !( $flags & END_HEADERS );
#tracer->debug(
# sprintf "\e[0;31mStream state: frame %s is %s%s on %s stream %i\e[m\n",
# const_name( "frame_types", $type ),
# $act,
# $pending ? "*" : "",
# const_name( "states", $prev_state ),
# $promised_sid || $stream_id,
#);
# Wait until all CONTINUATION frames arrive
if ( my $ps = $self->stream_pending_state($stream_id) ) {
if ( $type != CONTINUATION ) {
tracer->error(
sprintf "invalid frame type %s. Expected CONTINUATION frame\n",
const_name( "frame_types", $type )
);
$self->error(PROTOCOL_ERROR);
}
elsif ( $flags & END_HEADERS ) {
$self->stream_promised_sid( $stream_id, undef ) if $promised_sid;
$self->stream_pending_state( $promised_sid || $stream_id, undef );
$self->stream_state( $promised_sid || $stream_id, $ps );
}
}
# Unexpected CONTINUATION frame
elsif ( $type == CONTINUATION ) {
tracer->error("Unexpected CONTINUATION frame\n");
$self->error(PROTOCOL_ERROR);
}
# State machine
# IDLE
elsif ( $prev_state == IDLE ) {
if ( $type == HEADERS && $cln2srv ) {
$self->stream_state( $stream_id,
( $flags & END_STREAM ) ? HALF_CLOSED : OPEN, $pending );
}
elsif ( $type == PUSH_PROMISE && $srv2cln ) {
$self->stream_state( $promised_sid, RESERVED, $pending );
$self->stream_promised_sid( $stream_id, undef )
if $flags & END_HEADERS;
}
# first frame in stream is invalid, so state is yet IDLE
elsif ( $type == RST_STREAM && $act eq 'send' ) {
tracer->notice('send RST_STREAM on IDLE state. possible bug?');
$self->stream_state( $stream_id, CLOSED );
}
elsif ( $type != PRIORITY ) {
tracer->error(
sprintf "invalid frame type %s for current stream state %s\n",
const_name( "frame_types", $type ),
const_name( "states", $prev_state )
);
$self->error(PROTOCOL_ERROR);
}
}
# OPEN
elsif ( $prev_state == OPEN ) {
if ( ( $flags & END_STREAM )
&& ( $type == DATA || $type == HEADERS ) )
{
$self->stream_state( $stream_id, HALF_CLOSED, $pending );
}
elsif ( $type == RST_STREAM ) {
$self->stream_state( $stream_id, CLOSED );
}
elsif ($type == HEADERS
&& !$pending
&& $self->stream_trailer($stream_id) )
{
tracer->error("expected END_STREAM flag for trailer HEADERS frame");
$self->error(PROTOCOL_ERROR);
}
}
# RESERVED (local/remote)
elsif ( $prev_state == RESERVED ) {
if ( $type == RST_STREAM ) {
$self->stream_state( $stream_id, CLOSED );
}
elsif ( $type == HEADERS && $srv2cln ) {
$self->stream_state( $stream_id,
( $flags & END_STREAM ) ? CLOSED : HALF_CLOSED, $pending );
}
elsif ( $type != PRIORITY && $cln2srv ) {
tracer->error("invalid frame $type for state RESERVED");
$self->error(PROTOCOL_ERROR);
}
}
# HALF_CLOSED (local/remote)
elsif ( $prev_state == HALF_CLOSED ) {
if ( ( $type == RST_STREAM )
|| ( ( $flags & END_STREAM ) && $srv2cln ) )
{
$self->stream_state( $stream_id, CLOSED, $pending );
}
elsif ( ( !grep { $type == $_ } ( WINDOW_UPDATE, PRIORITY ) )
&& $cln2srv )
{
tracer->error( sprintf "invalid frame %s for state HALF CLOSED\n",
const_name( "frame_types", $type ) );
$self->error(PROTOCOL_ERROR);
}
}
# CLOSED
elsif ( $prev_state == CLOSED ) {
if ( $type != PRIORITY && ( $type != WINDOW_UPDATE && $cln2srv ) ) {
tracer->error("stream is closed\n");
$self->error(STREAM_CLOSED);
}
}
else {
tracer->error("oops!\n");
$self->error(INTERNAL_ERROR);
}
}
# TODO: move this to some other module
sub send_headers {
my ( $self, $stream_id, $headers, $end ) = @_;
my $max_size = $self->enc_setting(SETTINGS_MAX_FRAME_SIZE);
my $header_block = headers_encode( $self->encode_context, $headers );
my $flags = $end ? END_STREAM : 0;
$flags |= END_HEADERS if length($header_block) <= $max_size;
$self->enqueue( HEADERS, $flags, $stream_id,
{ hblock => \substr( $header_block, 0, $max_size, '' ) } );
while ( length($header_block) > 0 ) {
my $flags = length($header_block) <= $max_size ? 0 : END_HEADERS;
$self->enqueue( CONTINUATION, $flags,
$stream_id, \substr( $header_block, 0, $max_size, '' ) );
}
}
sub send_pp_headers {
my ( $self, $stream_id, $promised_id, $headers ) = @_;
my $max_size = $self->enc_setting(SETTINGS_MAX_FRAME_SIZE);
my $header_block = headers_encode( $self->encode_context, $headers );
my $flags = length($header_block) <= $max_size ? END_HEADERS : 0;
$self->enqueue( PUSH_PROMISE, $flags, $stream_id,
[ $promised_id, \substr( $header_block, 0, $max_size - 4, '' ) ] );
while ( length($header_block) > 0 ) {
my $flags = length($header_block) <= $max_size ? 0 : END_HEADERS;
$self->enqueue( CONTINUATION, $flags,
$stream_id, \substr( $header_block, 0, $max_size, '' ) );
}
}
sub send_data {
my ( $self, $stream_id, $chunk, $end ) = @_;
my $data = $self->stream_blocked_data($stream_id);
$data .= defined $chunk ? $chunk : '';
$self->stream_end( $stream_id, $end ) if defined $end;
$end = $self->stream_end($stream_id);
while (1) {
my $l = length($data);
my $size = $self->enc_setting(SETTINGS_MAX_FRAME_SIZE);
for ( $l, $self->fcw_send, $self->stream_fcw_send($stream_id) ) {
$size = $_ if $size > $_;
}
# Flow control
last if $l != 0 && $size <= 0;
$self->fcw_send( -$size );
$self->stream_fcw_send( $stream_id, -$size );
$self->enqueue(
DATA, $end && $l == $size ? END_STREAM : 0,
$stream_id, \substr( $data, 0, $size, '' )
);
last if $l == $size;
}
$self->stream_blocked_data( $stream_id, $data );
}
sub send_blocked {
my $self = shift;
for my $stream_id ( keys %{ $self->{streams} } ) {
$self->stream_send_blocked($stream_id);
}
}
sub error {
my $self = shift;
if ( @_ && !$self->{shutdown} ) {
$self->{error} = shift;
$self->{on_error}->( $self->{error} ) if exists $self->{on_error};
$self->finish;
}
$self->{error};
}
sub setting {
require Carp;
Carp::confess("setting is deprecated\n");
}
sub _setting {
my ( $ctx, $self, $setting ) = @_;
my $s = $self->{$ctx}->{settings};
return undef unless exists $s->{$setting};
$s->{$setting} = pop if @_ > 3;
$s->{$setting};
}
sub enc_setting {
_setting( 'encode_ctx', @_ );
}
sub dec_setting {
_setting( 'decode_ctx', @_ );
}
sub accept_settings {
my $self = shift;
$self->enqueue( SETTINGS, ACK, 0, {} );
}
# Flow control windown of connection
sub _fcw {
my $dir = shift;
my $self = shift;
if (@_) {
$self->{$dir} += shift;
tracer->debug( "$dir now is " . $self->{$dir} . "\n" );
}
$self->{$dir};
}
sub fcw_send {
_fcw( 'fcw_send', @_ );
}
sub fcw_recv {
_fcw( 'fcw_recv', @_ );
}
sub fcw_update {
my $self = shift;
# TODO: check size of data in memory
my $size = $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE);
tracer->debug("update fcw recv of connection with $size b.\n");
$self->fcw_recv($size);
$self->enqueue( WINDOW_UPDATE, 0, 0, $size );
}
sub fcw_initial_change {
my ( $self, $size ) = @_;
my $prev_size = $self->enc_setting(SETTINGS_INITIAL_WINDOW_SIZE);
my $diff = $size - $prev_size;
tracer->debug(
"Change flow control window on not closed streams with diff $diff\n");
for my $stream_id ( keys %{ $self->{streams} } ) {
next if $self->stream_state($stream_id) == CLOSED;
$self->stream_fcw_send( $stream_id, $diff );
}
}
sub ack_ping {
my ( $self, $payload_ref ) = @_;
$self->enqueue_first( PING, ACK, 0, $payload_ref );
}
sub send_ping {
my ( $self, $payload ) = @_;
if ( !defined $payload ) {
$payload = pack "C*", map { rand(256) } 1 .. PING_PAYLOAD_SIZE;
}
elsif ( length($payload) != PING_PAYLOAD_SIZE ) {
$payload = sprintf "%*.*s",
-PING_PAYLOAD_SIZE(), PING_PAYLOAD_SIZE, $payload;
}
$self->enqueue( PING, 0, 0, \$payload );
}
1;