use 5.006;
use strict;
use warnings FATAL => 'all';
use Coro;
=head1 NAME
Net::SMTP::Bulk - NonBlocking batch SMTP using Net::SMTP interface
=head1 VERSION
Version 0.10
our $VERSION = '0.10';
This is a rewrite of Net::SMTP using AnyEvent and Coro as a backbone. It supports AUTH and SSL (no STARTTLS support yet). This module can be used as a drop in replacement for Net::SMTP. At this point this module is EXPIREMENTAL, so use at your own risk. Functionality can change at any time.
use Net::SMTP::Bulk;
my $smtp = Net::SMTP::Bulk->new($server, %options);
See Net::SMTP for syntax.
=head2 new($server,%options)
=head2 new(%options)
Host - Hostname or IP address
Port - The port to which to connect to on the server (default: 25)
Hello - The domain name you wish to connect to (default: [same as server])
Debug - Debug information (off: 0, on: 1) (default: 0 [disabled]) OPTIONAL
Secure - If you wish to use a secure connection. (0 - None, 1 - SSL [no verify]) OPTIONAL [Requires Net::SSLeay]
Threads - How many concurrent connections per host (default: 2) OPTIONAL
Callbacks - You can supply callback functions on certain conditions, these conditions include:
The callback must return 1 it to follow proper proceedures. You can overwrite the defaults by supplying a different return.
1 - Default
101 - Remove Thread
102 - Reconnect
=head2 new(%options, Hosts=>[\%options2,\%options3])
You can supply multiple hosts in an array.
*Requires Authen::SASL
=head2 mail( ADDRESS )
=head2 to( ADDRESS )
=head2 data()
=head2 datasend( DATA )
=head2 dataend( DATA )
=head2 reconnect( )
=head2 quit( )
sub new {
my $class=shift;
my %new=@_;
my $self={};
if ($#_ % 2 == 0) {
bless($self, $class||'Net::SMTP::Bulk');
$self->{debug} = (($new{Debug}||0) >= 1) ? int($new{Debug}):0;
$self->{debug_path} = $new{DebugPath}||'debug_[HOST]_[THREAD].txt';
$self->{func} = $new{Callbacks};
if (exists($new{Hosts})) {
} else {
return $self;
sub mail {
my $self=shift;
my $str=shift;
my $k=shift||$self->{last}[0];
push( @{$self->{queue}{ $k->[0] }{ $k->[1] }}, ['MAIL',250,'MAIL FROM: '.$str] );
sub to {
my $self=shift;
my $str=shift;
my $k=shift||$self->{last}[0];
push( @{$self->{queue}{ $k->[0] }{ $k->[1] }}, ['TO',250,'RCPT TO: '.$str] );
sub data {
my $self=shift;
my $k=shift||$self->{last}[0];
push( @{$self->{queue}{ $k->[0] }{ $k->[1] }}, ['DATA',354,'DATA'] );
$self->{data}{ $k->[0] }{ $k->[1] }='';
sub datasend {
my $self=shift;
my $str=shift;
my $k=shift||$self->{last}[0];
$self->{data}{ $k->[0] }{ $k->[1] }.=$str."\n";
sub dataend {
my $self=shift;
my $k=shift||$self->{last}[0];
push( @{$self->{queue}{ $k->[0] }{ $k->[1] }}, ['DATAEND',250,$self->{data}{ $k->[0] }{ $k->[1] }."\r\n."] );
$self->{queue_size}=$#{$self->{queue}{ $k->[0] }{ $k->[1] }} if $self->{queue_size} < $#{$self->{queue}{ $k->[0] }{ $k->[1] }};
$self->_BULK() if $self->{last}[1] == $#{ $self->{order} };
sub auth {
my $self=shift;
my $user;
my $pass;
my $k;
my $mech;
if ($#_ == 3 or $#_ == 2 and ref($_[2]) ne 'ARRAY') {
require MIME::Base64;
require Authen::SASL;
if ($self->{auth}{ $k->[0] }{ $k->[1] }[0] == 1) {
#already authenticated
} else {
$mech ||= uc(join(' ',(@{$self->{header}{$k->[0]}{$k->[1]}{auth}})));
$self->{objects}{$k->[0]}{$k->[1]}{sasl} = Authen::SASL->new(
mechanism => $mech,
callback => {
user => $user,
pass => $pass,
authname => $user,
) if (!exists($self->{objects}{$k->[0]}{$k->[1]}{sasl}));
$self->{objects}{ $k->[0] }{ $k->[1] }{sasl_client} = $self->{objects}{ $k->[0] }{ $k->[1] }{sasl}->client_new("smtp", $self->{host}{$k->[0]},1);
$self->_WRITE($k,'AUTH '.$self->{objects}{ $k->[0] }{ $k->[1] }{sasl_client}->mechanism);
if (my $str = $self->{objects}{ $k->[0] }{ $k->[1] }{sasl_client}->client_start) {
$self->_WRITE($k,MIME::Base64::encode_base64($str, ''));
do {
my $msg=MIME::Base64::decode_base64($self->{buffer}{ $k->[0] }{ $k->[1] });
} while($self->{status_code}{ $k->[0] }{ $k->[1] } == 334);
if ($self->{status_code}{ $k->[0] }{ $k->[1] } == 235) {
$self->{auth}{ $k->[0] }{ $k->[1] }=[1,$mech];
my $r=$self->_FUNC('auth_pass',$self,$k,0,$self->{queue}{ $k->[0] }{ $k->[1] });
if ($r != 1) {
return 1;
} else {
my $r=$self->_FUNC('auth_fail',$self,$k,0,$self->{queue}{ $k->[0] }{ $k->[1] });
if ($r == 1) {
$self->_FUNC_CALLBACK($k,0,101); #remove thread
} else {
return 0;
sub quit {
my $self=shift;
foreach my $dfh ( keys(%{ $self->{debug_fh} }) ) {
sub reconnect{
my $self=shift;
my $k=shift||$self->{last}[0];
$self->{fh}{ $k->[0] }{ $k->[1] }->close() if defined($self->{fh}{ $k->[0] }{ $k->[1] });
if ( $self->{auth}{ $k->[0] }{ $k->[1] }[0] == 1 ) {
$self->{auth}{ $k->[0] }{ $k->[1] }[0]=0;
my $auth=$self->auth($self->{auth}{ $k->[0] }{ $k->[1] }[1],'','',$k);
$self->{order}[ $self->{last}[1] ][2]=2;
return 1 if $auth == 1;
return 0;
sub _BULK {
my $self=shift;
foreach my $q ( 0..$self->{queue_size} ) {
foreach my $k (@{$self->{order}}) {
if ($k->[2] == 1 and exists($self->{queue}{ $k->[0] }{ $k->[1] }[$q])) {
$self->_WRITE($k,$self->{queue}{ $k->[0] }{ $k->[1] }[$q][2]);
foreach my $k (@{$self->{order}}) {
if ($k->[2] == 1 and exists($self->{queue}{ $k->[0] }{ $k->[1] }[$q])) {
if ($self->{status_code}{ $k->[0] }{ $k->[1] } == $self->{queue}{ $k->[0] }{ $k->[1] }[$q][1] ) {
no strict;
my $r=$self->_FUNC('pass',$self,$k,$q,$self->{queue}{ $k->[0] }{ $k->[1] });
if ($r != 1) {
} else {
if ( $self->{status_code}{ $k->[0] }{ $k->[1] } == -1 ) {
my $r=$self->_FUNC('hang',$self,$k,$q,$self->{queue}{ $k->[0] }{ $k->[1] });
if ($r == 1) {
} else {
} else {
my $r=$self->_FUNC('fail',$self,$k,$q,$self->{queue}{ $k->[0] }{ $k->[1] });
if ($r != 1) {
foreach my $order (0..$#{$self->{order}}) {
my $k=$self->{order}[$order];
$self->{order}[ $order ][2]=1 if $self->{order}[ $order ][2] == 2;
$self->{queue}{ $k->[0] }{ $k->[1] }=[];
sub _FUNC {
my $self=shift;
no strict;
my $func=shift;
return &{$self->{func}{$func}}(@_) if exists($self->{func}{$func});
return 1;
my $self=shift;
my $k=shift;
my $q=shift;
my $r=shift;
if ($r == 101) {
#remove thread
$self->{order}[ $self->{last}[1] ][2]=0;
$self->{last}=[ $self->{order}[ $self->{last}[1] ], $self->{last}[1] ];
$self->_DEBUG($k,'++REMOVE THREAD++') if $self->{debug} >= 1;
} elsif ($r == 102) {
$self->_DEBUG($k,'++RECONNECT++') if $self->{debug} >= 1;
my $reconnect=$self->reconnect($k);
if ($reconnect == 1) {
my $r2=$self->_FUNC('reconnect_pass',$self,$k,$q,$self->{queue}{ $k->[0] }{ $k->[1] });
if ($r2 != 1) {
} else {
my $r2=$self->_FUNC('reconnect_fail',$self,$k,$q,$self->{queue}{ $k->[0] }{ $k->[1] });
if ($r2 == 1) {
$self->_FUNC_CALLBACK($k,0,101); #remove thread
} else {
sub _PREPARE {
my $self=shift;
my $hosts=shift;
foreach my $i ( 0..$#{$hosts} ) {
my %new=( %{$hosts->[$i]} );
$self->{host}{ $new{Host} }||=$new{Host};
$self->{last}=[[$new{Host},0],0] if $i == 0;
if ($self->{host}{ $new{Host} }=~s/\:(\d+?)$//is) {
$self->{port}{ $new{Host} }=$1;
$self->{secure}{ $new{Host} }=$new{Secure}||$self->{defaults}{secure};
$self->{port}{ $new{Host} }||=$new{Port}||$self->{defaults}{port};
$self->{helo}{ $new{Host} }=$new{Hello}||$self->{host}{ $new{Host} };
$self->{timeout}{ $new{Host} }=$new{Timeout}||$self->{defaults}{timeout};
$self->{threads}{ $new{Host} }=$new{Threads}||$self->{defaults}{threads};
foreach my $t ( 0..$self->{threads}{ $new{Host} } ) {
if ($self->{debug} == 2) {
my $path=''.$self->{debug_path};
open( $self->{debug_fh}{ $new{Host}.':'.$t } , '>>'.$path );
$self->{auth}{ $new{Host} }{$t}=[0,''];
$self->{queue}{ $new{Host} }{$t}=[];
push(@{$self->{order}}, [$new{Host},$t,1] );
foreach my $t ( 0..$self->{threads}{ $new{Host} } ) {
foreach my $t ( 0..$self->{threads}{ $new{Host} } ) {
sub _CONNECT {
my $self=shift;
my $k=shift;
my $cb=Coro::rouse_cb;
my $g=tcp_connect($self->{host}{ $k->[0] }, $self->{port}{ $k->[0] },
my $sock=$_[0];
$self->_SECURE($k,$sock) if ($self->{secure}{ $k->[0] } == 1 or $self->{secure}{ $k->[0] } == 2);
$self->{fh}{ $k->[0] }{ $k->[1] }=unblock +(Coro::rouse_wait)[0];
sub _HELO {
my $self=shift;
my $k=shift;
if ($self->{status_code}{ $k->[0] }{ $k->[1] } == 220) {
my $r=$self->_FUNC('connect_pass',$self,$k,0,$self->{queue}{ $k->[0] }{ $k->[1] });
if ($r != 1) {
} else {
my $r=$self->_FUNC('connect_fail',$self,$k,0,$self->{queue}{ $k->[0] }{ $k->[1] });
if ($r == 1) {
$self->_FUNC_CALLBACK($k,0,101); #remove thread
} else {
$self->_WRITE($k,'EHLO '.$self->{helo}{ $k->[0] });
sub _NEXT {
my $self=shift;
my $k=shift;
my @next;
while (!exists($next[0])) {
if (exists($self->{order}[ $self->{last}[1] ]) and $self->{order}[ $self->{last}[1] ][2]==1) {
@next=( $self->{order}[ $self->{last}[1] ], $self->{last}[1] );
} else {
sub _SECURE {
my $self=shift;
my $k=shift;
my $sock=shift;
$self->{secure_sock}{ $k->[0] }{ $k->[1] }=$sock;
require IO::Socket::SSL;
my $sel = IO::Select->new($sock); # wait until it connected
$self->{secure_sel}{ $k->[0] }{ $k->[1] }=$sel;
if ($sel->can_write) {
$self->_DEBUG($k,'IO::Socket::INET connected') if $self->{debug} >= 1;
my %extra;
if ($self->{secure}{ $k->[0] } == 1) {
IO::Socket::SSL->start_SSL($sock, %extra, SSL_startHandshake => 0);
while (1) {
if ($sock->connect_SSL) { # will not block
$self->_DEBUG($k,'IO::Socket::SSL connected') if $self->{debug} >= 1;
else { # handshake still incomplete
$self->_DEBUG($k,'IO::Socket::SSL not connected yet') if $self->{debug} >= 1;
if ( IO::Socket::SSL->want_read() ) {
elsif ( IO::Socket::SSL->want_write()) {
else {
$self->_DEBUG($k,'IO::Socket::SSL unknown error: '. $IO::Socket::SSL::SSL_ERROR) if $self->{debug} >= 1;
my $self=shift;
if (exists($self->{secure_sel})) {
while (1) {
foreach my $h ( keys(%{$self->{secure_sel}}) ) {
foreach my $t ( keys(%{$self->{secure_sel}{$h}}) ) {
my $sock=$self->{secure_sock}{$h}{$t};
my $sel=$self->{secure_sel}{$h}{$t};
if ($sock->connect_SSL) { # will not block
$self->_DEBUG([$h,$t],'IO::Socket::SSL connected') if $self->{debug} >= 1;
} else { # handshake still incomplete
$self->_DEBUG([$h,$t],'IO::Socket::SSL not connected yet') if $self->{debug} >= 1;
if ( $sock->want_read() ) {
} elsif ( $sock->want_write()) {
} else {
$self->_DEBUG([$h,$t],'IO::Socket::SSL unknown error: '. $sock->errstr()) if $self->{debug} >= 1;
if ( keys(%{$self->{secure_sel}{$h}}) == 0 ) {
if ( keys(%{$self->{secure_sel}}) == 0 ) {
sub _HEADER {
my $self=shift;
my $k=shift;
my $fh= $self->{fh}{ $k->[0] }{ $k->[1] };
my $nb_fh = $fh->fh;
my $buf = \$fh->rbuf;
while () {
# now use buffer contents, modifying
# if necessary to reflect the removed data
last if $$buf ne ""; # we have leftover data
# read another buffer full of data
$fh->readable or die "end of file";
sysread $nb_fh, $$buf, 8192;
foreach my $line (split/[\r\n]+/,$$buf) {
$line=~m/^((\d{3})[ \-](\w+?)(?: (.*?)|)[\r\n]*?)$/is;
$self->_DEBUG($k,$1) if $self->{debug} >= 1;
my $status = lc($2);
my $head = lc($3);
$self->{header}{ $k->[0] }{ $k->[1] }{$head}=[split/ /,($4||'')];
sub _READ {
my $self=shift;
my $k=shift;
my $str;
my $waitcount=0;
if ($self->{fh}{ $k->[0] }{ $k->[1] }->readable()) {
$str=$self->{fh}{ $k->[0] }{ $k->[1] }->readline();
if (defined($str) and $str=~m/^((\d{3}).(.*?))[\r\n]+?$/) {
$self->_DEBUG($k,$1) if $self->{debug} >= 1;
$self->{buffer}{ $k->[0] }{ $k->[1] }=$1;
$self->{status_code}{ $k->[0] }{ $k->[1] }=$2;
$self->{status_text}{ $k->[0] }{ $k->[1] }=$3;
} else {
$self->{buffer}{ $k->[0] }{ $k->[1] }='';
$self->{status_code}{ $k->[0] }{ $k->[1] }=-1;
$self->{status_text}{ $k->[0] }{ $k->[1] }='';
sub _WRITE {
my $self=shift;
my $k=shift;
my $str=shift;
$self->_DEBUG($k,'>>'.$str) if $self->{debug} >= 1;
$self->{fh}{ $k->[0] }{ $k->[1] }->print($str."\r\n");
sub _DEBUG {
my $self=shift;
my $k=shift;
my $str=shift||'';
if ($self->{debug} == 1) {
print '['.$k->[0].':'.$k->[1].'] '.$str."\n";
} else {
print { $self->{debug_fh}{ $k->[0].':'.$k->[1] } } '['.$k->[0].':'.$k->[1].'] '.$str."\n";
=head1 AUTHOR
1; # End of Net::SMTP::Bulk