our
$VERSION
=
'1.000081'
;
use
Errno
qw/EMFILE ENFILE/
;
parse_stdout_tap
parse_stderr_tap
}
;
<run_id <job_id <job_try <job_root <runner_pid
<done
-_ready_buffer
-_events_files -_events_buffer -_events_indexes -events_dir -_events_seen
-stderr_file -_stderr_buffer -_stderr_index -_stderr_cg -_stderr_state
-stdout_file -_stdout_buffer -_stdout_index -_stdout_cg -_stdout_state
-exit_file -_exit_done -_exit_buffer
-et_file -et_buffer -et_done
-pet_file -pet_buffer -pet_done
-last_stamp
-open_errors -open_error_seen
}
;
sub
init {
my
$self
=
shift
;
croak
"'run_id' is a required attribute"
unless
$self
->{+RUN_ID};
croak
"'job_id' is a required attribute"
unless
$self
->{+JOB_ID};
croak
"'job_root' is a required attribute"
unless
$self
->{+JOB_ROOT};
$self
->{+_EVENTS_SEEN} = {};
$self
->{+_STDOUT_BUFFER} ||= [];
$self
->{+_STDERR_BUFFER} ||= [];
$self
->{+_EVENTS_BUFFER} ||= {};
$self
->{+_READY_BUFFER} ||= [];
$self
->{+LAST_STAMP} =
time
();
}
sub
poll {
my
$self
=
shift
;
my
(
$max
) =
@_
;
$self
->_fill_buffers(
$max
);
return
if
$self
->{+OPEN_ERRORS};
my
(
@out
,
@new
);
my
$check
=
defined
(
$max
)
?
sub
{
my
$want
=
$max
-
scalar
(
@out
) -
scalar
(
@new
);
return
undef
if
$want
< 1;
return
$want
;
}
:
sub
{ 1 };
while
(!
defined
(
$max
) ||
@out
<
$max
) {
push
@new
=>
$self
->_poll_streams(
$check
->() //
last
);
push
@new
=>
$self
->_poll_timeouts(
$check
->() //
last
)
if
$self
->{+ET_BUFFER} ||
$self
->{+PET_BUFFER};
push
@new
=>
$self
->_poll_exit(
$check
->() //
last
)
if
!
@new
&&
defined
$self
->{+_EXIT_BUFFER};
last
unless
@new
;
push
@out
=>
@new
;
@new
= ();
}
return
map
{
my
$stamp
=
$_
->{stamp} ?
$self
->{+LAST_STAMP} =
$_
->{stamp} :
$self
->{+LAST_STAMP};
Test2::Harness::Event->new(
stamp
=>
$stamp
, %{
$_
});
}
@out
;
}
sub
_poll_streams {
my
$self
=
shift
;
my
(
$max
) =
@_
;
my
$ready
=
$self
->{+_READY_BUFFER};
return
splice
(
@$ready
, 0,
$max
)
unless
@$ready
<
$max
;
my
$stdout
=
$self
->{+_STDOUT_BUFFER};
my
$stdout_cg
=
$self
->{+_STDOUT_CG} ||= [];
my
$stdout_params
= {
buffer
=>
$stdout
,
comment_group
=>
$stdout_cg
,
tag
=>
'STDOUT'
,
debug
=> 0,
parser
=> \
&parse_stdout_tap
,
max
=>
$max
,
};
my
$stderr
=
$self
->{+_STDERR_BUFFER};
my
$stderr_cg
=
$self
->{+_STDERR_CG} ||= [];
my
$stderr_params
= {
buffer
=>
$stderr
,
comment_group
=>
$stderr_cg
,
tag
=>
'STDERR'
,
debug
=> 1,
parser
=> \
&parse_stderr_tap
,
max
=>
$max
,
};
my
$out_event
=
$self
->_poll_stream(
$stdout_params
);
my
$err_event
=
$self
->_poll_stream(
$stderr_params
);
if
(
$out_event
&&
$err_event
) {
$self
->_poll_streams_ready_buffer_event(
$stdout
);
$self
->_poll_streams_ready_buffer_event(
$stderr
);
}
if
(
$self
->{+_EXIT_DONE} && (!
$max
||
@$ready
<
$max
)) {
$self
->_poll_stream_flush_group(
$stdout_params
)
if
@$stdout_cg
;
$self
->_poll_stream_flush_group(
$stderr_params
)
if
@$stderr_cg
;
$self
->_poll_streams_flush_events();
}
return
splice
(
@$ready
, 0,
$max
);
}
sub
_poll_streams_flush_events {
my
$self
=
shift
;
my
$buffers
=
$self
->{+_EVENTS_BUFFER};
for
my
$pid
(
keys
%$buffers
) {
for
my
$tid
(
keys
%{
$buffers
->{
$pid
}}) {
my
$buffer
=
$buffers
->{
$pid
}->{
$tid
} or
next
;
while
(
my
$e
=
shift
@$buffer
) {
$e
=
ref
(
$e
) ?
$e
: decode_json(
$e
);
push
@{
$self
->{+_READY_BUFFER}} =>
$self
->_process_events_line(
$e
);
}
}
}
}
sub
_poll_streams_ready_buffer_event {
my
$self
=
shift
;
my
(
$buffer
) =
@_
;
my
$set
=
shift
@$buffer
;
my
(
$pid
,
$tid
,
$sid
) =
@$set
;
my
$seen
=
$self
->{+_EVENTS_SEEN};
return
if
$seen
->{
$tid
}->{
$pid
}->{
$sid
};
my
$e
=
shift
@{
$self
->{+_EVENTS_BUFFER}->{
$pid
}->{
$tid
}} or
return
;
$seen
->{
$tid
}->{
$pid
}->{
$sid
} = 1;
$e
=
ref
(
$e
) ?
$e
: decode_json(
$e
);
die
"Stream error: Events skipped or recieved out of order ($e->{stream_id} != $sid)"
if
$e
->{stream_id} !=
$sid
;
push
@{
$self
->{+_READY_BUFFER}} =>
$self
->_process_events_line(
$e
);
}
sub
_poll_stream_add_event {
my
$self
=
shift
;
my
(
$line
,
$params
) =
@_
;
my
$parser
=
$params
->{parser};
my
$tag
=
$params
->{tag};
my
$debug
=
$params
->{debug};
my
$facet_data
=
$parser
->(
$line
);
$facet_data
||= {
info
=> [{
details
=>
$line
,
tag
=>
$tag
,
debug
=>
$debug
}]};
my
$event_id
=
$facet_data
->{about}->{uuid} ||= gen_uuid();
push
@{
$self
->{+_READY_BUFFER}} => {
facet_data
=>
$facet_data
,
event_id
=>
$event_id
,
job_id
=>
$self
->{+JOB_ID},
job_try
=>
$self
->{+JOB_TRY},
run_id
=>
$self
->{+RUN_ID},
};
}
sub
_poll_stream_flush_group {
my
$self
=
shift
;
my
(
$params
) =
@_
;
my
$comment_group
=
$params
->{comment_group};
return
unless
@$comment_group
;
shift
@$comment_group
;
my
$line
=
join
"\n"
=>
@$comment_group
;
$self
->_poll_stream_add_event(
$line
,
$params
);
@$comment_group
= ();
}
sub
_poll_stream_buffer_group {
my
$self
=
shift
;
my
(
$line
,
$params
) =
@_
;
return
undef
unless
$line
=~ m/^(\s*)
my
$indent
= $1;
my
$comment_group
=
$params
->{comment_group};
if
(
@$comment_group
&&
$comment_group
->[0] ne
$indent
) {
$self
->_poll_stream_flush_group(
$params
);
return
1;
}
else
{
push
@$comment_group
=>
$indent
;
}
push
@$comment_group
=>
$line
;
shift
@{
$params
->{buffer}};
return
0;
}
sub
_poll_stream {
my
$self
=
shift
;
my
(
$params
) =
@_
;
my
$max
=
$params
->{max};
my
$buff
=
$params
->{buffer};
my
$comment_group
=
$params
->{comment_group};
my
$added
= 0;
while
(
@$buff
&& (!
$max
||
$added
<
$max
)) {
my
$line
=
$buff
->[0];
return
1
if
ref
$line
;
chomp
(
$line
);
my
$esync
=
$self
->_poll_stream_process_harness_line(
$line
,
$params
);
return
1
if
$esync
;
my
$stat
=
$self
->_poll_stream_buffer_group(
$line
,
$params
);
if
(
defined
(
$stat
)) {
$added
+=
$stat
;
next
;
}
if
(
@$comment_group
) {
$self
->_poll_stream_flush_group(
$params
);
$added
++;
next
;
}
shift
@$buff
;
$self
->_poll_stream_add_event(
$line
,
$params
);
$added
++;
}
return
0;
}
sub
_poll_stream_process_harness_line {
my
$self
=
shift
;
my
(
$line
,
$params
) =
@_
;
my
$job_id
=
$self
->{+JOB_ID};
return
undef
unless
$line
=~ s/T2-HARNESS-\Q
$job_id
\E-(ESYNC|EVENT): (.+)//;
my
(
$type
,
$data
) = ($1, $2);
my
$esync
;
if
(
$type
eq
'ESYNC'
) {
$esync
= [
split
ipc_separator() =>
$data
];
}
elsif
(
$type
eq
'EVENT'
) {
my
$event_data
= decode_json(
$data
);
my
$pid
=
$event_data
->{pid};
my
$tid
=
$event_data
->{tid};
my
$sid
=
$event_data
->{stream_id};
push
@{
$self
->{+_EVENTS_BUFFER}->{
$pid
}->{
$tid
}} =>
$event_data
;
$esync
= [
$pid
,
$tid
,
$sid
];
}
else
{
die
"Unexpected harness type: $type"
;
}
my
$buff
=
$params
->{buffer};
$buff
->[0] =
$esync
;
$buff
->[1] =
defined
(
$buff
->[1]) ?
$line
.
$buff
->[1] :
$line
if
length
$line
;
$self
->_poll_stream_flush_group(
$params
);
return
$esync
;
}
my
%FILE_MAP
= (
'stdout'
=> [STDOUT_FILE, \
&open_file
],
'stderr'
=> [STDERR_FILE, \
&open_file
],
'exit'
=> [EXIT_FILE,
'Test2::Harness::Util::File::Value'
],
'event_timeout'
=> [ET_FILE,
'Test2::Harness::Util::File::Value'
],
'post_exit_timeout'
=> [PET_FILE,
'Test2::Harness::Util::File::Value'
],
);
sub
_open_file {
my
$self
=
shift
;
my
(
$file
) =
@_
;
my
$map
=
$FILE_MAP
{
$file
} or croak
"'$file' is not a known job file"
;
my
(
$key
,
$type
) =
@$map
;
return
$self
->{
$key
}
if
$self
->{
$key
};
my
$path
= File::Spec->catfile(
$self
->{+JOB_ROOT},
$file
);
my
$out
;
if
(
ref
$type
) {
return
undef
unless
-e
$path
;
return
$self
->{
$key
} =
$self
->try_open(
$path
=>
sub
{
$type
->(
$path
,
'<'
) });
}
return
$self
->{
$key
} =
$self
->try_open(
$path
=>
sub
{
$type
->new(
name
=>
$path
) });
}
sub
_fill_stream_buffers {
my
$self
=
shift
;
my
(
$max
) =
@_
;
my
$stdout_state
=
$self
->{+_STDOUT_STATE} //= {};
my
$stderr_state
=
$self
->{+_STDERR_STATE} //= {};
my
$stdout_buff
=
$self
->{+_STDOUT_BUFFER} ||= [];
my
$stderr_buff
=
$self
->{+_STDERR_BUFFER} ||= [];
my
$stdout_file
=
$self
->{+STDOUT_FILE} ||
$self
->_open_file(
'stdout'
);
my
$stderr_file
=
$self
->{+STDERR_FILE} ||
$self
->_open_file(
'stderr'
);
return
unless
$stdout_file
&&
$stderr_file
;
my
@sets
=
grep
{
defined
$_
->[0] } (
[
$stdout_file
,
$stdout_buff
,
'io'
,
'STDOUT'
,
$stdout_state
],
[
$stderr_file
,
$stderr_buff
,
'io'
,
'STDERR'
,
$stderr_state
],
);
return
unless
@sets
;
while
(1) {
my
$added
= 0;
my
@events_files
=
$self
->events_files();
for
my
$set
(
@events_files
,
@sets
) {
my
(
$file
,
$buff
,
$type
,
$name
,
$state
) =
@$set
;
next
if
$max
&&
@$buff
>
$max
;
my
$pos
=
tell
(
$file
);
my
$line
= <
$file
>;
if
(
defined
(
$line
) && (
$self
->{+_EXIT_DONE} ||
substr
(
$line
, -1) eq
"\n"
)) {
print
"\n"
if
$state
&&
delete
$state
->{
$pos
};
my
$job_id
=
$self
->{+JOB_ID};
if
(
$type
eq
'io'
&&
$line
=~ s/T2-HARNESS-\Q
$job_id
\E-ENCODING: (.+)\n$//) {
apply_encoding(
$file
, $1);
}
push
@$buff
=>
$line
if
length
(
$line
);
seek
(
$file
, 0, 1)
if
eof
(
$file
);
$added
++;
}
else
{
if
(
$name
&&
defined
(
$line
) &&
$ENV
{YATH_INTERACTIVE}) {
my
(
$fh
);
if
(
$name
eq
'STDOUT'
) {
$fh
= \
*STDOUT
;
}
elsif
(
$name
eq
'STDERR'
) {
$fh
= \
*STDERR
;
}
my
$len
=
length
(
$line
);
if
(
my
$check
=
$state
->{
$pos
}->{len}) {
if
(
$len
!=
$check
) {
delete
$state
->{
$pos
}->{done};
$line
=
substr
(
$line
,
$check
);
}
else
{
$line
=
"\n[INTERACTIVE] $line"
;
}
}
else
{
$line
=
"\n[INTERACTIVE] $line"
;
}
$state
->{
$pos
}->{len} =
$len
;
my
$stamp
=
$state
->{
$pos
}->{stamp} //=
time
;
my
$delta
=
time
-
$stamp
;
if
(
$delta
>= 1 && !
$state
->{
$pos
}->{done}) {
$fh
->autoflush(1);
$state
->{
$pos
}->{done} = 1;
print
$fh
$line
;
}
}
seek
(
$file
,
$pos
, 0);
}
}
last
unless
$added
;
}
}
sub
events_files {
my
$self
=
shift
;
my
$buff
=
$self
->{+_EVENTS_BUFFER} ||= {};
my
$files
=
$self
->{+_EVENTS_FILES} ||= {};
my
$dir
= File::Spec->catdir(
$self
->{+JOB_ROOT},
'events'
);
return
unless
-d
$dir
;
my
$dh
;
if
(
$self
->try_open(
$dir
=>
sub
{
opendir
(
$dh
,
$dir
) or
die
$! })) {
for
my
$file
(
readdir
(
$dh
)) {
next
unless
'.jsonl'
eq
substr
(
$file
, -6);
next
if
$files
->{
$file
};
my
$path
= File::Spec->catfile(
$dir
,
$file
);
next
if
$files
->{
$file
};
my
$fh
=
$self
->try_open(
$path
=>
sub
{ [
split
(ipc_separator() =>
substr
(
substr
(
$file
, 6 +
length
(ipc_separator())), 0, -6)),
open_file(
$path
,
'<'
),
] }
);
$files
->{
$file
} =
$fh
if
$fh
;
}
}
return
map
{ [
$_
->[2] =>
$buff
->{
$_
->[0]}->{
$_
->[1]} ||= [],
'jsonl'
] }
values
%$files
;
}
sub
try_open {
my
$self
=
shift
;
my
(
$path
,
$callback
) =
@_
;
local
($@, $?, $!);
my
$out
;
my
$ok
=
eval
{
$out
=
$callback
->();
1;
};
my
$errno
= $!;
my
$err
= $@;
return
$out
if
$ok
;
die
$@
unless
$errno
== ENFILE ||
$errno
== EMFILE;
$self
->{+OPEN_ERRORS}++;
warn
"Could not open '$path', this is NOT FATAL as yath will try again. Errno is '$errno', Exception was: $err"
unless
$self
->{+OPEN_ERROR_SEEN}->{
$path
}++;
return
undef
;
}
sub
_fill_buffers {
my
$self
=
shift
;
my
(
$max
) =
@_
;
return
unless
-d
$self
->{+JOB_ROOT};
$self
->{+OPEN_ERRORS} = 0;
$self
->_fill_stream_buffers(
$max
);
return
if
$self
->{+_EXIT_DONE} || @{
$self
->{+_STDOUT_BUFFER}} || @{
$self
->{+_STDERR_BUFFER}} || first {
@$_
}
map
{
values
%{
$_
} }
values
%{
$self
->{+_EVENTS_BUFFER}};
$self
->_open_file(
'event_timeout'
);
$self
->_open_file(
'post_exit_timeout'
);
my
$found_timeout
= 0;
for
my
$set
([ET_FILE, ET_BUFFER], [PET_FILE, PET_BUFFER]) {
my
(
$key
,
$buffer_key
) =
@$set
;
next
if
$self
->{
$buffer_key
};
next
unless
$self
->{
$key
} &&
$self
->{
$key
}->
exists
;
$self
->{
$buffer_key
} =
$self
->{
$key
}->read_line //
next
;
$found_timeout
++;
}
return
if
$found_timeout
;
return
if
$self
->{+OPEN_ERRORS};
my
$ended
= 0;
my
$runner_exited
=
$self
->{+RUNNER_PID} && !
kill
(0,
$self
->{+RUNNER_PID});
my
$exit_file
=
$self
->{+EXIT_FILE} ||
$self
->_open_file(
'exit'
) ||
return
;
return
if
$self
->{+OPEN_ERRORS};
if
(
$exit_file
->
exists
) {
my
$line
=
$exit_file
->read_line;
if
(
defined
(
$line
)) {
$self
->{+_EXIT_BUFFER} =
$line
;
$self
->{+_EXIT_DONE} = 1;
$ended
++;
}
}
elsif
(
$runner_exited
) {
$self
->{+_EXIT_BUFFER} =
'-1'
;
$self
->{+_EXIT_DONE} = 1;
$ended
++;
}
return
unless
$ended
;
$self
->_fill_stream_buffers();
}
sub
_poll_timeouts {
my
$self
=
shift
;
my
@out
;
if
(
defined
$self
->{+ET_BUFFER} && !
$self
->{+ET_DONE}++) {
push
@out
=>
$self
->_process_timeout_line(
'event'
=>
$self
->{+ET_BUFFER},
<<" EOT");
Test2::Harness checks for timeouts at a configurable interval, if a test does
not produce any output to stdout or stderr between intervals it will be
forcefully killed under the assumption it has hung. See the '--event-timeout'
option to configure the interval.
EOT
}
if
(
defined
$self
->{+PET_BUFFER} && !
$self
->{+PET_DONE}++) {
push
@out
=>
$self
->_process_timeout_line(
'post-exit'
=>
$self
->{+ET_BUFFER},
<<" EOT");
Sometimes tests will fork and then return. On supported systems Test2::Harness
will start all tests with their own process group and will wait for the entire
group to exit before considering the test done. In these cases Test2::Harness
will poll for output from the process group at a configurable interval, if no
output is produced between intervals the process group will be forcefully
killed. See the '--post-exit-timeout' option to configure the interval.
EOT
}
return
@out
;
}
sub
_poll_exit {
my
$self
=
shift
;
return
unless
defined
$self
->{+_EXIT_BUFFER};
my
$value
=
delete
$self
->{+_EXIT_BUFFER};
return
$self
->_process_exit_line(
$value
);
}
sub
_process_events_line {
my
$self
=
shift
;
my
(
$event_data
) =
@_
;
$event_data
->{job_id} =
$self
->{+JOB_ID};
$event_data
->{job_try} =
$self
->{+JOB_TRY};
$event_data
->{run_id} =
$self
->{+RUN_ID};
$event_data
->{event_id} ||=
$event_data
->{facet_data}->{about}->{uuid} ||= gen_uuid();
return
$event_data
;
}
sub
_process_exit_line {
my
$self
=
shift
;
my
(
$value
) =
@_
;
chomp
(
$value
);
my
$stdout
= maybe_read_file(File::Spec->catfile(
$self
->{+JOB_ROOT},
"stdout"
));
my
$stderr
= maybe_read_file(File::Spec->catfile(
$self
->{+JOB_ROOT},
"stderr"
));
$stdout
=~ s/T2-HARNESS-\S+-(?:ESYNC|EVENT): .+\n//g;
$stderr
=~ s/T2-HARNESS-\S+-(?:ESYNC|EVENT): .+\n//g;
my
$event_id
= gen_uuid();
my
(
$exit
,
$err
,
$sig
,
$dmp
,
$stamp
,
$retry
) = (
split
(/\s+/,
$value
),
''
,
''
,
''
,
''
,
''
,
''
);
$self
->{+DONE} = {
retry
=>
$retry
};
return
{
event_id
=>
$event_id
,
job_id
=>
$self
->{+JOB_ID},
job_try
=>
$self
->{+JOB_TRY},
run_id
=>
$self
->{+RUN_ID},
stamp
=>
$stamp
,
facet_data
=> {
about
=> {
uuid
=>
$event_id
},
harness_job_exit
=> {
details
=>
"Test script exited $exit ($err\:$sig)"
,
exit
=>
$exit
,
code
=>
$err
,
signal
=>
$sig
,
dumped
=>
$dmp
,
retry
=>
$retry
,
job_id
=>
$self
->{+JOB_ID},
job_try
=>
$self
->{+JOB_TRY},
stdout
=>
$stdout
,
stderr
=>
$stderr
,
stamp
=>
$stamp
,
line
=>
$value
,
},
}
};
}
sub
_process_timeout_line {
my
$self
=
shift
;
my
(
$type
,
$buffer
,
$reason
) =
@_
;
chomp
(
$buffer
//=
''
);
my
(
$stamp
,
$delta
) =
split
/\s+/,
$buffer
;
$stamp
//=
time
();
$delta
=
defined
(
$delta
) ?
sprintf
(
'%.4f'
,
$delta
) :
'??'
;
my
$event_id
= gen_uuid();
return
{
event_id
=>
$event_id
,
job_id
=>
$self
->{+JOB_ID},
job_try
=>
$self
->{+JOB_TRY},
run_id
=>
$self
->{+RUN_ID},
stamp
=>
$stamp
,
facet_data
=> {
about
=> {
uuid
=>
$event_id
,
details
=>
"Timeout ($type)"
},
errors
=> [
{
tag
=>
'TIMEOUT'
,
details
=>
"A timeout ($type) has occured (after $delta seconds), job was forcefully killed"
,
fail
=> 1,
},
],
info
=> [
{
tag
=>
'TIMEOUT'
,
debug
=> 1,
important
=> 1,
details
=>
$reason
,
},
],
}
};
}
1;