{
our
$VERSION
=
'0.014'
; }
has
connect_timeout
=> (
is
=>
'ro'
,
default
=>
sub
{
return
20 },
);
has
interval
=> (
is
=>
'ro'
,
default
=>
sub
{
return
0.1 },
);
has
max_redirects
=> (
is
=>
'ro'
,
default
=>
sub
{
return
5 },
);
has
_start
=> (
is
=>
'ro'
,
default
=>
sub
{
return
1 },
init_arg
=>
'start'
,
);
has
update_timeout
=> (
is
=>
'ro'
,
default
=>
sub
{
return
300 },
);
sub
BUILD {
my
$self
=
shift
;
$self
->start
if
$self
->_start;
}
sub
class_custom_pairs {
my
$self
=
shift
;
return
(
token
=>
$self
->token);
}
sub
parse_response {
my
(
$self
,
$res
,
$threshold_id
) =
@_
;
my
$data
=
$res
->json // {};
return
grep
{
$_
->{update_id} >=
$threshold_id
} @{
$data
->{result}//[]}
if
$data
->{ok};
my
$error
=
$data
->{description} //
'unknown error'
;
$log
->error(
'getUpdates error: '
.
$error
);
if
(
$log
->is_trace) {
local
$Data::Dumper::Indent
= 1;
for
([
json
=>
$data
], [
res
=>
$res
]) {
(
my
$d
= Dumper
$_
->[1]) =~ s{\A.*?=}{
$_
->[0] =>}mxs;
$log
->trace(
$d
);
}
}
return
;
}
sub
poller {
my
$self
=
shift
;
my
$args
= (
@_
&&
ref
(
$_
[0])) ?
$_
[0] : {
@_
};
my
$update_timeout
=
$self
->update_timeout;
my
%query
= (
offset
=> 0,
telegram_method
=>
'getUpdates'
,
timeout
=>
$update_timeout
,
);
my
$sender
=
$self
->sender;
$sender
->telegram->agent->connect_timeout(
$self
->connect_timeout)
->inactivity_timeout(
$update_timeout
+ 5)
->max_redirects(
$self
->max_redirects);
my
$is_busy
;
my
$on_data
=
sub
{
my
(
$ua
,
$tx
) =
@_
;
my
@updates
;
try
{
@updates
=
$self
->parse_response(
$tx
->res,
$query
{offset});
}
catch
{
$log
->error(bleep
$_
);
die
$_
if
$self
->should_rethrow(
$args
);
};
my
@retval
=
$self
->process_updates(
refs
=> {
sender
=>
$sender
,
tx
=>
$tx
,
ua
=>
$ua
,
},
source_pairs
=> {
query
=> \
%query
,
},
updates
=> \
@updates
,
%$args
,
);
for
my
$item
(
@retval
) {
next
unless
defined
$item
;
defined
(
my
$record
=
$item
->{record}) or
next
;
defined
(
my
$outcome
=
$item
->{outcome}) or
next
;
defined
(
my
$message
=
$outcome
->{send_response}) or
next
;
$sender
->send_message(
$message
,
record
=>
$record
);
}
$query
{offset} = 1 + max
map
{
$_
->{update_id} }
@updates
if
@updates
;
$is_busy
= 0;
};
return
sub
{
return
if
$is_busy
;
$is_busy
= 1;
$sender
->send_message(\
%query
,
callback
=>
$on_data
);
};
}
around
process
=>
sub
{
my
(
$orig
,
$self
,
$record
) =
@_
;
my
$outcome
=
$orig
->(
$self
,
$record
);
$record
->{source}{query}{offset} =
$record
->{update}{update_id} + 1;
return
$outcome
;
};
sub
start {
my
$self
=
shift
;
Mojo::IOLoop->recurring(
$self
->interval,
$self
->poller(
@_
));
Mojo::IOLoop->start
unless
Mojo::IOLoop->is_running;
return
$self
;
}
1;