use strict;
use warnings;
use feature 'state';
use lib 't'; use PETest;
use UniEvent::Error;
use UniEvent::Pipe;
use Test::More;
use Scalar::Util qw/blessed weaken/;
use Socket qw(PF_UNIX SOMAXCONN SOCK_STREAM pack_sockaddr_un);
# use Binder;
use CommonStream;
use UniClient;
use Talkers;
use IPC::Open2;
use subs 'bind';
package Marker;
sub new {
return bless {}, $_[0];
}
sub DESTROY {
# print STDERR "marker DESTROYed\n";
}
package main;
package MyPipe;
use parent 'UniEvent::Pipe';
sub DESTROY {
# use Data::Dumper;
# print STDERR "DESTROYed: ".Dumper($_[0])."\n";
# use Devel::Peek;
# # Dump($_[0]);
$_[0]->SUPER::DESTROY;
}
package main;
my %bind_points;
sub sock_path {
state $last_name;
my $sock = shift;
$bind_points{$sock} ||= shift;
my $res_ref = \($bind_points{$sock});
unless ($$res_ref) {
++$last_name;
my $fn = "$last_name.pipe";
# diag (("#" x 80) . "\n allocated pipe $fn\n" . ("#" x 80));
$$res_ref = PETest::var $fn;
}
return $$res_ref;
}
sub bind {
my $obj = shift @_;
if (blessed $obj and $obj->can('bind')) {
$obj->bind(@_);
} else {
my $paddr = pack_sockaddr_un($_[0]);
CORE::bind $obj, $paddr;
}
}
sub new_bind {
die 'WTF!!!' if $#_;
my $obj = shift;
delete $bind_points{$obj};
my $path = sock_path($obj);
bind($obj, $path);
}
my $l = UniEvent::Loop->default_loop;
sub check_active_num {
# my $expected = $_[0];
# my @act_hs;
# $l->walk(
# sub {
# push @act_hs, [sock_path($_[0], "CLIENT"), blessed $_[0]] if ($_[0]->active());
# });
# use Data::Dumper;
# # diag "check_active_num: " . Dumper(\@act_hs);
# is(scalar @act_hs, $expected, "chec_active_num") if $expected;
}
my $acceptor = new UniEvent::Pipe;
my $p = new UniEvent::Prepare;
is($acceptor->type, UniEvent::Handle::HTYPE_NAMED_PIPE, "new pipe object type");
my $magic_token = "MAGIC";
my $initer = sub {
my $acceptor = $_[0];
socket(my $sock, PF_UNIX, SOCK_STREAM, 0);
new_bind($sock);
$acceptor->open($sock);
$acceptor->listen();
return sock_path($acceptor, sock_path($sock));
};
check_active_num 0;
# diag "!";
$initer->($acceptor);
ok(eval {$acceptor->getsockname(); 1}, "getsockname()");
# diag "@";
TODO: {
local $TODO = "Not working by unknown reasons, probably getpeername()/getsockname() bugs are the key";
is(test_serv_read_shuffling(), 0, "Server reads shuffling");
}
sub connect_writer_local {
my $port = shift;
UniClient::connect_local($port, Talkers::make_writer(@_));
}
eval {
ok(
CommonStream::test_serv_reading($acceptor, \&sock_path, $magic_token, \&connect_writer_local),
"Recieving what was robustly sent. (bind - Perl)"
);
1;
} or diag $@;
check_active_num 1;
sub connect_echo_local {
my ($target, $source) = @_;
UniClient::connect_local($target, $source, \&Talkers::echo);
}
ok(
CommonStream::test_serv_writing($acceptor, \&sock_path, $magic_token, \&connect_echo_local),
"Robustly recieving what was sent. (bind - Perl)"
);
$acceptor->reset();
check_active_num 1;
CommonStream::run_now(
$acceptor,
sub {
new_bind($_[0]);
$_[0]->listen();
});
ok(
CommonStream::test_serv_reading($acceptor, \&sock_path, $magic_token, \&connect_writer_local),
"Recieved what was robustly sent. (bind - UV)");
check_active_num 1;
my $ok;
$p->start(
sub {
$acceptor->asyncq_run(
sub {
my $h = $_[0];
$ok = eval {new_bind($h); $h->listen(); 1};
$l->stop();
});
});
$acceptor->reset();
$l->run();
$p->stop();
ok($ok, 'asyncq_run() with bind is ok (no ref)');
check_active_num 1;
ok(
CommonStream::test_serv_writing($acceptor, \&sock_path, $magic_token, \&connect_echo_local),
"Robustly recieving what was sent. (bind - UV)");
check_active_num 1;
sub test_client {
my ($h, $action, $perl_args) = @_;
my $path = sock_path($h);
local $/;
my ($f, $dummy);
my $pid = open2 $f, $dummy, "perl", @$perl_args, $path;
my $waiter = <$f>;
eval { $h->connect($path); 1 } or diag "Fucked up connect(): $@";
$h->connect_callback(
sub {
diag $_[1] if $_[1];
});
$action->($h);
eval { $l->run(); 1 } or diag "Fucked up: $@";
}
my $h;
($h, $ok) = CommonStream::make_checker($magic_token, blessed $acceptor);
test_client($h, sub {}, ["./t/serv-writer-local.pl", "$magic_token"]);
ok($$ok, "Reading as client");
check_active_num 1;
is(test_serv_rw_shuffling(), 0, "Server rw shuffling");
check_active_num 1;
my $act;
$act = sub {
my $h = $_[0];
$h->write($magic_token);
$h->shutdown();
};
($h, $ok) = CommonStream::make_checker($magic_token, blessed $acceptor);
test_client($h, $act, ["./t/serv-reader-local.pl"]);
ok($$ok, "Writing as client");
check_active_num 1;
$SIG{PIPE} = 'IGNORE';
my $ok_order = 0;
my $bullshit = "BULLSHIT";
$act = sub {
my $h = $_[0];
$h->write($magic_token, sub { $ok_order = 1 });
$p->start(
sub {
$h->write($bullshit, sub {});
$h->shutdown();
$p->stop();
}
);
};
($h, $ok) = CommonStream::make_checker($magic_token.$bullshit, blessed $acceptor);
test_client($h, $act, ["./t/serv-reader-local.pl"]);
ok($$ok, "Write callbacks replaced before first write finished - reading");
ok($ok_order, "Write callbacks replaced before first write finished - callback assignment");
check_active_num 1;
use constant CONN_NUM => 10;
sub test_serv_read_shuffling {
use Devel::Peek;
my $count = CONN_NUM;
my $count_finish = CONN_NUM;
my $count_ok = CONN_NUM;
my %clients;
$acceptor->weak(0);
$acceptor->connection_callback(
sub {
my ($acceptor, $err) = @_;
diag $err if $err;
my $built_str;
if ($count--) {
my $client = new MyPipe;
$acceptor->accept($client);
use Data::Dumper;
diag Dumper($client);
my $un_p = '';
eval { $client->getpeername() };
diag Dumper($un_p);
$clients{$client} = { self => $client, peername => $un_p };
$client->read_callback(
sub {
my ($c, $str, $err) = @_;
$built_str .= $str;
diag $str;
}
);
$client->eof_callback(
sub {
my $cr = delete $clients{$_[0]};
my $expected = $cr->{peername};
if ($built_str eq $expected) {
--$count_ok;
}
if (!--$count_finish) {
$acceptor->weak(1);
}
}
);
$client->shutdown();
}
}
);
$p->start(
sub {
do {
CommonStream::concurrent_sub(
sub {
my ($target, $source) = @_;
UniClient::connect_local(
$target, $source,
sub {
my $sock = $_[0];
print $sock getsockname($sock);
}
);
},
sock_path($acceptor),
PETest::var("client-$count")
);
} while (--$count);
$count = CONN_NUM;
$p->stop();
}
);
$l->run();
# diag "That's o'kay";
return $count_ok;
}
sub test_serv_rw_shuffling {
my $count = CONN_NUM;
my $count_finish = CONN_NUM;
my $count_ok = CONN_NUM;
my %clients;
$acceptor->weak(0);
$acceptor->connection_callback(
sub {
my ($acceptor, $err) = @_;
my $built_str;
if ($count) {
my $client = (blessed $acceptor)->new();
$acceptor->accept($client);
$clients{$client} = { self => $client, expected => $count };
$client->read_callback(
sub {
my ($c, $str, $err) = @_;
$built_str .= $str;
}
);
$client->eof_callback(
sub {
my $cr = delete $clients{$_[0]};
my $expected = $cr->{expected};
if ($built_str eq $expected) {
--$count_ok;
}
if (!--$count_finish) {
$acceptor->weak(1);
}
}
);
$client->write($count);
$client->shutdown();
$count--;
}
}
);
$p->start(
sub {
do {
CommonStream::concurrent_sub(\&connect_echo_local, sock_path($acceptor));
} while (--$count);
$count = CONN_NUM;
$p->stop();
}
);
$l->run();
return $count_ok;
}
sub test_self_pleasing {
my @connect_params = @_;
my ($c_wcb_ok, $s_wcb_ok);
my ($c_scb_ok, $s_scb_ok);
my $ss = (blessed $acceptor)->new();
my $cs = (blessed $acceptor)->new();
my ($ct, $st) = (rand, rand);
my ($cr, $sr);
my ($s_eof, $c_eof);
$ss->read_callback(
sub {
$sr .= $_[1];
});
$ss->eof_callback(
sub {
$_[0]->write($st);
$_[0]->shutdown();
# diag "Server EOF";
$s_eof = 1;
$l->stop() if ($c_eof && $s_eof);
});
$ss->write_callback(
sub {
# diag "Server wrote";
$s_wcb_ok = 1;
});
$ss->shutdown_callback(
sub {
$s_scb_ok = 1;
});
$cs->read_callback(
sub {
# diag "Client read: data='$_[1]'";
$cr .= $_[1];
});
$cs->eof_callback(
sub {
# diag "Client EOF";
$c_eof = 1;
$l->stop() if ($c_eof && $s_eof);
});
$cs->write_callback(
sub {
# diag "Client wrote";
$c_wcb_ok = 1;
});
$cs->shutdown_callback(
sub {
$_[0]->want_read(1);
$c_scb_ok = 1;
});
CommonStream::to_listener(
$acceptor, $ss, sub {
});
$cs->connect(
@connect_params,
sub {
$_[0]->want_read(0);
$_[0]->write($ct);
$_[0]->shutdown();
});
eval { $l->run(); 1 } or diag "Error: $@";
# diag "Action finished";
ok($c_wcb_ok);
ok($s_wcb_ok);
ok($c_scb_ok);
ok($s_scb_ok);
is($cr, $st, "Client got right data");
is($sr, $ct, "Server got right data");
}
test_self_pleasing(sock_path($acceptor));
# use Data::Dumper;
# diag "XYU";
# map {
# my ($err, $h, $s) = Socket::getnameinfo($_->{addr}, Socket::NI_NUMERICHOST | Socket::NI_NUMERICSERV);
# $_->{addr} = {host => $h, port => $s};
# } @gais;
# diag Dumper($gais[0]);
done_testing();