Sponsoring The Perl Toolchain Summit 2025: Help make this important event another success Learn more

#!perl
use 5.10.1;
use strict;
## Core:
use Config;
use DB_File;
use Fcntl qw/:DEFAULT :flock/;
use Digest::SHA qw/sha1_hex/;
use File::Path qw/make_path/;
use Scalar::Util qw/blessed/;
use Storable qw/dclone/;
use POSIX ();
use Time::HiRes qw/tv_interval gettimeofday/;
require bytes;
## POE + HTTP
use POE;
use URI;
## Indexer:
use Compress::Zlib qw/memGzip memGunzip/;
my $opts = {
help => sub {
print(
"ircindexer-server-json\n\n",
" -v, --version\n",
" Display IRC::Indexer version\n\n",
" -c, --config=PATH\n",
" Configuration file for this server.\n\n",
" -l, --loglevel=LEVEL\n",
" Log verbosity, one of: debug, info, warn\n",
" Overrides configuration file.\n\n",
" -d, --detach\n",
" Run as a daemon process.\n\n",
" -i, --interval=SECS\n",
" Interval between trawling servers on a specific network.\n",
" Defaults to 600 seconds (10 minutes)\n",
);
exit 0
},
version => sub {
print(
"ircindexer-server-json (IRC::Indexer $IRC::Indexer::VERSION)\n",
" POEx::HTTP::Server $POEx::HTTP::Server::VERSION\n",
);
exit 0
},
detach => 0,
interval => 600,
};
GetOptions( $opts,
qw/
help
version
config=s
detach!
interval=i
loglevel=s
showsource
/,
);
my $obj = {
Log => undef,
Cfg => undef,
active_trawlers => {},
workers_by_pid => {},
workers_by_wid => {},
};
my $state = {
timers => { trawlers => {} },
stats => {
## Basic outline; guaranteed present in /stats output.
RunID => sha1_hex(Time::HiRes::time() . rand),
Version => $IRC::Indexer::VERSION,
StartedAt => time,
LoopTick => 0,
TotalNetworks => 0,
TotalServers => 0,
TrawlersExecuted => 0,
TrawlersCreated => 0,
LastTrawl => 0,
LastTrawledServer => undef,
LastTrawlerReaped => undef,
HTTP_Responses => 0,
},
};
my $cfg = {
## Defaults.
## These will be pulled from our httpd.conf in get_cfg.
NetworkDir => undef,
BindAddr => '0.0.0.0',
ServerPort => 8700,
CacheDir => undef,
ListChans => undef,
Forking => 0,
MaxEncoders => 20,
MaxTrawlers => 20,
LogFile => undef,
LogLevel => undef,
LogHTTP => 1,
LogIRC => 1,
## Constructed by get_nets:
Networks => {},
};
## for managing trawler creation
## keyed on network name
## value is array of arrays containing:
## [ 'network', 'server', $trawler_opts_hashref ]
my $ArrayConf = { };
## If we're missing CacheDir:
my $jsmemcache = { };
sub get_nets {
my $conf_obj = $obj->{Cfg};
die "Configuration directive missing: NetworkDir\n"
unless $cfg->{NetworkDir};
my $nethash = $conf_obj->parse_nets($cfg->{NetworkDir});
$cfg->{Networks} = $nethash;
}
sub get_cfg {
## set up $cfg, networks, and logging
die "No --config specified.\n" unless $opts->{config};
die "Specified conf nonexistant: $opts->{config}"
unless -e $opts->{config};
die "Specified conf not readable: $opts->{config}"
unless -r $opts->{config};
## Try to set up primary conf:
my $cfpath = $opts->{config};
my $conf = IRC::Indexer::Conf->new;
$obj->{Cfg} = $conf;
my $cfhash;
{
local $@;
eval { $cfhash = $conf->parse_conf($cfpath) };
die "Could not parse conf: $@" if $@;
}
## Only copies our defined params, see $cfg
for my $thisopt (keys %$cfg) {
$cfg->{$thisopt} = $cfhash->{$thisopt};
}
## Set up logging:
my $loglevel = $opts->{loglevel} || $cfg->{LogLevel} || 'info' ;
if ($cfg->{LogFile}) {
my $handler = IRC::Indexer::Logger->new(
LogFile => $cfg->{LogFile},
LogLevel => $loglevel,
);
$obj->{Log} = $handler->logger;
}
if ($opts->{detach}) {
open 'STDERR', '>', $cfg->{LogFile}
if $cfg->{LogFile};
} else {
## not detached, log to STDOUT
unless (blessed $obj->{Log}) {
my $handler = IRC::Indexer::Logger->new(
DevNull => 1,
);
$obj->{Log} = $handler->logger;
}
$obj->{Log}->add(
screen => {
log_to => "STDOUT",
maxlevel => $loglevel || 'info',
timeformat => "%Y/%m/%d %H:%M:%S",
message_layout => "[%T] %L %m",
},
);
}
$cfg->{ServerPort} //= 8700;
## Read server specifications:
get_nets();
}
sub log_to {
## log_to('http', 'warn', ...)
## log_to('irc', 'info', ...)
## null-op if there's no logger present
my ($type, $level, @lines) = @_;
return unless @lines;
my $log = $obj->{Log};
return unless blessed $log;
given (lc $type) {
when ("http") {
return unless $cfg->{LogHTTP};
$log->$level("$type ", @lines);
}
when ("irc") {
return unless $cfg->{LogIRC};
$log->$level("$type ", @lines);
}
default {
## anything else goes right to the log:
$log->$level("$type ", @lines);
}
}
}
sub poco_cfg_httpd {
$state->{htevents} = [ map { 'h_'.$_ } qw/
error_404
src
stats
list
server
servlist
network
netlist
/ ];
push(@{$state->{htevents}}, 'post_request');
POEx::HTTP::Server->spawn(
retry => 0,
inet => {
LocalAddr => $cfg->{BindAddr},
LocalPort => $cfg->{ServerPort},
},
handlers => [
'^/$' => 'poe:indexerhttp/h_stats',
'^/stats' => 'poe:indexerhttp/h_stats',
'^/list' => 'poe:indexerhttp/h_list',
'^/network' => 'poe:indexerhttp/h_network',
'^/netlist' => 'poe:indexerhttp/h_netlist',
'^/src' => 'poe:indexerhttp/h_src',
'' => 'poe:indexerhttp/h_error_404',
'post_request' => 'poe:indexerhttp/post_request',
],
);
log_to('http', 'info', 'HTTP configured');
return 1
}
sub poco_init_session {
## create 'indexerhttp' session
POE::Session->create(
package_states => [
main => [ qw/
_start
_stop
session_sig_int
session_sig_hup
e_init_nets
e_rehash
e_timer_chk
e_trawler_run
e_trawler_process
js_push_pending
js_worker_stdout
js_worker_stderr
js_worker_error
js_worker_closed
js_worker_sigchld
/ ],
],
);
## e_* events bridge trawler sessions.
## js_* events handle forked encoders.
## h_* events bridge HTTP; they are added via state() in _start.
log_to('session', 'debug', 'Session initialized');
}
sub _start {
my ($kernel, $heap) = @_[KERNEL, HEAP];
$kernel->alias_set('indexerhttp');
$kernel->sig('INT', 'session_sig_int');
$kernel->sig('TERM', 'session_sig_int');
$kernel->sig('HUP', 'session_sig_hup');
log_to('session', 'info', 'Session started');
## init httpd
my @htstates = poco_cfg_httpd();
$kernel->call( $_[SESSION], 'e_init_nets' );
## add states from $state->{htevents}
$kernel->state($_, __PACKAGE__) for @{$state->{htevents}};
}
sub e_init_nets {
my ($kernel, $heap) = @_[KERNEL, HEAP];
## called by either _start or e_rehash
## clear configured trawlers in $ArrayConf
$ArrayConf = { };
my($total_nets, $total_servers);
NETWORK: for my $network (keys %{ $cfg->{Networks} }) {
++$total_nets;
my $this_net = $cfg->{Networks}->{$network};
SERVER: for my $server (keys %$this_net) {
++$total_servers;
my $this_cf = $this_net->{$server};
log_to('irc', 'info', "Init trawler: $network -> $server");
## push configuration to array
## create_trawler in e_trawler_run
my %trawlopts = %$this_cf;
push(@{$ArrayConf->{$network}},
[ $network, $server, { Server => $server, %trawlopts } ]
);
log_to('irc', 'debug', "Trawler configured for $server");
} ## SERVER
} ## NETWORK
log_to('irc', 'info',
"Trawling $total_servers servers across $total_nets networks"
);
$state->{stats}->{TotalNetworks} = $total_nets;
$state->{stats}->{TotalServers} = $total_servers;
## Reset our HasRun.
## This may be a rehash.
$state->{hasrun} = {};
$kernel->yield('e_timer_chk');
}
sub e_rehash {
my ($kernel, $heap) = @_[KERNEL, HEAP];
## Suspend our check loop, e_init_nets will reset it.
$kernel->alarm('e_timer_chk');
## Stop any running trawler runs.
## Running encoders for dead nets are automatically discarded.
## (You might bottleneck for a few if they're throttled hard, though.)
NETWORK: for my $network (keys %{ $obj->{active_trawlers} }) {
log_to('irc', 'debug', "Suspending trawler for $network");
my $trawler = delete $obj->{active_trawlers}->{$network};
## If there's no Session ID, this trawler isn't running.
my $sessid = $trawler->ID // next NETWORK;
$kernel->post( $sessid, 'shutdown' );
} ## NETWORK
## Reload Networks cfg
my @prev_nets = keys %{ $cfg->{Networks} };
get_nets();
## Check if any networks have disappeared.
## If they have, we don't need their cache.
CFG: for my $old_net (@prev_nets) {
unless (defined $cfg->{Networks}->{$old_net}) {
cache_clear($old_net);
}
}
## Re-initialize runs right now.
$kernel->call( $_[SESSION], 'e_init_nets');
}
sub session_sig_hup {
log_to('session', 'warn', 'SIGHUP, reloading');
$_[KERNEL]->sig_handled();
$_[KERNEL]->call( $_[SESSION], 'e_rehash');
}
sub session_sig_int {
warn "Exiting due to signal";
$_[KERNEL]->yield('_stop');
}
sub _stop {
## Run some cleanup if needed.
for my $pidof (keys %{ $obj->{workers_by_pid} }) {
my $wheel = delete $obj->{workers_by_pid}->{$pidof};
next unless ref $wheel;
my $wid = $wheel->ID;
delete $obj->{workers_by_wid}->{$wid};
$wheel->kill(9);
}
if ($state->{openDB}) {
for my $network (keys %{ $state->{openDB} }) {
db_close($network);
}
}
log_to('session', 'warn', "_stop received");
## HTTP server should pick up shutdown and clean up.
$_[KERNEL]->signal( $_[KERNEL], 'shutdown' );
}
sub e_timer_chk {
my ($kernel, $heap) = @_[KERNEL, HEAP];
my $state_run = $state->{hasrun};
++$state->{stats}->{LoopTick};
my @trawlers;
my $sp = 0;
NETWORK: for my $network (keys %$ArrayConf) {
my $last_trawled = $state_run->{$network};
unless ($last_trawled) {
## No trawlers have been run for this network, yet.
## Spin one up, set HasRun time.
$state_run->{$network} = time();
unless (@{ $ArrayConf->{$network} }) {
log_to('irc', 'warn', "No trawlers to start for $network");
next NETWORK
}
## Space out our initial runs.
log_to('irc', 'debug', "Scheduling initial trawler for $network");
$kernel->alarm_add( 'e_trawler_run', time + ($sp+=1),
$ArrayConf->{$network}->[0]
);
}
} ## NETWORK
TRAWL: for my $network (keys %{ $obj->{active_trawlers} }) {
my $trawler = delete $obj->{active_trawlers}->{$network};
if ($trawler->done) {
## A trawler has finished.
## Rotate ArrayConf and schedule a new one.
my $prev = shift @{ $ArrayConf->{$network} };
push(@{ $ArrayConf->{$network} }, $prev );
$kernel->yield( 'e_trawler_process', $network, $trawler );
## Schedule the next one for this net to run in ten minutes.
my $next_trawler = $ArrayConf->{$network}->[0];
if (defined $next_trawler) {
my $next_run_at = time + ($opts->{interval} || 600);
$kernel->alarm_add(
'e_trawler_run', $next_run_at, $next_trawler
);
log_to('irc', 'debug',
"Scheduling next trawler for $network"
);
} else {
log_to('irc', 'warn', "ArrayConf disappeared for $network" );
}
} else {
## Still pending.
$obj->{active_trawlers}->{$network} = $trawler;
}
} ## TRAWL
$kernel->alarm('e_timer_chk' => time + 1);
}
sub e_trawler_run {
## takes an arrayref:
## [ network, server, conf-as-hashref ]
my ($kernel, $heap, $confitem) = @_[KERNEL, HEAP, ARG0];
unless (ref $confitem eq 'ARRAY' && $ArrayConf->{ $confitem->[0] }) {
## e_trawler_run was scheduled, but we lost the network in a rehash.
## this is a hack; we should be tracking/removing alarm_add IDs.
log_to('irc', 'debug',
"skipped e_trawler_run for cancelled trawler"
);
return
}
my ($network, $server, $trawlcf) = @$confitem;
## Too many trawlers running.
my $current_count = keys %{$obj->{active_trawlers}};
if ($cfg->{MaxTrawlers} && $current_count >= $cfg->{MaxTrawlers}) {
++$state->{stats}->{ThrottledTrawler};
$kernel->alarm_add( 'e_trawler_run', time + 1, $confitem );
return
}
my $trawler = create_trawler(%$trawlcf);
$trawler->run();
log_to('irc', 'info', "Trawler run: $network: $server");
## Add to active trawlers for this network
$obj->{active_trawlers}->{$network} = $trawler;
++$state->{stats}->{TrawlersExecuted};
$state->{stats}->{LastTrawl} = time;
$state->{stats}->{LastTrawledServer} = $server;
}
sub e_trawler_process {
my ($kernel, $heap) = @_[KERNEL, HEAP];
my ($network, $trawler) = @_[ARG0, ARG1];
## passed trawler objects marked as done (from _chk)
my $orig_server = $trawler->trawler_for;
log_to('irc', 'debug', "Processing trawler: $network: $orig_server");
$state->{stats}->{LastTrawlerReaped} = $network;
if (my $err = $trawler->failed) {
log_to('irc', 'warn',
"Trawler failed: $network ($orig_server): $err"
);
$state->{stats}->{LastFailure}->{$network} = {
Target => $orig_server,
Error => $err,
TS => time,
};
++$state->{stats}->{Fail}->{$network}->{$orig_server};
} else {
my $info = $trawler->info;
my $server = $info->servername;
++$state->{stats}->{Success}->{$network}->{$server};
log_to('irc', 'info',
"Completed trawler: $network ($server)"
);
## sort ListChans if configured:
$info->channels if $cfg->{ListChans};
my $s_info = $info->netinfo;
my $netinfo_obj;
if ($obj->{NetInfo}->{$network}) {
my $stored = $obj->{NetInfo}->{$network};
$netinfo_obj = IRC::Indexer::Report::Network->new(
FromHash => $stored,
NoChannels => 1,
);
} else {
$netinfo_obj = IRC::Indexer::Report::Network->new(
NoChannels => 1,
);
}
## feed it this server's Info object:
$netinfo_obj->add_server($info);
undef $info;
## process and grab new hash:
my $n_info = $netinfo_obj->netinfo;
undef $netinfo_obj;
## cheating here a bit to save some memory ...
## preserve hash for later Network reconstruction, w/out chans
## then merge this server report's hashchans:
$obj->{NetInfo}->{$network} = dclone($n_info);
$n_info->{HashChans} = $s_info->{HashChans};
$kernel->call( $_[SESSION], 'js_push_pending',
[ $n_info, $network ]
);
undef $n_info;
$kernel->call( $_[SESSION], 'js_push_pending',
[ $s_info, $network, $server ],
);
}
}
## Helpers
sub create_trawler {
my %trawlopts = @_;
my $this_cf = \%trawlopts;
my $trawler_class;
if ($cfg->{Forking}) {
$trawler_class = 'Trawl::Forking';
} else {
$trawler_class = 'Trawl::Bot';
}
$trawler_class = 'IRC::Indexer::'.$trawler_class;
my $trawler = $trawler_class->new(
Server => $this_cf->{Server},
Port => $this_cf->{Port},
Nickname => $this_cf->{Nickname},
Timeout => $this_cf->{Timeout},
Interval => $this_cf->{Interval},
BindAddr => $this_cf->{BindAddr},
UseIPV6 => $this_cf->{UseIPV6},
);
++$state->{stats}->{TrawlersCreated};
return $trawler
}
sub db_open {
my ($network, %dbopts) = @_;
$dbopts{lc $_} = $dbopts{$_} for keys %dbopts;
my $cachedir;
die "db_open called but no CacheDir configured"
unless $cachedir = $cfg->{CacheDir};
make_path($cachedir) unless -e $cachedir;
my $dbpath = join '/', $cachedir, $network;
my $lockpath = $dbpath . '.lock';
open my $lock_fh, '>', $lockpath
or die "lockfile open failed: $!";
my $timer = 0;
my $timeout = 3;
until ( flock $lock_fh, LOCK_EX | LOCK_NB ) {
if ($timer > $timeout) {
log_to('DB', 'warn', "Lock timed out.");
return
}
select undef, undef, undef, 0.1;
$timer += 0.1;
}
print $lock_fh $$;
$state->{locks}->{$network} = $lock_fh;
my %this_cache;
tie %this_cache, "DB_File", $dbpath,
O_CREAT|O_RDWR, 0666, $DB_HASH
or die "Failed DB tie: $dbpath: $!";
my $runid = $state->{stats}->{RunID};
if ($this_cache{'_RUNID'} && $this_cache{'_RUNID'} ne $runid) {
log_to('DB', 'warn', "Clearing stale DB: $network");
%this_cache = ();
}
$this_cache{'_RUNID'} = $runid;
unless ($dbopts{gzip} || $dbopts{raw}) {
(tied %this_cache)->filter_fetch_value(
sub { $_ = memGunzip($_) }
);
(tied %this_cache)->filter_store_value(
sub { $_ = memGzip($_) }
);
}
$state->{openDB}->{$network} = \%this_cache;
++$state->{stats}->{DB}->{Open};
}
sub db_write {
my ($data, $network, $server) = @_;
++$state->{stats}->{DB}->{Write};
my $opendb = $state->{openDB}->{$network};
unless ($opendb && tied %$opendb) {
die "Attempted db_write on unopened DB ($network)"
}
if ($server) {
$opendb->{$server} = $data;
} else {
$opendb->{'_NETMETA'} = $data;
}
return 1
}
sub db_read {
my ($network, $server) = @_;
++$state->{stats}->{DB}->{Read};
my $opendb = $state->{openDB}->{$network};
unless ($opendb && tied %$opendb) {
die "Attempted db_read on unopened DB ($network)"
}
my $data;
if ($server) {
$data = $opendb->{$server};
} else {
$data = $opendb->{'_NETMETA'};
}
return $data
}
sub db_close {
my ($network) = @_;
++$state->{stats}->{DB}->{Close};
my $cachedir;
die "db_close called but no CacheDir configured"
unless $cachedir = $cfg->{CacheDir};
my $dbpath = join '/', $cachedir, $network;
my $lockpath = $dbpath . '.lock';
my $lock_fh = delete $state->{locks}->{$network};
my $opendb = delete $state->{openDB}->{$network};
if ($lock_fh) {
close $lock_fh;
unlink $lockpath;
}
if ($opendb) {
untie %$opendb;
}
}
sub db_unlink {
my ($network) = @_;
++$state->{stats}->{DB}->{Unlink};
my $cachedir;
die "db_unlink called but no CacheDir configured"
unless $cachedir = $cfg->{CacheDir};
my $dbpath = join '/', $cachedir, $network;
log_to('DB', 'debug', "db_unlink: $dbpath");
## Not sure if we should die() ...
unlink($dbpath) or log_to('DB', 'warn', "db_unlink: $network: $!");
}
sub cachedir_write {
my ($zipped, $network, $server) = @_;
return unless $network;
my $cachedir;
die "cachedir_write called but no CacheDir configured"
unless $cachedir = $cfg->{CacheDir};
my $timer0 = [gettimeofday];
db_open($network, Raw => 1);
db_write($zipped, $network, $server);
db_close($network);
my $interval = tv_interval($timer0);
$state->{stats}->{DB}->{'WR_Total'} += (sprintf("%.6f", $interval)+0);
$state->{stats}->{DB}->{'WR_Longest'} = $interval
if $interval > ($state->{stats}->{DB}->{'WR_Longest'}//0);
my $runs = $state->{stats}->{DB}->{Write} // 1;
my $avg = $state->{stats}->{DB}->{'WR_Total'} / $runs;
$state->{stats}->{DB}->{'WR_Avg'} = sprintf("%.6f", $avg) + 0;
return 1
}
sub cachedir_read {
my ($gzipped, $network, $server) = @_;
return unless $network;
$gzipped = 0 if $gzipped eq 'json';
my $cachedir;
die "cachedir_read called but no CacheDir configured"
unless $cachedir = $cfg->{CacheDir};
my $timer0 = [gettimeofday];
db_open($network, Raw => $gzipped);
my $data = db_read($network, $server);
db_close($network);
my $interval = tv_interval($timer0);
$state->{stats}->{DB}->{'RD_Total'} += (sprintf("%.6f", $interval)+0);
$state->{stats}->{DB}->{'RD_Longest'} = $interval
if $interval > ($state->{stats}->{DB}->{'RD_Longest'}//0);
my $read = $state->{stats}->{DB}->{Read} // 1;
my $avg = $state->{stats}->{DB}->{'RD_Total'} / $read;
$state->{stats}->{DB}->{'RD_Avg'} = sprintf("%.6f", $avg) + 0;
return $data
}
sub cache_clear {
my ($network) = @_;
return unless $network;
log_to('DB', 'warn', "Clearing cache: $network");
if ($cfg->{CacheDir}) {
db_unlink($network);
} else {
delete $jsmemcache->{Networks}->{$network}
}
}
sub cache_json {
my ($gzip, $network, $server) = @_;
if ($server) {
## store a server's json
if ($cfg->{CacheDir}) {
cachedir_write($gzip, $network, $server);
} else {
$jsmemcache->{Servers}->{$network}->{$server} = memGunzip($gzip);
}
} else {
## store a network's json
if ($cfg->{CacheDir}) {
cachedir_write($gzip, $network);
} else {
$jsmemcache->{Networks}->{$network} = memGunzip($gzip);
}
}
}
sub restore_cache {
my ($gzipped, $network, $server) = @_;
$gzipped = 0 if $gzipped eq 'json';
if ($server) {
if ($cfg->{CacheDir}) {
return cachedir_read($gzipped, $network, $server)
} else {
return unless exists $jsmemcache->{Servers}->{$network};
my $cachedjs = $jsmemcache->{Servers}->{$network}->{$server};
return memGzip($cachedjs) if $cachedjs and $gzipped;
return $cachedjs
}
} else {
if ($cfg->{CacheDir}) {
return cachedir_read($gzipped, $network)
} else {
my $cachedjs = $jsmemcache->{Networks}->{$network};
return memGzip($cachedjs) if $cachedjs and $gzipped;
return $cachedjs
}
}
}
## HTTP states
sub h_error_404 {
my ($kernel, $heap) = @_[KERNEL, HEAP];
my ($req, $resp, $reason) = @_[ARG0, ARG1, ARG2];
my $uri = URI->new( $req->uri );
my $path = $uri->path;
my $conn = $req->connection;
my $addr = $conn->remote_host;
log_to('http', 'info', "$addr 404 $path");
$reason = "Undefined action: ".$req->uri unless $reason;
$resp->error(404, $reason);
}
sub h_src {
my ($kernel, $heap) = @_[KERNEL, HEAP];
my ($req, $resp) = @_[ARG0, ARG1];
## Silly little undocumented proof-of-concept.
## Won't do anything if --showsource wasn't specified.
unless ($opts->{showsource}) {
$kernel->yield( h_error_404 => $req, $resp );
return
}
my $src;
seek(DATA, 0, 0);
{
local $/; $src = <DATA>;
}
$resp->content_type('text/plain');
$resp->content($src);
$resp->respond;
my $conn = $req->connection;
my $addr = $conn->remote_host;
log_to('http', 'info', "$addr 200 h_src");
$resp->done;
}
sub h_stats {
my ($kernel, $heap) = @_[KERNEL, HEAP];
my ($req, $resp) = @_[ARG0, ARG1];
my $uri = URI->new($req->uri);
my @chunks = $uri->path_segments;
splice @chunks, 0, 2;
my ($method) = @chunks;
$method = 'json' unless $method;
if ($method eq 'text' || $method eq 'plain') {
my $n_count = $state->{stats}->{TotalNetworks};
my $s_count = $state->{stats}->{TotalServers};
$resp->content_type('text/plain');
$resp->content(
"Trawling $s_count servers across $n_count networks\n\n"
. "Version: $IRC::Indexer::VERSION\n\n"
);
} else {
my @pending = keys %{ $obj->{active_trawlers} };
my $ref = { ActiveTrawlers => \@pending, %{$state->{stats}} };
my $json = JSON::XS->new->utf8(1)->pretty->encode($ref);
$resp->content_type('application/json');
$resp->content($json);
}
$resp->respond;
my $conn = $req->connection;
my $addr = $conn->remote_host;
log_to('http', 'info', "$addr 200 h_stats");
$resp->done;
}
sub h_server {
my ($kernel, $heap) = @_[KERNEL, HEAP];
my ($req, $resp) = @_[ARG0, ARG1];
my ($network, $server) = @_[ARG2, ARG3];
## handed off by h_network
my $zipped;
unless ( $zipped = restore_cache('zip', $network, $server) ) {
## no JSON for this server, 404
my $reason;
if (defined $cfg->{Networks}->{$network}
&& defined $cfg->{Networks}->{$network}->{$server}
) {
## configured but not trawled (yet)
$reason = "PENDING $server ($network) pending trawler run";
} else {
$reason = "NO_SUCH No such server";
}
$kernel->yield( h_error_404 => $req, $resp, $reason );
return
}
my $uri = URI->new($req->uri);
if ($uri->query && $uri->query =~ /^gzip/i) {
$resp->content_type('application/x-gzip');
$resp->content($zipped);
} else {
my $json = memGunzip($zipped);
undef $zipped;
$resp->content_type('application/json');
$resp->content($json);
}
$resp->respond;
my $conn = $req->connection;
my $addr = $conn->remote_host;
log_to('http', 'info', "$addr 200 h_server $network $server");
$resp->done;
}
sub h_servlist {
my ($kernel, $heap) = @_[KERNEL, HEAP];
my ($req, $resp, $network) = @_[ARG0, ARG1, ARG2];
## handed off by h_network
my $servlist = [keys %{$state->{stats}->{Success}->{$network}}];
my $jsify = IRC::Indexer::Output::JSON->new(
Input => $servlist,
);
my $json = $jsify->dump;
$resp->content_type('application/json');
$resp->content($json);
$resp->respond;
my $conn = $req->connection;
my $addr = $conn->remote_host;
log_to('http', 'info', "$addr 200 h_servlist $network");
$resp->done;
}
sub h_netlist {
my ($kernel, $heap) = @_[KERNEL, HEAP];
my ($req, $resp) = @_[ARG0, ARG1];
## Return JSONified array of pullable networks:
my $netlist = {
Configured => [ keys %$ArrayConf ],
Trawled => [ keys %{$state->{stats}->{Success}} ],
};
my $jsify = IRC::Indexer::Output::JSON->new(
Input => $netlist,
);
my $json = $jsify->dump;
$resp->content_type('application/json');
$resp->content($json);
$resp->respond;
my $conn = $req->connection;
my $addr = $conn->remote_host;
log_to('http', 'info', "$addr 200 h_netlist");
$resp->done;
}
sub h_list {
my ($kernel, $heap) = @_[KERNEL, HEAP];
my ($req, $resp) = @_[ARG0, ARG1];
## Return all configured networks/servers
my $jsify = IRC::Indexer::Output::JSON->new(
Input => $cfg->{Networks},
);
my $json = $jsify->dump;
$resp->content_type('application/json');
$resp->content($json);
$resp->respond;
my $conn = $req->connection;
my $addr = $conn->remote_host;
log_to('http', 'info', "$addr 200 h_list");
$resp->done;
}
sub h_network {
my ($kernel, $heap) = @_[KERNEL, HEAP];
my ($req, $resp) = @_[ARG0, ARG1];
## handler for /network/...
my $uri = URI->new($req->uri);
my @chunks = $uri->path_segments;
splice @chunks, 0, 2;
my ($network) = @chunks;
unless ($network) {
## supply JSONified network list:
$kernel->yield( h_netlist => $req, $resp );
return
}
my $zipped;
## We don't know this network:
unless ( $zipped = restore_cache('zip', $network) ) {
my $reason;
if (defined $cfg->{Networks}->{$network}) {
## configured but not trawled
$reason = "PENDING $network pending trawler run";
} else {
$reason = "NO_SUCH No such network";
}
$kernel->yield( h_error_404 => $req, $resp, $reason );
return
}
## if this is a /network/<net>/server, hand it off:
if ($chunks[1] && $chunks[1] eq "server") {
my $server = $chunks[2];
unless ($server) {
## supply available server list:
$kernel->yield( h_servlist => $req, $resp, $network );
return
}
$kernel->yield( h_server => $req, $resp, $network, $server );
return
}
## otherwise, try to serve a net hash:
if ($uri->query && $uri->query =~ /^gzip/i) {
$resp->content_type('application/x-gzip');
$resp->content($zipped);
} else {
my $json = memGunzip($zipped);
undef $zipped;
$resp->content_type('application/json');
$resp->content($json);
}
$resp->respond;
my $conn = $req->connection;
my $addr = $conn->remote_host;
log_to('http', 'info', "$addr 200 h_network $network");
$resp->done;
}
sub post_request {
++$state->{stats}->{HTTP_Responses};
}
## Forkable worker states
sub js_push_pending {
my ($kernel, $heap) = @_[KERNEL, HEAP];
my $item = $_[ARG0];
## yield back if too many workers running
my $current_workers = scalar keys %{ $obj->{workers_by_pid}//{} };
if ($cfg->{MaxEncoders} && $cfg->{MaxEncoders} <= $current_workers) {
++$state->{stats}->{ThrottledEncoder};
$kernel->alarm_add('js_push_pending', Time::HiRes::time + 0.5, $item);
return
}
## otherwise fork worker and put()
my ($network) = @$item;
log_to('session', 'debug', "Encoder spawn: $network");
## borrowed from PoCo Resolver:
my $perlpath = $Config{perlpath};
if ($^O ne 'VMS') {
$perlpath .= $Config{_exe}
unless $perlpath =~ m/$Config{_exe}$/i;
}
my $forkable;
if ($^O eq 'MSWin32') {
## sucks to be you!
$forkable = \&IRC::Indexer::Process::JSONify::worker;
} else {
$forkable = [
$perlpath, (map { "-I$_" } @INC),
'-MIRC::Indexer::Process::JSONify', '-e',
'IRC::Indexer::Process::JSONify->worker()'
];
}
my $wheel = POE::Wheel::Run->new(
Program => $forkable,
ErrorEvent => 'js_worker_error',
StdoutEvent => 'js_worker_stdout',
StderrEvent => 'js_worker_stderr',
CloseEvent => 'js_worker_closed',
StdioFilter => POE::Filter::Reference->new(),
);
++$state->{stats}->{EncoderWheels}->{Created};
my $wheelid = $wheel->ID;
my $pidof = $wheel->PID;
$kernel->sig_child($pidof, 'js_worker_sigchld');
$obj->{workers_by_pid}->{$pidof} = $wheel;
$obj->{workers_by_wid}->{$wheelid} = $wheel;
## Feed this worker.
$wheel->put($item);
}
sub js_worker_stdout {
my ($kernel, $item) = @_[KERNEL, ARG0];
log_to('session', 'debug', "Handling input from encoder");
my ($zip, $network, $server) = splice @$item, 0, 4;
unless ($network) {
log_to('session', 'warn', "Invalid input from encoder!");
++$state->{stats}->{EncoderWheels}->{BadResult};
return
}
++$state->{stats}->{EncoderWheels}->{Success};
## got JSON back, cache_json it unless it's stale
if ($cfg->{Networks}->{$network}) {
cache_json($zip, $network, $server);
my $logstr = "Cached JSON for $network";
$logstr .= " ($server)" if $server;
log_to('irc', 'debug', $logstr);
} else {
log_to('irc', 'info', "Discarding JSON for stale network: $network");
}
}
sub js_worker_stderr {
my ($kernel, $err, $id) = @_[KERNEL, ARG0, ARG1];
log_to('session', 'warn', "Encoder reported err: $err");
++$state->{stats}->{EncoderWheels}->{Died};
## these will die() and go away.
}
sub js_worker_closed {
my ($kernel, $wheelid) = @_[KERNEL, ARG0];
my $wheel = delete $obj->{workers_by_wid}->{$wheelid};
if (defined $wheel) {
++$state->{stats}->{EncoderWheels}->{Closed};
my $pidof = $wheel->PID;
delete $obj->{workers_by_pid}->{$pidof};
$wheel->kill(9); ## just in case ...
}
}
sub js_worker_error {
## null-op; these should sigchld and go away
## (and you get no data either way)
}
sub js_worker_sigchld {
## child's gone
my ($kernel, $pidof) = @_[KERNEL, ARG1];
log_to('session', 'debug', "Worker sig_chld: $pidof");
++$state->{stats}->{EncoderWheels}->{SIGCHLD};
my $wheel;
return unless $wheel = delete $obj->{workers_by_pid}->{$pidof};
my $wheelid = $wheel->ID;
delete $obj->{workers_by_wid}->{$wheelid};
}
sub start_indexer {
if ($opts->{detach}) {
print "Starting detached indexer.\n";
my $fork = fork;
exit 1 if not defined $fork;
exit 0 if $fork;
POSIX::setsid();
$fork = fork;
exit 1 if not defined $fork;
exit 0 if $fork;
open(STDIN, '<', '/dev/null');
open(STDOUT, '>', '/dev/null');
open(STDERR, '>', '/dev/null');
umask(022);
}
get_cfg();
poco_init_session();
POE::Kernel->run;
}
start_indexer();
__END__
=pod
=head1 NAME
ircindexer-server-json - Serve IRC::Indexer JSON over HTTP
=head1 SYNOPSIS
## Create example httpd.cf:
$ ircindexer-examplecf -t httpd -o httpd.cf
$ $EDITOR httpd.cf
## Add some servers:
$ mkdir -p networks/cobaltirc
$ ircindexer-examplecf -t spec -o \
networks/cobaltirc/phoenix.xyloid.org
$ $EDITOR networks/cobaltirc/phoenix.xyloid.org
## Start server:
$ ircindexer-server-json -c httpd.cf
=head1 DESCRIPTION
Indexes sets of servers in parallel via L<IRC::Indexer>, exporting
information as JSON via HTTP for ease of retrieval.
Uses L<POEx::HTTP::Server> to provide HTTP daemon functionality.
JSON is encoded by forked L<IRC::Indexer::Process::JSONify> instances.
Intended to serve as the back-end to a presentation system for the
data collected. See L<IRC::Indexer::POD::ExampleClients> for some
examples of ways to make use of the exported data on the client side.
=head2 Fetching JSON
Networks or servers that are configured but not yet trawled will report
a 404 error with a string indicating that the trawl run is pending.
You can check 404s for network/server requests for status information.
If the first word of the 404 content is PENDING, the requested item is
pending a trawler run.
If the first word of the 404 content is NO_SUCH, the requested item is
not being indexed by the trawler.
=head3 List
A hash containing networks and servers scheduled for trawling is
available via B</list>:
## JSON hash containing network -> server mapping of
## configured trawlers:
Note that this is the list of configured network -> server maps, not
necessarily the list of B<trawled> servers.
That is to say, the network names are guaranteed valid, but the server
names may not be available under the /network/<NET>/server/ tree; for a
list of servers available for retrieval, see L</Servers>, below.
=head3 Networks
Network information hashes are exported via B</network/> routes:
## JSON array of available trawled networks:
## JSON Network Info hash for 'CobaltIRC':
## Compressed:
These are indexed by their configured network name.
=head3 Servers
Specific server hashes as described in L<IRC::Indexer::Trawl::Bot> are
exported via /network/<NETNAME>/server/<SERVERNAME>:
## List of trawled servers for this network:
## Trawl results from a single server:
## Compressed:
Note that servers are not available for retrieval until they are
trawled; depending on trawl intervals, it may take some time to cycle
through all listed servers for a network.
Also note that servers are indexed by their reported server name -- not
necessarily the name specified in their configuration.
=head3 Stats
Stats regarding the running instance are exported via B</stats>:
=head2 Reloading networks
You can reload the existing networks set and force a rehash by sending a
SIGHUP to the server.
Your NetworkDir will be re-scanned and any removed networks will be
pulled from the trawl cache. All networks will have their trawl runs
rescheduled immediately -- in other words, SIGHUP can also be used to
force a refresh.
=head1 AUTHOR
Jon Portnoy <avenj@cobaltirc.org>
=cut