#!/usr/bin/env perl
#
# AMQP connection stress test.
#
# Opens connections to an AMQP broker as fast as possible, keeping up to
# --parallel connection attempts in flight, and prints running counters
# (active / success / failed / timeout / total) every two seconds.  The
# script runs until it is interrupted.
#
# Options:
#   --host, -h    broker host to connect to
#   --user, -u    user name
#   --pass        password
#   --port        broker port
#   --vhost, -v   virtual host
#   --parallel    number of concurrent connection attempts (default 128)
#   --localhost   local source address to connect from; may be repeated,
#                 in which case the addresses are used round-robin
use strict;
use warnings;

use Net::Async::AMQP;
use IO::Async::Loop;
use IO::Async::Loop::Epoll;
use IO::Async::Timer::Periodic;
use Future::Utils qw(fmap0);
use feature qw(say);
use Getopt::Long;

my %args;
# --localhost is the only multi-valued option, so it gets an array
# destination; every other option is a plain scalar.
$args{localhost} = [];
GetOptions(
    "host|h=s"    => \$args{host},
    "user|u=s"    => \$args{user},
    "pass=s"      => \$args{pass},
    "port=i"      => \$args{port},
    "vhost|v=s"   => \$args{vhost},
    "parallel=i"  => \$args{parallel},
    "localhost=s" => $args{localhost},
) or die("Error in command line arguments\n");

my $loop = IO::Async::Loop->new;
$loop->resolver->configure(
    min_workers => 2,
    max_workers => 2,
);

say "start";

# Running counters, reported periodically by the timer below.
my %stats;
$loop->add(
    IO::Async::Timer::Periodic->new(
        interval   => 2,
        reschedule => 'skip',
        on_tick    => sub {
            say join ', ',
                map { sprintf "%d %s", $stats{$_}, $_ } sort keys %stats;
        },
    )->start
);

# Newer Net::AMQP releases provide a dedicated boolean value object; older
# ones get a plain 1.
my $true = (Net::AMQP->VERSION >= 0.06) ? Net::AMQP::Value->true : 1;

# Pending/established connection futures, keyed by stringified client object.
my %mq;

# Pull the script-only options out of %args so they are not forwarded to
# connect().  The source addresses come from --localhost (a list), not from
# --host (a scalar naming the broker).
my @hosts    = @{ delete $args{localhost} };
my $parallel = delete $args{parallel} || 128;

# Forward only the options that were actually given, so that unset ones do
# not reach connect() as explicit undef values.
my %connect_args =
    map { $_ => $args{$_} } grep { defined $args{$_} } keys %args;

(fmap0 {
    ++$stats{active};

    # NOTE(review): heartbeat_interval => 0 presumably disables heartbeats;
    # confirm against the Net::Async::AMQP documentation.
    my $mq = Net::Async::AMQP->new(
        loop               => $loop,
        heartbeat_interval => 0,
    );
    my $k = "$mq";

    # Rotate through the configured source addresses.  With no --localhost
    # given, local_host is omitted entirely so the OS picks the source
    # address (binding to loopback would not work for a remote --host).
    my @local;
    if (@hosts) {
        push @hosts, my $host = shift @hosts;
        @local = (local_host => $host);
    }

    # Race the connection attempt against a 30 second timeout.
    $mq{$k} = Future->wait_any(
        $mq->connect(
            %connect_args,
            @local,
            client_properties => {
                capabilities => {
                    'consumer_cancel_notify' => $true,
                    'connection.blocked'     => $true,
                },
            },
        )->on_fail(sub { ++$stats{failed}; warn "Failure: @_\n" })
         ->on_done(sub { ++$stats{success} }),
        $loop->timeout_future(after => 30)
             ->on_fail(sub { ++$stats{timeout} })
    )->on_ready(sub { --$stats{active}; ++$stats{total} })
     ->on_fail(sub { delete $mq{$k} })
     # Turn a failure into success so that one failed attempt does not stop
     # the whole fmap0 run.
     ->else(sub { Future->wrap })
} concurrent => $parallel,
  generate   => sub { 1 })->get;