# common exercises for Forks::Queue objects

use Time::HiRes;
use Data::Dumper; $Data::Dumper::Sortkeys = $Data::Dumper::Indent = 1;
sub diagdump { diag Dumper(@_) }

sub uninterruptable_sleep ($) {
    # SIGIO can interrupt CORE::sleep and Time::HiRes::sleep,
    # and sometimes we don't want that
    my $expire = Time::HiRes::time + $_[0];
    my $n = $_[0];
    while ($n > 0) {
        Time::HiRes::sleep $n;
        $n = $expire - Time::HiRes::time;
    }
    return $_[0];
}

sub exercise_fifo {
    my $q = shift;
    return if " @ARGV " =~ / [a-z]/ && " @ARGV " !~ / fifo /;

    ok($q->{style} eq 'fifo', 'queue style is fifo, can proceed');

    my $h = $q->status;
    ok($h, 'fifo: got status');
    ok($h->{avail} == 0, 'fifo: queue is empty');

    ok($q->put("hello"), 'fifo: put ok');
    ok($q->put("world"), 'fifo: put ok');
    ok(2 == $q->put([1,2,3,4],{foo => "bar"}), 'fifo: put 2 ok');

    $h = $q->status;
    ok($h->{avail} == 4, 'fifo: 4 items put on queue') or diagdump $h;
    ok($h->{end} == 0, 'fifo: end has not been called');

    my $p1 = $q->peek_front;
    my $p2 = $q->peek_back;
    my $p3 = $q->peek;
    ok($p1 eq 'hello', 'fifo: peek_front gets first item');
    ok(ref($p2) eq 'HASH', 'fifo: peek_back gets last item');
    is_deeply($p1, $p3, 'fifo: peek and peek_front are the same');

    my $g = $q->get;
    ok(defined($g), 'fifo: got item from queue');
    is_deeply($g, $p3, 'fifo: got item that was returned by peek');
    ok($g eq 'hello', 'fifo: first item out was first item in');

    $g = $q->get(1);
    ok($g == 1, 'fifo: $queue->get($count) in scalar context returns count');


    $q->end;
    $h = $q->status;
    ok($h->{end}, 'fifo: end has been called');


    $g = $q->get;
    ok(ref($g) eq 'ARRAY' && "@$g" eq "1 2 3 4",
       'fifo: got third item from queue after end call') or diagdump $g;


    $h = $q->status;
    ok($h->{avail} == 1, 'fifo: 1 item available');


    ($g, my $g2) = $q->get(2);
    ok(ref($g) eq 'HASH' && $g->{foo} eq "bar",
       'fifo: got 4th item from queue');
    ok(!defined($g2), 'fifo: 2nd return value to get(2) call is empty');

    print STDERR "\n";
    diag ">>> Expect 'put call after end call' warning";
    ok(0 == $q->put("blech"), 'fifo: put fails after end call');
}

sub exercise_forks {
    my ($q,$DEBUG) = @_;
    return if " @ARGV " =~ / [a-z]/ && " @ARGV " !~ / forks /;

    my $kidpid = fork();
    if (!defined $kidpid) {
        die "Fork failed: $!. We should have low expectations ",
            "for the rest of this test.";
    }
    if ($kidpid == 0) {
        if (fork() == 0) {
            for my $i (0 .. 9) {
                eval { $q->put( +{ item => "grandchild$i" } ) };
                print STDERR $@ if $@;
            }
            exit;
        }
        for my $i (0 .. 9) {
            eval { $q->put("child$i") };
            print STDERR $@ if $@;
        }
        my $gcpid = wait;
        uninterruptable_sleep 5;  # give parent time for its put calls
        uninterruptable_sleep 5 if $^O =~ /freebsd/i; #& give freebsd more time
        $q->end;
        exit;
    }

    for my $i (0..9) {
        $q->put("parent$i");
    }

    my $s = $q->status;
    sleep 5 if $^O =~ /freebsd/i;
    my $t = Time::HiRes::time;
    until ($s->{end}) {
        sleep 1;
        if (time - $t > 20) {
            diag "exercise_forks: ",
                "Taking too long for queue to become availabile";
            $q->end;
        }
        $s = $q->status;
        if (time - $t > 30) {
            die "Took too long for queue to become available";
        }
    }
    my $proctime = Time::HiRes::time - $t;
    if ($^O !~ /freebsd/i) {
        ok($proctime < 12.5,
           "forks: simultaneous queue access from 3 procs not too slow "
           . "($proctime s)");
    }
    ok($q->pending == 30, 'forks: 30 items on queue')
        or diag $q->pending;
    my @g = $q->get(30);
    ok(@g == 30, 'forks: get(30) retrieved 30 items from queue')
        or diag 0+@q," [@q]";

    my %expect;
    for my $i (0 .. 9) {
        $expect{"parent$i"} = $expect{"child$i"} =
            $expect{"grandchild$i"} = 1;
    }

    foreach my $g (@g) {
        if (ref($g)) {
            $g = $g->{item};
        }
        ok(delete $expect{$g}, "forks: found expected item $g");
    }
    ok(%expect == 0, "forks: all expected items removed from queue");
    waitpid $kidpid, 0;
}

sub exercise_lifo {
    my $q = shift;
    return if " @ARGV " =~ / [a-z]/ && " @ARGV " !~ / lifo /;

    ok($q->{style} eq 'lifo', 'lifo: queue style is lifo, can proceed');
    
    my $h = $q->status;
    ok($h, 'lifo: read status');
    ok($h->{avail} == 0, 'lifo: count for empty queue is 0');

    ok($q->put("hello"), 'lifo: 1st put ok');
    ok($q->put("world"), 'lifo: 2nd put ok');
    ok(2 == $q->put([1,2,3,4],{foo => "bar"}), 'lifo: put 2 ok');

    $h = $q->status;
    ok($h->{avail} == 4, 'lifo: 4 items put on queue') or diagdump $h;
    ok($q->pending == 4, 'lifo: 4 items put on queue');
    ok($h->{end} == 0, 'lifo: end has not been called');

    my $p1 = $q->peek_front;
    my $p2 = $q->peek_back;
    my $p3 = $q->peek;
    ok($p1 eq 'hello', 'lifo: peek_front gets first item');
    ok(ref($p2) eq 'HASH', 'lifo: peek_back gets last item');
    is_deeply($p2, $p3, 'lifo: peek and peek_back are the same');

    my $g = $q->get;
    ok(defined($g), 'lifo: got item from queue');
    is_deeply($g, $p3, 'lifo: got item that was returned by peek');
    ok(ref($g) eq 'HASH' && $g->{foo} eq 'bar',
       'lifo: first item out was last item in');

    $g = $q->peek;
    ok(ref($g) eq 'ARRAY' && $g->[2] == 3,
       'lifo: peek returned 3rd item') or diagdump $g;
    $g = $q->get(1);
    ok($g == 1, 'lifo: $queue->get($count) in scalar context returns count');

    $q->end;
    $h = $q->status;
    ok($h->{end}, 'lifo: end has been called');

    $g = $q->get;
    ok($g eq 'world',
       'lifo: got <xxx>third</xxx> second item from queue after end call');

    ok($q->pending == 1, 'lifo: 1 item available');

    ($g, my $g2) = $q->get(2);
    ok($g eq 'hello', 'lifo: got 4th item from queue');
    ok(!defined($g2), 'lifo: 2nd return value to get(2) call is empty');

    print STDERR "\n";
    diag ">>> Expect 'put call after end call' warning";
    ok(0 == $q->put("blech"), 'lifo: put fails after end call');

    local $SIG{ALRM} = sub { die "Timeout\n" };
    local $@;
    alarm 5;
    my $p = eval { $q->peek };
    alarm 0;
    ok(!defined($p) && !$@,
       'lifo: peek on empty queue returns <undef> right away')
        or diagdump($p,$@);
}

sub exercise_lifo2 {
    my $q = shift;
    $q->clear;
    ok($q->pending == 0, 'no items pending on cleared queue');
    ok($q->{style} eq 'lifo', 'queue style is LIFO');

    ok(25 == $q->put(1..25), 'populated queue');
    ok(25 == $q->get, 'get retrieves last item');
    my @g = $q->get(5);
    ok(@g == 5, 'get(COUNT) retrieves count');
    ok($g[0] == 24 && $g[4] == 20, 'get(COUNT) returns several in LIFO order')
        or diagdump \@g;
}

sub exercise_limits {
    my ($q,$behavior) = @_;
    return if " @ARGV" =~ / [a-z]/ && " @ARGV " !~ / limits /;

    ok($q->{limit} == 5,
       'limits: expect queue to have limit of 5 for these tests');
    ok($q->{on_limit} eq $behavior,
       "limits: expect on_limit behavior $behavior");
    ok($behavior eq 'fail' || $behavior eq 'block',
       'limits: recognized on_limit attr');

    if ($behavior eq 'fail') {
        ok(3 == $q->put(11,22,33), 'limits_f: put first 3 jobs ok');
        ok($q->put(44), 'limits_f: 4th job ok');
        ok($q->put(55), 'limits_f: 5th job ok');
        print STDERR "\n";
        diag ">>> Expect 'buffer is full' warning";
        ok(!$q->put(66), 'limits_f: 6th job exceeded limit');

        ok(5 == $q->get(100) && $q->pending == 0,
           'limits_f: cleared the queue');

        ok(3 == $q->put(7,8,9), 'limits_f: put first 3 jobs ok');
        diag ">>> Expect 'buffer is full' warning";
        ok(2 == $q->put(10,11,12,13), 'limits_f: put 2/4 jobs ok');
        ok(5 == $q->get(100) && $q->pending == 0,
           'limits: cleared the queue');
    } elsif ($behavior eq 'block') {

        my $kidpid = fork();
        if ($kidpid == 0) {
            my $item;
            while (defined($item = $q->get)) {
                uninterruptable_sleep(1);
            }
            exit;
        }

        my $t0 = time;
        ok(5 == $q->put(14,15,16,17,18), 'limits_b: put 5 first jobs on');
        ok(time - $t0 <= 1, 'limits_b: put first 5 jobs on fast');

        my $t1 = time;
        ok(4 == $q->put(19,20,21,22), 'limits_b: put next 4 jobs on');
        ok(time - $t1 >= 2, 'limits_b: took time to put next 4 jobs on');
        $q->end;
        waitpid $kidpid, 0;
    }
}

sub exercise_peek {
    my $q = shift;
    return if " @ARGV" =~ / [a-z]/ && " @ARGV " !~ / peek /;

    ok($q->pending == 0, 'peek: initial queue is empty');
    my $put = $q->put(51 .. 150);
    ok($put == 100 && $q->pending == 100,
       'peek: put 100 items on queue ok')
        or diag "put=$put, pending=",$q->pending;

    $DB::single = 1;
    
    if ($q->{style} eq 'fifo') {
        ok($q->peek == 51, 'peek: got correct top item');
        my $p5 = $q->peek(5);
        ok($p5 == 56, 'peek: got correct 5th index') or diagdump $p5;
        my $p105 = $q->peek(105);
        ok(!defined($p105), 'peek: undef past end of queue') or diagdump $p105;
        ok(!defined($q->peek(-105)), 'peek: undef past end of queue');
        ok($q->peek(-11) == 140, 'peek: got correct -11th index');
        ok($q->peek(50) == $q->peek(-50),
           'peek: found median item');
    } else {
        $DB::single = 1;
        ok($q->peek == 150, 'peek: got correct top item');
        my $p5 = $q->peek(5);
        ok($p5 == 145, 'peek: got correct 5th index') or diagdump $p5;
        my $p105 = $q->peek(105);
        ok(!defined($p105), 'peek: undef past end of queue') or diagdump $p105;
        ok(!defined($q->peek(-105)), 'peek: undef past end of queue');
        my $p_11 = $q->peek(-11);
        ok($p_11 == 61, 'peek: got correct -11th index') or diagdump($p_11);
        ok($q->peek(50) == $q->peek(-50),
           'peek: found median item');
    }
    ok($q->pending == 100, 'peek: peek does not remove jobs');
}

sub exercise_blocking {
    my $q = shift;
    return if " @ARGV" =~ / [a-z]/ && " @ARGV " !~ / blocking /;

    ok($q->pending == 0, 'block: queue is initially empty');

    my $t0 = Time::HiRes::time;
    my $pid = fork();
    if (!defined($pid)) {
        diag "fork call failed $!. Expect trouble";
        $pid = $$;
    }
    if ($pid == 0) {
        sleep 5;
        my @items = map [ ($_) x $_ ], 1..10;
        my $putcount = eval { $q->put(@items) };
        if ($putcount != 10) {
            diag "$$ put $putcount items on queue, not 10. Expect trouble.";
            diag "$$ \$\@ is $@";
            diag "$$ queue file is ", $q->{file};
            diag "$$ -f => ", -f $q->{file};
            diag "$$ -r => ", -r $q->{file};
            diag "$$ -w => ", -w $q->{file};
            diag "$$ -s => ", -s $q->{file};
        }
        $q->end;
        diag "put time=",Time::HiRes::time-$t0,"s";
        exit;
    }

    local $SIG{ALRM} = sub { die "Timeout\n" };
    my $t = Time::HiRes::time;
    my $t1 = $t;
    my ($x1,$x2,$x3) = (1,2,3);

    alarm 15;
    eval {
        $x1 = $q->get_nb;
        $x2 = $q->shift_nb;
        $x3 = $q->pop_nb;
    };
    $t = Time::HiRes::time - $t;
    alarm 0;

    ok($t <= 1, 'block: get_nb and friends return quickly');
    ok(!$@, 'block: nb calls do not timeout');
    ok(!defined($x1), 'block: get_nb returns undef') or diagdump $x1;
    ok(!defined($x2), 'block: shift_nb returns undef') or diagdump $x2;
    ok(!defined($x3), 'block: pop_nb returns undef') or diagdump $x3;

    alarm 15;
    $t = Time::HiRes::time;
    eval {
        $x3 = $q->pop;
        sleep 1;
        $x2 = $q->shift;
        $x1 = $q->get;
    };
    $t = Time::HiRes::time - $t;
    alarm 0;

    ok($t >= 3,
       'block: blocking get,pop,shift must wait for queue to be populated')
        or diag "wait time was ${t}s";
    ok(!$@, 'block: get,pop,shift did not timeout')
        or diag "wait time was ${t}s";
    ok($x3 && ref($x3) eq 'ARRAY' && @$x3==10,
       'block: blocking pop got last element') or diagdump($x3,$t1-$t0,$t);
    # t1-t0 is "start up time", time to recover after starting the child process
    # check that it is not large compared to "put time" reported by child
    # "put time" should also be a lot shorter than t-t0
    ok($x2 && ref($x2) eq 'ARRAY' && @$x2==1,
       'block: blocking shift got first element') or diagdump($x2);
    if ($q->{style} ne 'lifo') {
        ok($x1 && ref($x1) eq 'ARRAY' && @$x1==2,
           'block: blocking get got next element') or diagdump($x1);
    } else {
        ok($x1 && ref($x1) eq 'ARRAY' && @$x1==9,
           'block: blocking get got next element') or diagdump($x1);
    }
    
    ok(waitpid($pid, 0) == $pid,
       'block: successful wait on queue populator process');
}

sub exercise_persistence {
    # TODO
}

sub exercise_join {
    my (%opts) = @_;
    return if " @ARGV" =~ / [a-z]/ && " @ARGV " !~ / join /;

    my $file = $opts{file} = $opts{db_file} = $opts{file1};
    if ($opts{impl} eq 'Shmem') {
        $file = "$Forks::Queue::Shmem::DEV_SHM/$file";
    }
    unlink $file;
    ok(! -f $file, "join: queue file does not initially exist");
    my $pid1 = fork();
    if ($pid1 == 0) {
        my $qq = Forks::Queue->new(%opts, persist => 1);
        my $sum = 0;
        $qq->put( map { my $z=int(137*rand); $sum+=$z; [$_,$z] } 0..213 );
        open RESULT, ">", "$file.result";
        print RESULT $sum;
        close RESULT;
	my $lockdir = $qq->{_lockdir};
        $qq->end;
        sleep 10;
        diag "Process $$ no longer using original queue in $file";
	rmdir $lockdir if $lockdir;
        exit;
    }
    for (1..5) {
        last if -f $file;
        diag "waiting on $file ($_)";
        sleep 1;
    }
    ok(-f $file, "join: queue file created in child");
    print STDERR "\n";

    diag ">>> Expect 'file already exists' warning:";
    my $q = Forks::Queue->new(%opts, join => 0);
    if (ok($q, 'join: got queue')) {
        ok($q->pending == 0, 'join => 0   overwrites existing queue');
        unlink "$file.result";
	if ($q->{_lockdir}) {
	    rmdir $q->{_lockdir};
	}
        undef $q;
      SKIP: {
          sleep 2;
          if ($^O eq 'MSWin32') {
              skip "- queue file deletion happens at program end for MSWin32",1;
          }
	  if ($^O eq 'cygwin') {
	      skip "queue file deletion check on Cygwin";
	  }
          ok(! -f $file, 'join: destruction of queue deletes queue file')
              or diag $!," $file ",-s $file;
        }
    } else {
        diag "join: Constructor for $opts{impl} queue failed";
    }
    waitpid $pid1,0;

    
    $file = $opts{file} = $opts{db_file} = $opts{file2};
    if ($opts{impl} eq 'Shmem') {
        $file = "$Forks::Queue::Shmem::DEV_SHM/$file";
    }
    unlink $file;
    ok(! -f $file, "join: queue file does not initially exist");
    my $pid2 = fork();
    if ($pid2 == 0) {
        my $qq = Forks::Queue->new(%opts, persist => 1);
	diag "child $$ [persist=>1] created new queue";
        my $sum = 0;
        $qq->put( map { my $z=int(213*rand); $sum+=$z; [$_,$z] } 0..137 );
        open RESULT,">$file.result";
        print RESULT $sum;
        close RESULT;
        $qq->end;
        sleep 5;
	diag "child $$ [persist=>1] exiting";
        exit;
    }
    for (1..5) {
        last if -f $file;
        sleep 1;
    }
    ok(-f $file, "join: queue file created in child");
    $q = Forks::Queue->new(%opts, join => 1);
    ok($q, 'join: got queue  join => 1');
    ok($q->pending > 0, 'join => 1   attaches to existing queue');
    my $sum = 0;
    while (my @items = $q->get(17)) {
        $sum += $_->[1] for @items;
    }
    ok(open(RESULT, '<', "$file.result"),
       'join: result file from detached child ok');
    my $sumc = <RESULT>;
    close RESULT;

    ok($sumc == $sum, 'join: data passed correctly from detached child')
        or diag "got $sum expected $sumc\n";
    waitpid $pid2,0;
    ok(-f $file, "join: queue file $file exists while queue object exists");
    my $lockdir = $q->{_lockdir};
    undef $q;
    ok(! -f $file, 'join: destruction of queue deletes queue file');
    unlink "$file.result";
    rmdir $lockdir if $lockdir;
    return;
}

sub TEMP_DIR {
    if ($Forks::Queue::OPTS{impl} eq 'Shmem') {
        return $Forks::Queue::Shmem::DEV_SHM;
    }
    my $TEMP = "/tmp";
    if ($^O eq 'MSWin32') {
        if ($ENV{TEMP} && -d $ENV{TEMP} && -w _ && -x _) {
            $TEMP = $ENV{TEMP};
        } elsif ($ENV{TMPDIR} && -d $ENV{TMPDIR} && -w _ && -x _) {
            $TEMP = $ENV{TMPDIR};
        } elsif (-d "C:/Temp" && -w _ && -x _) {
            $TEMP = "C:/Temp";
        } elsif (-d "C:/Windows/Temp" && -w _ && -x _) {
            $TEMP = "C:/Windows/Temp";
        } else {
            $TEMP = "/";
            warn "$0: using '/' as temp file store";
        }
    }
    if (defined &Forks::Queue::Util::__is_nfs &&
        Forks::Queue::Util::__is_nfs($TEMP)) {
        diag "Warning: working directory '$TEMP' is an NFS directory";
        diag "Synchronization is more difficult with NFS filesystems";
    }
    return $TEMP;
}

sub PREP {
    my $impl = shift;
    $impl = lc $impl;
    if ($impl eq 'file') {
        use_ok('Forks::Queue::File');
        $Forks::Queue::OPTS{impl} = 'File';
    } elsif ($impl eq 'shmem') {
        use_ok('Forks::Queue::Shmem');
        if (! -d $Forks::Queue::Shmem::DEV_SHM) {
            Test::More::diag("shared memory virtual filesystem not found");
            ok(1, "# skip - shared memory virtual filesystem not found");
            done_testing;
            exit;
        }
        $Forks::Queue::OPTS{impl} = 'Shmem';
    } elsif ($impl eq 'sqlite') {
        if (!eval "use DBD::SQLite;1") {
            Test::More::diag("DBD::SQLite not installed");
            ok(1, "# skip - DBD::SQLite required for Forks::Queue::SQLite");
            done_testing;
            exit;
        }
        use_ok('Forks::Queue::SQLite');
        $Forks::Queue::OPTS{impl} = 'SQLite';
    }


#    # a failure mode of many of these tests is to hang the process.
#    # run the poor man's alarm to keep the tests moving
#    my $p=$$;
#    if (CORE::fork() == 0) {
#        exec($^X,"-e","for(1..30){sleep 1;exit if !kill(0,$p)}kill -9,$p");
#    }


}

sub IMPL {
    # which implementation can run / to run

    # to run all tests just against specific implementations
    #     FORKS_QUEUE_IMPL=Shmem,SQLite make test
    #
    # to run specific tests again specific implementations:
    #     perl t/18-timed.t File
    #     perl t/11-clear.t Shmem SQLite
    
    if ($ENV{FORKS_QUEUE_IMPL}) {
        return split /,/, $ENV{FORKS_QUEUE_IMPL};
    }
    if (@ARGV) {
        return @ARGV;
    }
    my @impl;
    if (eval "use Forks::Queue::File;1") {
        push @impl, 'File';
    }
    if (eval "use Forks::Queue::Shmem;-d \$Forks::Queue::Shmem::DEV_SHM") {
        push @impl, 'Shmem';
    }
    if (eval "use DBD::SQLite; use Forks::Queue::SQLite; 1") {
        push @impl, 'SQLite';
    }
    return @impl;
}

1;