#!perl

## This file is for Test::Dynmaic internal testing only
## For the most current version, see http://bucardo.org/

use strict;
use warnings;
use Data::Dumper;
use DBI;
use IO::Handle;
use Test::More;
use Time::HiRes qw/sleep gettimeofday tv_interval/;
use Test::Dynamic '1.3.1';

## Running all the tests can take quite a while
## This allows us to only run a subset while debugging
our $TEST_METHODS       = 1;
our $TEST_CONFIG        = 1;
our $TEST_PURGE         = 1;
our $TEST_PUSHDELTA     = 1;
our $TEST_MAKEDELTA     = 1;
our $TEST_COPY          = 1;
our $TEST_SWAP          = 1;
our $TEST_CUSTOM_CODE   = 1;
our $TEST_PING          = 1;

our $TEST_RANDOM_SWAP   = 0;

## Count the number of tests
my $tests = Test::Dynamic::count_tests
	(
	 {
	  filehandle => \*DATA,
	  verbose    => 1,
	  local      => [qw(bc_deeply compare_tables)]
	  }
	 );

plan tests => $tests;

my $location = 'setup';
my $testmsg  = ' ?';
my $testline = '?';
my $showline = 1;
my $showtime = 0;

## Once we reach a certain point, we may need to shutdown our test Bucardo processes
our $need_shutdown = 0;

## Used by the tt sub
my %timing;

## To avoid stepping on other instance's toes
my $PIDDIR = "/tmp/bucardo_testing_$ENV{USER}";
mkdir $PIDDIR if ! -e $PIDDIR;
my $PIDFILE = "bucardo_testing.pid";
my $TEST_INFO_FILE = "t/bucardo.test.data";
my $TEST_SCHEMA = "bucardo_schema";
my $REASONFILE = "/tmp/bucardo_testing_reason_$ENV{USER}";
$ENV{BUCARDO_SENDMAIL_FILE} = 'bucardo_test.email';
$ENV{BUCARDO_NOSENDMAIL} = 1;
if (! exists $ENV{BUCARDO_TEST_NUKE_OKAY}) {
    $ENV{BUCARDO_TEST_NUKE_OKAY} = 1;
}

# Set a semi-unique name to make killing old tests easier
my $xname = "bctest_$ENV{USER}";

my $DEBUGDIR = ".";
-e $DEBUGDIR or mkdir $DEBUGDIR;

## Maximum time to wait for bucardo_ctl to return
my $ALARM_BUCARDO_CTL = 100;
## Maximum time to wait for a kid to appear via pg_listener
my $ALARM_WAIT4KID = 10;

## How long to wait for most syncs to take effect?
my $TIMEOUT_SYNCWAIT = 20;
## How long to sleep between checks for sync being done?
my $TIMEOUT_SLEEP = 0.1;
## How long to wait for a notice to be issued?
my $TIMEOUT_NOTICE = 10;

my $DEBUG = 0;

*STDOUT->autoflush(1);
*STDERR->autoflush(1);

use vars qw($masterdbh $SQL $sth $sth2 %sth $dbh1 $dbh2 $dbh3 $result $result2 $info $count);
use vars qw($type $now $now2 $val $val2 $t $got $expected);

## Sometimes, we want to stop as soon as we see an error
my $bail_on_error = $ENV{BUCARDO_TESTBAIL} || 0;
my $total_errors = 0;

eval { require Bucardo; };
$@ and BAIL_OUT qq{Could not load the Bucardo module: $@};
pass(" Bucardo module loaded");

## Load the setup information from the test info file
-e $TEST_INFO_FILE or BAIL_OUT qq{Must have a "$TEST_INFO_FILE" file for testing};
open my $bct, "<", $TEST_INFO_FILE or BAIL_OUT qq{Could not open "$TEST_INFO_FILE": $!\n};
pass(" Opened configuration file");

our %bc;
while (<$bct>) {
	next unless /^\s*(\w\S+?):?\s+(.*?)\s*$/;
	$bc{$1} = $2; ## no critic
}
$bc{TESTPW}  ||= 'pie';
$bc{TESTPW1} ||= 'pie';
$bc{TESTPW2} ||= 'pie';
$bc{TESTPW3} ||= 'pie';
## Quick sanity check
for my $req (qw(DBNAME DBUSER TESTDB TESTBC)) {
	for my $suffix ('','1','2') {
		exists $bc{"$req$suffix"} or BAIL_OUT qq{Required test arg "$req$suffix" not found in config file};
	}
}
if (
	($bc{DBHOST} eq $bc{DBHOST1} and $bc{DBPORT} == $bc{DBPORT1} and $bc{TESTDB} eq $bc{TESTDB1})
	or
	($bc{DBHOST} eq $bc{DBHOST2} and $bc{DBPORT} == $bc{DBPORT2} and $bc{TESTDB} eq $bc{TESTDB2})
	or
	($bc{DBHOST1} eq $bc{DBHOST2} and $bc{DBPORT1} == $bc{DBPORT2} and $bc{TESTDB1} eq $bc{TESTDB2})
	) {
	BAIL_OUT qq{Test databases cannot be the same!};
}

## We use alarms a bit so we can wait a certain amount of time
local $SIG{ALRM} = sub { die "Timed out"; };

## Shut down any existing tests
shutdown_bucardo();

## Connect to the main database and set things up
$masterdbh = setup_database('master');

## Same for our test databases
$dbh1 = setup_database(1);
$dbh2 = setup_database(2);
$dbh3 = setup_database(3);

my %dbmap = (
			 $masterdbh => 'master',
			 $dbh1      => 'one',
			 $dbh2      => 'two',
			 $dbh3      => 'three',
);

## Make sure the bucardo_ctl helper is running
bucardo_ctl("--help", 5);
pass(" Helper script bucardo.test.helper appears to be running");

if (!exists $ENV{BUCARDO_KEEP_OLD_DEBUG}) {
	my $dirh;
	opendir $dirh, $DEBUGDIR;
	for my $file (grep { -f "$DEBUGDIR/$_" and $_ =~ /^log\.bucardo(?:\....\.\d+(?:~\d+~)*)*$/ } readdir($dirh)) {
		unlink "$DEBUGDIR/$file";
	}
	closedir $dirh;
}

## Create a new schema from the local file
my $schema_file = 'bucardo.schema';
-e $schema_file or BAIL_OUT qq{Cannot find the file "$schema_file"!};
open my $fh, '<', $schema_file or BAIL_OUT qq{Could not open "$schema_file": $!\n};
my $sql='';
my (%copy,%copydata);
my ($copy,$insidecopy) = (0,0);
while (<$fh>) {
	next if /^\\[^\.]/; ## Avoid psql meta-commands at top of file
	if (1==$insidecopy) {
		$copy{$copy} .= $_;
		if (/;/) {
			$insidecopy = 2;
		}
	}
	elsif (2==$insidecopy) {
		if (/^\\\./) {
			$insidecopy = 0;
		}
		else {
			push @{$copydata{$copy}}, $_;
		}
	}
	elsif (/^\s*(COPY bucardo.*)/) {
		$copy{++$copy} = $1;
		$insidecopy = 1;
	}
	else {
		$sql .= $_;
	}
}
close $fh or die qq{Could not close "$schema_file": $!\n};

$masterdbh->do("SET escape_string_warning = 'off'");

$masterdbh->{pg_server_prepare} = 0;

; ## ENV_BUCARDO_TEST_NOCREATEDB TESTCOUNT - 16

unless ($ENV{BUCARDO_TEST_NOCREATEDB}) {
	$masterdbh->do($sql);

	$count = 1;
	while ($count <= $copy) {
		$masterdbh->do($copy{$count});
		for my $copyline (@{$copydata{$count}}) {
			$masterdbh->pg_putline($copyline);
		}
		$masterdbh->pg_endcopy();
		$count++;
	}
}

$masterdbh->commit();
pass(" Bucardo master schema was created");

## Set up the config table
$masterdbh->do("UPDATE bucardo.bucardo_config SET value='$PIDDIR' WHERE setting = 'piddir'");
$masterdbh->do("UPDATE bucardo.bucardo_config SET value='$PIDFILE' WHERE setting = 'pidfile'");
$masterdbh->do("UPDATE bucardo.bucardo_config SET value='$REASONFILE' WHERE setting = 'reason_file'");
$masterdbh->commit();

$masterdbh->do("ALTER USER $bc{TESTBC} SET search_path = bucardo, public");
$masterdbh->commit();

## Create a new Bucardo instance
my $bc;
eval {
	$bc = Bucardo->new
		({
		  dbhost      => $bc{DBHOST},
		  dbport      => $bc{DBPORT},
		  dbname      => $bc{TESTDB},
		  dbuser      => $bc{TESTBC},
		  dbpass      => $bc{TESTPW},
		  verbose     => 0,
		  debugsyslog => 0,
		  debugstderr => 0,
		  debugstdout => 0,
		  debugfile   => 1,
		  cleandebugs => 0,
		  debugsql    => 0,
		  bcquiet     => 1,
		  });
};
$@ and BAIL_OUT "Could not create Bucardo instance: $@";

isa_ok($bc, 'Bucardo');

$need_shutdown = 1;

$masterdbh->commit();

## Assign names to the databases, and set some common things for each one
my %db = (
		  $dbh1 => 'bctest1',
		  $dbh2 => 'bctest2',
		  $dbh3 => 'bctest3',
		  );

## Create our test tables, one for each major data type we handle
my %tabletype =
	(
	 'bucardo_test0' => 'BIGINT',
	 'bucardo_test1' => 'INT',
	 'bucardo_test2' => 'TEXT',
	 'bucardo_test3' => 'DATE',
	 'bucardo_test4' => 'TIMESTAMP',
 );

my %table; ## This will hold the oids

## Used for rule testing
$SQL = qq{
	CREATE TABLE droptest (
		name TEXT NOT NULL,
		type TEXT NOT NULL,
		inty INTEGER NOT NULL
	)
};
unless ($ENV{BUCARDO_TEST_NOCREATEDB}) {
	$dbh1->do($SQL);
	$dbh2->do($SQL);
	$dbh3->do($SQL);
}

## Used for trigger testing
## no critic
$SQL = q{
	CREATE OR REPLACE FUNCTION trigger_test()
	RETURNS trigger
	LANGUAGE plpgsql
	AS $_$ BEGIN
		INSERT INTO droptest(name,type,inty) VALUES (TG_RELNAME, 'trigger', NEW.inty);
		RETURN NULL;
		END;
	$_$;
	CREATE OR REPLACE FUNCTION trigger_test_zero()
	RETURNS trigger
	LANGUAGE plpgsql
	AS $_$ BEGIN
		INSERT INTO droptest(name,type,inty) VALUES (TG_RELNAME, 'trigger', 0);
		RETURN NULL;
		END;
	$_$;
};
## use critic

$dbh1->do($SQL);
$dbh2->do($SQL);
$dbh3->do($SQL);


for my $table (sort keys %tabletype) {
	$SQL = qq{
		CREATE TABLE $table (
			id    $tabletype{$table} NOT NULL PRIMARY KEY};
	$SQL .= $table =~ /0/ ? "\n)" : qq{,
        	data1 TEXT                   NULL,
	        inty  SMALLINT               NULL,
    	    email TEXT                   NULL UNIQUE
		)
	};
	unless ($ENV{BUCARDO_TEST_NOCREATEDB}) {
		$dbh1->do($SQL);
		$dbh2->do($SQL);
		$dbh3->do($SQL);
	}

	## Get the oids back out:
	$SQL = qq{
		SELECT c.oid
		FROM   pg_catalog.pg_class c, pg_catalog.pg_namespace n
		WHERE  c.relnamespace = n.oid
		AND    n.nspname = ?
		AND    relkind = 'r'
		AND    relname = ?
	};
	$sth = $dbh1->prepare($SQL);
	$count = $sth->execute($TEST_SCHEMA, $table);
	BAIL_OUT(qq{No oid for "$table"}) unless 1==$count;
	$table{$dbh1}{$table} = $sth->fetchall_arrayref()->[0][0];

	$sth = $dbh2->prepare($SQL);
	$count = $sth->execute($TEST_SCHEMA, $table);
	BAIL_OUT(qq{No oid for "$table"}) unless 1==$count;
	$table{$dbh2}{$table} = $sth->fetchall_arrayref()->[0][0];

	$sth = $dbh3->prepare($SQL);
	$count = $sth->execute($TEST_SCHEMA, $table);
	BAIL_OUT(qq{No oid for "$table"}) unless 1==$count;
	$table{$dbh3}{$table} = $sth->fetchall_arrayref()->[0][0];

	## Create a trigger to test trigger supression during syncs
	$SQL = qq{
		CREATE TRIGGER bctrig_$table
		AFTER INSERT OR UPDATE ON $table
		FOR EACH ROW EXECUTE PROCEDURE trigger_test()
	};
	$table =~ /0/ and ($SQL =~ s/trigger_test/trigger_test_zero/);
	unless ($ENV{BUCARDO_TEST_NOCREATEDB}) {
		$dbh1->do($SQL);
		$dbh2->do($SQL);
		$dbh3->do($SQL);
	}

	## Create a rule to test rule supression during syncs
	$SQL = qq{
		CREATE OR REPLACE RULE bcrule_$table
		AS ON INSERT TO $table
		DO ALSO INSERT INTO droptest(name,type,inty) VALUES ('$table','rule',NEW.inty)
	};
	$table =~ /0/ and $SQL =~ s/NEW.inty/0/;
	$dbh1->do($SQL);
	$dbh2->do($SQL);
	$dbh3->do($SQL);


} ## end creating each table

pass(" Create test tables on remote databases");

## We must commit as we will not be connecting from this session
$dbh1->commit();
$dbh2->commit();
$dbh3->commit();

## Prepare some test values for easy use
my %val;
for (1..30) {
	$val{BIGINT}{$_} = $_;
	$val{INT}{$_} = $_;
	$val{TEXT}{$_} = "bc$_";
	$val{DATE}{$_} = sprintf "2001-10-%02d", $_;
	$val{TIMESTAMP}{$_} = $val{DATE}{$_} . " 12:34:56";
}

if ($ENV{BUCARDO_TEST_NOCREATEDB}) {
	$masterdbh->do("DELETE FROM q");
	$masterdbh->do("DELETE FROM sync");
	$masterdbh->do("DELETE FROM goat");
	$masterdbh->do("DELETE FROM db");
	$masterdbh->commit();
}


## Add in our test databases
$bc->database({
	name   => $db{$dbh1},
	dbhost => $bc{DBHOST1},
	dbport => $bc{DBPORT1},
	dbname => $bc{TESTDB1},
	dbuser => $bc{TESTBC1},
	dbpass => $bc{TESTPW1}
});

$bc->database({
	name   => $db{$dbh2},
	dbhost => $bc{DBHOST2},
	dbport => $bc{DBPORT2},
	dbname => $bc{TESTDB2},
	dbuser => $bc{TESTBC2},
	dbpass => $bc{TESTPW2}
});

$bc->database({
	name   => $db{$dbh3},
	dbhost => $bc{DBHOST3},
	dbport => $bc{DBPORT3},
	dbname => $bc{TESTDB3},
	dbuser => $bc{TESTBC3},
	dbpass => $bc{TESTPW3}
});

pass(" Added in databases");

## Add all the goats
my $herd1 = "bctestherd1";
my $herd2 = "bctestherd2";
for my $table (sort keys %tabletype) {
	for my $db (1..2) {
		$bc->goat
			({
			  db         => "bctest$db",
			  schemaname => $TEST_SCHEMA,
			  tablename  => $table,
			  herd       => "bctestherd$db",
			  pkey       => 'id',
			  pkeytype   => lc $tabletype{$table},
			  standard_conflict => 'source',
		  });
	}
}

if ($TEST_PING) { ## START_TEST_PING

	$location = 'ping';
	pass(" Begin TEST_PING section");

	ping_testing();

} ## STOP_TEST_PING


if ($TEST_METHODS) { ## START_TEST_METHODS

	## Test methods to change things in the Bucardo database

	$location = 'methods';
	pass(" Begin TEST_METHODS section");

	clean_all_tables();

	shutdown_bucardo();

	test_customcode_methods();

	test_database_methods();

	test_goat_methods();

	test_sync_methods();

} ## STOP_TEST_METHODS

if ($TEST_CONFIG) { ## START_TEST_CONFIG

	$location = 'config';
	pass(" Begin TEST_CONFIG section");

	shutdown_bucardo();

	test_config();

} ## STOP_TEST_CONFIG

if ($TEST_PURGE) { ## START_TEST_PURGE

	$location = 'purge';
	pass(" Begin TEST_PURGE section");

	shutdown_bucardo();

	clean_all_tables();

	test_purge();

} ## STOP_TEST_PURGE

if ($TEST_PUSHDELTA) { ## START_TEST_PUSHDELTA

	$location = 'pushdelta';
	pass(" Begin TEST_PUSHDELTA section");

	shutdown_bucardo();
	clean_all_tables();

	## Setup a pushdelta sync
	$bc->sync
		({
		  name             => 'pushdeltatest',
		  source           => 'bctestherd1',
		  targetdb         => 'bctest2',
		  synctype         => 'pushdelta',
	  });

	bucardo_ctl("start 'Start pushdelta testing'");
	wait4kid('bucardo_q_pushdeltatest_bctest2');
	pass(" Bucardo was started");

	for my $table (sort keys %tabletype) {
		basic_pushdelta_testing($table,$dbh1,$dbh2); ## TESTCOUNT * 5
	}

	pass(" Finished with pushdelta tests");

} ## STOP_TEST_PUSHDELTA

if ($TEST_MAKEDELTA) { ## START_TEST_MAKEDELTA

	$location = 'makedelta';
	pass(" Begin TEST_MAKEDELTA section");

	shutdown_bucardo();
	clean_all_tables();

	## Test makedelta column
	$bc->sync
		({
		  name      => 'makedeltatest',
		  source    => 'bctestherd1',
		  targetdb  => 'bctest2',
		  synctype  => 'swap',
          makedelta => 1,
	  });

	bucardo_ctl("start 'Start makedelta testing'");
	wait4kid('bucardo_q_makedeltatest_bctest2');
	pass(" Bucardo was started");

	for my $table (sort keys %tabletype) {
		next if $table =~ /0/;
		makedelta_testing($table,$dbh1,$dbh2); ## TESTCOUNT * 4
	}

	pass(" Finished with makedelta tests");

} ## STOP_TEST_MAKEDELTA

if ($TEST_COPY) { ## START_TEST_COPY

	$location = 'copy';
	pass(" Begin TEST_COPY section");

	shutdown_bucardo();

	clean_all_tables();

	## Test full push
	$bc->sync
		({
		  name             => 'copytest',
		  source           => 'bctestherd1',
		  targetdb         => 'bctest2',
		  synctype         => 'fullcopy',
		  disable_triggers => 'replica',
		  disable_rules    => 'replica',
	  });

	bucardo_ctl("start 'Start fullcopy testing'");

	wait4kid('bucardo_q_copytest_bctest2');
	pass(" Bucardo was started");

	for my $table (sort keys %tabletype) {
		basic_copy_testing($table,$dbh1,$dbh2); ## TESTCOUNT * 5
	}

	analyze_after_copy('bucardo_test1',$dbh1,$dbh2);

	pass(" Finished with fullcopy tests");

} ## STOP_TEST_COPY

if ($TEST_SWAP) { ## START_TEST_SWAP

	$location = 'swap';
	pass(" Begin TEST_SWAP section");

	shutdown_bucardo();
	clean_all_tables();

	## Test swap
	$bc->sync
		({
		  name              => 'swaptest',
		  source            => 'bctestherd2',
		  targetdb          => 'bctest1',
		  synctype          => 'swap',
	  });
	bucardo_ctl("start 'Start swap testing'");
	wait4kid('bucardo_q_swaptest_bctest1');
	pass(" Bucardo was started");

	## Check that each table type populates bucardo_delta
	for my $table (sort keys %tabletype) {
		next if $table =~ /0/;
		bucardo_delta_populate($table,$dbh1); ## TESTCOUNT * 4
		bucardo_delta_populate($table,$dbh2); ## TESTCOUNT * 4
	}
	$dbh1->rollback();
	$dbh2->rollback();

	## Test the swap sync method
	for my $table (sort keys %tabletype) {
		basic_swap_testing($table,$dbh1,$dbh2); ## TESTCOUNT * 5
		basic_swap_testing($table,$dbh2,$dbh1); ## TESTCOUNT * 5
	}

	pass(" Finished with swap tests");

} ## STOP_TEST_SWAP

if ($TEST_CUSTOM_CODE) { ## START_TEST_CUSTOM_CODE

	$location = 'customcode';
	pass(" Begin TEST_CUSTOM_CODE section");

	shutdown_bucardo();
	clean_all_tables(); ## TEST_CUSTOM_CODE

	## Test custom code
	$bc->sync
		({
		  name      => 'customcode',
		  source    => 'bctestherd1',
		  targetdb  => 'bctest2',
		  synctype  => 'swap', ## separate pushdelta later?
	  });

	bucardo_ctl("start 'Start customcode testing'");
	wait4kid('bucardo_q_customcode_bctest2');
	pass(" Bucardo was started");

	for my $table (sort keys %tabletype) {
		next if $table =~ /0/;
		test_customcode($table,$dbh1,$dbh2); ## TESTCOUNT * 4
	}

	pass(" Finished with custom_code tests");

} ## STOP_TEST_CUSTOM_CODE

if ($TEST_RANDOM_SWAP) { ## START_TEST_RANDOM_SWAP

	$location = 'randomswap';
	pass(" Begin TEST_RANDOM_SWAP section");

	shutdown_bucardo();
	clean_all_tables();

	## Test swap
	$bc->sync
		({
		  name              => 'swaptest',
		  source            => 'bctestherd2',
		  targetdb          => 'bctest1',
		  synctype          => 'swap',
	  });
	bucardo_ctl("start 'Start random swap testing'");
	wait4kid('bucardo_q_swaptest_bctest1');
	pass(" Bucardo was started");

	for my $table (sort keys %tabletype) {
		random_swap_testing($table,$dbh1,$dbh2); ## TESTCOUNT * 4
	}

	pass(" Finished with random swap tests");

} ## STOP_TEST_RANDOM_SWAP

exit;

END {
	if ($need_shutdown) {
		diag "\nLeaving, shutting down any running processes";
		bucardo_ctl("stop 'Stop the testing'");

		if ($masterdbh) {
			$masterdbh->rollback();
			$masterdbh->disconnect();
		}
		$dbh1 and $dbh1->disconnect();
		$dbh2 and $dbh2->disconnect();
		system("/bin/rm -fr $PIDDIR/*.pid");
		system("touch $PIDDIR/fullstopbucardo");
	}

	## Kill our test program if running
	## TODO : Clean this up
	if ($^O !~ /Win/) {		
		for (split /\n/ => qx{/bin/ps w}) {
			next if /^\s*$$\s/;
			if (m{(\d+).*perl t/bucardo.test.helper\b}) {
				kill 15, $1;
				last;
			}
		}
	}
	exit;
}



sub setup_database {

	my $type = shift;
	my $suffix = ($type eq 'master') ? '' : $type;

	my $dbname   = $bc{"DBNAME$suffix"};
	my $dbuser   = $bc{"DBUSER$suffix"};
	my $dbpass   = $bc{"DBPASS$suffix"};
	my $dbhost   = $bc{"DBHOST$suffix"} || '';
	my $dbport   = $bc{"DBPORT$suffix"} || '';
	my $testdb   = $bc{"TESTDB$suffix"};
	my $testuser = $bc{"TESTBC$suffix"};
	my $testpass = $bc{"TESTPW$suffix"} || 'pie';

	my $dsn = "dbi:Pg:database=$dbname";
	length $dbhost and $dsn .= ";host=$dbhost";
	length $dbport and $dsn .= ";port=$dbport";
	my $dbh;
	eval {
		$dbh = DBI->connect($dsn, $dbuser, $dbpass,
							{AutoCommit=>0,RaiseError=>1,PrintError=>0});
	};
	$@ and BAIL_OUT "Could not connect to the database (check your t/bucardo.test.data file): $@\n";
	pass(" Connected to the database");

	## Does the test user and test database exist?
	$SQL = "SELECT 1 FROM pg_catalog.pg_user WHERE usename = ?";
	$sth = $dbh->prepare($SQL);
	my $usercount = $sth->execute($testuser);
	$sth->finish();
	$usercount=0 if $usercount eq '0E0';

	$SQL = "SELECT 1 FROM pg_catalog.pg_database WHERE datname = ?";
	$sth = $dbh->prepare($SQL);
	my $dbcount = $sth->execute($testdb);
	$sth->finish();
	$dbcount=0 if $dbcount eq '0E0';

	unless ($ENV{BUCARDO_TEST_NOCREATEDB}) {

		if (!$ENV{BUCARDO_TEST_NUKE_OKAY} and ($dbcount or $usercount)) {
			diag (($dbcount and $usercount)
				  ? qq{\nOkay to drop user "$testuser" and database "$testdb"?}
				  : $usercount
				  ? qq{Okay to drop user "$testuser"?}
				  : qq{Okay to drop database "$testdb"?});
		}
		if ($dbcount or $usercount) {
			BAIL_OUT("As you wish!") if !$ENV{BUCARDO_TEST_NUKE_OKAY} and <> !~ /^Y/i;
			if ($dbcount) {
				$dbh->{AutoCommit} = 1;
				$dbh->do("DROP DATABASE $testdb");
				$dbh->{AutoCommit} = 0;
				pass(qq{ Dropped database "$testdb"});
			}
			if ($usercount) {
				$dbh->do("DROP USER $testuser");
				pass(qq{ Dropped user "$testuser"});
				$dbh->commit;
			}
		}
		
		$SQL = "CREATE USER $testuser SUPERUSER PASSWORD '$testpass'";
		eval { $dbh->do($SQL); };
		$@ and BAIL_OUT qq{Could not create test superuser "$testuser": $@\n};
		pass(qq{ Created test super user "$testuser"});
		
		$dbh->{AutoCommit} = 1;
		$SQL = "CREATE DATABASE $testdb OWNER $testuser";
		eval { $dbh->do($SQL); };
		$dbh->{AutoCommit} = 0;
		$@ and BAIL_OUT qq{Could not create test database $testdb: $@\n};
		pass(qq{ Created test database "$testdb" owned by user "$testuser"});
	}

	## Reconnect as the test user in the test database
	$dbh->disconnect();
	$dsn =~ s/database=$dbname/database=$testdb/;
	eval {
		$dbh = DBI->connect($dsn, $testuser, $testpass,
							{AutoCommit=>0,RaiseError=>1,PrintError=>0});
	};
	$@ and BAIL_OUT "Could not connect to database: $@\n";
	pass(qq{ Connected to the test database as the test user "$testuser"});

	## Do we have the languages we need?
	$sth = $dbh->prepare("SELECT 1 FROM pg_catalog.pg_language WHERE lanname = ?");
	my @languages = ('plpgsql');
	if ($type eq 'master') {
		push @languages, 'plperl', 'plperlu';
	}
	for my $lan (@languages) {
		$count = $sth->execute($lan);
		$sth->finish();
		if ($count eq '0E0') {
			$dbh->do("CREATE LANGUAGE $lan");
			$count = $sth->execute($lan);
			$sth->finish();
			$count==1 or BAIL_OUT ("Could not create language $lan");
			$dbh->commit();
		}
	}
	pass(" All needed languages are installed");

	$dbh->do("SET escape_string_warning = 'off'");
	$dbh->do("ALTER USER $testuser SET escape_string_warning = 'off'");

	if ($type ne 'master') {
		$dbh->do("SET client_min_messages = 'warning'");
		$dbh->do("CREATE SCHEMA $TEST_SCHEMA") unless $ENV{BUCARDO_TEST_NOCREATEDB};
		$dbh->do("SET search_path = $TEST_SCHEMA");
		$dbh->do("ALTER USER $testuser SET search_path = $TEST_SCHEMA");
	}

	$dbh->commit();

	return $dbh;

} ## end of setup_database


sub shutdown_bucardo {

	my $STOPFILE = "$PIDDIR/fullstopbucardo";
	open my $stop, '>', $STOPFILE or die qq{Could not create "$STOPFILE": $!\n};
	print {$stop} "Stopped by $0 on " . (scalar localtime) . "\n";
	close $stop or warn qq{Could not close "$STOPFILE": $!\n};

	pass(" Existing Bucardo asked to shut down");

    ## Find a grep we can use.

    my $grep_path = $ENV{GREP_PATH}     ||
                    qx!which grep!      ||
                    qx!whereis -b grep!;
    if (defined $grep_path) {
        $grep_path =~ s!\Agrep: /!!;
        $grep_path =~ s!\n\z!!;
    }
    else {
        for my $path (grep { -x } qw!/bin/grep /usr/bin/grep /usr/local/bin/grep!) {
            $grep_path = $path;
            last;
        }
    }

    BAIL_OUT q!Cannot find a usable "grep"; set GREP_PATH!
        unless -x $grep_path;

	my $loop = 1;
	{
		my $res = qx{/bin/ps -Afwww | $grep_path Bucardo | $grep_path $xname | $grep_path -v grep}; ## no critic
		last if $res !~ /Bucardo/m;
		if ($loop++ > 10) {
			BAIL_OUT "Could not persuade existing Bucardo to shut down\n";
		}
		sleep 1;
		redo;
	}

	return;

} ## end of shutdown_bucardo


sub wait_until_true {

	my $xline = (caller)[2];
	my $dbh = shift or die "Need a database handle (from line $xline)\n";
	my $sql = shift or die "Need a SQL statement (from line $xline)\n";
	my $timeout = shift || $TIMEOUT_SYNCWAIT;
	my $sleep = shift || $TIMEOUT_SLEEP;
	my $type = shift || 'true';
	my $line = shift || $xline;

	alarm $timeout;
	$sth = $dbh->prepare($sql);
	eval {
	  W: {
			$count = $sth->execute();
			$sth->finish();
			$dbh->commit();
			last if ($type eq 'true' and $count >= 1) or ($type eq 'false' and $count < 1);
			sleep $sleep;
			redo;
		}
	};
	$count = alarm 0;
	return $count unless $@;
	my $db = $dbmap{$dbh} || '?';
	BAIL_OUT (qq{Gave up waiting for "$sql" on db "$db" to be $type: timed out at $timeout from line $line ($@)});
	return;

} ## end of wait_until_true


sub wait_until_false {

	my $xline = (caller)[2];
	my $dbh = shift or die "Need a database handle (from line $xline)\n";
	my $sql = shift or die "Need a SQL statement (from line $xline)\n";
	my $timeout = shift || $TIMEOUT_SYNCWAIT;
	my $sleep = shift || $TIMEOUT_SLEEP;
	return wait_until_true($dbh,$sql,$timeout,$sleep,'false',$xline);

} ## end of wait_until_false

sub clean_all_tables {

	## Reset out test databases to their original state
	## Empty out all data from the tables
	## Remove any triggers added
	## Drop the helper bucardo schema

	$masterdbh->rollback();
	$masterdbh->do("DELETE FROM sync"); ## Needs to go first due to trigger tek

	for my $dbh ($dbh1,$dbh2,$dbh3) {
		$dbh->rollback();
		for my $table (sort keys %tabletype) {
			$dbh->do("TRUNCATE TABLE $table");
			$SQL = "SELECT tgname FROM pg_trigger WHERE tgrelid = (SELECT oid FROM pg_class WHERE relname = '$table')";
			for (@{$dbh->selectall_arrayref($SQL)}) {
				next if $_->[0] =~ /^bctrig_/o;
				$dbh->do("DROP TRIGGER $_->[0] ON $table");
			}
		}
		## Nuke the entire bucardo schema if it exists
		if (object_count($dbh,'bucardo','schema','')) {
			$dbh->do("DROP SCHEMA bucardo CASCADE");
		}
		$dbh->do("TRUNCATE TABLE droptest");
		$dbh->commit();
	}
	$masterdbh->do("DELETE FROM q");
	$masterdbh->do("DELETE FROM audit_pid");
	$masterdbh->commit();
	pass(" Finished clean_all_tables");
	return;

} ## end of clean_all_tables

sub clean_swap_table {

	## Empty out swap table and associated tables on one or more databases
	my ($table,$dbs) = @_;
	for my $dbh (@{$dbs}) {
		$dbh->rollback;
	}
	for my $dbh (@{$dbs}) {
		my $oid = $table{$dbh}{$table};
		$dbh->do("DELETE FROM $table");
		$dbh->do("DELETE FROM bucardo.bucardo_track WHERE tablename = $oid");
		$dbh->do("DELETE FROM bucardo.bucardo_delta WHERE tablename = $oid");
		$dbh->do("DELETE FROM bucardo.bucardo_track");
		$dbh->do("DELETE FROM bucardo.bucardo_delta");
		$dbh->commit;
	}
	return;

} ## end of clean_swap_table


sub table_exists {

	my ($dbh,$table) = @_;
	my $schema = '';
	if ($table =~ /(\w+)\.(\w+)/) {
		($schema,$table) = ($1,$2);
	}
	$SQL = "SELECT count(*) FROM information_schema.tables WHERE table_name = ".$dbh->quote($table);
	if ($schema) {
		$SQL .= "AND table_schema = ".$dbh->quote($schema);
	}
	return $dbh->selectall_arrayref($SQL)->[0][0];

} ## end of table_exists


sub object_count {

	## See if an object exists in a database. Returns number of objects found, usually 0 or 1

	my ($dbh,$schema,$type,$name) = @_;

	if ('table' eq $type) {
		$SQL = "SELECT 1 FROM pg_class c, pg_namespace n WHERE c.relnamespace=n.oid".
			" AND n.nspname = ? AND c.relname = ?";
	}
	elsif ('function' eq $type) {
		$SQL = "SELECT 1 FROM pg_proc p, pg_namespace n WHERE p.pronamespace = n.oid".
			" AND n.nspname = ? AND p.proname = ?";
	}
	elsif ('schema' eq $type) {
		$SQL = "SELECT 1 FROM pg_namespace n WHERE nspname = ? AND 'null'<>?";
	}
	else {
		die "Invalid type: $type\n";
	}

	$sth = $dbh->prepare_cached($SQL);
	$count = $sth->execute($schema,$name);
	$sth->finish();
	return $count >= 1 ? $count : 0;

} ## end of object_count

sub bucardo_ctl {

	## Use a helper program to safely invoke bucardo_ctl with the given args

	my $command = shift;
	my $timeout = shift || $ALARM_BUCARDO_CTL;

	if ($command =~ /kick/io) {
		$dbh1->commit();
		$dbh2->commit();
		$dbh3->commit();
		$masterdbh->commit();
	}

	## diag "Starting bucardo_ctl with $command I am $$\n";
	my $controlfile = "bucardo_test_control";
	my $tmpfile = "$controlfile.tmp";
	open my $fh, ">", $tmpfile or die qq{Could not open "$tmpfile": $!\n};
	my $text = "$command --dbname=$bc{TESTDB} --dbuser=$bc{TESTBC} --dbpass=$bc{TESTPW} --ctlquiet ".
		"--debugstderr=0 --debugstdout=0 --debugfile=1 --verbose=1 --sendmail=0 ".
		"--cleandebugs=0 --debugdir=$DEBUGDIR ".
		"--extraname=$xname";
	$bc{DBPORT} and $text .= " --dbport=$bc{DBPORT}";
	$bc{DBHOST} and $text .= " --dbhost=$bc{DBHOST}";
	$DEBUG and diag "Called bucardo_ctl with: $text";
	print $fh "$text\n";
	close $fh or die qq{Could not close "$tmpfile": $!\n};
	rename $tmpfile, $controlfile or die qq{Could not rename $tmpfile to $controlfile\n};
	## Wait until it is deleted

	alarm $timeout;
	eval {
	  S: {
			last if ! -e $controlfile;
			sleep 0.1;
			redo;
		}
	};
	$count = alarm 0;

	if (!$@) {
		## diag "End bucardo_ctl with $command\n";
		return $count;
	}

	if ($@ =~ /Timed out/) {
		system("touch $PIDDIR/fullstopbucardo");
		BAIL_OUT("bucardo_ctl was not invoked: is the bucardo.test.helper file running? (command=$command)");
	}
	BAIL_OUT("bucardo_ctl gave an error: $@");

	return;

} ## end of bucardo_ctl


sub wait4kid {

	my $notice = shift;
	my $timeout = shift || $ALARM_WAIT4KID;

	$SQL = "SELECT 1 FROM pg_catalog.pg_listener WHERE relname = ?";
	my $listen = $masterdbh->prepare($SQL);
	alarm $timeout;
	eval {
	  S: {
			$count = $listen->execute($notice);
			$listen->finish();
			last if $count == 1;
			sleep 0.1;
			redo;
		}
	};
	$count = alarm 0;
	return $count unless $@;
	BAIL_OUT("Waited too long ($timeout) for kid to LISTEN to $notice");
	return;

} ## end of wait4kid

sub wait_for_notice {

	my $dbh = shift;
	my $text = shift;
	my $timeout = shift || $TIMEOUT_NOTICE;
	my $sleep = shift || $TIMEOUT_SLEEP;

	my $n;
	alarm $timeout;
	eval {
	  N: {
			while ($n = $dbh->func('pg_notifies')) {
				last N if $n->[0] eq $text;
			}
			sleep $sleep;
			redo;
		}
	};
	$count = alarm 0;
	return $count unless $@;
	my $line = (caller)[2];
	BAIL_OUT (qq{Gave up waiting for notice "$text": timed out at $timeout from line $line ($@)});
	return;

} ## end of wait_for_notice


## no critic
{
	no warnings; ## Yes, we know they are being redefined!
	sub is_deeply {
		t($_[2],$_[3] || (caller)[2]);
		return if Test::More::is_deeply($_[0],$_[1],$testmsg);
		if ($bail_on_error > $total_errors++) {
			my $line = (caller)[2];
			my $time = time;
			diag("GOT: ".Dumper $_[0]);
			diag("EXPECTED: ". Dumper $_[1]);
			BAIL_OUT "Stopping on a failed 'is_deeply' test from line $line. Time: $time";
		}
	} ## end of is_deeply
	sub like {
		t($_[2],(caller)[2]);
		return if Test::More::like($_[0],$_[1],$testmsg);
		if ($bail_on_error > $total_errors++) {
			my $line = (caller)[2];
			my $time = time;
			BAIL_OUT "Stopping on a failed 'like' test from line $line. Time: $time";
		}
	} ## end of like
	sub pass {
		t($_[0],$_[1]||(caller)[2]);
		Test::More::pass($testmsg);
	} ## end of pass
	sub is {
		t($_[2],(caller)[2]);
		return if Test::More::is($_[0],$_[1],$testmsg);
		if ($bail_on_error > $total_errors++) {
			my $line = (caller)[2];
			my $time = time;
			BAIL_OUT "Stopping on a failed 'is' test from line $line. Time: $time";
		}
	} ## end of is
	sub isa_ok {
		t("Object isa $_[1]",(caller)[2]);
		my ($name, $type, $msg) = ($_[0],$_[1]);
		if (ref $name and ref $name eq $type) {
			Test::More::pass($testmsg);
			return;
		}
		$bail_on_error > $total_errors++ and BAIL_OUT "Stopping on a failed test";
	} ## end of isa_ok
	sub ok {
		t($_[1]||$testmsg);
		return if Test::More::ok($_[0],$testmsg);
		if ($bail_on_error > $total_errors++) {
			my $line = (caller)[2];
			my $time = time;
			BAIL_OUT "Stopping on a failed 'ok' test from line $line. Time: $time";
		}
	} ## end of ok
}
## use critic


sub now_time {
	my $dbh = shift;
	return $dbh->selectall_arrayref("SELECT now()")->[0][0];
} ## end of now_time


sub bc_deeply {

	my ($exp,$dbh,$sql,$msg) = @_;
	my $line = (caller)[2];

	local $Data::Dumper::Terse = 1;
	local $Data::Dumper::Indent = 0;

	die "Very invalid statement from line $line: $sql\n" if $sql !~ /^\s*select/i;

	my $got;
	eval {
		$got = $dbh->selectall_arrayref($sql);
	};
	if ($@) {
		die "bc_deeply failed from line $line. SQL=$sql\n";
	}

	return is_deeply($got,$exp,$msg,(caller)[2]);

} ## end of bc_deeply


sub compare_tables {

	my ($table,$sdbh,$rdbh) = @_;

	my ($line) = (caller)[2];

	local $Data::Dumper::Terse = 1;
	local $Data::Dumper::Indent = 0;

	my $msg = "Table $table is the same on both databases";
	$SQL = "SELECT * FROM $table ORDER BY inty, id";
	$SQL =~ s/inty, // if $table =~ /0/;
	my $uno = $sdbh->selectall_arrayref($SQL);
	my $dos = $rdbh->selectall_arrayref($SQL);
	if ((Dumper $uno) eq (Dumper $dos)) {
		pass($msg,$line);
		return 1;
	}

	return is_deeply($uno,$dos,$msg,$line);

} ## end of compare_tables


sub tt {
	## Simple timing routine. Call twice with the same arg, before and after
	my $name = shift or die qq{Need a name!\n};
	if (exists $timing{$name}) {
		my $newtime = tv_interval($timing{$name});
		warn "Timing for $name: $newtime\n";
		delete $timing{$name};
	}
	else {
		$timing{$name} = [gettimeofday];
	}
	return;
} ## end of tt

sub t {
	$testmsg = shift;
	$testline = shift || (caller)[2];
	$testmsg =~ s/^\s+//;
	if ($location) {
		$testmsg = "($location) $testmsg";
	}
	if ($showline) {
		$testmsg .= " [line: $testline]";
	}
	if ($showtime) {
		my $time = time;
		$testmsg .= " [time: $time]";
	}
} ## end of t


sub exitnow {
	$need_shutdown = 0;
	exit;
} ## end of exitnow



sub test_customcode_methods {

	## Test methods related to custom code: customcode, remove_customcode

	$location = 'customcode_methods';

	my ($code, $codeid);

	$t=q{ Method customcode fails if no arguments are given };
	eval { $bc->customcode(); };
	like($@, qr{must be a hashref}, $t);

	$t=q{ Method customcode fails if no 'src_code' argument given };
	eval { $bc->customcode({name => 'test'}); };
	like($@, qr{\QAttribute (src_code) is required}, $t);

	$t=q{ Method customcode fails if no 'name' argument given };
	eval { $bc->customcode({src_code => 'foo'}); };
	like($@, qr{\QAttribute (name) is required}, $t);

	$t=q{ Method customcode fails if no 'whenrun' argument given };
	eval { $bc->customcode({name => 'test', src_code => 'foo'}); };
	like($@, qr{\QAttribute (whenrun) is required}, $t);

	$t=q{ Method customcode fails if 'whenrun' is invalid };
	eval { $bc->customcode({name => 'test', src_code => 'foo', whenrun => 'invalid'}); };
	like($@, qr{violates check constraint "customcode_whenrun"}, $t);

	$t=q{ Method customcode works if given valid arguments };
	eval { $code = $bc->customcode({name => 'test', src_code => 'foo', whenrun => 'before_txn'}); };
	is($@, q{}, $t);

	$t=q{ Method customcode returns a hashref };
	is(ref $code, 'HASH', $t);

	$t=q{ Method customcode returns a hashref containing a numeric 'id' key };
	$codeid = $code->{id} || 0;
	like($codeid, qr{^\d$}, $t);

	$t=q{ Method customcode inserts to the database correctly };
	$SQL = "SELECT id, name, about, whenrun, src_code, getdbh, getrows FROM customcode";
	$sth = $masterdbh->prepare($SQL);
	$sth->execute();
	$got = $sth->fetchall_arrayref({});
	$expected = [
				 {
				  id       => $codeid,
				  name     => 'test',
				  about    => undef,
				  whenrun  => 'before_txn',
				  src_code => 'foo',
				  getdbh   => 1,
				  getrows  => 0,
				  }
				 ];
	is_deeply($got, $expected, $t);

	$t=q{ Method customcode fails if 'name' already exists };
	eval { $bc->customcode({name => 'test', src_code => 'foo', whenrun => 'before_txn'}); };
	like($@, qr{duplicate key}, $t);

	$t=q{ Method customcode inserts with optional attributes };
	eval { $code = $bc->customcode({name => 'test2', about=>'bz', src_code => 'foo',
									whenrun => 'after_txn', getdbh=>0, getrows=>1});
	};
	is($@, q{}, $t);

	$t=q{ Method customcode inserts to the database correctly };
	$sth->execute();
	$got = $sth->fetchall_arrayref({});
	push @$expected,
		{
		 id       => $code->{id},
		 name     => 'test2',
		 about    => 'bz',
		 whenrun  => 'after_txn',
		 getdbh   => 0,
		 getrows  => 1,
		 src_code => 'foo',
		 };
	is_deeply($got, $expected, $t);

	## Codes can be connected to syncs and goats via the customcode_map table

	$t=q{ Method customcode fails if 'id' is not numeric };
	eval { $bc->customcode({id => 'foobar'}); };
	like($@, qr{must be a number}, $t);

	$t=q{ Method customcode fails if 'id' given, but not 'goat' or 'sync' };
	eval { $bc->customcode({id => $codeid}); };
	like($@, qr{sync or goat}, $t);

	$t=q{ Method customcode fails if 'id' does not exist };
	eval { $bc->customcode({id => 42, sync => 'foo'}); };
	like($@, qr{\d does not exist}, $t);

	$t=q{ Method customcode fails if 'sync' does not exist };
	eval { $bc->customcode({id => $codeid, sync => 'foo'}); };
	like($@, qr{sync does not exist}, $t);

	$t=q{ Method customcode fails if 'goat' is not numeric };
	eval { $bc->customcode({id => $codeid, goat => 'foo'}); };
	like($@, qr{Invalid goat}, $t);

	$t=q{ Method customcode fails if 'goat' does not exist };
	eval { $bc->customcode({id => $codeid, goat => 999}); };
	like($@, qr{goat does not exist}, $t);

	$t=q{ Method customcode works if given a valid 'id' and 'goat' };
	eval { $bc->customcode({id => $codeid, goat => 1}); };
	is($@, q{}, $t);

	## Create a sync for us to use
	$bc->sync
		({
		  name      => 'cctest',
		  source    => 'bctestherd1',
		  targetdb  => 'bctest2',
		  synctype  => 'fullcopy',
	  });

	$t=q{ Method customcode works if given a valid 'id' and 'sync' };
	eval { $bc->customcode({id => $codeid, sync => 'cctest'}); };
	is($@, q{}, $t);

	$t=q{ Method customcode fails if 'id' and 'sync' already exist };
	eval { $bc->customcode({id => $codeid, sync => 'cctest'}); };
	like($@, qr{customcode_map_unique_sync}, $t);

	$t=q{ Method customcode fails if 'id' and 'goat' already exists };
	eval { $bc->customcode({id => $codeid, goat => 1}); };
	like($@, qr{customcode_map_unique_goat}, $t);

	$t=q{ Method customcode inserts to the database correctly };
	$SQL = "SELECT code,sync,goat,active,priority FROM customcode_map ORDER BY code";
	$sth = $masterdbh->prepare($SQL);
	$sth->execute();
	$got = $sth->fetchall_arrayref({});
	$expected = [
				 {code=>$codeid, goat=>1, sync=>undef,active=>1,priority=>0},
				 {code=>$codeid, goat=>undef, sync=>'cctest',active=>1,priority=>0},
				 ];
	is_deeply($got, $expected, $t);

	$t=q{ Method customcode works with optional attributes 'active' and 'priority' };
	eval { $bc->customcode({id => $codeid, goat => 2,active => 0, priority => 9}); };
	is($@, q{}, $t);

	$t=q{ Method customcode inserts to database correctly with optional attribs };
	$sth->execute();
	$got = $sth->fetchall_arrayref({});
	push @$expected, {code=>2, goat=>2, sync=>undef,active=>0,priority=>9};
	is_deeply($got, $expected, $t);

	## Codes can be unmapped

	$t=q{ Method remove_customcode fails if no arguments are given };
	eval { $bc->remove_customcode(); };
	like($@, qr{must be a hashref}, $t);

	$t=q{ Method remove_customcode fails if no 'id', 'name', or 'code' argument given };
	eval { $bc->remove_customcode({foo => 'bar'}); };
	like($@, qr{required argument}, $t);

	$t=q{ Method remove_customcode fails if 'code' is not numeric };
	eval { $bc->remove_customcode({code => 'foo'}); };
	like($@, qr{'code' is not numeric}, $t);

	$t=q{ Method remove_customcode fails if 'code' given, but not 'sync' or 'goat' };
	eval { $bc->remove_customcode({code => '123'}); };
	like($@, qr{sync or goat}, $t);

	$t=q{ Method remove_customcode fails if 'goat' is not numeric };
	eval { $bc->remove_customcode({code => $codeid, goat => 'foo'}); };
	like($@, qr{goat is not numeric}, $t);

	$t=q{ Method remove_customcode returns a 0 if 'code' is not valid };
	$count = $bc->remove_customcode({code => '123', goat => 1});
	is($count, 0, $t);

	$t=q{ Method remove_customcode returns 0 if 'sync' is not valid };
	$count = $bc->remove_customcode({code => $codeid, sync => 'nosuch'});
	is($count, 0, $t);

	$t=q{ Method remove_customcode returns 1 if given valid 'id' and 'goat' };
	$count = $bc->remove_customcode({code => $codeid, goat => 1});
	is($count, 1, $t);

	$t=q{ Method customcode deletes from customcode_map correctly };
	$sth->execute();
	$got = $sth->fetchall_arrayref({});
	shift @$expected;
	is_deeply($got, $expected, $t);

	$t=q{ Method remove_customcode returns 1 if given valid 'id' and 'sync' };
	$count = $bc->remove_customcode({code => $codeid, sync => 'cctest'});
	is($count, 1, $t);

	$t=q{ Method remove_customcode returns 0 if repeating previous deletion };
	$count = $bc->remove_customcode({code => $codeid, sync => 'cctest'});
	is($count, 0, $t);

	$t=q{ Method customcode deletes from customcode_map correctly. };
	$sth->execute();
	$got = $sth->fetchall_arrayref({});
	shift @$expected;
	is_deeply($got, $expected, $t);

	## Codes can be deleted

	$t=q{ Method remove_customcode fails if 'id' is not numeric };
	eval { $count = $bc->remove_customcode({id => 'foo'}); };
	like($@, qr{not numeric}, $t);

	$t=q{ Method remove_customcode returns 0 if 'id' does not match };
	$count = $bc->remove_customcode({id => 123});
	is($count, 0, $t);

	$t=q{ Method remove_customcode returns 0 if 'name' does not match };
	$count = $bc->remove_customcode({name => 'nosuch'});
	is($count, 0, $t);

	$t=q{ Method remove_customcode returns 1 if 'id' matches };
	$count = $bc->remove_customcode({id => $codeid});
	is($count, 1, $t);

	$t=q{ Method remove_customcode returns 1 if 'name' matches };
	$count = $bc->remove_customcode({name => 'test2'});
	is($count, 1, $t);

	$t=q{ Method customcode deletes from customcode correctly };
	$SQL = "SELECT * FROM customcode ORDER BY id";
	$sth = $masterdbh->prepare($SQL);
	$sth->execute();
	$got = $sth->fetchall_arrayref({});
	is_deeply($got, [], $t);

	return;

} ## end of test_customcode_methods


sub test_database_methods {

	## Test methods related to the db table

	$location = 'database_methods';

	$t=q{ Adding a database with a null name does not work };
	eval { $masterdbh->do(qq{INSERT INTO db(name) VALUES (NULL)}); };
	like($@, qr{violates not-null constraint}, $t);
	$masterdbh->rollback();

	$t=q{ Adding an invalid database to the db table fails };
	eval { $masterdbh->do(qq{INSERT INTO db(name,dbname,dbuser) VALUES ('bctest','nosuchdb!','no_such_user!')}); };
	like($@, qr{authentication failed|database .* does not exist|no password supplied|could not connect to server}, $t);
	$masterdbh->rollback();

	$t=q{ Dots not allowed in database names };
	eval { $masterdbh->do(qq{INSERT INTO db(name,dbname,dbuser) VALUES ('bct.dotted','aa','bb')}); };
	like($@, qr{db_name_sane}, $t);
	$masterdbh->rollback();

	return;

} ## end of test_database_methods


sub test_goat_methods {

	## Test methods related to the goat table

	$location = 'goat_methods';

	$SQL = qq{INSERT INTO goat(db,tablename,pkey,pkeytype) VALUES };

	$t=q{ Adding a goat with a non-existent database fails };
	eval { $masterdbh->do(qq{$SQL ('invalid','bucardo_test1','id','int')}); };
	like($@, qr{find a database}, $t);
	$masterdbh->rollback();

	$t=q{ Adding an goat with a null table fails };
	eval { $masterdbh->do(qq{$SQL ('bctest1',null,'id','int')}); };
	like($@, qr{not-null constraint}, $t);
	$masterdbh->rollback();
	
	$t=q{ Adding an goat with a no primary key type fails };
	eval { $masterdbh->do(qq{$SQL ('bctest1','bucardo_test1','notid',null)}); };
	like($@, qr{pkey_needs_type}, $t);
	$masterdbh->rollback();

	$t=q{ Adding an goat with an invalid primary key type fails };
	eval { $masterdbh->do(qq{$SQL ('bctest1','bucardo_test1','notid','money')}); };
	like($@, qr{pkeytype_check}, $t);
	$masterdbh->rollback();

	return;

} ## end of test_goat_methods


sub test_sync_methods {

	## Test methods related to the sync table

	$location = 'sync_methods';

	$SQL = qq{INSERT INTO sync(name,synctype,source,targetdb) VALUES };

	$t=q{ Adding invalid synctype to the sync table fails };
	eval { $masterdbh->do(qq{$SQL ('bct','foobar','bctesterd1','bctest2')}) };
	like($@, qr{sync_type}, $t);
	$masterdbh->rollback();
	
	$t=q{ Adding invalid source to the sync table fails };
	eval { $masterdbh->do(qq{$SQL ('bct','pushdelta','invalid','bctest2')}); };
	like($@, qr{sync_source_herd_fk}, $t);
	$masterdbh->rollback();

	$t=q{ Adding invalid targetdb to the sync table fails };
	eval { $masterdbh->do(qq{$SQL ('bct','fullcopy','bctestherd1','invalid')}); };
	like($@, qr{sync_targetdb_fk}, $t);
	$masterdbh->rollback();

	$t=q{ Adding targetgroup to a swap sync table fails };
	(my $si = $SQL) =~ s/targetdb/targetgroup/;
	eval { $masterdbh->do(qq{$si ('bct','swap','bctestherd1','invalid')}); };
	like($@, qr{sync_swap_nogroup}, $t);
	$masterdbh->rollback();

	$t=q{ Adding invalid targetgroup to the sync table fails };
	eval { $masterdbh->do(qq{$si ('bct','pushdelta','bctestherd1','invalid')}); };
	like($@, qr{sync_targetgroup_fk}, $t);
	$masterdbh->rollback();

	$masterdbh->do("DELETE FROM sync");
	$t=q{ Adding a duplicate row to the sync table fails };
	$masterdbh->do(qq{$SQL ('bct','fullcopy','bctestherd1','bctest2')});
	eval { $masterdbh->do(qq{$SQL ('bct','fullcopy','bctestherd1','bctest2')}); };
	like($@, qr{sync_name_pk}, $t);
	$masterdbh->rollback();

	$t=q{ One of targetdb or targetgroup must be not-null when adding to sync table };
	eval { $masterdbh->do(qq{$SQL ('bct','fullcopy','bctestherd1',NULL)}); };
	like($@, qr{sync_validtarget}, $t);
	$masterdbh->rollback();

	$t=q{ Dots not allowed in sync names };
	eval { $masterdbh->do(qq{$SQL ('bct.dotted','swap','bctestherd1','bctest2')}); };
	like($@, qr{sync_name_sane}, $t);
	$masterdbh->rollback();

	return;

} ## end of test_sync_methods


sub test_config {

	$location = 'bucardo_config';

	$t=q{ Values of bucardo_config from bucardo.schema are available from %config };
	my $val = $bc->get_config('kid_abort_limit');
	is($val, 3, $t);

	$t=q{ Values of bucardo_config from bucardo.schema are available from %config_about };
	$val = $bc->get_config_about('kid_abort_limit');
	is($val, 'How many times we will restore an aborted kid before giving up?', $t);
	
	$t=q{ Method set_config stores a new setting inside of %config };
	my $fingerprint = '2529 DF6A B8F7 9407 E944  45B4 BC9B 9067 1496 4AC8';
	$bc->set_config('pgp_fingerprint', $fingerprint);
	$val = $bc->get_config('pgp_fingerprint');
	is($val, $fingerprint, $t);

	$t=q{ Method set_config_about allows us set a new setting description };
	$bc->set_config_about('pgp_fingerprint', 'PGP fingerprint');
	$val = $bc->get_config_about('pgp_fingerprint');
	is($val, 'PGP fingerprint', $t);

	$t=q{ Method set_config allows us to change an existing value };
	$fingerprint =~ s/ //g;
	$bc->set_config('pgp_fingerprint', $fingerprint);
	$val = $bc->get_config('pgp_fingerprint');
	is($val, $fingerprint, $t);

	$t=q{ Method save_config writes an existing config to the database };
	$bc->store_config('pgp_fingerprint');
	$SQL = "SELECT value FROM bucardo_config WHERE setting = 'pgp_fingerprint'";
	$val = $masterdbh->selectall_arrayref($SQL)->[0][0];
	is($val, $fingerprint, $t);

	$t=q{ All configuration settings are forced to lowercase };
	$fingerprint = lc $fingerprint;
	$bc->set_config('PGP_Fingerprint', $fingerprint);
	$val = $bc->get_config('pgp_fingerprint');
	is($val, $fingerprint, $t);

	$t=q{ Setting a bucardo_config setting twice gives an error };
	$SQL = "INSERT INTO bucardo_config(setting,value) VALUES (?,?)";
	$sth = $masterdbh->prepare($SQL);
	$sth->execute('bctest_unique', 123);
	$masterdbh->commit();
	eval {
		$sth->execute('bctest_unique', 123);
	};
	like($@, qr{violates unique constraint}, $t);
	$masterdbh->rollback();

	$t=q{ A bucardo_config setting of 'sync' gives an error };
	eval {
		$sth->execute('sync', 123);
	};
	like($@, qr{Invalid setting name}, $t);
	$masterdbh->rollback();

	$t=q{ A bucardo_config setting of 'goat' gives an error };
	eval {
		$sth->execute('goat', 123);
	};
	like($@, qr{Invalid setting name}, $t);
	$masterdbh->rollback();

	$t=q{ Setting a bucardo_config type/name works };
	$SQL = "INSERT INTO bucardo_config(setting,value,type,name) VALUES (?,?,?,?)";
	$sth = $masterdbh->prepare($SQL);
	$sth->execute('bctest_unique', 123, 'sync', 'foo');

	$t=q{ Setting a bucardo_config type without a name gives an error };
	$sth = $masterdbh->prepare($SQL);
	eval {
		$sth->execute('bctest_unique', 123, 'sync', undef);
	};
	like($@, qr{provide a specific sync}, $t);
	$masterdbh->rollback();

	$t=q{ Setting a bucardo_config name without a type gives an error };
	$sth = $masterdbh->prepare($SQL);
	eval {
		$sth->execute('bctest_unique', 123, undef, 'foo');
	};
	like($@, qr{provide a type}, $t);
	$masterdbh->rollback();

	$t=q{ Setting bucardo_config.name to an invalid goat gives an error };

	$masterdbh->do("DELETE FROM sync");
	$masterdbh->commit();
	$t=q{ Running Bucardo process can be forced to reload conf from the database };
	$bc->sync
		({
		  name      => 'configtest',
		  source    => 'bctestherd1',
		  targetdb  => 'bctest2',
		  synctype  => 'pushdelta',
	  });

	bucardo_ctl("start 'Start config testing'");
	wait4kid('bucardo_q_configtest_bctest2');
	pass(" Bucardo was started");

	$SQL = "INSERT INTO bucardo_config(setting,value,about) VALUES (?,?,?)";
	$sth = $masterdbh->prepare($SQL);
	$sth->execute('bctesting','22','Testing config reload');
	$masterdbh->commit();
	bucardo_ctl("reload_config");
	## TODO: test actual results without adding overhead

	return;

} ## end of test_config


sub ping_testing {

	## Setup a pushdelta sync
	$bc->sync
		({
		  name             => 'pingtest',
		  source           => 'bctestherd1',
		  targetdb         => 'bctest2',
		  synctype         => 'pushdelta',
	  });

	$masterdbh->do("LISTEN bucardo_started");
	$masterdbh->commit();
	pass(" Waiting for bucardo to start up");
	bucardo_ctl("start 'Ping testing'");
	{
		last if $masterdbh->func('pg_notifies');
		sleep 0.1;
		redo;
	}
	pass("Bucardo started up for ping testing");

	$masterdbh->do("UNLISTEN *");
	$masterdbh->do("LISTEN bucardo_mcp_pong");
	$masterdbh->commit();
	$masterdbh->do("NOTIFY bucardo_mcp_ping");
	## This should return very quickly, but we'll give it 5 whole seconds
	my $found = 0;
	for (1..50) {
		$masterdbh->commit();
		if ($masterdbh->func('pg_notifies')) {
			$found = 1;
			last;
		}
		sleep 0.1;
	}
	is($found, 1, qq{MCP responds to bucardo_mcp_ping});

	# We will need the PID to test the CTL ping
	$SQL = "SELECT pid FROM audit_pid WHERE type='CTL' ORDER BY birthdate DESC LIMIT 1";
	my $pid = $masterdbh->selectall_arrayref($SQL)->[0][0];

	$masterdbh->do("LISTEN bucardo_ctl_${pid}_pong");
	$masterdbh->commit();
	$masterdbh->do("NOTIFY bucardo_ctl_${pid}_ping");
	$found = 0;
	for (1..50) {
		$masterdbh->commit();
		if ($masterdbh->func('pg_notifies')) {
			$found = 1;
			last;
		}
		sleep 0.1;
	}
	is($found, 1, qq{CTL responds to bucardo_ctl_<pid>_ping});

	$SQL = "SELECT pid FROM audit_pid WHERE type='KID' ORDER BY birthdate DESC LIMIT 1";
	$pid = $masterdbh->selectall_arrayref($SQL)->[0][0];

	$masterdbh->do("LISTEN bucardo_kid_${pid}_pong");
	$masterdbh->commit();
	$masterdbh->do("NOTIFY bucardo_kid_${pid}_ping");
	$found = 0;
	for (1..50) {
		$masterdbh->commit();
		if ($masterdbh->func('pg_notifies')) {
			$found = 1;
			last;
		}
		sleep 0.1;
	}
	is($found, 1, qq{KID responds to bucardo_kid_<pid>_ping});

	bucardo_ctl("stop 'Ping testing'");

} ## end of ping_testing



sub test_purge {

	## Test the bucardo_purge_delta function
	$location = 'purge';

	$dbh1->do("SET search_path = bucardo_schema, public");
	$dbh2->do("SET search_path = bucardo_schema, public");

	clean_all_tables();

	## A fullcopy sync should not create any of the above
	$bc->sync
		({
		  name             => 'purgetest1',
		  source           => 'bctestherd1',
		  targetdb         => 'bctest2',
		  synctype         => 'fullcopy',
	});

	$t=q{ Table bucardo_delta is not created on source database automatically for fullcopy};
	is (object_count($dbh1, 'bucardo', 'table', 'bucardo_delta'), 0, $t);
	$t=q{ Table bucardo_track is not created on source database automatically for fullcopy};
	is (object_count($dbh1, 'bucardo', 'table', 'bucardo_track'), 0, $t);
	$t=q{ Table bucardo_delta_targets is not created on source database automatically for fullcopy};
	is (object_count($dbh1, 'bucardo', 'table', 'bucardo_delta_targets'), 0, $t);
	$t=q{ Function bucardo_purge_delta() is not created on source database automatically for fullcopy};
	is (object_count($dbh1, 'bucardo', 'function', 'bucardo_purge_delta'), 0, $t);

	$t=q{ Table bucardo_delta is not created on remote database automatically for fullcopy};
	is (object_count($dbh2, 'bucardo', 'table', 'bucardo_delta'), 0, $t);
	$t=q{ Table bucardo_track is not created on remote database automatically for fullcopy};
	is (object_count($dbh2, 'bucardo', 'table', 'bucardo_track'), 0, $t);
	$t=q{ Table bucardo_delta_targets is not created on remote database automatically for fullcopy};
	is (object_count($dbh2, 'bucardo', 'table', 'bucardo_delta_targets'), 0, $t);
	$t=q{ Function bucardo_purge_delta() is not created on remote database automatically for fullcopy};
	is (object_count($dbh2, 'bucardo', 'function', 'bucardo_purge_delta'), 0, $t);

	$masterdbh->do("DELETE FROM sync");
	$masterdbh->commit();

	## A pushdelta should create things on one side only
	$bc->sync
		({
		  name             => 'purgetest2',
		  source           => 'bctestherd1',
		  targetdb         => 'bctest2',
		  synctype         => 'pushdelta',
	});

	$dbh1->do("SET search_path = bucardo_schema, bucardo, public");
	$dbh1->commit();

	$t=q{ Table bucardo_delta IS created on source database automatically for pushdelta};
	is (object_count($dbh1, 'bucardo', 'table', 'bucardo_delta'), 1, $t);
	$t=q{ Table bucardo_track IS created on source database automatically for pushdelta};
	is (object_count($dbh1, 'bucardo', 'table', 'bucardo_track'), 1, $t);
	$t=q{ Table bucardo_delta_targets IS created on source database automatically for pushdelta};
	is (object_count($dbh1, 'bucardo', 'table', 'bucardo_delta_targets'), 1, $t);
	$t=q{ Function bucardo_purge_delta() IS created on source database automatically for pushdelta};
	is (object_count($dbh1, 'bucardo', 'function', 'bucardo_purge_delta'), 1, $t);

	$t=q{ Table bucardo_delta is not created on remote database automatically for pushdelta};
	is (object_count($dbh2, 'bucardo', 'table', 'bucardo_delta'), 0, $t);
	$t=q{ Table bucardo_track is not created on remote database automatically for pushdelta};
	is (object_count($dbh2, 'bucardo', 'table', 'bucardo_track'), 0, $t);
	$t=q{ Table bucardo_delta_targets is not created on remote database automatically for pushdelta};
	is (object_count($dbh2, 'bucardo', 'table', 'bucardo_delta_targets'), 0, $t);
	$t=q{ Function bucardo_purge_delta() is not created on remote database automatically for pushdelta};
	is (object_count($dbh2, 'bucardo', 'function', 'bucardo_purge_delta'), 0, $t);

	$t=q{ Table bucardo_delta_targets is populated on source database at pushdelta sync creation};
	my $getoids = "SELECT relname, oid FROM pg_class WHERE relname ~ 'bucardo_test'";
	my %oid;
	for (@{$dbh1->selectall_arrayref($getoids)}) {
		$oid{source}{$_->[0]} = $_->[1];
	}
	for (sort keys %tabletype) {
		push @{$oid{sourceresult}} => [$oid{source}{$_},'bctest2'];
	}
	my $view_targets = "SELECT * FROM bucardo_delta_targets ORDER BY tablename";
	$got = $dbh1->selectall_arrayref($view_targets);
	is_deeply($got, $oid{sourceresult}, $t);

	$masterdbh->do("DELETE FROM bucardo.sync");
	$masterdbh->commit();

	$t=q{ Delete from sync removes the bucardo_delta_targets row};
	$got = $dbh1->selectall_arrayref($view_targets);
	is_deeply($got, [], $t);

	## Remove the old ones for a better test
	$dbh1->do("DROP TABLE bucardo_delta");
	$dbh1->do("DROP TABLE bucardo_delta_targets");
	$dbh1->do("DROP TABLE bucardo_track");
	$dbh1->do("DROP FUNCTION bucardo_purge_delta(interval)");
	$dbh1->commit();

	## A swap should create things on both sides
	$bc->sync
		({
		  name             => 'purgetest3',
		  source           => 'bctestherd1',
		  targetdb         => 'bctest2',
		  synctype         => 'swap',
	});

	$dbh2->do("SET search_path = bucardo_schema, bucardo, public");
	$dbh2->commit();

	$t=q{ Table bucardo_delta IS created on source database automatically for swap};
	is (object_count($dbh1, 'bucardo', 'table', 'bucardo_delta'), 1, $t);
	$t=q{ Table bucardo_track IS created on source database automatically for swap};
	is (object_count($dbh1, 'bucardo', 'table', 'bucardo_track'), 1, $t);
	$t=q{ Table bucardo_delta_targets IS created on source database automatically for swap};
	is (object_count($dbh1, 'bucardo', 'table', 'bucardo_delta_targets'), 1, $t);
	$t=q{ Function bucardo_purge_delta() IS created on source database automatically for swap};
	is (object_count($dbh1, 'bucardo', 'function', 'bucardo_purge_delta'), 1, $t);

	$t=q{ Table bucardo_delta IS created on remote database automatically for swap};
	is (object_count($dbh2, 'bucardo', 'table', 'bucardo_delta'), 1, $t);
	$t=q{ Table bucardo_track IS created on remote database automatically for swap};
	is (object_count($dbh2, 'bucardo', 'table', 'bucardo_track'), 1, $t);
	$t=q{ Table bucardo_delta_targets IS created on remote database automatically for swap};
	is (object_count($dbh2, 'bucardo', 'table', 'bucardo_delta_targets'), 1, $t);
	$t=q{ Function bucardo_purge_delta() IS created on remote database automatically for swap};
	is (object_count($dbh2, 'bucardo', 'function', 'bucardo_purge_delta'), 1, $t);


	$t=q{ Table bucardo_delta_targets is populated on source database at swap sync creation};
	$got = $dbh1->selectall_arrayref($view_targets);
	is_deeply($got, $oid{sourceresult}, $t);

	$t=q{ Table bucardo_delta_targets is populated on target database at swap sync creation};
	for (@{$dbh2->selectall_arrayref($getoids)}) {
		$oid{target}{$_->[0]} = $_->[1];
	}
	for (sort keys %tabletype) {
		push @{$oid{targetresult}} => [$oid{target}{$_},'bctest1'];
	}
	$got = $dbh2->selectall_arrayref($view_targets);
	is_deeply($got, $oid{targetresult}, $t);

	$masterdbh->do("DELETE FROM bucardo.sync");
	$masterdbh->commit();

	$t=q{ Delete from sync removes the bucardo_delta_targets row on source database};
	$got = $dbh1->selectall_arrayref($view_targets);
	is_deeply($got, [], $t);
	$t=q{ Delete from sync removes the bucardo_delta_targets row on target database};
	$got = $dbh2->selectall_arrayref($view_targets);
	is_deeply($got, [], $t);

	## Same, but test with a targetgroup

	$masterdbh->do("INSERT INTO dbgroup(name) VALUES ('bcgroup1')");
	$masterdbh->do("INSERT INTO dbmap(db,dbgroup) VALUES ('bctest2','bcgroup1')");
	$masterdbh->do("INSERT INTO dbmap(db,dbgroup) VALUES ('bctest3','bcgroup1')");
	$masterdbh->commit();

	## Remove the old ones for a better test
	$dbh1->do("DROP TABLE bucardo_delta");
	$dbh1->do("DROP TABLE bucardo_delta_targets");
	$dbh1->do("DROP TABLE bucardo_track");
	$dbh1->do("DROP FUNCTION bucardo_purge_delta(interval)");
	$dbh1->commit();

	## Remove the old ones for a better test
	$dbh2->do("DROP TABLE bucardo_delta");
	$dbh2->do("DROP TABLE bucardo_delta_targets");
	$dbh2->do("DROP TABLE bucardo_track");
	$dbh2->do("DROP FUNCTION bucardo_purge_delta(interval)");
	$dbh2->commit();

	$bc->sync
		({
		  name             => 'purgetest3',
		  source           => 'bctestherd1',
		  targetgroup      => 'bcgroup1',
		  synctype         => 'pushdelta',
	});

	$t=q{ Table bucardo_delta IS created on source database automatically for pushdelta};
	is (object_count($dbh1, 'bucardo', 'table', 'bucardo_delta'), 1, $t);
	$t=q{ Table bucardo_track IS created on source database automatically for pushdelta};
	is (object_count($dbh1, 'bucardo', 'table', 'bucardo_track'), 1, $t);
	$t=q{ Table bucardo_delta_targets IS created on source database automatically for pushdelta};
	is (object_count($dbh1, 'bucardo', 'table', 'bucardo_delta_targets'), 1, $t);
	$t=q{ Function bucardo_purge_delta() IS created on source database automatically for pushdelta};
	is (object_count($dbh1, 'bucardo', 'function', 'bucardo_purge_delta'), 1, $t);

	$t=q{ Table bucardo_delta is not created on first remote database automatically for pushdelta};
	is (object_count($dbh2, 'bucardo', 'table', 'bucardo_delta'), 0, $t);
	$t=q{ Table bucardo_track is not created on first remote database automatically for pushdelta};
	is (object_count($dbh2, 'bucardo', 'table', 'bucardo_track'), 0, $t);
	$t=q{ Table bucardo_delta_targets is not created on first remote database automatically for pushdelta};
	is (object_count($dbh2, 'bucardo', 'table', 'bucardo_delta_targets'), 0, $t);
	$t=q{ Function bucardo_purge_delta() is not created on first remote database automatically for pushdelta};
	is (object_count($dbh2, 'bucardo', 'function', 'bucardo_purge_delta'), 0, $t);

	$t=q{ Table bucardo_delta is not created on second remote database automatically for pushdelta};
	is (object_count($dbh3, 'bucardo', 'table', 'bucardo_delta'), 0, $t);
	$t=q{ Table bucardo_track is not created on second remote database automatically for pushdelta};
	is (object_count($dbh3, 'bucardo', 'table', 'bucardo_track'), 0, $t);
	$t=q{ Table bucardo_delta_targets is not created on second remote database automatically for pushdelta};
	is (object_count($dbh3, 'bucardo', 'table', 'bucardo_delta_targets'), 0, $t);
	$t=q{ Function bucardo_purge_delta() is not created on second remote database automatically for pushdelta};
	is (object_count($dbh3, 'bucardo', 'function', 'bucardo_purge_delta'), 0, $t);

	$t=q{ Table bucardo_delta_targets is populated on source database at pushdelta sync creation};
	$oid{sourceresult} = [];
	for (sort { $oid{target}{$a} <=> $oid{source}{$b} } keys %tabletype) {
		push @{$oid{sourceresult}} => [$oid{source}{$_},'bctest2'], [$oid{source}{$_},'bctest3'];
	}
	$got = $dbh1->selectall_arrayref($view_targets);
	is_deeply($got, $oid{sourceresult}, $t);

	$masterdbh->do("DELETE FROM sync");
	$masterdbh->commit();

	$t=q{ Table bucardo_delta_targets is emptied out after sync is removed};
	$got = $dbh1->selectall_arrayref($view_targets);
	is_deeply($got, [], $t);

	## Start up the sync again for function testing
	$bc->sync
		({
		  name             => 'purgetest3',
		  source           => 'bctestherd1',
		  targetgroup      => 'bcgroup1',
		  synctype         => 'pushdelta',
	});

	## And kick it off
	bucardo_ctl("start 'Start purge testing'");

	$t=q{ Calling bucardo_purge_delta with no arguments fails};
	eval {
		$dbh1->do("SELECT bucardo.bucardo_purge_delta()");
	};
	like($@, qr{does not exist}, $t);
	$dbh1->rollback();

	$t=q{ Calling bucardo_purge_delta with text argument fails};
	eval {
		$dbh1->do("SELECT bucardo.bucardo_purge_delta('foobar')");
	};
	like($@, qr{invalid input syntax for type interval}, $t);
	$dbh1->rollback();

	$t=q{ Calling bucardo_purge_delta with valid interval argument works};
	eval {
		$got = $dbh1->selectall_arrayref("SELECT bucardo.bucardo_purge_delta('10 minutes'::interval)")->[0][0];
	};
	like($@, qr{}, $t);

	$t=q{ Calling bucardo_purge_delta returns the expected text string};
	is($got, "Rows deleted from bucardo_delta: 0 Rows deleted from bucardo_track: 0", $t);

	## Populate the table with a few rows
	$SQL = "INSERT INTO bucardo_test1(id,inty,data1) VALUES (?,?,?)";
	my $insert1 = $dbh1->prepare($SQL);
	my $insert2 = $dbh2->prepare($SQL);

	$insert1->execute(1,1,'purgetest1');
	$insert1->execute(2,2,'purgetest2');

	bucardo_ctl("kick purgetest3 0");

	my $viewdelta = "SELECT * FROM bucardo.bucardo_delta ORDER BY txntime, rowid";
	$val = $dbh1->selectall_arrayref($viewdelta);

	$t=q{ Calling bucardo_purge_delta respects time passed in};
	$SQL = "SELECT bucardo_purge_delta('1 hour'::interval)";
	$dbh1->do($SQL);
	$got = $dbh1->selectall_arrayref($viewdelta);
	is_deeply($got, $val, $t);

	## Insert some bogus entries directly into bucardo_delta and bucardo_track
	$dbh1->commit();
	$dbh1->do("INSERT INTO bucardo_delta(tablename, rowid) VALUES ($oid{source}{'bucardo_test1'},'12345')");
	$dbh1->do("INSERT INTO bucardo_delta(tablename, rowid) VALUES ($oid{source}{'bucardo_test2'},'67890')");
	$sth = $dbh1->prepare("INSERT INTO bucardo_track(txntime, tablename, targetdb) VALUES ((SELECT now()),?,?)");
	$sth->execute('999','invalid1');
	$sth->execute('888','invalid1');
	$sth->execute($oid{source}{'bucardo_test1'},'invalid1');
	$sth->execute($oid{source}{'bucardo_test2'},'invalid2');
	my $now = $dbh1->selectall_arrayref("SELECT now()")->[0][0];
	$dbh1->commit();

	my $deltaval = [
					[$oid{source}{'bucardo_test1'},'12345',$now],
					[$oid{source}{'bucardo_test2'},'67890',$now],
					];

	$SQL = "SELECT bucardo_purge_delta('1 second'::interval)";
	$got = $dbh1->selectall_arrayref("SELECT bucardo_purge_delta('0 second'::interval)")->[0][0];
	$t=q{ Calling bucardo_purge_delta returns the expected text string};
	is($got, "Rows deleted from bucardo_delta: 2 Rows deleted from bucardo_track: 2", $t);

	$t=q{ Calling bucardo_purge_delta purges the expected rows};
	$got = $dbh1->selectall_arrayref($viewdelta);
	is_deeply($got, $deltaval, $t);
	
	return;

} ## end of sub test_purge


sub basic_pushdelta_testing {

	my ($table,$sdbh,$rdbh) = @_;

	$location = 'pushdelta';

	$type = $tabletype{$table};
	my $oid = $table{$sdbh}{$table};

	compare_tables($table,$sdbh,$rdbh) or BAIL_OUT "Compare tables failed?!\n";

	$val = $val{$type}{1};

	$masterdbh->do("LISTEN bucardo_syncdone_pushdeltatest");
	$masterdbh->commit();

	$SQL = $table =~ /0/ 
		? "INSERT INTO $table(id) VALUES ('$val')"
		: "INSERT INTO $table(id,data1,inty) VALUES ('$val','one',1)";
	$sdbh->do($SQL);
	$sdbh->commit;

	$t=qq{ Second table $table still empty before commit };
	$SQL = $table =~ /0/
		? "SELECT id FROM $table"
		: "SELECT id,data1 FROM $table";
	$result = [];
	bc_deeply($result, $rdbh, $SQL, $t);

	$t=q{ After insert, trigger and rule both populate droptest table };
	my $DROPSQL = $table =~ /0/
		? "SELECT type,0 FROM droptest WHERE name = ".$sdbh->quote($table)." ORDER BY 1,2"
		: "SELECT type,inty FROM droptest WHERE name = ".$sdbh->quote($table)." ORDER BY 1,2";
	my $tval = $table =~ /0/ ? 0 : 1;
	$result = [['rule',$tval],['trigger',$tval]];
	bc_deeply($result, $sdbh, $DROPSQL, $t);
   
	$t=q{ Table droptest is empty on remote database };
	$result = [];
	bc_deeply($result, $rdbh, $DROPSQL, $t);

	wait_for_notice($masterdbh, 'bucardo_syncdone_pushdeltatest');

	## Insert to 1 should be echoed to two, after a slight delay:
	$t=qq{ Second table $table got the pushdelta row};
	$SQL = $table =~ /0/
		? "SELECT id,'one' FROM $table"
		: "SELECT id,data1 FROM $table";
	$result = [[qq{$val},'one']];
	bc_deeply($result, $rdbh, $SQL, $t);

	$t=q{ Triggers and rules did not fire on remote table };
	$result = [];
	bc_deeply($result, $rdbh, $DROPSQL, $t);

	## Add a row to two, should not get removed or replicated
	my $rval = $val{$type}{9};
	$SQL = $table =~ /0/
		? "INSERT INTO $table(id) VALUES ('$rval')"
		: "INSERT INTO $table(id,data1,inty) VALUES ('$rval','nine',9)";
	$rdbh->do($SQL);
	$rdbh->commit;

	## Another source change, but with a different trigger drop method
	$SQL = "UPDATE sync SET disable_triggers = 'SQL'";
	$masterdbh->do($SQL);
	$masterdbh->do("NOTIFY bucardo_reload_sync_pushdeltattest");
	$masterdbh->commit();

	$val = $val{$type}{2};
	$SQL = $table =~ /0/
		? "INSERT INTO $table(id) VALUES ('$val')"
		: "INSERT INTO $table(id,data1,inty) VALUES ('$val','two',2)";
	$sdbh->do($SQL);
	$sdbh->commit;

	$t=q{ After insert, trigger and rule both populate droptest table4 };
	$result = $table =~ /0/
		? [['rule',0],['rule',0],['trigger',0],['trigger',0]]
		: [['rule',1],['rule',2],['trigger',1],['trigger',2]];
	bc_deeply($result, $sdbh, $DROPSQL, $t);
   
	$t=q{ Table droptest has correct entries on remote database };
	my $ninezero = $table =~ /0/ ? 0 : 9;
	$result = [['rule',$ninezero],['trigger',$ninezero]];
	bc_deeply($result, $rdbh, $DROPSQL, $t);

	wait_for_notice($masterdbh, 'bucardo_syncdone_pushdeltatest');

	## Insert to 1 should be echoed to two, after a slight delay:
	$t=qq{ Second table $table got the pushdelta row};
	$SQL = $table =~ /0/
		? "SELECT id FROM $table ORDER BY id"
		: "SELECT data1,inty FROM $table ORDER BY inty";
	$result = $table =~ /0/
		? [[1],[2],[9]]
		: [['one',1],['two',2],['nine',9]];
	bc_deeply($result, $rdbh, $SQL, $t);

	$t=q{ Triggers and rules did not fire on remote table };
	$result = [['rule',$ninezero],['trigger',$ninezero]];
	bc_deeply($result, $rdbh, $DROPSQL, $t);

	$t=q{ Source table did not get updated for pushdelta sync };
	my $col = $table =~ /0/ ? 'id' : 'inty';
	$SQL = "SELECT count(*) FROM $table WHERE $col = 9";
	$count = $sdbh->selectall_arrayref($SQL)->[0][0];
	is($count, 0, $t);

	## Now with many rows
	$SQL = $table =~ /0/
		? "INSERT INTO $table(id) VALUES (?)"
		: "INSERT INTO $table(id,data1,inty) VALUES (?,?,?)";
	$sth = $sdbh->prepare($SQL);
	for (3..6) {
		$val = $val{$type}{$_};
		$table =~ /0/ ? $sth->execute($val) : $sth->execute($val,'bob',$_);
	}
	$sdbh->commit;

	## Sanity check
	$t=qq{ Rows are not in target table before the kick for $table};
	$sth = $rdbh->prepare("SELECT 1 FROM $table WHERE $col BETWEEN 3 and 6");
	$count = $sth->execute();
	$sth->finish();
	is($count, '0E0', $t);

	wait_for_notice($masterdbh, 'bucardo_syncdone_pushdeltatest');

	$t=qq{ Second table $table got the pushdelta rows};
	$SQL = "SELECT $col FROM $table ORDER BY 1";
	$result = [['1'],['2'],['3'],['4'],['5'],['6'],['9']];
	bc_deeply($result, $rdbh, $SQL, $t);
	$sdbh->commit();
	$rdbh->commit();
	return;

} ## end of basic_pushdelta_testing


sub makedelta_testing {

	my ($table,$sdbh,$rdbh) = @_;

	$location = 'makedelta';

	$type = $tabletype{$table};
	my $oid = $table{$sdbh}{$table};
	my $roid = $table{$rdbh}{$table};

	my ($src_delta,$src_track,$tgt_delta,$tgt_track);

	for my $dbh ($sdbh, $rdbh) {
		$dbh->rollback;
		$dbh->do("DELETE FROM $table");
		$dbh->do("DELETE FROM bucardo.bucardo_delta");
		$dbh->do("DELETE FROM bucardo.bucardo_track");
		$dbh->commit;
	}

	compare_tables($table,$sdbh,$rdbh) or BAIL_OUT "Compare tables failed?!\n";

	$val = $val{$type}{1};

	my $sourcerows = "SELECT * FROM bucardo.bucardo_delta WHERE tablename = $oid ".
		"ORDER BY txntime DESC, rowid DESC";
	my $remoterows = "SELECT tablename,rowid FROM bucardo.bucardo_delta WHERE tablename = $roid ".
		"ORDER BY txntime DESC, rowid DESC";
	my $remotetrackrows = "SELECT tablename,targetdb FROM bucardo.bucardo_track WHERE tablename = $roid ".
		"ORDER BY txntime DESC, tablename DESC";
	my $sourcetrackrows = "SELECT tablename,targetdb FROM bucardo.bucardo_track WHERE tablename = $oid ".
		"ORDER BY txntime DESC, tablename DESC";

	$t=qq{ Insert to source $table populated source bucardo_delta correctly };
	$SQL = "INSERT INTO $table(id,data1,inty) VALUES ('$val','one',1)";
	$sdbh->do($SQL);
	$now = now_time($sdbh);
	$info = $sdbh->selectall_arrayref($sourcerows);
	$result = [[$oid,$val,$now]];
	$src_delta = [[$oid,$val,$now]];
	is_deeply($info, $result, $t);
	$sdbh->commit();

	## Wait until the row gets synced to the target database
	wait_until_true($rdbh => "SELECT 1 FROM $table");

	$t=qq{ Insert to source $table with makedelta created a target bucardo_delta row };
	$info = $rdbh->selectall_arrayref($remoterows);
	$tgt_delta = [[$roid,$val]];
	is_deeply($info, $tgt_delta, $t);

	$t=qq{ Insert to source $table with makedelta created a target bucardo_track row };
	$info = $rdbh->selectall_arrayref($remotetrackrows);
	$tgt_track = [[$roid,'bctest1']];
	is_deeply($info, $tgt_track, $t);

	$t=qq{ Insert to source $table with makedelta created a source bucardo_track row };
	$info = $sdbh->selectall_arrayref($sourcetrackrows);
	$src_track = [[$oid,'bctest2']];
	is_deeply($info, $src_track, $t);

	$t=q{ All rows in bucardo_delta and bucardo_track have the same txntime };
	$SQL = "SELECT count(*) FROM bucardo.bucardo_delta d, bucardo.bucardo_track t WHERE d.txntime <> t.txntime";
	$info = $rdbh->selectall_arrayref($SQL)->[0][0];
	is($info, 0, $t);

	$t=qq{ Update to source $table populated source bucardo_delta correctly };
	$SQL = "UPDATE $table SET inty = 2";
	$sdbh->do($SQL);
	$now = now_time($sdbh);
	$info = $sdbh->selectall_arrayref($sourcerows);
	unshift @$src_delta, [$oid,$val,$now];
	is_deeply($info, $src_delta, $t);
	$sdbh->commit();

	wait_until_true($rdbh => "SELECT 1 FROM $table WHERE inty = 2");

	$t=qq{ Update to source $table with makedelta created a new bucardo_delta row };
	$info = $rdbh->selectall_arrayref($remoterows);
	unshift @$tgt_delta, [$roid, $val];
	is_deeply($info, $tgt_delta, $t);

	$t=qq{ Update to source $table with makedelta created a new bucardo_track row };
	$info = $rdbh->selectall_arrayref($remotetrackrows);
	unshift @$tgt_track, [$roid,'bctest1'];
	is_deeply($info, $tgt_track, $t);

	$t=q{ All rows in bucardo_delta and bucardo_track have the same txntime tablename };
	$SQL = "SELECT count(*) FROM bucardo.bucardo_delta d, bucardo.bucardo_track t ".
		"WHERE d.txntime = t.txntime AND d.tablename = $roid AND t.tablename = $roid";
	$info = $rdbh->selectall_arrayref($SQL)->[0][0];
	is($info, 2, $t);

	## Do an update of the primary key: which should give two rows in bucardo_delta on both ends
	$t=qq{ Update to pk of source $table populated source bucardo_delta correctly };
	my $newval = $val{$type}{3};
	$SQL = "UPDATE $table SET id = '$newval' WHERE id = '$val'";
	$sdbh->do($SQL);
	$now = now_time($sdbh);
	$info = $sdbh->selectall_arrayref($sourcerows);
	unshift @$src_delta, [$oid,$newval,$now],[$oid,$val,$now];
	is_deeply($info, $src_delta, $t);
	$sdbh->commit();

	wait_until_true($rdbh => "SELECT 1 FROM $table WHERE id = '$newval'");

	$t=qq{ Update to pk of source $table with makedelta created two target bucardo_delta rows ($val) };
	$info = $rdbh->selectall_arrayref($remoterows);
	unshift @$tgt_delta, [$roid, $newval], [$roid,$val];
	is_deeply($info, $tgt_delta, $t);

	$t=qq{ Update to pk of source $table with makedelta created a target bucardo_track row };
	$info = $rdbh->selectall_arrayref($remotetrackrows);
	unshift @$tgt_track, [$roid,'bctest1'];
	is_deeply($info, $tgt_track, $t);

	$t=q{ All rows in bucardo_delta and bucardo_track have the same txntime tablename };
	my $bothrows = "SELECT count(*) FROM bucardo.bucardo_delta d, bucardo.bucardo_track t ".
		"WHERE d.txntime = t.txntime AND d.tablename = $roid AND t.tablename = $roid";
	$info = $rdbh->selectall_arrayref($bothrows)->[0][0];
	is($info, 4, $t);

	## Delete should also add a ghost row
	$t=qq{ Delete to source $table populated source bucardo_delta correctly };
	$SQL = "DELETE FROM $table WHERE id = '$newval'";
	$count = $sdbh->do($SQL);
	$now = now_time($sdbh);
	$sdbh->commit();
	$info = $sdbh->selectall_arrayref($sourcerows);
	unshift @$src_delta, [$oid,$newval,$now];
	is_deeply($info, $src_delta, $t);

	wait_until_false($rdbh => "SELECT 1 FROM $table WHERE id = '$newval'");

	$t=qq{ Delete to source $table with makedelta created a target bucardo_delta row };
	$info = $rdbh->selectall_arrayref($remoterows);
	unshift @$tgt_delta, [$roid,$newval];
	is_deeply($info, $tgt_delta, $t);

	$t=qq{ Delete to source $table with makedelta created a target bucardo_track row };
	$info = $rdbh->selectall_arrayref($remotetrackrows);
	unshift @$tgt_track, [$roid,'bctest1'];
	is_deeply($info, $tgt_track, $t);

	$t=q{ All rows in bucardo_delta and bucardo_track have the same txntime tablename };
	$info = $rdbh->selectall_arrayref($bothrows)->[0][0];
	is($info, 5, $t);

	## Now the same thing, but the other way
	$sdbh->do("DELETE FROM bucardo.bucardo_track");
	$sdbh->do("DELETE FROM bucardo.bucardo_delta");
	$sdbh->commit();
	$rdbh->do("DELETE FROM bucardo.bucardo_delta");
	$rdbh->do("DELETE FROM bucardo.bucardo_track");
	$rdbh->commit();
	$src_delta = []; $src_track = []; $tgt_delta = []; $tgt_track = [];

	$sourcerows = "SELECT tablename,rowid FROM bucardo.bucardo_delta WHERE tablename = $oid ".
		"ORDER BY txntime DESC, rowid DESC";

	## Insert:
	$val = $val{$type}{4};
	$SQL = "INSERT INTO $table(id,data1,inty) VALUES ('$val','one',4)";
	$rdbh->do($SQL);
	$rdbh->commit();

	wait_until_true($sdbh => "SELECT 1 FROM $table WHERE inty = 4");

	## The source delta and track tables should now have entries
	$t=qq{ Insert to target $table with makedelta created a source bucardo_delta row };
	$info = $sdbh->selectall_arrayref($sourcerows);
	unshift @$src_delta, [$oid,$val];
	is_deeply($info, $src_delta, $t);

	$t=qq{ Insert to target $table with makedelta created a source bucardo_track row };
	$info = $sdbh->selectall_arrayref($sourcetrackrows);
	unshift @$src_track, [$oid,'bctest2'];
	is_deeply($info, $src_track, $t);

	$t=qq{ Update to target $table populated target bucardo_delta correctly };
	$SQL = "UPDATE $table SET inty = 5 WHERE inty = 4";
	$rdbh->do($SQL);
	$now = now_time($rdbh);
	$info = $rdbh->selectall_arrayref($remoterows);
	unshift @$tgt_delta, [$roid,$val], [$roid,$val];
	is_deeply($info, $tgt_delta, $t);
	$rdbh->commit();

	wait_until_true($sdbh => "SELECT 1 FROM $table WHERE inty = 5");

	$t=qq{ Update to target $table with makedelta created a source bucardo_delta row };
	$info = $sdbh->selectall_arrayref($sourcerows);
	unshift @$src_delta, [$oid, $val];
	is_deeply($info, $src_delta, $t);

	$t=qq{ Update to target $table with makedelta created a source bucardo_track row };
	$info = $sdbh->selectall_arrayref($sourcetrackrows);
	unshift @$src_track, [$oid,'bctest2'];
	is_deeply($info, $src_track, $t);

	## Delete should also add a ghost row
	$t=qq{ Delete to target $table populated target bucardo_delta correctly };
	$SQL = "DELETE FROM $table WHERE inty = 5";
	$rdbh->do($SQL);
	$now = now_time($rdbh);
	$info = $rdbh->selectall_arrayref($remoterows);
	unshift @$tgt_delta, [$roid,$val];
	is_deeply($info, $tgt_delta, $t);
	$rdbh->commit();

	wait_until_false($sdbh => "SELECT 1 FROM $table WHERE inty = 5");

	$t=qq{ Delete to target $table with makedelta created a source bucardo_delta row };
	$info = $sdbh->selectall_arrayref($sourcerows);
	unshift @$src_delta, [$oid,$val];
	is_deeply($info, $src_delta, $t);

	$t=qq{ Delete to target $table with makedelta created a source bucardo_track row };
	$info = $sdbh->selectall_arrayref($sourcetrackrows);
	unshift @$src_track, [$oid,'bctest2'];
	is_deeply($info, $src_track, $t);

	$sdbh->commit();
	$rdbh->commit();
	return;

} ## end of makedelta_testing


sub basic_copy_testing {

	my ($table,$sdbh,$rdbh) = @_;

	$location = 'fullcopy';

	$type = $tabletype{$table};
	my $oid = $table{$sdbh}{$table};

	compare_tables($table,$sdbh,$rdbh) or BAIL_OUT "Compare tables failed?!\n";

	$val = $val{$type}{1};

	$SQL = $table =~ /0/
		? "INSERT INTO $table(id) VALUES ('$val')"
		: "INSERT INTO $table(id,data1,inty) VALUES ('$val','one',1)";
	$sdbh->do($SQL);
	$sdbh->commit;

	$t=q{ After insert, trigger and rule both populate droptest table };
	my $DROPSQL = "SELECT type,inty FROM droptest WHERE name = ".$sdbh->quote($table)." ORDER BY 1,2";
	$table =~ /0/ and $DROPSQL =~ s/inty/1/;
	$result = [['rule',1],['trigger',1]];
	bc_deeply($result, $sdbh, $DROPSQL, $t);

	$t=q{ Table droptest is empty on remote database };
	$result = [];
	bc_deeply($result, $rdbh, $DROPSQL, $t);

	$t=qq{ Second table $table still empty before kick };
	my $SELECTSQL = "SELECT inty FROM $table ORDER BY id";
	$table =~ /0/ and $SELECTSQL =~ s/inty/id/;
	$result = [];
	bc_deeply($result, $rdbh, $SELECTSQL, $t);
   
	bucardo_ctl("Kick copytest 0");

	$t=qq{ Second table $table got the fullcopy row};
	$result = [[1]];
	bc_deeply($result, $rdbh, $SELECTSQL, $t);

	$t=q{ Triggers and rules did NOT fire on remote table };
	$result = [];
	bc_deeply($result, $rdbh, $DROPSQL, $t);

	## Same thing, but with a different trigger drop method
	$SQL = "UPDATE sync SET disable_triggers = 'SQL', disable_rules = 'pg_class'";
	$masterdbh->do($SQL);
	$masterdbh->do("NOTIFY bucardo_reload_sync_copytest");
	$masterdbh->commit();

	my $oneval = $val;
	$val = $val{$type}{2};
	$SQL = $table =~ /0/
		? "INSERT INTO $table(id) VALUES ('$val')"
		: "INSERT INTO $table(id,data1,inty) VALUES ('$val','two',2)";
	$sdbh->do($SQL);
	$sdbh->commit;

	$t=q{ After insert, trigger and rule both populate droptest table };
	$result = $table =~ /0/
		? [['rule',1],['rule',1],['trigger',1],['trigger',1]]
		: [['rule',1],['rule',2],['trigger',1],['trigger',2]];
	bc_deeply($result, $sdbh, $DROPSQL, $t);
   
	$t=q{ Table droptest is empty on remote database };
	$result = [];
	bc_deeply($result, $rdbh, $DROPSQL, $t);

	bucardo_ctl("kick copytest 0");

	$t=qq{ Second table $table got the fullcopy row};
	$result = [[1],[2]];
	bc_deeply($result, $rdbh, $SELECTSQL, $t);

	$t=q{ Triggers and rules did NOT fire on remote table };
	$result = [];
	bc_deeply($result, $rdbh, $DROPSQL, $t);

	$rdbh->commit; $sdbh->commit; $masterdbh->commit;
	## Now with many rows
	$SQL = $table =~ /0/
		? "INSERT INTO $table(id) VALUES (?)"
		: "INSERT INTO $table(id,data1,inty) VALUES (?,?,?)";
	$sth = $sdbh->prepare($SQL);
	for (3..6) {
		$val = $val{$type}{$_};
		$table =~ /0/ ? $sth->execute($val) : $sth->execute($val,'bob',$_);
	}
	$sdbh->commit;

	## Sanity check
	$t=qq{ Rows are not in target table before the kick for $table};
	$sth = $rdbh->prepare($SELECTSQL);
	$count = $sth->execute();
	$sth->finish();
	is($count, 2, $t);

	bucardo_ctl("kick copytest 0");

	$t=qq{ Second table $table got the fullcopy rows};
	$result = [[1],[2],[3],[4],[5],[6]];
	bc_deeply($result, $rdbh, $SELECTSQL, $t);
	$sdbh->commit();
	$rdbh->commit();
	pass(" End of basic_copy_testing for $table");
	return;

} ## end of basic_copy_testing


sub analyze_after_copy {

	my ($table,$sdbh,$rdbh) = @_;

	$location = 'fullcopy analyze';

	$type = $tabletype{$table};
	my $oid = $table{$sdbh}{$table};

	compare_tables($table,$sdbh,$rdbh) or BAIL_OUT "Compare tables failed?!\n";

	my $insertval = 7;
	$val = $val{$type}{$insertval};

	## Make sure by default we do an analyze
	$SQL = "INSERT INTO $table(id,inty) VALUES ('$val', $insertval)";
	$sdbh->do($SQL);

	bucardo_ctl("kick copytest 0");

	$t=q{ By default, analyze_after_copy is run};
	$SQL = qq{
         SELECT reltuples
         FROM pg_class c, pg_namespace n
         WHERE c.relnamespace = n.oid AND n.nspname=? AND c.relname=?
	};
	$sth{reltuples} = $sth = $rdbh->prepare_cached($SQL);
	$sth->execute($TEST_SCHEMA,$table);
	$count = $sth->fetchall_arrayref()->[0][0];
	is($count, $insertval, $t);

	$t=q{ After truncate, reltuples is 0};
	$rdbh->do("TRUNCATE TABLE $table");
	$sth->execute($TEST_SCHEMA,$table);
	$count = $sth->fetchall_arrayref()->[0][0];
	is($count, 0, $t);
	$sdbh->do("TRUNCATE TABLE $table");
	$sdbh->commit();
	$rdbh->commit();

	## Turn off at the goat level, reload the sync, should not analyze
	our $sync_reloaded_notice = 'bucardo_reloaded_sync_copytest';
	$masterdbh->do("LISTEN $sync_reloaded_notice");
	$SQL = "UPDATE goat SET analyze_after_copy = false WHERE tablename = '$table'";
	$masterdbh->do($SQL);
	$masterdbh->commit();
	$masterdbh->do("NOTIFY bucardo_reload_sync_copytest");
	$masterdbh->commit();
	wait_for_notice($masterdbh, $sync_reloaded_notice);

	$SQL = "INSERT INTO $table(id,inty) VALUES ('$val', $insertval)";
	$sdbh->do($SQL);

	bucardo_ctl("kick copytest 0");

	$t=q{ With goat-level analyze_after_copy false, analyze is not run};
	$sth = $sth{reltuples};
	$sth->execute($TEST_SCHEMA,$table);
	$count = $sth->fetchall_arrayref()->[0][0];
	is($count, 0, $t);

	## Turn off at sync level, on at goat, reload the sync, should not analyze
	$rdbh->do("TRUNCATE TABLE $table");
	$sdbh->do("TRUNCATE TABLE $table");
	$sdbh->commit();
	$rdbh->commit();
	$SQL = "UPDATE goat SET analyze_after_copy = true WHERE tablename = '$table'";
	$masterdbh->do($SQL);
	$SQL = "UPDATE sync SET analyze_after_copy = false WHERE name = 'copytest'";
	$masterdbh->do($SQL);
	$masterdbh->commit();
	$masterdbh->do("NOTIFY bucardo_reload_sync_copytest");
	$masterdbh->commit();
	wait_for_notice($masterdbh, $sync_reloaded_notice);

	$SQL = "INSERT INTO $table(id,inty) VALUES ('$val', $insertval)";
	$sdbh->do($SQL);

	bucardo_ctl("kick copytest 0");

	$t=q{ With sync-level analyze_after_copy false, analyze is not run};
	$sth = $sth{reltuples};
	$sth->execute($TEST_SCHEMA,$table);
	$count = $sth->fetchall_arrayref()->[0][0];
	is($count, 0, $t);

	## Turn them both back on, should now run
	$rdbh->do("TRUNCATE TABLE $table");
	$sdbh->do("TRUNCATE TABLE $table");
	$sdbh->commit();
	$rdbh->commit();
	$SQL = "UPDATE sync SET analyze_after_copy = true WHERE name = 'copytest'";
	$masterdbh->do($SQL);
	$masterdbh->commit();
	$masterdbh->do("NOTIFY bucardo_reload_sync_copytest");
	$masterdbh->commit();
	wait_for_notice($masterdbh, $sync_reloaded_notice);

	$SQL = "INSERT INTO $table(id,inty) VALUES ('$val', $insertval)";
	$sdbh->do($SQL);

	bucardo_ctl("kick copytest 0");

	$t=q{ With analyze_after_copy both true, analyze is run};
	$sth = $sth{reltuples};
	$sth->execute($TEST_SCHEMA,$table);
	$count = $sth->fetchall_arrayref()->[0][0];
	is($count, 1, $t);

	return;

} ## end of analyze_after_copy


sub basic_swap_testing {

	my ($table,$sdbh,$rdbh) = @_;

	$location = 'swap';

	$type = $tabletype{$table};
	my $oid = $table{$sdbh}{$table};

	clean_swap_table($table,[$sdbh,$rdbh]);

	compare_tables($table,$sdbh,$rdbh) or BAIL_OUT "Compare tables failed?!\n";

	my ($val1,$val2,$val3,$val4) = 
		($val{$type}{1},$val{$type}{2},$val{$type}{3},$val{$type}{4});

	$SQL = $table =~ /0/
		? "INSERT INTO $table(id) VALUES ('$val1')"
		: "INSERT INTO $table(id,data1,inty) VALUES ('$val1','one',1)";
	$sdbh->do($SQL);

	$t=qq{ Second table $table still empty before commit};
	my $SELECTID = "SELECT id FROM $table";
	$result = [];
	bc_deeply($result, $rdbh, $SELECTID, $t);

	$t=qq{ Sync on $table does not create a bucardo_track entry before commit};
	$SQL = "SELECT * FROM bucardo.bucardo_track WHERE tablename = $oid";
	bc_deeply([], $sdbh, $SQL, $t);

	$t=q{ After insert, trigger and rule both populate droptest table };
	my $DROPSQL = "SELECT type,inty FROM droptest WHERE name = ".$sdbh->quote($table)." ORDER BY 1,2";
	$table =~ /0/ and $DROPSQL =~ s/inty/1/;
	$result = [['rule',1],['trigger',1]];
	bc_deeply($result, $sdbh, $DROPSQL, $t);
   
	$t=q{ Table droptest is empty on remote database };
	$result = [];
	bc_deeply($result, $rdbh, $DROPSQL, $t);

	## Insert to 1 should be echoed to two, after a slight delay:
	$t=qq{ Second table $table got the sync insert row};
	$now = now_time($sdbh);
	$sdbh->commit();
	wait_until_true($rdbh => "SELECT 1 FROM $table WHERE id = '$val1'");
	$result = [[qq{$val1}]];
	bc_deeply($result, $rdbh, $SELECTID, $t);

	$t=qq{ Sync on $table creates a valid bucardo_track entry};
	$SQL = "SELECT * FROM bucardo.bucardo_track WHERE tablename = $oid";
	$result2 = [[$now,$oid,$db{$rdbh}]];
	bc_deeply($result2, $sdbh, $SQL, $t);

	$t=q{ Table droptest is empty on remote database after sync };
	$result = [];
	bc_deeply($result, $rdbh, $DROPSQL, $t);

	my $SELECTDATA = "SELECT id,data1 FROM $table";
	## An update should echo
	if ($table =~ /0/) {
		$t=qq{ Skipping update test - only one column};
		$result = [['skip']];
		$SQL = "SELECT 'skip'";
	}
	else {
		$t=qq{ Second table $table caught the sync update};
		$SQL = "UPDATE $table SET data1 = 'upper' WHERE id = '$val1'";
		$sdbh->do($SQL);
		$now = now_time($sdbh);
		$sdbh->commit();
		wait_until_true($rdbh => "SELECT 1 FROM $table WHERE data1 = 'upper'");
		$SQL = $SELECTDATA;
		$result = [[qq{$val1},'upper']];
	}
	bc_deeply($result, $rdbh, $SQL, $t);

	$t=qq{ Second sync on $table creates a valid bucardo_track entry};
	$SQL = "SELECT * FROM bucardo.bucardo_track WHERE tablename = $oid ORDER BY txntime";
	push @$result2, [$now,$oid,$db{$rdbh}] unless $table =~ /0/;
	## XX Sometimes make test fails here - race condition?
	bc_deeply($result2, $sdbh, $SQL, $t);

	$t=qq{ Second table $table caught the delete};
	$SQL = "DELETE FROM $table WHERE id = '$val1'";
	$sdbh->do($SQL);
	$sdbh->commit();
	wait_until_false($rdbh => "SELECT 1 FROM $table WHERE id = '$val1'");
	$result = [];
	$SQL = $table =~ /0/ ? $SELECTID : $SELECTDATA;
	bc_deeply($result, $rdbh, $SQL, $t);

	## Quick test of a noop update. Tables are empty at this point.
	$SQL = "UPDATE $table SET data1 = 'foobar' WHERE id = '$val1'";
	if ($table !~ /0/) {
		$sdbh->do($SQL);
		$rdbh->do($SQL);
	}

	## Insert, reverse direction
	$t=qq{ First table $table synced the insert};
	$SQL = $table =~ /0/
		? "INSERT INTO $table(id) VALUES ('$val2')"
		: "INSERT INTO $table(id,data1,inty) VALUES ('$val2','revins',2)";
	$rdbh->do($SQL);
	$rdbh->commit();
	wait_until_true($sdbh => "SELECT 1 FROM $table WHERE id = '$val2'");
	$result = [[qq{$val2}]];
	bc_deeply($result, $sdbh, $SELECTID, $t);


	## Rest of the tests do not apply for test0
	if ($table =~ /test0/) {
		for (1..4) {
			pass(qq{ Skipping update tests as $table only has one column }); ## TESTCOUNT - 1
		}
		$sdbh->do("DELETE FROM droptest");
		$rdbh->do("DELETE FROM droptest");
		$sdbh->commit();
		$rdbh->commit();
		return;
	}

	## Insert, forward direction, and update, reverse
	$SQL = "INSERT INTO $table(id,data1,inty) VALUES ('$val3','insert',3)";
	$sdbh->do($SQL);

	$SQL = "UPDATE $table SET data1 = 'gator' WHERE id = '$val2'";
	$rdbh->do($SQL);

	$t=qq{ Sync on $table inserted to second};
	$sdbh->commit();
	$rdbh->commit();
	wait_until_true($sdbh => "SELECT 1 FROM $table WHERE data1 = 'gator'");
	$SQL = "SELECT id,data1 FROM $table WHERE id = '$val3'";
	$result = [[qq{$val3},'insert']];
	bc_deeply($result, $rdbh, $SQL, $t);
	$t=qq{ Sync on $table updated first};
	$SQL = "SELECT id,data1 FROM $table WHERE id = '$val2'";
	$result = [[qq{$val2},'gator']];
	bc_deeply($result, $sdbh, $SQL, $t);

	## Add to both sides, delete from both sides, update both sides
	## They currently both have:
	# 2 | gator
	# 3 | insert
	## Add to second: 12, 14, 16

	$rdbh->do("INSERT INTO $table(id,data1,inty) VALUES ('$val{$type}{12}','insert',12)");
	$rdbh->do("INSERT INTO $table(id,data1,inty) VALUES ('$val{$type}{14}','insert',14)");
	$rdbh->do("INSERT INTO $table(id,data1,inty) VALUES ('$val{$type}{16}','insert',16)");
	## Add to first: 13, 15, 17
	$sdbh->do("INSERT INTO $table(id,data1,inty) VALUES ('$val{$type}{13}','insert',13)");
	$sdbh->do("INSERT INTO $table(id,data1,inty) VALUES ('$val{$type}{15}','insert',15)");
	$sdbh->do("INSERT INTO $table(id,data1,inty) VALUES ('$val{$type}{17}','insert',17)");

	## Delete one from each: 14, 13
	$rdbh->do("DELETE FROM $table WHERE id = '$val{$type}{14}'");
	$sdbh->do("DELETE FROM $table WHERE id = '$val{$type}{13}'");

	## Update one old and one new: 2,17,3,12
	$sdbh->do("UPDATE $table SET data1 = 'updated' WHERE id = '$val2'");
	$sdbh->do("UPDATE $table SET data1 = 'updated' WHERE id = '$val{$type}{17}'");
	$rdbh->do("UPDATE $table SET data1 = 'updated' WHERE id = '$val3'");
	$rdbh->do("UPDATE $table SET data1 = 'updated' WHERE id = '$val{$type}{12}'");

	$sdbh->commit();
	$rdbh->commit();
	wait_until_true($rdbh => "SELECT 1 FROM $table WHERE inty = 15");

	$SQL = "SELECT id,data1 FROM $table ORDER BY inty";
	$result = [
			   [qq{$val{$type}{2}},'updated'],
			   [qq{$val{$type}{3}},'updated'],
			   [qq{$val{$type}{12}},'updated'],
			   [qq{$val{$type}{15}},'insert'],
			   [qq{$val{$type}{16}},'insert'],
			   [qq{$val{$type}{17}},'updated'],
			   ];
	bc_deeply($result, $sdbh, $SQL, " Complex sync of $table looks good on first database");
	bc_deeply($result, $rdbh, $SQL, " Complex sync of $table looks good on second database");

	$sdbh->do("DELETE FROM droptest");
	$rdbh->do("DELETE FROM droptest");

	$sdbh->commit();
	$rdbh->commit();

	return;

} ## end of basic_swap_testing


sub bucardo_delta_populate {

	## Tests the population of bucardo_delta
	my ($table,$dbh) = @_;

	$location = 'delta populate';

	my $oid = $table{$dbh}{$table};
	$type = $tabletype{$table};
	## Just in case, empty out the bucardo_delta table
	$dbh->rollback;
	$dbh->do("DELETE FROM $table");
	$dbh->do("DELETE FROM bucardo.bucardo_delta WHERE tablename = '$oid'");
	$dbh->commit;

	## Does an insert create an entry in the bucardo_delta table?
	$val = $val{$type}{1};

	my $sourcerows = "SELECT * FROM bucardo.bucardo_delta WHERE tablename = $oid ".
		"ORDER BY txntime DESC, rowid DESC";

	$t=qq{ Insert to $table populated bucardo_delta correctly};
	$SQL = "INSERT INTO $table(id,data1,inty) VALUES ('$val','one',1)";
	$dbh->do($SQL);
	$now = now_time($dbh);
	$info = $dbh->selectall_arrayref($sourcerows);
	$result = [[$oid,$val,$now]];
	is_deeply($info, $result, $t);

	## Does an update do the same?
	$t=qq{ Update to $table populated bucardo_delta correctly};
	$SQL = "UPDATE $table SET data1='changed' WHERE id = '$val'";
	$dbh->do($SQL);
	$now = now_time($dbh);
	$info = $dbh->selectall_arrayref($sourcerows);
	unshift @$result, [$oid,$val,$now];
	is_deeply($info, $result, $t);

	$t=qq{ Update to $table populated bucardo_delta correctly};
	$val2 = $val;
	$val = $val{$type}{18};
	$SQL = "UPDATE $table SET id='$val' WHERE id = '$val2'";
	$dbh->do($SQL);
	$info = $dbh->selectall_arrayref($sourcerows);
	unshift @$result, [$oid,$val,$now], [$oid,$val2,$now];
	is_deeply($info, $result, $t);

	## Does a delete add a new row as well?
	$t=qq{ Delete to $table populated bucardo_delta correctly };
	$SQL = "DELETE FROM $table WHERE id = '$val'";
	$dbh->do($SQL);
	$info = $dbh->selectall_arrayref($sourcerows);
	unshift @$result, [$oid,$val,$now];
	is_deeply($info, $result, $t);

	## Two inserts at once
	$t=qq{ Double insert to $table populated bucardo_delta correctly};
	$val = $val{$type}{22};
	$val2 = $val{$type}{23};
	$SQL = qq{
		INSERT INTO $table(id,data1,inty)
		SELECT '$val'::$type, 'twentytwo',22
		UNION ALL
		SELECT '$val2'::$type, 'twentythree',23
	};
	$dbh->do($SQL);
	$info = $dbh->selectall_arrayref($sourcerows);
	unshift @$result, [$oid,$val2,$now], [$oid,$val,$now];
	is_deeply($info, $result, $t);
	$dbh->rollback;
	return;

} ## end of bucardo_delta_populate


sub test_customcode {

	our ($table,$sdbh,$tdbh) = @_;

	$location = 'customcode';
	
	$type = $tabletype{$table};
	my $oid = $table{$sdbh}{$table};
	my $toid = $table{$tdbh}{$table};
	die unless $table =~ /(\d+)/;
	my $goatnumber = ($1*2)+1; ## no critic

	clean_swap_table($table,[$sdbh,$tdbh]);
	## Make sure Bucardo has a controller out for this code
	$SQL = "SELECT 1 FROM bucardo.audit_pid WHERE sync = 'customcode' AND type='CTL' AND killdate IS NULL";
	wait_until_true($masterdbh => $SQL);

	## We want to know when the mcp and syncs are reloaded
	our $mcp_reloaded_notice = 'bucardo_reloaded_mcp';
	$masterdbh->do("LISTEN $mcp_reloaded_notice");
	our $sync_reloaded_notice = 'bucardo_reloaded_sync_customcode';
	$masterdbh->do("LISTEN $sync_reloaded_notice");
	$masterdbh->do("DELETE FROM customcode");
	$masterdbh->commit();

	## Test "bad" code
	my $badcode = q{use strict; return 1; };

	my $code = $bc->customcode
		({
		  src_code => $badcode,
		  name     => 'custom code test',
		  sync     => 'customcode',
		  whenrun  => 'before_txn',
		  });

	$t=q{ The customcode method returned a number };
	my $codeid = $code->{id};
	like($codeid, qr{^\d+$}, $t);

	$t=q{ Bucardo was reloaded };
	$masterdbh->do("NOTIFY bucardo_mcp_reload");
	$masterdbh->commit();
	wait_for_notice($masterdbh, $mcp_reloaded_notice);
	pass($t);

	$t=q{ Lack of "dummy" in customcode prevents a sync from activating };
	$SQL = "SELECT 1 FROM bucardo.audit_pid WHERE sync = 'customcode' AND type='CTL' AND killdate IS NULL";
	wait_until_false($masterdbh => $SQL);
	pass($t);

	$badcode = q{use strict; my ($arg) = @_; return if $arg->{dummy}; throwerror; return 1; }; ## no critic

	$SQL = "UPDATE customcode SET name = ?, src_code = ?";
	$sth = $masterdbh->prepare($SQL);
	$sth->execute($badcode,"custom code test 'bad'");

	$t=q{ Bucardo was reloaded };
	$masterdbh->do("NOTIFY bucardo_mcp_reload");
	$masterdbh->commit();
	wait_for_notice($masterdbh, $mcp_reloaded_notice);
	pass($t);

	$t=q{ Custom code that does not compile prevents a sync from activating };
	$SQL = "SELECT 1 FROM bucardo.audit_pid WHERE sync = 'customcode' AND type='CTL' AND killdate IS NULL";
	$sth = $masterdbh->prepare($SQL);
	wait_until_false($masterdbh => $SQL);
	pass($t);

	## Now load code that does work

	## no critic
	my $codetemplate = q{
use strict;

my ($arg) = @_;

return if exists $arg->{dummy};

my $file = "BC_TEST_FILE";
system("touch $file");

$arg->{message} = "Created file $file";

return;
};
	## use critic

	our (%testcode,%testfile);
	my $BC_TEST_FILE = "/tmp/bucardo_test_file";
	for my $num (1..12) {
		$testcode{$num} = $codetemplate;
		$testfile{$num} = "$BC_TEST_FILE.$num";
		$testcode{$num} =~ s/BC_TEST_FILE/$BC_TEST_FILE.$num/;
		## Remove any previous test file
		unlink $testfile{$num};
	}

	## XXX Make this into a $bc method
	$t=q{ Update of customcode worked};
	$SQL = "UPDATE customcode SET name = ?, src_code = ? WHERE id = ?";
	$sth = $masterdbh->prepare($SQL);
	$count = $sth->execute("custom code test '1'",$testcode{1},$codeid);
	is($count, 1, $t);

	$masterdbh->do("NOTIFY bucardo_mcp_reload");
	$masterdbh->commit();

	$t=q{ Bucardo replied to mcp_reload notice };
	wait_for_notice($masterdbh, $mcp_reloaded_notice);
	pass($t);

	## The sync should be there, and active
	$SQL = "SELECT 1 FROM pg_listener WHERE relname = 'bucardo_syncdone_customcode_bctest2'";
	wait_until_true($masterdbh => $SQL);
	pass(" Sync with good customcode is now active after a reload");

	## Check most of the custom code types
	## TESTCOUNT + 4

	$val = $val{$type}{1};
	$sdbh->do("INSERT INTO $table(id,data1,inty) VALUES ('$val','one',1)");
	$sdbh->commit();

	our $sync_done_notice = "bucardo_syncdone_customcode";
	$masterdbh->do("LISTEN $sync_done_notice");
	$masterdbh->commit();

	sub quick_cc_test {
		my ($name,$number) = @_;

		$bc->customcode
			({
			  src_code => $testcode{$number},
			  name     => "${name}_test '$number'",
			  sync     => 'customcode',
			  whenrun  => $name,
			  });

		$masterdbh->do("NOTIFY bucardo_reload_sync_customcode");
		$masterdbh->commit();
		wait_for_notice($masterdbh, $sync_reloaded_notice, 10);
		$masterdbh->commit();

		$sdbh->do("UPDATE $table SET inty = $number");
		$sdbh->commit();
		wait_until_true($tdbh => "SELECT 1 FROM $table WHERE inty = $number");

		my $timeout = 20;
		my $found = 0;
		{
			if (-e $testfile{$number}) {
				$found = 1;
				last;
			}
			last if $timeout-- < 1;
			sleep 0.5;
			redo;
		}

		$t=qq{ Test file "$testfile{$number}" was created by '$name' custom code };
		is(-e _, 1, $t);
		unlink $testfile{$number};
		return;

	} ## end of quick_cc_test

	quick_cc_test('before_txn',            2);
	quick_cc_test('before_check_rows',     3);
	quick_cc_test('before_trigger_drop',   4);
	quick_cc_test('before_trigger_enable', 5);
	quick_cc_test('after_trigger_enable',  6);
	quick_cc_test('after_txn',             7);

	quick_cc_test('before_sync',           11);
	quick_cc_test('after_sync',            12);

	for (1..12) {
		unlink $testfile{$_};
	}

	## Test that conflict code fires, with a simple "target wins" resolution
	my $conflict_code = $testcode{8};

	$conflict_code =~ s/return;/\$arg->{rowinfo}{action} = 2;\nreturn;/;

	## Generate a conflict!

	## Turn off this sync
	my $syncoff = "bucardo_deactivated_sync_customcode";
	$masterdbh->do("LISTEN $syncoff");
	$masterdbh->do("NOTIFY bucardo_deactivate_sync_customcode");
	$masterdbh->commit();

	wait_for_notice($masterdbh, $syncoff);

	## Load in the conflict code
	$masterdbh->do("DELETE FROM customcode");
	$masterdbh->commit();
	$code = $bc->customcode
		({
		  src_code => $conflict_code,
		  name     => "custom code test '8'",
		  goat     => $goatnumber,
		  whenrun  => 'conflict',
		  });

	$t=q{ The customcode method returned a number };
	$codeid = $code->{id};
	like($codeid, qr{^\d+$}, $t);

	## Turn off the standard conflict for this table
	$SQL = "UPDATE goat SET standard_conflict = NULL WHERE tablename = '$table'";
	$masterdbh->do($SQL);
	$masterdbh->commit();

	## Create the conflict
	$val = $val{$type}{3};
	$sdbh->do("INSERT INTO $table(id,data1,inty) VALUES ('$val','source',3)");
	$sdbh->commit();
	$tdbh->do("INSERT INTO $table(id,data1,inty) VALUES ('$val','target',33)");
	$tdbh->commit();

	## Start up this sync, then kick it
	my $syncon = "bucardo_activated_sync_customcode";
	$masterdbh->do("LISTEN $syncon");
	$masterdbh->do("NOTIFY bucardo_activate_sync_customcode");
	$masterdbh->commit();

	wait_for_notice($masterdbh, $syncon);
	pass(" Activated sync for conflict testing (table $table)");

	bucardo_ctl("kick customcode 0");

	$t=qq{ Test file "$testfile{8}" was created by 'conflict' custom code };
	is(-e $testfile{8}, 1, $t);


	$SQL = "UPDATE goat SET standard_conflict = 'source'";
	$masterdbh->do($SQL);
	$masterdbh->commit();

	## Try out 'exception'

	## Turn off this sync
	$masterdbh->do("NOTIFY bucardo_deactivate_sync_customcode");
	$masterdbh->commit();
	wait_for_notice($masterdbh, $syncoff);

	my $exception_code = $testcode{9};

	## no critic
	my $newcode = q{

$SIG{__DIE__} = sub {
	$arg->{warning} = shift;
	die "Out of here!\n";
};

my $rowinfo = $arg->{rowinfo};

my $sdbh = $arg->{sourcedbh};
my $tdbh = $arg->{targetdbh};

my $sourcerow = $rowinfo->{sourcerow};
my $targetrow = $rowinfo->{targetrow};

my $error = $rowinfo->{dbi_error};

if ($error =~ /unique constraint "bucardo_test._email_key"/) {
	## Who threw the error?
	my ($okdbh,$errdbh,$email,$email2);
	if ($rowinfo->{source_error}) { ## target to source failed
		$email = $targetrow->{email};
		$email2 = $sourcerow->{email};
		$errdbh = $sdbh;
		$okdbh = $tdbh;
	}
	else { ## source to target failed
		$email = $sourcerow->{email};
		$email2 = $targetrow->{email};
		$errdbh = $tdbh;
		$okdbh = $sdbh;
	}
	## Our solution? Remove the offending row
	my ($S,$T) = ($rowinfo->{schema},$rowinfo->{table});
	my ($pkeyname,$pkey) = ($rowinfo->{pkeyname}, $rowinfo->{pkey});
	my $SQL = "DELETE FROM $S.$T WHERE email = ?";
	my $sth = $errdbh->prepare($SQL);
	my $count = $sth->execute($email);
	$arg->{runagain} = 1;
	return;
}

$arg->{message} = "Cannot handle unknown error: $error";

};
	## use critic

	$exception_code =~ s/return;/$newcode\nreturn;/;

	## Make the table conflictable - currently has id:inty of 1:11 and 3:33
	$SQL = "UPDATE $table SET inty=55, email = 'nobody\@example.com' WHERE inty=12";
	$sdbh->do($SQL);
	$sdbh->commit();
	$SQL = "UPDATE $table SET inty=44, email = 'nobody\@example.com' WHERE inty=33";
	$tdbh->do($SQL);
	$tdbh->commit();
	## Tables are now: source 1:55 3:33 target 1:7 3:44

	$masterdbh->do("DELETE FROM customcode");
	$masterdbh->commit();
	$code = $bc->customcode
		({
		  src_code => $exception_code,
		  name     => 'custom code test',
		  goat     => $goatnumber,
		  whenrun  => 'exception',
		  });

	$t=q{ The customcode method returned a number };
	$codeid = $code->{id};
	like($codeid, qr{^\d+$}, $t);

	## Start up this sync, then kick it
	$masterdbh->do("NOTIFY bucardo_activate_sync_customcode");
	$masterdbh->commit();
	wait_for_notice($masterdbh, $syncon);
	pass(" Activated sync for exception testing (table $table)");

	bucardo_ctl("kick customcode 0");

	$SQL = "SELECT 1 FROM $table WHERE inty=55";
	wait_until_true($sdbh => $SQL);
	wait_until_true($tdbh => $SQL);

	$t=qq{ Test file "$testfile{9}" was created by 'exception' custom code };
	is(-e $testfile{9}, 1, $t);
	unlink $testfile{9};
	return;

} ## end of test_customcode


sub random_swap_testing {

	## Run lots of random transactions, then compare the differences
	## NOTE: These will not always succeed! But they are good for catching problems

	my ($table,$sdbh,$rdbh) = @_;

	$location = 'random swap';

	## Clean out everything
	clean_swap_table($table,[$sdbh,$rdbh]);

	compare_tables($table,$sdbh,$rdbh) or BAIL_OUT "Compare tables failed?!\n";

	$type = $tabletype{$table};

	$SQL = "INSERT INTO $table(id,data1,inty) VALUES (?,?,?)";
	my $s_insert = $sdbh->prepare($SQL);
	my $r_insert = $rdbh->prepare($SQL);

	for (1..100) {
		my $dbh = rand(2) > 1 ? $sdbh : $rdbh;
		my $action = int rand(100);
		my $commit = int rand (100);
		if ($action < 3) { ## Changed from 3
			## Update
			my $num = int 1+rand(3);
			$SQL = qq{
				UPDATE $table
				SET data1 = 'random_update'
				WHERE inty IN 
					(SELECT inty
					 FROM $table
					 ORDER BY random()
					 LIMIT $num
				)};
			$dbh->do($SQL);
		}
		elsif ($action < 90) {
			## Insert
			$SQL = "SELECT max(inty) FROM $table";
			my $max = $dbh->selectall_arrayref($SQL)->[0][0];
			my $num = int 1+rand(10);
			for (1..$num) {
				$max++;
				my $val = $max;
				next if $val == 3 or $val==4;
				if ($type eq 'TEXT') {
					$val = "bc$val";
				}
				elsif ($type eq 'DATE') {
					$SQL = "SELECT '2001-11-01'::date + '$max days'::interval";
					$val = $dbh->selectall_arrayref($SQL)->[0][0];
				}
				elsif ($type eq 'TIMESTAMP') {
					$SQL = "SELECT '2001-11-01 12:34:56'::timestamp + '$max days'::interval";
					$val = $dbh->selectall_arrayref($SQL)->[0][0];
				}
				eval {
					$sth = ($dbh eq $sdbh) ? $s_insert : $r_insert;
					$sth->execute($val,'newrandom',$max);
				};
				if ($@) {
					$dbh->rollback();
				}
			}
		}
		else {
			## Delete
			my $num = int 1+rand(5);
			$SQL = "DELETE FROM $table WHERE inty IN (SELECT inty FROM $table ORDER BY random() LIMIT $num)";
			$dbh->do($SQL);
		}
		if ($commit < 5) {
			$dbh->commit();
		}
		elsif ($commit < 10) {
			$dbh->rollback();
		}

	} ## end random iterations

	## Final inserts so we know when all syncing has completed
	$val = $val{$type}{3};
	$SQL = "INSERT INTO $table(id,data1,inty) VALUES ('$val','stop',9999)";
	$sdbh->do($SQL);
	$sdbh->commit();
	$val = $val{$type}{4};
	$SQL = "INSERT INTO $table(id,data1,inty) VALUES ('$val','stop',8888)";
	$rdbh->do($SQL);
	$rdbh->commit();

	wait_until_true($rdbh => "SELECT 1 FROM $table WHERE inty = 9999");
	wait_until_true($sdbh => "SELECT 1 FROM $table WHERE inty = 8888");

	compare_tables($table,$sdbh,$rdbh);

	return;

} ## end of random_swap_testing


__DATA__
## The above __DATA__ line must be kept for the test counting