use strict;
use warnings;
use lib 't'; use PETest;
use UniEvent::Error;
use UniEvent::TCP;
use Test::More;
use Scalar::Util qw/blessed/;
use Socket qw(PF_INET IPPROTO_TCP SOMAXCONN SOCK_STREAM inet_aton sockaddr_in);
# use Binder;
use CommonStream;
use Talkers;
use UniClient;
use IPC::Open2;


package MyTCP;
use parent 'UniEvent::TCP';
use Test::More;
use Data::Dumper;

# Destructor that announces every destruction of a MyTCP handle through
# diag() before delegating to the parent class destructor.
#
# The message says " in global destruction phase" only when the interpreter
# really is tearing down.  ${^GLOBAL_PHASE} always holds a non-empty phase
# name ("START", "RUN", "DESTRUCT", ...), so it has to be compared against
# 'DESTRUCT' rather than tested for truth.
#
# $_[0] is used directly instead of being copied into a lexical, exactly as
# the destructor has always done.
sub DESTROY {
    my $phase_note = ${^GLOBAL_PHASE} eq 'DESTRUCT'
        ? " in global destruction phase"
        : "";
    diag "DESTROYed" . $phase_note . ": " . Dumper($_[0]);
    # For a low-level look at the dying object, temporarily enable:
    # use Devel::Peek;
    # Dump($_[0]);
    $_[0]->SUPER::DESTROY;
}

package main;

# Shared fixtures used by the rest of the script: the default event loop,
# the TCP handle under test and a Prepare handle.
my $l = UniEvent::Loop->default_loop;
my $tcp = MyTCP->new;
# Write a label into the scalar behind the handle reference.
# NOTE(review): presumably this makes the handle recognisable in the Dumper
# output produced by MyTCP::DESTROY -- confirm.
$$tcp = '$tcp';
my $p = UniEvent::Prepare->new;

is($tcp->type, UniEvent::Handle::HTYPE_TCP, "new tcp object type");

# Payload exchanged between the server and the helper clients.
my $magic_token = "MAGIC";

# Port the listener ends up on; assigned by $initer.
my $port;

# Turns the handle passed as the first argument into a listener on a socket
# created on the Perl side.
#
# A plain TCP socket is created with socket() and handed, together with a
# callback, to Binder::make_perl_bound_socket / Binder::bind2free.  The
# callback calls open() on the handle with whatever it receives as its first
# argument and then listen(1).  The value returned by Binder::bind2free is
# stored in the file-level $port and also returned.
#
# Dies with the system error if the socket cannot be created, instead of
# passing an unusable handle further down.
my $initer = sub {
    my $tcp = $_[0];
    socket(my $sock, PF_INET, SOCK_STREAM, IPPROTO_TCP)
        or die "Cannot create TCP socket: $!";
    $port = Binder::bind2free(Binder::make_perl_bound_socket(
        $sock, sub {
            $tcp->open($_[0]);
            $tcp->listen(1);
        }));
    return $port;
};

$initer->($tcp);

# Connect to the given port through UniClient::connect_remote, using the
# talker that Talkers::make_writer builds for the given line.
#
# Arguments: the port to connect to, the line handed to the writer talker.
sub connect_writer_remote {
    my $port = shift;
    my $line = shift;
    return UniClient::connect_remote($port, Talkers::make_writer($line));
}

# Server-side read check against the listener set up by $initer.  The eval
# keeps an exception thrown by the helper from aborting the script; it is
# reported through diag() instead.
my $read_check_completed = eval {
    my $received_ok = CommonStream::test_serv_reading(
        $tcp, sub { $port }, $magic_token, \&connect_writer_remote
    );
    ok($received_ok, "Recieving what was robustly sent. (bind - Perl)");
    1;
};
diag $@ unless $read_check_completed;

# Connect to the given port through UniClient::connect_remote, using
# Talkers::echo as the talker.
#
# Argument: the port to connect to.
sub connect_echo_remote {
    my ($port) = @_;
    return UniClient::connect_remote($port, \&Talkers::echo);
}

use Devel::Peek;

# Server-side write check against the same listener, using the echo client.
my $perl_bind_write_ok = CommonStream::test_serv_writing(
    $tcp, sub { $port }, $magic_token, \&connect_echo_remote
);
ok($perl_bind_write_ok, "Robustly recieving what was sent. (bind - Perl)");

# The plan is finalised and the process exits here, so none of the run-time
# statements further down are executed (sub definitions and `use` lines are
# still compiled).
# NOTE(review): this looks like a debugging leftover that disables the rest
# of the checks -- confirm before removing.
done_testing();
exit 0;

# Second pass: reset the handle, bind it again through
# CommonStream::regular_bind and repeat the read and write checks on the
# port that helper returns.
$tcp->reset();

$port = CommonStream::regular_bind($tcp);

ok(
    CommonStream::test_serv_reading($tcp, sub {$port}, $magic_token, \&connect_writer_remote),
    "Recieved what was robustly sent. (bind - full UV)");

ok(
    CommonStream::test_serv_writing($tcp, sub {$port}, $magic_token, \&connect_echo_remote),
    "Robustly recieving what was sent. (bind - full UV)");

# Third pass: reset again and bind with the handle's own bind() method.  The
# binder callback receives the handle and a second value that it turns into
# an address with Binder::sa; a reference to $port is handed to
# CommonStream::make_binder alongside it.
# NOTE(review): presumably make_binder picks a free port and stores it
# through that reference -- confirm against CommonStream.
$tcp->reset();

CommonStream::run_now($tcp, CommonStream::make_binder(sub { $_[0]->bind(Binder::sa($_[1])) }, \$port));

ok(
    CommonStream::test_serv_reading($tcp, sub {$port}, $magic_token, \&connect_writer_remote),
    "Recieved what was robustly sent. (bind - UV)");


# Result flag of the asyncq_run check below; the same variable is reused by
# the client-side checks later in the file.
my $ok;

# From a Prepare callback, hand a bind + listen action to asyncq_run.  The
# eval records in $ok whether bind()/listen() completed without throwing,
# and the loop is stopped from inside the queued action.
$p->start(
    sub {
	$tcp->asyncq_run(
	    sub {
		my $h = $_[0];
		$ok = eval {$h->bind(Binder::sa($port)); $h->listen(); 1};
		$l->stop();
	    });
    });
# The handle is reset after the Prepare callback is registered but before
# the loop runs, so the queued bind happens on the freshly reset handle.
$tcp->reset();
$l->run();
$p->stop();
ok($ok, 'asyncq_run() with bind is ok (no ref)');

ok(
    CommonStream::test_serv_writing($tcp, sub {$port}, $magic_token, \&connect_echo_remote),
    "Robustly recieving what was sent. (bind - UV)");

# Run one client-side scenario against a helper server started as a child
# process.
#
# Arguments:
#   1. the handle passed on to the action;
#   2. a code ref called with the handle and the text read from the child;
#   3. an array ref with the arguments given to the child `perl`.
#
# The child's output is read in slurp mode ($/ is localised to undef for the
# whole call) and passed to the action as the port.  Afterwards the event
# loop is run; an exception escaping the loop is reported through diag().
sub test_client {
    my ($handle, $on_port, $script_args) = @_;
    local $/;
    my ($from_child, $to_child);
    my $child_pid = open2($from_child, $to_child, "perl", @{$script_args});
    my $remote_port = <$from_child>;
    $on_port->($handle, $remote_port);
    my $loop_finished = eval { $l->run(); 1 };
    diag "Fucked up: $@" unless $loop_finished;
}

# Client-side checks.  Each one gets a fresh handle and result reference
# from CommonStream::make_checker (the result is read through $$ok), lets
# test_client start a helper server script and run the loop, and then
# asserts on the result.

# Connect by host name and the port reported by the helper script.
(my $h, $ok) = CommonStream::make_checker($magic_token, blessed $tcp);
test_client(
    $h,
    sub {
        $_[0]->connect('localhost', $_[1]);
    },
    ["./t/serv-writer-remote.pl", "$magic_token"]);
ok($$ok, "Reading as client");

# Same, but connect with an address built by UniEvent::inet_ptos and pass an
# (empty) callback to connect().
($h, $ok) = CommonStream::make_checker($magic_token, blessed $tcp);
test_client(
    $h,
    sub {
        my $sa = UniEvent::inet_ptos('localhost', $_[1]);
        $_[0]->connect($sa, sub {});
    },
    ["./t/serv-writer-remote.pl", "$magic_token"]);
ok($$ok, "Reading as client, known sockaddr, callback");

# Action handed to test_client by the two writing checks below.
my $act;

# Write the token and shut the handle down, against the reader script.
# Note that this action never uses the port argument test_client passes in.
$act = sub {
    my $h = $_[0];
    $h->write($magic_token);
    $h->shutdown();
};
($h, $ok) = CommonStream::make_checker($magic_token, blessed $tcp);
test_client($h, $act, ["./t/serv-reader-remote.pl"]);
ok($$ok, "Writing as client");

# From here on SIGPIPE is ignored for the rest of the script.
$SIG{PIPE} = 'IGNORE';

# Two writes with different callbacks: the first write's callback sets
# $ok_order, the second write is issued from a Prepare callback with an
# empty callback.  The checker expects both payloads concatenated, and
# $ok_order shows that the first callback still ran.
my $ok_order = 0;
my $bullshit = "BULLSHIT";
$act = sub {
    my $h = $_[0];
    $h->write($magic_token, sub { $ok_order = 1 });
    $p->start(
	sub {
	    $h->write($bullshit, sub {});
	    $h->shutdown();
	    $p->stop();
	}
       );
};
($h, $ok) = CommonStream::make_checker($magic_token.$bullshit, blessed $tcp);
test_client($h, $act, ["./t/serv-reader-remote.pl"]);
ok($$ok, "Write callbacks replaced before first write finished - reading");
ok($ok_order, "Write callbacks replaced before first write finished - callback assignment");

# Number of concurrent client connections used by the shuffling checks.
use constant CONN_NUM => 10;

# Accept CONN_NUM concurrent clients on the shared listener and check that
# the data read from each accepted connection belongs to that connection.
#
# Every client sends its own local port number.  The server remembers the
# peer port of each accepted connection and, on EOF, compares it with what
# was read on that connection.
#
# Returns the number of connections that did NOT match, so 0 means success.
#
# NOTE(review): weak(0) / weak(1) on the listener presumably control whether
# it keeps the loop running -- confirm against the UniEvent documentation.
sub test_serv_read_shuffling {
    my $pending    = CONN_NUM;    # connections still to accept / to spawn
    my $unfinished = CONN_NUM;    # accepted connections that have not hit EOF
    my $unmatched  = CONN_NUM;    # connections whose data is not yet verified
    my %accepted;                 # stringified handle => { self, port }

    $tcp->weak(0);
    $tcp->connection_callback(
        sub {
            my ($server) = @_;
            my $received;
            if ($pending--) {
                my $client = MyTCP->new;
                $server->accept($client);
                my (undef, $peer_port) = UniEvent::inet_stop($client->getpeername());
                # The record holds the handle itself so that it stays
                # referenced until its EOF callback removes the record.
                $accepted{$client} = { self => $client, port => $peer_port };
                $client->read_callback(
                    sub {
                        my (undef, $chunk) = @_;
                        $received .= $chunk;
                    });
                $client->eof_callback(
                    sub {
                        my $record   = delete $accepted{ $_[0] };
                        my $expected = $record->{port};
                        --$unmatched if $received eq $expected;
                        $server->weak(1) if --$unfinished == 0;
                    });
                $client->shutdown();
            }
        });

    # Client side: connect and send the local port of the connected socket.
    my $send_own_port = sub {
        my $target_port = $_[0];
        UniClient::connect_remote(
            $target_port,
            sub {
                my $sock = $_[0];
                my ($local_port) = sockaddr_in(getsockname($sock));
                print {$sock} $local_port;
            });
    };

    # Spawn the clients from a Prepare callback, then restore the counter so
    # that the connection callback can count the accepts.
    $p->start(
        sub {
            do {
                CommonStream::concurrent_sub($send_own_port, $port);
            } while (--$pending);
            $pending = CONN_NUM;
            $p->stop();
        });

    $l->run();
    return $unmatched;
}

# The helper returns how many connections failed to match; expect none.
my $read_shuffling_unmatched = test_serv_read_shuffling();
is($read_shuffling_unmatched, 0, "Server reads shuffling");

# Accept CONN_NUM concurrent echo clients on the shared listener and check
# that every connection gets back exactly what was written to it.
#
# The server writes the current value of its countdown to each accepted
# connection and expects to read the same value back before EOF.
#
# Returns the number of connections that did NOT match, so 0 means success.
#
# NOTE(review): weak(0) / weak(1) on the listener presumably control whether
# it keeps the loop running -- confirm against the UniEvent documentation.
sub test_serv_rw_shuffling {
    my $remaining  = CONN_NUM;    # connections still to accept / to spawn
    my $unfinished = CONN_NUM;    # accepted connections that have not hit EOF
    my $unmatched  = CONN_NUM;    # connections whose echo is not yet verified
    my %accepted;                 # stringified handle => { self, expected }

    $tcp->weak(0);
    $tcp->connection_callback(
        sub {
            my ($server) = @_;
            my $echoed;
            if ($remaining) {
                my $client = UniEvent::TCP->new;
                $server->accept($client);
                # The record holds the handle itself so that it stays
                # referenced until its EOF callback removes the record.
                $accepted{$client} = { self => $client, expected => $remaining };
                $client->read_callback(
                    sub {
                        my (undef, $chunk) = @_;
                        $echoed .= $chunk;
                    });
                $client->eof_callback(
                    sub {
                        my $record   = delete $accepted{ $_[0] };
                        my $expected = $record->{expected};
                        --$unmatched if $echoed eq $expected;
                        $server->weak(1) if --$unfinished == 0;
                    });
                $client->write($remaining);
                $client->shutdown();
                $remaining--;
            }
        });

    # Spawn the echo clients from a Prepare callback, then restore the
    # counter so that the connection callback can count the accepts.
    $p->start(
        sub {
            do {
                CommonStream::concurrent_sub(\&connect_echo_remote, $port);
            } while (--$remaining);
            $remaining = CONN_NUM;
            $p->stop();
        });

    $l->run();
    return $unmatched;
}

# The helper returns how many connections failed to match; expect none.
my $rw_shuffling_unmatched = test_serv_rw_shuffling();
is($rw_shuffling_unmatched, 0, "Server rw shuffling");

# Exchange data between two handles inside this process: a client handle
# that connects to the shared listener and a server-side handle that
# CommonStream::to_listener attaches to it.
#
# Arguments: the leading arguments for the client's connect() call; the
# connect callback is appended after them.
#
# Flow, as driven by the callbacks below:
#   * once connected, the client disables reading, writes its token and
#     shuts down; its shutdown callback re-enables reading;
#   * the server accumulates what it reads and, on EOF, writes its own token
#     back and shuts down;
#   * the client accumulates the reply; the loop is stopped once both sides
#     have seen EOF.
#
# Emits six assertions: the write and shutdown callbacks fired on both
# sides, and each side received the other side's token.
sub test_self_pleasing {
    my @connect_params = @_;
    my ($c_wcb_ok, $s_wcb_ok);    # write callbacks fired (client, server)
    my ($c_scb_ok, $s_scb_ok);    # shutdown callbacks fired (client, server)
    my $ss = UniEvent::TCP->new;  # server-side handle
    my $cs = UniEvent::TCP->new;  # client handle
    my ($ct, $st) = (rand, rand); # tokens sent by the client and the server
    my ($cr, $sr);                # data received by the client and the server
    my ($s_eof, $c_eof);
    $ss->read_callback(
        sub {
            $sr .= $_[1];
        });
    $ss->eof_callback(
        sub {
            # The client has finished sending: reply and close our side.
            $_[0]->write($st);
            $_[0]->shutdown();
            $s_eof = 1;
            $l->stop() if ($c_eof && $s_eof);
        });
    $ss->write_callback(
        sub {
            $s_wcb_ok = 1;
        });
    $ss->shutdown_callback(
        sub {
            $s_scb_ok = 1;
        });
    $cs->read_callback(
        sub {
            $cr .= $_[1];
        });
    $cs->eof_callback(
        sub {
            $c_eof = 1;
            $l->stop() if ($c_eof && $s_eof);
        });
    $cs->write_callback(
        sub {
            $c_wcb_ok = 1;
        });
    $cs->shutdown_callback(
        sub {
            # Reading was switched off in the connect callback; turn it back
            # on so the server's reply can be received.
            $_[0]->want_read(1);
            $c_scb_ok = 1;
        });
    CommonStream::to_listener(
        $tcp, $ss, sub {
        });
    $cs->connect(
        @connect_params,
        sub {
            $_[0]->want_read(0);
            $_[0]->write($ct);
            $_[0]->shutdown();
        });
    eval { $l->run(); 1 } or diag "Error: $@";
    # Every assertion carries a description so that a failure in the TAP
    # output can be attributed without counting test numbers.
    ok($c_wcb_ok, "Client write callback fired");
    ok($s_wcb_ok, "Server write callback fired");
    ok($c_scb_ok, "Client shutdown callback fired");
    ok($s_scb_ok, "Server shutdown callback fired");
    is($cr, $st, "Client got right data");
    is($sr, $ct, "Server got right data");
}

# In-process exchange, connecting by literal address and port.
# NOTE(review): the meaning of the third (undef) connect() argument is not
# visible here -- confirm against the UniEvent::TCP documentation.
test_self_pleasing('127.0.0.1', $port, undef);

# Same exchange, connecting with a packed socket address resolved up front.
my ($err, $gai) = Socket::getaddrinfo(
    'localhost', $port,
    {family => Socket::AF_INET, protocol => Socket::IPPROTO_TCP});
# getaddrinfo() reports failure through its first return value.  Stop with
# a clear message rather than autovivifying $gai and handing an undefined
# address to connect().
die "getaddrinfo('localhost', $port) failed: $err" if $err;
die "getaddrinfo('localhost', $port) returned no addresses" unless $gai;
test_self_pleasing($gai->{addr});
# Disabled debugging aid, kept as it was:
# use Data::Dumper;
# diag "XYU";
# map {
#     my ($err, $h, $s) = Socket::getnameinfo($_->{addr}, Socket::NI_NUMERICHOST | Socket::NI_NUMERICSERV);
#     $_->{addr} = {host => $h, port => $s};
# } @gais;
# diag Dumper($gais[0]);

done_testing();