our
$VERSION
=
'v1.999.0'
;
requires
qw/monitoring_callback db_name/
;
has
command_start_time
=> (
is
=>
'rw'
, );
has
command_start_event
=> (
is
=>
'rw'
, );
sub
publish_command_started {
my
(
$self
,
$link
,
$command
,
$request_id
) =
@_
;
return
unless
$self
->monitoring_callback;
$command
= _to_tied_ixhash(
$command
);
my
$command_name
=
tied
(
%$command
)->Keys(0);
my
$event
= {
type
=>
'command_started'
,
databaseName
=>
$self
->db_name,
commandName
=>
$command_name
,
command
=> (
_needs_redaction(
$command_name
)
? _to_tied_ixhash([])
:
$command
,
),
requestId
=>
$request_id
,
connectionId
=>
$link
->address,
};
$self
->command_start_event(
$event
);
eval
{
$self
->monitoring_callback->(
$event
) };
$self
->command_start_time(
time
);
return
;
}
sub
publish_command_reply {
my
(
$self
,
$bson
) =
@_
;
return
unless
$self
->monitoring_callback;
my
$duration
=
time
-
$self
->command_start_time();
my
$start_event
=
$self
->command_start_event();
my
$reply
=
ref
(
$bson
) eq
'HASH'
?
$bson
: BSON->new()->decode_one(
$bson
);
my
$event
= {
databaseName
=>
$start_event
->{databaseName},
commandName
=>
$start_event
->{commandName},
requestId
=>
$start_event
->{requestId},
connectionId
=>
$start_event
->{connectionId},
durationSecs
=>
$duration
,
reply
=> (
_needs_redaction(
$start_event
->{commandName})
? {}
:
$reply
,
),
};
if
(
$reply
->{ok} ) {
$event
->{type} =
'command_succeeded'
;
}
else
{
$event
->{type} =
'command_failed'
;
$event
->{failure} = _extract_errmsg(
$reply
);
}
eval
{
$self
->monitoring_callback->(
$event
) };
return
;
}
sub
publish_command_exception {
my
(
$self
,
$err
) =
@_
;
return
unless
$self
->monitoring_callback;
my
$duration
=
time
-
$self
->command_start_time();
my
$start_event
=
$self
->command_start_event();
my
$event
= {
type
=>
"command_failed"
,
databaseName
=>
$start_event
->{databaseName},
commandName
=>
$start_event
->{commandName},
requestId
=>
$start_event
->{requestId},
connectionId
=>
$start_event
->{connectionId},
durationSecs
=>
$duration
,
reply
=> {},
failure
=>
"$err"
,
eval_error
=>
$err
,
};
eval
{
$self
->monitoring_callback->(
$event
) };
return
;
}
sub
publish_legacy_write_started {
my
(
$self
,
$link
,
$cmd_name
,
$op_doc
,
$request_id
) =
@_
;
my
$method
=
"_convert_legacy_$cmd_name"
;
return
$self
->publish_command_started(
$link
,
$self
->
$method
(
$op_doc
),
$request_id
);
}
sub
publish_legacy_reply_succeeded {
my
(
$self
,
$result
) =
@_
;
my
$batchfield
=
ref
(
$self
) eq
"MongoDB::Op::_Query"
?
"firstBatch"
:
"nextBatch"
;
my
$reply
= {
ok
=> 1,
cursor
=> {
id
=>
$result
->{cursor_id},
ns
=>
$self
->full_name,
$batchfield
=> [ @{
$result
->{docs}} ],
},
};
return
$self
->publish_command_reply(
$reply
);
}
sub
publish_legacy_query_error {
my
(
$self
,
$result
) =
@_
;
my
$reply
= {
%$result
,
ok
=> 0,
};
return
$self
->publish_command_reply(
$reply
);
}
sub
_needs_redaction {
my
(
$name
) =
@_
;
return
1
if
grep
{
$name
eq
$_
}
qw(
authenticate
saslStart
saslContinue
getnonce
createUser
updateUser
copydbgetnonce
copydbsaslstart
copydb
)
;
return
0;
}
sub
_convert_legacy_insert {
my
(
$self
,
$op_doc
) =
@_
;
$op_doc
= [
$op_doc
]
unless
ref
$op_doc
eq
'ARRAY'
;
return
[
insert
=>
$self
->coll_name,
documents
=>
$op_doc
,
@{
$self
->write_concern->as_args },
];
}
sub
_extract_errmsg {
my
(
$output
) =
@_
;
for
my
$err_key
(
qw/$err err errmsg/
) {
return
$output
->{
$err_key
}
if
exists
$output
->{
$err_key
};
}
if
(
exists
$output
->{writeConcernError} ) {
return
$output
->{writeConcernError}{errmsg};
}
return
""
;
}
sub
_convert_legacy_update {
my
(
$self
,
$op_doc
) =
@_
;
return
[
update
=>
$self
->coll_name,
updates
=> [
update
=>
$self
->coll_name,
updates
=> [
$op_doc
],
],
@{
$self
->write_concern->as_args },
];
}
sub
_convert_legacy_delete {
my
(
$self
,
$op_doc
) =
@_
;
return
[
delete
=>
$self
->coll_name,
deletes
=> [
$op_doc
],
@{
$self
->write_concern->as_args },
];
}
sub
_decode_preencoded {
my
(
$obj
) =
@_
;
my
$codec
= BSON->new;
my
$type
=
ref
(
$obj
);
if
(
$type
eq
'BSON::Raw'
) {
return
$codec
->decode_one(
$obj
->{bson} );
}
elsif
(
$type
eq
'Tie::IxHash'
) {
tie
my
%out
,
"Tie::IxHash"
;
$out
{
$_
} = _decode_preencoded(
$obj
->FETCH(
$_
) )
for
$obj
->Keys;
return
\
%out
;
}
elsif
(
$type
eq
'ARRAY'
) {
return
[
map
{ _decode_preencoded(
$_
) }
@$obj
];
}
elsif
(
$type
eq
'HASH'
) {
return
{
map
{ ;
$_
=> _decode_preencoded(
$obj
->{
$_
} ) }
keys
%$obj
};
}
return
$obj
;
}
sub
_to_tied_ixhash {
my
(
$in
) =
@_
;
my
$type
=
ref
(
$in
);
my
%out
;
if
(
$type
eq
'ARRAY'
) {
tie
%out
,
"Tie::IxHash"
,
map
{ _decode_preencoded(
$_
) }
@$in
;
}
elsif
(
$type
eq
"Tie::IxHash"
) {
tie
%out
,
"Tie::IxHash"
;
$out
{
$_
} = _decode_preencoded(
$in
->FETCH(
$_
) )
for
$in
->Keys;
}
else
{
tie
%out
,
"Tie::IxHash"
,
map
{ ;
$_
=> _decode_preencoded(
$in
->{
$_
} ) }
keys
%$in
;
}
return
\
%out
;
}
1;