# ABSTRACT: The base class for Bolts and Spouts.
$IO::Storm::Component::VERSION = '0.16';
# Imports
use strict;
use v5.10;
use IO::Handle qw(autoflush);
use Log::Log4perl qw(:easy);
# Setup Moo for object-oriented niceties
use Moo;
binmode STDERR, ':utf8';
binmode STDIN, ':encoding(UTF-8)';
binmode STDOUT, ':utf8';
has '_pending_commands' => (
is => 'rw',
default => sub { [] },
has '_pending_taskids' => (
is => 'rw',
default => sub { [] },
has '_stdin' => (
is => 'rw',
default => sub {
my $io = IO::Handle->new;
$io->fdopen( fileno(STDIN), 'r' );
has 'max_lines' => (
is => 'rw',
default => 100
has 'max_blank_msgs' => (
is => 'rw',
default => 500
has '_json' => (
is => 'rw',
default => sub { JSON::XS->new->allow_blessed->convert_blessed }
has '_topology_name' => (
is => 'rw',
init_args => undef
has '_task_id' => (
is => 'rw',
init_args => undef
has '_component_name' => (
is => 'rw',
init_args => undef
has '_debug' => (
is => 'rw',
init_args => undef
has '_storm_conf' => (
is => 'rw',
init_args => undef
has '_context' => (
is => 'rw',
init_args => undef
my $logger = Log::Log4perl->get_logger('storm');
sub _setup_component {
my ( $self, $storm_conf, $context ) = @_;
my $conf_is_hash = ref($storm_conf) eq ref {};
( $conf_is_hash && exists( $storm_conf->{'topology.name'} ) )
? $storm_conf->{'topology.name'}
: ''
$self->_task_id( exists( $context->{taskid} ) ? $context->{taskid} : '' );
if ( exists( $context->{'task->component'} ) ) {
my $task_component = $context->{'task->component'};
if ( exists( $task_component->{ $self->_task_id } ) ) {
$self->_component_name( $task_component->{ $self->_task_id } );
( $conf_is_hash && exists( $storm_conf->{'topology.debug'} ) )
? $storm_conf->{'topology.debug'}
: 0
sub read_message {
$logger->debug('start read_message');
my $self = shift;
my $blank_lines = 0;
my $message_size = 0;
my $line = '';
my @messages = ();
while (1) {
$line = $self->_stdin->getline;
if ( defined($line) ) {
$logger->debug("read_message: line=$line");
else {
$logger->error( "Received EOF while trying to read stdin from "
. "Storm, pipe appears to be broken, exiting." );
if ( $line eq "end\n" ) {
elsif ( $line eq '' ) {
$logger->error( "Received EOF while trying to read stdin from "
. "Storm, pipe appears to be broken, exiting." );
elsif ( $line eq "\n" ) {
if ( $blank_lines % 1000 == 0 ) {
$logger->warn( "While trying to read a command or pending "
. "task ID, Storm has instead sent $blank_lines "
. "'\\n' messages." );
push( @messages, $line );
return $self->_json->decode( join( "\n", @messages ) );
sub read_task_ids {
my $self = shift;
if ( scalar( @{ $self->_pending_taskids } ) ) {
return shift( @{ $self->_pending_taskids } );
else {
my $msg = $self->read_message();
while ( ref($msg) ne 'ARRAY' ) {
push( @{ $self->_pending_commands }, $msg );
$msg = $self->read_message();
return $msg;
sub read_command {
my $self = shift;
if ( @{ $self->_pending_commands } ) {
return shift( @{ $self->_pending_commands } );
else {
my $msg = $self->read_message();
while ( ref($msg) eq 'ARRAY' ) {
push( @{ $self->_pending_taskids }, $msg );
$msg = $self->read_message();
return $msg;
sub read_tuple {
my $self = shift;
my $tupmap = $self->read_command();
return IO::Storm::Tuple->new(
id => $tupmap->{id},
component => $tupmap->{comp},
stream => $tupmap->{stream},
task => $tupmap->{task},
values => $tupmap->{tuple}
sub read_handshake {
my $self = shift;
# TODO: Figure out how to redirect stdout to ensure that print
# statements/functions won't crash the Storm Java worker
autoflush STDOUT 1;
autoflush STDERR 1;
my $msg = $self->read_message();
sub { 'Received initial handshake from Storm: ' . Dumper($msg) } );
# Write a blank PID file out to the pidDir
my $pid = $$;
my $pid_dir = $msg->{pidDir};
my $filename = $pid_dir . '/' . $pid;
open my $fh, '>', $filename
or die "Cant't write to '$filename': $!\n";
$logger->debug("Sending process ID $pid to Storm");
$self->send_message( { pid => int($pid) } );
return [ $msg->{conf}, $msg->{context} ];
sub send_message {
my ( $self, $msg ) = @_;
say $self->_json->encode($msg);
say "end";
sub sync {
my $self = shift;
$self->send_message( { command => 'sync' } );
sub log {
my ( $self, $message ) = @_;
$self->send_message( { command => 'log', msg => $message } );
=head1 NAME
IO::Storm::Component - The base class for Bolts and Spouts.
=head1 VERSION
version 0.16
=head1 METHODS
=head2 _setup_component
Add helpful instance variables to component after initial handshake with Storm.
=head2 read_message
Read a message from the ShellBolt. Reads until it finds a "end" line.
=head2 read_tuple
Turn the incoming Tuple structure into an <IO::Storm::Tuple>.
=head2 read_handshake
Read and process an initial handshake message from Storm
=head2 send_message
Send a message to Storm, encoding it as JSON.
=head2 sync
Send a sync command to Storm.
=head2 log
Send a log command to Storm
=head1 AUTHORS
=over 4
=item *
Dan Blanchard <dblanchard@ets.org>
=item *
Cory G Watson <gphat@cpan.org>
This software is copyright (c) 2014 by Educational Testing Service.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.