use strict;
use warnings;
use lib 't'; use PETest;
use UniEvent::Error;
use UniEvent::TCP;
use Test::More;
use Scalar::Util qw/blessed/;
use Socket qw(PF_INET IPPROTO_TCP SOMAXCONN SOCK_STREAM inet_aton sockaddr_in);
# use Binder;
use CommonStream;
use Talkers;
use UniClient;
use IPC::Open2;
package MyTCP;
use parent 'UniEvent::TCP';
use Test::More;
use Data::Dumper;

# Destructor used to trace handle lifetime during the tests: it emits a
# diagnostic containing a dump of the object, then delegates to the parent
# class destructor.
sub DESTROY {
    # ${^GLOBAL_PHASE} always holds a non-empty (and therefore true) string
    # such as "RUN" or "DESTRUCT", so it must be compared rather than merely
    # tested for truth; otherwise the suffix is appended on every call.
    my $phase_note = ${^GLOBAL_PHASE} eq 'DESTRUCT'
        ? " in global destruction phase"
        : "";

    diag "DESTROYed" . $phase_note . ": " . Dumper($_[0]) . "";
    # use Devel::Peek;
    # Dump($_[0]);
    $_[0]->SUPER::DESTROY;
}
package main;

# Fixtures shared by the whole script: the default loop, the TCP handle under
# test, a prepare handle used to schedule work inside the loop, the payload
# exchanged with the helper peers and the port the handle is bound to.
my $l   = UniEvent::Loop->default_loop;
my $tcp = MyTCP->new;

# Store a literal label (single quotes, no interpolation) in the scalar the
# object references; MyTCP::DESTROY dumps the object, so the label appears in
# its diagnostic output.
${$tcp} = '$tcp';

my $p = UniEvent::Prepare->new;
is($tcp->type, UniEvent::Handle::HTYPE_TCP, "new tcp object type");

my $magic_token = "MAGIC";
my $port;
# Prepares the given handle for the first test round using a socket created on
# the Perl side.
#
# The socket is handed to Binder::make_perl_bound_socket together with a
# callback that opens the handle on whatever it receives as first argument and
# puts it into listening mode (backlog 1). The value returned by
# Binder::bind2free is stored in the file-level $port and returned.
# NOTE(review): presumably bind2free returns the port it bound to; confirm
# against the Binder module.
my $initer = sub {
    my ($server) = @_;

    # socket() returns false on failure; stop here with the OS error rather
    # than passing an unopened handle further down.
    socket(my $sock, PF_INET, SOCK_STREAM, IPPROTO_TCP)
        or die "socket() failed: $!";

    $port = Binder::bind2free(
        Binder::make_perl_bound_socket(
            $sock,
            sub {
                $server->open($_[0]);
                $server->listen(1);
            }
        )
    );
    return $port;
};
$initer->($tcp);
# Peer launcher for the "server reads" checks: connects to $port through
# UniClient::connect_remote, passing it whatever Talkers::make_writer builds
# for $line.
sub connect_writer_remote {
    my $port = shift;
    my $line = shift;
    return UniClient::connect_remote($port, Talkers::make_writer($line));
}
# First read check on the Perl-bound socket. The assertion runs inside eval so
# that an exception is reported as a diagnostic instead of aborting the script.
unless (
    eval {
        ok(
            CommonStream::test_serv_reading($tcp, sub { return $port }, $magic_token, \&connect_writer_remote),
            "Recieving what was robustly sent. (bind - Perl)"
        );
        1;
    }
) {
    diag $@;
}
# Peer launcher for the "server writes" checks: connects to the given port
# through UniClient::connect_remote with Talkers::echo as the handler.
sub connect_echo_remote {
    my ($port) = @_;
    return UniClient::connect_remote($port, \&Talkers::echo);
}
use Devel::Peek;

# Write check on the Perl-bound socket.
ok(
    CommonStream::test_serv_writing($tcp, sub { return $port }, $magic_token, \&connect_echo_remote),
    "Robustly recieving what was sent. (bind - Perl)"
);

# FIXME: the plan is finalised and the script exits right here, so no
# top-level statement after this point is ever executed (the subs and
# constants further down are still compiled, but never called).
done_testing;
exit(0);
# Second round: reset the handle, take the port from
# CommonStream::regular_bind and repeat the read and write checks.
$tcp->reset;
$port = CommonStream::regular_bind($tcp);
ok(
    CommonStream::test_serv_reading($tcp, sub { return $port }, $magic_token, \&connect_writer_remote),
    "Recieved what was robustly sent. (bind - full UV)"
);
ok(
    CommonStream::test_serv_writing($tcp, sub { return $port }, $magic_token, \&connect_echo_remote),
    "Robustly recieving what was sent. (bind - full UV)"
);

# Third round: reset again and bind through the handle's own bind() method.
# The binder callback receives the handle and a second argument that is turned
# into an address by Binder::sa; a reference to $port is given to make_binder.
$tcp->reset;
CommonStream::run_now(
    $tcp,
    CommonStream::make_binder(
        sub {
            my $handle = $_[0];
            $handle->bind(Binder::sa($_[1]));
        },
        \$port
    )
);
ok(
    CommonStream::test_serv_reading($tcp, sub { return $port }, $magic_token, \&connect_writer_remote),
    "Recieved what was robustly sent. (bind - UV)"
);
my $ok;

# Bind and listen from inside the loop: the prepare handle queues an anonymous
# callback on the TCP handle via asyncq_run. The callback records in $ok
# whether bind + listen completed without throwing, then stops the loop.
# The callback is deliberately kept as an inline anonymous sub (the test name
# says "no ref").
$p->start(
    sub {
        $tcp->asyncq_run(
            sub {
                my ($handle) = @_;
                $ok = eval {
                    $handle->bind(Binder::sa($port));
                    $handle->listen();
                    1;
                };
                $l->stop();
            }
        );
    }
);
$tcp->reset();
$l->run();
$p->stop();
ok($ok, 'asyncq_run() with bind is ok (no ref)');

ok(
    CommonStream::test_serv_writing($tcp, sub { return $port }, $magic_token, \&connect_echo_remote),
    "Robustly recieving what was sent. (bind - UV)"
);
# Runs one client-side scenario against a helper server process.
#
# Arguments:
#   $h         - client handle, passed through to $action untouched
#   $action    - code ref called as $action->($h, $port) to start the exchange
#   $perl_args - array ref of arguments for the spawned `perl` (script + args)
#
# The child's whole standard output (read up to EOF, because $/ is undefined)
# is used as the port. After $action has been called the loop is run; an
# exception thrown while it runs is reported with diag instead of propagating.
#
# Returns 1 when the loop ran without throwing, undef otherwise.
sub test_client {
    my ($h, $action, $perl_args) = @_;
    local $/;

    my ($from_child, $to_child);
    my $pid = open2($from_child, $to_child, "perl", @$perl_args);
    my $port = <$from_child>;

    $action->($h, $port);

    # $@ has to be inspected before any clean-up code can clobber it.
    my $ran = eval { $l->run(); 1 };
    diag "Fucked up: $@" unless $ran;

    # open2 never reaps the process it spawns, so without the waitpid below
    # every call would leave a zombie behind. The loop has already finished,
    # so nothing more is expected from the child: close both pipes and signal
    # it first, which keeps the blocking waitpid from hanging on a helper
    # that did not exit by itself.
    close $from_child;
    close $to_child;
    kill 'TERM', $pid;
    waitpid($pid, 0);

    return $ran;
}
# Client reads from the helper writer server, connecting by host name + port.
# make_checker yields the client handle and a reference to the verdict.
(my $h, $ok) = CommonStream::make_checker($magic_token, blessed($tcp));
test_client(
    $h,
    sub {
        my ($client, $remote_port) = @_;
        $client->connect('localhost', $remote_port);
    },
    [ "./t/serv-writer-remote.pl", "$magic_token" ]
);
ok(${$ok}, "Reading as client");
# Same scenario, but the client connects with an address produced by
# UniEvent::inet_ptos and passes an (empty) connect callback.
($h, $ok) = CommonStream::make_checker($magic_token, blessed($tcp));
test_client(
    $h,
    sub {
        my ($client, $remote_port) = @_;
        my $sa = UniEvent::inet_ptos('localhost', $remote_port);
        $client->connect($sa, sub { });
    },
    [ "./t/serv-writer-remote.pl", "$magic_token" ]
);
ok(${$ok}, "Reading as client, known sockaddr, callback");
# Client writes the token to the helper reader server and shuts down.
# The second argument test_client passes to the action is ignored here.
my $act = sub {
    my ($writer) = @_;
    $writer->write($magic_token);
    $writer->shutdown();
};
($h, $ok) = CommonStream::make_checker($magic_token, blessed($tcp));
test_client($h, $act, [ "./t/serv-reader-remote.pl" ]);
ok(${$ok}, "Writing as client");
$SIG{PIPE} = 'IGNORE';

# Two writes with different callbacks: the first write carries a callback that
# sets $ok_order; the second write (with an empty callback) and the shutdown
# are issued from a prepare callback, which then stops the prepare handle.
my $ok_order = 0;
my $bullshit = "BULLSHIT";
$act = sub {
    my ($writer) = @_;
    $writer->write($magic_token, sub { $ok_order = 1 });
    $p->start(
        sub {
            $writer->write($bullshit, sub { });
            $writer->shutdown();
            $p->stop();
        }
    );
};
($h, $ok) = CommonStream::make_checker($magic_token . $bullshit, blessed($tcp));
test_client($h, $act, [ "./t/serv-reader-remote.pl" ]);
ok(${$ok},    "Write callbacks replaced before first write finished - reading");
ok($ok_order, "Write callbacks replaced before first write finished - callback assignment");
# Number of simultaneous peers used by the shuffling tests.
use constant CONN_NUM => 10;

# Accepts CONN_NUM connections at once and checks that the data read from each
# one is attributed to the right connection.
#
# Every peer sends the port number of its own socket (as reported by
# getsockname); the server compares the bytes collected on a connection with
# the second element returned by UniEvent::inet_stop for that connection's
# peer name.
#
# Returns the number of connections that did NOT verify (0 means success).
sub test_serv_read_shuffling {
    my $remaining  = CONN_NUM;    # shared by the spawner and the acceptor
    my $unfinished = CONN_NUM;    # connections that have not seen EOF yet
    my $unverified = CONN_NUM;    # decremented for each matching connection
    my %clients;

    $tcp->weak(0);
    $tcp->connection_callback(
        sub {
            my ($server, $err) = @_;
            my $built_str;
            if ($remaining--) {
                my $client = MyTCP->new;
                $server->accept($client);
                my (undef, $peer_port) = UniEvent::inet_stop($client->getpeername());

                # Keyed by the stringified handle; also keeps the handle alive.
                $clients{$client} = { self => $client, port => $peer_port };

                $client->read_callback(
                    sub {
                        $built_str .= $_[1];
                    }
                );
                $client->eof_callback(
                    sub {
                        # diag "It's okay to be gay";
                        my $record   = delete $clients{ $_[0] };
                        my $expected = $record->{port};
                        --$unverified if $built_str eq $expected;
                        $server->weak(1) unless --$unfinished;
                    }
                );
                $client->shutdown();
            }
        }
    );

    # Start the peers from inside the loop, then re-arm the counter for the
    # connection callback above.
    $p->start(
        sub {
            do {
                CommonStream::concurrent_sub(
                    sub {
                        my ($target_port) = @_;
                        UniClient::connect_remote(
                            $target_port,
                            sub {
                                my ($sock) = @_;
                                my ($own_port) = sockaddr_in(getsockname($sock));
                                print {$sock} $own_port;
                            }
                        );
                    },
                    $port
                );
            } while (--$remaining);
            $remaining = CONN_NUM;
            $p->stop();
        }
    );

    $l->run();
    return $unverified;
}
is(test_serv_read_shuffling(), 0, "Server reads shuffling");
# Accepts CONN_NUM connections at once, writes a distinct value to each one
# and checks that what comes back on a connection is the value written to it.
#
# The value is the current countdown value at accept time; peers are started
# with connect_echo_remote.
#
# Returns the number of connections that did NOT verify (0 means success).
sub test_serv_rw_shuffling {
    my $remaining  = CONN_NUM;    # shared by the spawner and the acceptor
    my $unfinished = CONN_NUM;    # connections that have not seen EOF yet
    my $unverified = CONN_NUM;    # decremented for each matching connection
    my %clients;

    $tcp->weak(0);
    $tcp->connection_callback(
        sub {
            my ($server, $err) = @_;
            my $built_str;
            if ($remaining) {
                my $client = UniEvent::TCP->new;
                $server->accept($client);

                # Keyed by the stringified handle; also keeps the handle alive.
                $clients{$client} = { self => $client, expected => $remaining };

                $client->read_callback(
                    sub {
                        $built_str .= $_[1];
                    }
                );
                $client->eof_callback(
                    sub {
                        my $record   = delete $clients{ $_[0] };
                        my $expected = $record->{expected};
                        --$unverified if $built_str eq $expected;
                        $server->weak(1) unless --$unfinished;
                    }
                );
                $client->write($remaining);
                $client->shutdown();
                $remaining--;
            }
        }
    );

    # Start the peers from inside the loop, then re-arm the counter for the
    # connection callback above.
    $p->start(
        sub {
            do {
                CommonStream::concurrent_sub(\&connect_echo_remote, $port);
            } while (--$remaining);
            $remaining = CONN_NUM;
            $p->stop();
        }
    );

    $l->run();
    return $unverified;
}
is(test_serv_rw_shuffling(), 0, "Server rw shuffling");
# Full-duplex exchange between two UniEvent::TCP handles living in the same
# loop.
#
# Arguments: the leading arguments for the client's connect() call; a connect
# callback is appended to them.
#
# Flow, as wired below:
#   * $cs (client) connects; in its connect callback it disables reading,
#     writes a random token and shuts down. Its shutdown callback re-enables
#     reading.
#   * $ss (server side) is handed to CommonStream::to_listener together with
#     the listening $tcp. NOTE(review): presumably it becomes the accepted
#     connection; confirm against CommonStream.
#   * On EOF the server side writes its own random token and shuts down.
#   * The loop is stopped once both sides have seen EOF.
#
# Emits six assertions: write and shutdown callbacks fired on both sides, and
# each side received the other side's token.
sub test_self_pleasing {
    my @connect_params = @_;
    my ($c_wcb_ok, $s_wcb_ok);
    my ($c_scb_ok, $s_scb_ok);
    my $ss = UniEvent::TCP->new;
    my $cs = UniEvent::TCP->new;
    my ($ct, $st) = (rand(), rand());
    my ($cr, $sr);
    my ($s_eof, $c_eof);

    $ss->read_callback(
        sub {
            $sr .= $_[1];
        }
    );
    $ss->eof_callback(
        sub {
            $_[0]->write($st);
            $_[0]->shutdown();
            # diag "Server EOF";
            $s_eof = 1;
            $l->stop() if ($c_eof && $s_eof);
        }
    );
    $ss->write_callback(
        sub {
            # diag "Server wrote";
            $s_wcb_ok = 1;
        }
    );
    $ss->shutdown_callback(
        sub {
            $s_scb_ok = 1;
        }
    );

    $cs->read_callback(
        sub {
            # diag "Client read: data='$_[1]'";
            $cr .= $_[1];
        }
    );
    $cs->eof_callback(
        sub {
            # diag "Client EOF";
            $c_eof = 1;
            $l->stop() if ($c_eof && $s_eof);
        }
    );
    $cs->write_callback(
        sub {
            # diag "Client wrote";
            $c_wcb_ok = 1;
        }
    );
    $cs->shutdown_callback(
        sub {
            $_[0]->want_read(1);
            $c_scb_ok = 1;
        }
    );

    CommonStream::to_listener(
        $tcp, $ss, sub {
        }
    );
    $cs->connect(
        @connect_params,
        sub {
            $_[0]->want_read(0);
            $_[0]->write($ct);
            $_[0]->shutdown();
        }
    );

    eval { $l->run(); 1 } or diag "Error: $@";
    # diag "Action finished";

    # Each assertion is named so that a failure can be identified in the TAP
    # output.
    ok($c_wcb_ok, "Client write callback fired");
    ok($s_wcb_ok, "Server write callback fired");
    ok($c_scb_ok, "Client shutdown callback fired");
    ok($s_scb_ok, "Server shutdown callback fired");
    is($cr, $st, "Client got right data");
    is($sr, $ct, "Server got right data");
}
# Run the duplex scenario twice: once connecting by host/port (with an
# explicit undef third argument) and once with a packed address resolved
# through getaddrinfo.
test_self_pleasing('127.0.0.1', $port, undef);

my ($err, $gai) = Socket::getaddrinfo(
    'localhost', $port,
    { family => Socket::AF_INET, protocol => Socket::IPPROTO_TCP }
);

# getaddrinfo returns an error value first (false on success) followed by the
# results. Report a resolution problem as a failed test instead of dying on an
# undefined $gai.
if ($err or not defined $gai) {
    fail("getaddrinfo for localhost:$port failed: $err");
}
else {
    test_self_pleasing($gai->{addr});
}

# Disabled debugging aid, kept as it was:
# use Data::Dumper;
# diag "XYU";
# map {
# my ($err, $h, $s) = Socket::getnameinfo($_->{addr}, Socket::NI_NUMERICHOST | Socket::NI_NUMERICSERV);
# $_->{addr} = {host => $h, port => $s};
# } @gais;
# diag Dumper($gais[0]);
done_testing();