use
Fcntl
qw[/SEEK_/ /O_/ :flock]
;
# Module version.  _build_peerid embeds its three numeric components in the
# generated peer id.
our $VERSION = "1.0.2";

# Size in bytes of one block requested from a peer (see _request_pieces).
my $block_size = 2**14;
# Type constraint: a string for which the -f file test succeeds.
my $FILE = Type::Tiny->new(
    name       => 'File',
    parent     => Str,
    constraint => sub { -f $_ },
    message    => sub { "$_ isn't an existing file" },
);
# Type constraint: the handshake "reserved" field, a string of length 8.
my $RESERVED = Type::Tiny->new(
    name       => 'Reserved',
    parent     => Str,
    constraint => sub { length $_ == 8 },
    message    => sub { 'reserved data is malformed' }
);

# Type constraint: a peer id, a string of length 20.
my $PEERID = Type::Tiny->new(
    name       => 'PeerID',
    parent     => Str,
    constraint => sub { length $_ == 20 },
    message    => sub { 'Peer ID must be 20 chars in length'; }
);

# Type constraint: an infohash, a string of length 20.
my $INFOHASH = Type::Tiny->new(
    name       => 'Infohash',
    parent     => Str,
    constraint => sub { length $_ == 20 },
    message    => sub { 'Infohashes are 20 bytes in length'; }
);
# TCP port to listen on.  Defaults to 0; _build_socket records the port that
# was actually bound through the private _set_port writer.
has port => (
    is      => 'ro',
    isa     => Int,
    writer  => '_set_port',
    default => sub { 0 },
);

# Listening socket, created by _build_socket.  It cannot be supplied to the
# constructor (init_arg is undef).
has socket => (
    is        => 'ro',
    isa       => Ref,
    init_arg  => undef,
    builder   => '_build_socket',
    predicate => '_has_socket',
);
# Builder for the 'socket' attribute.
#
# Starts a TCP listener with tcp_server on $s->port (host is undef).  Every
# accepted connection is wrapped in an AnyEvent::Handle whose read events go
# to _on_read_incoming, and is then registered through _add_peer.
# Connections accepted while the client state is 'stopped' are closed right
# away.  When the listener is bound, the port reported by tcp_server is
# stored through _set_port.
#
# Returns whatever tcp_server returns.
sub _build_socket {
    my $s = shift;
    return tcp_server(
        undef,
        $s->port,
        sub {
            my ($fh, $host, $port) = @_;
            AE::log info => 'Accepted connection from %s:%d', $host, $port;
            if ($s->state eq 'stopped') {

                # $fh is the raw accepted socket, not an AnyEvent::Handle,
                # so there is no ->destroy method to call on it.
                close $fh;
                return;
            }
            my $handle = AnyEvent::Handle->new(
                fh       => $fh,
                on_error => sub {
                    my ($hdl, $fatal, $msg) = @_;
                    AE::log error => 'Socket error: ' . $msg;
                    $s->_del_peer($hdl);
                },
                on_eof => sub {
                    my $h = shift;
                    AE::log info => 'Socket EOF';
                    $s->_del_peer($h);
                },
                on_read => sub {
                    AE::log debug => 'Read Socket';
                    $s->_on_read_incoming(@_);
                }
            );
            $s->_add_peer($handle);
        },
        sub {
            my ($fh, $thishost, $thisport) = @_;
            $s->_set_port($thisport);
            AE::log info => "bound to $thishost, port $thisport";
        }
    );
}
# Path of the bencoded metadata file read by _build_metadata.  Required, and
# must name an existing file.
has path => (
    is       => 'ro',
    isa      => $FILE,
    required => 1,
);
# The 8 byte "reserved" field sent in our handshake (see _send_handshake).
has reserved => (
    is      => 'ro',
    isa     => $RESERVED,
    builder => '_build_reserved',
);

# Builder for 'reserved': eight bytes, all zero except the last one, which
# is set to 0x04.
sub _build_reserved {
    my $reserved = ("\0" x 7) . "\x04";
    AE::log debug => '_build_reserved() => ' . $reserved;
    return $reserved;
}
# Our 20 byte peer id.  Always generated; cannot be passed to the constructor.
has peerid => (
    is       => 'ro',
    isa      => $PEERID,
    init_arg => undef,
    required => 1,
    builder  => '_build_peerid',
);

# Builder for 'peerid'.  The id is laid out as
#   -AB<major><minor><patch><U|S>-<7 random chars><name>
# where the version digits come from $VERSION, the flag is 'U' when $VERSION
# matches the character class below and 'S' otherwise, and <name> is one of
# four fixed names.  The result is packed to exactly 20 bytes.
sub _build_peerid {
    my @version = ($VERSION =~ m[^v?(\d+)\.(\d+)\.(\d+)]);
    my $flag    = ($VERSION =~ m[[^\d\.^v]]) ? 'U' : 'S';

    # 66 candidate characters for the random part.
    my @alphabet = ('A' .. 'Z', 'a' .. 'z', 0 .. 9, qw[- . _ ~]);
    my $random   = join '', map { $alphabet[rand(66)] } 1 .. 7;
    my @names    = qw[KaiLi April Aaron Sanko];
    my $name     = $names[rand 4];
    return pack('a20',
                sprintf('-AB%01d%01d%01d%1s-%7s%-5s',
                        @version, $flag, $random, $name));
}
# Bit vector of the pieces we hold (one bit per piece index); built lazily.
has bitfield => (
    is       => 'ro',
    isa      => Str,
    lazy     => 1,
    init_arg => undef,
    builder  => '_build_bitfield',
);

# Builder for 'bitfield': an all-zero bit string with piece_count bits.
sub _build_bitfield {
    my $s = shift;
    return pack 'b*', "\0" x $s->piece_count;
}
# Returns a bit vector with one bit per piece index.  A bit is set when the
# piece belongs to a file whose priority is true and the matching bit in our
# own bitfield is not set.
#
# Pieces are visited file by file through _file_to_range, so a piece shared
# by two files ends up with the value computed for the later file.
sub wanted {
    my $s        = shift;
    my $wanted   = '';
    my $files    = $s->files;
    my $bitfield = $s->bitfield;    # loop invariant
    for my $findex (0 .. @{$files} - 1) {
        my $prio = !!$files->[$findex]{priority};
        for my $index ($s->_file_to_range($findex)) {
            vec($wanted, $index, 1) = $prio && !vec($bitfield, $index, 1);
        }
    }
    AE::log debug => '->wanted() => ' . unpack('b*', $wanted);
    return $wanted;
}
# True when none of the first piece_count + 1 bits of ->wanted is set.
sub complete {
    my $s    = shift;
    my $bits = substr unpack('b*', $s->wanted), 0, $s->piece_count + 1;
    return index($bits, 1) == -1;
}
# True when none of the first piece_count + 1 bits of our bitfield is zero.
sub seed {
    my $s    = shift;
    my $bits = substr unpack('b*', $s->bitfield), 0, $s->piece_count + 1;
    return index($bits, 0) == -1;
}
# Estimate of the bytes still to download: piece_length multiplied by the
# number of set bits among the first piece_count + 1 bits of ->wanted.
sub _left {
    my $s            = shift;
    my $piece_length = $s->piece_length;
    my $bits = substr unpack('b*', $s->wanted), 0, $s->piece_count + 1;
    return $piece_length * ($bits =~ tr/1//);
}
# Transfer counters, both starting at 0.  Each one gets a private writer
# named _set_<counter>.
for my $counter (qw[uploaded downloaded]) {
    has $counter => (
        is      => 'ro',
        isa     => Int,
        default => sub { 0 },
        writer  => '_set_' . $counter,
    );
}
# 20 byte infohash identifying the torrent; computed lazily.
has infohash => (
    is       => 'ro',
    isa      => $INFOHASH,
    lazy     => 1,
    init_arg => undef,
    builder  => '_build_infohash',
);

# Builder for 'infohash': SHA-1 of the bencoded 'info' entry of the metadata.
sub _build_infohash {
    my $s = shift;
    return sha1(bencode($s->metadata->{info}));
}
# Decoded contents of the metadata file at ->path; loaded lazily.
has metadata => (
    is         => 'ro',
    lazy_build => 1,
    builder    => '_build_metadata',
    lazy       => 1,
    isa        => HashRef,
    init_arg   => undef
);

# Builder for 'metadata': reads the whole file named by ->path and bdecodes
# it.
#
# Dies with a descriptive message when the file cannot be opened or read.
# The handle is put into binary mode because bencoded data is binary.
sub _build_metadata {
    my $s    = shift;
    my $path = $s->path;
    open(my $fh, '<', $path)
        or die sprintf "Failed to open %s: %s\n", $path, $!;
    binmode $fh;
    my $read = sysread $fh, my ($raw), -s $fh;
    my $err  = $!;
    close $fh;
    die sprintf "Failed to read %s: %s\n", $path, $err
        unless defined $read;
    my $metadata = bdecode($raw);
    AE::log debug => sub {
        '_build_metadata() => ' . Data::Dump::dump($metadata);
    };
    return $metadata;
}
# Shortcuts into the 'info' dictionary of the metadata.

# The 'name' entry.
sub name {
    my $s = shift;
    return $s->metadata->{info}{name};
}

# The 'pieces' entry (hashcheck reads 20 bytes per piece index from it).
sub pieces {
    my $s = shift;
    return $s->metadata->{info}{pieces};
}

# The 'piece length' entry.
sub piece_length {
    my $s = shift;
    return $s->metadata->{info}{'piece length'};
}
# Returns size / piece_length truncated to an integer, plus one when the
# division is exact.
# NOTE(review): adding one only for an exact multiple looks inverted, but
# several callers use this value as an inclusive upper bound - confirm
# before changing it.
sub piece_count {
    my $s     = shift;
    my $ratio = $s->size / $s->piece_length;
    my $whole = int $ratio;
    return $ratio == $whole ? $whole + 1 : $whole;
}
# Directory below which the payload files live; defaults to the absolute
# form of the current directory.  When the value is replaced (the trigger
# sees a defined old value) the cached file list is cleared so that it is
# rebuilt with the new base.
has basedir => (
    is       => 'ro',
    isa      => Str,
    lazy     => 1,
    required => 1,
    default  => sub { File::Spec->rel2abs(File::Spec->curdir) },
    trigger  => sub {
        my ($s, $n, $o) = @_;
        return if !defined $o;
        $s->_clear_files;
    }
);
# List of payload files, one hash per file; built lazily and dropped again
# through _clear_files.
has files => (
    is       => 'ro',
    isa      => ArrayRef [HashRef],
    lazy     => 1,
    init_arg => undef,
    builder  => '_build_files',
    clearer  => '_clear_files',
);

# Builder for 'files'.
#
# A metadata 'info' dictionary with a 'files' list yields one entry per
# listed file, located at basedir/name/<path components>.  Otherwise a
# single entry is produced, located at basedir/name, whose length is the
# 'length' entry of 'info'.
#
# Every entry starts with priority 1, mode 'c', and no handle or timer.
sub _build_files {
    my $s = shift;

    # Builds one file record from its length and its path components
    # relative to basedir/name.
    my $record = sub {
        my ($length, @relative) = @_;
        return {
            priority => 1,
            fh       => undef,
            mode     => 'c',
            length   => $length,
            timeout  => undef,
            path     => File::Spec->rel2abs(
                File::Spec->catfile($s->basedir, $s->name, @relative)
            )
        };
    };
    if (defined $s->metadata->{info}{files}) {
        return [
            map { $record->($_->{length}, @{$_->{path}}) }
                @{$s->metadata->{info}{files}}
        ];
    }
    return [$record->($s->metadata->{info}{length})];
}
# Total payload size in bytes; computed lazily.
has size => (
    is       => 'ro',
    isa      => Int,
    lazy     => 1,
    init_arg => undef,
    builder  => '_build_size',
);

# Builder for 'size': the sum of the 'length' of every entry in ->files.
sub _build_size {
    my $s     = shift;
    my $total = 0;
    for my $file (@{$s->files}) {
        $total += $file->{length};
    }
    AE::log debug => '_build_size() => ' . $total;
    return $total;
}
# Switches file number $i into mode $m and returns a true value on success.
#
#   'r'  open read-only with a shared lock
#   'w'  create missing directories, open for writing with an exclusive
#        lock and resize the file to its expected length
#   'c'  closed; any pending auto-close timer is dropped
#
# Returns 1 at once when the file is already in the requested mode.  Any
# other mode, or a failed open or shared lock, yields an empty return.
# Opening for read or write arms a one-shot timer (500s for 'r', 60s for
# 'w') that puts the file back into mode 'c'.
sub _open {
    my ($s, $i, $m) = @_;
    my $file = $s->files->[$i];
    AE::log
        trace => 'Opening file #%d (%s) for %s',
        $i, $file->{path}, $m;
    return 1 if $file->{mode} eq $m;
    if (defined $file->{fh}) {
        AE::log trace => 'Closing %s', $file->{fh};
        flock $file->{fh}, LOCK_UN;
        close $file->{fh};
        $file->{fh} = ();

        # The handle is gone, so the recorded mode must say so.  Without
        # this, a failed reopen below would leave a stale 'r'/'w' mode and
        # the next call would return early with no handle available.
        $file->{mode} = 'c';
    }
    my $auto_close;    # seconds until the file is closed again
    if ($m eq 'r') {
        AE::log trace => 'Opening %s to read', $file->{path};
        sysopen($file->{fh}, $file->{path}, O_RDONLY) || return;
        flock($file->{fh}, LOCK_SH) || return;
        $auto_close = 500;
    }
    elsif ($m eq 'w') {
        AE::log trace => 'Opening %s to write', $file->{path};
        my @split = File::Spec->splitdir($file->{path});
        pop @split;    # File name itself
        my $dir = File::Spec->catdir(@split);
        File::Path::mkpath($dir) if !-d $dir;
        sysopen($file->{fh}, $file->{path}, O_WRONLY | O_CREAT) || return;
        flock $file->{fh}, LOCK_EX;
        truncate $file->{fh}, $file->{length}
            if (-s $file->{fh}) != $file->{length};
        $auto_close = 60;
    }
    elsif ($m eq 'c') {
        $file->{timeout} = ();
    }
    else {
        return;
    }
    if (defined $auto_close) {

        # The timer callback must not keep the client object alive.
        weaken($s) unless isweak($s);
        $file->{timeout} = AE::timer(
            $auto_close,
            0,
            sub {
                $s // return;
                $s->_open($i, 'c');
            }
        );
    }
    return $file->{mode} = $m;
}
# Index of the blocks stored in the cache file: {file index}{offset} maps to
# the position recorded by _write_cache.  Starts out empty.
has piece_cache => (
    is      => 'ro',
    isa     => HashRef,
    default => sub { return {} },
);
# Path of the cache file used by _write_cache and _read_cache.  It sits in
# basedir for a single-file torrent and in basedir/name otherwise; the file
# name carries the first ten hex digits of the infohash in upper case.
sub _cache_path {
    my $s     = shift;
    my @parts = ($s->basedir);
    push @parts, $s->name if scalar @{$s->files} != 1;
    push @parts,
        '~ABPartFile_-'
        . uc(substr(unpack('H*', $s->infohash), 0, 10)) . '.dat';
    return File::Spec->catfile(@parts);
}
# Stores block $d, which belongs at offset $o of file number $f, in the
# cache file and remembers where it went in ->piece_cache.
#
# Blocks are appended: the write position is the current end of the cache
# file, so earlier blocks are not overwritten.
#
# Returns the number of bytes written, or nothing when the cache file
# cannot be opened, positioned or written.
sub _write_cache {
    my ($s, $f, $o, $d) = @_;
    my $path = $s->_cache_path;
    AE::log
        debug =>
        'Attempting to store %d bytes to cache file (%s) [$f=%s, $o=%s]',
        length($d), $path, $f, $o;
    my @split = File::Spec->splitdir($path);
    pop @split;    # File name itself
    my $dir = File::Spec->catdir(@split);
    File::Path::mkpath($dir) if !-d $dir;
    sysopen(my ($fh), $path, O_WRONLY | O_CREAT) || return;
    flock $fh, LOCK_EX;

    # A freshly opened handle is positioned at 0, so SEEK_CUR would always
    # report 0 and every block would land on top of the previous one.
    my $pos = sysseek $fh, 0, SEEK_END;
    my $w   = defined $pos ? syswrite($fh, $d) : undef;
    flock $fh, LOCK_UN;
    close $fh;
    if (!defined $w) {
        AE::log debug => 'Failed to write to cache file';
        return;
    }
    $s->piece_cache->{$f}{$o} = $pos + 0;    # '0 but true' becomes 0
    AE::log debug => 'Wrote %d bytes to cache file', $w;
    return $w;
}
# Reads up to $l bytes of the block recorded for file number $f at offset
# $o from the cache file.
#
# Returns the data read, or nothing when no such block is recorded or the
# cache file cannot be opened.
sub _read_cache {
    my ($s, $f, $o, $l) = @_;
    my $cache = $s->piece_cache;
    return if !defined $cache->{$f};
    return if !defined $cache->{$f}{$o};
    my $path = $s->_cache_path;
    AE::log
        debug =>
        'Attempting to read %d bytes from cache file (%s) [$f=%s, $o=%s]',
        $l, $path, $f, $o;
    sysopen(my ($fh), $path, O_RDONLY) || return;
    flock $fh, LOCK_SH;
    sysseek $fh, $cache->{$f}{$o}, SEEK_SET;
    sysread $fh, my ($d), $l;
    flock $fh, LOCK_UN;
    close $fh;
    return $d;
}
# Reads $length bytes starting $offset bytes into piece $index and returns
# them as one string.  The read may span several payload files.
#
# For a file that does not exist or cannot be opened for reading, the data
# comes from the cache file when a matching block is recorded there, and is
# replaced by NUL bytes otherwise.  A position beyond the last file yields
# an empty string.
sub _read {
    my ($s, $index, $offset, $length) = @_;
    AE::log
        debug =>
        'Attempting to read %d bytes from piece %d starting at %d bytes',
        $length, $index, $offset;
    my $data         = '';
    my $file_index   = 0;
    my $total_offset = ($index * $s->piece_length) + $offset;
    my $files        = $s->files;

    # Walk to the file holding the absolute offset.  The defined() test
    # stops at the end of the list without creating a bogus file entry.
SEARCH:
    while (defined $files->[$file_index]
           && $total_offset > $files->[$file_index]{length})
    {
        $total_offset -= $files->[$file_index]{length};
        $file_index++;
        AE::log
            trace =>
            'Searching for location... $total_offset = %d, $file_index = %d',
            $total_offset, $file_index;
    }
    if (!defined $files->[$file_index]) {
        AE::log trace => 'Returning %d bytes of data', length $data;
        return $data;
    }
READ:
    while ((defined $length) && ($length > 0)) {
        my $file = $files->[$file_index];

        # Never read past the end of the current file.
        my $this_read
            = (($total_offset + $length) >= $file->{length})
            ? ($file->{length} - $total_offset)
            : $length;
        AE::log
            trace =>
            'Attempting to read %d bytes from file #%d (%s), starting at %d',
            $this_read, $file_index, $file->{path}, $total_offset;
        if ((!-f $file->{path}) || (!$s->_open($file_index, 'r'))) {
            $data .= $s->_read_cache($file_index, $total_offset, $this_read)
                // ("\0" x $this_read);
            AE::log note => 'Failed to open file. Using null chars instead.';
        }
        else {
            sysseek $file->{fh}, $total_offset, SEEK_SET;
            sysread $file->{fh}, my ($_data), $this_read;

            # Test for definedness and length, not truth: a chunk that is
            # exactly "0" is valid data.
            $data .= $_data if defined $_data && length $_data;
            AE::log
                trace =>
                'Read %d bytes of data from file (%d bytes collected so far)',
                length($_data // ''), length $data;

            # Re-arm the auto-close timer without keeping $s alive.
            weaken($s) unless isweak($s);
            my $x = $file_index;
            $file->{timeout} = AE::timer(
                500, 0,
                sub {
                    $s // return;
                    $s->_open($x, 'c');
                }
            );
        }
        $file_index++;
        $length -= $this_read;
        AE::log trace => 'Still need to read %d bytes', $length;
        last READ if not defined $files->[$file_index];
        $total_offset = 0;
    }
    AE::log trace => 'Returning %d bytes of data', length $data;
    return $data;
}
# Writes $data $offset bytes into piece $index; the write may span several
# payload files.  Data for a file whose priority is 0 goes to the cache
# file, everything else goes into the file itself.
#
# Returns the number of bytes that could NOT be written, so 0 means
# everything was stored.  A position beyond the last file, or a file that
# cannot be opened for writing, stops the write and leaves the remaining
# data counted in the return value.
sub _write {
    my ($s, $index, $offset, $data) = @_;
    AE::log
        debug =>
        'Attempting to write %d bytes from piece %d starting at %d bytes',
        length($data), $index, $offset;
    my $files        = $s->files;
    my $file_index   = 0;
    my $total_offset = int(($index * $s->piece_length) + ($offset || 0));
    AE::log debug => '...calculated offset == %d', $total_offset;

    # Walk to the file holding the absolute offset.  The defined() test
    # stops at the end of the list without creating a bogus file entry.
SEARCH:
    while (defined $files->[$file_index]
           && $total_offset > $files->[$file_index]{length})
    {
        $total_offset -= $files->[$file_index]{length};
        $file_index++;
        AE::log
            trace =>
            'Searching for location... $total_offset = %d, $file_index = %d',
            $total_offset, $file_index;
    }
    return length $data if !defined $files->[$file_index];
WRITE:
    while ((defined $data) && (length $data > 0)) {
        my $file = $files->[$file_index];

        # Never write past the end of the current file.
        my $this_write
            = (($total_offset + length $data) >= $file->{length})
            ? ($file->{length} - $total_offset)
            : length $data;
        AE::log
            trace =>
            'Attempting to write %d bytes from file #%d (%s), starting at %d',
            $this_write, $file_index, $file->{path}, $total_offset;
        if ($file->{priority} == 0) {
            $s->_write_cache($file_index, $total_offset,
                             substr($data, 0, $this_write, ''));
            AE::log trace => 'Wrote data to cache...';
        }
        else {
            if (!$s->_open($file_index, 'w')) {

                # Report the unwritten remainder instead of seeking on a
                # handle that does not exist.
                AE::log error => 'Failed to open %s to write',
                    $file->{path};
                return length $data;
            }
            sysseek $file->{fh}, $total_offset, SEEK_SET;
            my $w = syswrite($file->{fh}, substr($data, 0, $this_write, ''));
            AE::log
                trace => 'Wrote %d bytes of data to file (%d bytes left)',
                $w // 0, length $data;

            # Re-arm the auto-close timer without keeping $s alive.
            weaken($s) unless isweak($s);
            my $x = $file_index;
            $file->{timeout} = AE::timer(
                120, 0,
                sub {
                    $s // return;
                    $s->_open($x, 'c');
                }
            );
        }
        $file_index++;
        last WRITE if not defined $files->[$file_index];
        $total_offset = 0;
    }
    return length $data;
}
# Verifies pieces against the hashes in ->pieces and records the outcome in
# our bitfield.  Called without arguments it checks every index from 0 to
# piece_count; otherwise only the given indexes (out-of-range ones are
# skipped).  After each piece the on_hash_pass or on_hash_fail callback is
# triggered with the piece index.
sub hashcheck (;@) {
    my $s       = shift;
    my @indexes = @_ ? @_ : (0 .. $s->piece_count);
    AE::log trace => sub {
        'Hashcheck of : ' . Data::Dump::dump(\@indexes);
    };
    $s->bitfield;    # force the lazy attribute into existence
    my $total_size = $s->size;
    for my $index (@indexes) {
        next if $index < 0 || $index > $s->piece_count;

        # The piece at index piece_count is read with the remainder length.
        my $wanted_length
            = $index == $s->piece_count
            ? $total_size % $s->piece_length
            : $s->piece_length;
        my $piece    = $s->_read($index, 0, $wanted_length);
        my $expected = substr($s->pieces, $index * 20, 20);
        my $reality  = sha1($piece);
        my $ok       = defined($piece) && ($expected eq $reality);
        vec($s->{bitfield}, $index, 1) = $ok;
        AE::log trace => sub {
            "Validate piece #%06d %s, Expected: %s\n" . " Reality: %s",
                $index, ($ok ? 'PASS' : 'FAIL'), unpack('H*', $expected),
                unpack('H*', $reality);
        };
        if ($ok) {
            $s->_trigger_hash_pass($index);
        }
        else {
            $s->_trigger_hash_fail($index);
        }
    }
}
# Connected peers, keyed by the stringified connection handle (see
# _add_peer).  Built lazily; emptied through _clear_peers.
has peers => (
    is      => 'ro',
    isa     => HashRef,
    lazy    => 1,
    builder => '_build_peers',
    clearer => '_clear_peers',
);

# Builder for 'peers': an empty hash.
sub _build_peers {
    return {};
}
# Registers connection handle $h as a peer and returns the new peer record.
# A fresh peer is choked and uninterested in both directions, has an empty
# bitfield and no requests.  Two timers are armed: one that drops the peer
# after 20 seconds and one that sends a keepalive after 30 seconds and then
# every 120 seconds.
sub _add_peer {
    my ($s, $h) = @_;
    my %peer = (
        handle            => $h,
        peerid            => '',
        bitfield          => (pack 'b*', "\0" x $s->piece_count),
        remote_choked     => 1,
        remote_interested => 0,
        remote_requests   => [],
        local_choked      => 1,
        local_interested  => 0,
        local_requests    => [],
        timeout           => AE::timer(20, 0, sub { $s->_del_peer($h) }),
        keepalive         => AE::timer(
            30, 120,
            sub {
                $s->_send_encrypted($h, build_keepalive());
            }
        ),
        local_allowed  => [],
        remote_allowed => [],
        local_suggest  => [],
        remote_suggest => [],
        encryption     => '?'
    );
    return $s->{peers}{$h} = \%peer;
}
# Forgets the peer behind handle $h and destroys the handle.  Blocks we had
# requested from that peer are released first (slot 3 of the matching
# working_pieces entry is cleared).  Does nothing for an unknown handle.
sub _del_peer {
    my ($s, $h) = @_;
    return if !defined $s->peers->{$h};
    for my $request (@{$s->peers->{$h}{local_requests}}) {
        my ($piece, $offset) = @$request;
        $s->working_pieces->{$piece}{$offset}[3] = undef;
    }
    delete $s->peers->{$h};
    $h->destroy;
}
# In-place list shuffler; created on first use by _build_trackers.
my $shuffle;

# Tracker tiers, one hash per tier; built lazily from the metadata.
has trackers => (
    is       => 'ro',
    isa      => ArrayRef [HashRef],
    lazy     => 1,
    init_arg => undef,
    builder  => '_build_trackers',
);
# Builder for 'trackers'.
#
# The 'announce' entry of the metadata (if defined) becomes a tier of its
# own, followed by every tier in 'announce-list' (if defined).  Each tier
# gets zeroed statistics and a timer that, after 1 second and then every 15
# minutes, announces 'started' unless the client is stopped.  Finally the
# tiers, and the URLs inside each tier, are shuffled.
sub _build_trackers {
    my $s = shift;

    # Fisher-Yates shuffle of the array behind the given reference.
    $shuffle //= sub {
        my $deck = shift;
        return unless @$deck;
        for (my $i = @$deck - 1; $i > 0; $i--) {
            my $j = int rand($i + 1);
            @$deck[$i, $j] = @$deck[$j, $i];
        }
    };
    my @tiers;
    push @tiers, [$s->metadata->{announce}]
        if defined $s->metadata->{announce};
    push @tiers, @{$s->metadata->{'announce-list'}}
        if defined $s->metadata->{'announce-list'};
    my $trackers = [
        map {
            +{  urls       => $_,
                complete   => 0,
                incomplete => 0,
                peers      => '',
                peers6     => '',
                announcer  => undef,
                ticker     => AE::timer(
                    1,
                    15 * 60,
                    sub {
                        return if $s->state eq 'stopped';
                        $s->announce('started');
                    }
                ),
                failures => 0
            }
        } @tiers
    ];
    AE::log trace => sub {
        '$trackers before shuffle => ' . Data::Dump::dump($trackers);
    };
    $shuffle->($trackers);
    for my $tier (@$trackers) {
        $shuffle->($tier->{urls});
    }
    AE::log trace => sub {
        '$trackers after shuffle => ' . Data::Dump::dump($trackers);
    };
    return $trackers;
}
# Announces event $e to every tracker tier that has no announce in flight
# (a tier's 'announcer' slot is only filled when it is undefined).
# NOTE(review): the call counter below is the package variable $a, which is
# also the variable sort() uses - presumably meant as a retry limit; confirm
# before relying on it.
sub announce {
    my ($s, $e) = @_;
    return if $a++ > 10;
    $_->{announcer} //= $s->_announce_tier($e, $_) for @{$s->trackers};
}
# Sends an HTTP announce for event $e to the first URL of tracker tier $tier
# and returns whatever http_get returns.
#
# Nothing is sent when the tier has failed more than five times, has no
# URLs, or its first URL is not http(s).
#
# On a 2xx reply without a 'failure reason', the tier's compact peer lists
# and statistics are updated and its ticker is re-armed with the tracker's
# interval (15 minutes when none is given).  On a failure reply or an HTTP
# error, the first URL is rotated to the end of the tier, the announce is
# retried, and the failure is counted.
sub _announce_tier {
    my ($s, $e, $tier) = @_;
    return if $tier->{failures} > 5;
    return if !@{$tier->{urls}};    # Empty tier?
    return if $tier->{urls}[0] !~ m[^https?://.+];
    local $AnyEvent::HTTP::USERAGENT
        = 'AnyEvent::BitTorrent/' . $AnyEvent::BitTorrent::VERSION;

    # Percent-encode every non-alphanumeric byte of the infohash.
    my $escaped_hash = $s->infohash;
    $escaped_hash =~ s/([^A-Za-z0-9])/sprintf("%%%2.2X", ord($1))/ge;
    my $_url = join '',
        $tier->{urls}[0],
        '?info_hash=', $escaped_hash,
        '&peer_id=',   $s->peerid,
        '&uploaded=',  $s->uploaded,
        '&downloaded=', $s->downloaded,
        '&left=',      $s->_left,
        '&port=',      $s->port,
        '&compact=1',
        ($e ? '&event=' . $e : '');
    AE::log debug => 'Announce URL: ' . $_url;
    return http_get(
        $_url,
        sub {
            my ($body, $hdr) = @_;
            AE::log trace => sub {
                'Announce response: ' . Data::Dump::dump($body, $hdr);
            };
            $tier->{announcer} = ();
            if ($hdr->{Status} =~ /^2/) {
                my $reply = bdecode($body);
                if (defined $reply->{'failure reason'}) {

                    # Try the next URL of this tier.
                    push @{$tier->{urls}}, shift @{$tier->{urls}};
                    $s->_announce_tier($e, $tier);
                    $tier->{'failure reason'} = $reply->{'failure reason'};
                    $tier->{failures}++;
                }
                else {
                    $tier->{failures} = $tier->{'failure reason'} = 0;
                    $tier->{peers}
                        = compact_ipv4(
                            uncompact_ipv4(
                                $tier->{peers} . $reply->{peers}))
                        if $reply->{peers};
                    $tier->{peers6}
                        = compact_ipv6(
                            uncompact_ipv6(
                                $tier->{peers6} . $reply->{peers6}))
                        if $reply->{peers6};
                    $tier->{complete}   = $reply->{complete};
                    $tier->{incomplete} = $reply->{incomplete};
                    $tier->{ticker}     = AE::timer(
                        $reply->{interval} // (15 * 60),
                        $reply->{interval} // (15 * 60),
                        sub {
                            return if $s->state eq 'stopped';
                            $s->_announce_tier($e, $tier);
                        }
                    );
                }
            }
            else {
                $tier->{'failure reason'}
                    = "HTTP Error: $hdr->{Status} $hdr->{Reason}\n";
                $tier->{failures}++;

                # Try the next URL of this tier.
                push @{$tier->{urls}}, shift @{$tier->{urls}};
                $s->_announce_tier($e, $tier);
            }
        }
    );
}
# Repeating timer (first run after 15 seconds, then every 45 seconds).
# While the client is 'active', it unchokes every peer that is interested
# in us and currently choked by us.
has _choke_timer => (
    is       => 'bare',
    isa      => Ref,
    init_arg => undef,
    required => 1,
    default  => sub {
        my $s = shift;
        AE::timer(
            15, 45,
            sub {
                return if $s->state ne 'active';
                AE::log trace => 'Choke timer...';
                my @candidates
                    = grep { $_->{remote_interested} && $_->{remote_choked} }
                    values %{$s->peers};
                for my $peer (@candidates) {
                    $peer->{remote_choked} = 0;
                    $s->_send_encrypted($peer->{handle}, build_unchoke());
                    AE::log trace => 'Choked %s', $peer->{peerid};
                }
            }
        );
    }
);
# Repeating timer (first run after 15 seconds, then every 10 seconds).
# While the client is 'active', it serves queued block requests of randomly
# chosen peers until the queues are empty or 2**20 requested bytes have
# been sent in this round, then adds the sent amount to ->uploaded.
has _fill_requests_timer => (
    is       => 'bare',
    isa      => Ref,
    init_arg => undef,
    required => 1,
    default  => sub {
        my $s = shift;
        AE::timer(
            15, 10,
            sub {
                return if $s->state ne 'active';
                AE::log trace => 'Request fill timer...';
                my @waiting
                    = grep { defined && scalar @{$_->{remote_requests}} }
                    values %{$s->peers};
                return if !@waiting;
                my $total_sent = 0;
                while (@waiting && $total_sent < 2**20) {
                    my $p = splice(@waiting, rand @waiting, 1, ());
                    AE::log trace => 'Chosen peer: %s...', $p->{peerid};
                    while ($total_sent < 2**20 && @{$p->{remote_requests}}) {
                        my $req = shift @{$p->{remote_requests}};

                        # The format has four placeholders; the last one
                        # names the peer being served.
                        AE::log
                            trace =>
                            'Filling request i:%d, o:%d, l:%d for %s',
                            @{$req}[0 .. 2], $p->{peerid};
                        $s->_send_encrypted(
                            $p->{handle},
                            build_piece(
                                $req->[0],
                                $req->[1],
                                $s->_read($req->[0], $req->[1], $req->[2])
                            )
                        );
                        $total_sent += $req->[2];
                    }
                }
                $s->_set_uploaded($s->uploaded + $total_sent);
            }
        );
    }
);
# Repeating timer that opens connections to new peers; built lazily and
# dropped through _clear_peer_timer.
has _peer_timer => (
    is       => 'ro',
    lazy     => 1,
    isa      => Ref,
    init_arg => undef,
    clearer  => '_clear_peer_timer',
    builder  => '_build_peer_timer'
);

# Builder for '_peer_timer': a timer firing after 1 second and then every
# 15 seconds.  As long as data is still missing, each run connects to up to
# ten randomly chosen addresses taken from the peer lists of all tracker
# tiers, and stops early once more than 100 peers are known.
sub _build_peer_timer {
    my $s = shift;
    AE::timer(
        1, 15,
        sub {
            return if !$s->_left;
            AE::log trace => 'Attempting to connect to new peer...';
            my @cache = map {
                $_->{peers}  ? uncompact_ipv4($_->{peers})  : (),
                $_->{peers6} ? uncompact_ipv6($_->{peers6}) : ()
            } @{$s->trackers};
            return if !@cache;
            for my $i (1 .. @cache) {
                last if $i > 10;
                last if scalar(keys %{$s->peers}) > 100;

                # rand over the element count, so that the last address
                # can be picked as well.
                my $addr = splice @cache, rand(@cache), 1;
                $s->_new_peer($addr);
            }
        }
    );
}
# Opens an outgoing connection to $addr (an array reference holding host
# and port) and returns the AnyEvent::Handle for it.
#
# Once connected, the peer is registered through _add_peer and our
# handshake is sent; incoming data is handled by _on_read.  Errors and EOF
# remove the peer.  The connect timeout is 60 seconds.
sub _new_peer {
    my ($s, $addr) = @_;
    AE::log trace => 'Connecting to %s:%d', @$addr;
    my $handle;
    $handle = AnyEvent::Handle->new(
        connect    => $addr,
        on_prepare => sub {60},
        on_error   => sub {
            my ($hdl, $fatal, $msg) = @_;
            AE::log error => 'Socket error: %s (Removing peer)', $msg;
            $s->_del_peer($hdl);
        },
        on_connect_error => sub {

            # AnyEvent::Handle calls this with ($handle, $message) only.
            my ($hdl, $msg) = @_;
            $s->_del_peer($hdl);
            AE::log
                error => sprintf "fatal error (%s)\n",
                $msg // 'Connection timed out';
        },
        on_connect => sub {
            my ($h, $host, $port, $retry) = @_;
            AE::log
                trace => 'Connection established with %s:%d',
                $host, $port;
            $s->_add_peer($handle);
            $s->_send_handshake($handle);
        },
        on_eof => sub {
            my $h = shift;
            AE::log trace => 'EOF from peer';
            $s->_del_peer($h);
        },
        on_read => sub {
            $s->_on_read(@_);
        }
    );
    return $handle;
}
# Read handler for connections accepted by our listener; it deals with the
# first packet only.
#
# A packet carrying an error drops the peer.  A handshake is checked
# against our infohash: on a mismatch the peer is dropped, otherwise its
# reserved bytes and peer id are stored, our handshake and bitfield are
# sent, a 60 second timeout is armed, and the handle is switched over to
# _on_read.  Any other packet is ignored.
sub _on_read_incoming {
    my ($s, $h) = @_;
    return if !defined $h->rbuf;
    my $packet = parse_packet(\$h->rbuf);
    return if !$packet;
    AE::log trace => sub {
        'Incoming packet: ' . Data::Dump::dump($packet);
    };
    return $s->_del_peer($h) if defined $packet->{error};
    if ($packet->{type} == $HANDSHAKE) {
        return if ref $packet->{payload} ne 'ARRAY';
        my ($reserved, $infohash, $peerid) = @{$packet->{payload}}[0 .. 2];
        $s->peers->{$h}{reserved} = $reserved;
        return $s->_del_peer($h) if $infohash ne $s->infohash;
        $s->peers->{$h}{peerid} = $peerid;
        $s->_send_handshake($h);
        $s->_send_bitfield($h);
        $s->peers->{$h}{timeout}
            = AE::timer(60, 0, sub { $s->_del_peer($h) });
        $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
        $h->on_read(sub { $s->_on_read(@_) });
    }
    1;
}
# Main read handler: parses packets out of the handle's read buffer and
# updates the state of the sending peer, one packet type per branch.
#
# A packet carrying an error drops the peer and stops processing.  The
# loop ends when no further packet can be parsed or fewer than 5 bytes are
# left in the buffer.
sub _on_read {
    my ($s, $h) = @_;
    while (my $packet = parse_packet(\$h->rbuf)) {
        last if !$packet;
        AE::log debug => sub {
            'Incoming packet: ' . Data::Dump::dump($packet->{error});
        };
        if (defined $packet->{error}) {
            $s->_del_peer($h);
            return;
        }
        elsif ($packet->{type} eq $KEEPALIVE) {

            # Nothing to do.
        }
        elsif ($packet->{type} == $HANDSHAKE) {

            # Same payload check as in _on_read_incoming.
            return if ref $packet->{payload} ne 'ARRAY';
            $s->peers->{$h}{reserved} = $packet->{payload}[0];
            return $s->_del_peer($h)
                if $packet->{payload}[1] ne $s->infohash;
            $s->peers->{$h}{peerid} = $packet->{payload}[2];
            $s->_send_bitfield($h);
            $s->peers->{$h}{timeout}
                = AE::timer(60, 0, sub { $s->_del_peer($h) });
            $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
        }
        elsif ($packet->{type} == $INTERESTED) {
            $s->peers->{$h}{remote_interested} = 1;
        }
        elsif ($packet->{type} == $NOT_INTERESTED) {
            $s->peers->{$h}{remote_interested} = 0;
        }
        elsif ($packet->{type} == $CHOKE) {
            $s->peers->{$h}{local_choked} = 1;

            # Test the 0x04 flag in the last reserved byte (the flag that
            # _build_reserved sets for us).  Reading a single bit here
            # could never match 0x04.  Without the flag, a choke discards
            # our outstanding requests.
            if (!(vec($s->peers->{$h}{reserved} // '', 7, 8) & 0x04)) {
                for my $req (@{$s->peers->{$h}{local_requests}}) {
                    $s->working_pieces->{$req->[0]}{$req->[1]}[3] = ()
                        unless
                        defined $s->working_pieces->{$req->[0]}{$req->[1]}[4];
                }
            }
            $s->_consider_peer($s->peers->{$h});
        }
        elsif ($packet->{type} == $UNCHOKE) {
            $s->peers->{$h}{local_choked} = 0;
            $s->peers->{$h}{timeout}
                = AE::timer(120, 0, sub { $s->_del_peer($h) });
            $s->_request_pieces($s->peers->{$h});
        }
        elsif ($packet->{type} == $HAVE) {
            vec($s->peers->{$h}{bitfield}, $packet->{payload}, 1) = 1;
            $s->_consider_peer($s->peers->{$h});
            $s->peers->{$h}{timeout}
                = AE::timer(60, 0, sub { $s->_del_peer($h) });
        }
        elsif ($packet->{type} == $BITFIELD) {
            $s->peers->{$h}{bitfield} = $packet->{payload};
            $s->_consider_peer($s->peers->{$h});
        }
        elsif ($packet->{type} == $REQUEST) {
            $s->peers->{$h}{timeout}
                = AE::timer(120, 0, sub { $s->_del_peer($h) });
            push @{$s->peers->{$h}{remote_requests}}, $packet->{payload};
        }
        elsif ($packet->{type} == $PIECE) {
            $s->peers->{$h}{timeout}
                = AE::timer(120, 0, sub { $s->_del_peer($h) });
            my ($index, $offset, $data) = @{$packet->{payload}};

            # Ignore blocks of pieces we are not working on, and blocks we
            # never requested from this peer.
            $s->working_pieces->{$index} // return;
            return
                if !grep {
                       $_->[0] == $index
                    && $_->[1] == $offset
                    && $_->[2] == length $data
                } @{$s->peers->{$h}{local_requests}};
            $s->peers->{$h}{local_requests} = [
                grep {
                           ($_->[0] != $index)
                        || ($_->[1] != $offset)
                        || ($_->[2] != length($data))
                } @{$s->peers->{$h}{local_requests}}
            ];
            $s->working_pieces->{$index}{$offset}[4] = $data;
            $s->working_pieces->{$index}{$offset}[5] = ();
            $s->_set_downloaded($s->downloaded + length $data);

            # Every block of the piece has arrived?
            if (0 == scalar grep { !defined $_->[4] }
                values %{$s->working_pieces->{$index}})
            {
                my $piece = join '',
                    map  { $s->working_pieces->{$index}{$_}[4] }
                    sort { $a <=> $b }
                    keys %{$s->working_pieces->{$index}};
                if ((substr($s->pieces, $index * 20, 20) eq sha1($piece))) {

                    # _write returns the number of unwritten bytes.
                    for my $attempt (1 .. 5) {
                        last unless $s->_write($index, 0, $piece);
                    }
                    vec($s->{bitfield}, $index, 1) = 1;
                    $s->_broadcast(
                        build_have($index),
                        sub {
                            !!!index substr(unpack('b*', $_->{bitfield}),
                                            0, $s->piece_count + 1),
                                0, 0;
                        }
                    );
                    $s->announce('complete')
                        if !scalar grep {$_} split '',
                        substr unpack('b*', ~$s->bitfield), 0,
                        $s->piece_count + 1;
                    $s->_consider_peer($_)
                        for grep { $_->{local_interested} }
                        values %{$s->peers};
                    $s->_trigger_hash_pass($index);
                }
                else {
                    $s->_trigger_hash_fail($index);
                }
                delete $s->working_pieces->{$index};
            }
            $s->_request_pieces($s->peers->{$h});
        }
        elsif ($packet->{type} == $CANCEL) {
            my ($index, $offset, $length) = @{$packet->{payload}};
            return
                if !grep {
                       $_->[0] == $index
                    && $_->[1] == $offset
                    && $_->[2] == $length
                } @{$s->peers->{$h}{remote_requests}};
            $s->peers->{$h}{remote_requests} = [
                grep {
                           ($_->[0] != $index)
                        || ($_->[1] != $offset)
                        || ($_->[2] != $length)
                } @{$s->peers->{$h}{remote_requests}}
            ];
        }
        elsif ($packet->{type} == $SUGGEST) {
            push @{$s->peers->{$h}{local_suggest}}, $packet->{payload};
        }
        elsif ($packet->{type} == $HAVE_ALL) {
            $s->peers->{$h}{bitfield} = pack 'b*', (1 x $s->piece_count);
            $s->_consider_peer($s->peers->{$h});
            $s->peers->{$h}{timeout}
                = AE::timer(120, 0, sub { $s->_del_peer($h) });
        }
        elsif ($packet->{type} == $HAVE_NONE) {
            $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
            $s->peers->{$h}{timeout}
                = AE::timer(30, 0, sub { $s->_del_peer($h) });
        }
        elsif ($packet->{type} == $REJECT) {
            my ($index, $offset, $length) = @{$packet->{payload}};
            return
                if !grep {
                       $_->[0] == $index
                    && $_->[1] == $offset
                    && $_->[2] == $length
                } @{$s->peers->{$h}{local_requests}};
            $s->working_pieces->{$index}{$offset}->[3] = ();
            $s->peers->{$h}{local_requests} = [
                grep {
                           ($_->[0] != $index)
                        || ($_->[1] != $offset)
                        || ($_->[2] != $length)
                } @{$s->peers->{$h}{local_requests}}
            ];
            $s->peers->{$h}{timeout}
                = AE::timer(30, 0, sub { $s->_del_peer($h) });
        }
        elsif ($packet->{type} == $ALLOWED_FAST) {
            push @{$s->peers->{$h}{local_allowed}}, $packet->{payload};
        }
        else {

            # Unknown packet type: ignored.
        }
        last if 5 > length($h->rbuf // '');
    }
}
# Tells the peer behind handle $h which pieces we hold.
#
# When the peer's reserved bytes carry the 0x04 flag in the last byte (the
# flag _build_reserved sets for us), a have-all message is sent if we are a
# seed and a have-none message if our bitfield is all zero.  In every other
# case the full bitfield is sent.
sub _send_bitfield {
    my ($s, $h) = @_;

    # Read the whole last byte: a single bit could never match 0x04.
    if (vec($s->peers->{$h}{reserved} // '', 7, 8) & 0x04) {
        if ($s->seed) {
            return $s->_send_encrypted($h, build_haveall());
        }
        elsif ($s->bitfield() !~ m[[^\0]]) {
            return $s->_send_encrypted($h, build_havenone());
        }
    }
    $s->_send_encrypted($h, build_bitfield($s->bitfield));
}
# Sends $data to every peer accepted by $qualifier, a code reference called
# with $_ set to the peer record.  Without a qualifier every peer is
# addressed.
sub _broadcast {
    my ($s, $data, $qualifier) = @_;
    $qualifier //= sub {1};
    for my $peer (grep { $qualifier->() } values %{$s->peers}) {
        $s->_send_encrypted($peer->{handle}, $data);
    }
}
# Re-evaluates our interest in peer record $p.  A peer is interesting when
# it has at least one piece that we want.  An (not-)interested message is
# sent only when our interest actually changes.  Does nothing unless the
# client is 'active', or when the download is already complete.
sub _consider_peer {
    my ($s, $p) = @_;
    return if $s->state ne 'active';
    return if $s->complete;
    my $relevence = $p->{bitfield} & $s->wanted;
    my $bits = substr(unpack('b*', $relevence), 0, $s->piece_count + 1);
    my $interesting = (index($bits, 1, 0) != -1) ? 1 : 0;
    if ($interesting && !$p->{local_interested}) {
        $p->{local_interested} = 1;
        $s->_send_encrypted($p->{handle}, build_interested());
    }
    elsif (!$interesting && $p->{local_interested}) {
        $p->{local_interested} = 0;
        $s->_send_encrypted($p->{handle}, build_not_interested());
    }
}
# Pieces currently being downloaded: {piece index}{block offset} maps to the
# block's bookkeeping slot (see _request_pieces).  Starts out empty.
has working_pieces => (
    is       => 'ro',
    isa      => HashRef,
    lazy     => 1,
    init_arg => undef,
    default  => sub { return {} },
);
# Returns the list of piece indexes covered by file number $file.  The list
# starts at the piece containing the file's first byte.  When the file ends
# exactly on a piece boundary, the index of the following piece is included
# as well.
sub _file_to_range {
    my ($s, $file) = @_;

    # Byte offset of the file within the whole payload.
    my $offset = 0;
    $offset += $s->files->[$_]->{length} for 0 .. $file - 1;
    my $end   = $offset + $s->files->[$file]->{length};
    my $first = $offset / $s->piece_length;
    my $last  = $end / $s->piece_length;
    my $final = int($last) + ($last != int($last) ? 0 : 1);
    return (int($first) .. $final);
}
# Requests blocks of one piece from peer record $p.
#
# With fewer than ten pieces in progress, the candidates are all pieces the
# peer has and we lack, each listed once per priority point of its file;
# otherwise only pieces already in progress are considered.  One candidate
# is picked at random and its unrequested blocks are requested.  Each
# request gets a 60 second timer that cancels it and arms a 45 second
# timeout for the peer.
#
# Does nothing unless the client is 'active' and the peer has a handle.
sub _request_pieces {
    my ($s, $p) = @_;
    return if $s->state ne 'active';
    weaken($p) unless isweak($p);
    $p // return;
    $p->{handle} // return;
    my @indexes;
    if (keys(%{$s->working_pieces}) < 10) {
        for my $findex (0 .. @{$s->files} - 1) {
            for my $index ($s->_file_to_range($findex)) {
                next
                    if !(vec($p->{bitfield}, $index, 1)
                         && !vec($s->bitfield, $index, 1));

                # Weight the piece by the priority of its file.
                push @indexes,
                    map {$index} 1 .. $s->{files}[$findex]{priority};
            }
        }
    }
    else {
        @indexes = keys %{$s->working_pieces};
    }
    return if !@indexes;
    my $index = $indexes[rand @indexes];
    my $piece_size
        = $index == $s->piece_count
        ? $s->size % $s->piece_length
        : $s->piece_length;
    my $block_count = $piece_size / $block_size;
    my @offsets     = map { $_ * $block_size }
        0 .. $block_count - ((int($block_count) == $block_count) ? 1 : 0);

    # Block slots are array references once requested.  They start out
    # undefined, because every reader indexes them as arrays.
    $s->working_pieces->{$index} //= {map { $_ => undef } @offsets};
    my @unrequested = sort { $a <=> $b }
        grep {
        (!ref $s->working_pieces->{$index}{$_})
            || (   (!defined $s->working_pieces->{$index}{$_}[4])
                && (!defined $s->working_pieces->{$index}{$_}[3]))
        } @offsets;
    my @unfilled_local_requests
        = grep { !defined $_->[4] } @{$p->{local_requests}};
    for (scalar @unfilled_local_requests .. 12) {
        my $offset = shift @unrequested;
        $offset // return;

        # The last block of a piece may be shorter than a full block.
        my $_block_size
            = ($offset == $offsets[-1])
            ? ($piece_size % $block_size) || $block_size
            : $block_size;
        next if !$_block_size;
        AE::log
            trace => 'Requesting %d, %d, %d',
            $index, $offset, $_block_size;
        $s->_send_encrypted($p->{handle},
                            build_request($index, $offset, $_block_size));
        $s->working_pieces->{$index}{$offset} = [
            $index, $offset,
            $_block_size,
            $p, undef,
            AE::timer(
                60, 0,
                sub {
                    $p // return;
                    $p->{handle} // return;
                    $s->_send_encrypted($p->{handle},
                          build_cancel($index, $offset, $_block_size));
                    $s->working_pieces->{$index}{$offset}[3] = ();
                    $p->{local_requests} = [
                        grep {
                                   $_->[0] != $index
                                || $_->[1] != $offset
                                || $_->[2] != $_block_size
                        } @{$p->{local_requests}}
                    ];
                    $p->{timeout} = AE::timer(
                        45, 0,
                        sub {
                            $s->_del_peer($p->{handle});
                        }
                    );
                }
            )
        ];

        # The slot must not keep the peer record alive.
        weaken($s->working_pieces->{$index}{$offset}[3])
            unless isweak($s->working_pieces->{$index}{$offset}[3]);
        push @{$p->{local_requests}}, [$index, $offset, $_block_size];
    }
}
# Callback invoked with the piece index whenever a piece passes its hash
# check.  The default callback does nothing and returns true.
has on_hash_pass => (
    is      => 'rw',
    isa     => CodeRef,
    clearer => '_no_hash_pass',
    default => sub {
        return sub { !!1 };
    },
);

# Invokes the on_hash_pass callback with the given arguments.
sub _trigger_hash_pass {
    my $s = shift;
    return $s->on_hash_pass()->(@_);
}

# Callback invoked with the piece index whenever a piece fails its hash
# check.  The default callback does nothing and returns true.
has on_hash_fail => (
    is      => 'rw',
    isa     => CodeRef,
    clearer => '_no_hash_fail',
    default => sub {
        return sub { !!1 };
    },
);

# Invokes the on_hash_fail callback with the given arguments.
sub _trigger_hash_fail {
    my $s = shift;
    return $s->on_hash_fail()->(@_);
}
# Client state: one of 'active', 'stopped' or 'paused'; starts as 'active'
# and is changed through the private _set_state writer.
has state => (
    is      => 'ro',
    isa     => Enum [qw[active stopped paused]],
    writer  => '_set_state',
    default => sub { return 'active' },
);
# Stops the client: announces 'stopped' to the trackers, clears the peer
# list, drops the new-peer timer, closes every payload file, and sets the
# state to 'stopped'.  Does nothing when the client is already stopped.
sub stop {
    my $s = shift;
    AE::log debug => 'Stopping...';
    return if $s->state eq 'stopped';
    AE::log trace => 'Announcing "stopped" event to trackers...';
    $s->announce('stopped');
    AE::log trace => 'Disconnecting peers...';
    $s->_clear_peers;
    AE::log trace => 'Stopping new peers ticker...';
    $s->_clear_peer_timer;
    AE::log trace => 'Closing files...';
    $s->_open($_, 'c') for 0 .. @{$s->files} - 1;
    AE::log trace => 'Setting internal status...';
    $s->_set_state('stopped');
}
# Starts the client: announces 'started' (unless the state is already
# 'active'), makes sure the peer list and the new-peer timer exist, and
# sets the state to 'active'.
sub start {
    my $s = shift;
    AE::log debug => 'Starting...';
    if ($s->state ne 'active') {
        $s->announce('started');
    }
    $s->peers;    # build the lazy attribute
    AE::log trace => 'Starting new peers ticker...';
    $s->_peer_timer;
    AE::log trace => 'Setting internal status...';
    return $s->_set_state('active');
}
# Pauses the client: makes sure the peer list and the new-peer timer exist
# and sets the state to 'paused'.
sub pause {
    my $s = shift;
    AE::log debug => 'Pausing...';
    $s->peers;    # build the lazy attribute
    AE::log trace => 'Starting new peers ticker...';
    $s->_peer_timer;
    AE::log trace => 'Setting internal status...';
    return $s->_set_state('paused');
}
# Constructor hook: runs ->start or ->pause according to the initial state,
# so a freshly built client behaves as if that method had been called.
sub BUILD {
    my ($s, $args) = @_;    # $args is unused; not named $a (sort variable)
    AE::log debug => 'BUILD()';
    if ($s->state eq 'active') {
        $s->start && AE::log debug => 'Calling ->start()';
    }

    # The method is called 'pause'; there is no 'paused' method.
    if ($s->state eq 'paused') {
        $s->pause && AE::log debug => 'Calling ->paused() ';
    }
}
# Queues $packet for writing on handle $h and returns the result of
# push_write.  Does nothing when $h is false.
sub _send_encrypted {
    my ($s, $h, $packet) = @_;
    return unless $h;
    AE::log trace => sub {
        'Outgoing packet: ' . Data::Dump::dump($packet);
    };
    return $h->push_write($packet);
}
# Sends our handshake (reserved bytes, infohash and peer id) on handle $h.
sub _send_handshake {
    my ($s, $h) = @_;
    AE::log trace => 'Outgoing handshake';
    my $handshake = build_handshake($s->reserved, $s->infohash, $s->peerid);
    $h->push_write($handshake);
}

# A module must end with a true value.
1337;