package Mojo::IOLoop::ReadWriteProcess::Queue; use Mojo::Base -base; use Mojo::IOLoop::ReadWriteProcess::Pool; use Mojo::IOLoop::ReadWriteProcess; use Mojo::IOLoop::ReadWriteProcess::Session; use constant DEBUG => $ENV{MOJO_PROCESS_DEBUG}; has queue => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() }; has pool => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() }; has done => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() }; has session => sub { Mojo::IOLoop::ReadWriteProcess::Session->singleton }; sub _dequeue { my ($self, $process) = @_; $self->pool($self->pool->grep(sub { $process ne $_ })); shift @{$self->queue} if ($self->queue->first && $self->pool->add($self->queue->first)); } sub exhausted { $_[0]->pool->size == 0 && shift->queue->size == 0 } sub consume { my $self = shift; $self->session->enable; $self->done->maximum_processes( $self->queue->maximum_processes + $self->pool->maximum_processes); until ($self->exhausted) { sleep .5; $self->session->consume_collected_info; $self->session->_protect( sub { $self->pool->each( sub { my $p = shift; return unless $p; return if exists $p->{started}; $p->{started}++; $p->once(stop => sub { $self->done->add($p); $self->_dequeue($p) }); $p->start; }); }); } } sub add { my $self = shift; $self->pool->add(@_) // $self->queue->add(@_); } sub AUTOLOAD { our $AUTOLOAD; my $fn = $AUTOLOAD; $fn =~ s/.*:://; return if $fn eq "DESTROY"; my $self = shift; return ( eval { $self->pool->Mojo::IOLoop::ReadWriteProcess::Pool::_cmd(@_, $fn) }, (grep(/once|on|emit/, $fn)) ? eval { $self->queue->Mojo::IOLoop::ReadWriteProcess::Pool::_cmd(@_, $fn) } : ()); } 1; =encoding utf-8 =head1 NAME Mojo::IOLoop::ReadWriteProcess::Queue - Queue for Mojo::IOLoop::ReadWriteProcess objects. =head1 SYNOPSIS use Mojo::IOLoop::ReadWriteProcess qw(queue process); my $n_proc = 20; my $fired; my $q = queue; $q->pool->maximum_processes(2); # Max 2 processes in parallel $q->queue->maximum_processes(10); # Max queue is 10 $q->add( process sub { return 42 } ) for 1..7; # Subscribe to all "stop" events in the pool $q->once(stop => sub { $fired++; }); # Consume the queue $q->consume(); my $all = $q->done; # All processes, Mojo::Collection of Mojo::IOLoop::ReadWriteProcess # Set your own running pool $q->pool(parallel sub { return 42 } => 5); # Set your own queue $q->queue(parallel sub { return 42 } => 20); $q->consume(); =head1 METHODS L<Mojo::IOLoop::ReadWriteProcess::Queue> inherits all methods from L<Mojo::Base> and implements the following new ones. Note: It proxies all the other methods of L<Mojo::IOLoop::ReadWriteProcess> for the whole process group. =head2 add use Mojo::IOLoop::ReadWriteProcess qw(queue process); my $q = queue(); $q->add(sub { print "Hello 2! " }); $q->add(process sub { print "Hello 2! " }); Add the process to the queue. =head2 consume use Mojo::IOLoop::ReadWriteProcess qw(queue); my $q = queue(); $q->add(sub { print "Hello 2! " }); $q->add(process sub { print "Hello 2! " }); $q->consume; # executes and exhaust the processes Starts the processes and empties the queue. Note: maximum_processes can be set both to the pool (number of process to be run in parallel), and for the queue (that gets exhausted during the C<consume()> phase). $q->pool->maximum_processes(2); # Max 2 processes in parallel $q->queue->maximum_processes(10); # Max queue is 10 =head2 exhausted use Mojo::IOLoop::ReadWriteProcess qw(queue); my $q = queue(); $q->add(sub { print "Hello 2! " }); $q->add(process sub { print "Hello 2! " }); $q->consume; # executes and exhaust the processes $q->exhausted; # 1 Returns 1 if the queue is exhausted. =head1 ENVIRONMENT You can set the MOJO_PROCESS_MAXIMUM_PROCESSES environment variable to specify the the maximum number of processes allowed in the pool and the queue, that are L<Mojo::IOLoop::ReadWriteProcess::Pool> instances. MOJO_PROCESS_MAXIMUM_PROCESSES=10000 =head1 LICENSE Copyright (C) Ettore Di Giacinto. This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself. =head1 AUTHOR Ettore Di Giacinto E<lt>edigiacinto@suse.comE<gt> =cut