Hide Show 26 lines of Pod
# Module version string.
our $VERSION = '0.002_01';

# Names of the four quad positions, in positional order. They are used below
# both as column names in generated SQL and as the constructor keys that are
# zipped with terms when quads are built.
my @pos_names = Attean::API::Quad->variables;
Hide Show 16 lines of Pod
# Database handle that every statement issued by this store is prepared on.
has dbh => (
    is       => 'ro',
    isa      => InstanceOf['DBI::db'],
    required => 1,
);

# LRU cache (256 entries) mapping a term ID to its term object; see _get_term.
has _i2t_cache => (
    is      => 'ro',
    default => sub { return Cache::LRU->new(size => 256) },
);

# LRU cache (256 entries) mapping a term's as_string form to its term ID; see
# _get_term_id and _get_or_create_term_id.
has _t2i_cache => (
    is      => 'ro',
    default => sub { return Cache::LRU->new(size => 256) },
);
Hide Show 6 lines of Pod
# Create the store's schema by running every statement found in the
# create-schema file for the connected database type.
#
# Dies if create_schema_file() finds no readable schema file.
sub init {
    my ($self) = @_;

    my $runner = DBIx::MultiStatementDo->new(dbh => $self->dbh);

    my $schema_file = $self->create_schema_file
        or die 'No schema files available for store initialization';

    # Scalar assignment on purpose: the whole file is wanted as one string.
    my $ddl = read_file($schema_file);

    return $runner->do($ddl);
}
Hide Show 6 lines of Pod
# Class method: build a store backed by a fresh in-memory SQLite database and
# initialise its schema. Returns the new store object.
sub temporary_store {
    my ($class) = @_;

    my $memory_dbh = DBI->connect('dbi:SQLite:dbname=:memory:', '', '');
    my $store      = $class->new(dbh => $memory_dbh);
    $store->init();

    return $store;
}
# Return the ID generated by the most recent INSERT into $table, as reported
# by the database handle's last_insert_id().
sub _last_insert_id {
    my ($self, $table) = @_;

    # Catalog, schema and field are left undefined; only the table is named.
    return $self->dbh->last_insert_id(undef, undef, $table, undef);
}
# Return the term object stored under the numeric term ID $id.
#
# The ID-to-term cache is consulted first; on a miss the term row (joined with
# its datatype term, if any) is loaded and turned into an Attean::IRI,
# Attean::Blank or Attean::Literal, which is then cached.
#
# Dies (with a stack trace) if no row exists for $id or if the row's type is
# not one of 'iri', 'blank' or 'literal'.
sub _get_term {
    my ($self, $id) = @_;

    if (my $cached = $self->_i2t_cache->get($id)) {
        return $cached;
    }

    my $sth = $self->dbh->prepare('SELECT term.type, term.value, dtterm.value AS datatype, term.language FROM term LEFT JOIN term dtterm ON (term.datatype_id = dtterm.term_id) WHERE term.term_id = ?');
    $sth->execute($id);
    my $row = $sth->fetchrow_hashref;

    my $term;

    # Only inspect the row when one was actually returned; an unknown ID
    # falls straight through to the failure below instead of comparing
    # undefined values.
    if ($row and defined($row->{type})) {
        my $type  = $row->{type};
        my $value = $row->{value};

        if ($type eq 'iri') {
            $term = Attean::IRI->new(value => $value);
        }
        elsif ($type eq 'blank') {
            $term = Attean::Blank->new(value => $value);
        }
        elsif ($type eq 'literal') {
            my %args = (
                value    => $value,
                datatype => Attean::IRI->new(value => $row->{datatype}),
            );
            if (my $lang = $row->{language}) {
                $args{language} = $lang;
            }
            $term = Attean::Literal->new(%args);
        }
    }

    if ($term) {
        $self->_i2t_cache->set($id => $term);
        return $term;
    }

    Carp::confess "Failed to load term values for bad ID " . Dumper($id);
}
# Look up the numeric term ID of $term without creating a term row for it.
#
# Returns the ID (and caches it under the term's as_string form), or an empty
# list / undef when the term is not stored or is neither an IRI, a blank node
# nor a literal.
#
# NOTE(review): for literals the datatype ID is still obtained through
# _get_or_create_term_id, so looking up a literal may insert its datatype IRI.
sub _get_term_id {
    my ($self, $term) = @_;

    if (my $cached = $self->_t2i_cache->get($term->as_string)) {
        return $cached;
    }

    my $value = $term->value;
    my $sql;
    my @bind;

    if ($term->does('Attean::API::IRI')) {
        $sql  = 'SELECT term_id FROM term WHERE type = ? AND value = ?';
        @bind = ('iri', $value);
    }
    elsif ($term->does('Attean::API::Blank')) {
        $sql  = 'SELECT term_id FROM term WHERE type = ? AND value = ?';
        @bind = ('blank', $value);
    }
    elsif ($term->does('Attean::API::Literal')) {
        my $dtid = $self->_get_or_create_term_id($term->datatype);
        $sql  = 'SELECT term_id FROM term WHERE type = ? AND value = ? AND datatype_id = ?';
        @bind = ('literal', $value, $dtid);

        # Only constrain the language when the literal has one. Binding an
        # undefined language would produce "language = NULL", which never
        # matches, so literals without a language tag could not be found.
        my $lang = $term->language;
        if ($lang) {
            $sql .= ' AND language = ?';
            push(@bind, $lang);
        }
    }
    else {
        return;
    }

    my $sth = $self->dbh->prepare($sql);
    $sth->execute(@bind);
    my ($tid) = $sth->fetchrow_array;

    if (defined($tid)) {
        $self->_t2i_cache->set($term->as_string => $tid);
        return $tid;
    }
    return;
}
# Return the numeric term ID of $term, inserting a new term row when the term
# is not stored yet. The ID is cached under the term's as_string form.
#
# Literals are identified by value and datatype (and by language when they
# have one); their datatype IRI is resolved, and created if needed, through a
# recursive call.
#
# Dies if $term is neither an IRI, a blank node nor a literal, or if no ID
# could be determined.
sub _get_or_create_term_id {
    my ($self, $term) = @_;

    my $cached = $self->_t2i_cache->get($term->as_string);
    return $cached if ($cached);

    my $dbh    = $self->dbh;
    my $insert = $dbh->prepare('INSERT INTO term (type, value, datatype_id, language) VALUES (?, ?, ?, ?)');

    # Every term kind is looked up by type and value; literals add further
    # conditions below.
    my $lookup_sql = 'SELECT term_id FROM term WHERE type = ? AND value = ?';
    my (@lookup_bind, @insert_bind);

    if ($term->does('Attean::API::IRI')) {
        my $value = $term->value;
        @lookup_bind = ('iri', $value);
        @insert_bind = ('iri', $value, undef, undef);
    }
    elsif ($term->does('Attean::API::Blank')) {
        my $value = $term->value;
        @lookup_bind = ('blank', $value);
        @insert_bind = ('blank', $value, undef, undef);
    }
    elsif ($term->does('Attean::API::Literal')) {
        my $dtid  = $self->_get_or_create_term_id($term->datatype);
        my $value = $term->value;
        my $lang  = $term->language;

        $lookup_sql .= ' AND datatype_id = ?';
        @lookup_bind = ('literal', $value, $dtid);
        if ($lang) {
            $lookup_sql .= ' AND language = ?';
            push(@lookup_bind, $lang);
        }
        @insert_bind = ('literal', $value, $dtid, $lang);
    }
    else {
        die "Failed to get ID for term: " . $term->as_string;
    }

    my $lookup = $dbh->prepare($lookup_sql);
    $lookup->execute(@lookup_bind);
    my ($tid) = $lookup->fetchrow_array;

    unless (defined($tid)) {
        $insert->execute(@insert_bind);
        $tid = $self->_last_insert_id('term');
    }

    if (defined($tid)) {
        $self->_t2i_cache->set($term->as_string => $tid);
        return $tid;
    }
    die;
}
Hide Show 9 lines of Pod
# Return an iterator over the stored quads matching the given pattern.
#
# Up to four positional arguments are accepted, one per quad position. Each
# may be a single term or an array reference of alternative terms. A position
# that is missing, undefined, or contains a variable is left unconstrained.
# If none of the terms given for a constrained position is known to the
# store, an empty iterator is returned without querying the quad table.
sub get_quads {
    my $self     = shift;
    my @patterns = map { ref($_) eq 'ARRAY' ? $_ : [$_] } @_;

    my (@conditions, @params);
    foreach my $pos (0 .. 3) {
        my $alternatives = $patterns[$pos];
        next unless (defined($alternatives));

        # A lone undef means "match anything" for this position.
        next if (scalar(@$alternatives) == 1 and not defined($alternatives->[0]));
        next if (any { $_->does('Attean::API::Variable') } @$alternatives);

        # Terms unknown to the store contribute nothing to this list.
        my @ids = map { $self->_get_term_id($_) } @$alternatives;
        unless (scalar(@ids)) {
            return Attean::ListIterator->new(values => [], item_type => 'Attean::API::Quad');
        }

        my $column = $pos_names[$pos];
        push(@conditions, "$column IN (" . join(', ', ('?') x scalar(@ids)) . ")");
        push(@params, @ids);
    }

    my $sql = 'SELECT subject, predicate, object, graph FROM quad';
    if (scalar(@conditions)) {
        $sql .= ' WHERE ' . join(' AND ', @conditions);
    }

    my $sth = $self->dbh->prepare($sql);
    $sth->execute(@params);

    # Once the statement is exhausted the generator never fetches again.
    my $exhausted = 0;
    my $next_quad = sub {
        return if ($exhausted);
        my $row = $sth->fetchrow_arrayref;
        unless ($row) {
            $exhausted = 1;
            return;
        }
        my @terms = map { $self->_get_term($_) } @$row;
        my $quad  = Attean::Quad->new(zip @pos_names, @terms);
        return $quad;
    };

    my $iter = Attean::CodeIterator->new(generator => $next_quad, item_type => 'Attean::API::Quad');
    return $iter;
}
Hide Show 9 lines of Pod
# Return the number of stored quads matching the given pattern.
#
# Arguments follow the same convention as get_quads: one value per quad
# position, each a term or an array reference of alternative terms, with
# missing/undefined positions and variables left unconstrained. Returns 0
# without querying the quad table when none of the terms given for a
# constrained position is known to the store.
sub count_quads {
    my $self     = shift;
    my @patterns = map { ref($_) eq 'ARRAY' ? $_ : [$_] } @_;

    my (@conditions, @params);
    foreach my $pos (0 .. 3) {
        my $alternatives = $patterns[$pos];
        next unless (defined($alternatives));

        # A lone undef means "match anything" for this position.
        next if (scalar(@$alternatives) == 1 and not defined($alternatives->[0]));
        next if (any { $_->does('Attean::API::Variable') } @$alternatives);

        # Terms unknown to the store contribute nothing to this list.
        my @ids = map { $self->_get_term_id($_) } @$alternatives;
        return 0 unless (scalar(@ids));

        my $column = $pos_names[$pos];
        push(@conditions, "$column IN (" . join(', ', ('?') x scalar(@ids)) . ")");
        push(@params, @ids);
    }

    my $sql = 'SELECT COUNT(*) FROM quad';
    if (scalar(@conditions)) {
        $sql .= ' WHERE ' . join(' AND ', @conditions);
    }

    my $sth = $self->dbh->prepare($sql);
    $sth->execute(@params);
    my ($count) = $sth->fetchrow_array;
    return $count;
}
Hide Show 7 lines of Pod
# Return an iterator over the distinct graph names used by stored quads.
#
# NOTE(review): every graph value is wrapped in an Attean::IRI without looking
# at the term's type column -- confirm that graph names are always IRIs.
sub get_graphs {
    my ($self) = @_;

    my $sth = $self->dbh->prepare('SELECT DISTINCT value FROM quad JOIN term ON (quad.graph = term.term_id)');
    $sth->execute;

    my $next_graph = sub {
        my $row = $sth->fetchrow_arrayref;
        return unless (ref($row));
        return Attean::IRI->new(value => $row->[0]);
    };

    return Attean::CodeIterator->new(generator => $next_graph, item_type => 'Attean::API::Term');
}
Hide Show 6 lines of Pod
# Insert the quad $st into the store, creating term rows for its terms as
# needed. For SQLite, MySQL and PostgreSQL a dialect-specific statement is
# used so that inserting an already-present quad is ignored; any other
# database type gets a plain INSERT. Returns nothing.
sub add_quad {
    my ($self, $st) = @_;

    my @ids = map { $self->_get_or_create_term_id($_) } $st->values;
    return if (any { not defined($_) } @ids);

    my $db_type = $self->database_type;

    my %insert_sql_for = (
        sqlite     => 'INSERT OR IGNORE INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)',
        mysql      => 'INSERT IGNORE INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)',
        postgresql => 'INSERT INTO quad (subject, predicate, object, graph) SELECT ?, ?, ?, ? WHERE NOT EXISTS (SELECT 1 FROM quad WHERE subject = ? AND predicate = ? AND object = ? AND graph = ?)',
    );
    my $sql = $insert_sql_for{$db_type}
        // 'INSERT INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)';

    # The PostgreSQL statement mentions the four IDs twice: once for the
    # inserted row and once for the NOT EXISTS check.
    my @bind = @ids;
    push(@bind, @ids) if ($db_type eq 'postgresql');

    my $sth = $self->dbh->prepare($sql);
    $sth->execute(@bind);
    return;
}
Hide Show 6 lines of Pod
# Delete the quad $st from the store. Nothing is deleted unless all four of
# its terms are already known to the store. Returns nothing.
sub remove_quad {
    my ($self, $st) = @_;

    # Unknown terms yield no ID at all, so fewer than four IDs means the quad
    # cannot be present.
    my @ids = map { $self->_get_term_id($_) } $st->values;
    return unless (scalar(@ids) == 4);
    return if (any { not defined($_) } @ids);

    my $sth = $self->dbh->prepare('DELETE FROM quad WHERE subject = ? AND predicate = ? AND object = ? AND graph = ?');
    $sth->execute(@ids);
    return;
}
Hide Show 6 lines of Pod
# Intentionally a no-op: this method performs no work and returns nothing.
sub create_graph {
    return;
}
Hide Show 6 lines of Pod
# Drop a graph. This simply delegates to clear_graph with the same arguments
# and returns whatever clear_graph returns.
sub drop_graph {
    my ($self, @graph_args) = @_;
    return $self->clear_graph(@graph_args);
}
Hide Show 6 lines of Pod
# Delete every quad whose graph is $graph. Does nothing when the graph term
# is unknown to the store. Returns nothing.
sub clear_graph {
    my ($self, $graph) = @_;

    my $graph_id = $self->_get_term_id($graph);
    return unless (defined($graph_id));

    my $sth = $self->dbh->prepare('DELETE FROM quad WHERE graph = ?');
    $sth->execute($graph_id);
    return;
}
Hide Show 6 lines of Pod
# Start a transaction on the store's database handle (DBI begin_work).
sub begin_transaction {
    my ($self) = @_;
    return $self->dbh->begin_work;
}
Hide Show 6 lines of Pod
# Roll back the current transaction on the store's database handle.
sub abort_transaction {
    my ($self) = @_;
    return $self->dbh->rollback;
}
Hide Show 6 lines of Pod
# Commit the current transaction on the store's database handle.
sub end_transaction {
    my ($self) = @_;
    return $self->dbh->commit;
}
Hide Show 6 lines of Pod
# Begin a batch of updates. Implemented exactly like begin_transaction: it
# starts a transaction on the store's database handle.
sub begin_bulk_updates {
    my ($self) = @_;
    return $self->dbh->begin_work;
}
Hide Show 6 lines of Pod
# Finish a batch of updates. Implemented exactly like end_transaction: it
# commits on the store's database handle.
sub end_bulk_updates {
    my ($self) = @_;
    return $self->dbh->commit;
}
Hide Show 6 lines of Pod
# Return the lower-cased DBMS name reported by the database handle (the
# SQL_DBMS_NAME info value). Other methods in this class compare the result
# against 'sqlite', 'mysql' and 'postgresql'.
sub database_type {
    my ($self) = @_;
    my $dbms_name = $self->dbh->get_info($GetInfoType{SQL_DBMS_NAME});
    return lc($dbms_name);
}
Hide Show 6 lines of Pod
# Reset the attean_version table so that it holds a single row recording the
# current Attean version and this store's version.
sub initialize_version {
    my ($self) = @_;
    my $dbh = $self->dbh;

    $dbh->do('DELETE FROM attean_version');

    return $dbh->do(
        'INSERT INTO attean_version (attean_version, store_version) VALUES (?, ?);',
        undef,
        $Attean::VERSION,
        $AtteanX::Store::DBI::VERSION,
    );
}
Hide Show 7 lines of Pod
# Return the path of the schema-creation SQL file for the connected database
# type, or nothing if no readable file exists.
#
# The share directory is taken from $ENV{ATTEAN_SHAREDIR} when set, otherwise
# from dist_dir('AtteanX-Store-DBI'), and finally falls back to 'share'. The
# file is looked up as database-schema/<type>-create.sql inside it.
sub create_schema_file {
    my ($self) = @_;

    my $db_type   = $self->database_type;
    my $share_dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
    my $path      = File::Spec->catfile($share_dir, 'database-schema', sprintf('%s-create.sql', $db_type));

    return $path if (-r $path);
    return;
}
Hide Show 7 lines of Pod
# Return the path of the schema-removal SQL file for the connected database
# type, or nothing if no readable file exists.
#
# The share directory is taken from $ENV{ATTEAN_SHAREDIR} when set, otherwise
# from dist_dir('AtteanX-Store-DBI'), and finally falls back to 'share'. The
# file is looked up as database-schema/<type>-drop.sql inside it.
sub drop_schema_file {
    my ($self) = @_;

    my $db_type   = $self->database_type;
    my $share_dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
    my $path      = File::Spec->catfile($share_dir, 'database-schema', sprintf('%s-drop.sql', $db_type));

    return $path if (-r $path);
    return;
}
Hide Show 7 lines of Pod
# Return the list of database types for which a schema-creation file exists,
# i.e. the <type> part of every database-schema/<type>-create.sql file in the
# share directory.
#
# The share directory is taken from $ENV{ATTEAN_SHAREDIR} when set, otherwise
# from dist_dir('AtteanX-Store-DBI'), and finally falls back to 'share'.
sub available_database_types {
    my ($self) = @_;

    my $share_dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
    my $pattern   = File::Spec->catfile($share_dir, 'database-schema', '*-create.sql');

    # bsd_glob treats the pattern as a single path, so a share directory
    # containing whitespace is not split into several patterns the way the
    # built-in glob() would split it.
    require File::Glob;
    my @files = File::Glob::bsd_glob($pattern);

    # Anchor at the end of the path and match a literal ".sql" so that only
    # the file name can supply the type, never a directory component.
    my @types = map { m{ (\w+) -create [.] sql \z }x ? $1 : () } @files;
    return @types;
}
Hide Show 21 lines of Pod
# Build the argument list for DBI->connect.
#
# Called on an object, the database type is taken from the object's
# database_type; called on a class, the type is the first argument. The
# remaining arguments are key/value pairs: database, user, password, host and
# port (host and port are only used for 'mysql' and 'postgresql').
#
# Returns ($dsn, $user, $password, \%connect_args). RaiseError is always
# enabled in %connect_args; $dsn is undefined for a type other than 'mysql',
# 'postgresql' or 'sqlite'.
sub dbi_connect_args {
    my $self = shift;
    my $type = blessed($self) ? $self->database_type : shift;
    my %args = @_;

    my ($database, $host, $port) = @args{qw(database host port)};
    my %connect_args = (RaiseError => 1);
    my $dsn;

    if ($type eq 'mysql' or $type eq 'postgresql') {
        if ($type eq 'mysql') {
            $dsn = "DBI:mysql:database=${database}";
            $connect_args{mysql_enable_utf8} = 1;
        }
        else {
            $dsn = "DBI:Pg:dbname=${database}";
        }

        # Both network databases take optional host and port components.
        $dsn .= ";host=$host" if (defined($host));
        $dsn .= ";port=$port" if (defined($port));
    }
    elsif ($type eq 'sqlite') {
        $dsn = "DBI:SQLite:dbname=${database}";
        $connect_args{sqlite_unicode} = 1;
    }

    return ($dsn, $args{user}, $args{password}, \%connect_args);
}
Hide Show 7 lines of Pod
# Try to translate $algebra into a single SQL-backed AtteanX::Store::DBI::Plan.
#
# Arguments: the algebra, the model, an array reference of active graphs, the
# default graphs, and further key/value arguments. The key dbi_filter_counter
# is used internally to number the term-table aliases added by nested filters.
#
# Three shapes of algebra are handled:
#   * a Filter whose expression is ISIRI/ISLITERAL/ISBLANK applied to a
#     variable, over a child that itself yields a DBI plan;
#   * a Filter whose expression is STRSTARTS or CONTAINS of a variable and a
#     literal, over a child that itself yields a DBI plan;
#   * a non-empty BGP.
#
# Returns the plan, or nothing when the algebra cannot be handled here. That
# includes a BGP that mentions a constant term unknown to the store, or whose
# active graphs are all unknown to the store.
sub plans_for_algebra {
    my $self           = shift;
    my $algebra        = shift;
    my $model          = shift;
    my $active_graphs  = shift;
    my $default_graphs = shift;
    return unless ($algebra);
    my %args = @_;

    # Post-increment: this call uses the current number while the recursive
    # calls below receive the incremented value through %args, so nested
    # filters get distinct term-table aliases.
    my $counter = $args{dbi_filter_counter}++;

    if ($algebra->isa('Attean::Algebra::Filter')) {
        my $expr = $algebra->expression;
        return unless ($expr->isa('Attean::FunctionExpression'));

        if ($expr->operator =~ m/IS(IRI|LITERAL|BLANK)/i) {
            # The captured name doubles as the value of the term.type column.
            my $type = lc($1);

            my ($operand) = @{ $expr->children };
            return unless ($operand->isa('Attean::ValueExpression')
                and $operand->value->does('Attean::API::Variable'));
            my $varname = $operand->value->value;

            my ($plan) = $self->plans_for_algebra($algebra->child, $model, $active_graphs, $default_graphs, %args);
            return unless (defined($plan) and $plan->isa('AtteanX::Store::DBI::Plan'));
            return unless (exists $plan->variables->{$varname});

            my ($table, $col) = @{ $plan->variables->{$varname} };
            my $ref     = join('.', map { $self->dbh->quote_identifier($_) } ($table, $col));
            my $typecol = $self->dbh->quote_identifier('type');

            push(@{ $plan->where }, "$ref IN (SELECT term_id FROM term WHERE ${typecol} = ?)");
            push(@{ $plan->bindings }, $type);
            return $plan;
        }
        elsif ($expr->operator eq 'STRSTARTS' or $expr->operator eq 'CONTAINS') {
            my ($varexpr, $pat) = @{ $expr->children };
            return unless ($varexpr->isa('Attean::ValueExpression')
                and $varexpr->value->does('Attean::API::Variable')
                and $pat->isa('Attean::ValueExpression')
                and $pat->value->does('Attean::API::Literal'));

            my ($plan) = $self->plans_for_algebra($algebra->child, $model, $active_graphs, $default_graphs, %args);
            return unless (defined($plan) and $plan->isa('AtteanX::Store::DBI::Plan'));

            my $varname = $varexpr->value->value;
            return unless (exists $plan->variables->{$varname});

            # Only databases with a known substring-position function are
            # supported; bail out before touching the child plan.
            my $db = $self->database_type;
            return unless ($db eq 'mysql' or $db eq 'postgresql' or $db eq 'sqlite');

            my ($table, $col) = @{ $plan->variables->{$varname} };
            my $literal   = $pat->value;
            my $ref       = join('.', map { $self->dbh->quote_identifier($_) } ($table, $col));
            my $typecol   = $self->dbh->quote_identifier('type');
            my $termtable = "tf$counter";

            # Join the term table so the string value of the variable's term
            # can be tested.
            push(@{ $plan->tables }, ['term', $termtable]);
            push(@{ $plan->where }, "$ref = $termtable.term_id");
            push(@{ $plan->where }, "$termtable.$typecol = ?");
            push(@{ $plan->bindings }, 'literal');

            # STRSTARTS requires the pattern at position 1; CONTAINS accepts
            # any position from 1 onwards.
            my $op = ($expr->operator eq 'STRSTARTS') ? '=' : '>=';
            if ($db eq 'mysql') {
                push(@{ $plan->where }, "LOCATE(?, $termtable.value) ${op} ?");
                push(@{ $plan->bindings }, $literal->value, 1);
            }
            elsif ($db eq 'postgresql') {
                push(@{ $plan->where }, "STRPOS($termtable.value, ?) ${op} ?");
                push(@{ $plan->bindings }, $literal->value, 1);
            }
            elsif ($db eq 'sqlite') {
                push(@{ $plan->where }, "INSTR($termtable.value, ?) ${op} 1");
                push(@{ $plan->bindings }, $literal->value);
            }

            if (my $lang = $literal->language) {
                push(@{ $plan->where }, "$termtable.language = ?");
                push(@{ $plan->bindings }, $lang);
            }
            else {
                # NOTE(review): $xs and $la are not declared inside this
                # method; presumably they are file-level terms for the two
                # string datatypes -- confirm against the rest of the file.
                my $xid = $self->_get_term_id($xs);
                my $lid = $self->_get_term_id($la);
                push(@{ $plan->where }, "$termtable.datatype_id IN (?, ?)");
                push(@{ $plan->bindings }, $xid, $lid);
            }
            return $plan;
        }

        return;
    }

    return unless ($algebra->isa('Attean::Algebra::BGP') and scalar(@{ $algebra->triples }) > 0);

    my @in_scope = $algebra->in_scope_variables;
    my @triples  = @{ $algebra->triples };
    my @select;
    my @where_joins;
    my %seen_vars;
    my %source_table_for_var;
    my %blanks;
    my $tcounter = 0;
    my $bcounter = 0;
    my @tables;
    my %rename_mapping;

    # Turn a variable name into a name usable as a column alias, remembering
    # the original => alias mapping for every name that had to change.
    my $rename_proj = sub {
        my $name = shift;
        if ($name =~ /[-._]|\W/) {
            my $old = $name;
            $name =~ s/_/__/g;
            $name =~ s/([-.]|\W)/_d/g;
            # NOTE(review): the substitution above already replaced every
            # character this pattern could match, so this one appears never
            # to apply -- confirm the intended escaping scheme.
            $name =~ s/([-.]|\W)/'_x' . sprintf('%02x', ord($1))/e;
            $rename_mapping{$old} = $name;
        }
        return $name;
    };

    Carp::confess Dumper($active_graphs) unless (ref($active_graphs) eq 'ARRAY');

    # _get_term_id yields nothing for a term the store does not know, so
    # unknown graphs simply drop out of this list.
    my @graph_ids = map { $self->_get_term_id($_) } @{ $active_graphs };
    if (any { not defined($_) } @graph_ids) {
        return;
    }

    # With no known active graph the condition below would read
    # "t0.graph IN ()", which MySQL and PostgreSQL reject as a syntax error.
    # Decline to plan instead.
    return unless (scalar(@graph_ids));

    my @bind;

    # Internal variable that ties every quad pattern to the same graph. It is
    # marked as seen up front so that it is never projected.
    my $graph = Attean::Variable->new(value => '___g');
    $seen_vars{ $graph->value }++;

    my $graph_values = sprintf('(%s)', join(', ', ('?') x scalar(@graph_ids)));
    push(@bind, @graph_ids);
    my @where = ("t0.graph IN $graph_values");

    foreach my $triple (@triples) {
        my $table = 't' . $tcounter++;
        push(@tables, ['quad', $table]);

        my @pattern_vars;
        my $quad  = $triple->as_quadpattern($graph);
        my @nodes = $quad->values;
        foreach my $i (0 .. $#nodes) {
            my $node = $nodes[$i];
            my $name = $pos_names[$i];
            if ($node->does('Attean::API::Variable')) {
                push(@pattern_vars, [$node, $name]);
            }
            elsif ($node->does('Attean::API::Blank')) {
                # Blank nodes act like variables for joining, but are marked
                # as seen so that they are never projected.
                my $id = $node->value;
                unless (exists $blanks{$id}) {
                    my $bname = sprintf('.%s_%d', 'blank', $bcounter++);
                    $blanks{$id} = Attean::Variable->new(value => $bname);
                }
                my $var = $blanks{$id};
                push(@pattern_vars, [$var, $name]);
                $seen_vars{ $var->value }++;
            }
            else {
                my $id = $self->_get_term_id($node);
                return unless defined($id);
                push(@where, sprintf('%s.%s = ?', $table, $name));
                push(@bind, $id);
            }
        }

        foreach my $vdata (@pattern_vars) {
            my ($var, $name) = @$vdata;
            my $var_name = $rename_proj->($var->value);
            push(@select, [$table, $name, $var_name]) unless ($seen_vars{ $var->value }++);

            # The first occurrence of a variable fixes its source column;
            # every later occurrence becomes an equality join against it.
            if (my $source = $source_table_for_var{ $var->value }) {
                push(@where_joins, ['=', $source, [$table, $name]]);
            }
            else {
                $source_table_for_var{ $var->value } = [$table, $name];
            }
        }
    }

    foreach my $join (@where_joins) {
        my ($op, $lhs, $rhs) = @$join;
        my ($lhs_sql, $rhs_sql) = map { sprintf('%s.%s', @$_) } ($lhs, $rhs);
        push(@where, join(' ', $lhs_sql, $op, $rhs_sql));
    }

    return AtteanX::Store::DBI::Plan->new(
        store              => $self,
        select             => \@select,
        where              => \@where,
        tables             => \@tables,
        in_scope_variables => [@in_scope],
        rename_mapping     => \%rename_mapping,
        bindings           => \@bind,
        variables          => \%source_table_for_var,
    );
}
Hide Show 6 lines of Pod
# Return a fixed cost of 1 for plans produced by this store
# (AtteanX::Store::DBI::Plan objects) and nothing for any other plan.
sub cost_for_plan {
    my ($self, $plan) = @_;
    return 1 if ($plan->isa('AtteanX::Store::DBI::Plan'));
    return;
}
}
# Store whose database handle is used to build and run this plan's SQL.
has store => (
    is       => 'ro',
    isa      => InstanceOf['AtteanX::Store::DBI'],
    required => 1,
);

# Original variable name => column alias, consulted when result rows are
# turned back into variable bindings.
has rename_mapping => (
    is      => 'ro',
    isa     => HashRef[Str],
    default => sub { return +{} },
);

# Variable name => [table alias, column] locating the variable in the query.
has variables => (
    is       => 'ro',
    isa      => HashRef,
    required => 1,
);

# Values bound to the placeholders of the WHERE conditions, in order.
has bindings => (
    is       => 'ro',
    isa      => ArrayRef,
    required => 1,
);

# Projected columns, each as [table alias, column, output alias].
has select => (
    is       => 'ro',
    isa      => ArrayRef,
    required => 1,
);

# SQL condition strings that are ANDed together in the WHERE clause.
has where => (
    is       => 'ro',
    isa      => ArrayRef,
    required => 1,
);

# Tables of the FROM clause, each as [table name, alias].
has tables => (
    is       => 'ro',
    isa      => ArrayRef[ArrayRef[Str]],
    required => 1,
);
# Return a one-line description of this plan: its SQL followed by the
# comma-separated bound values.
sub plan_as_string {
    my ($self) = @_;
    my ($sql, @bind) = $self->sql();
    my $bound = join(', ', @bind);
    return sprintf('DBI BGP { %s ← (%s) }', $sql, $bound);
}
# Render this plan as SQL.
#
# An optional binding object may be passed. For every variable of that
# binding which also occurs in this plan, an extra equality condition on the
# variable's column is added.
#
# Returns ($sql, @bind_values). Returns nothing when a bound variable used by
# this plan has a value that is unknown to the store, since no row can match.
sub sql {
    my ($self, $binding) = @_;
    my $store = $self->store;
    my $dbh   = $store->dbh;

    my @bind  = @{ $self->bindings };
    my @where = @{ $self->where };

    if ($binding) {
        foreach my $var ($binding->variables) {
            # Variables that this plan does not mention cannot constrain the
            # query, so skip them before looking their value up. This keeps
            # an unrelated binding with an unknown term from aborting the
            # whole query.
            my $cdata = $self->variables->{$var};
            next unless ($cdata);

            my $id = $store->_get_term_id($binding->value($var));
            return unless defined($id);

            my ($table, $col) = @$cdata;
            push(@where, sprintf("%s.%s = ?", $table, $col));
            push(@bind, $id);
        }
    }

    my @select;
    foreach my $projection (@{ $self->select }) {
        my @quoted = map { $dbh->quote_identifier($_) } @$projection;
        push(@select, sprintf("%s.%s AS %s", @quoted));
    }

    # A query that projects nothing still needs a select list.
    unless (scalar(@select)) {
        push(@select, '1');
    }

    my @parts = (
        'SELECT',
        join(', ', @select),
        'FROM',
        join(', ', map { join(' ', @$_) } @{ $self->tables }),
    );
    if (scalar(@where)) {
        push(@parts, 'WHERE', join(' AND ', map { "($_)" } @where));
    }

    my $sql = join(" ", @parts);
    return ($sql, @bind);
}
# Return the executable form of this plan: a code reference which, when
# called, runs the plan's SQL and returns an iterator of Attean::Result
# objects.
#
# Arguments are the model followed by whatever sql() accepts (an optional
# binding). When sql() yields no statement -- it does so when a bound value
# is unknown to the store -- the returned code reference produces an empty
# result iterator instead of preparing an undefined statement.
#
# The returned code reference warns with the database error and dies when
# executing the statement fails.
sub substitute_impl {
    my $self  = shift;
    my $model = shift;
    my ($sql, @bind) = $self->sql(@_);
    my $store = $self->store;
    my $dbh   = $store->dbh;
    my $vars  = $self->in_scope_variables;

    unless (defined($sql)) {
        # No SQL means no row can match; never hand undef to prepare().
        return sub {
            return Attean::CodeIterator->new(
                generator => sub { return },
                item_type => 'Attean::API::Result',
                variables => $vars,
            );
        };
    }

    my $sth = $dbh->prepare($sql);
    return sub {
        my $rv = $sth->execute(@bind);
        unless ($rv) {
            warn '*** SQL error: ' . $sth->errstr;
            die;
        }

        my $next_result = sub {
            my $row = $sth->fetchrow_hashref;
            return unless ($row);

            my %bindings;
            foreach my $name (@$vars) {
                # Columns are selected under their renamed alias when the
                # variable name had to be rewritten for SQL.
                my $column = $self->rename_mapping->{$name} // $name;
                $bindings{$name} = $store->_get_term($row->{$column});
            }
            my $result = Attean::Result->new(bindings => \%bindings);
            return $result;
        };

        return Attean::CodeIterator->new(
            generator => $next_result,
            item_type => 'Attean::API::Result',
            variables => $vars,
        );
    };
}
}
1;
Hide Show 19 lines of Pod