use
5.010;
$Neo4j::Driver::Net::HTTP::VERSION
=
'0.33'
;
our
@CARP_NOT
=
qw(Neo4j::Driver::Transaction Neo4j::Driver::Transaction::HTTP)
;
my
$DISCOVERY_ENDPOINT
=
'/'
;
my
$COMMIT_ENDPOINT
=
'commit'
;
my
@RESULT_MODULES
=
qw( Neo4j::Driver::Result::Jolt Neo4j::Driver::Result::JSON )
;
my
$RESULT_FALLBACK
=
'Neo4j::Driver::Result::Text'
;
my
$RFC5322_DATE
=
'%a, %d %b %Y %H:%M:%S %z'
;
sub
new {
my
(
$class
,
$driver
) =
@_
;
$driver
->{plugins}->{default_handlers}->{http_adapter_factory} //=
sub
{
my
$net_module
=
$driver
->{net_module} ||
'Neo4j::Driver::Net::HTTP::LWP'
;
return
$net_module
->new(
$driver
);
};
my
$http_adapter
=
$driver
->{plugins}->trigger_event(
'http_adapter_factory'
,
$driver
);
my
$self
=
bless
{
die_on_error
=>
$driver
->{die_on_error},
cypher_types
=>
$driver
->{cypher_types},
server_info
=>
$driver
->{server_info},
http_agent
=>
$http_adapter
,
want_jolt
=>
$driver
->{jolt},
want_concurrent
=>
$driver
->{concurrent_tx} // 1,
active_tx
=> {},
},
$class
;
return
$self
;
}
sub
_server {
my
(
$self
) =
@_
;
my
(
$neo4j_version
,
$tx_endpoint
);
my
@discovery_queue
= (
$DISCOVERY_ENDPOINT
);
while
(
@discovery_queue
) {
my
$tx
= {
transaction_endpoint
=>
shift
@discovery_queue
};
my
$service
=
$self
->_request(
$tx
,
'GET'
)->_json;
$neo4j_version
=
$service
->{neo4j_version};
$tx_endpoint
=
$service
->{transaction};
last
if
$neo4j_version
&&
$tx_endpoint
;
if
(
$service
->{data}) {
push
@discovery_queue
, URI->new(
$service
->{data} )->path;
}
}
croak
"Neo4j server not found (ServerInfo discovery failed)"
unless
$neo4j_version
;
my
$date
=
$self
->{http_agent}->date_header;
$date
=~ s/ GMT$/ +0000/;
$date
=
$date
? Time::Piece->strptime(
$date
,
$RFC5322_DATE
) : Time::Piece->new;
$self
->{server_info} = Neo4j::Driver::ServerInfo->new({
uri
=>
$self
->{http_agent}->uri,
version
=>
"Neo4j/$neo4j_version"
,
time_diff
=> Time::Piece->new -
$date
,
tx_endpoint
=>
$tx_endpoint
,
});
return
$self
->{server_info};
}
sub
_set_database {
my
(
$self
,
$database
) =
@_
;
my
$tx_endpoint
=
$self
->{server_info}->{tx_endpoint};
$self
->{endpoints} = {
new_transaction
=>
"$tx_endpoint"
,
new_commit
=>
"$tx_endpoint/$COMMIT_ENDPOINT"
,
}
if
$tx_endpoint
;
return
unless
defined
$database
;
$database
= URI::Escape::uri_escape_utf8
$database
;
$self
->{endpoints}->{new_transaction} =~ s/\{databaseName}/
$database
/;
$self
->{endpoints}->{new_commit} =~ s/\{databaseName}/
$database
/;
}
sub
_run {
my
(
$self
,
$tx
,
@statements
) =
@_
;
if
( !
$self
->{want_concurrent} ) {
my
$is_concurrent
= %{
$self
->{active_tx}} && !
defined
$tx
->{commit_endpoint};
$is_concurrent
||=
keys
%{
$self
->{active_tx}} > 1;
$is_concurrent
and carp
"Concurrent transactions for HTTP are disabled; use multiple sessions or enable the concurrent_tx config option (this warning may become fatal in a future Neo4j::Driver version)"
;
}
my
$json
= {
statements
=> \
@statements
};
return
$self
->_request(
$tx
,
'POST'
,
$json
)->_results;
}
sub
_accept_for {
my
(
$self
,
$method
) =
@_
;
$self
->{want_jolt} =
'v1'
if
!
defined
$self
->{want_jolt}
&&
$self
->{server_info} &&
$self
->{server_info}->{version} =~ m{^Neo4j/4\.[234]\.};
my
@modules
=
@RESULT_MODULES
;
unshift
@modules
,
$self
->{http_agent}->result_handlers
if
$self
->{http_agent}->can(
'result_handlers'
);
my
@accept
=
map
{
$_
->_accept_header(
$self
->{want_jolt},
$method
) }
@modules
;
return
$self
->{accept_for}->{
$method
} =
join
', '
,
@accept
;
}
sub
_result_module_for {
my
(
$self
,
$content_type
) =
@_
;
my
@modules
=
@RESULT_MODULES
;
unshift
@modules
,
$self
->{http_agent}->result_handlers
if
$self
->{http_agent}->can(
'result_handlers'
);
foreach
my
$module
(
@modules
) {
if
(
$module
->_acceptable(
$content_type
)) {
return
$self
->{result_module_for}->{
$content_type
} =
$module
;
}
}
return
$RESULT_FALLBACK
;
}
sub
_request {
my
(
$self
,
$tx
,
$method
,
$json
) =
@_
;
if
(!
defined
$tx
->{transaction_endpoint}) {
$tx
->{transaction_endpoint} = URI->new(
$self
->{endpoints}->{new_transaction} )->path;
}
my
$tx_endpoint
=
"$tx->{transaction_endpoint}"
;
my
$accept
=
$self
->{accept_for}->{
$method
}
//
$self
->_accept_for(
$method
);
$self
->{http_agent}->request(
$method
,
$tx_endpoint
,
$json
,
$accept
);
my
$header
=
$self
->{http_agent}->http_header;
$tx
->{closed} =
$header
->{success};
my
$result_module
=
$self
->{result_module_for}->{
$header
->{content_type} }
//
$self
->_result_module_for(
$header
->{content_type} );
my
$result
=
$result_module
->new({
http_agent
=>
$self
->{http_agent},
http_method
=>
$method
,
http_path
=>
$tx_endpoint
,
http_header
=>
$header
,
die_on_error
=>
$self
->{die_on_error},
cypher_types
=>
$self
->{cypher_types},
server_info
=>
$self
->{server_info},
statements
=>
$json
?
$json
->{statements} : [],
});
$self
->_parse_tx_status(
$tx
,
$header
,
$result
->_info);
return
$result
;
}
sub
_parse_tx_status {
my
(
$self
,
$tx
,
$header
,
$info
) =
@_
;
$tx
->{unused} = 0;
$tx
->{closed} = !
$info
->{commit} || !
$info
->{transaction};
if
(
$tx
->{closed} ) {
my
$old_endpoint
=
$tx
->{transaction_endpoint};
$old_endpoint
=~ s|/
$COMMIT_ENDPOINT
$||;
delete
$self
->{active_tx}->{
$old_endpoint
};
return
;
}
if
(
$header
->{location} &&
$header
->{status} eq
'201'
) {
my
$new_commit
= URI->new(
$info
->{commit} )->path_query;
my
$new_endpoint
= URI->new(
$header
->{location} )->path_query;
$tx
->{commit_endpoint} =
$new_commit
;
$tx
->{transaction_endpoint} =
$new_endpoint
;
}
if
(
my
$expires
=
$info
->{transaction}->{expires} ) {
$expires
=~ s/ GMT$/ +0000/;
$expires
= Time::Piece->strptime(
$expires
,
$RFC5322_DATE
) +
$self
->{server_info}->{time_diff};
$self
->{active_tx}->{
$tx
->{transaction_endpoint} } =
$expires
;
}
}
sub
_is_active_tx {
my
(
$self
,
$tx
) =
@_
;
my
$now
= Time::Piece->new;
foreach
my
$tx_key
(
keys
%{
$self
->{active_tx}} ) {
my
$expires
=
$self
->{active_tx}->{
$tx_key
};
delete
$self
->{active_tx}->{
$tx_key
}
if
$now
>
$expires
;
}
my
$tx_endpoint
=
$tx
->{transaction_endpoint};
$tx_endpoint
=~ s|/
$COMMIT_ENDPOINT
$||;
return
exists
$self
->{active_tx}->{
$tx_endpoint
};
}
1;