Introduction to Coro

This tutorial will introduce you to the main features of the Coro module family.

It first introduces some basic concepts, and later gives a short overview of the module family.

What is Coro?

Coro started as a simple module that implemented a specific form of first-class continuations called coroutines. These basically allow you to capture the current point of execution and jump to another point, while allowing you to return at any time - a kind of non-local jump, not unlike C's setjmp/longjmp. This is nowadays known as a Coro::State.

The natural application for these is to add a scheduler, resulting in cooperative threads, which is the main use case for Coro today. Still, much of the documentation and customary usage refers to these threads as "coroutines" or often just "coros".

A thread is very much like a stripped-down perl interpreter, or a process: Unlike a full interpreter process, a thread doesn't have its own variable or code namespaces - everything is shared. That means that when one thread modifies a variable (or any value, e.g. through a reference), then other threads immediately see this change when they look at the same variable or location.

Cooperative means that these threads must cooperate with each other when it comes to CPU usage - only one thread ever has the CPU, and if another thread wants it, the running thread has to give it up. This happens either explicitly, by calling a function to do so, or implicitly, when waiting for a resource (such as a semaphore, or the completion of some I/O request).

Perl itself uses rather confusing terminology - what perl calls a "thread" is actually called a "process" everywhere else: The so-called "perl threads" are actually artifacts of the unix process emulation code used on Windows, which is why they are actually processes and not threads. The biggest difference is that neither variables nor code are shared between processes.

Cooperative Threads

Cooperative threads is what the Coro module gives you:

use Coro;

To create a thread, you can use the async function that automatically gets exported from that module:

async {
   print "hello\n";
};

async expects a code block as its first argument (in indirect object notation). You can pass it more arguments, and these will end up in @_ when the code block is executed, but since it is a closure, you can also just refer to any lexical variables that are currently visible.
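
For example, a minimal sketch passing extra arguments (like the example above, it produces no output yet, for reasons explained in a moment):

use Coro;

async {
   my ($greeting, $name) = @_; # extra arguments arrive in @_
   print "$greeting, $name!\n";
} "Hello", "World";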

If you save the first of these examples in a file and execute it as a perl program, you will not get any output.

The reason is that, although you created a thread, and the thread is ready to execute (because async puts it into the so-called ready queue), it never gets any CPU time to actually execute, as the main program - which is also a thread, almost like any other - never gives up the CPU but instead exits the whole program, by running off the end of the file.

To explicitly give up the CPU, use the cede function (which is often called yield in other thread implementations):

use Coro;

async {
   print "hello\n";
};

cede;

Running the above prints hello and exits.

This is not overly exciting, so let's try a slightly more interesting program:

use Coro;

async {
   print "async 1\n";
   cede;
   print "async 2\n";
};

print "main 1\n";
cede;
print "main 2\n";
cede;

Running this program prints:

main 1
async 1
main 2
async 2

This nicely illustrates the non-local jump ability: the main program prints the first line, and then yields the CPU to whatever other threads there are. And there is one other, which runs and prints "async 1", and itself yields the CPU. Since the only other thread available is the main program, it continues running and so on.

In more detail, async creates a new thread. All new threads start in a suspended state. To make them run, they need to be put into the ready queue, which is the second thing that async does. Each time a thread gives up the CPU, Coro runs a so-called scheduler. The scheduler selects the next thread from the ready queue, removes it from the queue, and runs it.

cede also does two things: first it puts the running thread into the ready queue, and then it jumps into the scheduler. This has the effect of giving up the CPU, but also ensures that, eventually, the thread gets run again.

In fact, cede could be implemented like this:

sub my_cede {
   $Coro::current->ready;
   schedule;
}

This works because $Coro::current always contains the currently running thread, and the scheduler itself can be called via Coro::schedule.

What's the effect of just calling schedule? Simple: the scheduler selects the next ready thread and runs it - the current thread, as it hasn't been put into the ready queue, will go to sleep until something wakes it up.

The following example remembers the current thread in a variable, creates a thread and then puts the main program to sleep.

The newly created thread uses rand to wake up the main thread by calling its ready method - or not.

use Coro;

my $wakeme = $Coro::current;

async {
   $wakeme->ready if 0.5 < rand;
};

schedule;

Now, when you run it, one of two things happens: Either the async thread wakes up the main thread again, in which case the program exits silently, or it doesn't, in which case you get:

FATAL: deadlock detected at - line 0

Why is that? When the async thread falls off the end, it will be terminated (via a call to Coro::terminate) and the scheduler gets run again. Since the async thread hasn't woken up the main thread, and there aren't any other threads, there is nothing to wake up, and the program cannot continue. Since there are threads that could be running (the main one) but none are ready to do so, Coro signals a deadlock - no progress is possible.

There is, however, an important case where progress is in fact possible - namely in an event-based program. In such a case, the program could well wait for external events, such as a timeout, or some data to arrive on a socket.

Since a deadlock in such a case would not be very useful, there is a module named Coro::AnyEvent that integrates threads into an event loop. It configures Coro so that, instead of dying with an error message, it runs the event loop in the hope of receiving an event that will wake up some thread.
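
For illustration, here is a sketch of the same wake-up pattern, this time driven by an event-loop timer (the one-second delay is arbitrary):

use Coro;
use Coro::AnyEvent; # integrate Coro into the AnyEvent event loop
use AnyEvent;

my $wakeme = $Coro::current;

# wake the main thread after one second, via the event loop
my $timer = AE::timer 1, 0, sub { $wakeme->ready };

schedule; # no deadlock - Coro runs the event loop until the timer fires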

Semaphores and other locks

Using only ready, cede and schedule to synchronise threads is difficult, especially if many threads are ready at the same time. Coro supports a number of primitives that make synchronising threads easier. The first such primitive is Coro::Semaphore, which implements counting semaphores (binary semaphores are available as Coro::Signal, and there are Coro::SemaphoreSet and Coro::RWLock primitives as well):

use Coro;

my $sem = new Coro::Semaphore 0; # a locked semaphore

async {
   print "unlocking semaphore\n";
   $sem->up;
};

print "trying to lock semaphore\n";
$sem->down;
print "we got it!\n";

This program creates a locked semaphore (a semaphore with count 0) and tries to lock it. Since the semaphore is already locked, this will block the main thread until the semaphore becomes available.

This yields the CPU to the async thread, which unlocks the semaphore (and instantly terminates itself by returning).

Since the semaphore is now available, the main program locks it and continues.

Semaphores are most often used to lock resources, or to exclude other threads from accessing or using a resource. For example, consider a very costly function (that temporarily allocates a lot of ram, for example). You wouldn't want to have many threads calling this function, so you use a semaphore:

my $lock = new Coro::Semaphore; # unlocked initially

sub costly_function {
   $lock->down; # acquire semaphore

   # do costly operation that blocks

   $lock->up; # unlock it
}

No matter how many threads call costly_function, only one will run the body of it, all others will wait in the down call.

Why does the comment mention an "operation that blocks"? That's because Coro's threads are cooperative: unless costly_function willingly gives up the CPU, other threads of control will simply not run. This makes locking superfluous in cases where the function itself never gives up the CPU, but when dealing with the outside world, this is rare.

Now consider what happens when the code dies after executing down, but before up. This will leave the semaphore in a locked state, which usually isn't what you want: normally you would want to free the lock again.

This is what the guard method is for:

my $lock = new Coro::Semaphore; # unlocked initially

sub costly_function {
   my $guard = $lock->guard; # acquire guard

   # do costly operation that blocks
}

This method downs the semaphore and returns a so-called guard object. Nothing happens as long as there are references to it, but when all references are gone - for example, when costly_function returns or throws an exception - it will automatically call up on the semaphore, so there is no way to forget it. Even when the thread gets cancelled by another thread, the guard object will ensure that the lock is freed.

Apart from Coro::Semaphore and Coro::Signal, there is also a reader-writer lock (Coro::RWLock) and a semaphore set (Coro::SemaphoreSet).
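
For illustration, a minimal Coro::RWLock sketch - many readers may hold the lock at once, but a writer gets exclusive access:

use Coro;
use Coro::RWLock;

my $rwlock = new Coro::RWLock;

# any number of threads may hold the read lock at the same time...
$rwlock->rdlock;
# ... inspect the shared data structure ...
$rwlock->unlock;

# ...but a writer needs exclusive access
$rwlock->wrlock;
# ... modify the shared data structure ...
$rwlock->unlock;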

Channels

Semaphores are fine, but usually you want to communicate by exchanging data as well. This is where Coro::Channel comes in useful: Channels are the Coro equivalent of a unix pipe (and very similar to amiga message ports :) - you can put stuff into it on one side, and read data from it on the other.

Here is a simple example that creates a thread and sends it numbers. The thread calculates the square of the number, and puts it into another channel, which the main thread reads the result from:

use Coro;

my $calculate = new Coro::Channel;
my $result    = new Coro::Channel;

async {
   # endless loop
   while () {
      my $num = $calculate->get; # read a number
      $num **= 2; # square it
      $result->put ($num); # put the result into the result queue
   }
};

for (1, 2, 5, 10, 77) {
   $calculate->put ($_);
   print "$_ ** 2 = ", $result->get, "\n";
}

Gives:

1 ** 2 = 1
2 ** 2 = 4
5 ** 2 = 25
10 ** 2 = 100
77 ** 2 = 5929

Both the get and put methods can block the current thread: get first checks whether there is some data available, and if not, it blocks the current thread until some data arrives. put can also block, as each Channel has a "maximum buffering capacity", i.e. you cannot store more than a specific number of items, which can be configured when the Channel gets created.
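
The capacity is given as the (optional) argument to the constructor, for example:

use Coro;

# a channel that buffers at most two items - a third put would
# block until a get makes room again
my $bounded = new Coro::Channel 2;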

In the above example, put never blocks, as the default capacity of a Channel is very high. So the for loop first puts data into the channel, then tries to get the result. Since the async thread hasn't put anything in there yet (on the first iteration it hasn't even run yet), the result Channel is still empty, so the main thread blocks.

Since the only other runnable/ready thread at this point is the squaring thread, it will be woken up, will get the number, square it and put it into the result channel, waking up the main thread again. It will still continue to run, as waking up other threads just puts them into the ready queue, nothing less, nothing more.

Only when the async thread tries to get the next number from the calculate channel will it block (because nothing is there yet) and the main thread will continue running. And so on.

In general, Coro will only ever block a thread when it has to: Neither the Coro module itself nor any of its submodules will ever give up the CPU unless they have to, because they wait for some event to happen.

Be careful, however: when multiple threads put numbers into $calculate and read from $result, they won't know which result is theirs. The solution is to either use a semaphore, or to send not just the number, but also your own private result channel, as sketched below.
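
A minimal sketch of the private-reply-channel approach (the request format - an array reference holding the number and the reply channel - is just one possible convention):

use Coro;

my $calculate = new Coro::Channel;

async {
   while () {
      # each request carries its own private reply channel
      my ($num, $reply) = @{ $calculate->get };
      $reply->put ($num ** 2);
   }
};

my $reply = new Coro::Channel;
$calculate->put ([5, $reply]);
print "5 ** 2 = ", $reply->get, "\n";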

What is mine, what is ours?

What, exactly, constitutes a thread? Obviously it contains the current point of execution. Not so obviously, it also has to include all lexical variables, which means that every thread has its own set of lexical variables. To see why this is necessary, consider this program:

use Coro;

sub printit {
   my ($string) = @_;

   cede;

   print $string;
}

async { printit "Hello, " };
async { printit "World!\n" };

cede; cede; # do it

The above prints Hello, World!\n. If printit didn't have its own per-thread $string variable, it would probably print World!\nWorld!\n, which is rather unexpected, and would make it very difficult to make good use of threads.

There are quite a number of other things that are per-thread:

$_, @_, $@ and the regex result vars, $&, %+, $1, $2, ...

$_ is used much like a local variable, so it gets localised per-thread. The same is true for regex results ($1, $2 and so on).

@_ contains the arguments, so like lexicals, it also must be per-thread.

$@ is not obviously required to be per-thread, but having it per-thread is quite useful.

$/ and the default output file handle

Threads most often block when doing I/O. Since $/ is used when reading lines, it would be very inconvenient if it were a shared variable, so it is per-thread.

The default output handle (see select) is a difficult case: sometimes being global is preferable, sometimes per-thread is preferable. Since per-thread seems to be more common, it is per-thread.

$SIG{__DIE__} and $SIG{__WARN__}

If these weren't per-thread, then common constructs such as:

eval {
   local $SIG{__DIE__} = sub { ... };
   ...
};

would not allow coroutine switching. Since exception handling is per-thread, those variables should be per-thread too.

Lots of other esoteric stuff

For example, $^H is per-thread. Most of the additional per-thread state is not directly visible to perl, but required to make the interpreter work. You won't normally notice these.

Everything else is shared between all threads. For example, the globals $a and $b are shared. When does that matter? When using sort, these variables become special, and therefore switching threads while sorting might have surprising results, as the sketch below illustrates.
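
An illustration of the hazard (a sketch - don't do this in real code): if the comparator gives up the CPU and another thread also sorts, the shared $a and $b get clobbered.

use Coro;

# DON'T: ceding inside a sort comparator lets other threads run -
# a concurrent sort in another thread would overwrite $a and $b
my @sorted = sort { cede; $a <=> $b } 3, 1, 2;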

Other examples are $! (errno), $. (the current input line number), $,, $\, $" and many other special variables.

While in some cases a good argument could be made for localising them to the thread, they are rarely used, and sometimes hard to localise.

Future versions of Coro might include more per-thread state when it becomes a problem.

Debugging

Sometimes it can be useful to find out what each thread is doing (or which threads exist in the first place). The Coro::Debug module has (among other goodies) a function that allows you to print a "ps"-like listing:

use Coro::Debug;

Coro::Debug::command "ps";

Running it just after the $calculate->get of the earlier channel example outputs something similar to this:

     PID SC  RSS USES Description              Where
 8917312 -C  22k    0 [main::]                 [introscript:20]
 8964448 N-  152    0 [coro manager]           -
 8964520 N-  152    0 [unblock_sub scheduler]  -
 8591752 UC  152    1                          [introscript:12]
11546944 N-  152    0 [EV idle process]        -

Interesting - there is more going on in the background than one would expect. Ignoring the extra threads, the main thread has pid 8917312, and the one started by async has pid 8591752.

The latter is also the only thread that doesn't have a description, simply because we haven't set one. Setting one is easy: just put it into $Coro::current->{desc}:

async {
   $Coro::current->{desc} = "cruncher";
   ...
};

This can be rather useful when debugging a program, or when using the interactive debug shell of Coro::Debug.

The Real World - Event Loops

Coro really wants to run in a program using some event loop. In fact, most real-world programs using Coro's threads are written in a combination of event-based and thread-based techniques, as it is easy to get the best of both worlds with Coro.

Coro integrates well into any event loop supported by AnyEvent, simply by using Coro::AnyEvent, but it can take special advantage of the EV and Event modules.

Here is a simple finger client, using whatever event loop AnyEvent comes up with (Coro::Socket automatically initialises all the event stuff):

use Coro;
use Coro::Socket;

sub finger {
   my ($user, $host) = @_;

   my $fh = new Coro::Socket PeerHost => $host, PeerPort => "finger"
      or die "$user\@$host: $!";

   print $fh "$user\n";

   print "$user\@$host: $_" while <$fh>;
   print "$user\@$host: done\n";
}

# now finger a few accounts
for (
   (async { finger "abc", "cornell.edu" }),
   (async { finger "sebbo", "world.std.com" }),
   (async { finger "trouble", "noc.dfn.de" }),
) {
   $_->join; # wait for the result
}

There are quite a few new things here. First of all, there is Coro::Socket. This module works much the same way as IO::Socket::INET, except that it is coroutine-aware. This matters because IO::Socket::INET, when waiting for the network, will block the whole process - that means all threads - which is clearly undesirable.

On the other hand, Coro::Socket knows how to give up the CPU to other threads when it waits for the network, which makes parallel processing possible.

The other new thing is the join method: All we want to do in this example is start three async threads and only exit when they have done their job. This could be done using a counting semaphore, but it is much simpler to synchronously wait for them to terminate, which is exactly what the join method does.

It doesn't matter that the three asyncs will probably finish in a different order than the for loop joins them in - if a thread is still running, join simply waits. If the thread has already terminated, it simply fetches its return status.
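
For comparison, a sketch of the counting-semaphore variant mentioned above (using the finger function defined earlier):

use Coro;

my $done = new Coro::Semaphore 0;

for my $account ([abc => "cornell.edu"], [sebbo => "world.std.com"]) {
   async {
      finger @$account;
      $done->up; # signal completion
   };
}

# wait until both threads have signalled completion
$done->down for 1 .. 2;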

If you are experienced in event-based programming, you will see that the above program doesn't quite follow the normal pattern, where you start some work, and then run the event loop (e.g. EV::loop).

In fact, nontrivial programs follow this pattern even with Coro, so a Coro program that uses EV usually looks like this:

use EV;
use Coro;

# start coroutines or event watchers

EV::loop; # and loop

In fact, for debugging, you often do something like this:

use EV;
use Coro::Debug;

my $shell = new_unix_server Coro::Debug "/tmp/myshell";

EV::loop; # and loop

This runs your program, but also an interactive shell on the unix domain socket in /tmp/myshell. You can use the socat program to access it:

# socat readline /tmp/myshell
coro debug session. use help for more info

> ps
        PID SC  RSS USES Description              Where
  136672312 RC  19k 177k [main::]                 [myprog:28]
  136710424 -- 1268   48 [coro manager]           [Coro.pm:349]
> help
ps [w|v]                show the list of all coroutines (wide, verbose)
bt <pid>                show a full backtrace of coroutine <pid>
eval <pid> <perl>       evaluate <perl> expression in context of <pid>
trace <pid>             enable tracing for this coroutine
untrace <pid>           disable tracing for this coroutine
kill <pid> <reason>     throws the given <reason> string in <pid>
cancel <pid>            cancels this coroutine
ready <pid>             force <pid> into the ready queue
<anything else>         evaluate as perl and print results
<anything else> &       same as above, but evaluate asynchronously
                        you can use (find_coro <pid>) in perl expressions
                        to find the coro with the given pid, e.g.
                        (find_coro 9768720)->ready
loglevel <int>          enable logging for messages of level <int> and lower
exit                    end this session

Microsoft victims can of course use the even less secure new_tcp_server constructor.

The Real World - File I/O

Disk I/O, while often much faster than the network, can nevertheless take quite a long time, during which the CPU could do other things, if only one were able to do something else in the meantime.

Fortunately, the IO::AIO module on CPAN allows you to move these I/O calls into the background, letting you do useful work in the foreground. It is event-/callback-based, but Coro has a nice interface to it, called Coro::AIO, which lets you use its functions naturally from within threads:

use Fcntl;
use Coro::AIO;

my $fh = aio_open "$filename~", O_WRONLY | O_CREAT, 0600
   or die "$filename~: $!";

aio_write $fh, 0, (length $data), $data, 0;
aio_fsync $fh;
aio_close $fh;
aio_rename "$filename~", "$filename";

The above creates a new file, writes data into it, syncs the data to disk and then atomically replaces any existing $filename with the new version (via the rename).

Other Modules

This introduction only mentions a few functions and modules; Coro has many other functions (see the Coro manpage) and modules (documented in the SEE ALSO section of the Coro manpage).

Noteworthy modules are Coro::LWP (for parallel LWP requests, but see AnyEvent::HTTP for a better HTTP-only alternative); Coro::BDB, for when you need an asynchronous database; Coro::Handle, for when you need to use any file handle in a coroutine (popular for accessing STDIN and STDOUT); and Coro::EV, the optimised interface to EV (which gets used automatically by Coro::AnyEvent).
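
For example, a minimal Coro::Handle sketch: unblock wraps an ordinary file handle so that reading from it blocks only the current thread, not the whole process.

use Coro;
use Coro::Handle;

# wrap STDIN so a blocking read only suspends this thread
my $stdin = unblock \*STDIN;

my $line = <$stdin>;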

AUTHOR

Marc Lehmann <schmorp@schmorp.de>
http://home.schmorp.de/