—use
Mojo::Base -base;
has
queue
=>
sub
{ Mojo::IOLoop::ReadWriteProcess::Pool->new() };
has
pool
=>
sub
{ Mojo::IOLoop::ReadWriteProcess::Pool->new() };
has
done
=>
sub
{ Mojo::IOLoop::ReadWriteProcess::Pool->new() };
has
session
=>
sub
{ Mojo::IOLoop::ReadWriteProcess::Session->singleton };
sub
_dequeue {
my
(
$self
,
$process
) =
@_
;
$self
->pool(
$self
->pool->
grep
(
sub
{
$process
ne
$_
}));
shift
@{
$self
->queue}
if
(
$self
->queue->first &&
$self
->pool->add(
$self
->queue->first));
}
sub
exhausted {
$_
[0]->pool->size == 0 &&
shift
->queue->size == 0 }
sub
consume {
my
$self
=
shift
;
$self
->session->enable;
$self
->done->maximum_processes(
$self
->queue->maximum_processes +
$self
->pool->maximum_processes);
until
(
$self
->exhausted) {
sleep
.5;
$self
->session->consume_collected_info;
$self
->session->_protect(
sub
{
$self
->pool->
each
(
sub
{
my
$p
=
shift
;
return
unless
$p
;
return
if
exists
$p
->{started};
$p
->{started}++;
$p
->once(
stop
=>
sub
{
$self
->done->add(
$p
);
$self
->_dequeue(
$p
) });
$p
->start;
});
});
}
}
sub
add {
my
$self
=
shift
;
$self
->pool->add(
@_
) //
$self
->queue->add(
@_
);
}
sub
AUTOLOAD {
our
$AUTOLOAD
;
my
$fn
=
$AUTOLOAD
;
$fn
=~ s/.*:://;
return
if
$fn
eq
"DESTROY"
;
my
$self
=
shift
;
return
(
eval
{
$self
->pool->Mojo::IOLoop::ReadWriteProcess::Pool::_cmd(
@_
,
$fn
) },
(
grep
(/once|on|emit/,
$fn
))
?
eval
{
$self
->queue->Mojo::IOLoop::ReadWriteProcess::Pool::_cmd(
@_
,
$fn
) }
: ());
}
1;
=encoding utf-8
=head1 NAME
Mojo::IOLoop::ReadWriteProcess::Queue - Queue for Mojo::IOLoop::ReadWriteProcess objects.
=head1 SYNOPSIS
use Mojo::IOLoop::ReadWriteProcess qw(queue process);
my $n_proc = 20;
my $fired;
my $q = queue;
$q->pool->maximum_processes(2); # Max 2 processes in parallel
$q->queue->maximum_processes(10); # Max queue is 10
$q->add( process sub { return 42 } ) for 1..7;
# Subscribe to all "stop" events in the pool
$q->once(stop => sub { $fired++; });
# Consume the queue
$q->consume();
my $all = $q->done; # All processes, Mojo::Collection of Mojo::IOLoop::ReadWriteProcess
# Set your own running pool
$q->pool(parallel sub { return 42 } => 5);
# Set your own queue
$q->queue(parallel sub { return 42 } => 20);
$q->consume();
=head1 METHODS
L<Mojo::IOLoop::ReadWriteProcess::Queue> inherits all methods from L<Mojo::Base> and implements
the following new ones.
Note: It proxies all the other methods of L<Mojo::IOLoop::ReadWriteProcess> for the whole process group.
=head2 add
use Mojo::IOLoop::ReadWriteProcess qw(queue process);
my $q = queue();
$q->add(sub { print "Hello 2! " });
$q->add(process sub { print "Hello 2! " });
Add the process to the queue.
=head2 consume
use Mojo::IOLoop::ReadWriteProcess qw(queue);
my $q = queue();
$q->add(sub { print "Hello 2! " });
$q->add(process sub { print "Hello 2! " });
$q->consume; # executes and exhaust the processes
Starts the processes and empties the queue.
Note: maximum_processes can be set both to the pool (number of process to be run in parallel),
and for the queue (that gets exhausted during the C<consume()> phase).
$q->pool->maximum_processes(2); # Max 2 processes in parallel
$q->queue->maximum_processes(10); # Max queue is 10
=head2 exhausted
use Mojo::IOLoop::ReadWriteProcess qw(queue);
my $q = queue();
$q->add(sub { print "Hello 2! " });
$q->add(process sub { print "Hello 2! " });
$q->consume; # executes and exhaust the processes
$q->exhausted; # 1
Returns 1 if the queue is exhausted.
=head1 ENVIRONMENT
You can set the MOJO_PROCESS_MAXIMUM_PROCESSES environment variable to specify the
the maximum number of processes allowed in the pool and the queue, that are
L<Mojo::IOLoop::ReadWriteProcess::Pool> instances.
MOJO_PROCESS_MAXIMUM_PROCESSES=10000
=head1 LICENSE
Copyright (C) Ettore Di Giacinto.
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
=head1 AUTHOR
Ettore Di Giacinto E<lt>edigiacinto@suse.comE<gt>
=cut