—package
Mojo::IOLoop::Stream;
use
Mojo::IOLoop;
use
Mojo::Util;
has
high_water_mark
=> 1048576;
has
reactor
=>
sub
{ Mojo::IOLoop->singleton->reactor },
weak
=> 1;
sub
DESTROY {
shift
->
close
unless
${^GLOBAL_PHASE} eq
'DESTRUCT'
}
sub
bytes_read {
shift
->{
read
} || 0 }
sub
bytes_waiting {
length
(
shift
->{buffer} //
''
) }
sub
bytes_written {
shift
->{written} || 0 }
sub
can_write {
$_
[0]{handle} &&
$_
[0]->bytes_waiting <
$_
[0]->high_water_mark }
sub
close
{
my
$self
=
shift
;
return
unless
my
$reactor
=
$self
->reactor;
return
unless
my
$handle
=
delete
$self
->timeout(0)->{handle};
$reactor
->remove(
$handle
);
$self
->emit(
'close'
);
}
sub
close_gracefully {
$_
[0]->is_writing ?
$_
[0]{graceful}++ :
$_
[0]->
close
}
sub
handle {
shift
->{handle} }
sub
is_readable {
my
$self
=
shift
;
$self
->_again;
return
$self
->{handle} && Mojo::Util::_readable(0,
fileno
$self
->{handle});
}
sub
is_writing {
my
$self
=
shift
;
return
undef
unless
$self
->{handle};
return
!!
length
(
$self
->{buffer}) ||
$self
->has_subscribers(
'drain'
);
}
sub
new {
shift
->SUPER::new(
handle
=>
shift
,
timeout
=> 15) }
sub
start {
my
$self
=
shift
;
# Resume
return
unless
$self
->{handle};
my
$reactor
=
$self
->reactor;
return
$reactor
->watch(
$self
->{handle}, 1,
$self
->is_writing)
if
delete
$self
->{paused};
weaken
$self
;
my
$cb
=
sub
{
pop
() ?
$self
->_write :
$self
->_read };
$reactor
->io(
$self
->timeout(
$self
->{timeout})->{handle} =>
$cb
);
}
sub
steal_handle {
my
$self
=
shift
;
$self
->reactor->remove(
$self
->{handle});
return
delete
$self
->{handle};
}
sub
stop {
$_
[0]->reactor->watch(
$_
[0]{handle}, 0,
$_
[0]->is_writing)
if
$_
[0]{handle} && !
$_
[0]{paused}++ }
sub
timeout {
my
(
$self
,
$timeout
) =
@_
;
return
$self
->{timeout}
unless
defined
$timeout
;
$self
->{timeout} =
$timeout
;
my
$reactor
=
$self
->reactor;
if
(
$self
->{timer}) {
if
(!
$self
->{timeout}) {
$reactor
->remove(
delete
$self
->{timer}) }
else
{
$reactor
->again(
$self
->{timer},
$self
->{timeout}) }
}
elsif
(
$self
->{timeout}) {
weaken
$self
;
$self
->{timer}
=
$reactor
->timer(
$timeout
=>
sub
{
$self
and
delete
(
$self
->{timer}) and
$self
->emit(
'timeout'
)->
close
});
}
return
$self
;
}
sub
write
{
my
(
$self
,
$chunk
,
$cb
) =
@_
;
# IO::Socket::SSL will corrupt data with the wrong internal representation
utf8::downgrade
$chunk
;
$self
->{buffer} .=
$chunk
;
if
(
$cb
) {
$self
->once(
drain
=>
$cb
) }
elsif
(!
length
$self
->{buffer}) {
return
$self
}
$self
->reactor->watch(
$self
->{handle}, !
$self
->{paused}, 1)
if
$self
->{handle};
return
$self
;
}
sub
_again {
$_
[0]->reactor->again(
$_
[0]{timer})
if
$_
[0]{timer} }
sub
_read {
my
$self
=
shift
;
if
(
defined
(
my
$read
=
$self
->{handle}->
sysread
(
my
$buffer
, 131072, 0))) {
$self
->{
read
} +=
$read
;
return
$read
== 0 ?
$self
->
close
:
$self
->emit(
read
=>
$buffer
)->_again;
}
# Retry
return
undef
if
$! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK;
# Closed (maybe real error)
$! == ECONNRESET ?
$self
->
close
:
$self
->emit(
error
=> $!)->
close
;
}
sub
_write {
my
$self
=
shift
;
# Handle errors only when reading (to avoid timing problems)
my
$handle
=
$self
->{handle};
if
(
length
$self
->{buffer}) {
return
undef
unless
defined
(
my
$written
=
$handle
->
syswrite
(
$self
->{buffer}));
$self
->{written} +=
$written
;
$self
->emit(
write
=>
substr
(
$self
->{buffer}, 0,
$written
,
''
))->_again;
}
# Clear the buffer to free the underlying SV* memory
undef
$self
->{buffer},
$self
->emit(
'drain'
)
unless
length
$self
->{buffer};
return
undef
if
$self
->is_writing;
return
$self
->
close
if
$self
->{graceful};
$self
->reactor->watch(
$handle
, !
$self
->{paused}, 0)
if
$self
->{handle};
}
1;
=encoding utf8
=head1 NAME
Mojo::IOLoop::Stream - Non-blocking I/O stream
=head1 SYNOPSIS
use Mojo::IOLoop::Stream;
# Create stream
my $stream = Mojo::IOLoop::Stream->new($handle);
$stream->on(read => sub ($stream, $bytes) {...});
$stream->on(close => sub ($stream) {...});
$stream->on(error => sub ($stream, $err) {...});
# Start and stop watching for new data
$stream->start;
$stream->stop;
# Start reactor if necessary
$stream->reactor->start unless $stream->reactor->is_running;
=head1 DESCRIPTION
L<Mojo::IOLoop::Stream> is a container for I/O streams used by L<Mojo::IOLoop>.
=head1 EVENTS
L<Mojo::IOLoop::Stream> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones.
=head2 close
$stream->on(close => sub ($stream) {...});
Emitted if the stream gets closed.
=head2 drain
$stream->on(drain => sub ($stream) {...});
Emitted once all data has been written.
=head2 error
$stream->on(error => sub ($stream, $err) {...});
Emitted if an error occurs on the stream, fatal if unhandled.
=head2 read
$stream->on(read => sub ($stream, $bytes) {...});
Emitted if new data arrives on the stream.
=head2 timeout
$stream->on(timeout => sub ($stream) {...});
Emitted if the stream has been inactive for too long and will get closed automatically.
=head2 write
$stream->on(write => sub ($stream, $bytes) {...});
Emitted if new data has been written to the stream.
=head1 ATTRIBUTES
L<Mojo::IOLoop::Stream> implements the following attributes.
=head2 high_water_mark
my $size = $msg->high_water_mark;
$msg = $msg->high_water_mark(1024);
Maximum size of L</"write"> buffer in bytes before L</"can_write"> returns false, defaults to C<1048576> (1MiB).
=head2 reactor
my $reactor = $stream->reactor;
$stream = $stream->reactor(Mojo::Reactor::Poll->new);
Low-level event reactor, defaults to the C<reactor> attribute value of the global L<Mojo::IOLoop> singleton. Note that
this attribute is weakened.
=head1 METHODS
L<Mojo::IOLoop::Stream> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones.
=head2 bytes_read
my $num = $stream->bytes_read;
Number of bytes received.
=head2 bytes_waiting
my $num = $stream->bytes_waiting;
Number of bytes that have been enqueued with L</"write"> and are waiting to be written.
=head2 bytes_written
my $num = $stream->bytes_written;
Number of bytes written.
=head2 can_write
my $bool = $stream->can_write;
Returns true if calling L</"write"> is safe.
=head2 close
$stream->close;
Close stream immediately.
=head2 close_gracefully
$stream->close_gracefully;
Close stream gracefully.
=head2 handle
my $handle = $stream->handle;
Get handle for stream, usually an L<IO::Socket::IP> or L<IO::Socket::SSL> object.
=head2 is_readable
my $bool = $stream->is_readable;
Quick non-blocking check if stream is readable, useful for identifying tainted sockets.
=head2 is_writing
my $bool = $stream->is_writing;
Check if stream is writing.
=head2 new
my $stream = Mojo::IOLoop::Stream->new($handle);
Construct a new L<Mojo::IOLoop::Stream> object.
=head2 start
$stream->start;
Start or resume watching for new data on the stream.
=head2 steal_handle
my $handle = $stream->steal_handle;
Steal L</"handle"> and prevent it from getting closed automatically.
=head2 stop
$stream->stop;
Stop watching for new data on the stream.
=head2 timeout
my $timeout = $stream->timeout;
$stream = $stream->timeout(45);
Maximum amount of time in seconds stream can be inactive before getting closed automatically, defaults to C<15>.
Setting the value to C<0> will allow this stream to be inactive indefinitely.
=head2 write
$stream = $stream->write($bytes);
$stream = $stream->write($bytes => sub {...});
Enqueue data to be written to the stream as soon as possible, the optional drain callback will be executed once all
data has been written.
=head1 SEE ALSO
L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
=cut