use
5.010;
# Version string of this module.
our
$VERSION
=
'v1.5.1'
;
# Build the display string describing where this engine deploys to.
#
# The target name is returned unchanged when it contains no colon (so it does
# not look like a URI) or when the target URI already carries a database name.
# Otherwise a clone of the target URI is returned as a string, with any
# password cleared and the database name set to the first true value among
# $ENV{PGDATABASE}, the engine username, $ENV{PGUSER} and the Sqitch system
# user.
sub destination {
    my $self = shift;
    my $name = $self->target->name;

    if ( $name !~ /:/ || $self->target->uri->dbname ) {
        return $name;
    }

    # Work on a copy so the target's own URI is never modified.
    my $uri = $self->target->uri->clone;
    if ( $uri->password ) {
        $uri->password(undef);
    }
    $uri->dbname(
           $ENV{PGDATABASE}
        || $self->username
        || $ENV{PGUSER}
        || $self->sqitch->sysuser
    );
    return $uri->as_string;
}
# Default user name for this engine: none. Returns an empty list (undef in
# scalar context).
# NOTE(review): presumably left empty so the client library's own fallbacks
# apply -- confirm against the base engine class.
sub _def_user {
    return;
}
# Default password for this engine: none. Returns an empty list (undef in
# scalar context).
# NOTE(review): presumably left empty so the client library's own fallbacks
# apply -- confirm against the base engine class.
sub _def_pass {
    return;
}
# Lazily-built psql command line, stored as an array reference.
#
# The list starts with the client program, followed (when any connection
# parameter is set) by a single --dbname option carrying a space-separated
# "key=value" connection string, then one --set option per engine variable
# (sorted by name), and finally the options returned by _client_opts().
has _psql => (
    is      => 'ro',
    isa     => ArrayRef,
    lazy    => 1,
    default => sub {
        my $self    = shift;
        my $uri     = $self->uri;
        my @command = ( $self->client );
        my %params  = $uri->query_params;

        # Candidate connection parameters, in the order they are emitted:
        # the fixed ones first, then the URI query parameters sorted by name.
        # NOTE(review): the port is read via $uri->_port rather than
        # $uri->port, presumably so no default port is injected -- confirm.
        my @candidates = (
            [ user   => $self->username ],
            [ dbname => $uri->dbname ],
            [ host   => $uri->host ],
            [ port   => $uri->_port ],
            ( map { [ $_ => $params{$_} ] } sort keys %params ),
        );

        my @conninfo;
        for my $pair (@candidates) {
            my ( $key, $value ) = @{$pair};

            # Parameters without a value are left out entirely.
            next if !defined $value || !length $value;

            # Backslash-escape spaces, quotes and backslashes in the value.
            $value =~ s/([ "'\\])/\\$1/g;
            push @conninfo, "$key=$value";
        }
        if (@conninfo) {
            push @command, '--dbname', join( ' ', @conninfo );
        }

        # Pass every engine variable through to psql with --set.
        my %vars = $self->variables;
        for my $var_name ( sort keys %vars ) {
            push @command, '--set', "$var_name=$vars{$var_name}";
        }

        push @command, $self->_client_opts;
        return \@command;
    },
);
# Fixed options appended to every psql invocation: quiet, unaligned,
# tuples-only output without reading psqlrc, stop on the first error, and a
# "registry" psql variable set to the engine's registry name.
#
# Returns the options as a list.
sub _client_opts {
    my $self = shift;
    return (
        qw(--quiet --no-psqlrc --no-align --tuples-only),
        '--set', 'ON_ERROR_STOP=1',
        '--set', 'registry=' . $self->registry,
    );
}
# Return the psql command line as a flat list (the dereferenced _psql
# attribute).
sub psql {
    my $self = shift;
    return @{ $self->_psql };
}
# Short key identifying this engine. _initialize() also uses it as the
# basename of the registry SQL file ("pg.sql").
sub key {
    return 'pg';
}
# Human-readable name of the database this engine targets.
sub name {
    return 'PostgreSQL';
}
# Driver specification string: module name followed by a version number.
# NOTE(review): presumably consumed by use_driver() as "module minimum-version"
# -- confirm against the base engine class.
sub driver {
    return 'DBD::Pg 2.0';
}
# Name of the command-line client used when none is configured.
sub default_client {
    return 'psql';
}
# Lazily-created database handle.
#
# The builder calls use_driver(), then connects through DBI with the engine's
# DSN, username and password. PGCLIENTENCODING is set to UTF8 for the duration
# of the connect call only. Errors are routed to the engine's error handler
# (PrintError and RaiseError are both off).
has dbh => (
    is      => 'rw',
    isa     => DBH,
    lazy    => 1,
    default => sub {
        my $self = shift;
        $self->use_driver;

        # Runs once the connection is established: quiets server messages,
        # points search_path at the registry, and records which provider we
        # are talking to.
        my $on_connected = sub {
            my $dbh = shift;
            $dbh->do('SET client_min_messages = WARNING');

            # If setting the search path reports failure, let the engine deal
            # with a missing registry.
            unless ( $dbh->do( 'SET search_path = ?', undef, $self->registry ) ) {
                $self->_handle_no_registry($dbh);
            }

            # The second word of version() contains "-YB-" on Yugabyte.
            my $version = $dbh->selectcol_arrayref(
                q{SELECT split_part(version(), ' ', 2)}
            )->[0] // '';
            my $provider = $version =~ /-YB-/ ? 'yugabyte' : 'postgres';
            $dbh->{private_sqitch_info} = { provider => $provider };
            return;
        };

        local $ENV{PGCLIENTENCODING} = 'UTF8';
        return DBI->connect(
            $self->_dsn,
            $self->username,
            $self->password,
            {
                PrintError        => 0,
                RaiseError        => 0,
                AutoCommit        => 1,
                pg_enable_utf8    => 1,
                pg_server_prepare => 1,
                HandleError       => $self->error_handler,
                Callbacks         => { connected => $on_connected },
            },
        );
    }
);
# Bind value for a change's tags: an array reference holding the formatted
# name of every tag on the change passed as the second argument.
sub _log_tags_param {
    my ( undef, $change ) = @_;
    my @names = map { $_->format_name } $change->tags;
    return \@names;
}
# Bind value for a change's required dependencies: an array reference holding
# the string form of every entry returned by the change's requires() method.
# The change is the second argument.
sub _log_requires_param {
    my ( undef, $change ) = @_;
    my @deps = map { $_->as_string } $change->requires;
    return \@deps;
}
# Bind value for a change's conflicting dependencies: an array reference
# holding the string form of every entry returned by the change's conflicts()
# method. The change is the second argument.
sub _log_conflicts_param {
    my ( undef, $change ) = @_;
    my @deps = map { $_->as_string } $change->conflicts;
    return \@deps;
}
# sprintf template for a SQL expression that renders a timestamp, converted
# to UTC, as colon-separated "name:value" pairs (year, month, day, hour,
# minute, second, time_zone). The single %s is the timestamp expression.
sub _ts2char_format {
    return q{to_char(%s AT TIME ZONE 'UTC', '"year":YYYY:"month":MM:"day":DD:"hour":HH24:"minute":MI:"second":SS:"time_zone":"UTC"')};
}
# SQL expression used as the default timestamp value.
sub _ts_default {
    return 'clock_timestamp()';
}
# Convert a date-time object (the second argument) into the string sent to
# the database, by asking it for its 'iso' format.
sub _char2ts {
    my ( undef, $datetime ) = @_;
    return $datetime->as_string( format => 'iso' );
}
# sprintf template for a SQL expression that aggregates a column into an
# array with NULLs removed. Which form is used depends on the server version
# reported by the handle's pg_server_version attribute:
#   >= 90300  array_remove() over an ordered array_agg()
#   >= 90000  ARRAY(SELECT ... UNNEST(...)) over an ordered array_agg()
#   older     the same ARRAY/UNNEST form without ORDER BY
sub _listagg_format {
    my $self    = shift;
    my $version = $self->dbh->{pg_server_version};

    if ( $version >= 90300 ) {
        return q{array_remove(array_agg(%1$s ORDER BY %1$s), NULL)};
    }
    if ( $version >= 90000 ) {
        return q{ARRAY(SELECT * FROM UNNEST( array_agg(%1$s ORDER BY %1$s) ) a WHERE a IS NOT NULL)};
    }
    return q{ARRAY(SELECT * FROM UNNEST( array_agg(%s) ) a WHERE a IS NOT NULL)};
}
# SQL operator used for regular-expression matching.
sub _regex_op {
    return '~';
}
# SQL query returning the highest version recorded in the releases table,
# cast to text.
sub _version_query {
    return 'SELECT MAX(version)::TEXT FROM releases';
}
# Report whether the registry has been created: returns the result of an
# EXISTS query checking pg_catalog.pg_tables for a table named "changes" in
# the registry schema.
sub _initialized {
    my $self = shift;
    my $dbh  = $self->dbh;
    my $rows = $dbh->selectcol_arrayref(
        q{
SELECT EXISTS(
SELECT TRUE FROM pg_catalog.pg_tables
WHERE schemaname = ? AND tablename = ?
)
},
        undef,
        $self->registry,
        'changes',
    );
    return $rows->[0];
}
# Create the registry.
#
# Throws an "engine" error if the registry is already initialized. Otherwise
# runs the registry SQL file that sits next to this module (named after the
# engine key, with a ".sql" extension) and then registers the release.
# Returns whatever _register_release() returns.
sub _initialize {
    my $self = shift;

    if ( $self->initialized ) {
        hurl engine => __x(
            'Sqitch schema "{schema}" already exists',
            schema => $self->registry
        );
    }

    my $registry_sql = file(__FILE__)->dir->file( $self->key . '.sql' );
    $self->_run_registry_file($registry_sql);
    return $self->_register_release;
}
# Determine the major version of the psql client.
#
# Runs "<client> --version" through the Sqitch probe() method and returns the
# leading integer of the first whitespace-separated word that starts with a
# digit, e.g. 11 for "psql (PostgreSQL) 11beta3" and 14 for
# "psql (PostgreSQL) 14.9 (Homebrew)".
#
# The version is located by scanning for the first numeric word rather than
# by assuming it is the last word: builds that append a vendor suffix after
# the version would otherwise be reported as version 0 and be treated as a
# pre-9.0 client.
#
# Returns 0 when the probe yields nothing or no version number is found.
sub _psql_major_version {
    my $self   = shift;
    my $output = $self->sqitch->probe( $self->client, '--version' );
    return 0 unless defined $output;

    for my $word ( split /\s+/, $output ) {
        return $1 || 0 if $word =~ /^(\d+)/;
    }
    return 0;
}
# Run a registry SQL file (used to create or upgrade the registry) with psql.
#
# Arguments: the engine and a file object for the SQL script; the object must
# provide slurp() and is passed to psql as the --file value.
#
# Two things are probed first: the server version number and whether a
# pgxc_version function exists in pg_catalog. When it does, the "tableopts"
# psql variable is set to ' DISTRIBUTE BY REPLICATION', otherwise to ''.
#
# With a server older than 9.3 or a psql older than 9, the script is rewritten
# into a temporary file before it is run:
#   * server < 9.3: the first "SCHEMA IF NOT EXISTS" becomes "SCHEMA";
#   * psql < 9: every :"registry" variable reference is replaced by the
#     registry name quoted with quote_ident().
# Otherwise the file is run as-is with the "registry" psql variable set.
#
# Finally search_path is set to the registry on the engine's handle, and the
# result of that do() call is returned.
sub _run_registry_file {
    my ( $self, $file ) = @_;
    my $schema = $self->registry;

    # Numeric server version, compared below against 90300 (9.3.0).
    my $version  = $self->_probe( '-c', 'SHOW server_version_num' );
    my $psql_maj = $self->_psql_major_version;

    # A true probe result means pg_catalog.pgxc_version() exists.
    my $opts = $self->_probe(
        '-c',
        q{
SELECT count(*)
FROM pg_catalog.pg_proc p
JOIN pg_catalog.pg_namespace n ON p.pronamespace = n.oid
WHERE nspname = 'pg_catalog'
AND proname = 'pgxc_version';
}
    ) ? ' DISTRIBUTE BY REPLICATION' : '';

    if ( $version < 90300 || $psql_maj < 9 ) {
        # The SQL has to be transformed and written to a temporary file.
        my $sql = scalar $file->slurp;
        $sql =~ s/SCHEMA IF NOT EXISTS/SCHEMA/ if $version < 90300;

        if ( $psql_maj < 9 ) {
            # This psql cannot expand :"registry", so substitute the quoted
            # schema name directly. The pattern must be exactly :"registry"
            # with no embedded whitespace, or it will never match the script.
            ($schema) = $self->dbh->selectrow_array(
                'SELECT quote_ident(?)', undef, $schema
            );
            $sql =~ s{:"registry"}{$schema}g;
        }

        # Load File::Temp on demand; it is only needed on this legacy path.
        require File::Temp;
        my $fh = File::Temp->new;
        print {$fh} $sql;
        close $fh;

        # $fh stays in scope, so the temporary file exists while psql runs.
        $self->_run(
            '--file' => $fh->filename,
            '--set'  => "tableopts=$opts",
        );
    }
    else {
        $self->_run(
            '--file' => $file,
            '--set'  => "registry=$schema",
            '--set'  => "tableopts=$opts",
        );
    }

    $self->dbh->do( 'SET search_path = ?', undef, $schema );
}
# Provider name stored on the database handle under
# {private_sqitch_info}{provider}; elsewhere in this file it is compared
# against 'postgres'.
sub _provider {
    my $self = shift;
    my $info = $self->dbh->{private_sqitch_info};
    return $info->{provider};
}
# Start a transaction on the engine's handle. When the provider is
# 'postgres', the changes table is additionally locked in EXCLUSIVE MODE; for
# any other provider the lock is skipped. Returns the engine.
sub begin_work {
    my $self = shift;
    my $dbh  = $self->dbh;

    $dbh->begin_work;
    if ( $self->_provider eq 'postgres' ) {
        $dbh->do('LOCK TABLE changes IN EXCLUSIVE MODE');
    }
    return $self;
}
# Try to take the advisory lock without waiting.
#
# Returns 1 immediately when the provider is not 'postgres'; otherwise returns
# the result of pg_try_advisory_lock(75474063).
sub try_lock {
    my $self = shift;
    if ( $self->_provider ne 'postgres' ) {
        return 1;
    }
    my $result = $self->dbh->selectcol_arrayref(
        'SELECT pg_try_advisory_lock(75474063)'
    );
    return $result->[0];
}
# Wait for the advisory lock.
#
# Returns 1 immediately when the provider is not 'postgres'. Otherwise issues
# pg_advisory_lock(75474063) asynchronously and hands _timeout() a callback
# that reports whether the query is ready and has a result. Returns 1 when
# _timeout() returns true; otherwise cancels the query and returns 0.
sub wait_lock {
    my $self = shift;
    return 1 unless $self->_provider eq 'postgres';

    my $dbh = $self->dbh;
    $dbh->do(
        'SELECT pg_advisory_lock(75474063)',
        { pg_async => DBD::Pg::PG_ASYNC() },
    );

    my $lock_acquired = sub { $dbh->pg_ready && $dbh->pg_result };
    if ( $self->_timeout($lock_acquired) ) {
        return 1;
    }

    $dbh->pg_cancel;
    return 0;
}
# Run a SQL file through psql by passing it to _run() as the --file option.
# Returns whatever _run() returns.
sub run_file {
    my ( $self, $file ) = @_;
    return $self->_run( '--file', $file );
}
# Run a verify script through psql. The arguments after the engine are passed
# on following a --file option. Dispatches to _run() when the Sqitch
# verbosity is greater than 1, and to _capture() otherwise.
sub run_verify {
    my $self = shift;

    my $method_name = $self->sqitch->verbosity > 1 ? '_run' : '_capture';
    my $method      = $self->can($method_name);
    return $self->$method( '--file', @_ );
}
# Feed the contents of a file handle to psql via _spool(). Returns whatever
# _spool() returns.
sub run_handle {
    my ( $self, $fh ) = @_;
    return $self->_spool($fh);
}
# Run a registry upgrade script: all arguments are handed straight to
# _run_registry_file().
sub run_upgrade {
    my $self = shift;
    return $self->_run_registry_file(@_);
}
# Record the tags of a change in the tags table.
#
# Returns the engine straight away when the change has no tags. Otherwise a
# single INSERT ... SELECT is issued with one VALUES row per tag; the LEFT
# JOIN / IS NULL filter skips tags whose tag_id is already present. The
# committer name and email come from the Sqitch object, everything else from
# the change and its tags. Returns the engine.
sub log_new_tags {
    my ( $self, $change ) = @_;
    my @tags = $change->tags;
    return $self unless @tags;

    my $sqitch = $self->sqitch;
    my ( $change_id, $change_name, $project, $committer, $committer_email ) = (
        $change->id,
        $change->format_name,
        $change->project,
        $sqitch->user_name,
        $sqitch->user_email
    );

    my $dbh = $self->dbh;

    # One typed placeholder row per tag.
    my $row = q{(?::text, ?::text, ?::text, ?::text, ?::text, ?::text, ?::text, ?::timestamptz, ?::text, ?::text)};
    my $sql = q{
INSERT INTO tags (
tag_id
, tag
, project
, change_id
, note
, committer_name
, committer_email
, planned_at
, planner_name
, planner_email
)
SELECT tid, tg, proj, chid, n, name, email, at, pname, pemail FROM ( VALUES
}
        . join( ",\n ", ($row) x @tags )
        . q{
) i(tid, tg, proj, chid, n, name, email, at, pname, pemail)
LEFT JOIN tags ON i.tid = tags.tag_id
WHERE tags.tag_id IS NULL
};

    # Bind values, in the same column order as the placeholder rows.
    my @binds;
    for my $tag (@tags) {
        push @binds => (
            $tag->id,
            $tag->format_name,
            $project,
            $change_id,
            $tag->note,
            $committer,
            $committer_email,
            $tag->timestamp->as_string( format => 'iso' ),
            $tag->planner_name,
            $tag->planner_email,
        );
    }

    $dbh->do( $sql, undef, @binds );
    return $self;
}
# Record the revert of a change.
#
# Deletes the change's tags (collecting the deleted tag names), reads its
# 'require' and 'conflict' dependencies as two arrays, deletes the change
# row, and finally logs a "revert" event carrying the deleted tags and both
# dependency lists. Returns whatever _log_event() returns.
sub log_revert_change {
    my ( $self, $change ) = @_;
    my $dbh = $self->dbh;

    # Remove the tags first, keeping their names for the event log.
    my $deleted_tags = $dbh->selectcol_arrayref(
        'DELETE FROM tags WHERE change_id = ? RETURNING tag',
        undef,
        $change->id
    );
    $deleted_tags ||= [];

    # Fetch the dependencies before the change row is deleted.
    my $deps_sql = q{
SELECT ARRAY(
SELECT dependency
FROM dependencies
WHERE change_id = $1
AND type = 'require'
), ARRAY(
SELECT dependency
FROM dependencies
WHERE change_id = $1
AND type = 'conflict'
)
};
    my ( $requires, $conflicts )
        = $dbh->selectrow_array( $deps_sql, undef, $change->id );

    $dbh->do(
        'DELETE FROM changes where change_id = ?',
        undef,
        $change->id,
    );

    return $self->_log_event(
        revert => $change,
        $deleted_tags,
        $requires,
        $conflicts
    );
}
# Build an App::Sqitch::DateTime from a colon-separated string: the string is
# split on ":" and the resulting list is passed to the constructor.
# NOTE(review): presumably the input is the "name:value:..." text produced by
# the _ts2char_format() expression -- confirm against callers.
sub _dt($) {
    my $spec = shift;
    my @args = split /:/, $spec;
    return App::Sqitch::DateTime->new(@args);
}
# Report whether the last DBI error was SQLSTATE 42P01.
#
# Returns 0 for any other state. On a match, and when the server version is
# at least 90000, a DO block is executed that raises a WARNING (with error
# code 'undefined_table') whose message and detail say that the registry is
# not initialized and that Sqitch will now initialize it; both texts are
# quoted through the handle before being placed in the block. Returns 1 on a
# match, whether or not the warning was sent.
sub _no_table_error {
    my $state = $DBI::state;
    return 0 unless $state && $state eq '42P01';

    my $dbh = shift->dbh;
    return 1 unless $dbh->{pg_server_version} >= 90000;

    my @quoted = map { $dbh->quote($_) } (
        __ 'Sqitch registry not initialized',
        __ 'Because the "changes" table does not exist, Sqitch will now initialize the database to create its registry tables.',
    );
    my $notice = sprintf q{DO $$
BEGIN
SET LOCAL client_min_messages = 'ERROR';
RAISE WARNING USING ERRCODE = 'undefined_table', MESSAGE = %s, DETAIL = %s;
END;
$$}, @quoted;
    $dbh->do($notice);

    return 1;
}
# True when the last DBI error state is SQLSTATE 42703. When no state is
# set, the false state value itself is returned.
sub _no_column_error {
    my $state = $DBI::state;
    return $state && $state eq '42703';
}
# True when the last DBI error state is SQLSTATE 23505. When no state is
# set, the false state value itself is returned.
sub _unique_error {
    my $state = $DBI::state;
    return $state && $state eq '23505';
}
# Build the SQL fragment and bind value for an "is one of" test.
#
# Returns a two-element list: the expression '= ANY(?)' and the values
# argument itself, unchanged, to be bound to the single placeholder.
sub _in_expr {
    my ( $self, $vals ) = @_;
    return ( '= ANY(?)', $vals );
}
# Run psql with the given extra arguments through the Sqitch run() method.
#
# When the engine has a password, it is exposed to the child process through
# PGPASSWORD for the duration of the call only. Returns whatever run()
# returns.
sub _run {
    my $self   = shift;
    my $sqitch = $self->sqitch;
    my $pass   = $self->password;

    # No password: leave the environment untouched.
    return $sqitch->run( $self->psql, @_ ) unless $pass;

    local $ENV{PGPASSWORD} = $pass;
    return $sqitch->run( $self->psql, @_ );
}
# Run psql with the given extra arguments through the Sqitch capture()
# method.
#
# When the engine has a password, it is exposed to the child process through
# PGPASSWORD for the duration of the call only. Returns whatever capture()
# returns.
sub _capture {
    my $self   = shift;
    my $sqitch = $self->sqitch;
    my $pass   = $self->password;

    # No password: leave the environment untouched.
    return $sqitch->capture( $self->psql, @_ ) unless $pass;

    local $ENV{PGPASSWORD} = $pass;
    return $sqitch->capture( $self->psql, @_ );
}
# Run psql with the given extra arguments through the Sqitch probe() method.
#
# When the engine has a password, it is exposed to the child process through
# PGPASSWORD for the duration of the call only. Returns whatever probe()
# returns.
sub _probe {
    my $self   = shift;
    my $sqitch = $self->sqitch;
    my $pass   = $self->password;

    # No password: leave the environment untouched.
    return $sqitch->probe( $self->psql, @_ ) unless $pass;

    local $ENV{PGPASSWORD} = $pass;
    return $sqitch->probe( $self->psql, @_ );
}
# Hand a file handle plus the psql command line and any extra arguments to
# the Sqitch spool() method.
#
# When the engine has a password, it is exposed to the child process through
# PGPASSWORD for the duration of the call only. Returns whatever spool()
# returns.
sub _spool {
    my $self   = shift;
    my $fh     = shift;
    my $sqitch = $self->sqitch;
    my $pass   = $self->password;

    # No password: leave the environment untouched.
    return $sqitch->spool( $fh, $self->psql, @_ ) unless $pass;

    local $ENV{PGPASSWORD} = $pass;
    return $sqitch->spool( $fh, $self->psql, @_ );
}
1;