use
5.010;
$Neo4j::Driver::Result::Jolt::VERSION
=
'0.37'
;
our
@CARP_NOT
=
qw(Neo4j::Driver::Net::HTTP Neo4j::Driver::Result)
;
my
(
$TRUE
,
$FALSE
);
my
$MEDIA_TYPE
=
"application/vnd.neo4j.jolt"
;
my
$ACCEPT_HEADER
=
"$MEDIA_TYPE-v2+json-seq"
;
my
$ACCEPT_HEADER_V1
=
"$MEDIA_TYPE+json-seq"
;
my
$ACCEPT_HEADER_STRICT
=
"$MEDIA_TYPE+json-seq;strict=true"
;
my
$ACCEPT_HEADER_SPARSE
=
"$MEDIA_TYPE+json-seq;strict=false"
;
my
$ACCEPT_HEADER_NDJSON
=
"$MEDIA_TYPE"
;
our
$gather_results
= 1;
sub
new {
my
(
$class
,
$params
) =
@_
;
my
$jolt_v2
=
$params
->{http_header}->{content_type} =~ m/^\Q
$MEDIA_TYPE
\E-v2\b/i;
my
$self
= {
attached
=> 1,
exhausted
=> 0,
buffer
=> [],
server_info
=>
$params
->{server_info},
json_coder
=>
$params
->{http_agent}->json_coder,
http_agent
=>
$params
->{http_agent},
cypher_types
=>
$params
->{cypher_types},
v2_id_prefix
=>
$jolt_v2
?
'element_'
:
''
,
};
bless
$self
,
$class
;
(
$TRUE
,
$FALSE
) = @{
$self
->{json_coder}->decode(
'[true,false]'
) }
unless
$TRUE
;
return
$self
->_gather_results(
$params
)
if
$gather_results
;
die
"Unimplemented"
;
}
sub
_gather_results {
my
(
$self
,
$params
) =
@_
;
my
$error
=
'Neo4j::Error'
;
my
@results
= ();
my
$columns
=
undef
;
my
@data
= ();
$self
->{result} = {};
my
(
$state
,
$prev
) = (0,
'in first place'
);
my
(
$type
,
$event
);
while
( (
$type
,
$event
) =
$self
->_next_event ) {
if
(
$type
eq
'header'
) {
croak
"Jolt error: unexpected header event $prev"
unless
$state
== 0 ||
$state
== 3;
croak
"Jolt error: expected reference to HASH, received "
. (
ref
$event
?
"reference to "
.
ref
$event
:
"scalar"
) .
" in $type event $prev"
unless
ref
$event
eq
'HASH'
;
$state
= 1;
$columns
=
$event
->{fields};
}
elsif
(
$type
eq
'data'
) {
croak
"Jolt error: unexpected data event $prev"
unless
$state
== 1 ||
$state
== 2;
croak
"Jolt error: expected reference to ARRAY, received "
. (
ref
$event
?
"reference to "
.
ref
$event
:
"scalar"
) .
" in $type event $prev"
unless
ref
$event
eq
'ARRAY'
;
$state
= 2;
push
@data
, {
row
=>
$event
,
meta
=> [] };
}
elsif
(
$type
eq
'summary'
) {
croak
"Jolt error: unexpected summary event $prev"
unless
$state
== 1 ||
$state
== 2;
croak
"Jolt error: expected reference to HASH, received "
. (
ref
$event
?
"reference to "
.
ref
$event
:
"scalar"
) .
" in $type event $prev"
unless
ref
$event
eq
'HASH'
;
$state
= 3;
push
@results
, {
data
=> [
@data
],
stats
=>
$event
->{stats},
plan
=>
$event
->{plan},
columns
=>
$columns
// [],
};
@data
= ();
$columns
=
undef
;
}
elsif
(
$type
eq
'info'
) {
croak
"Jolt error: unexpected info event $prev"
unless
$state
== 0 ||
$state
== 3 ||
$state
== 4;
croak
"Jolt error: expected reference to HASH, received "
. (
ref
$event
?
"reference to "
.
ref
$event
:
"scalar"
) .
" in $type event $prev"
unless
ref
$event
eq
'HASH'
;
$state
+= 10;
$self
->{info} =
$event
;
$self
->{notifications} =
$event
->{notifications};
}
elsif
(
$type
eq
'error'
) {
carp
"Jolt error: unexpected error event $prev"
unless
$state
== 0 ||
$state
== 3 ||
$state
== 4;
croak
"Jolt error: expected reference to HASH, received "
. (
ref
$event
?
"reference to "
.
ref
$event
:
"scalar"
) .
" in $type event $prev"
unless
ref
$event
eq
'HASH'
;
$state
= 4;
$error
=
$error
->append_new(
Internal
=>
"Jolt error: Jolt $type event with 0 errors $prev"
)
unless
@{
$event
->{errors}};
$error
=
$error
->append_new(
Server
=>
$_
)
for
@{
$event
->{errors}};
}
else
{
croak
"Jolt error: unsupported $type event $prev"
;
}
$prev
=
"after $type event"
;
}
croak
"Jolt error: unexpected end of event stream $prev"
unless
$state
>= 10;
if
(!
$params
->{http_header}->{success}) {
$error
=
$error
->append_new(
Network
=> {
code
=>
$params
->{http_header}->{status},
as_string
=>
sprintf
(
"HTTP error: %s %s on %s to %s"
,
$params
->{http_header}->{status},
$params
->{http_agent}->http_reason,
$params
->{http_method},
$params
->{http_path}),
});
}
$params
->{error_handler}->(
$error
)
if
ref
$error
;
$self
->{http_agent} =
undef
;
if
(
@results
== 1) {
$self
->{result} =
$results
[0];
$self
->{statement} =
$params
->{statements}->[0];
return
$self
->_as_fully_buffered;
}
@results
=
map
{ __PACKAGE__->_new_result(
$_
,
undef
,
$params
) }
@results
;
$results
[
$_
]->{statement} =
$params
->{statements}->[
$_
]
for
(0 ..
$#results
);
$self
->{attached} = 0;
$self
->{exhausted} = 1;
$self
->{result_list} = \
@results
if
@results
;
return
$self
;
}
sub
_new_result {
my
(
$class
,
$result
,
$json
,
$params
) =
@_
;
my
$jolt_v2
=
$params
->{http_header}->{content_type} =~ m/^\Q
$MEDIA_TYPE
\E-v2\b/i;
my
$self
= {
attached
=> 0,
exhausted
=> 0,
result
=>
$result
,
buffer
=> [],
columns
=>
undef
,
summary
=>
undef
,
cypher_types
=>
$params
->{cypher_types},
server_info
=>
$params
->{server_info},
v2_id_prefix
=>
$jolt_v2
?
'element_'
:
''
,
};
bless
$self
,
$class
;
return
$self
->_as_fully_buffered;
}
sub
_next_event {
my
(
$self
) =
@_
;
my
$line
=
$self
->{http_agent}->fetch_event;
return
unless
defined
$line
;
my
$json
=
$self
->{json_coder}->decode(
$line
);
croak
"Jolt error: expected reference to HASH, received "
. (
ref
$json
?
"reference to "
.
ref
$json
:
"scalar"
)
unless
ref
$json
eq
'HASH'
;
my
@events
=
keys
%$json
;
croak
"Jolt error: expected exactly 1 event, received "
.
scalar
@events
unless
@events
== 1;
return
(
$events
[0],
$json
->{
$events
[0]} );
}
sub
_results {
my
(
$self
) =
@_
;
return
@{
$self
->{result_list} }
if
$self
->{result_list};
return
(
$self
);
}
sub
_info {
my
(
$self
) =
@_
;
return
$self
->{info};
}
sub
_init_record {
my
(
$self
,
$record
) =
@_
;
$record
->{column_keys} =
$self
->{columns};
$self
->_deep_bless(
$record
->{row} );
return
bless
$record
,
'Neo4j::Driver::Record'
;
}
sub
_deep_bless {
my
(
$self
,
$data
) =
@_
;
my
$cypher_types
=
$self
->{cypher_types};
if
(
ref
$data
eq
'ARRAY'
) {
$data
->[
$_
] =
$self
->_deep_bless(
$data
->[
$_
])
for
0 .. $
return
$data
;
}
if
(
ref
$data
eq
''
) {
return
$data
;
}
if
(
$data
==
$TRUE
||
$data
==
$FALSE
) {
return
$data
;
}
die
"Assertion failed: unexpected type: "
.
ref
$data
unless
ref
$data
eq
'HASH'
;
die
"Assertion failed: sigil count: "
.
scalar
keys
%$data
if
scalar
keys
%$data
!= 1;
my
$sigil
= (
keys
%$data
)[0];
my
$value
=
$data
->{
$sigil
};
if
(
$sigil
eq
'?'
) {
return
$TRUE
if
$value
eq
'true'
;
return
$FALSE
if
$value
eq
'false'
;
die
"Assertion failed: unexpected bool value: "
.
$value
;
}
if
(
$sigil
eq
'Z'
) {
return
0 +
$value
;
}
if
(
$sigil
eq
'R'
) {
return
0 +
$value
;
}
if
(
$sigil
eq
'U'
) {
return
$value
;
}
if
(
$sigil
eq
'[]'
) {
die
"Assertion failed: unexpected list type: "
.
ref
$value
unless
ref
$value
eq
'ARRAY'
;
$value
->[
$_
] =
$self
->_deep_bless(
$value
->[
$_
])
for
0 .. $
return
$value
;
}
if
(
$sigil
eq
'{}'
) {
die
"Assertion failed: unexpected map type: "
.
ref
$value
unless
ref
$value
eq
'HASH'
;
delete
$data
->{
'{}'
};
$data
->{
$_
} =
$self
->_deep_bless(
$value
->{
$_
})
for
keys
%$value
;
return
$data
;
}
if
(
$sigil
eq
'()'
) {
die
"Assertion failed: unexpected node type: "
.
ref
$value
unless
ref
$value
eq
'ARRAY'
;
die
"Assertion failed: unexpected node fields: "
.
scalar
@$value
unless
@$value
== 3;
die
"Assertion failed: unexpected prop type: "
.
ref
$value
->[2]
unless
ref
$value
->[2] eq
'HASH'
;
my
$props
=
$value
->[2];
$props
->{
$_
} =
$self
->_deep_bless(
$props
->{
$_
})
for
keys
%$props
;
my
$node
= \(
$props
);
bless
$node
,
$cypher_types
->{node};
$$node
->{_meta} = {
"$self->{v2_id_prefix}id"
=>
$value
->[0],
labels
=>
$value
->[1],
};
$cypher_types
->{init}->(
$node
)
if
$cypher_types
->{init};
return
$node
;
}
if
(
$sigil
eq
'->'
||
$sigil
eq
'<-'
) {
die
"Assertion failed: unexpected rel type: "
.
ref
$value
unless
ref
$value
eq
'ARRAY'
;
die
"Assertion failed: unexpected rel fields: "
.
scalar
@$value
unless
@$value
== 5;
die
"Assertion failed: unexpected prop type: "
.
ref
$value
->[4]
unless
ref
$value
->[4] eq
'HASH'
;
my
$props
=
$value
->[4];
$props
->{
$_
} =
$self
->_deep_bless(
$props
->{
$_
})
for
keys
%$props
;
my
$rel
= \(
$props
);
bless
$rel
,
$cypher_types
->{relationship};
$$rel
->{_meta} = {
"$self->{v2_id_prefix}id"
=>
$value
->[0],
type
=>
$value
->[2],
"$self->{v2_id_prefix}start"
=>
$sigil
eq
'->'
?
$value
->[1] :
$value
->[3],
"$self->{v2_id_prefix}end"
=>
$sigil
eq
'->'
?
$value
->[3] :
$value
->[1],
};
$cypher_types
->{init}->(
$rel
)
if
$cypher_types
->{init};
return
$rel
;
}
if
(
$sigil
eq
'..'
) {
die
"Assertion failed: unexpected path type: "
.
ref
$value
unless
ref
$value
eq
'ARRAY'
;
die
"Assertion failed: unexpected path fields: "
.
scalar
@$value
unless
@$value
& 1;
$value
->[
$_
] =
$self
->_deep_bless(
$value
->[
$_
])
for
0 .. $
my
$path
=
bless
{
path
=>
$value
},
$cypher_types
->{path};
$cypher_types
->{init}->(
$path
)
if
$cypher_types
->{init};
return
$path
;
}
if
(
$sigil
eq
'@'
) {
bless
$data
,
$cypher_types
->{point};
return
$data
;
}
if
(
$sigil
eq
'T'
) {
bless
$data
,
$cypher_types
->{temporal};
return
$data
;
}
if
(
$sigil
eq
'#'
) { # Bytes
$value
=~
tr
/ //d;
$value
=
pack
'H*'
,
$value
;
utf8::downgrade(
$value
);
return
$value
;
}
die
"Assertion failed: unexpected sigil: "
.
$sigil
;
}
sub
_accept_header {
my
(
undef
,
$want_jolt
,
$method
) =
@_
;
return
unless
$method
eq
'POST'
;
if
(
defined
$want_jolt
) {
return
if
!
$want_jolt
;
return
(
$ACCEPT_HEADER_V1
)
if
$want_jolt
eq
'v1'
;
return
(
$ACCEPT_HEADER_STRICT
)
if
$want_jolt
eq
'strict'
;
return
(
$ACCEPT_HEADER_SPARSE
)
if
$want_jolt
eq
'sparse'
;
return
(
$ACCEPT_HEADER_NDJSON
)
if
$want_jolt
eq
'ndjson'
;
}
return
(
$ACCEPT_HEADER
);
}
sub
_acceptable {
my
(
undef
,
$content_type
) =
@_
;
return
$content_type
=~ m/^\Q
$MEDIA_TYPE
\E\b/i;
}
1;