to_IxHash
/
;
use
if
$ENV
{MONGOVERBOSE},
qw/Log::Any::Adapter Stderr/
;
build_client
get_test_db
server_version
server_type
clear_testdbs
get_unique_collection
skip_unless_mongod
skip_unless_failpoints_available
skip_unless_transactions
remap_hashref_to_snake_case
to_snake_case
set_failpoint
clear_failpoint
/
;
maybe_skip_multiple_mongos
foreach_spec_test
/
;
# Precondition guards from the test helper library; presumably each one skips
# the whole file when the deployment lacks the named capability (confirm
# against the helper module).
skip_unless_mongod();
skip_unless_failpoints_available();
skip_unless_transactions();

# Monitoring events recorded by event_cb() and later consumed by
# check_event_expectations().
my @events;
# Forget every monitoring event recorded so far.
sub clear_events {
    @events = ();
}
# Number of monitoring events currently recorded.
sub event_count {
    return scalar(@events);
}
# Monitoring callback: store a deep copy of the event passed as the first
# argument, so later changes to the caller's structure do not alter what
# was recorded.
sub event_cb {
    my ($event) = @_;
    return push @events, dclone($event);
}
# Client handed to foreach_spec_test() and maybe_skip_multiple_mongos();
# wtimeout is passed explicitly as undef.
my $conn = build_client( wtimeout => undef );
# For each driver method (snake_case name), the spec argument names that are
# passed positionally, in call order. _adjust_arguments() pulls these out of
# the spec's argument hash; whatever is left over becomes the trailing options
# hash. Methods sharing a signature are grouped; each entry still gets its own
# array reference.
my %method_args = (
    insert_one  => ['document'],
    insert_many => ['documents'],
    (
        map { ( $_ => ['filter'] ) }
          qw( delete_one delete_many find count count_documents find_one_and_delete )
    ),
    (
        map { ( $_ => [ 'filter', 'replacement' ] ) }
          qw( replace_one find_one_and_replace )
    ),
    (
        map { ( $_ => [ 'filter', 'update' ] ) }
          qw( update_one update_many find_one_and_update )
    ),
    bulk_write  => ['requests'],
    run_command => [ 'command', 'readPreference' ],
    aggregate   => ['pipeline'],
    distinct    => [ 'fieldName', 'filter' ],
);
# Drive every spec test found under the convenient-API data directory.
# NOTE(review): foreach_spec_test is imported; it presumably calls the
# callback once per test with ( $test, $plan ) -- confirm in the helper module.
foreach_spec_test(
    't/data/transactions-convenient-api',
    $conn,
    sub {
        my ( $test, $plan ) = @_;

        maybe_skip_multiple_mongos( $conn, $test->{useMultipleMongoses} );

        my $db_name   = $plan->{database_name};
        my $coll_name = $plan->{collection_name};

        # Fresh client per test; a failure of killAllSessions is deliberately
        # ignored (the eval result is never inspected).
        my $client = build_client( wtimeout => undef );
        eval { $client->send_admin_command( [ killAllSessions => [] ] ) };

        # Recreate the target collection so each test starts from a clean slate.
        my $db         = $client->get_database($db_name);
        my $collection = $db->get_collection(
            $coll_name,
            { write_concern => { w => 'majority', wtimeout => 10000 } },
        );
        $collection->drop;
        $db->run_command( [ create => $coll_name ] );

        # The fail point is active only while the test's operations run.
        set_failpoint( $client, $test->{failPoint} );
        run_test( $db_name, $coll_name, $test );
        clear_failpoint( $client, $test->{failPoint} );

        # Optional final check of the collection contents.
        if ( defined $test->{outcome}{collection}{data} ) {
            my @documents = $collection->find()->all;
            cmp_deeply(
                \@documents,
                $test->{outcome}{collection}{data},
                'outcome as expected'
            )
        }
    }
);
# Objects addressable by name from the spec ("session0", "session1",
# "database", "collection") plus the "<session>_lsid" session ids that
# check_command_field() substitutes for expected lsid values.
my %sessions;

# Run one spec test: build a monitored client, start the two sessions, execute
# every operation through with_transaction(), and verify errors, per-command
# results and (when present) the expected command-started events.
#
#   $test_db_name   - database the operations run against
#   $test_coll_name - collection the operations run against
#   $test           - decoded spec test (clientOptions, sessionOptions,
#                     operations, expectations)
sub run_test {
    my ( $test_db_name, $test_coll_name, $test ) = @_;

    my $client_options =
      remap_hashref_to_snake_case( $test->{clientOptions} // {} );

    # The spec key readPreference is passed to the client as read_pref_mode.
    if ( exists $client_options->{read_preference} ) {
        $client_options->{read_pref_mode} =
          delete $client_options->{read_preference};
    }

    my $client = build_client(
        monitoring_callback => \&event_cb,
        wtimeout            => undef,
        %$client_options,
    );

    my $session_options = $test->{sessionOptions} // {};

    %sessions = (
        session0 => $client->start_session( $session_options->{session0} ),
        session1 => $client->start_session( $session_options->{session1} ),
    );
    $sessions{session0_lsid} = $sessions{session0}->session_id;
    $sessions{session1_lsid} = $sessions{session1}->session_id;

    # Refresh topology first, then drop the events recorded up to this point
    # so only the operations below are captured.
    $client->topology_status( refresh => 1 );
    clear_events();

    for my $operation ( @{ $test->{operations} } ) {
        my $collection_options =
          remap_hashref_to_snake_case( $operation->{collectionOptions} // {} );
        my $op_result = $operation->{result};

        eval {
            $sessions{database} = $client->get_database($test_db_name);
            $sessions{collection} =
              $sessions{database}
              ->get_collection( $test_coll_name, $collection_options );

            my $op_args       = $operation->{'arguments'};
            my $callback_args = $op_args->{'callback'};

            $sessions{ $operation->{object} }->with_transaction(
                sub {
                    my ($callback_session) = @_;

                    for my $sub_op ( @{ $callback_args->{'operations'} } ) {
                        my $cmd           = to_snake_case( $sub_op->{'name'} );
                        my $sub_op_result = $sub_op->{'result'};
                        my @args          = _adjust_arguments( $cmd,
                            $sub_op->{'arguments'} || {} );

                        # Only a trailing hash can carry options. The last
                        # positional argument may instead be an array ref
                        # (pipeline) or a plain string (fieldName), which must
                        # not be dereferenced as a hash.
                        my $last_arg = @args ? $args[-1] : undef;
                        if ( ref($last_arg) eq 'HASH' ) {

                            # Swap the spec's session name for the session the
                            # callback received.
                            $last_arg->{session} = $callback_session
                              if exists $last_arg->{session};
                            $last_arg->{returnDocument} =
                              lc $last_arg->{returnDocument}
                              if exists $last_arg->{returnDocument};
                        }

                        # Watchdog for the command call below.
                        alarm 666;
                        pass("running command $cmd");
                        my $ret = $sessions{ $sub_op->{object} }->$cmd(@args);
                        alarm 0;

                        # These three commands return an object whose results
                        # are read with ->all.
                        my $result = $ret;
                        $result = [ $ret->all ]
                          if grep { $cmd eq $_ } qw/ find aggregate distinct /;

                        check_result_outcome( $result, $sub_op_result )
                          if $sub_op_result;
                    }
                },
                $op_args->{'options'}
            );
        };
        my $err = $@;

        # A command that died skipped its own "alarm 0"; disarm the watchdog
        # so it cannot fire during later, unrelated work.
        alarm 0;

        check_error( $err, $op_result );
    }

    $sessions{session0}->end_session;
    $sessions{session1}->end_session;

    if ( defined $test->{expectations} ) {
        check_event_expectations( _adjust_types( $test->{expectations} ) );
    }

    %sessions = ();
}
# Compare the exception captured from an operation with what the spec expects.
#
#   $err - value of $@ after the operation ran (false when nothing died);
#          when true it is used as an object providing message(), result()
#          and has_error_label()
#   $exp - the operation's expected result; an error is expected only when it
#          is a hash with at least one key starting with "error"
#
# Emits test assertions; the return value is not meaningful.
sub check_error {
    my ( $err, $exp ) = @_;

    my $expecting_error = 0;
    if ( ref($exp) eq 'HASH' ) {
        $expecting_error = grep { /^error/ } keys %{$exp};
    }

    if ($err) {
        unless ($expecting_error) {
            fail( 'Not expecting error, got "' . $err->message . '"' );
            return;
        }

        my $err_contains        = $exp->{errorContains};
        my $err_code_name       = $exp->{errorCodeName};
        my $err_labels_contains = $exp->{errorLabelsContain};
        my $err_labels_omit     = $exp->{errorLabelsOmit};

        if ( defined $err_contains ) {

            # Translate the spec's camelCase method names to the snake_case
            # spelling before matching against the error message.
            $err_contains =~ s/abortTransaction/abort_transaction/;
            $err_contains =~ s/commitTransaction/commit_transaction/;
            like( $err->message, qr/$err_contains/i,
                'error contains ' . $err_contains );
        }

        if ( defined $err_code_name ) {
            my $result = $err->result;

            # The code name is only checked for command results; other result
            # classes are skipped without an assertion.
            if ( $result->isa('MongoDB::CommandResult') ) {
                my $output    = $result->output;
                my $code_name = $output->{codeName}
                  || $output->{'writeConcernError'}{'codeName'};
                is( $code_name, $err_code_name,
                    'error has name ' . $err_code_name );
            }
        }

        if ( defined $err_labels_omit ) {
            for my $err_label ( @{$err_labels_omit} ) {
                ok( !$err->has_error_label($err_label),
                    'error doesnt have label ' . $err_label );
            }
        }

        # Guard on the list actually being iterated, so expected labels are
        # verified even when the spec lists no labels to omit.
        if ( defined $err_labels_contains ) {
            for my $err_label ( @{$err_labels_contains} ) {
                ok( $err->has_error_label($err_label),
                    'error has label ' . $err_label );
            }
        }
    }
    elsif ($expecting_error) {
        fail('Expecting error, but no error found');
    }
}
# Dispatch a result comparison on the shape of the expectation: an array
# reference goes to the array checker, anything else to the hash checker.
sub check_result_outcome {
    my ( $got, $exp ) = @_;

    return ref($exp) eq 'ARRAY'
      ? check_array_result_outcome( $got, $exp )
      : check_hash_result_outcome( $got, $exp );
}
# Deep-compare a list result with the expected array.
sub check_array_result_outcome {
    my ( $got, $exp ) = @_;
    return cmp_deeply( $got, $exp, 'result as expected' );
}
# Compare a result with a non-array expectation.
#
# When $exp is a hash, each key is checked separately: against the snake_case
# key of $got when $got is a plain hash, otherwise against the snake_case
# accessor method of $got. Any other $exp is compared as a whole. On a
# mismatch both structures are dumped with diag.
sub check_hash_result_outcome {
    my ( $got, $exp ) = @_;

    my $all_matched = 1;

    if ( ref($exp) eq 'HASH' ) {
      KEY:
        for my $key ( keys %$exp ) {
            my $accessor = to_snake_case($key);

            # Results that have no upserted_count accessor are not checked
            # for upsertedCount.
            next KEY
              if $key eq 'upsertedCount' && !$got->can('upserted_count');

            if ( ref($got) eq 'HASH' ) {
                $all_matched &&= cmp_deeply( $got->{$accessor}, $exp->{$key},
                    "$key result correct" );
                next KEY;
            }

            if ( !can_ok( $got, $accessor ) ) {
                $all_matched = 0;
                next KEY;
            }

            $all_matched &&= cmp_deeply( $got->$accessor, $exp->{$key},
                "$key result correct" );
        }
    }
    else {
        is_deeply( $got, $exp, "non-hash result correct" );
    }

    if ( !$all_matched ) {
        diag( "GOT:\n", explain($got), "\nEXPECT:\n", explain($exp) );
    }
}
# Turn a spec argument hash into the positional argument list for $method.
#
# The names listed for $method in %method_args are removed from the hash, in
# order, and become positional values (undefined ones are dropped). For
# bulk_write the "requests" value is first converted by
# _adjust_bulk_write_requests(). If any keys remain, the hash itself is
# appended as the final argument.
sub _adjust_arguments {
    my ( $method, $args ) = @_;

    $args = _adjust_types($args);

    my @positional;
    for my $field ( @{ $method_args{$method} || [] } ) {
        my $val = delete $args->{$field};
        if ( $method eq 'bulk_write' and $field eq 'requests' ) {
            $val = _adjust_bulk_write_requests($val);
        }
        push @positional, $val;
    }

    return (
        ( grep { defined } @positional ),
        scalar( keys %$args ) ? $args : (),
    );
}
# Convert the spec's bulk-write request list into an array reference of
# single-key hashes: { snake_case_name => [ positional arguments ] }.
sub _adjust_bulk_write_requests {
    my ($requests) = @_;

    my @converted = map {
        my $request = $_;
        my $name    = to_snake_case( $request->{name} );
        +{ $name => [ _adjust_arguments( $name, $request->{arguments} ) ] };
    } @$requests;

    return [@converted];
}
# Recursively copy a decoded spec structure, replacing every hash of the form
# { '$numberLong' => X } with the number 0+X. Hashes and arrays are rebuilt;
# any other value is returned unchanged.
sub _adjust_types {
    my ($value) = @_;

    my $kind = ref $value;

    if ( $kind eq 'ARRAY' ) {
        return [ map { _adjust_types($_) } @$value ];
    }

    return $value if $kind ne 'HASH';

    my @names = keys %$value;
    if ( @names == 1 && $names[0] eq '$numberLong' ) {
        return 0 + $value->{'$numberLong'};
    }

    return +{ map { ( $_ => _adjust_types( $value->{$_} ) ) } @names };
}
# Convert an expected value from the spec into a structure suitable for
# cmp_deeply:
#   - undef and ordinary strings are returned as they are;
#   - the string 'test' accepts either 'test' or 'test_collection';
#   - two placeholder namespace strings accept any non-empty string;
#   - booleans are compared by truthiness only;
#   - arrays and hashes are converted element by element.
sub prepare_data_spec {
    my ($spec) = @_;

    return $spec unless defined $spec;

    if ( !ref $spec ) {
        return any( qw( test test_collection ) ) if $spec eq 'test';

        return code( \&_verify_is_nonempty_str )
          if $spec eq 'test-unacknowledged-bulk-write';

        return code( \&_verify_is_nonempty_str )
          if $spec eq 'command-monitoring-tests.test';

        return $spec;
    }

    if ( is_bool($spec) ) {
        my $want_true = $spec ? 1 : 0;
        return code(
            sub {
                my $actual = shift;
                if ( $want_true and not $actual ) {
                    return ( 0, 'expected a true boolean value' );
                }
                if ( $actual and not $want_true ) {
                    return ( 0, 'expected a false boolean value' );
                }
                return 1;
            }
        );
    }

    my $kind = ref $spec;

    if ( $kind eq 'ARRAY' ) {
        return [ map { prepare_data_spec($_) } @$spec ];
    }

    if ( $kind eq 'HASH' ) {
        return +{ map { ( $_, prepare_data_spec( $spec->{$_} ) ) } keys %$spec };
    }

    return $spec;
}
# Verify the recorded monitoring events against the spec's expectations.
#
# Only command_started events are considered, and commands whose name matches
# sasl or ismaster are ignored. Expectations are consumed in order, one
# subtest each, dispatching to check_<type>() in package main. Afterwards no
# recorded event may be left over.
sub check_event_expectations {
    my ($expected) = @_;

    my @pending;
    for my $recorded (@events) {
        next unless ( $recorded->{type} // '' ) eq 'command_started';
        next if ( $recorded->{commandName} // '' ) =~ /sasl|ismaster/;
        push @pending, $recorded;
    }

    for my $expectation (@$expected) {

        # Each expectation is a single-pair hash: event type => spec.
        my ( $exp_type, $exp_spec ) = %$expectation;

        subtest $exp_type => sub {
            return unless ok( scalar(@pending), 'event available' );

            my $event = shift @pending;
            return
              unless is( $event->{type} . '_event', $exp_type,
                "is a $exp_type" );

            my $tester = main->can("check_$exp_type");
            $tester->( $exp_spec, $event );
        };
    }

    is( scalar(@pending), 0, 'no outstanding events' );
}
# Check every expected field of an event, in sorted key order, by calling
# check_<key>_field( $expected_value, $event ) looked up in package main.
sub check_event {
    my ( $exp, $event ) = @_;

    for my $field ( sort keys %$exp ) {
        my $checker = main->can( 'check_' . $field . '_field' );
        $checker->( $exp->{$field}, $event );
    }
}
# Checker for "command_started_event" expectations (reached through the
# check_<type> lookup in check_event_expectations); it checks the event
# field by field.
sub check_command_started_event {
    my ( $spec, $event ) = @_;
    return check_event( $spec, $event );
}
# Validator used with code() for the spec's placeholder number 42 (error codes
# and cursor ids): the actual value must be defined and strictly positive.
#
# Returns 1 on success, or the list ( 0, $reason ) on failure. Call it in list
# context: in scalar context the failure list evaluates to its last element,
# the reason string, which is true.
sub _verify_is_positive_num {
    my $value = shift;

    return ( 0, "error code is not defined" )
      unless defined $value;

    # Strictly greater than zero, so the positive value 1 is accepted.
    return ( 0, "error code is not positive" )
      unless $value > 0;

    return 1;
}
# Validator used with code(): the actual value must be defined and have a
# non-zero length. Returns 1 on success, or the list ( 0, $reason ) on failure.
sub _verify_is_nonempty_str {
    my ($text) = @_;

    if ( !defined $text ) {
        return ( 0, "error message is not defined" );
    }
    if ( !length $text ) {
        return ( 0, "error message is empty" );
    }
    return 1;
}
# Field checker for "database_name": the expected name is not compared; the
# event only has to carry a defined, non-empty databaseName.
sub check_database_name_field {
    my ( $exp_name, $event ) = @_;

    ok( defined( $event->{databaseName} ), "database_name defined" );
    return ok( length( $event->{databaseName} ), "database_name non-empty" );
}
# Field checker for "command_name": the event's commandName must equal the
# expected name.
sub check_command_name_field {
    my ( $exp_name, $event ) = @_;
    return is( $event->{commandName}, $exp_name, "command name" );
}
# Field checker for "reply": compare each expected reply key with the event's
# reply.
#
# Before comparing, placeholder values in the expectation are replaced by
# validators, modifying $exp_reply in place:
#   - cursor id 42                -> any positive number
#   - writeErrors[i].code 42      -> any positive number
#   - writeErrors[i].errmsg ''    -> any non-empty string
#     (each write error is then matched with superhashof, so extra keys in
#     the actual error are tolerated)
#   - cursorsUnknown[i] 42        -> any positive number (killCursors only)
sub check_reply_field {
    my ( $exp_reply, $event ) = @_;
    my $event_reply = $event->{reply};

    # The outer exists keeps the id lookup from autovivifying {cursor}.
    if ( exists $exp_reply->{cursor} ) {
        if ( exists $exp_reply->{cursor}{id} ) {
            $exp_reply->{cursor}{id} = code( \&_verify_is_positive_num )
              if $exp_reply->{cursor}{id} eq '42';
        }
    }

    if ( exists $exp_reply->{writeErrors} ) {
        for my $i ( 0 .. $#{ $exp_reply->{writeErrors} } ) {
            my $error = $exp_reply->{writeErrors}[$i];
            if ( exists $error->{code} and $error->{code} eq 42 ) {
                $error->{code} = code( \&_verify_is_positive_num );
            }
            if ( exists $error->{errmsg} and $error->{errmsg} eq '' ) {
                $error->{errmsg} = code( \&_verify_is_nonempty_str );
            }
            $exp_reply->{writeErrors}[$i] = superhashof($error);
        }
    }

    if (    $event->{commandName} eq 'killCursors'
        and defined $exp_reply->{cursorsUnknown} )
    {
        for my $index ( 0 .. $#{ $exp_reply->{cursorsUnknown} } ) {
            $exp_reply->{cursorsUnknown}[$index] =
              code( \&_verify_is_positive_num )
              if $exp_reply->{cursorsUnknown}[$index] eq 42;
        }
    }

    for my $exp_key ( sort keys %$exp_reply ) {
        cmp_deeply(
            $event_reply->{$exp_key},
            prepare_data_spec( $exp_reply->{$exp_key} ),
            "reply field $exp_key"
        ) or diag explain $event_reply->{$exp_key};
    }
}
# Field checker for "command": compare each expected command key with the
# command document of the event.
#
# The expectation is normalised in place first:
#   - "ordered" is never compared;
#   - getMore 42 and (for killCursors) cursors[i] 42 accept any positive number;
#   - writeConcern: when absent or undefined it is ignored; when present
#     without wtimeout, wtimeout is ignored and the rest is matched with
#     subhashof;
#   - lsid names ("session0"/"session1") are replaced by the recorded
#     session ids from %sessions;
#   - readConcern.afterClusterTime 42 accepts any BSON::Timestamp;
#   - txnNumber is compared as a Math::BigInt.
sub check_command_field {
    my ( $exp_command, $event ) = @_;
    my $event_command = $event->{command};

    delete $exp_command->{ordered};

    if ( exists $exp_command->{getMore} ) {
        $exp_command->{getMore} = code( \&_verify_is_positive_num )
          if $exp_command->{getMore} eq '42';
    }

    if ( defined $exp_command->{writeConcern} ) {
        unless ( defined $exp_command->{writeConcern}->{wtimeout} ) {
            $exp_command->{writeConcern}{wtimeout} = ignore();
            $exp_command->{writeConcern} =
              subhashof( $exp_command->{writeConcern} );
        }
    }
    else {
        $exp_command->{writeConcern} = ignore();
    }

    if (    $event->{commandName} eq 'killCursors'
        and defined $exp_command->{cursors} )
    {
        for my $index ( 0 .. $#{ $exp_command->{cursors} } ) {
            $exp_command->{cursors}[$index] = code( \&_verify_is_positive_num )
              if $exp_command->{cursors}[$index] eq 42;
        }
    }

    if ( defined $exp_command->{lsid} ) {
        $exp_command->{lsid} = $sessions{ $exp_command->{lsid} . '_lsid' };
    }

    if ( defined $exp_command->{readConcern} ) {
        $exp_command->{readConcern}{afterClusterTime} = Isa('BSON::Timestamp')
          if ( defined $exp_command->{readConcern}{afterClusterTime}
            && $exp_command->{readConcern}{afterClusterTime} eq '42' );
    }

    if ( defined $exp_command->{txnNumber} ) {
        $exp_command->{txnNumber} =
          Math::BigInt->new( $exp_command->{txnNumber} );
    }

    for my $exp_key ( sort keys %$exp_command ) {
        my $event_value = $event_command->{$exp_key};
        my $exp_value   = prepare_data_spec( $exp_command->{$exp_key} );
        my $label       = "command field '$exp_key'";

        # These keys are always compared by value, even when the expected
        # value is undefined.
        my $always_compare = ( grep { $exp_key eq $_ } qw( comment maxTimeMS ) )
          || ( $event->{commandName} eq 'getMore' and $exp_key eq 'batchSize' );

        if ( !$always_compare and !defined $exp_value ) {

            # For every other key an undefined expectation means the key must
            # be absent from the command.
            ok( !exists $event_command->{$exp_key},
                $label . ' does not exist' );
        }
        else {
            cmp_deeply( $event_value, $exp_value, $label );
        }
    }
}
# Clean up the databases created by this file, then close the test plan.
clear_testdbs();
done_testing();