#!perl
use 5.10.1;
use strict;
use warnings;

use Fcntl qw/:DEFAULT :flock/;
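## Core and CPAN modules used throughout this script:
use Getopt::Long;
use Scalar::Util qw/blessed/;
use Storable qw/dclone/;
use Digest::SHA qw/sha1_hex/;
use Time::HiRes qw/gettimeofday tv_interval/;
use File::Path qw/make_path/;
use Config;
use POSIX ();

use DB_File;
use Compress::Zlib;
use JSON::XS;
use URI;

use POE qw/Wheel::Run Filter::Reference/;
use POEx::HTTP::Server;

use IRC::Indexer;
use IRC::Indexer::Conf;
use IRC::Indexer::Logger;
use IRC::Indexer::Output::JSON;
use IRC::Indexer::Report::Network;
use IRC::Indexer::Trawl::Bot;
use IRC::Indexer::Trawl::Forking;
use IRC::Indexer::Process::JSONify;
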
my $opts = {
  help => sub {
    print(
      "ircindexer-server-json\n\n",
      " -v, --version\n",
      " Display IRC::Indexer version\n\n",
      " -c, --config=PATH\n",
      " Configuration file for this server.\n\n",
      " -l, --loglevel=LEVEL\n",
      " Log verbosity, one of: debug, info, warn\n",
      " Overrides configuration file.\n\n",
      " -d, --detach\n",
      " Run as a daemon process.\n\n",
      " -i, --interval=SECS\n",
      " Interval between trawling servers on a specific network.\n",
      " Defaults to 600 seconds (10 minutes)\n",
    );
    exit 0;
  },

  version => sub {
    print(
      "ircindexer-server-json (IRC::Indexer $IRC::Indexer::VERSION)\n",
      " POEx::HTTP::Server $POEx::HTTP::Server::VERSION\n",
    );
    exit 0;
  },

  detach   => 0,
  interval => 600,
};

GetOptions( $opts,
  qw/
    help
    version
    config=s
    detach!
    interval=i
    loglevel=s
    showsource
  /,
);

my $obj = {
  Log => undef,
  Cfg => undef,

  active_trawlers => {},

  workers_by_pid => {},
  workers_by_wid => {},
};

my $state = {
  timers => { trawlers => {} },

  stats => {
    RunID   => sha1_hex( Time::HiRes::time() . rand ),
    Version => $IRC::Indexer::VERSION,

    StartedAt => time,
    LoopTick  => 0,

    TotalNetworks => 0,
    TotalServers  => 0,

    TrawlersExecuted => 0,
    TrawlersCreated  => 0,

    LastTrawl         => 0,
    LastTrawledServer => undef,
    LastTrawlerReaped => undef,

    HTTP_Responses => 0,
  },
};

my $cfg = {
  NetworkDir => undef,

  BindAddr   => '0.0.0.0',
  ServerPort => 8700,

  CacheDir  => undef,
  ListChans => undef,

  Forking => 0,

  MaxEncoders => 20,
  MaxTrawlers => 20,

  LogFile  => undef,
  LogLevel => undef,
  LogHTTP  => 1,
  LogIRC   => 1,

  Networks => {},
};

my $ArrayConf  = { };
my $jsmemcache = { };

sub get_nets {
  my $conf_obj = $obj->{Cfg};

  die "Configuration directive missing: NetworkDir\n"
    unless $cfg->{NetworkDir};

  my $nethash = $conf_obj->parse_nets( $cfg->{NetworkDir} );
  $cfg->{Networks} = $nethash;
}

sub get_cfg {
  die "No --config specified.\n"
    unless $opts->{config};

  die "Specified conf nonexistent: $opts->{config}"
    unless -e $opts->{config};
  die "Specified conf not readable: $opts->{config}"
    unless -r $opts->{config};

  my $cfpath = $opts->{config};

  my $conf = IRC::Indexer::Conf->new;
  $obj->{Cfg} = $conf;

  my $cfhash;
  {
    local $@;
    eval { $cfhash = $conf->parse_conf($cfpath) };
    die "Could not parse conf: $@" if $@;
  }

  ## Conf values overwrite the defaults above; keys missing from the conf
  ## end up undef (hence the //= on ServerPort below).
  for my $thisopt (keys %$cfg) {
    $cfg->{$thisopt} = $cfhash->{$thisopt};
  }

  my $loglevel = $opts->{loglevel} || $cfg->{LogLevel} || 'info';

  if ($cfg->{LogFile}) {
    my $handler = IRC::Indexer::Logger->new(
      LogFile  => $cfg->{LogFile},
      LogLevel => $loglevel,
    );
    $obj->{Log} = $handler->logger;
  }

  if ($opts->{detach}) {
    open STDERR, '>', $cfg->{LogFile}
      if $cfg->{LogFile};
  } else {
    unless (blessed $obj->{Log}) {
      my $handler = IRC::Indexer::Logger->new(
        DevNull => 1,
      );
      $obj->{Log} = $handler->logger;
    }

    $obj->{Log}->add(
      screen => {
        log_to         => "STDOUT",
        maxlevel       => $loglevel || 'info',
        timeformat     => "%Y/%m/%d %H:%M:%S",
        message_layout => "[%T] %L %m",
      },
    );
  }

  $cfg->{ServerPort} //= 8700;

  get_nets();
}

sub log_to {
  my ($type, $level, @lines) = @_;
  return unless @lines;

  my $log = $obj->{Log};
  return unless blessed $log;

  my $ltype = lc $type;
  return if $ltype eq 'http' and not $cfg->{LogHTTP};
  return if $ltype eq 'irc'  and not $cfg->{LogIRC};

  $log->$level( "$type ", @lines );
}

sub poco_cfg_httpd {
  $state->{htevents} = [
    map { 'h_' . $_ }
      qw/
        error_404
        src
        stats
        list
        server
        servlist
        network
        netlist
      /
  ];
  push @{ $state->{htevents} }, 'post_request';

  POEx::HTTP::Server->spawn(
    retry => 0,

    inet => {
      LocalAddr => $cfg->{BindAddr},
      LocalPort => $cfg->{ServerPort},
    },

    handlers => [
      '^/$'          => 'poe:indexerhttp/h_stats',
      '^/stats'      => 'poe:indexerhttp/h_stats',
      '^/list'       => 'poe:indexerhttp/h_list',
      '^/network'    => 'poe:indexerhttp/h_network',
      '^/netlist'    => 'poe:indexerhttp/h_netlist',
      '^/src'        => 'poe:indexerhttp/h_src',
      ''             => 'poe:indexerhttp/h_error_404',
      'post_request' => 'poe:indexerhttp/post_request',
    ],
  );

  log_to( 'http', 'info', 'HTTP configured' );

  return 1;
}

sub poco_init_session {
  POE::Session->create(
    package_states => [
      main => [ qw/
        _start
        _stop

        session_sig_int
        session_sig_hup

        e_init_nets
        e_rehash
        e_timer_chk

        e_trawler_run
        e_trawler_process

        js_push_pending
        js_worker_stdout
        js_worker_stderr
        js_worker_error
        js_worker_closed
        js_worker_sigchld
      / ],
    ],
  );

  log_to( 'session', 'debug', 'Session initialized' );
}

sub _start {
  my ($kernel, $heap) = @_[KERNEL, HEAP];

  $kernel->alias_set('indexerhttp');

  $kernel->sig( 'INT',  'session_sig_int' );
  $kernel->sig( 'TERM', 'session_sig_int' );
  $kernel->sig( 'HUP',  'session_sig_hup' );

  log_to( 'session', 'info', 'Session started' );

  my @htstates = poco_cfg_httpd();

  $kernel->call( $_[SESSION], 'e_init_nets' );

  $kernel->state( $_, __PACKAGE__ )
    for @{ $state->{htevents} };
}

sub e_init_nets {
  my ($kernel, $heap) = @_[KERNEL, HEAP];

  $ArrayConf = { };

  my ($total_nets, $total_servers) = (0, 0);

  NETWORK: for my $network (keys %{ $cfg->{Networks} }) {
    ++$total_nets;
    my $this_net = $cfg->{Networks}->{$network};

    SERVER: for my $server (keys %$this_net) {
      ++$total_servers;
      my $this_cf = $this_net->{$server};

      log_to( 'irc', 'info', "Init trawler: $network -> $server" );

      my %trawlopts = %$this_cf;

      push @{ $ArrayConf->{$network} },
        [ $network, $server, { Server => $server, %trawlopts } ];

      log_to( 'irc', 'debug', "Trawler configured for $server" );
    }
  }

  log_to( 'irc', 'info',
    "Trawling $total_servers servers across $total_nets networks"
  );

  $state->{stats}->{TotalNetworks} = $total_nets;
  $state->{stats}->{TotalServers}  = $total_servers;

  $state->{hasrun} = {};

  $kernel->yield('e_timer_chk');
}

sub e_rehash {
  my ($kernel, $heap) = @_[KERNEL, HEAP];

  $kernel->alarm('e_timer_chk');

  NETWORK: for my $network (keys %{ $obj->{active_trawlers} }) {
    log_to( 'irc', 'debug', "Suspending trawler for $network" );
    my $trawler = delete $obj->{active_trawlers}->{$network};
    my $sessid  = $trawler->ID // next NETWORK;
    $kernel->post( $sessid, 'shutdown' );
  }

  my @prev_nets = keys %{ $cfg->{Networks} };

  get_nets();

  CFG: for my $old_net (@prev_nets) {
    unless (defined $cfg->{Networks}->{$old_net}) {
      cache_clear($old_net);
    }
  }

  $kernel->call( $_[SESSION], 'e_init_nets' );
}

sub session_sig_hup {
  log_to( 'session', 'warn', 'SIGHUP, reloading' );
  $_[KERNEL]->sig_handled();
  $_[KERNEL]->call( $_[SESSION], 'e_rehash' );
}

sub session_sig_int {
  warn "Exiting due to signal";
  $_[KERNEL]->yield('_stop');
}

sub _stop {
  for my $pidof (keys %{ $obj->{workers_by_pid} }) {
    my $wheel = delete $obj->{workers_by_pid}->{$pidof};
    next unless ref $wheel;
    my $wid = $wheel->ID;
    delete $obj->{workers_by_wid}->{$wid};
    $wheel->kill(9);
  }

  if ($state->{openDB}) {
    for my $network (keys %{ $state->{openDB} }) {
      db_close($network);
    }
  }

  log_to( 'session', 'warn', "_stop received" );

  $_[KERNEL]->signal( $_[KERNEL], 'shutdown' );
}

sub e_timer_chk {
  my ($kernel, $heap) = @_[KERNEL, HEAP];

  my $state_run = $state->{hasrun};

  ++$state->{stats}->{LoopTick};

  my @trawlers;
  my $sp = 0;

  ## Schedule an initial trawler run for any network not yet touched.
  NETWORK: for my $network (keys %$ArrayConf) {
    my $last_trawled = $state_run->{$network};

    unless ($last_trawled) {
      $state_run->{$network} = time();

      unless (@{ $ArrayConf->{$network} }) {
        log_to( 'irc', 'warn', "No trawlers to start for $network" );
        next NETWORK;
      }

      log_to( 'irc', 'debug', "Scheduling initial trawler for $network" );

      $kernel->alarm_add( 'e_trawler_run',
        time + ($sp += 1),
        $ArrayConf->{$network}->[0]
      );
    }
  }

  ## Reap finished trawlers; rotate the server list and schedule the next run.
  TRAWL: for my $network (keys %{ $obj->{active_trawlers} }) {
    my $trawler = delete $obj->{active_trawlers}->{$network};

    if ($trawler->done) {
      my $prev = shift @{ $ArrayConf->{$network} };
      push @{ $ArrayConf->{$network} }, $prev;

      $kernel->yield( 'e_trawler_process', $network, $trawler );

      my $next_trawler = $ArrayConf->{$network}->[0];

      if (defined $next_trawler) {
        my $next_run_at = time + ( $opts->{interval} || 600 );
        $kernel->alarm_add( 'e_trawler_run', $next_run_at, $next_trawler );
        log_to( 'irc', 'debug', "Scheduling next trawler for $network" );
      } else {
        log_to( 'irc', 'warn', "ArrayConf disappeared for $network" );
      }
    } else {
      ## Still running; put it back.
      $obj->{active_trawlers}->{$network} = $trawler;
    }
  }

  $kernel->alarm( 'e_timer_chk' => time + 1 );
}

sub e_trawler_run {
  my ($kernel, $heap, $confitem) = @_[KERNEL, HEAP, ARG0];

  unless (ref $confitem eq 'ARRAY' && $ArrayConf->{ $confitem->[0] }) {
    log_to( 'irc', 'debug', "skipped e_trawler_run for cancelled trawler" );
    return;
  }

  my ($network, $server, $trawlcf) = @$confitem;

  my $current_count = keys %{ $obj->{active_trawlers} };
  if ($cfg->{MaxTrawlers} && $current_count >= $cfg->{MaxTrawlers}) {
    ++$state->{stats}->{ThrottledTrawler};
    $kernel->alarm_add( 'e_trawler_run', time + 1, $confitem );
    return;
  }

  my $trawler = create_trawler(%$trawlcf);
  $trawler->run();

  log_to( 'irc', 'info', "Trawler run: $network: $server" );

  $obj->{active_trawlers}->{$network} = $trawler;

  ++$state->{stats}->{TrawlersExecuted};
  $state->{stats}->{LastTrawl}         = time;
  $state->{stats}->{LastTrawledServer} = $server;
}

sub e_trawler_process {
  my ($kernel, $heap) = @_[KERNEL, HEAP];
  my ($network, $trawler) = @_[ARG0, ARG1];

  my $orig_server = $trawler->trawler_for;

  log_to( 'irc', 'debug', "Processing trawler: $network: $orig_server" );

  $state->{stats}->{LastTrawlerReaped} = $network;

  if (my $err = $trawler->failed) {
    log_to( 'irc', 'warn', "Trawler failed: $network ($orig_server): $err" );

    $state->{stats}->{LastFailure}->{$network} = {
      Target => $orig_server,
      Error  => $err,
      TS     => time,
    };

    ++$state->{stats}->{Fail}->{$network}->{$orig_server};
  } else {
    my $info   = $trawler->info;
    my $server = $info->servername;

    ++$state->{stats}->{Success}->{$network}->{$server};

    log_to( 'irc', 'info', "Completed trawler: $network ($server)" );

    $info->channels if $cfg->{ListChans};

    my $s_info = $info->netinfo;

    my $netinfo_obj;
    if ($obj->{NetInfo}->{$network}) {
      my $stored = $obj->{NetInfo}->{$network};
      $netinfo_obj = IRC::Indexer::Report::Network->new(
        FromHash   => $stored,
        NoChannels => 1,
      );
    } else {
      $netinfo_obj = IRC::Indexer::Report::Network->new(
        NoChannels => 1,
      );
    }

    $netinfo_obj->add_server($info);
    undef $info;

    my $n_info = $netinfo_obj->netinfo;
    undef $netinfo_obj;

    $obj->{NetInfo}->{$network} = dclone($n_info);

    $n_info->{HashChans} = $s_info->{HashChans};

    $kernel->call( $_[SESSION], 'js_push_pending',
      [ $n_info, $network ]
    );
    undef $n_info;

    $kernel->call( $_[SESSION], 'js_push_pending',
      [ $s_info, $network, $server ],
    );
  }
}

sub create_trawler {
  my %trawlopts = @_;
  my $this_cf   = \%trawlopts;

  my $trawler_class;
  if ($cfg->{Forking}) {
    $trawler_class = 'Trawl::Forking';
  } else {
    $trawler_class = 'Trawl::Bot';
  }
  $trawler_class = 'IRC::Indexer::' . $trawler_class;

  my $trawler = $trawler_class->new(
    Server   => $this_cf->{Server},
    Port     => $this_cf->{Port},
    Nickname => $this_cf->{Nickname},
    Timeout  => $this_cf->{Timeout},
    Interval => $this_cf->{Interval},
    BindAddr => $this_cf->{BindAddr},
    UseIPV6  => $this_cf->{UseIPV6},
  );

  ++$state->{stats}->{TrawlersCreated};

  return $trawler;
}

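## DB cache layer.
## When CacheDir is configured, each network gets a DB_File hash on disk
## (guarded by a flock'd .lock file) holding gzipped JSON per server plus a
## '_NETMETA' key for the network-level report; without CacheDir, results
## live in the in-memory $jsmemcache hash instead.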
sub db_open {
  my ($network, %dbopts) = @_;

  ## Normalize option keys to lowercase.
  $dbopts{lc $_} = $dbopts{$_} for keys %dbopts;

  my $cachedir;
  die "db_open called but no CacheDir configured"
    unless $cachedir = $cfg->{CacheDir};

  make_path($cachedir) unless -e $cachedir;

  my $dbpath   = join '/', $cachedir, $network;
  my $lockpath = $dbpath . '.lock';

  open my $lock_fh, '>', $lockpath
    or die "lockfile open failed: $!";

  my $timer   = 0;
  my $timeout = 3;
  until ( flock $lock_fh, LOCK_EX | LOCK_NB ) {
    if ($timer > $timeout) {
      log_to( 'DB', 'warn', "Lock timed out." );
      return;
    }
    select undef, undef, undef, 0.1;
    $timer += 0.1;
  }
  print $lock_fh $$;

  $state->{locks}->{$network} = $lock_fh;

  my %this_cache;
  tie %this_cache, "DB_File", $dbpath,
    O_CREAT|O_RDWR, 0666, $DB_HASH
    or die "Failed DB tie: $dbpath: $!";

  ## Clear the DB if it was written by a previous run.
  my $runid = $state->{stats}->{RunID};
  if ( $this_cache{'_RUNID'} && $this_cache{'_RUNID'} ne $runid ) {
    log_to( 'DB', 'warn', "Clearing stale DB: $network" );
    %this_cache = ();
  }
  $this_cache{'_RUNID'} = $runid;

  ## Unless the caller asked for raw/gzip passthrough, transparently
  ## gunzip values on fetch and gzip them on store.
  unless ($dbopts{gzip} || $dbopts{raw}) {
    (tied %this_cache)->filter_fetch_value(
      sub { $_ = memGunzip($_) }
    );
    (tied %this_cache)->filter_store_value(
      sub { $_ = memGzip($_) }
    );
  }

  $state->{openDB}->{$network} = \%this_cache;

  ++$state->{stats}->{DB}->{Open};
}

sub db_write {
  my ($data, $network, $server) = @_;

  ++$state->{stats}->{DB}->{Write};

  my $opendb = $state->{openDB}->{$network};
  unless ($opendb && tied %$opendb) {
    die "Attempted db_write on unopened DB ($network)"
  }

  if ($server) {
    $opendb->{$server} = $data;
  } else {
    $opendb->{'_NETMETA'} = $data;
  }

  return 1;
}

sub db_read {
  my ($network, $server) = @_;

  ++$state->{stats}->{DB}->{Read};

  my $opendb = $state->{openDB}->{$network};
  unless ($opendb && tied %$opendb) {
    die "Attempted db_read on unopened DB ($network)"
  }

  my $data;
  if ($server) {
    $data = $opendb->{$server};
  } else {
    $data = $opendb->{'_NETMETA'};
  }

  return $data;
}

sub db_close {
  my ($network) = @_;

  ++$state->{stats}->{DB}->{Close};

  my $cachedir;
  die "db_close called but no CacheDir configured"
    unless $cachedir = $cfg->{CacheDir};

  my $dbpath   = join '/', $cachedir, $network;
  my $lockpath = $dbpath . '.lock';

  my $lock_fh = delete $state->{locks}->{$network};
  my $opendb  = delete $state->{openDB}->{$network};

  if ($lock_fh) {
    close $lock_fh;
    unlink $lockpath;
  }

  if ($opendb) {
    untie %$opendb;
  }
}

sub db_unlink {
  my ($network) = @_;

  ++$state->{stats}->{DB}->{Unlink};

  my $cachedir;
  die "db_unlink called but no CacheDir configured"
    unless $cachedir = $cfg->{CacheDir};

  my $dbpath = join '/', $cachedir, $network;

  log_to( 'DB', 'debug', "db_unlink: $dbpath" );

  unlink($dbpath)
    or log_to( 'DB', 'warn', "db_unlink: $network: $!" );
}

sub cachedir_write {
  my ($zipped, $network, $server) = @_;
  return unless $network;

  my $cachedir;
  die "cachedir_write called but no CacheDir configured"
    unless $cachedir = $cfg->{CacheDir};

  my $timer0 = [gettimeofday];

  db_open( $network, Raw => 1 );
  db_write( $zipped, $network, $server );
  db_close( $network );

  my $interval = tv_interval($timer0);

  $state->{stats}->{DB}->{'WR_Total'} += ( sprintf("%.6f", $interval) + 0 );

  $state->{stats}->{DB}->{'WR_Longest'} = $interval
    if $interval > ( $state->{stats}->{DB}->{'WR_Longest'} // 0 );

  my $runs = $state->{stats}->{DB}->{Write} // 1;
  my $avg  = $state->{stats}->{DB}->{'WR_Total'} / $runs;
  $state->{stats}->{DB}->{'WR_Avg'} = sprintf("%.6f", $avg) + 0;

  return 1;
}

sub cachedir_read {
  my ($gzipped, $network, $server) = @_;
  return unless $network;

  $gzipped = 0 if $gzipped eq 'json';

  my $cachedir;
  die "cachedir_read called but no CacheDir configured"
    unless $cachedir = $cfg->{CacheDir};

  my $timer0 = [gettimeofday];

  db_open( $network, Raw => $gzipped );
  my $data = db_read( $network, $server );
  db_close( $network );

  my $interval = tv_interval($timer0);

  $state->{stats}->{DB}->{'RD_Total'} += ( sprintf("%.6f", $interval) + 0 );

  $state->{stats}->{DB}->{'RD_Longest'} = $interval
    if $interval > ( $state->{stats}->{DB}->{'RD_Longest'} // 0 );

  my $read = $state->{stats}->{DB}->{Read} // 1;
  my $avg  = $state->{stats}->{DB}->{'RD_Total'} / $read;
  $state->{stats}->{DB}->{'RD_Avg'} = sprintf("%.6f", $avg) + 0;

  return $data;
}

sub cache_clear {
  my ($network) = @_;
  return unless $network;

  log_to( 'DB', 'warn', "Clearing cache: $network" );

  if ($cfg->{CacheDir}) {
    db_unlink($network);
  } else {
    delete $jsmemcache->{Networks}->{$network}
  }
}

sub cache_json {
  my ($gzip, $network, $server) = @_;

  if ($server) {
    if ($cfg->{CacheDir}) {
      cachedir_write( $gzip, $network, $server );
    } else {
      $jsmemcache->{Servers}->{$network}->{$server} = memGunzip($gzip);
    }
  } else {
    if ($cfg->{CacheDir}) {
      cachedir_write( $gzip, $network );
    } else {
      $jsmemcache->{Networks}->{$network} = memGunzip($gzip);
    }
  }
}

sub restore_cache {
  my ($gzipped, $network, $server) = @_;

  $gzipped = 0 if $gzipped eq 'json';

  if ($server) {
    if ($cfg->{CacheDir}) {
      return cachedir_read( $gzipped, $network, $server )
    } else {
      return unless exists $jsmemcache->{Servers}->{$network};
      my $cachedjs = $jsmemcache->{Servers}->{$network}->{$server};
      return memGzip($cachedjs) if $cachedjs and $gzipped;
      return $cachedjs
    }
  } else {
    if ($cfg->{CacheDir}) {
      return cachedir_read( $gzipped, $network )
    } else {
      my $cachedjs = $jsmemcache->{Networks}->{$network};
      return memGzip($cachedjs) if $cachedjs and $gzipped;
      return $cachedjs
    }
  }
}

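## HTTP handlers (dispatched by POEx::HTTP::Server).
## /stats, /list, /netlist, /network/<net>[/server/<server>] and /src;
## anything unrecognized falls through to h_error_404.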
sub h_error_404 {
  my ($kernel, $heap) = @_[KERNEL, HEAP];
  my ($req, $resp, $reason) = @_[ARG0, ARG1, ARG2];

  my $uri  = URI->new( $req->uri );
  my $path = $uri->path;

  my $conn = $req->connection;
  my $addr = $conn->remote_host;
  log_to( 'http', 'info', "$addr 404 $path" );

  $reason = "Undefined action: " . $req->uri
    unless $reason;

  $resp->error( 404, $reason );
}

sub h_src {
  my ($kernel, $heap) = @_[KERNEL, HEAP];
  my ($req, $resp)    = @_[ARG0, ARG1];

  unless ($opts->{showsource}) {
    $kernel->yield( h_error_404 => $req, $resp );
    return;
  }

  my $src;
  seek(DATA, 0, 0);
  {
    local $/;
    $src = <DATA>;
  }

  $resp->content_type('text/plain');
  $resp->content($src);
  $resp->respond;

  my $conn = $req->connection;
  my $addr = $conn->remote_host;
  log_to( 'http', 'info', "$addr 200 h_src" );

  $resp->done;
}

sub h_stats {
  my ($kernel, $heap) = @_[KERNEL, HEAP];
  my ($req, $resp)    = @_[ARG0, ARG1];

  my $uri    = URI->new( $req->uri );
  my @chunks = $uri->path_segments;
  splice @chunks, 0, 2;
  my ($method) = @chunks;
  $method = 'json' unless $method;

  if ($method eq 'text' || $method eq 'plain') {
    my $n_count = $state->{stats}->{TotalNetworks};
    my $s_count = $state->{stats}->{TotalServers};

    $resp->content_type('text/plain');
    $resp->content(
      "Trawling $s_count servers across $n_count networks\n\n"
      . "Version: $IRC::Indexer::VERSION\n\n"
    );
  } else {
    my @pending = keys %{ $obj->{active_trawlers} };
    my $ref = { ActiveTrawlers => \@pending, %{ $state->{stats} } };

    my $json = JSON::XS->new->utf8(1)->pretty->encode($ref);

    $resp->content_type('application/json');
    $resp->content($json);
  }

  $resp->respond;

  my $conn = $req->connection;
  my $addr = $conn->remote_host;
  log_to( 'http', 'info', "$addr 200 h_stats" );

  $resp->done;
}

sub h_server {
  my ($kernel, $heap) = @_[KERNEL, HEAP];
  my ($req, $resp)    = @_[ARG0, ARG1];
  my ($network, $server) = @_[ARG2, ARG3];

  my $zipped;
  unless ( $zipped = restore_cache( 'zip', $network, $server ) ) {
    my $reason;

    if ( defined $cfg->{Networks}->{$network}
      && defined $cfg->{Networks}->{$network}->{$server} ) {
      $reason = "PENDING $server ($network) pending trawler run";
    } else {
      $reason = "NO_SUCH No such server";
    }

    $kernel->yield( h_error_404 => $req, $resp, $reason );
    return;
  }

  my $uri = URI->new( $req->uri );

  if ( $uri->query && $uri->query =~ /^gzip/i ) {
    $resp->content_type('application/x-gzip');
    $resp->content($zipped);
  } else {
    my $json = memGunzip($zipped);
    undef $zipped;
    $resp->content_type('application/json');
    $resp->content($json);
  }

  $resp->respond;

  my $conn = $req->connection;
  my $addr = $conn->remote_host;
  log_to( 'http', 'info', "$addr 200 h_server $network $server" );

  $resp->done;
}

sub h_servlist {
  my ($kernel, $heap) = @_[KERNEL, HEAP];
  my ($req, $resp, $network) = @_[ARG0, ARG1, ARG2];

  my $servlist = [ keys %{ $state->{stats}->{Success}->{$network} } ];

  my $jsify = IRC::Indexer::Output::JSON->new(
    Input => $servlist,
  );
  my $json = $jsify->dump;

  $resp->content_type('application/json');
  $resp->content($json);
  $resp->respond;

  my $conn = $req->connection;
  my $addr = $conn->remote_host;
  log_to( 'http', 'info', "$addr 200 h_servlist $network" );

  $resp->done;
}

sub h_netlist {
  my ($kernel, $heap) = @_[KERNEL, HEAP];
  my ($req, $resp)    = @_[ARG0, ARG1];

  my $netlist = {
    Configured => [ keys %$ArrayConf ],
    Trawled    => [ keys %{ $state->{stats}->{Success} } ],
  };

  my $jsify = IRC::Indexer::Output::JSON->new(
    Input => $netlist,
  );
  my $json = $jsify->dump;

  $resp->content_type('application/json');
  $resp->content($json);
  $resp->respond;

  my $conn = $req->connection;
  my $addr = $conn->remote_host;
  log_to( 'http', 'info', "$addr 200 h_netlist" );

  $resp->done;
}

sub h_list {
  my ($kernel, $heap) = @_[KERNEL, HEAP];
  my ($req, $resp)    = @_[ARG0, ARG1];

  my $jsify = IRC::Indexer::Output::JSON->new(
    Input => $cfg->{Networks},
  );
  my $json = $jsify->dump;

  $resp->content_type('application/json');
  $resp->content($json);
  $resp->respond;

  my $conn = $req->connection;
  my $addr = $conn->remote_host;
  log_to( 'http', 'info', "$addr 200 h_list" );

  $resp->done;
}

sub h_network {
  my ($kernel, $heap) = @_[KERNEL, HEAP];
  my ($req, $resp)    = @_[ARG0, ARG1];

  my $uri    = URI->new( $req->uri );
  my @chunks = $uri->path_segments;
  splice @chunks, 0, 2;
  my ($network) = @chunks;

  unless ($network) {
    $kernel->yield( h_netlist => $req, $resp );
    return;
  }

  my $zipped;
  unless ( $zipped = restore_cache( 'zip', $network ) ) {
    my $reason;

    if ( defined $cfg->{Networks}->{$network} ) {
      $reason = "PENDING $network pending trawler run";
    } else {
      $reason = "NO_SUCH No such network";
    }

    $kernel->yield( h_error_404 => $req, $resp, $reason );
    return;
  }

  ## /network/<net>/server[/<server>] delegates to h_servlist / h_server.
  if ( $chunks[1] && $chunks[1] eq "server" ) {
    my $server = $chunks[2];

    unless ($server) {
      $kernel->yield( h_servlist => $req, $resp, $network );
      return;
    }

    $kernel->yield( h_server => $req, $resp, $network, $server );
    return;
  }

  if ( $uri->query && $uri->query =~ /^gzip/i ) {
    $resp->content_type('application/x-gzip');
    $resp->content($zipped);
  } else {
    my $json = memGunzip($zipped);
    undef $zipped;
    $resp->content_type('application/json');
    $resp->content($json);
  }

  $resp->respond;

  my $conn = $req->connection;
  my $addr = $conn->remote_host;
  log_to( 'http', 'info', "$addr 200 h_network $network" );

  $resp->done;
}

sub post_request {
  ++$state->{stats}->{HTTP_Responses};
}

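## JSON encoder workers.
## Trawl results are serialized in child processes (POE::Wheel::Run with
## POE::Filter::Reference on stdio) so the main session never blocks on
## encoding; finished output comes back via js_worker_stdout and is cached.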
sub js_push_pending {
  my ($kernel, $heap) = @_[KERNEL, HEAP];
  my $item = $_[ARG0];

  my $current_workers = scalar keys %{ $obj->{workers_by_pid} // {} };

  ## Throttle if MaxEncoders children are already running; retry shortly.
  if ( $cfg->{MaxEncoders} && $cfg->{MaxEncoders} <= $current_workers ) {
    ++$state->{stats}->{ThrottledEncoder};
    $kernel->alarm_add( 'js_push_pending', Time::HiRes::time + 0.5, $item );
    return;
  }

  my ($network) = @$item;
  log_to( 'session', 'debug', "Encoder spawn: $network" );

  my $perlpath = $Config{perlpath};
  if ($^O ne 'VMS') {
    $perlpath .= $Config{_exe}
      unless $perlpath =~ m/$Config{_exe}$/i;
  }

  my $forkable;
  if ($^O eq 'MSWin32') {
    ## No clean fork/exec on Win32; run the worker as a coderef instead.
    $forkable = \&IRC::Indexer::Process::JSONify::worker;
  } else {
    $forkable = [
      $perlpath,
      ( map { "-I$_" } @INC ),
      '-MIRC::Indexer::Process::JSONify',
      '-e',
      'IRC::Indexer::Process::JSONify->worker()'
    ];
  }

  my $wheel = POE::Wheel::Run->new(
    Program     => $forkable,
    ErrorEvent  => 'js_worker_error',
    StdoutEvent => 'js_worker_stdout',
    StderrEvent => 'js_worker_stderr',
    CloseEvent  => 'js_worker_closed',
    StdioFilter => POE::Filter::Reference->new(),
  );

  ++$state->{stats}->{EncoderWheels}->{Created};

  my $wheelid = $wheel->ID;
  my $pidof   = $wheel->PID;

  $kernel->sig_child( $pidof, 'js_worker_sigchld' );

  $obj->{workers_by_pid}->{$pidof}   = $wheel;
  $obj->{workers_by_wid}->{$wheelid} = $wheel;

  $wheel->put($item);
}

sub js_worker_stdout {
  my ($kernel, $item) = @_[KERNEL, ARG0];

  log_to( 'session', 'debug', "Handling input from encoder" );

  my ($zip, $network, $server) = splice @$item, 0, 4;

  unless ($network) {
    log_to( 'session', 'warn', "Invalid input from encoder!" );
    ++$state->{stats}->{EncoderWheels}->{BadResult};
    return;
  }

  ++$state->{stats}->{EncoderWheels}->{Success};

  if ( $cfg->{Networks}->{$network} ) {
    cache_json( $zip, $network, $server );
    my $logstr = "Cached JSON for $network";
    $logstr .= " ($server)" if $server;
    log_to( 'irc', 'debug', $logstr );
  } else {
    log_to( 'irc', 'info', "Discarding JSON for stale network: $network" );
  }
}

sub js_worker_stderr {
  my ($kernel, $err, $id) = @_[KERNEL, ARG0, ARG1];
  log_to( 'session', 'warn', "Encoder reported err: $err" );
  ++$state->{stats}->{EncoderWheels}->{Died};
}

sub js_worker_closed {
  my ($kernel, $wheelid) = @_[KERNEL, ARG0];

  my $wheel = delete $obj->{workers_by_wid}->{$wheelid};

  if (defined $wheel) {
    ++$state->{stats}->{EncoderWheels}->{Closed};
    my $pidof = $wheel->PID;
    delete $obj->{workers_by_pid}->{$pidof};
    $wheel->kill(9);
  }
}

sub js_worker_error {  }

sub js_worker_sigchld {
  my ($kernel, $pidof) = @_[KERNEL, ARG1];

  log_to( 'session', 'debug', "Worker sig_chld: $pidof" );

  ++$state->{stats}->{EncoderWheels}->{SIGCHLD};

  my $wheel;
  return unless $wheel = delete $obj->{workers_by_pid}->{$pidof};

  my $wheelid = $wheel->ID;
  delete $obj->{workers_by_wid}->{$wheelid};
}

sub start_indexer {
  if ($opts->{detach}) {
    print "Starting detached indexer.\n";

    ## Double-fork and detach from the controlling terminal.
    my $fork = fork;
    exit 1 if not defined $fork;
    exit 0 if $fork;
    POSIX::setsid();
    $fork = fork;
    exit 1 if not defined $fork;
    exit 0 if $fork;

    open( STDIN,  '<', '/dev/null' );
    open( STDOUT, '>', '/dev/null' );
    open( STDERR, '>', '/dev/null' );

    umask(022);
  }

  get_cfg();
  poco_init_session();

  POE::Kernel->run;
}

start_indexer();
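
## h_src reads this script back via the DATA handle, which requires an
## __END__ (or __DATA__) marker to exist below.
__END__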