# Version string of this module.
our
$VERSION
=
'1.302210'
;
# @CARP_NOT is consulted by Carp: packages listed here are treated as trusted,
# so croak/cluck/confess do not attribute errors to Test2::Util::HashBase.
our
@CARP_NOT
=
qw/Test2::Util::HashBase/
;
use
Carp
qw/croak cluck confess/
;
name hub
trace frame send_to
events
finished
active
stack
id cid uuid
children
_in_use
_attached pid tid
start_stamp stop_stamp
}
;
# Report whether threads can actually be used: CAN_THREAD must be true and
# threads.pm must load and be at least version 1.34.
# Returns 1 when both conditions hold, 0 otherwise.
sub CAN_REALLY_THREAD {
    if (CAN_THREAD) {
        my $usable = eval {
            require threads;
            threads->VERSION('1.34');
            1;
        };

        return 1 if $usable;
    }

    return 0;
}
# Scalar reference obtained from Test2::API; init() calls the coderef it points
# at (when one is defined) to produce a uuid for each new subtest.
my
$UUID_VIA
= Test2::API::_add_uuid_via_ref();
# Counter used by init() to build the 'AsyncSubtest-N' cid of each new object.
my
$CID
= 1;
# Subtests that are currently started: start() pushes onto it, stop() pops it,
# and init() snapshots it into the new object's 'stack' attribute.
my
@STACK
;
# Return the subtest on top of the file-level stack of started subtests,
# or undef when no subtest is currently started.
sub TOP {
    return undef unless @STACK;
    return $STACK[-1];
}
# Initializer: validates the object and fills in every attribute that was not
# supplied to the constructor.
#
# Croaks when no 'name' was given. Records the destination hub (send_to),
# the creating pid/tid, a fresh cid, an optional uuid, creates the subtest hub
# unless one was provided, builds the trace and attaches the event listener.
sub init {
    my ($self) = @_;

    croak "'name' is a required attribute"
        unless $self->{+NAME};

    # Results go to the supplied hub, or to the hub on top of the Test2 stack.
    $self->{+SEND_TO} ||= Test2::API::test2_stack()->top;
    my $to = $self->{+SEND_TO};

    # Remember which subtests are started right now and mark each one as in
    # use by this new subtest.
    $self->{+STACK} = [@STACK];
    for my $parent (reverse @STACK) {
        $parent->{+_IN_USE}++;
    }

    $self->{+TID}      = get_tid;
    $self->{+PID}      = $$;
    $self->{+CID}      = 'AsyncSubtest-' . $CID++;
    $self->{+ID}       = 1;
    $self->{+FINISHED} = 0;
    $self->{+ACTIVE}   = 0;
    $self->{+_IN_USE}  = 0;
    $self->{+CHILDREN} = [];

    if (defined $$UUID_VIA) {
        $self->{+UUID} = $$UUID_VIA->();
    }

    if (!$self->{+HUB}) {
        my $ipc       = Test2::API::test2_ipc();
        my $formatter = Test2::API::test2_stack->top->format;
        my $extra     = delete($self->{hub_init_args}) || {};

        # The new hub is buffered and nested one level below the destination.
        my $new_hub = Test2::AsyncSubtest::Hub->new(
            %$extra,
            ipc       => $ipc,
            nested    => $to->nested + 1,
            buffered  => 1,
            formatter => $formatter,
        );

        # The hub points back at this object; weakened so the pair does not
        # keep each other alive.
        weaken($new_hub->{ast} = $self);
        $self->{+HUB} = $new_hub;
    }

    unless ($self->{+TRACE}) {
        $self->{+TRACE} = Test2::Util::Trace->new(
            frame    => $self->{+FRAME} || [caller(1)],
            buffered => $to->buffered,
            nested   => $to->nested,
            cid      => $self->{+CID},
            uuid     => $self->{+UUID},
            hid      => $to->hid,
            huuid    => $to->uuid,
        );
    }

    my $hub = $self->{+HUB};
    $hub->set_ast_ids({}) unless $hub->ast_ids;
    $hub->listen($self->_listener);
}
# Build the hub listener callback. The returned coderef appends its second
# argument to this subtest's 'events' array (created here if missing).
sub _listener {
    my ($self) = @_;

    $self->{+EVENTS} ||= [];
    my $collected = $self->{+EVENTS};

    return sub {
        my (undef, $event) = @_;
        push @$collected, $event;
    };
}
# Create a Test2::API::Context that reports to the destination (send_to) hub
# using this subtest's trace. Confesses if that hub has already ended.
sub context {
    my ($self) = @_;

    my $target = $self->{+SEND_TO};

    if ($target->ended) {
        confess "Attempt to close AsyncSubtest when original parent hub (a non async-subtest?) has ended";
    }

    return Test2::API::Context->new(
        trace => $self->{+TRACE},
        hub   => $target,
    );
}
# Construct a Test2::AsyncSubtest::Event::<type> object.
#
#   $type - event class suffix (callers in this file pass 'Attach' or 'Detach')
#   $id   - id stored on the event
#   $hub  - hub whose buffered/nested/hid/uuid values populate the trace
#
# The trace frame is taken from the caller of the method that called this one.
sub _gen_event {
    my ($self, $type, $id, $hub) = @_;

    my $event_class = "Test2::AsyncSubtest::Event::$type";

    my $trace = Test2::Util::Trace->new(
        frame    => [caller(1)],
        buffered => $hub->buffered,
        nested   => $hub->nested,
        cid      => $self->{+CID},
        uuid     => $self->{+UUID},
        hid      => $hub->hid,
        huuid    => $hub->uuid,
    );

    return $event_class->new(
        id    => $id,
        trace => $trace,
    );
}
# Reserve the next child id: registers it in the hub's ast_ids map with the
# value 0 (not yet attached) and returns it.
sub cleave {
    my ($self) = @_;

    my $id = $self->{+ID}++;

    my $ids = $self->{+HUB}->ast_ids;
    $ids->{$id} = 0;

    return $id;
}
# Attach the current (child) process/thread to this subtest under $id.
#
# Croaks when no id is given, when the id is not registered in the hub's
# ast_ids, when it is already attached, or when the hub reports is_local.
# On success records [pid, tid, id] and sends an Attach event to the hub.
sub attach {
    my ($self, $id) = @_;

    croak "An ID is required" unless $id;

    my $hub   = $self->{+HUB};
    my $state = $hub->ast_ids->{$id};

    croak "ID $id is not valid"        unless defined $state;
    croak "ID $id is already attached" if $state;

    croak "You must attach INSIDE the child process/thread"
        if $hub->is_local;

    $self->{+_ATTACHED} = [$$, get_tid, $id];

    $hub->send($self->_gen_event('Attach', $id, $hub));
}
# Detach the current child process/thread from this subtest.
#
# When called from the process/thread that created the subtest it only emits
# a cluck and returns. Otherwise croaks if nothing is attached or if the
# attachment belongs to a different pid/tid, then sends a Detach event to the
# hub and clears the attachment record.
sub detach {
    my ($self) = @_;

    my $in_creator = $self->{+PID} == $$ && $self->{+TID} == get_tid;
    if ($in_creator) {
        cluck "You must detach INSIDE the child process/thread ($$, " . get_tid . " instead of $self->{+PID}, $self->{+TID})";
        return;
    }

    my $att = $self->{+_ATTACHED};
    croak "Not attached" unless $att;

    my ($att_pid, $att_tid, $id) = @{$att}[0, 1, 2];

    croak "Attempt to detach from wrong child"
        unless $att_pid == $$ && $att_tid == get_tid;

    my $hub = $self->{+HUB};
    $hub->send($self->_gen_event('Detach', $id, $hub));

    delete $self->{+_ATTACHED};
}
# True when nothing is pending for this subtest (see pending()).
sub ready {
    my ($self) = @_;
    return !$self->pending;
}
# Number of things still outstanding for this subtest.
#
# Returns -1 when the hub is not local to the current process/thread.
# Otherwise culls the hub and returns the in-use count plus the number of
# ids still registered in the hub's ast_ids.
sub pending {
    my ($self) = @_;

    my $hub = $self->{+HUB};

    if (!$hub->is_local) {
        return -1;
    }

    $hub->cull;

    my $open_ids = keys %{$hub->ast_ids};
    return $self->{+_IN_USE} + $open_ids;
}
# Run a codeblock inside this subtest.
#
#   $code - coderef to execute (required; croaks otherwise)
#   @args - arguments passed through to $code
#
# The subtest is started before and stopped after the code runs. Exceptions
# thrown by the code are caught and sent to the subtest hub as a
# Test2::Event::Exception. If the code left the T2_SUBTEST_WRAPPER block early
# the outcome is derived from the hub instead: a bail-out is forwarded through
# the destination context (returning nothing), otherwise a non-zero hub exit
# code is reported as an exception.
#
# Returns the hub's is_passing value.
sub run {
    my $self = shift;
    my ($code, @args) = @_;

    croak "AsyncSubtest->run() takes a codeblock as the first argument"
        unless $code && ref($code) eq 'CODE';

    $self->start;

    my ($ok, $err, $finished);
    T2_SUBTEST_WRAPPER: {
        $ok  = eval { $code->(@args); 1 };
        $err = $@;

        # A 'last T2_SUBTEST_WRAPPER' issued where the label is not visible
        # dies with Perl's "Label not found" diagnostic. That is an early
        # exit, not a real failure, so it is treated like leaving the block.
        # The pattern must stay on one line to match the one-line message.
        if (!$ok && $err =~ m/Label not found for "last T2_SUBTEST_WRAPPER"/) {
            $ok  = undef;
            $err = undef;
        }
        else {
            $finished = 1;
        }
    }

    $self->stop;

    my $hub = $self->{+HUB};

    if (!$finished) {
        if (my $bailed = $hub->bailed_out) {
            my $ctx = $self->context;
            $ctx->bail($bailed->reason);
            return;
        }

        # Named $exit_code so it does not shadow the $code coderef above.
        my $exit_code = $hub->exit_code;
        $ok  = !$exit_code;
        $err = "Subtest ended with exit code $exit_code" if $exit_code;
    }

    unless ($ok) {
        my $e = Test2::Event::Exception->new(
            error => $err,
            trace => Test2::Util::Trace->new(
                frame    => [caller(0)],
                buffered => $hub->buffered,
                nested   => $hub->nested,
                cid      => $self->{+CID},
                uuid     => $self->{+UUID},
                hid      => $hub->hid,
                huuid    => $hub->uuid,
            ),
        );
        $hub->send($e);
    }

    return $hub->is_passing;
}
# Mark the subtest as started: records the first start timestamp, bumps the
# active counter, pushes the subtest on the file-level stack and its hub on
# the Test2 hub stack. Croaks if the subtest is already finished.
# Returns the hub's is_passing value.
sub start {
    my ($self) = @_;

    croak "Subtest is already complete" if $self->{+FINISHED};

    # Only the first start sets the timestamp.
    if (!defined $self->{+START_STAMP}) {
        $self->{+START_STAMP} = Time::HiRes::time();
    }

    $self->{+ACTIVE}++;
    push @STACK, $self;

    my $hub = $self->{+HUB};
    Test2::API::test2_stack()->push($hub);

    return $hub->is_passing;
}
# Undo one start(): decrements the active counter, records the stop
# timestamp, pops the subtest from the file-level stack and its hub from the
# Test2 hub stack.
#
# Croaks with "Subtest is not active" when the subtest was not started, and
# with "AsyncSubtest stack mismatch" when it is not the top of the stack.
# Both checks run BEFORE any state is changed, so a failed stop() leaves the
# active counter untouched (decrementing first would drive it to -1, which is
# truthy and makes the subtest look active afterwards).
#
# Returns the hub's is_passing value.
sub stop {
    my $self = shift;

    croak "Subtest is not active"
        unless $self->{+ACTIVE};

    croak "AsyncSubtest stack mismatch"
        unless @STACK && $self == $STACK[-1];

    $self->{+ACTIVE}--;
    $self->{+STOP_STAMP} = Time::HiRes::time();
    pop @STACK;

    my $hub   = $self->{+HUB};
    my $stack = Test2::API::test2_stack();
    $stack->pop($hub);

    return $hub->is_passing;
}
# Finish the subtest and report its result to the destination hub.
#
# Named parameters (all optional):
#   todo     - reason; the Subtest event is built with todo and effective_pass
#   skip     - reason; a skip is reported instead of a Subtest event
#   collapse - with no events at all report a plain ok; with no assertions
#              append a "No assertions" SKIP plan to the subevents
#   no_plan  - passed (negated) to the hub's finalize
#   silent   - finalize only, do not report anything
#
# Croaks if already finished, if called outside the creating process/thread,
# or while the subtest is still active. Waits for all children first.
#
# Returns the hub's is_passing value for the silent and collapsed-empty
# cases, 1 for a skip, otherwise the pass value of the Subtest event.
#
# NOTE(review): 'finished' is set before the other two checks, so a croak
# from those still leaves the object marked finished. Kept as-is.
sub finish {
    my $self   = shift;
    my %params = @_;

    my $hub = $self->hub;

    croak "Subtest is already finished"
        if $self->{+FINISHED}++;

    croak "Subtest can only be finished in the process/thread that created it"
        unless $hub->is_local;

    croak "Subtest is still active"
        if $self->{+ACTIVE};

    $self->wait;
    $self->{+STOP_STAMP} = Time::HiRes::time() unless defined $self->{+STOP_STAMP};

    my $todo       = $params{todo};
    my $skip       = $params{skip};
    my $empty      = !@{$self->{+EVENTS}};
    my $no_asserts = !$hub->count;
    my $collapse   = $params{collapse};
    my $no_plan    = $params{no_plan} || ($collapse && $no_asserts) || $skip;

    my $trace = Test2::Util::Trace->new(
        frame    => $self->{+TRACE}->{frame},
        buffered => $hub->buffered,
        nested   => $hub->nested,
        cid      => $self->{+CID},
        uuid     => $self->{+UUID},
        hid      => $hub->hid,
        huuid    => $hub->uuid,
    );

    $hub->finalize($trace, !$no_plan)
        unless $hub->no_ending || $hub->ended;

    if ($hub->ipc) {
        $hub->ipc->drop_hub($hub->hid);
        $hub->set_ipc(undef);
    }

    # init() incremented _in_use on every subtest that was started when this
    # one was created. It must be released on EVERY successful exit path:
    # 'finished' is already set, so DESTROY will not do it for us.
    my $release_parents = sub {
        $_->{+_IN_USE}-- for reverse @{$self->{+STACK}};
    };

    if ($params{silent}) {
        $release_parents->();
        return $hub->is_passing;
    }

    my $ctx = $self->context;

    my $pass = 1;
    if ($skip) {
        $ctx->skip($self->{+NAME}, $skip);
    }
    else {
        if ($collapse && $empty) {
            $ctx->ok($hub->is_passing, $self->{+NAME});
            $release_parents->();
            return $hub->is_passing;
        }

        if ($collapse && $no_asserts) {
            push @{$self->{+EVENTS}} => Test2::Event::Plan->new(
                trace     => $trace,
                max       => 0,
                directive => 'SKIP',
                reason    => "No assertions",
            );
        }

        my $e = $ctx->build_event(
            'Subtest',
            pass         => $hub->is_passing,
            subtest_id   => $hub->id,
            subtest_uuid => $hub->uuid,
            name         => $self->{+NAME},
            buffered     => 1,
            subevents    => $self->{+EVENTS},
            start_stamp  => $self->{+START_STAMP},
            stop_stamp   => $self->{+STOP_STAMP},
            $todo
            ? (
                todo           => $todo,
                effective_pass => 1,
                )
            : (),
        );

        $ctx->hub->send($e);

        unless ($e->effective_pass) {
            $ctx->failure_diag($e);

            # Only explain the plan mismatch when no subevent already
            # accounts for the failure.
            $ctx->diag("Bad subtest plan, expected " . $hub->plan . " but ran " . $hub->count)
                if $hub->plan && !$hub->check_plan && !grep { $_->causes_fail } @{$self->{+EVENTS}};
        }

        $pass = $e->pass;
    }

    $release_parents->();

    return $pass;
}
# Block until every recorded child has completed.
#
# Children are taken from the end of the 'children' list: blessed entries are
# joined, anything else is passed to waitpid. The hub is culled before each
# child and once more at the end. Clucks if the hub is local and ids are
# still registered in ast_ids afterwards.
sub wait {
    my ($self) = @_;

    my $hub      = $self->{+HUB};
    my $children = $self->{+CHILDREN};

    while (@$children) {
        $hub->cull;

        my $child = pop @$children;

        unless ($child) {
            Time::HiRes::sleep('0.01');
            next;
        }

        if (blessed($child)) {
            $child->join;
        }
        else {
            waitpid($child, 0);
        }
    }

    $hub->cull;

    cluck "Subtest '$self->{+NAME}': All children have completed, but we still appear to be pending"
        if $hub->is_local && keys %{$self->{+HUB}->ast_ids};
}
# Fork a child process for this subtest.
#
# Croaks when forking is unsupported (CAN_FORK false) or when fork fails; on
# failure the reserved id is removed from the hub's ast_ids again.
# In the parent: records the child pid in 'children' and returns it.
# In the child: attaches under the reserved id and returns a guard object
# (see _guard).
sub fork {
    croak "Forking is not supported" unless CAN_FORK;

    my ($self) = @_;

    my $id  = $self->cleave;
    my $pid = CORE::fork();

    if (!defined $pid) {
        delete $self->{+HUB}->ast_ids->{$id};
        croak "Failed to fork";
    }

    unless ($pid) {
        # Child process.
        $self->attach($id);
        return $self->_guard;
    }

    # Parent process.
    push @{$self->{+CHILDREN}}, $pid;
    return $pid;
}
# Fork and run a codeblock in the child.
#
# In the parent the value returned by fork() (the child pid) is returned.
# In the child the code is run inside the subtest, the child detaches, the
# guard is dismissed and the process exits with status 0.
sub run_fork {
    my ($self, $code, @args) = @_;

    my $guard = $self->fork;

    # Only the child gets a blessed guard back.
    return $guard unless blessed($guard);

    $self->run($code, @args);
    $self->detach();
    $guard->dismiss();

    exit 0;
}
# Run a codeblock in a new thread.
#
# Croaks when threads are not usable (CAN_REALLY_THREAD false). The thread
# attaches under a freshly reserved id, runs the code inside the subtest and
# detaches. The thread object is recorded in 'children' and returned.
sub run_thread {
    croak "Threading is not supported"
        unless CAN_REALLY_THREAD;

    my ($self, $code, @args) = @_;

    my $id = $self->cleave;

    my $body = sub {
        $self->attach($id);
        $self->run($code, @args);
        $self->detach(get_tid);
        return 0;
    };

    my $thr = threads->create($body);

    push @{$self->{+CHILDREN}}, $thr;

    return $thr;
}
# Build a Test2::Util::Guard around a scope-leak handler.
#
# The handler does nothing unless it runs in the same pid/tid that created
# the guard. There it clucks a "Scope Leak" message (including $@ when set),
# sends a matching Exception event to the subtest hub, detaches and exits
# with status 255.
sub _guard {
    my ($self) = @_;

    my ($pid, $tid) = ($$, get_tid);

    my $on_leak = sub {
        return unless $$ == $pid && get_tid == $tid;

        my $error = "Scope Leak";
        my $ex    = $@;
        if ($ex) {
            chomp($ex);
            $error .= " ($ex)";
        }

        cluck $error;

        my $ctx = $self->context;
        my $e   = $ctx->build_event(
            'Exception',
            error => "$error\n",
        );
        $self->{+HUB}->send($e);

        $self->detach();
        exit 255;
    };

    return Test2::Util::Guard->new($on_leak);
}
# Destructor.
#
# Does nothing for objects without a name. If this object is still attached
# (and has a hub) a detach is attempted, ignoring errors. If the subtest was
# never finished and we are in the process/thread that created it, the
# in-use marks on the recorded stack are released, a warning is emitted and
# the process exits with status 255.
sub DESTROY {
    my $self = shift;

    # Localize $@ before ANY eval below: an eval inside a destructor
    # otherwise resets the caller's $@ while an exception may be in flight.
    local $@;

    return unless $self->{+NAME};

    if ($self->{+_ATTACHED}) {
        return unless $self->{+HUB};
        eval { $self->detach() };
    }

    return if $self->{+FINISHED};
    return unless $self->{+PID} == $$;
    return unless $self->{+TID} == get_tid;

    eval { $_->{+_IN_USE}-- for reverse @{$self->{+STACK}} };

    warn "Subtest $self->{+NAME} did not finish!";
    exit 255;
}
1;