# @(#)$Id: IPC.pm 1181 2012-04-17 19:06:07Z pjf $ package CatalystX::Usul::IPC; use strict; use warnings; use version; our $VERSION = qv( sprintf '0.7.%d', q$Rev: 1181 $ =~ /\d+/gmx ); use CatalystX::Usul::Constants; use CatalystX::Usul::Functions qw(arg_list is_arrayref strip_leader throw); use CatalystX::Usul::IPC::Response; use CatalystX::Usul::Table; use CatalystX::Usul::Time; use English qw(-no_match_vars); use File::Spec; use IO::Handle; use IO::Select; use IPC::Open3; use Module::Load::Conditional qw(can_load); use POSIX qw(WIFEXITED WNOHANG); use TryCatch; # requires qw(io); our ($ERROR, $WAITEDPID); my $SPECIAL_CHARS = do { my $x = join NUL, qw(< > | &); qr{ ([$x]) }mx }; sub child_list { my ($self, $pid, $procs) = @_; my @pids = (); unless (defined $procs) { my $ppt = $self->_new_proc_process_table; $procs = { map { $_->pid => $_->ppid } @{ $ppt->table } }; } if (exists $procs->{ $pid }) { for my $child (grep { $procs->{ $_ } == $pid } keys %{ $procs }) { push @pids, $self->child_list( $child, $procs ); # Recurse } push @pids, $pid; } return @pids; } sub popen { my ($self, $cmd, @rest) = @_; my ($e, $pid, @ready); $cmd or throw 'Command not specified'; is_arrayref $cmd and $cmd = join SPC, @{ $cmd }; my $args = arg_list @rest; my $err = IO::Handle->new(); my $out = IO::Handle->new(); my $in = IO::Handle->new(); my $res = CatalystX::Usul::IPC::Response->new(); { local ($CHILD_ERROR, $ERRNO, $WAITEDPID); local $ERROR = FALSE; try { local $SIG{CHLD} = \&__handler; local $SIG{PIPE} = \&__cleaner; $pid = open3( $in, $out, $err, $cmd ); for my $line (@{ $args->{in} || [] }) { print {$in} $line or throw error => 'IO error [_1]', args =>[ $ERRNO ]; } $in->close; } catch ($error) { $e = $error } not $e and $e = $ERROR and $e .= " - whilst executing ${cmd}"; } if ($e) { $err->close; $out->close; throw $e } my $selector = IO::Select->new(); $selector->add( $err, $out ); while (@ready = $selector->can_read) { for my $fh (@ready) { if (fileno $fh == fileno $err) { $e = __read_all_from( $fh ) } else { $res->out( $res->stdout( __read_all_from( $fh ) ) ) } if ($fh->eof) { $selector->remove( $fh ); $fh->close } } } waitpid $pid, 0; $e and throw $e; return $res; } sub process_exists { my ($self, @rest) = @_; my ($io, $file); my $args = arg_list @rest; my $pid = $args->{pid}; if ($file = $args->{file} and $io = $self->io( $file ) and $io->is_file) { $pid = $io->chomp->lock->getline; } (not $pid or $pid !~ m{ \d+ }mx) and return FALSE; return CORE::kill 0, $pid ? TRUE : FALSE; } sub process_table { my ($self, @rest) = @_; my $args = arg_list @rest; my $pat = $args->{pattern}; my $ptype = $args->{type }; my $user = $args->{user }; my $ppt = $self->_new_proc_process_table; my $has = { map { $_ => TRUE } $ppt->fields }; my $table = $self->_new_process_table(); my $count = 0; $table->values( [] ); if ($ptype == 3) { my %procs = map { $_->pid => $_ } @{ $ppt->table }; my @pids = $self->_list_pids_by_file_system( $args->{fsystem} ); for my $p (grep { defined } map { $procs{ $_ } } @pids) { push @{ $table->values }, $self->_set_fields( $has, $p ); $count++; } } else { for my $p (@{ $ppt->table }) { if ( ($ptype == 1 and __proc_belongs_to_user( $p->uid, $user )) or ($ptype == 2 and __cmd_matches_pattern( $p->cmndline, $pat ))){ push @{ $table->values }, $self->_set_fields( $has, $p ); $count++; } } } @{ $table->values } = sort { __pscomp( $a, $b ) } @{ $table->values }; $table->count( $count ); return $table; } sub run_cmd { my ($self, $cmd, @rest) = @_; $cmd or throw 'Command not specified'; if (is_arrayref $cmd) { if (can_load( modules => { 'IPC::Run' => q(0.84) } )) { return $self->_run_cmd_using_ipc_run( $cmd, @rest ); } $cmd = join SPC, @{ $cmd }; } return $self->_run_cmd_using_system( $cmd, @rest ); } sub signal_process { my ($self, $flag, $sig, $pids) = @_; my $opts = []; $sig and push @{ $opts }, q(-o), "sig=${sig}"; $flag and push @{ $opts }, q(-o), q(flag=one); return $self->run_cmd( [ $self->suid, qw(-nc signal_process), @{ $opts }, q(--), @{ $pids || [] } ] ); } sub signal_process_as_root { my ($self, @rest) = @_; my ($file, $io); my $args = arg_list @rest; my $sig = $args->{sig } || q(TERM); my $pids = $args->{pids} || []; $args->{pid} and push @{ $pids }, $args->{pid}; if ($file = $args->{file} and $io = $self->io( $file ) and $io->is_file) { push @{ $pids }, $io->chomp->lock->getlines; $sig eq q(TERM) and unlink $file; } (defined $pids->[0] and $pids->[0] =~ m{ \d+ }mx) or throw 'Process id bad'; for my $mpid (@{ $pids }) { if (exists $args->{flag} and $args->{flag} =~ m{ one }imx) { CORE::kill $sig, $mpid; next; } my @pids = reverse $self->child_list( $mpid ); CORE::kill $sig, $_ for (@pids); $args->{force} or next; sleep 3; @pids = reverse $self->child_list( $mpid ); CORE::kill q(KILL), $_ for (@pids); } return OK; } # Private methods sub _list_pids_by_file_system { my ($self, $fsystem) = @_; $fsystem or return (); my $args = { err => q(null), expected_rv => 1 }; # TODO: Make fuser OS dependent my $data = $self->run_cmd( "fuser ${fsystem}", $args )->out || NUL; $data =~ s{ [^0-9\s] }{}gmx; $data =~ s{ \s+ }{ }gmx; return sort { $a <=> $b } grep { defined && length } split SPC, $data; } sub _new_proc_process_table { my $self = shift; $self->ensure_class_loaded( q(Proc::ProcessTable) ); try { return Proc::ProcessTable->new( cache_ttys => TRUE ) } catch ($e) { throw $e } } sub _new_process_table { my $self = shift; my $table = CatalystX::Usul::Table->new ( caption => 'Process table generated on '.time2str(), flds => [ qw(uid pid ppid start time size state tty cmd) ], labels => { uid => 'User', pid => 'PID', ppid => 'PPID', start => 'Start Time', tty => 'TTY', time => 'Time', size => 'Size', state => 'State', cmd => 'Command' }, typelist => { pid => q(numeric), ppid => q(numeric), start => q(date), size => q(numeric), time => q(numeric) }, wrap => { cmd => 1 }, ); return $table; } sub _run_cmd_filter_out { my ($self, $text) = @_; return join "\n", map { strip_leader $_ } grep { not m{ (?: Started | Finished ) }msx } split m{ [\n] }msx, $text; } sub _run_cmd_ipc_run_args { my ($self, @rest) = @_; my $args = arg_list @rest; $args->{debug } ||= $self->debug; $args->{expected_rv} ||= 0; $args->{err } ||= NUL; $args->{out } ||= NUL; $args->{in } ||= q(stdin); is_arrayref $args->{in} and $args->{in} = join "\n", @{ $args->{in} }; return $args; } sub _run_cmd_system_args { my ($self, @rest) = @_; my $args = arg_list @rest; $args->{debug } ||= $self->debug; $args->{expected_rv} ||= 0; $args->{tempdir } ||= $self->tempdir; # Three different semi-random file names in the temp directory $args->{err_ref } ||= $self->tempfile( $args->{tempdir} ); $args->{out_ref } ||= $self->tempfile( $args->{tempdir} ); $args->{pid_ref } ||= $self->tempfile( $args->{tempdir} ); $args->{err } ||= q(out) if ($args->{async}); $args->{err } ||= $args->{err_ref}->pathname; $args->{out } ||= $args->{out_ref}->pathname; $args->{in } ||= q(stdin); $args->{async} and not $args->{debug} and $args->{out} = q(null); return $args; } sub _run_cmd_using_ipc_run { my ($self, $cmd, @rest) = @_; my ($buf_err, $buf_out, $error, $msg, $rv); my $args = $self->_run_cmd_ipc_run_args( @rest ); my $cmd_ref = __partition_command( $cmd ); my $cmd_str = join SPC, @{ $cmd }; my $null = File::Spec->devnull; my $err = $args->{err}; my $out = $args->{out}; my $in = $args->{in }; my @cmd_args = (); if ($in eq q(null)) { push @cmd_args, q(0<).$null } elsif ($in ne q(stdin)) { push @cmd_args, q(0<), \$in } if ($out eq q(null)) { push @cmd_args, q(1>).$null } elsif ($out ne q(stdout)) { push @cmd_args, q(1>), \$buf_out } if ($err eq q(out)) { push @cmd_args, q(2>&1) } elsif ($err eq q(null)) { push @cmd_args, q(2>).$null } elsif ($err ne q(stderr)) { push @cmd_args, q(2>), \$buf_err } $args->{debug} and $self->log_debug( "Running ${cmd_str}" ); try { $rv = __ipc_run_harness( $cmd_ref, @cmd_args ) } catch ($e) { throw $e } $args->{debug} and $self->log_debug( "Run harness returned ${rv}\n" ); my $res = CatalystX::Usul::IPC::Response->new(); $res->sig( $rv & 127 ); $res->core( $rv & 128 ); $rv = $res->rv( $rv >> 8 ); if ($out ne q(null) and $out ne q(stdout)) { $res->out( $self->_run_cmd_filter_out( $res->stdout( $buf_out ) ) ); } if ($err eq q(out)) { $res->stderr( $res->stdout ); $error = $res->out; chomp $error; } elsif ($err ne q(null) and $err ne q(stderr)) { $res->stderr( $error = $buf_err ); chomp $error; } else { $error = NUL } if ($rv > $args->{expected_rv}) { $error ||= 'Unknown error'; $msg = "Return value ${rv} error ${error}"; $args->{debug} and $self->log_debug( $msg ); throw error => $error, rv => $rv; } return $res; } sub _run_cmd_using_system { my ($self, $cmd, @rest) = @_; my ($error, $msg, $rv); my $args = $self->_run_cmd_system_args( @rest ); my $prog = $self->basename( (split SPC, $cmd)[ 0 ] ); my $null = File::Spec->devnull; my $err = $args->{err}; my $out = $args->{out}; my $in = $args->{in }; $cmd .= $in eq q(stdin) ? NUL : $in eq q(null) ? ' 0<'.$null : ' 0<'.$in; $cmd .= $out eq q(stdout) ? NUL : $out eq q(null) ? ' 1>'.$null : ' 1>'.$out; $cmd .= $err eq q(stderr) ? NUL : $err eq q(null) ? ' 2>'.$null : $err ne q(out) ? ' 2>'.$err : ' 2>&1'; $cmd .= ' & echo $! 1>'.$args->{pid_ref}->pathname if ($args->{async}); $args->{debug} and $self->log_debug( "Running ${cmd}" ); { local ($CHILD_ERROR, $ERRNO, $EVAL_ERROR); local $WAITEDPID = 0; local $ERROR = OK; eval { local $SIG{CHLD} = \&__handler; $rv = system $cmd }; $EVAL_ERROR and throw $EVAL_ERROR; $msg = "System returned ${rv} waitedpid ${WAITEDPID} error ${ERROR}\n"; $args->{debug} and $self->log_debug( $msg ); # On some systems the child handler reaps the child process so the system # call returns -1 and sets $ERRNO to No child processes. This line and # the child handler code fix the problem $rv == -1 and $WAITEDPID > 0 and $rv = $ERROR; if ($rv == -1) { $error = 'Program [_1] failed to start: [_2]'; throw error => $error, args => [ $prog, $ERRNO ], rv => -1; } } my $res = CatalystX::Usul::IPC::Response->new(); $res->sig( $rv & 127 ); $res->core( $rv & 128 ); $rv = $res->rv( $rv >> 8 ); if ($args->{async}) { if ($rv != 0) { $error = 'Program [_1] failed to start'; throw error => $error, args => [ $prog ], rv => $rv; } my $pid = $args->{pid_ref}->chomp->getline || 'unknown pid'; $res->out( "Started ${prog}(${pid}) in the background" ); $res->pid( $pid ); return $res; } if ($out ne q(stdout) and $out ne q(null) and -f $out) { my $text = $self->io( $out )->slurp; $res->out( $self->_run_cmd_filter_out( $res->stdout( $text ) ) ); } if ($err eq q(out)) { $res->stderr( $res->stdout ); $error = $res->out; chomp $error; } elsif ($err ne q(stderr) and $err ne q(null) and -f $err) { $res->stderr( $error = $self->io( $err )->slurp ); chomp $error; } else { $error = NUL } if ($rv > $args->{expected_rv}) { $error ||= 'Unknown error'; $msg = "Return value ${rv} error ${error}"; $args->{debug} and $self->log_debug( $msg ); throw error => $error, rv => $rv; } return $res; } sub _set_fields { my ($self, $has, $p) = @_; my $flds = {}; $flds->{id } = $has->{pid } ? $p->pid : NUL; $flds->{pid } = $has->{pid } ? $p->pid : NUL; $flds->{ppid } = $has->{ppid } ? $p->ppid : NUL; $flds->{start} = $has->{start } ? time2str( '%d %b %Y %H:%M', $p->start ) : NUL; $flds->{state} = $has->{state } ? $p->state : NUL; $flds->{tty } = $has->{ttydev} ? $p->ttydev : NUL; $flds->{time } = $has->{time } ? int $p->time / 1_000_000 : NUL; $flds->{uid } = $has->{uid } ? getpwuid $p->uid : NUL; if ($has->{ttydev} and $p->ttydev) { $flds->{tty} = $p->ttydev; } elsif ($has->{ttynum} and $p->ttynum) { $flds->{tty} = $p->ttynum; } else { $flds->{tty} = NUL } if ($has->{rss} and $p->rss) { $flds->{size} = int $p->rss/1_024; } elsif ($has->{size} and $p->size) { $flds->{size} = int $p->size/1_024; } else { $flds->{size} = NUL } if ($has->{exec} and $p->exec) { $flds->{cmd} = substr $p->exec, 0, 64; } elsif ($has->{cmndline} and $p->cmndline) { $flds->{cmd} = substr $p->cmndline, 0, 64; } elsif ($has->{fname} and $p->fname) { $flds->{cmd} = substr $p->fname, 0, 64; } else { $flds->{cmd} = NUL } return $flds; } # Private subroutines sub __cleaner { $ERROR = q(SIGPIPE); $WAITEDPID = wait; $SIG{PIPE} = \&__cleaner; return; } sub __cmd_matches_pattern { my ($cmd, $pattern) = @_; return !$pattern || $cmd =~ m{ $pattern }msx ? TRUE : FALSE; } sub __handler { local $ERRNO; # so that waitpid does not step on existing value while ((my $child_pid = waitpid -1, WNOHANG) > 0) { if (WIFEXITED( $CHILD_ERROR ) and $child_pid > $WAITEDPID) { $WAITEDPID = $child_pid; $ERROR = $CHILD_ERROR; } } $SIG{CHLD} = \&__handler; # in case of unreliable signals return; } sub __ipc_run_harness { my $h = IPC::Run::harness( @_ ); $h->run; return $h->full_result; } sub __partition_command { my $cmd = shift; my $aref = []; my @command = (); for my $item (grep { length && defined } @{ $cmd }) { if ($item =~ $SPECIAL_CHARS) { push @command, $aref, $item; $aref = [] } else { push @{ $aref }, $item } } if ($aref->[ 0 ]) { if ($command[ 0 ]) { push @command, $aref } else { @command = @{ $aref } } } return \@command; } sub __proc_belongs_to_user { my ($puid, $user) = @_; return !$user || $user eq q(All) || $user eq getpwuid $puid ? TRUE : FALSE; } sub __pscomp { my ($arg1, $arg2) = @_; my $result; $result = $arg1->{uid} cmp $arg2->{uid}; $result = $arg1->{pid} <=> $arg2->{pid} if ($result == 0); return $result; } sub __read_all_from { my $fh = shift; local $RS = undef; return <$fh>; } 1; __END__ =pod =head1 Name CatalystX::Usul::IPC - List/Create/Delete processes =head1 Version 0.7.$Revision: 1181 $ =head1 Synopsis use parent qw(CatalystX::Usul::IPC); =head1 Description Displays the process table and allows signals to be sent to selected processes =head1 Subroutines/Methods =head2 child_list @pids = $self->child_list( $pid ); Called with a process id for an argument this method returns a list of child process ids =head2 popen $response = $self->popen( $cmd, @input ); Uses L<IPC::Open3> to fork a command and pipe the lines of input into it. Returns a C<CatalystX::Usul::IPC::Response> object. The response object's C<out> method returns the B<STDOUT> from the command. Throws in the event of an error =head2 process_exists $bool = $self->process_exists( file => $path, pid => $pid ); Tests for the existence of the specified process. Either specify a path to a file containing the process id or specify the id directly =head2 process_table Generates the process table data used by the L<HTML::FormWidget> table subclass. Called by L<CatalystX::Usul::Model::Process/proc_table> =head2 run_cmd $response = $self->run_cmd( $cmd, $args ); Runs the given command by calling C<system>. The keys of the C<$args> hash are: =over 3 =item async If I<async> is true then the command is run in the background =item debug Debug status. Defaults to C<< $self->debug >> =item err Passing I<< err => q(out) >> mixes the normal and error output together =item in Input to the command =item log Logging object. Defaults to C<< $self->log >> =item out Destination for standard output =item tempdir Directory used to store the lock file and lock table if the C<fcntl> backend is used. Defaults to C<< $self->tempdir >> =back Returns a L<CatalystX::Usul::IPC::Response> object or throws an error. The response object has the following methods: =over 3 =item B<core> Returns true if the command generated a core dump =item B<err> Contains a cleaned up version of the command's B<STDERR> =item B<out> Contains a cleaned up version of the command's B<STDOUT> =item B<pid> The id of the background process. Only set if command is running I<async> =item B<rv> The return value of the command =item B<sig> If the command died as the result of receiving a signal return the signal number =item B<stderr> Contains the command's B<STDERR> =item B<stdout> Contains the command's B<STDOUT> =back =head2 signal_process Send a signal the the selected processes. Invokes the I<suid> root wrapper =head2 signal_process_as_root $self->signal_process( [{] param => value, ... [}] ); This is called by processes running as root to send signals to selected processes. The passed parameters can be either a list of key value pairs or a hash ref. Either a single B<pid>, or an array ref B<pids>, or B<file> must be passwd. The B<file> parameter should be a path to a file containing pids one per line. The B<sig> defaults to I<TERM>. If the B<flag> parameter is set to I<one> then the given signal will be sent once to each selected process. Otherwise each process and all of it's children will be sent the signal. If the B<force> parameter is set to true the after a grace period each process and it's children are sent signal I<KILL> =head2 __cleaner This interrupt handler traps the pipe signal =head2 __handler This interrupt handler traps the child signal =head1 Diagnostics None =head1 Configuration and Environment None =head1 Dependencies =over 3 =item L<CatalystX::Usul> =item L<CatalystX::Usul::Constants> =item L<CatalystX::Usul::IPC::Response> =item L<CatalystX::Usul::Table> =item L<IPC::Open3> =item L<IPC::SysV> =item L<Module::Load::Conditional> =item L<POSIX> =item L<Proc::ProcessTable> =back =head1 Incompatibilities There are no known incompatibilities in this module =head1 Bugs and Limitations There are no known bugs in this module. Please report problems to the address below. Patches are welcome =head1 Author Peter Flanigan, C<< <Support at RoxSoft.co.uk> >> =head1 License and Copyright Copyright (c) 2011 Peter Flanigan. All rights reserved This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See L<perlartistic> This program is distributed in the hope that it will be useful, but WITHOUT WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE =cut # Local Variables: # mode: perl # tab-width: 3 # End: