use
Socket
qw(IPPROTO_TCP SOCK_STREAM TCP_NODELAY)
;
? 0
my
$NDN
;
use
constant
SOCKS
=>
$ENV
{MOJO_NO_SOCKS}
? 0
use
constant
READ
=> SOCKS ? IO::Socket::Socks::SOCKS_WANT_READ() : 0;
use
constant
WRITE
=> SOCKS ? IO::Socket::Socks::SOCKS_WANT_WRITE() : 0;
has
reactor
=>
sub
{ Mojo::IOLoop->singleton->reactor };
sub
DESTROY {
shift
->_cleanup }
sub
can_nnr {NNR}
sub
can_socks {SOCKS}
sub
connect
{
my
(
$self
,
$args
) = (
shift
,
ref
$_
[0] ?
$_
[0] : {
@_
});
weaken
$self
;
my
$reactor
=
$self
->reactor;
$self
->{timer} =
$reactor
->timer(
$args
->{timeout} || 10,
sub
{
$self
->emit(
error
=>
'Connect timeout'
) });
$_
&& s/[[\]]//g
for
@$args
{
qw(address socks_address)
};
my
$address
=
$args
->{socks_address} || (
$args
->{address} ||=
'127.0.0.1'
);
return
$reactor
->next_tick(
sub
{
$self
&&
$self
->_connect(
$args
) })
if
!NNR ||
$args
->{handle} ||
$args
->{path};
$NDN
//= Net::DNS::Native->new(
pool
=> 5,
extra_thread
=> 1);
my
$handle
=
$self
->{dns} =
$NDN
->getaddrinfo(
$address
, _port(
$args
),
{
protocol
=> IPPROTO_TCP,
socktype
=> SOCK_STREAM});
$reactor
->io(
$handle
=>
sub
{
my
$reactor
=
shift
;
$reactor
->remove(
$self
->{dns});
my
(
$err
,
@res
) =
$NDN
->get_result(
delete
$self
->{dns});
return
$self
->emit(
error
=>
"Can't resolve: $err"
)
if
$err
;
$args
->{addr_info} = \
@res
;
$self
->_connect(
$args
);
}
)->watch(
$handle
, 1, 0);
}
sub
_cleanup {
my
$self
=
shift
;
$NDN
->timedout(
$self
->{dns})
if
$NDN
&&
$self
->{dns};
return
unless
my
$reactor
=
$self
->reactor;
$self
->{
$_
} &&
$reactor
->remove(
delete
$self
->{
$_
})
for
qw(dns timer handle)
;
return
$self
;
}
sub
_connect {
my
(
$self
,
$args
) =
@_
;
my
$path
=
$args
->{path};
my
$handle
=
$self
->{handle} =
$args
->{handle};
unless
(
$handle
) {
my
$class
=
$path
?
'IO::Socket::UNIX'
:
'IO::Socket::IP'
;
my
%options
= (
Blocking
=> 0);
if
(
$path
) {
$options
{Peer} =
$path
}
else
{
if
(
my
$info
=
$args
->{addr_info}) {
$options
{PeerAddrInfo} =
$info
}
else
{
$options
{PeerAddr} =
$args
->{socks_address} ||
$args
->{address};
$options
{PeerPort} = _port(
$args
);
}
$options
{LocalAddr} =
$args
->{local_address}
if
$args
->{local_address};
}
return
$self
->emit(
error
=>
"Can't connect: $@"
)
unless
$self
->{handle} =
$handle
=
$class
->new(
%options
);
}
$handle
->blocking(0);
$path
?
$self
->_try_socks(
$args
) :
$self
->_wait(
'_ready'
,
$handle
,
$args
);
}
sub
_port {
$_
[0]{socks_port} ||
$_
[0]{port} || (
$_
[0]{tls} ? 443 : 80) }
sub
_ready {
my
(
$self
,
$args
) =
@_
;
my
$handle
=
$self
->{handle};
unless
(
$handle
->
connect
) {
return
$self
->emit(
error
=> $!)
unless
$! == EINPROGRESS;
$self
->reactor->remove(
$handle
);
return
$self
->_wait(
'_ready'
,
$handle
,
$args
);
}
return
$self
->emit(
error
=> $! ||
'Not connected'
)
unless
$handle
->connected;
setsockopt
$handle
, IPPROTO_TCP, TCP_NODELAY, 1;
$self
->_try_socks(
$args
);
}
sub
_socks {
my
(
$self
,
$args
) =
@_
;
my
$handle
=
$self
->{handle};
return
$self
->_try_tls(
$args
)
if
$handle
->ready;
my
$err
=
$IO::Socket::Socks::SOCKS_ERROR
;
if
(
$err
== READ) {
$self
->reactor->watch(
$handle
, 1, 0) }
elsif
(
$err
== WRITE) {
$self
->reactor->watch(
$handle
, 1, 1) }
else
{
$self
->emit(
error
=>
$err
) }
}
sub
_try_socks {
my
(
$self
,
$args
) =
@_
;
my
$handle
=
$self
->{handle};
return
$self
->_try_tls(
$args
)
unless
$args
->{socks_address};
return
$self
->emit(
error
=>
'IO::Socket::Socks 0.64+ required for SOCKS support'
)
unless
SOCKS;
my
%options
= (
ConnectAddr
=>
$args
->{address},
ConnectPort
=>
$args
->{port});
@options
{
qw(AuthType Username Password)
}
= (
'userpass'
,
@$args
{
qw(socks_user socks_pass)
})
if
$args
->{socks_user};
my
$reactor
=
$self
->reactor;
$reactor
->remove(
$handle
);
return
$self
->emit(
error
=>
'SOCKS upgrade failed'
)
unless
IO::Socket::Socks->start_SOCKS(
$handle
,
%options
);
$self
->_wait(
'_socks'
,
$handle
,
$args
);
}
sub
_try_tls {
my
(
$self
,
$args
) =
@_
;
my
$handle
=
$self
->{handle};
return
$self
->_cleanup->emit(
connect
=>
$handle
)
unless
$args
->{tls};
my
$reactor
=
$self
->reactor;
$reactor
->remove(
$handle
);
weaken
$self
;
my
$tls
= Mojo::IOLoop::TLS->new(
$handle
)->reactor(
$self
->reactor);
$tls
->on(
upgrade
=>
sub
{
$self
->_cleanup->emit(
connect
=>
pop
) });
$tls
->on(
error
=>
sub
{
$self
->emit(
error
=>
pop
) });
$tls
->negotiate(
%$args
);
}
sub
_wait {
my
(
$self
,
$next
,
$handle
,
$args
) =
@_
;
weaken
$self
;
$self
->reactor->io(
$handle
=>
sub
{
$self
->
$next
(
$args
) })
->watch(
$handle
, 0, 1);
}
1;