—package
POE::Filter::Zlib::Stream;
$POE::Filter::Zlib::Stream::VERSION
=
'2.04'
;
use
strict;
use
warnings;
use
Carp;
$VERSION
=
'2.02'
;
sub
new {
my
$type
=
shift
;
croak
"$type requires an even number of parameters"
if
@_
% 2;
my
$buffer
= {
@_
};
$buffer
->{
lc
$_
} =
delete
$buffer
->{
$_
}
for
keys
%{
$buffer
};
$buffer
->{BUFFER} =
''
;
delete
$buffer
->{deflateopts}
unless
ref
(
$buffer
->{deflateopts} ) eq
'HASH'
;
$buffer
->{d} = Compress::Raw::Zlib::Deflate->new( %{
$buffer
->{deflateopts} } );
unless
(
$buffer
->{d} ) {
warn
"Failed to create deflate stream\n"
;
return
;
}
delete
$buffer
->{inflateopts}
unless
ref
(
$buffer
->{inflateopts} ) eq
'HASH'
;
$buffer
->{i} = Compress::Raw::Zlib::Inflate->new ( %{
$buffer
->{inflateopts} } );
unless
(
$buffer
->{i} ) {
warn
"Failed to create inflate stream\n"
;
return
;
}
if
(not
defined
$buffer
->{flushtype}) {
$buffer
->{flushtype} = Z_SYNC_FLUSH;
}
return
bless
$buffer
,
$type
;
}
# use inherited get() from POE::Filter
sub
get_one_start {
my
(
$self
,
$raw_lines
) =
@_
;
$self
->{BUFFER} .=
join
''
, @{
$raw_lines
};
}
sub
get_one {
my
$self
=
shift
;
return
[ ]
unless
length
$self
->{BUFFER};
my
(
$status
,
$out
);
$status
=
$self
->{i}->inflate( \
$self
->{BUFFER},
$out
);
unless
(
$status
== Z_OK or
$status
== Z_STREAM_END ) {
warn
"Couldn\'t inflate buffer\n"
;
return
[ ];
}
if
(
$status
== Z_STREAM_END) {
$self
->{i} = Compress::Raw::Zlib::Inflate->new ( %{
$self
->{inflateopts} } );
}
return
[
$out
];
}
sub
get_pending {
my
$self
=
shift
;
return
$self
->{BUFFER} ? [
$self
->{BUFFER} ] :
undef
;
}
sub
put {
my
(
$self
,
$events
) =
@_
;
my
$raw_lines
= [];
foreach
my
$event
(
@$events
) {
my
(
$dstat
,
$dout
);
$dstat
=
$self
->{d}->deflate(
$event
,
$dout
);
unless
(
$dstat
== Z_OK ) {
warn
"(data) Couldn\'t deflate: $event\n($dstat)"
;
next
;
}
my
(
$fout
,
$fstat
);
$fstat
=
$self
->{d}->flush(
$fout
,
$self
->{flushtype} );
unless
(
$fstat
== Z_OK ) {
warn
"(flush) Couldn\'t flush/deflate: $event\n"
;
next
;
}
if
(
$self
->{flushtype} == Z_FINISH) {
$self
->{d} = Compress::Raw::Zlib::Deflate->new ( %{
$self
->{deflateopts} } );
}
push
@$raw_lines
,
$dout
.
$fout
;
}
return
$raw_lines
;
}
sub
clone {
my
$self
=
shift
;
my
$nself
= { };
$nself
->{
$_
} =
$self
->{
$_
}
for
keys
%{
$self
};
$nself
->{d} = Compress::Raw::Zlib::Deflate->new( %{
$nself
->{deflateopts} } );
$nself
->{i} = Compress::Raw::Zlib::Inflate->new( %{
$nself
->{inflateopts} } );
$nself
->{BUFFER} =
''
;
return
bless
$nself
,
ref
$self
;
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
POE::Filter::Zlib::Stream
=head1 VERSION
version 2.04
=head1 SYNOPSIS
use POE::Filter::Zlib::Stream;
my $filter = POE::Filter::Zlib::Stream->new( deflateopts => { -Level => 9 } );
my $scalar = 'Blah Blah Blah';
my $compressed_array = $filter->put( [ $scalar ] );
my $uncompressed_array = $filter->get( $compressed_array );
use POE qw(Filter::Stackable Filter::Line Filter::Zlib::Stream);
my ($filter) = POE::Filter::Stackable->new();
$filter->push( POE::Filter::Zlib::Stream->new(),
POE::Filter::Line->new( InputRegexp => '\015?\012', OutputLiteral => "\015\012" ),
=head1 DESCRIPTION
POE::Filter::Zlib::Stream provides a POE filter for performing compression/uncompression using L<Compress::Zlib>. It is
suitable for use with L<POE::Filter::Stackable>.
Unlike L<POE::Filter::Zlib> this filter uses deflate and inflate, not the higher level compress and uncompress.
Ideal for streaming compressed data over sockets.
=head1 NAME
POE::Filter::Zlib::Stream - A POE filter wrapped around Compress::Zlib deflate and inflate.
=head1 CONSTRUCTOR
=over
=item C<new>
Creates a new POE::Filter::Zlib::Stream object. Takes some optional arguments:
=over 4
=item "deflateopts"
a hashref of options to be passed to deflateInit();
=item "inflateopts"
a hashref of options to be passed to inflateInit();
=item "flushtype"
The type of flush to use when flushing the compressed data. Defaults to
Z_SYNC_FLUSH so you get a single stream, but if there is a
L<POE::Filter::Zlib> on the other end, you want to set this to Z_FINISH.
=back
Consult L<Compress::Zlib> for more detail regarding these options.
=back
=head1 METHODS
=over
=item C<get>
=item C<get_one_start>
=item C<get_one>
Takes an arrayref which is contains streams of compressed input. Returns an arrayref of uncompressed streams.
=item C<get_pending>
Returns any data in a filter's input buffer. The filter's input buffer is not cleared, however.
=item C<put>
Takes an arrayref containing streams of uncompressed output, returns an arrayref of compressed streams.
=item C<clone>
Makes a copy of the filter, and clears the copy's buffer.
=back
=head1 AUTHOR
Chris Williams <chris@bingosnet.co.uk>
Martijn van Beers <martijn@cpan.org>
=head1 LICENSE
Copyright E<copy> Chris Williams and Martijn van Beers.
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
=head1 SEE ALSO
L<POE::Filter>
L<Compress::Zlib>
L<POE::Filter::Stackable>
=head1 AUTHORS
=over 4
=item *
Chris Williams <chris@bingosnet.co.uk>
=item *
Martijn van Beers
=back
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2017 by Chris Williams and Martijn van Beers.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut