use strict;
use warnings;
use feature 'state';
use lib 't'; use PETest;
use UniEvent::Error;
use UniEvent::Pipe;
use Test::More;
use Scalar::Util qw/blessed weaken/;
use Socket qw(PF_UNIX SOMAXCONN SOCK_STREAM pack_sockaddr_un);
# use Binder;
use CommonStream;
use UniClient;
use Talkers;
use IPC::Open2;
use subs 'bind';

package Marker;

# Minimal blessed-hash class; an instance carries no data of its own.

# Constructor: returns a fresh empty hash blessed into the invoking class.
sub new {
    my ($class) = @_;
    my %empty;
    return bless \%empty, $class;
}

# Destructor hook, intentionally empty (a convenient spot for tracing).
sub DESTROY {
    # print STDERR "marker DESTROYed\n";
}

package main;

# Subclass of UniEvent::Pipe used by the tests in this file. Its only
# addition is an explicit DESTROY hook.
package MyPipe;
use parent 'UniEvent::Pipe';

# Destructor: forwards straight to the parent class' DESTROY.
# The commented-out lines are debugging aids for tracing object destruction.
sub DESTROY {
    # use Data::Dumper;
    # print STDERR "DESTROYed: ".Dumper($_[0])."\n";
    # use Devel::Peek;
    # # Dump($_[0]);
    $_[0]->SUPER::DESTROY;
}

package main;

# Registry of socket paths, keyed by the stringified handle/object.
my %bind_points;

# sock_path($sock [, $path])
#
# Returns the path registered for $sock. If nothing (or a false value) is
# registered yet, the optional $path is stored; when that is missing too, a
# new "<N>.pipe" name is generated from a per-process counter and passed
# through PETest::var. The first true value registered for a key wins until
# the entry is deleted.
sub sock_path {
    state $last_name;
    my ($sock, $preset) = @_;

    $bind_points{$sock} ||= $preset;
    return $bind_points{$sock} if $bind_points{$sock};

    # Nothing known for this handle: allocate the next numbered pipe name.
    $last_name++;
    # diag (("#" x 80) . "\n allocated pipe $last_name.pipe\n"  . ("#" x 80));
    return $bind_points{$sock} = PETest::var("$last_name.pipe");
}

# bind($obj, @args)
#
# Uniform "bind" for both kinds of endpoints used in this file:
#   * a blessed object that has a bind() method -> delegates to
#     $obj->bind(@args) and returns whatever that returns;
#   * anything else is treated as a raw socket handle -> $args[0] is taken as
#     a UNIX socket path, packed with pack_sockaddr_un and handed to the
#     builtin bind.
#
# Dies with the system error if the builtin bind fails. Previously that
# failure was silently ignored. Returns a true value on success.
sub bind {
    my $obj = shift @_;
    if (blessed $obj and $obj->can('bind')) {
        return $obj->bind(@_);
    }

    my $path  = $_[0];
    my $paddr = pack_sockaddr_un($path);
    CORE::bind($obj, $paddr)
        or die "bind($path) failed: $!";
    return 1;
}

# new_bind($obj)
#
# Forgets any path registered for $obj, obtains a fresh one from sock_path()
# and binds $obj to it. Takes exactly one argument and dies otherwise.
# Returns the result of bind().
sub new_bind {
    die 'WTF!!!' unless @_ == 1;
    my ($obj) = @_;

    delete $bind_points{$obj};
    return bind($obj, sock_path($obj));
}

# Default event loop; the tests below drive it with run()/stop().
my $l = UniEvent::Loop->default_loop;

# check_active_num($expected)
#
# Currently a no-op: the whole body is commented out, so the
# `check_active_num N` calls throughout this file do nothing.
# The disabled code walks the loop's handles, collects the active ones and
# compares how many there are with $expected (when $expected is true).
sub check_active_num {
    # my $expected = $_[0];
    # my @act_hs;
    # $l->walk(
    # 	sub {
    # 	    push @act_hs, [sock_path($_[0], "CLIENT"), blessed $_[0]] if ($_[0]->active());
    # 	});
    # use Data::Dumper;
    # # diag "check_active_num: " . Dumper(\@act_hs);
    # is(scalar @act_hs, $expected, "check_active_num") if $expected;
}

# Listening pipe shared by all server-side tests, plus a Prepare handle used
# to schedule work from inside the running loop.
# Direct method calls replace the indirect-object form (`new Class`).
my $acceptor = UniEvent::Pipe->new;
my $p = UniEvent::Prepare->new;

is($acceptor->type, UniEvent::Handle::HTYPE_NAMED_PIPE, "new pipe object type");

# Payload exchanged between the peers in most tests below.
my $magic_token = "MAGIC";

# $initer->($pipe)
#
# Creates a UNIX stream socket in Perl, binds it to a fresh path, hands it to
# $pipe via open() and starts listening. Registers the socket's path for
# $pipe as well and returns it. Dies if the socket cannot be created.
my $initer = sub {
    my $acceptor = $_[0];
    socket(my $sock, PF_UNIX, SOCK_STREAM, 0)
        or die "socket(PF_UNIX, SOCK_STREAM) failed: $!";
    new_bind($sock);
    $acceptor->open($sock);
    $acceptor->listen();
    return sock_path($acceptor, sock_path($sock));
};

check_active_num(0);

# diag "!";
$initer->($acceptor);
ok(eval {$acceptor->getsockname(); 1}, "getsockname()");
# diag "@";

TODO: {
    local $TODO = "Not working by unknown reasons, probably getpeername()/getsockname() bugs are the key";
    is(test_serv_read_shuffling(), 0, "Server reads shuffling");
}

# connect_writer_local($port, @writer_args)
#
# Builds a writer with Talkers::make_writer(@writer_args) and passes it,
# together with $port, to UniClient::connect_local. Returns that call's result.
sub connect_writer_local {
    my $port   = shift;
    my @writer = Talkers::make_writer(@_);
    return UniClient::connect_local($port, @writer);
}

# Server-side reading while the listening socket is the one bound from Perl.
# An exception raised by the helper is reported through diag instead of
# aborting the script.
unless (
    eval {
        my $received = CommonStream::test_serv_reading(
            $acceptor, \&sock_path, $magic_token, \&connect_writer_local,
        );
        ok($received, "Recieving what was robustly sent. (bind - Perl)");
        1;
    }
) {
    diag $@;
}

check_active_num(1);

# connect_echo_local($target, $source)
#
# Calls UniClient::connect_local with the two given values and a reference to
# Talkers::echo as the third argument. Returns that call's result.
sub connect_echo_local {
    my $target = shift;
    my $source = shift;
    my $talker = \&Talkers::echo;
    return UniClient::connect_local($target, $source, $talker);
}

# Server-side writing, still on the socket that was bound from Perl.
ok(CommonStream::test_serv_writing(
       $acceptor, \&sock_path, $magic_token, \&connect_echo_local,
   ),
   "Robustly recieving what was sent. (bind - Perl)");

$acceptor->reset;

check_active_num(1);

# After the reset, bind to a fresh path through the handle itself and listen.
CommonStream::run_now($acceptor, sub {
    my ($pipe) = @_;
    new_bind($pipe);
    $pipe->listen;
});

ok(CommonStream::test_serv_reading(
       $acceptor, \&sock_path, $magic_token, \&connect_writer_local,
   ),
   "Recieved what was robustly sent. (bind - UV)");

check_active_num(1);

# Set by the callback below; later reused to hold make_checker's result flag.
my $ok;

# From inside the running loop, queue a rebind + listen via asyncq_run and
# stop the loop once the queued callback has run.
$p->start(sub {
    $acceptor->asyncq_run(sub {
        my ($pipe) = @_;
        $ok = eval {
            new_bind($pipe);
            $pipe->listen;
            1;
        };
        $l->stop;
    });
});
$acceptor->reset;
$l->run;
$p->stop;
ok($ok, 'asyncq_run() with bind is ok (no ref)');

check_active_num(1);

ok(CommonStream::test_serv_writing(
       $acceptor, \&sock_path, $magic_token, \&connect_echo_local,
   ),
   "Robustly recieving what was sent. (bind - UV)");

check_active_num(1);

# test_client($h, $action, \@perl_args)
#
# Runs one client-side scenario against a helper server process:
#   1. spawns a perl child with @perl_args followed by the socket path
#      registered for $h;
#   2. slurps the child's stdout until EOF
#      (NOTE(review): presumably the helper ends its stdout once it is ready
#      to accept connections - confirm against the helper scripts);
#   3. connects $h to the path, installs a connect callback that reports
#      errors, calls $action->($h) and runs the loop.
#
# Failures of connect() and of the loop run are reported via diag, not thrown.
sub test_client {
    my ($h, $action, $perl_args) = @_;
    my $path = sock_path($h);
    local $/;    # slurp mode for the readline below
    my ($f, $dummy);
    # Use the interpreter that is running this test ($^X) rather than
    # whatever "perl" happens to be first in PATH.
    open2($f, $dummy, $^X, @$perl_args, $path);
    my $waiter = <$f>;    # value unused; the read is the synchronisation point
    eval { $h->connect($path); 1 } or diag "Fucked up connect(): $@";
    $h->connect_callback(
        sub {
            diag $_[1] if $_[1];
        });
    $action->($h);
    eval { $l->run(); 1 } or diag "Fucked up: $@";
}

# Client handle under test; re-created by make_checker for each scenario.
my $h;

# Scenario: the helper server writes, this process reads as a client.
($h, $ok) = CommonStream::make_checker($magic_token, blessed($acceptor));
test_client($h, sub {}, ["./t/serv-writer-local.pl", "$magic_token"]);
ok($$ok, "Reading as client");

check_active_num(1);

is(test_serv_rw_shuffling(), 0, "Server rw shuffling");

check_active_num(1);

# Scenario: this process writes the token as a client and then shuts down.
my $act = sub {
    my ($pipe) = @_;
    $pipe->write($magic_token);
    $pipe->shutdown;
};
($h, $ok) = CommonStream::make_checker($magic_token, blessed($acceptor));
test_client($h, $act, ["./t/serv-reader-local.pl"]);
ok($$ok, "Writing as client");

check_active_num(1);

# From here on SIGPIPE is ignored.
$SIG{PIPE} = 'IGNORE';

# Scenario: a second write with a different callback is issued from a prepare
# callback; the first write's own callback must still fire.
my $ok_order    = 0;
my $extra_token = "BULLSHIT";
$act = sub {
    my ($pipe) = @_;
    $pipe->write($magic_token, sub { $ok_order = 1 });
    $p->start(sub {
        $pipe->write($extra_token, sub {});
        $pipe->shutdown;
        $p->stop;
    });
};
($h, $ok) = CommonStream::make_checker($magic_token . $extra_token, blessed($acceptor));
test_client($h, $act, ["./t/serv-reader-local.pl"]);
ok($$ok, "Write callbacks replaced before first write finished - reading");
ok($ok_order, "Write callbacks replaced before first write finished - callback assignment");

check_active_num(1);

# Number of concurrent connections used by the shuffling tests.
use constant CONN_NUM => 10;

# test_serv_read_shuffling()
#
# Starts CONN_NUM concurrent clients, each of which sends the result of
# getsockname() on its own socket. The server accepts every connection into a
# MyPipe, remembers that pipe's getpeername() and, at EOF, compares it with
# the data received from the client.
#
# Returns the number of connections whose data did NOT match, so 0 means
# complete success.
#
# NOTE(review): the builtin getsockname() yields a packed address; whether
# MyPipe's getpeername() returns a comparable value is not visible here.
# The caller keeps this test in a TODO block.
sub test_serv_read_shuffling {
    use Devel::Peek;
    use Data::Dumper;
    my $count = CONN_NUM;
    my $count_finish = CONN_NUM;
    my $count_ok = CONN_NUM;
    my %clients;
    $acceptor->weak(0);
    $acceptor->connection_callback(
        sub {
            my ($acceptor, $err) = @_;
            diag $err if $err;
            # Guard with "> 0" so stray extra connections cannot drive the
            # counter negative and be accepted anyway.
            if ($count > 0) {
                $count--;
                # Start from '' so the comparison at EOF never warns about an
                # uninitialized value when no data arrived.
                my $built_str = '';
                my $client = MyPipe->new;
                $acceptor->accept($client);
                diag Dumper($client);
                # Keep the peer name: it is the value the data is checked
                # against. An exception or undef degrades to ''.
                my $un_p = eval { $client->getpeername() } // '';
                diag Dumper($un_p);
                $clients{$client} = { self => $client, peername => $un_p };
                $client->read_callback(
                    sub {
                        my ($c, $str, $err) = @_;
                        $built_str .= $str;
                        diag $str;
                    }
                   );
                $client->eof_callback(
                    sub {
                        my $cr = delete $clients{$_[0]};
                        my $expected = $cr->{peername};
                        if ($built_str eq $expected) {
                            --$count_ok;
                        }
                        # Last client finished: make the acceptor weak again.
                        if (!--$count_finish) {
                            $acceptor->weak(1);
                        }
                    }
                   );
                $client->shutdown();
            }
        }
       );
    $p->start(
        sub {
            # Launch all clients from inside the loop, then restore the
            # counter for the connection callback above.
            do {
                CommonStream::concurrent_sub(
                    sub {
                        my ($target, $source) = @_;
                        UniClient::connect_local(
                            $target, $source,
                            sub {
                                my $sock = $_[0];
                                print {$sock} getsockname($sock);
                            }
                           );
                    },
                    sock_path($acceptor),
                    PETest::var("client-$count")
                   );
            } while (--$count);
            $count = CONN_NUM;
            $p->stop();
        }
       );
    $l->run();
    # diag "That's o'kay";
    return $count_ok;
}

# test_serv_rw_shuffling()
#
# Starts CONN_NUM concurrent clients via connect_echo_local. For every
# accepted connection the server writes the current counter value, shuts
# down, and at EOF compares what it read back with the value it wrote.
#
# Returns the number of connections whose data did NOT match, so 0 means
# complete success.
sub test_serv_rw_shuffling {
    my $count = CONN_NUM;
    my $count_finish = CONN_NUM;
    my $count_ok = CONN_NUM;
    my %clients;
    $acceptor->weak(0);
    $acceptor->connection_callback(
        sub {
            my ($acceptor, $err) = @_;
            # Report connection errors, as test_serv_read_shuffling does.
            diag $err if $err;
            if ($count) {
                # Start from '' so the comparison at EOF never warns about an
                # uninitialized value when nothing was echoed back.
                my $built_str = '';
                my $client = (blessed $acceptor)->new();
                $acceptor->accept($client);
                $clients{$client} = { self => $client, expected => $count };
                $client->read_callback(
                    sub {
                        my ($c, $str, $err) = @_;
                        $built_str .= $str;
                    }
                   );
                $client->eof_callback(
                    sub {
                        my $cr = delete $clients{$_[0]};
                        my $expected = $cr->{expected};
                        if ($built_str eq $expected) {
                            --$count_ok;
                        }
                        # Last client finished: make the acceptor weak again.
                        if (!--$count_finish) {
                            $acceptor->weak(1);
                        }
                    }
                   );
                $client->write($count);
                $client->shutdown();
                $count--;
            }
        }
       );
    $p->start(
        sub {
            # Launch all clients from inside the loop, then restore the
            # counter for the connection callback above.
            do {
                CommonStream::concurrent_sub(\&connect_echo_local, sock_path($acceptor));
            } while (--$count);
            $count = CONN_NUM;
            $p->stop();
        }
       );
    $l->run();
    return $count_ok;
}

# test_self_pleasing(@connect_params)
#
# Exercises a server-side and a client-side handle that both live in this
# process:
#   * the client connects with @connect_params, disables reading, writes a
#     random token and shuts down; reading is re-enabled from its shutdown
#     callback;
#   * the server collects incoming data and, at EOF, writes its own random
#     token and shuts down;
#   * the loop is stopped once both sides have seen EOF.
#
# Emits six assertions: the write and shutdown callbacks fired on both sides,
# and each side received exactly the other side's token.
sub test_self_pleasing {
    my @connect_params = @_;
    my ($c_wcb_ok, $s_wcb_ok);    # write callbacks fired (client / server)
    my ($c_scb_ok, $s_scb_ok);    # shutdown callbacks fired (client / server)
    my $ss = (blessed $acceptor)->new();
    my $cs = (blessed $acceptor)->new();
    my ($ct, $st) = (rand, rand);    # tokens sent by client / server
    my ($cr, $sr);                   # data received by client / server
    my ($s_eof, $c_eof);
    $ss->read_callback(
        sub {
            $sr .= $_[1];
        });
    $ss->eof_callback(
        sub {
            # The client is done sending: answer and close our side.
            $_[0]->write($st);
            $_[0]->shutdown();
            # diag "Server EOF";
            $s_eof = 1;
            $l->stop() if ($c_eof && $s_eof);
        });
    $ss->write_callback(
        sub {
            # diag "Server wrote";
            $s_wcb_ok = 1;
        });
    $ss->shutdown_callback(
        sub {
            $s_scb_ok = 1;
        });
    $cs->read_callback(
        sub {
            # diag "Client read: data='$_[1]'";
            $cr .= $_[1];
        });
    $cs->eof_callback(
        sub {
            # diag "Client EOF";
            $c_eof = 1;
            $l->stop() if ($c_eof && $s_eof);
        });
    $cs->write_callback(
        sub {
            # diag "Client wrote";
            $c_wcb_ok = 1;
        });
    $cs->shutdown_callback(
        sub {
            # Start reading the server's answer only after our own shutdown.
            $_[0]->want_read(1);
            $c_scb_ok = 1;
        });
    # NOTE(review): presumably makes $acceptor hand its next connection to
    # $ss - confirm against CommonStream::to_listener.
    CommonStream::to_listener(
        $acceptor, $ss, sub {
        });
    $cs->connect(
        @connect_params,
        sub {
            $_[0]->want_read(0);
            $_[0]->write($ct);
            $_[0]->shutdown();
        });
    eval { $l->run(); 1 } or diag "Error: $@";
    # diag "Action finished";
    ok($c_wcb_ok, "Client write callback fired");
    ok($s_wcb_ok, "Server write callback fired");
    ok($c_scb_ok, "Client shutdown callback fired");
    ok($s_scb_ok, "Server shutdown callback fired");
    is($cr, $st, "Client got right data");
    is($sr, $ct, "Server got right data");
}

# Last scenario: server and client handles in the same process, connected
# through the path currently registered for the acceptor.
my $listen_path = sock_path($acceptor);
test_self_pleasing($listen_path);

done_testing;