From Code to Community: Sponsoring The Perl and Raku Conference 2025 Learn more

use strict;
our $VERSION = '1.000081';
use File::Spec();
use Errno qw/EMFILE ENFILE/;
use Carp qw/croak/;
use Time::HiRes qw/time/;
use List::Util qw/first/;
use Test2::Util qw/ipc_separator/;
use Test2::Harness::Util::UUID qw/gen_uuid/;
use Test2::Harness::Util::JSON qw/decode_json/;
use Test2::Harness::Util qw/maybe_read_file open_file apply_encoding/;
<run_id <job_id <job_try <job_root <runner_pid
-_events_files -_events_buffer -_events_indexes -events_dir -_events_seen
-stderr_file -_stderr_buffer -_stderr_index -_stderr_cg -_stderr_state
-stdout_file -_stdout_buffer -_stdout_index -_stdout_cg -_stdout_state
-exit_file -_exit_done -_exit_buffer
-et_file -et_buffer -et_done
-pet_file -pet_buffer -pet_done
-open_errors -open_error_seen
sub init {
my $self = shift;
croak "'run_id' is a required attribute"
unless $self->{+RUN_ID};
croak "'job_id' is a required attribute"
unless $self->{+JOB_ID};
croak "'job_root' is a required attribute"
unless $self->{+JOB_ROOT};
$self->{+_EVENTS_SEEN} = {};
$self->{+_STDOUT_BUFFER} ||= [];
$self->{+_STDERR_BUFFER} ||= [];
$self->{+_EVENTS_BUFFER} ||= {};
$self->{+_READY_BUFFER} ||= [];
$self->{+LAST_STAMP} = time();
sub poll {
my $self = shift;
my ($max) = @_;
return if $self->{+OPEN_ERRORS};
my (@out, @new);
# If we have a max number of events then we need to pass that along to the
# inner-pollers, but we need to pass around how many MORE we need, this sub
# will return the amount we still need.
# If this finds that we do not need any more it will exit the loop instead
# of returning a number.
my $check = defined($max)
? sub {
my $want = $max - scalar(@out) - scalar(@new);
return undef if $want < 1;
return $want;
: sub { 1 };
while (!defined($max) || @out < $max) {
push @new => $self->_poll_streams($check->() // last);
push @new => $self->_poll_timeouts($check->() // last) if $self->{+ET_BUFFER} || $self->{+PET_BUFFER};
# 'exit' MUST come last, so do not even think about grabbing
# them until @new is empty.
# Micro-optimization, 'exit' only ever has 1 thing, so do
# not enter the subs if we do not need to.
push @new => $self->_poll_exit($check->() // last) if !@new && defined $self->{+_EXIT_BUFFER};
# We need to check if the runner exited BEFORE trying to check the exit value.
last unless @new;
push @out => @new;
@new = ();
return map {
my $stamp = $_->{stamp} ? $self->{+LAST_STAMP} = $_->{stamp} : $self->{+LAST_STAMP};
Test2::Harness::Event->new(stamp => $stamp, %{$_});
} @out;
sub _poll_streams {
my $self = shift;
my ($max) = @_;
my $ready = $self->{+_READY_BUFFER};
return splice(@$ready, 0, $max) unless @$ready < $max;
my $stdout = $self->{+_STDOUT_BUFFER};
my $stdout_cg = $self->{+_STDOUT_CG} ||= [];
my $stdout_params = {
buffer => $stdout,
comment_group => $stdout_cg,
tag => 'STDOUT',
debug => 0,
parser => \&parse_stdout_tap,
max => $max,
my $stderr = $self->{+_STDERR_BUFFER};
my $stderr_cg = $self->{+_STDERR_CG} ||= [];
my $stderr_params = {
buffer => $stderr,
comment_group => $stderr_cg,
tag => 'STDERR',
debug => 1,
parser => \&parse_stderr_tap,
max => $max,
my $out_event = $self->_poll_stream($stdout_params);
my $err_event = $self->_poll_stream($stderr_params);
# Once both stderr and stdout are waiting for an event we should go ahead
# and stick the events into ready. More often than not both streams will be
# waiting for the same event, the read_buffer_event logic will avoid
# duplicates. We want to call it on both buffers because some IPC
# situations can result in both streams waiting for different events. Also
# we need the sync point removed from both buffers so things can continue.
# This is an intentional bottle-neck that keeps STDOUT, STDERR, and the
# Test2 events in sync so that stderr and stdout appear where they should
# (mostly) relative to the events. This is not perfect, but it is as close
# as we can get when recombining 3+ output streams.
if ($out_event && $err_event) {
if ($self->{+_EXIT_DONE} && (!$max || @$ready < $max)) {
# All done, flush the comment groups
$self->_poll_stream_flush_group($stdout_params) if @$stdout_cg;
$self->_poll_stream_flush_group($stderr_params) if @$stderr_cg;
return splice(@$ready, 0, $max);
sub _poll_streams_flush_events {
my $self = shift;
my $buffers = $self->{+_EVENTS_BUFFER};
for my $pid (keys %$buffers) {
for my $tid (keys %{$buffers->{$pid}}) {
my $buffer = $buffers->{$pid}->{$tid} or next;
while(my $e = shift @$buffer) {
$e = ref($e) ? $e : decode_json($e);
push @{$self->{+_READY_BUFFER}} => $self->_process_events_line($e);
sub _poll_streams_ready_buffer_event {
my $self = shift;
my ($buffer) = @_;
my $set = shift @$buffer;
my ($pid, $tid, $sid) = @$set;
my $seen = $self->{+_EVENTS_SEEN};
return if $seen->{$tid}->{$pid}->{$sid};
my $e = shift @{$self->{+_EVENTS_BUFFER}->{$pid}->{$tid}} or return;
$seen->{$tid}->{$pid}->{$sid} = 1;
$e = ref($e) ? $e : decode_json($e);
die "Stream error: Events skipped or recieved out of order ($e->{stream_id} != $sid)"
if $e->{stream_id} != $sid;
push @{$self->{+_READY_BUFFER}} => $self->_process_events_line($e);
sub _poll_stream_add_event {
my $self = shift;
my ($line, $params) = @_;
my $parser = $params->{parser};
my $tag = $params->{tag};
my $debug = $params->{debug};
my $facet_data = $parser->($line);
$facet_data ||= {info => [{details => $line, tag => $tag, debug => $debug}]};
my $event_id = $facet_data->{about}->{uuid} ||= gen_uuid();
push @{$self->{+_READY_BUFFER}} => {
facet_data => $facet_data,
event_id => $event_id,
job_id => $self->{+JOB_ID},
job_try => $self->{+JOB_TRY},
run_id => $self->{+RUN_ID},
sub _poll_stream_flush_group {
my $self = shift;
my ($params) = @_;
my $comment_group = $params->{comment_group};
return unless @$comment_group;
shift @$comment_group; # Remove the indentation state
my $line = join "\n" => @$comment_group;
$self->_poll_stream_add_event($line, $params);
@$comment_group = ();
sub _poll_stream_buffer_group {
my $self = shift;
my ($line, $params) = @_;
return undef unless $line =~ m/^(\s*)#/;
my $indent = $1;
my $comment_group = $params->{comment_group};
if (@$comment_group && $comment_group->[0] ne $indent) {
# If comment indentation has changed we do not want to append to the group
return 1;
else {
# Starting a new group
push @$comment_group => $indent;
push @$comment_group => $line;
shift @{$params->{buffer}};
return 0;
sub _poll_stream {
my $self = shift;
my ($params) = @_;
my $max = $params->{max};
my $buff = $params->{buffer};
my $comment_group = $params->{comment_group};
my $added = 0;
while (@$buff && (!$max || $added < $max)) {
my $line = $buff->[0];
# Already have an esync waiting
return 1 if ref $line;
my $esync = $self->_poll_stream_process_harness_line($line, $params);
return 1 if $esync;
# Put 'comment' lines together in a group, IE buffer this until we are done with comments
# get undef if there was no comment to buffer
# get 1 if we had to flush the buffer and start a new one
# get 0 if we did buffer the event, but no flush
my $stat = $self->_poll_stream_buffer_group($line, $params);
if (defined($stat)) {
$added += $stat;
# non-comment line, flush the comment group
if (@$comment_group) {
shift @$buff;
$self->_poll_stream_add_event($line, $params);
return 0;
sub _poll_stream_process_harness_line {
my $self = shift;
my ($line, $params) = @_;
my $job_id = $self->{+JOB_ID};
return undef unless $line =~ s/T2-HARNESS-\Q$job_id\E-(ESYNC|EVENT): (.+)//;
my ($type, $data) = ($1, $2);
my $esync;
if ($type eq 'ESYNC') {
$esync = [split ipc_separator() => $data];
elsif ($type eq 'EVENT') {
my $event_data = decode_json($data);
my $pid = $event_data->{pid};
my $tid = $event_data->{tid};
my $sid = $event_data->{stream_id};
push @{$self->{+_EVENTS_BUFFER}->{$pid}->{$tid}} => $event_data;
$esync = [$pid, $tid, $sid];
else {
die "Unexpected harness type: $type";
# This becomes the esync, anything leftover actually belongs to the
# next line.
my $buff = $params->{buffer};
$buff->[0] = $esync;
$buff->[1] = defined($buff->[1]) ? $line . $buff->[1] : $line if length $line;
# Flush any comment group already buffered, an event is a sane
# boundary, not above that partial comments that might be
# interrupted by the sync point will be part of the next group
return $esync;
my %FILE_MAP = (
'stdout' => [STDOUT_FILE, \&open_file],
'stderr' => [STDERR_FILE, \&open_file],
'exit' => [EXIT_FILE, 'Test2::Harness::Util::File::Value'],
'event_timeout' => [ET_FILE, 'Test2::Harness::Util::File::Value'],
'post_exit_timeout' => [PET_FILE, 'Test2::Harness::Util::File::Value'],
sub _open_file {
my $self = shift;
my ($file) = @_;
my $map = $FILE_MAP{$file} or croak "'$file' is not a known job file";
my ($key, $type) = @$map;
return $self->{$key} if $self->{$key};
my $path = File::Spec->catfile($self->{+JOB_ROOT}, $file);
my $out;
if (ref $type) {
return undef unless -e $path;
return $self->{$key} = $self->try_open($path => sub { $type->($path, '<') });
return $self->{$key} = $self->try_open($path => sub { $type->new(name => $path) });
sub _fill_stream_buffers {
my $self = shift;
my ($max) = @_;
my $stdout_state = $self->{+_STDOUT_STATE} //= {};
my $stderr_state = $self->{+_STDERR_STATE} //= {};
my $stdout_buff = $self->{+_STDOUT_BUFFER} ||= [];
my $stderr_buff = $self->{+_STDERR_BUFFER} ||= [];
my $stdout_file = $self->{+STDOUT_FILE} || $self->_open_file('stdout');
my $stderr_file = $self->{+STDERR_FILE} || $self->_open_file('stderr');
return unless $stdout_file && $stderr_file;
my @sets = grep { defined $_->[0] } (
[$stdout_file, $stdout_buff, 'io', 'STDOUT', $stdout_state],
[$stderr_file, $stderr_buff, 'io', 'STDERR', $stderr_state],
return unless @sets;
# Cache the result of the exists check on success, files can come into
# existence at any time though so continue to check if it fails.
while (1) {
my $added = 0;
my @events_files = $self->events_files();
for my $set (@events_files, @sets) {
my ($file, $buff, $type, $name, $state) = @$set;
next if $max && @$buff > $max;
my $pos = tell($file);
my $line = <$file>;
if (defined($line) && ($self->{+_EXIT_DONE} || substr($line, -1) eq "\n")) {
print "\n" if $state && delete $state->{$pos};
my $job_id = $self->{+JOB_ID};
if ($type eq 'io' && $line =~ s/T2-HARNESS-\Q$job_id\E-ENCODING: (.+)\n$//) {
apply_encoding($file, $1);
push @$buff => $line if length($line);
seek($file, 0, 1) if eof($file); # Reset EOF.
else {
if ($name && defined($line) && $ENV{YATH_INTERACTIVE}) {
my ($fh);
if ($name eq 'STDOUT') {
$fh = \*STDOUT;
elsif ($name eq 'STDERR') {
$fh = \*STDERR;
my $len = length($line);
if (my $check = $state->{$pos}->{len}) {
if ($len != $check) {
delete $state->{$pos}->{done};
$line = substr($line, $check);
else {
$line = "\n[INTERACTIVE] $line";
else {
$line = "\n[INTERACTIVE] $line";
$state->{$pos}->{len} = $len;
my $stamp = $state->{$pos}->{stamp} //= time;
my $delta = time - $stamp;
if($delta >= 1 && !$state->{$pos}->{done}) {
$state->{$pos}->{done} = 1;
print $fh $line;
seek($file, $pos, 0);
last unless $added;
sub events_files {
my $self = shift;
my $buff = $self->{+_EVENTS_BUFFER} ||= {};
my $files = $self->{+_EVENTS_FILES} ||= {};
my $dir = File::Spec->catdir($self->{+JOB_ROOT}, 'events');
return unless -d $dir;
my $dh;
if ($self->try_open($dir => sub { opendir($dh, $dir) or die $! })) {
for my $file (readdir($dh)) {
next unless '.jsonl' eq substr($file, -6);
next if $files->{$file};
my $path = File::Spec->catfile($dir, $file);
next if $files->{$file};
my $fh = $self->try_open(
$path => sub { [
split(ipc_separator() => substr(substr($file, 6 + length(ipc_separator())), 0, -6)),
open_file($path, '<'),
] }
$files->{$file} = $fh if $fh;
return map { [$_->[2] => $buff->{$_->[0]}->{$_->[1]} ||= [], 'jsonl'] } values %$files;
sub try_open {
my $self = shift;
my ($path, $callback) = @_;
local ($@, $?, $!);
my $out;
my $ok = eval {
$out = $callback->();
my $errno = $!;
my $err = $@;
return $out if $ok;
die $@ unless $errno == ENFILE || $errno == EMFILE;
warn "Could not open '$path', this is NOT FATAL as yath will try again. Errno is '$errno', Exception was: $err"
unless $self->{+OPEN_ERROR_SEEN}->{$path}++;
return undef;
sub _fill_buffers {
my $self = shift;
my ($max) = @_;
# NOTE 1: 'max' will only effect stdout, stderr, and events.jsonl, the
# other files only have 1 value each so they will not eat too much memory.
# NOTE 2: 'max' only effects how many items are ADDED to the buffer, not
# how many are in the buffer, that is good enough, poll() will take care of
# the actual event limiting. We only use this here to make sure the buffer
# grows slowly, this is important if max is used to avoid eating memory. We
# still need to add to the buffers each time though in case we are waiting
# for a sync event before we flush.
# Wait for the directory
return unless -d $self->{+JOB_ROOT};
$self->{+OPEN_ERRORS} = 0;
# Do not look for exit until we are done with the other streams
return if $self->{+_EXIT_DONE} || @{$self->{+_STDOUT_BUFFER}} || @{$self->{+_STDERR_BUFFER}} || first { @$_ } map { values %{$_} } values %{$self->{+_EVENTS_BUFFER}};
my $found_timeout = 0;
for my $set ([ET_FILE, ET_BUFFER], [PET_FILE, PET_BUFFER]) {
my ($key, $buffer_key) = @$set;
next if $self->{$buffer_key};
next unless $self->{$key} && $self->{$key}->exists;
$self->{$buffer_key} = $self->{$key}->read_line // next;
return if $found_timeout;
return if $self->{+OPEN_ERRORS};
my $ended = 0;
# We need to check if the runner exited BEFORE trying to check the exit value.
my $runner_exited = $self->{+RUNNER_PID} && !kill(0, $self->{+RUNNER_PID});
my $exit_file = $self->{+EXIT_FILE} || $self->_open_file('exit') || return;
return if $self->{+OPEN_ERRORS};
if ($exit_file->exists) {
my $line = $exit_file->read_line;
if (defined($line)) {
$self->{+_EXIT_BUFFER} = $line;
$self->{+_EXIT_DONE} = 1;
elsif ($runner_exited) {
$self->{+_EXIT_BUFFER} = '-1';
$self->{+_EXIT_DONE} = 1;
return unless $ended;
# If we found exit we need one last buffer fill on the other sources.
# If we do not do this we have a race condition. Ignore the max for this.
sub _poll_timeouts {
my $self = shift;
my @out;
if (defined $self->{+ET_BUFFER} && !$self->{+ET_DONE}++) {
push @out => $self->_process_timeout_line('event' => $self->{+ET_BUFFER}, <<" EOT");
Test2::Harness checks for timeouts at a configurable interval, if a test does
not produce any output to stdout or stderr between intervals it will be
forcefully killed under the assumption it has hung. See the '--event-timeout'
option to configure the interval.
if (defined $self->{+PET_BUFFER} && !$self->{+PET_DONE}++) {
push @out => $self->_process_timeout_line('post-exit' => $self->{+ET_BUFFER}, <<" EOT");
Sometimes tests will fork and then return. On supported systems Test2::Harness
will start all tests with their own process group and will wait for the entire
group to exit before considering the test done. In these cases Test2::Harness
will poll for output from the process group at a configurable interval, if no
output is produced between intervals the process group will be forcefully
killed. See the '--post-exit-timeout' option to configure the interval.
return @out;
sub _poll_exit {
my $self = shift;
# Intentionally ignoring the max argument, this only ever returns 1 item,
# and would not be called if max was 0.
return unless defined $self->{+_EXIT_BUFFER};
my $value = delete $self->{+_EXIT_BUFFER};
return $self->_process_exit_line($value);
sub _process_events_line {
my $self = shift;
my ($event_data) = @_;
$event_data->{job_id} = $self->{+JOB_ID};
$event_data->{job_try} = $self->{+JOB_TRY};
$event_data->{run_id} = $self->{+RUN_ID};
$event_data->{event_id} ||= $event_data->{facet_data}->{about}->{uuid} ||= gen_uuid();
return $event_data;
sub _process_exit_line {
my $self = shift;
my ($value) = @_;
my $stdout = maybe_read_file(File::Spec->catfile($self->{+JOB_ROOT}, "stdout"));
my $stderr = maybe_read_file(File::Spec->catfile($self->{+JOB_ROOT}, "stderr"));
$stdout =~ s/T2-HARNESS-\S+-(?:ESYNC|EVENT): .+\n//g;
$stderr =~ s/T2-HARNESS-\S+-(?:ESYNC|EVENT): .+\n//g;
my $event_id = gen_uuid();
my ($exit, $err, $sig, $dmp, $stamp, $retry) = (split(/\s+/, $value), '', '', '', '', '', '');
$self->{+DONE} = {retry => $retry};
return {
event_id => $event_id,
job_id => $self->{+JOB_ID},
job_try => $self->{+JOB_TRY},
run_id => $self->{+RUN_ID},
stamp => $stamp,
facet_data => {
about => {uuid => $event_id},
harness_job_exit => {
details => "Test script exited $exit ($err\:$sig)",
exit => $exit,
code => $err,
signal => $sig,
dumped => $dmp,
retry => $retry,
job_id => $self->{+JOB_ID},
job_try => $self->{+JOB_TRY},
stdout => $stdout,
stderr => $stderr,
stamp => $stamp,
line => $value,
sub _process_timeout_line {
my $self = shift;
my ($type, $buffer, $reason) = @_;
chomp($buffer //= '');
my ($stamp, $delta) = split /\s+/, $buffer;
$stamp //= time();
$delta = defined($delta) ? sprintf('%.4f', $delta) : '??';
my $event_id = gen_uuid();
return {
event_id => $event_id,
job_id => $self->{+JOB_ID},
job_try => $self->{+JOB_TRY},
run_id => $self->{+RUN_ID},
stamp => $stamp,
facet_data => {
about => {uuid => $event_id, details => "Timeout ($type)"},
errors => [
tag => 'TIMEOUT',
details => "A timeout ($type) has occured (after $delta seconds), job was forcefully killed",
fail => 1,
info => [
tag => 'TIMEOUT',
debug => 1,
important => 1,
details => $reason,
=encoding UTF-8
=head1 NAME
Test2::Harness::Collector::JobDir - Job Directory Parser, read events from an active
jobs output directory.
This module is responsible for reading and parsing a running jobs output
directory. The result is an event stream.
This module is not intended for external use, it is an implementation detail
and can change at any time. Currently instances of this module are not passed
to any plugins or callbacks.
=head1 SOURCE
The source code repository for Test2-Harness can be found at
=over 4
=item Chad Granum E<lt>exodist@cpan.orgE<gt>
=head1 AUTHORS
=over 4
=item Chad Granum E<lt>exodist@cpan.orgE<gt>
Copyright 2020 Chad Granum E<lt>exodist7@gmail.comE<gt>.
This program is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.