our
$DATE
=
'2021-08-01'
;
our
$VERSION
=
'0.290'
;
use
5.010001;
my
$cleanser
= Data::Clean::JSON->get_cleanser;
my
$cleanserfj
= Data::Clean::FromJSON->get_cleanser;
has
name
=> (
is
=>
'rw'
,
default
=>
sub
{
my
$name
= $0;
$name
=~ s!.*/!!;
$name
;
});
has
daemonize
=> (
is
=>
'rw'
,
default
=>
sub
{0});
has
pid_path
=> (
is
=>
'rw'
);
has
scoreboard_path
=> (
is
=>
'rw'
);
has
error_log_path
=> (
is
=>
'rw'
);
has
access_log_path
=> (
is
=>
'rw'
);
has
ports
=> (
is
=>
'rw'
,
default
=>
sub
{[]});
has
unix_sockets
=> (
is
=>
'rw'
,
default
=>
sub
{[]});
has
timeout
=> (
is
=>
'rw'
,
default
=>
sub
{120});
has
require_root
=> (
is
=>
'rw'
,
default
=>
sub
{0});
has
max_clients
=> (
is
=>
'rw'
,
default
=>
sub
{150});
has
start_servers
=> (
is
=>
'rw'
,
default
=>
sub
{3});
has
max_requests_per_child
=> (
is
=>
'rw'
,
default
=>
sub
{1000});
has
_daemon
=> (
is
=>
'rw'
);
has
_server_socks
=> (
is
=>
'rw'
);
has
riap_client
=> (
is
=>
'rw'
,
default
=>
sub
{
Perinci::Access->new(
handlers
=> {
pl
=> Perinci::Access::Perl->new(
load
=> 0,
set_function_properties
=> {
},
),
''
=> Perinci::Access::Schemeless->new(
load
=> 0,
set_function_properties
=> {
},
),
}
);
});
my
$json
= JSON::MaybeXS->new->allow_nonref;
sub
BUILD {
my
(
$self
) =
@_
;
my
$is_root
= $> ? 0 : 1;
my
$log_dir
=
$is_root
?
"/var/log"
: File::HomeDir->my_home;
my
$run_dir
=
$is_root
?
"/var/run"
: File::HomeDir->my_home;
unless
(
$self
->error_log_path) {
$self
->error_log_path(
$log_dir
.
"/"
.
$self
->name.
"-error.log"
);
}
unless
(
defined
$self
->access_log_path) {
$self
->scoreboard_path(
$log_dir
.
"/"
.
$self
->name.
".scoreboard"
);
}
unless
(
$self
->access_log_path) {
$self
->access_log_path(
$log_dir
.
"/"
.
$self
->name.
"-access.log"
);
}
unless
(
$self
->pid_path) {
$self
->pid_path(
$run_dir
.
"/"
.
$self
->name.
".pid"
);
}
unless
(
$self
->_daemon) {
my
$daemon
= Proc::Daemon::Prefork->new(
name
=>
$self
->name,
error_log_path
=>
$self
->error_log_path,
access_log_path
=>
$self
->access_log_path,
pid_path
=>
$self
->pid_path,
scoreboard_path
=>
$self
->scoreboard_path ||
undef
,
daemonize
=>
$self
->daemonize,
prefork
=>
$self
->start_servers,
max_children
=>
$self
->max_clients,
after_init
=>
sub
{
$self
->_after_init },
main_loop
=>
sub
{
$self
->_main_loop },
require_root
=>
$self
->require_root,
);
$self
->_daemon(
$daemon
);
}
}
sub
DESTROY {
my
$self
=
shift
;
my
$socks
=
$self
->unix_sockets;
if
(
defined
(
$socks
)) {
for
my
$sock
(
@$socks
) {
unlink
$sock
;
}
}
}
sub
run {
my
(
$self
) =
@_
;
$self
->_daemon->run;
}
sub
start {
my
$self
=
shift
;
$self
->run(
@_
);
}
sub
stop {
my
(
$self
) =
@_
;
$self
->_daemon->kill_running;
}
sub
restart {
my
(
$self
) =
@_
;
$self
->_daemon->kill_running;
$self
->_daemon->run;
}
sub
is_running {
my
(
$self
) =
@_
;
my
$pid
=
$self
->_daemon->check_pidfile;
$pid
? 1:0;
}
sub
_after_init {
my
(
$self
) =
@_
;
my
@server_socks
;
my
@server_sock_infos
;
my
$ary
;
$ary
=
$self
->unix_sockets;
if
(
defined
(
$ary
) &&
ref
(
$ary
) ne
'ARRAY'
) {
$ary
= [
split
/\s*,\s*/,
$ary
] }
$self
->unix_sockets(
$ary
);
for
my
$path
(
@$ary
) {
my
%args
;
$args
{Listen} = 1;
$args
{Timeout} =
$self
->timeout;
$args
{Local} =
$path
;
log_info(
"Binding to Unix socket %s ..."
,
$path
);
my
$sock
= IO::Socket::UNIX->new(
%args
);
die
"Unable to bind to Unix socket $path: $@"
unless
$sock
;
push
@server_socks
,
$sock
;
push
@server_sock_infos
,
"$path (unix)"
;
}
$ary
=
$self
->ports;
if
(
defined
(
$ary
) &&
ref
(
$ary
) ne
'ARRAY'
) {
$ary
= [
split
/\s*,\s*/,
$ary
] }
$self
->ports(
$ary
);
for
my
$port
(
@$ary
) {
my
%args
;
$args
{Listen} = 1;
$args
{Reuse} = 1;
$args
{Timeout} =
$self
->timeout;
if
(
$port
=~ /^(?:0\.0\.0\.0|\*)?:?(\d+)$/) {
$args
{LocalPort} = $1;
}
elsif
(
$port
=~ /^([^:]+):(\d+)$/) {
$args
{LocalHost} = $1;
$args
{LocalPort} = $2;
}
else
{
die
"Invalid port syntax `$port`, please specify "
.
":N or 1.2.3.4:N"
;
}
log_info(
"Binding to TCP socket %s ..."
,
$port
);
my
$sock
= IO::Socket::INET->new(
%args
);
die
"Unable to bind to TCP socket $port"
unless
$sock
;
push
@server_socks
,
$sock
;
push
@server_sock_infos
,
"$port (tcp)"
;
}
die
"Please specify at least one port or Unix socket"
unless
@server_socks
;
$self
->_server_socks(\
@server_socks
);
warn
"Will be binding to "
.
join
(
", "
,
@server_sock_infos
).
"\n"
;
$self
->before_prefork();
}
sub
before_prefork {}
sub
_main_loop {
my
(
$self
) =
@_
;
if
(
$self
->_daemon->{parent_pid} == $$) {
log_info(
"Entering main loop"
);
}
else
{
log_info(
"Child process started (PID $$)"
);
}
$self
->_daemon->update_scoreboard({
child_start_time
=>
time
()});
my
$sel
= IO::Select->new(@{
$self
->_server_socks });
CONN:
for
(
my
$i
=1;
$i
<=
$self
->max_requests_per_child;
$i
++) {
$self
->_daemon->set_label(
"listening"
);
my
@ready
=
$sel
->can_read();
SOCK:
for
my
$s
(
@ready
) {
my
$sock
=
$s
->
accept
();
next
unless
$sock
;
$self
->{_connect_time} =
time
();
$self
->_set_label_serving(
$sock
);
my
$timeout
= ${
*$sock
}{
'io_socket_timeout'
};
my
$fdset
=
""
;
vec
(
$fdset
,
$sock
->
fileno
, 1) = 1;
my
$last_child
= 0;
REQ:
while
(1) {
$self
->_daemon->update_scoreboard({
req_start_time
=>
time
(),
num_reqs
=>
$i
,
state
=>
"R"
,
});
$self
->{_start_req_time} =
time
();
my
$buf
=
$self
->_sysreadline(
$sock
,
$timeout
,
$fdset
);
last
CONN
unless
defined
$buf
;
$self
->{_finish_req_time} =
time
();
log_trace(
"Received line from client: %s"
,
$buf
);
if
(
$buf
=~ /\Aj(.*)\015?\012/) {
$self
->{_req_json} = $1;
}
else
{
$self
->{_res} = [400,
"Invalid request line"
];
$last_child
++;
goto
FINISH_REQ;
}
eval
{
$self
->{_req} =
$json
->decode(
$self
->{_req_json});
$cleanserfj
->clone_and_clean(
$self
->{_req});
decode_args_in_riap_req(
$self
->{_req});
};
my
$e
= $@;
if
(
$e
) {
$self
->{_res} = [400,
"Invalid JSON ($e)"
];
goto
FINISH_REQ;
}
if
(
ref
(
$self
->{_req}) ne
'HASH'
) {
$self
->{_res} = [400,
"Invalid request (not hash)"
];
goto
FINISH_REQ;
}
RES:
$self
->{_start_res_time} =
time
();
$self
->{_res} =
$self
->riap_client->request(
$self
->{_req}{action} =>
$self
->{_req}{uri},
$self
->{_req});
$self
->{_finish_res_time} =
time
();
FINISH_REQ:
$self
->_daemon->update_scoreboard({
state
=>
"W"
});
insert_riap_stuffs_to_res(
$self
->{_res},
$self
->{_req}{v});
$self
->{_res} =
$cleanser
->clone_and_clean(
$self
->{_res});
eval
{
$self
->{_res_json} =
$json
->encode(
$self
->{_res}) };
$e
= $@;
if
(
$e
) {
$self
->{_res} = [500,
"Can't encode result in JSON: $e"
];
$self
->{_res_json} =
$json
->encode(
$self
->{_res});
}
$self
->_write_sock(
$sock
,
"j"
.
$self
->{_res_json}.
"\015\012"
);
$self
->access_log(
$sock
);
$self
->_daemon->update_scoreboard({
state
=>
"_"
});
last
CONN
if
$last_child
;
}
$sock
->
close
;
}
}
}
sub
_sysreadline {
my
(
$self
,
$sock
,
$timeout
,
$fdset
) =
@_
;
if
(
$timeout
) {
my
$n
=
select
(
$fdset
,
undef
,
undef
,
$timeout
);
unless
(
$n
) {
return
undef
;
}
}
my
$buf
=
""
;
while
(1) {
my
$n
=
sysread
(
$sock
,
$buf
, 2048,
length
(
$buf
));
return
$buf
if
$buf
=~ /\012/ || !
$n
;
}
}
sub
_write_sock {
my
(
$self
,
$sock
,
$buffer
) =
@_
;
log_trace(
"Sending to client: %s"
,
$buffer
);
my
$tot_written
= 0;
while
(1) {
my
$written
=
$sock
->
syswrite
(
$buffer
,
length
(
$buffer
)-
$tot_written
,
$tot_written
);
$tot_written
+=
$written
;
last
unless
$tot_written
<
length
(
$buffer
);
}
}
sub
_set_label_serving {
my
(
$self
,
$sock
) =
@_
;
return
unless
$sock
;
my
$is_unix
=
$sock
->isa(
'IO::Socket::UNIX'
);
if
(
$is_unix
) {
my
$sock_path
=
$sock
->hostpath;
$self
->{_sock_peer} =
$sock_path
;
my
(
$pid
,
$uid
,
$gid
) =
$sock
->peercred;
log_trace(
"Unix socket info: path=$sock_path, "
.
"pid=$pid, uid=$uid, gid=$gid"
);
$self
->_daemon->set_label(
"serving unix (pid=$pid, uid=$uid, "
.
"path=$sock_path)"
);
}
else
{
my
$server_port
=
$sock
->sockport;
my
$remote_ip
=
$sock
->peerhost //
"127.0.0.1"
;
my
$remote_port
=
$sock
->peerport;
$self
->{_sock_peer} =
"$remote_ip:$remote_port"
;
if
(log_is_trace) {
log_trace(
join
(
""
,
"TCP socket info: "
,
"server_port=$server_port, "
,
"remote_ip=$remote_ip, "
,
"remote_port=$remote_port"
));
}
$self
->_daemon->set_label(
"serving TCP :$server_port ("
.
"remote=$remote_ip:$remote_port)"
);
}
}
sub
__safe {
my
$string
=
shift
;
$string
=~ s/([^[:
print
:]])/
"\\x"
.
unpack
(
"H*"
, $1)/eg
if
defined
$string
;
$string
;
}
sub
access_log {
my
(
$self
,
$sock
) =
@_
;
return
unless
$self
->access_log_path;
my
$max_args_len
= 1000;
my
$max_resp_len
= 1000;
my
$time
= POSIX::strftime(
"%d/%b/%Y:%H:%M:%S +0000"
,
gmtime
(
$self
->{_start_req_time}));
$self
->{_req} //= {};
my
(
$args_s
,
$args_len
,
$args_partial
);
if
(
$self
->{_req}{args}) {
$args_s
=
$json
->encode(
$self
->{_req}{args});
$args_len
=
length
(
$args_s
);
$args_partial
=
$args_len
>
$max_args_len
;
$args_s
=
substr
(
$args_s
, 0,
$self
->max_args_len)
if
$args_partial
;
}
else
{
$args_s
=
""
;
$args_len
= 0;
$args_partial
= 0;
}
my
$reqt
=
sprintf
(
"%.3fms"
,
1000*(
$self
->{_finish_req_time}-
$self
->{_start_req_time}));
if
(!
$self
->{_res}) {
warn
"BUG: No response generated"
;
$self
->{_res} = {};
}
my
(
$res_len
,
$res_partial
);
$res_len
=
length
(
$self
->{_res_json});
if
(
$res_len
>
$max_resp_len
) {
$res_partial
=
substr
(
$self
->{_res_json}, 0,
$max_resp_len
);
}
my
$rest
;
if
(
$self
->{_finish_res_time}) {
$rest
=
sprintf
(
"%.3fms"
,
1000*(
$self
->{_finish_res_time}-
$self
->{_start_res_time}));
}
else
{
$rest
=
"-"
;
}
my
$fmt
=
join
(
""
,
"%s "
,
"- "
,
"[%s] "
,
"\"%s %s\" "
,
"[args %s %s] %s "
,
"[res %s %s] %s "
,
"%s"
,
"\n"
);
my
$log_line
=
sprintf
(
$fmt
,
$self
->{_sock_peer} //
"?"
,
$time
,
__safe(
$self
->{_req}{action} //
"-"
),
__safe(
$self
->{_req}{uri} //
"-"
),
$args_len
.(
$args_partial
?
"p"
:
""
),
$args_s
,
$reqt
,
$res_len
.(
defined
(
$res_partial
) ?
"p"
:
""
),
$res_partial
//
$self
->{_res_json},
$rest
,
$self
->{_extra} //
""
,
);
if
(
$self
->daemonize) {
syswrite
(
$self
->_daemon->{_access_log},
$log_line
);
}
else
{
warn
$log_line
;
}
}
1;
Hide Show 192 lines of Pod