BEGIN { use_ok('IO::File') }
BEGIN { use_ok('Storable',qw(store retrieve)) }
BEGIN { use_ok('Thread::Queue') }

# Install an ALRM handler that does nothing

$SIG{ALRM} = sub {};

# Names of the files that this test creates

my ($unresolved,$resolved,$filter,$filtered,$ip2domain) =
 qw(unresolved resolved filter filtered ip2domain);

diag( "Creating test log files" );

# All generated IP numbers and the lookup tables between IP number and domain

my (@ip,%ip2domain,%domain2ip);

# Generators for random domain names and IP numbers, looked up by name in
# Thread::Pool::Resolve

my $domainref = Thread::Pool::Resolve->can( 'rand_domain' );
my $ipref = Thread::Pool::Resolve->can( 'rand_ip' );

# The resolver object under test and the state that is shared between threads

my $resolve;
my @shared : shared;
my %status : shared;
my $checkpointed : shared;

# 100 IP numbers that map to a domain name (and the other way around)

foreach my $count (1..100) {
  my ($ip,$domain) = ($ipref->(),$domainref->());
  push( @ip,$ip );
  $ip2domain{$ip} = $domain;
  $domain2ip{$domain} = $ip;
}

# 30 more IP numbers that map to the empty string

foreach my $count (1..30) {
  my $ip = $ipref->();
  push( @ip,$ip );
  $ip2domain{$ip} = '';
}

# Save the IP number to domain table so the external filter script can load it

ok( store( \%ip2domain,$ip2domain ),	'save ip2domain hash with Storable' );

# Write two parallel log files: one with IP numbers and one with what the
# resolved version of that same line should look like

ok( open( my $out1,'>',$unresolved ),	'create unresolved log file' );
ok( open( my $out2,'>',$resolved ),	'create resolved log file' );

# $lines is the number of lines written so far.  Each pass picks a random IP
# number and writes a burst of 1 to 10 consecutively numbered lines for it,
# until at least 1000 lines exist.  IP numbers without a domain name stay
# unchanged in the resolved log.

my $lines = 0;
while ($lines < 1000) {
  my $ip = $ip[int(rand(@ip))];
  my $domain = $ip2domain{$ip} || $ip;
  my $hits = 1+int(rand(10));
  foreach my $hit (1..$hits) {
    $lines++;
    print $out1 "$ip $lines\n";
    print $out2 "$domain $lines\n";
  }
}

# Number of checkpoints expected when checkpointing every 10 lines

my $checkpoints = int($lines/10);

ok( close( $out1 ),			'close unresolved log file' );
ok( close( $out2 ),			'close resolved log file' );

# Write a stand-alone filter script.  The here-document is interpolated while
# it is written: @INC and $ip2domain (the name of the Storable file) are
# filled in now, everything that is escaped with a backslash ends up literally
# in the script.  The script loads the stored hash, defines its own
# gethostbyaddr() on top of it, and calls read() without arguments on a new
# resolver that takes its "optimize" setting from the first command line
# argument.

ok( open( my $script,'>',$filter ),	'create script file' );
print $script <<EOD;
BEGIN {\@INC = qw(@INC)};
\$SIG{ALRM} = sub {};

use Storable qw(retrieve);
my \$ip2domain = retrieve( '$ip2domain' );

sub gethostbyaddr { select( undef,undef,undef,rand() ); \$ip2domain->{\$_[0]} }

use Thread::Pool::Resolve;
my \$r = Thread::Pool::Resolve->new(
 {
  optimize => shift,
  resolver => 'gethostbyaddr',
 }
)->read;
EOD
ok( close( $script ),			'close script file' );

diag( "Test simple external log resolve filter ($optimize)" );

# Run the script with the current perl through the shell: the unresolved log
# is its standard input and the filtered file its standard output.  system()
# returns 0 on success, hence the negation.

ok( !system( "$^X $filter $optimize <$unresolved >$filtered" ), 'filter unresolved' );

# The filtered output must be identical to the expected resolved log

ok( check( $resolved,$filtered ),	'check result of script filter' );

# Resolve by passing the name of the unresolved log to read(), with a
# checkpoint callback that counts how often it is called

diag( "Test resolving from a file ($optimize)" );
$checkpointed = 0;
$resolve = Thread::Pool::Resolve->new(
 {
  status => \%status,
  optimize => $optimize,
  checkpoint => sub {$checkpointed++},
  frequency => 10,
  resolver => 'gethostbyaddr',
 },
 $filtered
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
ok( $resolve->read( $unresolved ),	'read from file' );

# Shut down before comparing (presumably this completes the output written to
# $filtered -- confirm against the module documentation)

$resolve->shutdown;
ok( check( $resolved,$filtered ),	'check result of file' );

# With a frequency of 10 the callback is expected to have run once for every
# 10 lines of the log, which is what $checkpoints was calculated as

cmp_ok( $checkpointed,'==',$checkpoints,'check checkpoints' );

# Resolve by passing read() a lexical file handle that was opened here on the
# unresolved log; the handle is closed again after the result was checked

diag( "Test resolving from an open()ed handle ($optimize)" );
ok( open( my $log,'<',$unresolved ),	'open GLOB log file' );
$resolve = Thread::Pool::Resolve->new(
 {
  status => \%status,
  optimize => $optimize,
  resolver => 'gethostbyaddr',
 },
 $filtered
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
ok( $resolve->read( $log ),		'read from opened GLOB' );
$resolve->shutdown;
ok( check( $resolved,$filtered ),	'check result of opened GLOB' );
ok( close( $log ),			'close GLOB log file' );

# Resolve by passing read() an IO::File object opened on the unresolved log
# ($log is reused for it)

diag( "Test resolving from an IO::File->new handle ($optimize)" );
$log = IO::File->new( $unresolved,'<' );
isa_ok( $log,'IO::Handle', 		'check IO::File handle' );

# The "open" callback creates an IO::File object from whatever arguments it is
# given (presumably the module uses it to open the output file -- confirm)

$resolve = Thread::Pool::Resolve->new(
 {
  status => \%status,
  optimize => $optimize,
  open => sub { IO::File->new( @_ ) },
  resolver => 'gethostbyaddr',
 },
 $filtered
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
ok( $resolve->read( $log ),		'read from opened IO::File' );
$resolve->shutdown;
ok( check( $resolved,$filtered ),	'check result of opened IO::File' );
ok( close( $log ),			'close IO::File log file' );

# Resolve by passing read() an in-memory handle: the unresolved log is slurped
# into $scalar and a read handle is then opened on a reference to that scalar

diag( "Test resolving from a SCALAR handle ($optimize)" );
ok( open( $log,'<',$unresolved ),	'check opening unresolved file' );
my $scalar;
{local $/; $scalar = <$log>}
ok( close( $log ),			'check closing unresolved file' );

ok( open( $log,'<',\$scalar ),		'check SCALAR handle' );

# No output file name is passed this time.  Instead the callbacks take care of
# the output: "pre" opens the filtered file, "monitor" prints its first
# argument to it and "post" closes it again.

my $output;
$resolve = Thread::Pool::Resolve->new(
 {
  optimize => $optimize,
  pre => sub { open( $output,'>',$filtered ) or die "$filtered: $!" },
  monitor => sub { print $output $_[0] },
  post => sub { close( $output ) },
  resolver => 'gethostbyaddr',
 }
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
ok( $resolve->read( $log ),		'read from opened SCALAR' );
$resolve->shutdown;
ok( check( $resolved,$filtered ),	'check result of opened SCALAR' );

# The handle being closed is the in-memory one, so the description says so

ok( close( $log ),			'close SCALAR handle' );

# Resolve by handing all lines of the unresolved log to lines() as a list.
# @array keeps the lines, they are used again by the tests that follow.

diag( "Test resolving from a list ($optimize)" );
ok( open( $log,'<',$unresolved ),	'check opening unresolved file' );
my @array = <$log>;
ok( close( $log ),			'check closing unresolved file' );
$resolve = Thread::Pool::Resolve->new(
 {
  optimize => $optimize,
  resolver => 'gethostbyaddr',
 },
 $filtered
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
ok( $resolve->lines( @array ),		'check object returned from lines' );
$resolve->shutdown;
ok( check( $resolved,$filtered ),	'check result from lines' );

# Resolve with lines coming in from several threads: the lines are copied to
# the shared array, from which the threads running bythread() take them

diag( "Test resolving from different threads ($optimize)" );
@shared = @array;
$resolve = Thread::Pool::Resolve->new(
 {
  optimize => $optimize,
  resolver => 'gethostbyaddr',
 },
 $filtered
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );

# Start 10 threads and wait until each of them is done

my @thread;
push( @thread,threads->new( \&bythread ) ) foreach 1..10;
$_->join foreach @thread;
$resolve = undef; # needed to finalize streaming in time
ok( check( $resolved,$filtered ),	'check result different threads' );

# Resolve by passing read() a Thread::Queue object.  The queue is not built
# through the Thread::Queue constructor: the shared array, filled with all
# lines followed by an undefined value, is blessed into the class directly.

diag( "Test resolving from Thread::Queue ($optimize)" );
@shared = (@array,undef);
my $queue = bless \@shared,'Thread::Queue';
isa_ok( $queue,'Thread::Queue',		'check object type' );
$resolve = Thread::Pool::Resolve->new(
 {
  status => \%status,
  optimize => $optimize,
  resolver => 'gethostbyaddr',
 },
 $filtered
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
ok( $resolve->read( $queue ),		'check result of reading with queue' );

# Drop the resolver object instead of calling shutdown before comparing

$resolve = undef;
ok( check( $resolved,$filtered ),	'check result Thread::Queue' );

# Resolve by passing read() a Thread::Conveyor belt on which all lines,
# followed by an undefined value, were put beforehand

diag( "Test resolving using Thread::Conveyor ($optimize)" );
my $belt = Thread::Conveyor->new( {optimize => $optimize} );
isa_ok( $belt,'Thread::Conveyor',	'check object type' );
$belt->put( $_ ) foreach @array,undef;

# The hash saved with Storable earlier is loaded again and passed as
# "resolved" (presumably to start with already known results -- confirm
# against the module documentation)

$resolve = Thread::Pool::Resolve->new(
 {
  optimize => $optimize,
  resolved => retrieve( $ip2domain ),
  resolver => 'gethostbyaddr',
 },
 $filtered
);
isa_ok( $resolve,'Thread::Pool::Resolve', 'check object type' );
ok( $resolve->read( $belt ),		'check result of reading with belt' );
$resolve = undef;
ok( check( $resolved,$filtered ),	'check result Thread::Conveyor' );

# Remove every file this test created: both logs, the filter script, the
# filtered output and also the Storable file holding the ip2domain hash, so
# that nothing is left behind in the current directory

ok( unlink( $unresolved,$resolved,$filter,$filtered,$ip2domain ),
 'remove created files' );

#=======================================================================

# necessary subroutines

# Thread body for the test with different threads: keep taking the first line
# off the shared array and hand it to the resolver object, until shift()
# returns an undefined value.  The lock on the shared array is held while the
# line is handed over and is released at the end of each pass of the loop.

sub bythread {
  local ($_);
  while (1) {
    lock( @shared );
    my $next = shift( @shared );
    last unless defined( $next );
    $resolve->line( $next );
  }
}

# Stand-in resolver working on the dummy hash
#
#  IN: 1 IP number to resolve
# OUT: 1 whatever %ip2domain holds for that IP number

sub gethostbyaddr {
  my ($ip) = @_;

# Wait for a random fraction of a second before answering

  select( undef,undef,undef,rand() );
  return $ip2domain{$ip};
}

# Compare two files line by line
#
#  IN: 1 name of first file
#      2 name of second file
# OUT: 1 true if both files could be opened and have exactly the same lines,
#        false otherwise

sub check {

  my ($file1,$file2) = @_;
  open( my $handle1,'<',$file1 ) or return 0;
  open( my $handle2,'<',$file2 ) or return 0;

# Read into lexicals only: reading into the global $_ would not be localized
# and would overwrite whatever the caller has in $_

  while (defined( my $line1 = readline( $handle1 ) )) {
    my $line2 = readline( $handle2 );
    return 0 unless defined( $line2 ) and $line1 eq $line2;
  }

# The first file is exhausted: the files are only the same if the second file
# has nothing left either

  return !defined( readline( $handle2 ) );
}