# common exercises for Forks::Queue objects
use Time::HiRes;
use Data::Dumper; $Data::Dumper::Sortkeys = $Data::Dumper::Indent = 1;
# Render every argument with Data::Dumper and send the result to the
# test diagnostic stream via diag().
sub diagdump {
    return diag( Dumper(@_) );
}
# Sleep for at least the requested number of (possibly fractional) seconds.
# A signal such as SIGIO can cut CORE::sleep and Time::HiRes::sleep short,
# so keep going back to sleep until the deadline has really passed.
# Returns the requested duration.
sub uninterruptable_sleep ($) {
    my $duration = $_[0];
    my $deadline = Time::HiRes::time() + $duration;
    for (my $left = $duration;
         $left > 0;
         $left = $deadline - Time::HiRes::time()) {
        Time::HiRes::sleep($left);
    }
    return $duration;
}
# Walk a queue through put / peek / get / end and check that items come
# out in first-in, first-out order.
#   $_[0] - queue object; the tests expect it to start empty with
#           style 'fifo'
# Does nothing when @ARGV names specific exercises and 'fifo' is not
# among them.
sub exercise_fifo {
    my ($queue) = @_;
    my $selected = " @ARGV ";
    return if $selected =~ / [a-z]/ && $selected !~ / fifo /;

    ok($queue->{style} eq 'fifo', 'queue style is fifo, can proceed');

    # a fresh queue reports nothing available
    my $status = $queue->status;
    ok($status, 'fifo: got status');
    ok($status->{avail} == 0, 'fifo: queue is empty');

    # load four items: two strings, an array ref and a hash ref
    ok($queue->put("hello"), 'fifo: put ok');
    ok($queue->put("world"), 'fifo: put ok');
    ok(2 == $queue->put([1,2,3,4],{foo => "bar"}), 'fifo: put 2 ok');
    $status = $queue->status;
    ok($status->{avail} == 4, 'fifo: 4 items put on queue')
        or diagdump($status);
    ok($status->{end} == 0, 'fifo: end has not been called');

    # peeking looks at either end without consuming anything
    my $front = $queue->peek_front;
    my $back  = $queue->peek_back;
    my $next  = $queue->peek;
    ok($front eq 'hello', 'fifo: peek_front gets first item');
    ok(ref($back) eq 'HASH', 'fifo: peek_back gets last item');
    is_deeply($front, $next, 'fifo: peek and peek_front are the same');

    my $item = $queue->get;
    ok(defined($item), 'fifo: got item from queue');
    is_deeply($item, $next, 'fifo: got item that was returned by peek');
    ok($item eq 'hello', 'fifo: first item out was first item in');

    $item = $queue->get(1);
    ok($item == 1, 'fifo: $queue->get($count) in scalar context returns count');

    # items already queued remain retrievable after end()
    $queue->end;
    $status = $queue->status;
    ok($status->{end}, 'fifo: end has been called');
    $item = $queue->get;
    ok(ref($item) eq 'ARRAY' && "@$item" eq "1 2 3 4",
       'fifo: got third item from queue after end call')
        or diagdump($item);
    $status = $queue->status;
    ok($status->{avail} == 1, 'fifo: 1 item available');

    # asking for more than is left yields only what is there
    my ($fourth, $surplus) = $queue->get(2);
    ok(ref($fourth) eq 'HASH' && $fourth->{foo} eq "bar",
       'fifo: got 4th item from queue');
    ok(!defined($surplus), 'fifo: 2nd return value to get(2) call is empty');

    print STDERR "\n";
    diag(">>> Expect 'put call after end call' warning");
    ok(0 == $queue->put("blech"), 'fifo: put fails after end call');
}
# Exercise simultaneous access to one queue from three processes: this
# process, a forked child, and a grandchild forked by the child.  Each
# process puts 10 items on the queue; the child calls end() on the queue
# after reaping the grandchild and pausing so this process can finish
# its own puts.  The parent then checks that exactly the 30 expected
# items can be retrieved.
#   $q     - queue object shared by all three processes
#   $DEBUG - accepted for interface compatibility; not used here
# Does nothing when @ARGV names specific exercises and 'forks' is not
# among them.  Dies if the first fork fails or if the queue is not
# ended within about 30 seconds.
sub exercise_forks {
    my ($q, $DEBUG) = @_;
    return if " @ARGV " =~ / [a-z]/ && " @ARGV " !~ / forks /;

    my $kidpid = fork();
    if (!defined $kidpid) {
        die "Fork failed: $!. We should have low expectations ",
            "for the rest of this test.";
    }
    if ($kidpid == 0) {
        my $grandkid = fork();
        if (!defined $grandkid) {
            # fork returns undef on failure and undef == 0 is true, so an
            # unchecked failure would send this process down the
            # grandchild branch and make it exit before its own puts
            print STDERR "exercise_forks: grandchild fork failed: $!\n";
        } elsif ($grandkid == 0) {
            for my $i (0 .. 9) {
                eval { $q->put( +{ item => "grandchild$i" } ) };
                print STDERR $@ if $@;
            }
            exit;
        }
        for my $i (0 .. 9) {
            eval { $q->put("child$i") };
            print STDERR $@ if $@;
        }
        wait;                                           # reap the grandchild
        uninterruptable_sleep(5);   # give parent time for its put calls
        uninterruptable_sleep(5) if $^O =~ /freebsd/i;  # freebsd needs more
        $q->end;
        exit;
    }

    for my $i (0 .. 9) {
        $q->put("parent$i");
    }
    my $s = $q->status;
    sleep 5 if $^O =~ /freebsd/i;

    # poll once a second until the child has ended the queue; force the
    # end after 20 seconds and give up altogether after 30
    my $t = Time::HiRes::time();
    until ($s->{end}) {
        sleep 1;
        if (Time::HiRes::time() - $t > 20) {
            diag "exercise_forks: ",
                 "Taking too long for queue to become availabile";
            $q->end;
        }
        $s = $q->status;
        if (Time::HiRes::time() - $t > 30) {
            die "Took too long for queue to become available";
        }
    }
    my $proctime = Time::HiRes::time() - $t;
    if ($^O !~ /freebsd/i) {
        ok($proctime < 12.5,
           "forks: simultaneous queue access from 3 procs not too slow "
           . "($proctime s)");
    }

    ok($q->pending == 30, 'forks: 30 items on queue')
        or diag $q->pending;
    my @g = $q->get(30);
    # on failure report what was actually retrieved
    ok(@g == 30, 'forks: get(30) retrieved 30 items from queue')
        or diag 0+@g, " [@g]";

    # every retrieved item must be expected, and seen only once
    my %expect;
    for my $i (0 .. 9) {
        $expect{"parent$i"} = $expect{"child$i"} =
            $expect{"grandchild$i"} = 1;
    }
    foreach my $g (@g) {
        if (ref($g)) {
            $g = $g->{item};    # grandchild items are wrapped in a hash ref
        }
        ok(delete $expect{$g}, "forks: found expected item $g");
    }
    ok(keys(%expect) == 0, "forks: all expected items removed from queue");
    waitpid $kidpid, 0;
}
# Walk a queue through put / peek / get / end and check that items come
# out in last-in, first-out order, then confirm that peeking at the
# drained queue returns undef without blocking.
#   $_[0] - queue object; the tests expect it to start empty with
#           style 'lifo'
# Does nothing when @ARGV names specific exercises and 'lifo' is not
# among them.
sub exercise_lifo {
    my ($queue) = @_;
    my $selected = " @ARGV ";
    return if $selected =~ / [a-z]/ && $selected !~ / lifo /;

    ok($queue->{style} eq 'lifo', 'lifo: queue style is lifo, can proceed');

    my $status = $queue->status;
    ok($status, 'lifo: read status');
    ok($status->{avail} == 0, 'lifo: count for empty queue is 0');

    # load four items: two strings, an array ref and a hash ref
    ok($queue->put("hello"), 'lifo: 1st put ok');
    ok($queue->put("world"), 'lifo: 2nd put ok');
    ok(2 == $queue->put([1,2,3,4],{foo => "bar"}), 'lifo: put 2 ok');
    $status = $queue->status;
    ok($status->{avail} == 4, 'lifo: 4 items put on queue')
        or diagdump($status);
    ok($queue->pending == 4, 'lifo: 4 items put on queue');
    ok($status->{end} == 0, 'lifo: end has not been called');

    # for a lifo queue plain peek agrees with peek_back
    my $front = $queue->peek_front;
    my $back  = $queue->peek_back;
    my $next  = $queue->peek;
    ok($front eq 'hello', 'lifo: peek_front gets first item');
    ok(ref($back) eq 'HASH', 'lifo: peek_back gets last item');
    is_deeply($back, $next, 'lifo: peek and peek_back are the same');

    my $item = $queue->get;
    ok(defined($item), 'lifo: got item from queue');
    is_deeply($item, $next, 'lifo: got item that was returned by peek');
    ok(ref($item) eq 'HASH' && $item->{foo} eq 'bar',
       'lifo: first item out was last item in');

    $item = $queue->peek;
    ok(ref($item) eq 'ARRAY' && $item->[2] == 3,
       'lifo: peek returned 3rd item')
        or diagdump($item);
    $item = $queue->get(1);
    ok($item == 1, 'lifo: $queue->get($count) in scalar context returns count');

    # items already queued remain retrievable after end()
    $queue->end;
    $status = $queue->status;
    ok($status->{end}, 'lifo: end has been called');
    $item = $queue->get;
    ok($item eq 'world',
       'lifo: got <xxx>third</xxx> second item from queue after end call');
    ok($queue->pending == 1, 'lifo: 1 item available');

    # asking for more than is left yields only what is there
    my ($last_item, $surplus) = $queue->get(2);
    ok($last_item eq 'hello', 'lifo: got 4th item from queue');
    ok(!defined($surplus), 'lifo: 2nd return value to get(2) call is empty');

    print STDERR "\n";
    diag(">>> Expect 'put call after end call' warning");
    ok(0 == $queue->put("blech"), 'lifo: put fails after end call');

    # a 5 second alarm turns a peek that blocks into a test failure
    local $SIG{ALRM} = sub { die "Timeout\n" };
    local $@;
    alarm 5;
    my $peeked = eval { $queue->peek };
    alarm 0;
    ok(!defined($peeked) && !$@,
       'lifo: peek on empty queue returns <undef> right away')
        or diagdump($peeked, $@);
}
# Second LIFO check: clear the queue, load the numbers 1..25 and verify
# that single and multi-item get calls hand them back newest first.
#   $_[0] - queue object whose style is expected to be 'lifo'
sub exercise_lifo2 {
    my ($queue) = @_;

    $queue->clear;
    ok($queue->pending == 0, 'no items pending on cleared queue');
    ok($queue->{style} eq 'lifo', 'queue style is LIFO');

    ok(25 == $queue->put(1..25), 'populated queue');
    ok(25 == $queue->get, 'get retrieves last item');

    my @batch = $queue->get(5);
    ok(@batch == 5, 'get(COUNT) retrieves count');
    my ($newest, $oldest) = @batch[0, 4];
    ok($newest == 24 && $oldest == 20,
       'get(COUNT) returns several in LIFO order')
        or diagdump(\@batch);
}
# Exercise a queue that has a capacity limit.
#   $q        - queue object; the tests expect limit == 5
#   $behavior - expected on_limit setting, 'fail' or 'block'
# With 'fail', puts beyond the limit are expected to be rejected (with a
# warning).  With 'block', a forked consumer drains the queue at about
# one item per second and the parent checks that puts beyond the limit
# take time to complete.
# Does nothing when @ARGV names specific exercises and 'limits' is not
# among them.  Dies if the consumer process cannot be forked.
sub exercise_limits {
    my ($q,$behavior) = @_;
    return if " @ARGV" =~ / [a-z]/ && " @ARGV " !~ / limits /;

    ok($q->{limit} == 5,
       'limits: expect queue to have limit of 5 for these tests');
    ok($q->{on_limit} eq $behavior,
       "limits: expect on_limit behavior $behavior");
    ok($behavior eq 'fail' || $behavior eq 'block',
       'limits: recognized on_limit attr');

    if ($behavior eq 'fail') {
        ok(3 == $q->put(11,22,33), 'limits_f: put first 3 jobs ok');
        ok($q->put(44), 'limits_f: 4th job ok');
        ok($q->put(55), 'limits_f: 5th job ok');
        print STDERR "\n";
        diag ">>> Expect 'buffer is full' warning";
        ok(!$q->put(66), 'limits_f: 6th job exceeded limit');
        ok(5 == $q->get(100) && $q->pending == 0,
           'limits_f: cleared the queue');

        # a multi-item put that straddles the limit is partially accepted
        ok(3 == $q->put(7,8,9), 'limits_f: put first 3 jobs ok');
        diag ">>> Expect 'buffer is full' warning";
        ok(2 == $q->put(10,11,12,13), 'limits_f: put 2/4 jobs ok');
        ok(5 == $q->get(100) && $q->pending == 0,
           'limits: cleared the queue');
    } elsif ($behavior eq 'block') {
        my $kidpid = fork();
        if (!defined $kidpid) {
            # fork returns undef on failure and undef == 0 is true, so an
            # unchecked failure would make the test process itself run
            # the consumer loop below and then exit
            die "limits_b: fork failed: $!";
        }
        if ($kidpid == 0) {
            # consumer: take one item about every second until get
            # returns undef
            my $item;
            while (defined($item = $q->get)) {
                uninterruptable_sleep(1);
            }
            exit;
        }

        my $t0 = time;
        ok(5 == $q->put(14,15,16,17,18), 'limits_b: put 5 first jobs on');
        ok(time - $t0 <= 1, 'limits_b: put first 5 jobs on fast');
        my $t1 = time;
        ok(4 == $q->put(19,20,21,22), 'limits_b: put next 4 jobs on');
        ok(time - $t1 >= 2, 'limits_b: took time to put next 4 jobs on');
        $q->end;
        waitpid $kidpid, 0;
    }
}
# Check indexed peeking on a queue loaded with the numbers 51..150.
# peek($n) is probed with zero, positive, negative and out-of-range
# indices; the values expected depend on whether the queue style is
# 'fifo' or something else (treated as lifo).  Peeking must not remove
# anything from the queue.
#   $_[0] - queue object, expected to be empty on entry
# Does nothing when @ARGV names specific exercises and 'peek' is not
# among them.
sub exercise_peek {
    my ($queue) = @_;
    return if " @ARGV" =~ / [a-z]/ && " @ARGV " !~ / peek /;

    ok($queue->pending == 0, 'peek: initial queue is empty');
    my $loaded = $queue->put(51 .. 150);
    ok($loaded == 100 && $queue->pending == 100,
       'peek: put 100 items on queue ok')
        or diag("put=$loaded, pending=", $queue->pending);

    $DB::single = 1;    # debugger breakpoint
    if ($queue->{style} eq 'fifo') {
        ok($queue->peek == 51, 'peek: got correct top item');
        my $fifth = $queue->peek(5);
        ok($fifth == 56, 'peek: got correct 5th index')
            or diagdump($fifth);
        my $beyond = $queue->peek(105);
        ok(!defined($beyond), 'peek: undef past end of queue')
            or diagdump($beyond);
        ok(!defined($queue->peek(-105)), 'peek: undef past end of queue');
        ok($queue->peek(-11) == 140, 'peek: got correct -11th index');
        ok($queue->peek(50) == $queue->peek(-50),
           'peek: found median item');
    }
    else {
        $DB::single = 1;    # debugger breakpoint
        ok($queue->peek == 150, 'peek: got correct top item');
        my $fifth = $queue->peek(5);
        ok($fifth == 145, 'peek: got correct 5th index')
            or diagdump($fifth);
        my $beyond = $queue->peek(105);
        ok(!defined($beyond), 'peek: undef past end of queue')
            or diagdump($beyond);
        ok(!defined($queue->peek(-105)), 'peek: undef past end of queue');
        my $from_far_end = $queue->peek(-11);
        ok($from_far_end == 61, 'peek: got correct -11th index')
            or diagdump($from_far_end);
        ok($queue->peek(50) == $queue->peek(-50),
           'peek: found median item');
    }

    ok($queue->pending == 100, 'peek: peek does not remove jobs');
}
# Compare the non-blocking and blocking retrieval calls.  A forked child
# waits 5 seconds, puts ten array refs on the queue (the Nth holds N
# copies of N) and ends the queue.  Meanwhile the parent checks that
# get_nb / shift_nb / pop_nb come back at once with undef, and that
# pop / shift / get wait until the child has populated the queue.
#   $_[0] - queue object, expected to be empty on entry
# Does nothing when @ARGV names specific exercises and 'blocking' is not
# among them.
sub exercise_blocking {
    my ($queue) = @_;
    return if " @ARGV" =~ / [a-z]/ && " @ARGV " !~ / blocking /;

    ok($queue->pending == 0, 'block: queue is initially empty');

    my $launched = Time::HiRes::time();
    my $pid = fork();
    unless (defined $pid) {
        diag("fork call failed $!. Expect trouble");
        $pid = $$;
    }
    if ($pid == 0) {
        # populator process
        sleep 5;
        my @items = map { [ ($_) x $_ ] } 1 .. 10;
        my $putcount = eval { $queue->put(@items) };
        if ($putcount != 10) {
            diag("$$ put $putcount items on queue, not 10. Expect trouble.");
            diag("$$ \$\@ is $@");
            diag("$$ queue file is ", $queue->{file});
            diag("$$ -f => ", -f $queue->{file});
            diag("$$ -r => ", -r $queue->{file});
            diag("$$ -w => ", -w $queue->{file});
            diag("$$ -s => ", -s $queue->{file});
        }
        $queue->end;
        diag("put time=", Time::HiRes::time() - $launched, "s");
        exit;
    }

    # every phase below is guarded by a 15 second alarm
    local $SIG{ALRM} = sub { die "Timeout\n" };
    my $phase_start = Time::HiRes::time();
    my $parent_ready = $phase_start;
    # sentinel values, so an undef afterwards really came from the queue
    my ($from_get, $from_shift, $from_pop) = (1, 2, 3);

    # phase 1: nothing is queued yet, non-blocking calls must not wait
    alarm 15;
    eval {
        $from_get   = $queue->get_nb;
        $from_shift = $queue->shift_nb;
        $from_pop   = $queue->pop_nb;
    };
    my $nb_elapsed = Time::HiRes::time() - $phase_start;
    alarm 0;
    ok($nb_elapsed <= 1, 'block: get_nb and friends return quickly');
    ok(!$@, 'block: nb calls do not timeout');
    ok(!defined($from_get), 'block: get_nb returns undef')
        or diagdump($from_get);
    ok(!defined($from_shift), 'block: shift_nb returns undef')
        or diagdump($from_shift);
    ok(!defined($from_pop), 'block: pop_nb returns undef')
        or diagdump($from_pop);

    # phase 2: blocking calls must wait for the child to populate
    alarm 15;
    $phase_start = Time::HiRes::time();
    eval {
        $from_pop = $queue->pop;
        sleep 1;
        $from_shift = $queue->shift;
        $from_get   = $queue->get;
    };
    my $wait = Time::HiRes::time() - $phase_start;
    alarm 0;
    ok($wait >= 3,
       'block: blocking get,pop,shift must wait for queue to be populated')
        or diag("wait time was ${wait}s");
    ok(!$@, 'block: get,pop,shift did not timeout')
        or diag("wait time was ${wait}s");

    # On failure the dump includes the parent's start-up time (from just
    # before the fork until the parent started its own timing) and the
    # blocking wait; compare both with the "put time" the child reports.
    ok($from_pop && ref($from_pop) eq 'ARRAY' && @$from_pop==10,
       'block: blocking pop got last element')
        or diagdump($from_pop, $parent_ready - $launched, $wait);
    ok($from_shift && ref($from_shift) eq 'ARRAY' && @$from_shift==1,
       'block: blocking shift got first element')
        or diagdump($from_shift);

    # with the first and last items gone, get yields the 2-element item
    # unless the style is lifo, in which case it yields the 9-element one
    if ($queue->{style} ne 'lifo') {
        ok($from_get && ref($from_get) eq 'ARRAY' && @$from_get==2,
           'block: blocking get got next element')
            or diagdump($from_get);
    }
    else {
        ok($from_get && ref($from_get) eq 'ARRAY' && @$from_get==9,
           'block: blocking get got next element')
            or diagdump($from_get);
    }

    ok(waitpid($pid, 0) == $pid,
       'block: successful wait on queue populator process');
}
# Placeholder for persistence exercises; nothing is implemented yet.
sub exercise_persistence {
    # TODO
    return;
}
# Helper for exercise_join: record the checksum computed by a detached
# child process in $path.  Failures are reported with diag() rather than
# ignored.
sub _write_join_result {
    my ($path, $sum) = @_;
    if (open my $fh, '>', $path) {
        print {$fh} $sum;
        close $fh or diag "join: could not close $path: $!";
    } else {
        diag "join: could not open $path for writing: $!";
    }
    return;
}

# Exercise the 'join' constructor option against a queue file that was
# created by another process.
#   %opts - constructor options handed on to Forks::Queue->new.  Read
#           here: impl, file1 and file2; file and db_file are overwritten
#           with file1 for the first part and file2 for the second.
# Part 1: a child creates a persistent queue in file1; the parent opens
#         the same file with join => 0 and expects an empty queue, and
#         (outside MSWin32/Cygwin) expects the file to disappear when
#         its queue object is destroyed.
# Part 2: a child creates a persistent queue in file2 and records the sum
#         of its payload in "<file>.result"; the parent opens the file
#         with join => 1, drains it and compares sums.
# Does nothing when @ARGV names specific exercises and 'join' is not
# among them.  Dies if a child process cannot be forked.
sub exercise_join {
    my (%opts) = @_;
    return if " @ARGV" =~ / [a-z]/ && " @ARGV " !~ / join /;

    # ---- part 1: join => 0 ----
    my $file = $opts{file} = $opts{db_file} = $opts{file1};
    if ($opts{impl} eq 'Shmem') {
        $file = "$Forks::Queue::Shmem::DEV_SHM/$file";
    }
    unlink $file;
    ok(! -f $file, "join: queue file does not initially exist");

    my $pid1 = fork();
    if (!defined $pid1) {
        # fork returns undef on failure and undef == 0 is true, so an
        # unchecked failure would run the child branch in this process
        die "join: fork failed: $!";
    }
    if ($pid1 == 0) {
        my $qq = Forks::Queue->new(%opts, persist => 1);
        my $sum = 0;
        $qq->put( map { my $z=int(137*rand); $sum+=$z; [$_,$z] } 0..213 );
        _write_join_result("$file.result", $sum);
        my $lockdir = $qq->{_lockdir};
        $qq->end;
        sleep 10;
        diag "Process $$ no longer using original queue in $file";
        rmdir $lockdir if $lockdir;
        exit;
    }

    # wait up to 5 seconds for the child to create the queue file
    for (1..5) {
        last if -f $file;
        diag "waiting on $file ($_)";
        sleep 1;
    }
    ok(-f $file, "join: queue file created in child");

    print STDERR "\n";
    diag ">>> Expect 'file already exists' warning:";
    my $q = Forks::Queue->new(%opts, join => 0);
    if (ok($q, 'join: got queue')) {
        ok($q->pending == 0, 'join => 0 overwrites existing queue');
        unlink "$file.result";
        if ($q->{_lockdir}) {
            rmdir $q->{_lockdir};
        }
        undef $q;
      SKIP: {
            sleep 2;
            if ($^O eq 'MSWin32') {
                skip "- queue file deletion happens at program end for MSWin32",1;
            }
            if ($^O eq 'cygwin') {
                # skip() wants the number of tests being skipped
                skip "queue file deletion check on Cygwin", 1;
            }
            ok(! -f $file, 'join: destruction of queue deletes queue file')
                or diag $!," $file ",-s $file;
        }
    } else {
        diag "join: Constructor for $opts{impl} queue failed";
    }
    waitpid $pid1,0;

    # ---- part 2: join => 1 ----
    $file = $opts{file} = $opts{db_file} = $opts{file2};
    if ($opts{impl} eq 'Shmem') {
        $file = "$Forks::Queue::Shmem::DEV_SHM/$file";
    }
    unlink $file;
    ok(! -f $file, "join: queue file does not initially exist");

    my $pid2 = fork();
    if (!defined $pid2) {
        die "join: fork failed: $!";
    }
    if ($pid2 == 0) {
        my $qq = Forks::Queue->new(%opts, persist => 1);
        diag "child $$ [persist=>1] created new queue";
        my $sum = 0;
        $qq->put( map { my $z=int(213*rand); $sum+=$z; [$_,$z] } 0..137 );
        _write_join_result("$file.result", $sum);
        $qq->end;
        sleep 5;
        diag "child $$ [persist=>1] exiting";
        exit;
    }

    for (1..5) {
        last if -f $file;
        sleep 1;
    }
    ok(-f $file, "join: queue file created in child");

    $q = Forks::Queue->new(%opts, join => 1);
    ok($q, 'join: got queue join => 1');
    ok($q->pending > 0, 'join => 1 attaches to existing queue');

    # drain the queue in batches, summing the second field of each item
    my $sum = 0;
    while (my @items = $q->get(17)) {
        $sum += $_->[1] for @items;
    }

    my $sumc;
    if (ok(open(my $result_fh, '<', "$file.result"),
           'join: result file from detached child ok')) {
        $sumc = <$result_fh>;
        close $result_fh;
    }
    ok(defined($sumc) && $sumc == $sum,
       'join: data passed correctly from detached child')
        or diag "got $sum expected ",
                (defined $sumc ? $sumc : '(no result read)'), "\n";
    waitpid $pid2,0;

    ok(-f $file, "join: queue file $file exists while queue object exists");
    my $lockdir = $q->{_lockdir};
    undef $q;
    ok(! -f $file, 'join: destruction of queue deletes queue file');
    unlink "$file.result";
    rmdir $lockdir if $lockdir;
    return;
}
# Choose the directory in which queue files for the tests are created.
#   - for the Shmem implementation: $Forks::Queue::Shmem::DEV_SHM
#   - on MSWin32: the first of $ENV{TEMP}, $ENV{TMPDIR}, C:/Temp and
#     C:/Windows/Temp that is a writable, searchable directory, falling
#     back to "/" with a warning
#   - anywhere else: /tmp
# When Forks::Queue::Util::__is_nfs is defined and returns true for the
# chosen directory, two diagnostic warnings are emitted.
sub TEMP_DIR {
    return $Forks::Queue::Shmem::DEV_SHM
        if $Forks::Queue::OPTS{impl} eq 'Shmem';

    my $tempdir = "/tmp";
    if ($^O eq 'MSWin32') {
        my @candidates =
            ($ENV{TEMP}, $ENV{TMPDIR}, "C:/Temp", "C:/Windows/Temp");
        my $usable;
        for my $dir (@candidates) {
            next unless $dir && -d $dir && -w _ && -x _;
            $usable = $dir;
            last;
        }
        if (defined $usable) {
            $tempdir = $usable;
        }
        else {
            $tempdir = "/";
            warn "$0: using '/' as temp file store";
        }
    }

    my $on_nfs = defined &Forks::Queue::Util::__is_nfs
              && Forks::Queue::Util::__is_nfs($tempdir);
    if ($on_nfs) {
        diag("Warning: working directory '$tempdir' is an NFS directory");
        diag("Synchronization is more difficult with NFS filesystems");
    }
    return $tempdir;
}
# Prepare a test script for one queue implementation.
#   $_[0] - implementation name, compared case-insensitively against
#           'file', 'shmem' and 'sqlite'
# Loads the matching Forks::Queue::* module with use_ok and records the
# implementation in $Forks::Queue::OPTS{impl}.  When the prerequisite is
# missing (no shared memory directory, or DBD::SQLite not installed) a
# single passing "skip" test is emitted, the plan is closed with
# done_testing, and the process exits.
sub PREP {
    my $impl = lc shift;

    if ($impl eq 'file') {
        use_ok('Forks::Queue::File');
        $Forks::Queue::OPTS{impl} = 'File';
    }
    elsif ($impl eq 'shmem') {
        use_ok('Forks::Queue::Shmem');
        unless (-d $Forks::Queue::Shmem::DEV_SHM) {
            Test::More::diag("shared memory virtual filesystem not found");
            ok(1, "# skip - shared memory virtual filesystem not found");
            done_testing;
            exit;
        }
        $Forks::Queue::OPTS{impl} = 'Shmem';
    }
    elsif ($impl eq 'sqlite') {
        # DBD::SQLite is optional, so probe for it before loading the
        # implementation that depends on it
        unless (eval "use DBD::SQLite;1") {
            Test::More::diag("DBD::SQLite not installed");
            ok(1, "# skip - DBD::SQLite required for Forks::Queue::SQLite");
            done_testing;
            exit;
        }
        use_ok('Forks::Queue::SQLite');
        $Forks::Queue::OPTS{impl} = 'SQLite';
    }

    # Disabled: a failure mode of many of these tests is to hang the
    # process; this "poor man's alarm" would kill a test still running
    # after 30 seconds.
    #     my $p=$$;
    #     if (CORE::fork() == 0) {
    #         exec($^X,"-e","for(1..30){sleep 1;exit if !kill(0,$p)}kill -9,$p");
    #     }
}
# Return the list of queue implementations the tests should run against.
#
# To run all tests against specific implementations only:
#     FORKS_QUEUE_IMPL=Shmem,SQLite make test
#
# To run specific tests against specific implementations:
#     perl t/18-timed.t File
#     perl t/11-clear.t Shmem SQLite
#
# With neither the environment variable nor command line arguments, each
# implementation whose modules load (and, for Shmem, whose shared memory
# directory exists) is included.
sub IMPL {
    return split /,/, $ENV{FORKS_QUEUE_IMPL} if $ENV{FORKS_QUEUE_IMPL};
    return @ARGV if @ARGV;

    my @available;
    push @available, 'File'
        if eval "use Forks::Queue::File;1";
    push @available, 'Shmem'
        if eval "use Forks::Queue::Shmem;-d \$Forks::Queue::Shmem::DEV_SHM";
    push @available, 'SQLite'
        if eval "use DBD::SQLite; use Forks::Queue::SQLite; 1";
    return @available;
}
1;