use
5.010_001;
ro
=> [
qw/ctx data/
],
);
use
version;
our
$VERSION
=
'v0.5.1'
;
# consume
#
# Tries to take the run lock (the "semaphore" record) for a single job.
#
# Arguments are unpacked through args(); the declarations below ask for:
#   job_id - 'Int'
#   now    - 'DateTime', optional; when false it is replaced by $ctx->now
#   ctx    - 'App::Koyomi::Context'
# NOTE(review): args() looks like Smart::Args (named arguments keyed by the
# lexical names) -- confirm against the imports outside this view.
#
# Returns:
#   1                 when the context's semaphore datasource is the
#                     App::Koyomi::DataSource::Semaphore::None class
#   nothing (return;) when no semaphore record is found for the job, or
#                     when the stored run_date is less than
#                     config->{job}{lock_ttl_seconds} seconds before now
#   otherwise         whatever update_with_condition() returned
sub consume {
    args(
        my $class,
        my $job_id => 'Int',
        my $now    => +{ isa => 'DateTime', optional => 1 },
        my $ctx    => 'App::Koyomi::Context',
    );

    # With the "None" datasource there is nothing to lock; report success.
    my $datasource = $ctx->datasource_semaphore;
    return 1 if $datasource->isa('App::Koyomi::DataSource::Semaphore::None');

    $now = $ctx->now unless $now;

    # "<pid> <job_id>" prefix shared by the log messages below.
    my $log_prefix = sprintf('%d %d', $$, $job_id);

    my $record = $datasource->get_by_job_id(
        job_id => $job_id,
        ctx    => $ctx,
    );
    if (!$record) {
        critf(q{%s Not found semaphore data!}, $log_prefix);
        return;
    }

    my $lock_ttl = $ctx->config->{job}{lock_ttl_seconds};

    # Seconds between the requested time and the recorded run_date.
    my $now_epoch  = $now->epoch;
    my $last_epoch = $record->run_date->epoch;
    my $elapsed    = $now_epoch - $last_epoch;
    debugf(q{now:%d semaphore:%d diff:%d}, $now_epoch, $last_epoch, $elapsed);

    # A run_date newer than the TTL is treated as a lock still held elsewhere.
    if ($elapsed < $lock_ttl) {
        debugf(
            q{%s run on another proc. Host=%s, Pid=%d, Run_On='%s'},
            $log_prefix,
            $record->run_host,
            $record->run_pid,
            $record->run_date->datetime,
        );
        return;
    }

    # The update carries the run_date we just read as its "where" condition.
    # NOTE(review): presumably this makes the update a no-op if another
    # process changed the record in between -- confirm in
    # update_with_condition().
    my %new_values = (
        run_host => hostname,
        run_pid  => $$,
        run_date => $now,
    );
    my %condition = (
        run_date => $record->run_date,
    );
    my $updated = $record->update_with_condition(
        data  => +{ %new_values },
        where => +{ %condition },
        ctx   => $ctx,
    );
    if (!$updated) {
        warnf(
            q{%s Failed to update semaphore; Probably another process got lock.},
            $log_prefix,
        );
    }

    return $updated;
}
1;