Message Passing for the Non-Blocked Mind
Introduction and Terminology
This is a tutorial about how to get the hang of the new AnyEvent::MP module, which allows us to transparently pass messages to our own process and to other processes on the same or another host.
What kind of messages? Well, basically a message here means a list of Perl strings, numbers, hashes and arrays, anything that can be expressed as a JSON text (as JSON is used by default in the protocol).
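To get a feeling for what "can be expressed as a JSON text" means, here is a small sketch in plain Perl. It does not use AnyEvent::MP at all; it only asks the core JSON::PP module whether a list could be serialised. The helper name is_json_message is made up for this illustration.

```perl
use strict;
use warnings;
use JSON::PP ();

# Hypothetical helper (not part of AnyEvent::MP): returns 1 if the list
# can be serialised as a JSON array, 0 otherwise.
sub is_json_message {
    my @msg = @_;
    my $ok = eval { JSON::PP->new->encode (\@msg); 1 };
    return $ok ? 1 : 0;
}

# strings, numbers, hashes and arrays are fine ...
my $plain = is_json_message (message => "hello", { room => "perl" }, [1, 2, 3]);
# ... but a code reference has no JSON representation
my $code = is_json_message (callback => sub { 1 });

print "plain data: ", ($plain ? "ok" : "not serialisable"), "\n";
print "code ref:   ", ($code  ? "ok" : "not serialisable"), "\n";
```

In other words, stick to plain data in your messages and leave out things like code references or file handles.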
And next you might ask: between which entities are those messages being "passed"? Physically, within or between nodes: a node is basically a process/program that uses AnyEvent::MP and can run either on the same or on a different host.
To make this more manageable, every node can contain any number of ports: ports are ultimately the receivers of your messages.
In this tutorial I'll show you how to write a simple chat server based on AnyEvent::MP. This example is used because it nicely shows how to organise a simple application. Keep in mind, however, that every node trusts every other node, so this chat cannot be used to implement a real chat server and client system. It can, however, be used to implement a distributed chat server, for example.
System Requirements and System Setup
Before we can start we have to make sure some things work on your system.
You should of course first make sure that AnyEvent and AnyEvent::MP are installed. But how to do that is out of scope of this tutorial.
Then we have to set up a shared secret: for two AnyEvent::MP nodes to be able to communicate with and authenticate each other, it is necessary to set up the same shared secret for both of them (or use TLS certificates).
The easiest way to set this up is to use the aemp utility:
aemp gensecret
This creates a $HOME/.perl-anyevent-mp config file and generates a random shared secret. You can copy this file to any other system and then communicate with it. You can also select your own shared secret (aemp setsecret) and for increased security requirements you can even create a TLS certificate (aemp gencert), causing connections to not just be authenticated, but also to be encrypted.
Connections will only be successful when the nodes that want to connect to each other have the same shared secret (or successfully verify the TLS certificate of the other side).
If something does not work as expected, and for example tcpdump shows that the connections are closed almost immediately, you should make sure that ~/.perl-anyevent-mp is the same on all hosts/user accounts that you try to connect with each other!
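One way to check that the config file really is identical everywhere is to compare a checksum of it on each host. The following is only a sketch using the core Digest::SHA module; the helper name config_digest is made up, and the path is the one mentioned above.

```perl
use strict;
use warnings;
use Digest::SHA ();

# Hypothetical helper: SHA-256 hex digest of a file's contents.
sub config_digest {
    my ($path) = @_;
    my $sha = Digest::SHA->new (256);
    $sha->addfile ($path, "b");   # "b" reads the file in binary mode
    return $sha->hexdigest;
}

# falls back to the current directory if HOME is not set
my $config = ($ENV{HOME} // ".") . "/.perl-anyevent-mp";

if (-r $config) {
    print config_digest ($config), "  $config\n";
} else {
    print "no readable config file at $config\n";
}
```

Run this on every host/user account involved: if the printed digests differ, the files differ, and the nodes will not be able to authenticate each other.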
The Chat Client
OK, let's start by implementing the "frontend" of the client. We will develop the client first and postpone the server until later, as the most complex things actually happen in the client.
We will use AnyEvent::Handle to do non-blocking reads on standard input (all of this code deals with handling user input; there is no message passing yet):
#!perl
use AnyEvent;
use AnyEvent::Handle;
sub send_message {
   die "This is where we will send the messages to the server "
     . "in the next step of this tutorial.\n"
}
# make an AnyEvent condition variable for the 'quit' condition
# (when we want to exit the client).
my $quit_cv = AnyEvent->condvar;
my $stdin_hdl = AnyEvent::Handle->new (
   fh       => *STDIN,
   on_error => sub { $quit_cv->send },
   on_read  => sub {
      my ($hdl) = @_;
      $hdl->push_read (line => sub {
         my ($hdl, $line) = @_;
         if ($line =~ /^\/quit/) { # /quit will end the client
            $quit_cv->send;
         } else {
            send_message ($line);
         }
      });
   }
);
$quit_cv->recv;
This is now a very basic client. Explaining explicitly what AnyEvent::Handle does or what a condvar is all about is out of scope of this document; please consult AnyEvent::Intro or the manual pages for AnyEvent and AnyEvent::Handle.
First Steps Into Messaging
To supply the send_message function we now take a look at AnyEvent::MP. This is an example of what it might look like:
... # the use lines from the above snippet
use AnyEvent::MP;
sub send_message {
   my ($msg) = @_;
   snd $server_port, message => $msg;
}
... # the rest of the above script
The snd function is exported by AnyEvent::MP and stands for 'send a message'. The first argument is the port (a port is something that can receive messages, represented by a printable string) of the server which will receive the message. How we get this port will be explained in the next step.
The remaining arguments of snd are message and $msg, the first two elements of the message (a message in AnyEvent::MP is a simple list of values, which can be sent to a port).
So all the function does is send the two values message (a constant string to tell the server what to expect) and the actual message string.
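If the message => $msg notation looks odd to you: it is just Perl's "fat comma", which quotes the bareword on its left. The following plain Perl sketch (no AnyEvent::MP involved, the message text is made up) shows that the result is an ordinary two-element list:

```perl
use strict;
use warnings;

my $msg = "hello, world";

# the fat comma quotes the bareword on its left,
# so this is the same list as ("message", $msg)
my @message = (message => $msg);

print scalar (@message), " elements, tag '$message[0]', text '$message[1]'\n";
```

So the constant string message and the text typed by the user travel together as one list.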
That's all fine and simple so far, but where do we get the $server_port? Well, we need to get the unique port id of the server's port where it wants to receive all the incoming chat messages. A port id is unfortunately a unique string that we are unable to know in advance. But AnyEvent::MP supports the concept of 'registered ports', which is basically a port on the server side registered under a well known name.
For example, the server has a port for receiving chat messages with a unique port id and registers it under the name chatter.
BTW, these "registered port names" should follow similar rules as Perl identifiers, so you should prefix them with your package/module name to make them unique, unless you use them in the main program.
As messages can only be sent to a port id and not just to some name, we have to ask the server node for the port id of the port registered as chatter.
Finding The Chatter Port
Ok, lots of talk, now some code. Now we will actually get the $server_port from the backend:
...
use AnyEvent::MP;
my $server_node = "127.0.0.1:1299";
my $client_port = port;
snd $server_node, lookup => "chatter", $client_port, "resolved";
my $resolved_cv = AnyEvent->condvar;
my $server_port;
# set up a receiver callback for the 'resolved' message:
rcv $client_port, resolved => sub {
   my ($tag, $chatter_port_id) = @_;
   print "Resolved the server port 'chatter' to $chatter_port_id\n";
   $server_port = $chatter_port_id;
   $resolved_cv->send;
   1
};
# let's block the client until we have resolved the server port.
$resolved_cv->recv;
# now set up another receiver callback for the chat messages:
rcv $client_port, message => sub {
   my ($tag, $msg) = @_;
   print "chat> $msg\n";
   0
};
# send a 'join' message to the server:
snd $server_port, join => "$client_port";
sub send_message { ...
Now that was a lot of new stuff:
First we define the $server_node: In order to refer to another node we need some kind of string to reference it - the node reference. The noderef is basically a comma separated list of address:port pairs. We assume in this tutorial that the server runs on 127.0.0.1 (localhost) on port 1299, which results in the noderef 127.0.0.1:1299.
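To make the "comma separated list of address:port pairs" a bit more concrete, here is a sketch that takes such a string apart in plain Perl. The helper parse_noderef and the second address are made up for illustration; this is not how AnyEvent::MP itself parses noderefs.

```perl
use strict;
use warnings;

# Hypothetical helper: split a noderef into [address, port] pairs.
sub parse_noderef {
    my ($noderef) = @_;
    my @pairs;
    for my $pair (split /,/, $noderef) {
        # everything before the last colon is the address
        $pair =~ /^(.+):(\d+)$/
            or die "malformed address:port pair '$pair'\n";
        push @pairs, [$1, $2];
    }
    return @pairs;
}

for my $pair (parse_noderef ("127.0.0.1:1299,10.0.0.5:4040")) {
    print "address $pair->[0], port $pair->[1]\n";
}
```

The noderef used in this tutorial, 127.0.0.1:1299, is simply the shortest case: a list with a single pair.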
Next, in order to receive a reply from the other node or the server we need to have a port that messages can be sent to. This is what the port function will do for us: it just creates a new local port and returns its port ID, which can then be used to receive messages.
When you look carefully, you will see that the first snd uses the $server_node (a noderef) as destination port. Well, what I didn't tell you yet is that each node has a default port to receive messages. The ID of this port is the same as the noderef.
This default port provides some special services for us, for example resolving a registered name to a port id (a-ha! finally!).
This is exactly what this line does:
snd $server_node, lookup => "chatter", $client_port, "resolved";
This sends a message with the first element being lookup, followed by the (hopefully) registered port name that we want to resolve to a port id: chatter. And in order for the server node to be able to send us back the resolved port ID we have to tell it where to send it: The result message will be sent to $client_port (the port id of the port we just created), and will have the string resolved as the first element.
When the node receives this message, it will look up the name, gobble up all the extra arguments we passed, append the resolved port ID, and send the resulting list as a message.
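The behaviour just described can be modelled in a few lines of plain Perl. This is only a sketch of the idea, not AnyEvent::MP's implementation: the registry hash, the helper handle_lookup and both port ID strings are made up.

```perl
use strict;
use warnings;

# Hypothetical registry: registered name => port ID (the ID is made up).
my %registry = (chatter => "made-up-port-id");

# Model of the lookup service: the first extra argument is the port to
# reply to, the rest becomes the start of the reply message, and the
# resolved port ID is appended at the end.
sub handle_lookup {
    my ($name, $reply_port, @reply) = @_;
    return ($reply_port, @reply, $registry{$name});
}

my ($to, @msg) = handle_lookup ("chatter", "client-port-1", "resolved");

print "destination: $to\n";
print "message:     @msg\n";
```

The reply therefore arrives at our own port as a message whose first element is resolved and whose second element is the port ID we asked for.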
Next we register a receiver for the reply to this lookup request:
rcv $client_port, resolved => sub {
   my ($tag, $chatter_port_id) = @_;
   ...
   1
};
This sets up a receiver on our own port for messages with the first element being the string resolved. Receivers can match the contents of the messages before actually executing the specified callback.
Please note that every rcv callback has to return either a true or a false value, indicating whether it is successful/done (true) or still wants to continue (false) receiving messages.
In this case we tell the $client_port to look into all the messages it receives and look for the string resolved in the first element of the message. If it is found, the given callback will be called with the message elements as arguments.
Using a string as the first element of the message is called tagging the message. It's common practice to encode the 'type' of a message in its first element, as this allows for simple matching.
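If you want to see tagging and the true/false return value at work without any networking, the following plain Perl model might help. It is only a sketch of the semantics described above: register and deliver are made-up stand-ins for what rcv and the port do, and the port IDs and texts are invented.

```perl
use strict;
use warnings;

my %receivers;   # tag => callback, for a single imaginary port

# Hypothetical stand-in for rcv: register a callback for one tag.
sub register { my ($tag, $cb) = @_; $receivers{$tag} = $cb }

# Hypothetical delivery: match on the first element, call the callback
# with the message elements, and drop the receiver once the callback
# returns a true value ("I am done").
sub deliver {
    my @msg = @_;
    my $cb = $receivers{$msg[0]} or return 0;   # no receiver matched
    delete $receivers{$msg[0]} if $cb->(@msg);
    return 1;
}

my @log;
register (resolved => sub { push @log, "resolved: $_[1]"; 1 });   # one-shot
register (message  => sub { push @log, "chat> $_[1]";     0 });   # stays

deliver (resolved => "port-a");
deliver (resolved => "port-b");   # ignored, that receiver is gone
deliver (message  => "hi");
deliver (message  => "there");

print "$_\n" for @log;
```

The resolved receiver fires exactly once, while the message receiver keeps receiving, which is the pattern the client uses.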
The result message will contain the port ID of the well known port chatter as second element, which will be stored in $chatter_port_id.
This port ID will then be stored in $server_port, followed by calling send on $resolved_cv so the program will continue.
The callback then returns a 1 (a true value), to indicate that it has done its job and doesn't want to receive further resolved messages.
After this the chat message receiver callback is registered with the port:
rcv $client_port, message => sub {
   my ($tag, $msg) = @_;
   print "chat> $msg\n";
   0
};
We assume that all messages that are broadcast to the clients by the server contain the string tag message as first element, and the actual message as second element. The callback returns a false value this time, to indicate that it is not yet done and wants to receive further messages.
The last thing to do is to tell the server to send us new chat messages from other clients. We do so by sending the message join followed by our own port ID.
# send the server a 'join' message:
snd $server_port, join => $client_port;
This way the server knows where to send all the new messages to.
The Completed Client
This is the complete client script:
#!perl
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::MP;
my $server_node = "127.0.0.1:1299";
my $client_port = port;
snd $server_node, lookup => "chatter", $client_port, "resolved";
my $resolved_cv = AnyEvent->condvar;
my $server_port;
# set up a receiver callback for the 'resolved' message:
rcv $client_port, resolved => sub {
   my ($tag, $chatter_port_id) = @_;
   print "Resolved the server port 'chatter' to $chatter_port_id\n";
   $server_port = $chatter_port_id;
   $resolved_cv->send;
   1
};
# let's block the client until we have resolved the server port.
$resolved_cv->recv;
# now set up another receiver callback for the chat messages:
rcv $client_port, message => sub {
   my ($tag, $msg) = @_;
   print "chat> $msg\n";
   0
};
# send a 'join' message to the server:
snd $server_port, join => "$client_port";
sub send_message {
   my ($msg) = @_;
   snd $server_port, message => $msg;
}
# make an AnyEvent condition variable for the 'quit' condition
# (when we want to exit the client).
my $quit_cv = AnyEvent->condvar;
my $stdin_hdl = AnyEvent::Handle->new (
   fh       => *STDIN,
   on_error => sub { $quit_cv->send },
   on_read  => sub {
      my ($hdl) = @_;
      $hdl->push_read (line => sub {
         my ($hdl, $line) = @_;
         if ($line =~ /^\/quit/) { # /quit will end the client
            $quit_cv->send;
         } else {
            send_message ($line);
         }
      });
   }
);
$quit_cv->recv;
The Server
Ok, we finally come to the server.
The server of course also needs to set up a port, and in addition needs to register it, so the clients can find it.
Again, let's jump directly into the code:
#!perl
use AnyEvent;
use AnyEvent::MP;
become_public "127.0.0.1:1299";
my $chatter_port = port;
reg $chatter_port, "chatter";
my %client_ports;
rcv $chatter_port,
   join => sub {
      my ($tag, $client_port) = @_;
      print "got new client port: $client_port\n";
      $client_ports{$client_port} = 1;
      0
   },
   message => sub {
      my ($tag, $msg) = @_;
      print "message> $msg\n";
      snd $_, message => $msg
         for keys %client_ports;
      0
   };
AnyEvent->condvar->recv;
That is all. Looks much simpler than the client, doesn't it?
Let's quickly look over it, as rcv has already been discussed in the client part of this tutorial above.
First this:
become_public "127.0.0.1:1299";
This will tell our node to become a public node, which means that it can be contacted via TCP. The first argument should be the noderef the server wants to be reachable at. In this case it's the TCP port 1299 on 127.0.0.1.
Next we set up two receivers, one for the join messages and another one for the actual messages of type message. This is done with a single call to rcv, which allows multiple match => $callback pairs.
In the join callback we receive the client port, which is simply remembered in the %client_ports hash. In the message callback we just iterate through all known %client_ports and relay the message to them.
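The relay logic can be tried out without any network by replacing snd with a function that merely records what would have been sent. The following is such a sketch in plain Perl: fake_snd, the %handlers table and the client port names are made up, and only the join/message logic mirrors the server above.

```perl
use strict;
use warnings;

my %client_ports;
my @sent;   # records [port, @message] instead of sending anything

# Hypothetical stand-in for snd, so the sketch runs without a network.
sub fake_snd { push @sent, [@_] }

my %handlers = (
    # remember the port of every client that joins
    join    => sub { my ($tag, $port) = @_; $client_ports{$port} = 1 },
    # relay a chat message to all known client ports (sorted here only
    # to make the order of the output predictable)
    message => sub {
        my ($tag, $msg) = @_;
        fake_snd ($_, message => $msg) for sort keys %client_ports;
    },
);

# feed the handlers the messages two imaginary clients would send
$handlers{$_->[0]}->(@$_)
    for [join => "client-1"], [join => "client-2"], [message => "hello"];

print join (" ", @$_), "\n" for @sent;
```

Every joined client, including the one that sent the message, gets its own copy.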
That concludes the server.
The Remaining Problems
The implementation as shown still has some bugs. For instance: How does the server know that the client isn't there anymore, so it can clean up the %client_ports hash? Also, the chat messages have no originator, so we don't know who actually sent the message (which would be quite useful for human-to-human interaction: to know who the other one is :).
But aside from these issues I hope this tutorial showed you the basics of AnyEvent::MP and explained some common idioms.
How to solve the reliability and %client_ports cleanup problem will be explained later in this tutorial (TODO).
Inside The Protocol
Now, for the interested parties, let me explain some details about the protocol that AnyEvent::MP nodes use to communicate to each other. If you are not interested you can skip this section.
Usually TCP is used for communication. Each node, if configured to be a public node with the initialise_node function, will listen on the configured TCP port (default is 4040).
If one node then wants to send a message to another node, it will connect to the host and port given in the port ID.
Then some handshaking occurs to check whether both nodes know the shared secret. Optionally, TLS can be enabled (for how to do this exactly, please consult the AnyEvent::MP man page; just a hint: it should be enough to put the private key and (self signed) certificate in the ~/.aemp-secret file of all nodes).
After the handshake, messages will be exchanged using a serialiser (usually JSON is used for this, but it is also possible to use other serialization formats such as Storable).
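To illustrate that the choice of serialiser does not change what a message is, the following sketch sends one message through both JSON and Storable and compares the results. It uses only the core modules JSON::PP and Storable, the message contents are made up, and it says nothing about how AnyEvent::MP actually frames messages on the wire.

```perl
use strict;
use warnings;
use JSON::PP ();
use Storable ();

my @message = (message => "hello", { from => "client-1" });

# canonical mode sorts hash keys, so equal data gives equal JSON text
my $json = JSON::PP->new->canonical;

# round trip through JSON text ...
my $via_json = $json->decode ($json->encode (\@message));

# ... and through Storable's binary format
my $via_storable = Storable::thaw (Storable::freeze (\@message));

# compare both results by their canonical JSON form
my $same = $json->encode ($via_json) eq $json->encode ($via_storable);

printf "%s\n", $same ? "same message" : "different message";
```

Either way the receiving side ends up with the same list of plain Perl values.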
AUTHOR
Robin Redeker <elmex@ta-sa.org>