package Perinci::Access::Simple::Server::Socket; use 5.010001; use strict; use warnings; use Log::Any '$log'; our $VERSION = '0.15'; # VERSION use Data::Clean::FromJSON; use Data::Clean::JSON; use File::HomeDir; use IO::Handle::Record; # to get peercred() in IO::Socket::UNIX use IO::Select; use IO::Socket qw(:crlf); use IO::Socket::INET; use IO::Socket::UNIX; use JSON; use Perinci::Access; use SHARYANTO::Proc::Daemon::Prefork; use Time::HiRes qw(gettimeofday tv_interval); use URI::Escape; use Moo; my $cleanser = Data::Clean::JSON->get_cleanser; my $cleanserfj = Data::Clean::FromJSON->get_cleanser; has name => ( is => 'rw', default => sub { my $name = $0; $name =~ s!.*/!!; $name; }); has daemonize => (is => 'rw', default=>sub{0}); has pid_path => (is => 'rw'); has scoreboard_path => (is => 'rw'); has error_log_path => (is => 'rw'); has access_log_path => (is => 'rw'); has ports => (is => 'rw', default => sub{[]}); has unix_sockets => (is => 'rw', default => sub{[]}); has timeout => (is => 'rw', default => sub{120}); has require_root => (is => 'rw', default => sub{0}); has max_clients => (is => 'rw', default => sub{150}); has start_servers => (is => 'rw', default => sub{3}); has max_requests_per_child => (is => 'rw', default=>sub{1000}); has _daemon => (is => 'rw'); # SHARYANTO::Proc::Daemon::Prefork has _server_socks => (is => 'rw'); # store server sockets has _pa => ( # Perinci::Access object is => 'rw', default => sub { require Perinci::Access::Perl; require Perinci::Access::Schemeless; Perinci::Access->new( handlers => { pl => Perinci::Access::Perl->new( load => 0, extra_wrapper_convert => { #timeout => 300, }, #use_tx => $self->{use_tx}, #custom_tx_manager => $self->{custom_tx_manager}, ), '' => Perinci::Access::Schemeless->new( load => 0, extra_wrapper_convert => { #timeout => 300, }, #use_tx => $self->{use_tx}, #custom_tx_manager => $self->{custom_tx_manager}, ), } ); }); my $json = JSON->new->allow_nonref; sub BUILD { my ($self) = @_; my $is_root = $> ? 0 : 1; my $log_dir = $is_root ? "/var/log" : File::HomeDir->my_home; my $run_dir = $is_root ? "/var/run" : File::HomeDir->my_home; unless ($self->error_log_path) { $self->error_log_path($log_dir."/".$self->name."-error.log"); } unless (defined $self->access_log_path) { $self->scoreboard_path($log_dir."/".$self->name.".scoreboard"); } unless ($self->access_log_path) { $self->access_log_path($log_dir."/".$self->name."-access.log"); } unless ($self->pid_path) { $self->pid_path($run_dir."/".$self->name.".pid"); } unless ($self->_daemon) { my $daemon = SHARYANTO::Proc::Daemon::Prefork->new( name => $self->name, error_log_path => $self->error_log_path, access_log_path => $self->access_log_path, pid_path => $self->pid_path, scoreboard_path => $self->scoreboard_path || undef, daemonize => $self->daemonize, prefork => $self->start_servers, max_children => $self->max_clients, after_init => sub { $self->_after_init }, main_loop => sub { $self->_main_loop }, require_root => $self->require_root, # currently auto reloading is turned off ); $self->_daemon($daemon); } } sub DESTROY { my $self = shift; my $socks = $self->unix_sockets; if (defined($socks)) { for my $sock (@$socks) { unlink $sock; } } } sub run { my ($self) = @_; $self->_daemon->run; } # alias for run() sub start { my $self = shift; $self->run(@_); } sub stop { my ($self) = @_; $self->_daemon->kill_running; } sub restart { my ($self) = @_; $self->_daemon->kill_running; $self->_daemon->run; } sub is_running { my ($self) = @_; my $pid = $self->_daemon->check_pidfile; $pid ? 1:0; } sub _after_init { my ($self) = @_; my @server_socks; my @server_sock_infos; my $ary; $ary = $self->unix_sockets; if (defined($ary) && ref($ary) ne 'ARRAY') { $ary = [split /\s*,\s*/,$ary] } $self->unix_sockets($ary); for my $path (@$ary) { my %args; $args{Listen} = 1; $args{Timeout} = $self->timeout; $args{Local} = $path; $log->infof("Binding to Unix socket %s ...", $path); my $sock = IO::Socket::UNIX->new(%args); die "Unable to bind to Unix socket $path: $@" unless $sock; push @server_socks, $sock; push @server_sock_infos, "$path (unix)"; } $ary = $self->ports; if (defined($ary) && ref($ary) ne 'ARRAY') { $ary = [split /\s*,\s*/,$ary] } $self->ports($ary); for my $port (@$ary) { my %args; $args{Listen} = 1; $args{Reuse} = 1; $args{Timeout} = $self->timeout; if ($port =~ /^(?:0\.0\.0\.0|\*)?:?(\d+)$/) { $args{LocalPort} = $1; } elsif ($port =~ /^([^:]+):(\d+)$/) { $args{LocalHost} = $1; $args{LocalPort} = $2; } else { die "Invalid port syntax `$port`, please specify ". ":N or 1.2.3.4:N"; } $log->infof("Binding to TCP socket %s ...", $port); my $sock = IO::Socket::INET->new(%args); die "Unable to bind to TCP socket $port" unless $sock; push @server_socks, $sock; push @server_sock_infos, "$port (tcp)"; } die "Please specify at least one port or Unix socket" unless @server_socks; $self->_server_socks(\@server_socks); warn "Will be binding to ".join(", ", @server_sock_infos)."\n"; $self->before_prefork(); } sub before_prefork {} sub _main_loop { my ($self) = @_; if ($self->_daemon->{parent_pid} == $$) { $log->info("Entering main loop"); } else { $log->info("Child process started (PID $$)"); } $self->_daemon->update_scoreboard({child_start_time=>time()}); my $sel = IO::Select->new(@{ $self->_server_socks }); for (my $i=1; $i<=$self->max_requests_per_child; $i++) { $self->_daemon->set_label("listening"); my @ready = $sel->can_read(); SOCK: for my $s (@ready) { my $sock = $s->accept(); # sock can be undef next unless $sock; $self->{_connect_time} = [gettimeofday]; $self->_set_label_serving($sock); my $timeout = ${*$sock}{'io_socket_timeout'}; my $fdset = ""; vec($fdset, $sock->fileno, 1) = 1; my $buf = ""; REQ: while (1) { $self->_daemon->update_scoreboard({ req_start_time => time(), num_reqs => $i, state => "R", }); $self->{_start_req_time} = [gettimeofday]; READ_REQ: while (1) { # loop until we have the whole request in $buf $buf =~ s/\A(?:\015?\012)+//; # ignore leading blank lines #$log->tracef("buf=%s (%d bytes)", $buf, length($buf)); if ($buf =~ /\Aj(.*)\015?\012/) { $self->{_req_json} = $1; $log->tracef("Received JSON from client: %s", $self->{_req_json}); $buf = substr($buf, 1+length($1)); last READ_REQ; } elsif ($buf =~ /\AJ(\d+)(\015?\012)(.+)\015?\012/s && length($3) >= $1) { $self->{_req_json} = substr($3, 0, $1); $log->tracef("Received JSON from client: %s", $self->{_req_json}); $buf = substr($buf, 1+length($1)+length($2)+length($3)); last READ_REQ; } elsif ($buf =~ /\A([^jJ].*)/) { $log->tracef("Received from client: %s", $1); $self->{_finish_req_time} = [gettimeofday]; $self->{_res} = [400, "Invalid request line"]; $buf =~ s/\A.+//; goto FINISH_REQ; } last REQ unless $self->_need_more( $sock, $buf, $timeout, $fdset); } $self->{_finish_req_time} = [gettimeofday]; eval { $self->{_req} = $json->decode($self->{_req_json}); $cleanserfj->clean_in_place($self->{_req}); }; my $e = $@; if ($e) { $self->{_res} = [400, "Invalid JSON ($e)"]; goto FINISH_REQ; } if (ref($self->{_req}) ne 'HASH') { $self->{_res} = [400, "Invalid request (not hash)"]; goto FINISH_REQ; } RES: $self->{_start_res_time} = [gettimeofday]; $self->{_res} = $self->_pa->request( $self->{_req}{action} => $self->{_req}{uri}, $self->{_req}); $self->{_finish_res_time} = [gettimeofday]; FINISH_REQ: $self->_daemon->update_scoreboard({state => "W"}); $cleanser->clean_in_place($self->{_res}); eval { $self->{_res_json} = $json->encode($self->{_res}) }; $e = $@; if ($e) { $self->{_res} = [500, "Can't encode result in JSON: $e"]; $self->{_res_json} = $json->encode($self->{_res}); } $self->_write_sock($sock, "J" . length($self->{_res_json}). "\015\012"); $self->_write_sock($sock, $self->{_res_json}); $self->_write_sock($sock, "\015\012"); $self->access_log($sock); $self->_daemon->update_scoreboard({state => "_"}); } $sock->close; } } } sub _need_more { my $self = shift; my $sock = shift; #my($buf,$timeout,$fdset) = @_; if ($_[1]) { my($timeout, $fdset) = @_[1,2]; #print STDERR "select(,,,$timeout)\n" if $DEBUG; my $n = select($fdset,undef,undef,$timeout); unless ($n) { #$self->reason(defined($n) ? "Timeout" : "select: $!"); return; } } #print STDERR "sysread()\n" if $DEBUG; my $n = sysread($sock, $_[0], 2048, length($_[0])); #$self->reason(defined($n) ? "Client closed" : "sysread: $!") unless $n; $n; } sub _write_sock { my ($self, $sock, $buffer) = @_; $log->tracef("Sending to client: %s", $buffer); # large $buffer might need to be written in several steps, especially in # SSL sockets which might have smaller buffer size (like 16k) my $tot_written = 0; while (1) { my $written = $sock->syswrite( $buffer, length($buffer)-$tot_written, $tot_written); # XXX what to do on error, i.e. $written is undef? $tot_written += $written; last unless $tot_written < length($buffer); } } sub _set_label_serving { my ($self, $sock) = @_; # sock can be undef when client timed out return unless $sock; my $is_unix = $sock->isa('IO::Socket::UNIX'); if ($is_unix) { my $sock_path = $sock->hostpath; $self->{_sock_peer} = $sock_path; my ($pid, $uid, $gid) = $sock->peercred; $log->trace("Unix socket info: path=$sock_path, ". "pid=$pid, uid=$uid, gid=$gid"); $self->_daemon->set_label("serving unix (pid=$pid, uid=$uid, ". "path=$sock_path)"); } else { my $server_port = $sock->sockport; my $remote_ip = $sock->peerhost // "127.0.0.1"; my $remote_port = $sock->peerport; $self->{_sock_peer} = "$remote_ip:$remote_port"; if ($log->is_trace) { $log->trace(join("", "TCP socket info: ", "server_port=$server_port, ", "remote_ip=$remote_ip, ", "remote_port=$remote_port")); } $self->_daemon->set_label("serving TCP :$server_port (". "remote=$remote_ip:$remote_port)"); } } sub __safe { my $string = shift; $string =~ s/([^[:print:]])/"\\x" . unpack("H*", $1)/eg if defined $string; $string; } sub access_log { my ($self, $sock) = @_; return unless $self->access_log_path; my $max_args_len = 1000; my $max_resp_len = 1000; #my $reqh = $req->headers; #if ($log->is_trace) { #$log->tracef("\$self->{sock_peerhost}=%s, (gmtime(\$self->{_finish_req_time}))[0]=%s, \$req->method=%s, \$req->uri->as_string=%s, \$self->{_res_status}=%s, \$self->{res_content_length}=%s, ". # "\$reqh->header('referer')=%s, \$reqh->header('user-agent')=%s", # $self->{_sock_peer}, # (gmtime($self->{_finish_req_time}))[0], # $req->method, # $req->uri->as_string, # $self->{_res_status}, # $self->{_res_content_length}, # scalar($reqh->header("referer")), # scalar($reqh->header("user-agent")), # ); #} my $time = POSIX::strftime("%d/%b/%Y:%H:%M:%S +0000", gmtime($self->{_start_req_time})); $self->{_req} //= {}; my ($args_s, $args_len, $args_partial); # args_partial = bool if ($self->{_req}{args}) { $args_s = $json->encode($self->{_req}{args}); $args_len = length($args_s); $args_partial = $args_len > $max_args_len; $args_s = substr($args_s, 0, $self->max_args_len) if $args_partial; } else { $args_s = ""; $args_len = 0; $args_partial = 0; } my $reqt = sprintf("%.3fms", 1000*tv_interval( $self->{_start_req_time}, $self->{_finish_req_time})); if (!$self->{_res}) { warn "BUG: No response generated"; $self->{_res} = {}; } my ($res_len, $res_partial); # res_partial = undef or partial res $res_len = length($self->{_res_json}); if ($res_len > $max_resp_len) { $res_partial = substr($self->{_res_json}, 0, $max_resp_len); } my $rest; if ($self->{_finish_res_time}) { $rest = sprintf("%.3fms", 1000*tv_interval($self->{_start_res_time}, $self->{_finish_res_time})); } else { $rest = "-"; } my $fmt = join( "", "%s ", # client addr "- ", # XXX auth user "[%s] ", # time "\"%s %s\" ", # riap action + URI "[args %s %s] %s ", # args + reqt "[res %s %s] %s ", # res + rest "%s", # extra info "\n" ); my $log_line = sprintf( $fmt, $self->{_sock_peer} // "?", $time, __safe($self->{_req}{action} // "-"), __safe($self->{_req}{uri} // "-"), $args_len.($args_partial ? "p" : ""), $args_s, $reqt, $res_len.(defined($res_partial) ? "p" : ""), $res_partial // $self->{_res_json}, $rest, $self->{_extra} // "", ); #$log->tracef("Riap access log: %s", $log_line); if ($self->daemonize) { syswrite($self->_daemon->{_access_log}, $log_line); } else { warn $log_line; } } 1; # ABSTRACT: Implement Riap::Simple server over sockets __END__ =pod =encoding UTF-8 =head1 NAME Perinci::Access::Simple::Server::Socket - Implement Riap::Simple server over sockets =head1 VERSION version 0.15 =head1 SYNOPSIS #!/usr/bin/perl use Perinci::Access::Simple::Server::Socket; my $server = Perinci::Access::Simple::Server::Socket->new( ports => ['127.0.0.1:5678'], # default none unix_sockets => ['/var/run/riap-simple.sock'], # default none #start_servers => 0, # default 3, 0=don't prefork #max_clients => 0, # default 3, 0=don't prefork #max_requests_per_child => 100, # default 1000 #daemonize => 1, # default 0 ); $server->run; Or use using the included peri-sockserve script: % peri-sockserve -p 127.0.0.1:5678 -s /path/to/unix/sock Foo::Bar Baz::* =head1 DESCRIPTION This module implements L<Riap::Simple> server over sockets. It features preforking, multiple interface and Unix sockets. This module uses L<Log::Any> for logging. This module uses L<Moo> for object system. =head1 FUNCTIONS None are exported by default, but they are exportable. =head1 ATTRIBUTES =head2 name => STR (default is basename of $0) Name of server, for display in process table ('ps ax'). =head2 daemonize => BOOL (default 0) Whether to daemonize (go into background). =head2 ports => ARRAY OF STR (default []) One or more TCP ports to listen to. Default is none. Each port can be in the form of N, ":N", "0.0.0.0:N" (all means the same thing, to bind to all interfaces) or "1.2.3.4:N" (to bind to a specific network interface). A string is also accepted, it will be split (delimiter ,) beforehand. Since server does not support any encryption, it is recommended to bind to localhost (127.0.0.1). =head2 unix_sockets => ARRAY OF STR (default []) Location of Unix sockets. Default is none, which means not listening to Unix socket. Each element should be an absolute path. A string is also accepted, it will be split (delimiter ,) beforehand. You must at least specify one port or one Unix socket, or server will refuse to run. =head2 timeout => BOOL (default 120) Socket timeout. Will be passed to IO::Socket. =head2 require_root => BOOL (default 0) Whether to require running as root. Passed to L<SHARYANTO::Proc::Daemon::Prefork>'s constructor. =head2 pid_path => STR (default /var/run/<name>.pid or ~/<name>.pid) Location of PID file. =head2 scoreboard_path => STR (default /var/run/<name>.scoreboard or ~/<name>.scoreboard) Location of scoreboard file (used for communication between parent and child processes). If you disable this (by setting scoreboard_path => 0), autoadjusting number of children won't work (number of children will be kept at 'start_servers'). =head2 error_log_path => STR (default /var/log/<name>-error.log or ~/<name>-error.log) Location of error log. Default is /var/log/<name>-error.log. It will be opened in append mode. =head2 access_log_path => STR (default /var/log/<name>-access.log or ~/<name>-access.log) Location of access log. It will be opened in append mode. =head2 start_servers => INT (default 3) Number of children to fork at the start of run. If you set this to 0, the server becomes a nonforking one. Tip: You can set start_servers to 0 and 'daemonize' to false for debugging. =head2 max_clients => INT (default 150) Maximum number of children processes to maintain. If server is busy, number of children will be increased from the original 'start_servers' up until this value. =head2 max_requests_per_child => INT (default 1000) Number of requests each child will serve until it exists. =head1 METHODS =for Pod::Coverage BUILD =head2 $server = Perinci::Access::Simple::Server::Socket->new(%args) Create a new instance of server. %args can be used to set attributes. =head2 $server->run() Run server. =head2 $server->start() Alias for run(). =head2 $server->stop() Stop running server. =head2 $server->restart() Restart server. =head2 $server->is_running() => BOOL Check whether server is running. =head2 $server->before_prefork() This is a hook provided for subclasses to do something before the daemon is preforking. For example, you can preload Perl modules here so that each child doesn't have to load modules separately (= inefficient). =head2 $server->access_log($sock) Write access log entry. =head1 SEE ALSO L<Riap::Simple>, L<Riap>, L<Rinci> L<peri-sockserve>, simple command-line interface for this module. L<Perinci::Access::Simple::Client>, L<Perinci::Access> =head1 HOMEPAGE Please visit the project's homepage at L<https://metacpan.org/release/Perinci-Access-Simple-Server>. =head1 SOURCE Source repository is at L<https://github.com/sharyanto/perl-Perinci-Access-Simple-Server>. =head1 BUGS Please report any bugs or feature requests on the bugtracker website L<https://rt.cpan.org/Public/Dist/Display.html?Name=Perinci-Access-Simple-Server> When submitting a bug or request, please include a test-file or a patch to an existing test-file that illustrates the bug or desired feature. =head1 AUTHOR Steven Haryanto <stevenharyanto@gmail.com> =head1 COPYRIGHT AND LICENSE This software is copyright (c) 2013 by Steven Haryanto. This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself. =cut