skip_unless_mongod build_client get_test_db
server_version server_type
/
;
skip_unless_mongod();
my
$conn
= build_client();
my
$db
= get_test_db(
$conn
);
my
$coll
=
$db
->coll(
"inventory"
);
my
$server_version
= server_version(
$conn
);
my
$server_type
= server_type(
$conn
);
my
$cursor
;
subtest
"change streams"
=>
sub
{
plan
skip_all
=>
'$currentDate operator requires MongoDB 3.6+'
unless
$server_version
>= v3.6.0;
plan
skip_all
=>
'Change Streams require replica set'
unless
$server_type
eq
'RSPrimary'
;
my
$document
;
my
$resume_token
;
my
@pipeline
;
$db
->coll(
'warmup'
)->insert_one({});
$cursor
=
$db
->coll(
'inventory'
)->watch();
$document
=
$cursor
->
next
;
is
$document
,
undef
,
'no changes after example 1'
;
$coll
->insert_one({
username
=>
'alice'
});
$document
=
$cursor
->
next
;
is
$document
->{fullDocument}{username},
'alice'
,
'found change inserted after example 1'
;
$cursor
=
$db
->coll(
'inventory'
)->watch(
[],
{
fullDocument
=>
'updateLookup'
},
);
$document
=
$cursor
->
next
;
is
$document
,
undef
,
'no changes after example 2'
;
$coll
->update_one(
{
username
=>
'alice'
},
{
'$set'
=> {
updated
=> 1 } },
);
$document
=
$cursor
->
next
;
is
$document
->{fullDocument}{username},
'alice'
,
'found change made after example 2'
;
$resume_token
=
$document
->{_id};
$cursor
=
$db
->coll(
'inventory'
)->watch(
[],
{
resumeAfter
=>
$resume_token
},
);
$document
=
$cursor
->
next
;
is
$document
,
undef
,
'no changes after example 3'
;
$coll
->update_one(
{
username
=>
'alice'
},
{
'$set'
=> {
updated
=> 2 } },
);
$document
=
$cursor
->
next
;
ok
$document
,
'found change made after example 3'
;
@pipeline
= (
{
'$match'
=> {
'$or'
=> [
{
'fullDocument.username'
=>
'alice'
},
{
'operationType'
=> {
'$in'
=> [
'delete'
] } },
],
} },
);
$cursor
=
$db
->coll(
'inventory'
)->watch(\
@pipeline
);
$document
=
$cursor
->
next
;
is
$document
,
undef
,
'no changes after example 4'
;
$coll
->delete_one({
username
=>
'alice'
});
$document
=
$cursor
->
next
;
ok
$document
,
'found change made after example 4'
;
$coll
->drop;
};
done_testing;