$IO::Storm::Component::VERSION
=
'0.16'
;
binmode
STDERR,
':utf8'
;
binmode
STDIN,
':encoding(UTF-8)'
;
binmode
STDOUT,
':utf8'
;
has
'_pending_commands'
=> (
is
=>
'rw'
,
default
=>
sub
{ [] },
);
has
'_pending_taskids'
=> (
is
=>
'rw'
,
default
=>
sub
{ [] },
);
has
'_stdin'
=> (
is
=>
'rw'
,
default
=>
sub
{
my
$io
= IO::Handle->new;
$io
->fdopen(
fileno
(STDIN),
'r'
);
}
);
has
'max_lines'
=> (
is
=>
'rw'
,
default
=> 100
);
has
'max_blank_msgs'
=> (
is
=>
'rw'
,
default
=> 500
);
has
'_json'
=> (
is
=>
'rw'
,
default
=>
sub
{ JSON::XS->new->allow_blessed->convert_blessed }
);
has
'_topology_name'
=> (
is
=>
'rw'
,
init_args
=>
undef
);
has
'_task_id'
=> (
is
=>
'rw'
,
init_args
=>
undef
);
has
'_component_name'
=> (
is
=>
'rw'
,
init_args
=>
undef
);
has
'_debug'
=> (
is
=>
'rw'
,
init_args
=>
undef
);
has
'_storm_conf'
=> (
is
=>
'rw'
,
init_args
=>
undef
);
has
'_context'
=> (
is
=>
'rw'
,
init_args
=>
undef
);
my
$logger
= Log::Log4perl->get_logger(
'storm'
);
sub
_setup_component {
my
(
$self
,
$storm_conf
,
$context
) =
@_
;
my
$conf_is_hash
=
ref
(
$storm_conf
) eq
ref
{};
$self
->_topology_name(
(
$conf_is_hash
&&
exists
(
$storm_conf
->{
'topology.name'
} ) )
?
$storm_conf
->{
'topology.name'
}
:
''
);
$self
->_task_id(
exists
(
$context
->{taskid} ) ?
$context
->{taskid} :
''
);
$self
->_component_name(
''
);
if
(
exists
(
$context
->{
'task->component'
} ) ) {
my
$task_component
=
$context
->{
'task->component'
};
if
(
exists
(
$task_component
->{
$self
->_task_id } ) ) {
$self
->_component_name(
$task_component
->{
$self
->_task_id } );
}
}
$self
->_debug(
(
$conf_is_hash
&&
exists
(
$storm_conf
->{
'topology.debug'
} ) )
?
$storm_conf
->{
'topology.debug'
}
: 0
);
$self
->_storm_conf(
$storm_conf
);
$self
->_context(
$context
);
}
sub
read_message {
$logger
->debug(
'start read_message'
);
my
$self
=
shift
;
my
$blank_lines
= 0;
my
$message_size
= 0;
my
$line
=
''
;
my
@messages
= ();
while
(1) {
$line
=
$self
->_stdin->getline;
if
(
defined
(
$line
) ) {
$logger
->debug(
"read_message: line=$line"
);
}
else
{
$logger
->error(
"Received EOF while trying to read stdin from "
.
"Storm, pipe appears to be broken, exiting."
);
exit
(1);
}
if
(
$line
eq
"end\n"
) {
last
;
}
elsif
(
$line
eq
''
) {
$logger
->error(
"Received EOF while trying to read stdin from "
.
"Storm, pipe appears to be broken, exiting."
);
exit
(1);
}
elsif
(
$line
eq
"\n"
) {
$blank_lines
++;
if
(
$blank_lines
% 1000 == 0 ) {
$logger
->
warn
(
"While trying to read a command or pending "
.
"task ID, Storm has instead sent $blank_lines "
.
"'\\n' messages."
);
next
;
}
}
chomp
(
$line
);
push
(
@messages
,
$line
);
}
return
$self
->_json->decode(
join
(
"\n"
,
@messages
) );
}
sub
read_task_ids {
my
$self
=
shift
;
if
(
scalar
( @{
$self
->_pending_taskids } ) ) {
return
shift
( @{
$self
->_pending_taskids } );
}
else
{
my
$msg
=
$self
->read_message();
while
(
ref
(
$msg
) ne
'ARRAY'
) {
push
( @{
$self
->_pending_commands },
$msg
);
$msg
=
$self
->read_message();
}
return
$msg
;
}
}
sub
read_command {
my
$self
=
shift
;
if
( @{
$self
->_pending_commands } ) {
return
shift
( @{
$self
->_pending_commands } );
}
else
{
my
$msg
=
$self
->read_message();
while
(
ref
(
$msg
) eq
'ARRAY'
) {
push
( @{
$self
->_pending_taskids },
$msg
);
$msg
=
$self
->read_message();
}
return
$msg
;
}
}
sub
read_tuple {
my
$self
=
shift
;
$logger
->debug(
'read_tuple'
);
my
$tupmap
=
$self
->read_command();
return
IO::Storm::Tuple->new(
id
=>
$tupmap
->{id},
component
=>
$tupmap
->{comp},
stream
=>
$tupmap
->{stream},
task
=>
$tupmap
->{task},
values
=>
$tupmap
->{tuple}
);
}
sub
read_handshake {
my
$self
=
shift
;
autoflush STDOUT 1;
autoflush STDERR 1;
my
$msg
=
$self
->read_message();
$logger
->debug(
sub
{
'Received initial handshake from Storm: '
. Dumper(
$msg
) } );
my
$pid
= $$;
my
$pid_dir
=
$msg
->{pidDir};
my
$filename
=
$pid_dir
.
'/'
.
$pid
;
open
my
$fh
,
'>'
,
$filename
or
die
"Cant't write to '$filename': $!\n"
;
$fh
->
close
;
$logger
->debug(
"Sending process ID $pid to Storm"
);
$self
->send_message( {
pid
=>
int
(
$pid
) } );
return
[
$msg
->{conf},
$msg
->{context} ];
}
sub
send_message {
my
(
$self
,
$msg
) =
@_
;
say
$self
->_json->encode(
$msg
);
say
"end"
;
}
sub
sync {
my
$self
=
shift
;
$self
->send_message( {
command
=>
'sync'
} );
}
sub
log
{
my
(
$self
,
$message
) =
@_
;
$self
->send_message( {
command
=>
'log'
,
msg
=>
$message
} );
}
1;