package App::Netdisco::JobQueue::PostgreSQL;

use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';

use App::Netdisco::Util::Device
  qw/is_discoverable is_macsuckable is_arpnipable/;
use App::Netdisco::Backend::Job;

use Module::Load ();
use Try::Tiny;

use base 'Exporter';
our @EXPORT = ();
our @EXPORT_OK = qw/
  jq_warm_thrusters
  jq_getsome
  jq_locked
  jq_queued
  jq_lock
  jq_defer
  jq_complete
  jq_log
  jq_userlog
  jq_insert
  jq_delete
/;
our %EXPORT_TAGS = ( all => \@EXPORT_OK );

# given a device, tests if any of the primary acls applies
# returns a list of job actions to be denied/skipped on this host.
# an empty list is returned when no device is passed.
sub _get_denied_actions {
  my $device = shift;
  my @badactions = ();
  return @badactions unless $device;

  # not discoverable: deny discover plus every high priority action
  push @badactions, ('discover', @{ setting('job_prio')->{high} })
    if not is_discoverable($device);

  push @badactions, (qw/macsuck nbtstat/)
    if not is_macsuckable($device);

  push @badactions, 'arpnip'
    if not is_arpnipable($device);

  return @badactions;
}

# primes the skiplist for this backend: works out the denied actions for
# every Device row, then (in one transaction) removes this backend's
# DeviceSkip rows and inserts a fresh row per device having denied actions.
sub jq_warm_thrusters {
  my @devices = schema('netdisco')->resultset('Device')->all;
  my $rs = schema('netdisco')->resultset('DeviceSkip');
  my %actionset = ();

  foreach my $d (@devices) {
    my @badactions = _get_denied_actions($d);
    $actionset{$d->ip} = \@badactions if scalar @badactions;
  }

  schema('netdisco')->txn_do(sub {
    $rs->search({ backend => setting('workers')->{'BACKEND'} })->delete;
    $rs->populate([
      map {{
        backend => setting('workers')->{'BACKEND'},
        device => $_,
        actionset => $actionset{$_},
      }} keys %actionset
    ]);
  });
}

# given a number of free slots, returns up to that many queued jobs as a
# list of App::Netdisco::Backend::Job objects. jobs with a username, or
# whose action is listed in job_prio->high, are preferred; within each
# priority the selection is random. devices on the skiplist are excluded.
# queued duplicates of each returned job are set to status "error".
# returns an empty list if the number of slots is not a positive number.
sub jq_getsome {
  my $num_slots = shift;
  return () unless $num_slots and $num_slots > 0;

  my $jobs = schema('netdisco')->resultset('Admin');
  my @returned = ();

  # criteria common to both the high and low priority queries
  my %jobsearch = (
    status => 'queued',
    device => { '-not_in' =>
      $jobs->skipped(setting('workers')->{'BACKEND'},
                     setting('workers')->{'max_deferrals'},
                     setting('workers')->{'retry_after'})
           ->columns('device')->as_query },
  );
  my %randoms = (order_by => 'random()', rows => $num_slots );

  my $hiprio = $jobs->search({
    %jobsearch,
    -or => [
      { username => { '!=' => undef } },
      { action => { -in => setting('job_prio')->{'high'} } },
    ],
  }, {
    %randoms,
    '+select' => [\'100 as job_priority'], '+as' => ['me.job_priority'],
  });

  my $loprio = $jobs->search({
    %jobsearch,
    action => { -not_in => setting('job_prio')->{'high'} },
  }, {
    %randoms,
    '+select' => [\'0 as job_priority'], '+as' => ['me.job_priority'],
  });

  # high priority jobs sort first; cap the combined result at the slot count
  my $rs = $hiprio->union($loprio)->search(undef, {
    order_by => { '-desc' => 'job_priority' },
    rows => $num_slots,
  });

  while (my $job = $rs->next) {
    if ($job->device) {
      # need to handle device discovered since backend daemon started
      # and the skiplist was primed. these should be checked against
      # the various acls and have device_skip entry added if needed,
      # and return false if it should have been skipped.
      my @badactions = _get_denied_actions($job->device);
      if (scalar @badactions) {
        schema('netdisco')->resultset('DeviceSkip')->find_or_create({
          backend => setting('workers')->{'BACKEND'}, device => $job->device,
        },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions);

        # will now not be selected in a future _getsome()
        next if scalar grep {$_ eq $job->action} @badactions;
      }
    }

    # remove any duplicate jobs, including possibly this job if there
    # is already an equivalent job running

    # note that the self-removal of a job has an unhelpful log: it is
    # reported as a duplicate of itself! however what's happening is that
    # netdisco has seen another running job with same params (but the query
    # cannot see that ID to use it in the message).

    my %job_properties = (
      action => $job->action,
      port   => $job->port,
      subaction => $job->subaction,
      -or => [
        { device => $job->device },
        ($job->device_key ? ({ device_key => $job->device_key }) : ()),
      ],
    );

    my $gone = $jobs->search({
      status => 'queued',
      -and => [
        %job_properties,
        -or => [{
          job => { '!=' => $job->id },
        },{
          job => $job->id,
          -exists => $jobs->search({
            status => { -like => 'queued-%' },
            started => \[q/> (now() - ?::interval)/, setting('jobs_stale_after')],
            %job_properties,
          })->as_query,
        }],
      ],
    }, {for => 'update'})
        ->update({ status => 'error', log => (sprintf 'duplicate of %s', $job->id) });

    # the update count for "no rows" is the true string "0E0", so format it
    # numerically to log a plain 0 rather than "0E0"
    debug sprintf 'getsome: cancelled %d duplicate(s) of job %s', ($gone || 0), $job->id;
    push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
  }

  return @returned;
}

# returns a list of App::Netdisco::Backend::Job objects for the jobs whose
# status is "queued-<this backend>" and which were started within the
# jobs_stale_after interval.
sub jq_locked {
  my @returned = ();
  my $rs = schema('netdisco')->resultset('Admin')->search({
    status  => ('queued-'. setting('workers')->{'BACKEND'}),
    started => \[q/> (now() - ?::interval)/, setting('jobs_stale_after')],
  });

  while (my $job = $rs->next) {
      push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
  }
  return @returned;
}

# given a job action name, returns the device column values of all jobs
# for that action which have a device and a status beginning "queued".
sub jq_queued {
  my $job_type = shift;

  return schema('netdisco')->resultset('Admin')->search({
    device => { '!=' => undef},
    action => $job_type,
    status => { -like => 'queued%' },
  })->get_column('device')->all;
}

# given a job, tries to claim it for this backend by changing its status
# from "queued" to "queued-<this backend>" and setting the started time.
# returns true only if a row was updated; errors are logged, not thrown.
sub jq_lock {
  my $job = shift;
  my $happy = false;

  # lock db row and update to show job has been picked
  try {
    my $updated = schema('netdisco')->resultset('Admin')
      ->search({ job => $job->id, status => 'queued' }, { for => 'update' })
      ->update({
          status  => ('queued-'. setting('workers')->{'BACKEND'}),
          started => \"now()",
      });

    $happy = true if $updated > 0;
  }
  catch {
    error $_;
  };

  return $happy;
}

# given a job, increments the deferral count for its device (if any) on
# this backend and puts the job back to status "queued" with no started
# time. returns true on success; errors are logged and false is returned.
sub jq_defer {
  my $job = shift;
  my $happy = false;

  # note this taints all actions on the device. for example if both
  # macsuck and arpnip are allowed, but macsuck fails 10 times, then
  # arpnip (and every other action) will be prevented on the device.

  # seeing as defer is only triggered by an SNMP connect failure, this
  # behaviour seems reasonable, to me (or desirable, perhaps).

  try {
    schema('netdisco')->txn_do(sub {
      if ($job->device) {
        schema('netdisco')->resultset('DeviceSkip')->find_or_create({
          backend => setting('workers')->{'BACKEND'}, device => $job->device,
        },{ key => 'device_skip_pkey' })->increment_deferrals;
      }

      # lock db row and update to show job is available
      schema('netdisco')->resultset('Admin')
        ->find($job->id, {for => 'update'})
        ->update({ status => 'queued', started => undef });
    });
    $happy = true;
  }
  catch {
    error $_;
  };

  return $happy;
}

# given a job, resets the deferral count for its device (if any) on this
# backend and stores the job's status, log, started and finished values.
# returns true on success; errors are logged and false is returned.
sub jq_complete {
  my $job = shift;
  my $happy = false;

  # lock db row and update to show job is done/error

  # now that SNMP connect failures are deferrals and not errors, any complete
  # status, whether success or failure, indicates an SNMP connect. reset the
  # connection failures counter to forget about occasional connect glitches.

  try {
    schema('netdisco')->txn_do(sub {
      if ($job->device) {
        schema('netdisco')->resultset('DeviceSkip')->find_or_create({
          backend => setting('workers')->{'BACKEND'}, device => $job->device,
        },{ key => 'device_skip_pkey' })->update({ deferrals => 0 });
      }

      schema('netdisco')->resultset('Admin')
        ->find($job->id, {for => 'update'})->update({
          status => $job->status,
          log    => $job->log,
          started  => $job->started,
          finished => $job->finished,
        });
    });
    $happy = true;
  }
  catch {
    # use DDP; p $job;
    error $_;
  };

  return $happy;
}

# returns at most 50 jobs as plain hashrefs, with the "target" relation
# prefetched, ordered descending by entered, device and action.
sub jq_log {
  return schema('netdisco')->resultset('Admin')->search({}, {
    prefetch => 'target',
    order_by => { -desc => [qw/entered device action/] },
    rows     => 50,
  })->with_times->hri->all;
}

# given a username, returns that user's jobs which finished within the
# last five seconds.
sub jq_userlog {
  my $user = shift;
  return schema('netdisco')->resultset('Admin')->search({
    username => $user,
    finished => { '>' => \"(now() - interval '5 seconds')" },
  })->with_times->all;
}

# given one job spec (hashref) or an arrayref of them, inserts each as a
# job with status "queued" in a single transaction. the "extra" key, when
# true, takes precedence over "subaction". returns true on success; errors
# are logged and false is returned.
sub jq_insert {
  my $jobs = shift;
  $jobs = [$jobs] if ref [] ne ref $jobs;
  my $happy = false;

  try {
    schema('netdisco')->txn_do(sub {
      schema('netdisco')->resultset('Admin')->populate([
        map {{
            device     => $_->{device},
            device_key => $_->{device_key},
            port       => $_->{port},
            action     => $_->{action},
            subaction  => ($_->{extra} || $_->{subaction}),
            username   => $_->{username},
            userip     => $_->{userip},
            status     => 'queued',
        }} @$jobs
      ]);
    });
    $happy = true;
  }
  catch {
    error $_;
  };

  return $happy;
}

# given a job id, deletes that job; with no (or a false) id, deletes every
# job. asking to delete a job which does not exist is a harmless no-op.
sub jq_delete {
  my $id = shift;

  if ($id) {
      schema('netdisco')->txn_do(sub {
        # find() returns undef when the job has already gone, so guard the
        # row before calling delete on it
        my $row = schema('netdisco')->resultset('Admin')->find($id);
        $row->delete() if $row;
      });
  }
  else {
      schema('netdisco')->txn_do(sub {
        schema('netdisco')->resultset('Admin')->delete();
      });
  }
}

true;