package Mojo::Reactor::Epoll;
use Mojo::Base 'Mojo::Reactor';

# Select this reactor through MOJO_REACTOR unless the variable already holds a
# true value, so loading this module first makes it the default backend
$ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::Epoll';

use Carp 'croak';
use Linux::Epoll;
use List::Util 'min';
use Mojo::Util qw(md5_sum steady_time);
use Scalar::Util 'weaken';
use Time::HiRes 'usleep';

# Set MOJO_REACTOR_EPOLL_DEBUG to a true value to have watcher and timer
# changes reported with warn
use constant DEBUG => $ENV{MOJO_REACTOR_EPOLL_DEBUG} || 0;

our $VERSION = '0.007';

# Restart an active timer so that it fires "after" seconds from now. Croaks
# with "Timer not active" if no timer is registered under $id.
sub again {
	my ($self, $id) = @_;
	my $timer = $self->{timers}{$id} or croak 'Timer not active';
	$timer->{time} = steady_time() + $timer->{after};
}

# Watch a handle for I/O events, invoking $cb whenever the handle becomes
# readable or writable. Calling this again for a watched handle replaces only
# the callback. Returns the result of watch (the reactor), which allows
# chaining.
#
# Croaks with "Handle is closed" if the handle has no file descriptor. Without
# this check the callback would be stored under an undefined descriptor and
# stay behind as a phantom watcher once watch fails, which keeps one_tick from
# ever stopping automatically.
sub io {
	my ($self, $handle, $cb) = @_;
	my $fd = fileno($handle) // croak 'Handle is closed';

	# Only the user callback is replaced, a cached epoll callback is kept so
	# that watch knows whether the handle has to be added or modified
	$self->{io}{$fd}{cb} = $cb;
	warn "-- Set IO watcher for $fd\n" if DEBUG;
	return $self->watch($handle, 1, 1);
}

# Report whether the reactor's running flag is currently set
sub is_running {
	my $self = shift;
	return !!$self->{running};
}

# Construct the reactor through the parent class and give it its own
# Linux::Epoll instance
sub new {
	my ($class, @args) = @_;
	my $self = $class->SUPER::new(@args);
	$self->{epoll} = Linux::Epoll->new;
	return $self;
}

# Queue a callback to be invoked as soon as possible. Always returns undef.
sub next_tick {
	my ($self, $cb) = @_;
	push @{$self->{next_tick}}, $cb;

	# A single zero-delay timer drains the whole queue, so only create one if
	# none is pending
	unless (defined $self->{next_timer}) {
		$self->{next_timer} = $self->timer(0 => \&_next);
	}

	return undef;
}

# Run the reactor until at least one event (I/O or timer) has been handled,
# the reactor has been stopped, or nothing is left to watch. In the last case
# the reactor is stopped and the result of stop is returned.
sub one_tick {
	my $self = shift;
	
	# Just one tick
	# (local restores the previous value when this sub returns, so a standalone
	# tick does not leave the reactor marked as running)
	local $self->{running} = 1 unless $self->{running};
	
	# Wait for one event
	# ($i counts the events handled so far, the loop ends once it is true)
	my $i;
	until ($i || !$self->{running}) {
		# Stop automatically if there is nothing to watch
		return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}};
		
		# Calculate ideal timeout based on timers
		# (time until the earliest timer is due, or 0.5 seconds without timers)
		my $min = min map { $_->{time} } values %{$self->{timers}};
		my $timeout = defined $min ? ($min - steady_time) : 0.5;
		$timeout = 0 if $timeout < 0;
		
		# I/O
		if (my $watched = keys %{$self->{io}}) {
			# Set max events to half the number of descriptors, to a minimum of 10
			my $maxevents = int $watched/2;
			$maxevents = 10 if $maxevents < 10;
			
			# The watcher callbacks registered in watch are presumably invoked
			# from within wait; an undefined count is ignored here
			# NOTE(review): confirm against Linux::Epoll when wait returns undef
			my $count = $self->{epoll}->wait($maxevents, $timeout);
			$i += $count if defined $count;
		}
		
		# Wait for timeout if epoll can't be used
		# (usleep takes microseconds)
		elsif ($timeout) { usleep $timeout * 1000000 }
		
		# Timers (time should not change in between timers)
		my $now = steady_time;
		for my $id (keys %{$self->{timers}}) {
			# The timer may have been removed by a callback that ran earlier in
			# this loop
			next unless my $t = $self->{timers}{$id};
			next unless $t->{time} <= $now;
			
			# Recurring timer
			if (exists $t->{recurring}) { $t->{time} = $now + $t->{recurring} }
			
			# Normal timer
			# (removed before its callback is invoked)
			else { $self->remove($id) }
			
			++$i and $self->_try('Timer', $t->{cb}) if $t->{cb};
		}
	}
}

# Create a recurring timer and return its id
sub recurring {
	my ($self, @args) = @_;
	return $self->_timer(1, @args);
}

# Remove a timer (identified by a plain id) or an I/O watcher (identified by
# its handle). Returns true if something was removed, false otherwise.
sub remove {
	my ($self, $remove) = @_;

	# Anything that is not a reference is treated as a timer id
	unless (ref $remove) {
		warn "-- Removed timer $remove\n" if DEBUG;
		return !!delete $self->{timers}{$remove};
	}

	# Only handles that have actually been added to epoll need to be deleted
	# from it, which is the case once watch has cached the epoll callback
	my $fd = fileno $remove;
	my $registered = (exists $self->{io}{$fd} and exists $self->{io}{$fd}{epoll_cb});
	$self->{epoll}->delete($remove) if $registered;

	warn "-- Removed IO watcher for $fd\n" if DEBUG;
	return !!delete $self->{io}{$fd};
}

# Forget all handles and timers and start over with a fresh epoll instance
sub reset {
	my ($self) = @_;
	delete $self->{$_} for qw(epoll io next_tick next_timer timers);
	$self->{epoll} = Linux::Epoll->new;
}

# Mark the reactor as running and keep ticking until the running flag is
# cleared again
sub start {
	my ($self) = @_;
	$self->{running}++;
	while ($self->{running}) { $self->one_tick }
}

# Clear the running flag and return its previous value
sub stop {
	my $self = shift;
	return delete $self->{running};
}

# Create a one-shot timer and return its id
sub timer {
	my ($self, @args) = @_;
	return $self->_timer(0, @args);
}

# Change which I/O events a handle is watched for; $read and $write are
# treated as booleans. Croaks with "I/O watcher not active" unless the handle
# has been registered with io before. Returns the reactor.
sub watch {
	my ($self, $handle, $read, $write) = @_;

	my $fd = fileno $handle;
	my $io = $self->{io}{$fd} or croak 'I/O watcher not active';

	# Translate the flags into epoll event names
	my @events = (
		($read  ? ('in', 'prio') : ()),
		($write ? ('out')        : ()),
	);

	# A cached epoll callback means the handle is already part of the epoll set
	my $op = 'add';
	$op = 'modify' if exists $io->{epoll_cb};

	# The callback gets stored inside the reactor below, so it must not hold a
	# strong reference back to the reactor
	weaken $self;
	my $cb = $io->{epoll_cb};
	$cb //= sub {
		my ($events) = @_;
		my $broken = $events->{hup} || $events->{err};

		# Readable, hangups and errors are reported as readable too
		if ($events->{in} or $events->{prio} or $broken) {
			return unless exists $self->{io}{$fd};
			$self->_try('I/O watcher', $self->{io}{$fd}{cb}, 0);
		}

		# Writable, hangups and errors are reported as writable too
		if ($events->{out} or $broken) {
			return unless exists $self->{io}{$fd};
			$self->_try('I/O watcher', $self->{io}{$fd}{cb}, 1);
		}
	};
	$self->{epoll}->$op($handle, \@events, $cb);

	# Cache the callback for future modify operations, but only now that the
	# epoll call above has gone through
	$io->{epoll_cb} //= $cb;

	return $self;
}

# Generate a timer id that is not in use by any active timer
sub _id {
	my ($self) = @_;
	while (1) {
		my $candidate = md5_sum('t' . steady_time() . rand 999);
		return $candidate unless $self->{timers}{$candidate};
	}
}

# Timer callback behind next_tick: forget the pending timer and run all
# queued callbacks in order
sub _next {
	my ($self) = @_;
	delete $self->{next_timer};

	# The queue is looked up again on every iteration, callbacks may append to
	# it or replace it while it is being drained
	for (;;) {
		my $cb = shift @{$self->{next_tick}} or last;
		$self->$cb();
	}
}

# Register a timer that fires $after seconds from now and return its id. With
# a true $recurring the timer repeats using $after as interval.
sub _timer {
	my ($self, $recurring, $after, $cb) = @_;

	my $id = $self->_id;
	my %timer = (cb => $cb, after => $after, time => steady_time() + $after);
	$timer{recurring} = $after if $recurring;
	$self->{timers}{$id} = \%timer;

	if (DEBUG) {
		my $is_recurring = '';
		$is_recurring = ' (recurring)' if $recurring;
		warn "-- Set timer $id after $after seconds$is_recurring\n";
	}

	return $id;
}

# Invoke a callback as a method on the reactor with the remaining arguments.
# Exceptions are caught and reported through the "error" event, prefixed with
# a description of what failed.
sub _try {
	my $self = shift;
	my $what = shift;
	my $cb   = shift;

	my $ok = eval { $self->$cb(@_); 1 };
	return $ok if $ok;
	return $self->emit(error => "$what failed: $@");
}

=head1 NAME

Mojo::Reactor::Epoll - epoll backend for Mojo::Reactor

=head1 SYNOPSIS

  use Mojo::Reactor::Epoll;

  # Watch if handle becomes readable or writable
  my $reactor = Mojo::Reactor::Epoll->new;
  $reactor->io($first => sub {
    my ($reactor, $writable) = @_;
    say $writable ? 'First handle is writable' : 'First handle is readable';
  });

  # Change to watching only if handle becomes writable
  $reactor->watch($first, 0, 1);

  # Turn file descriptor into handle and watch if it becomes readable
  my $second = IO::Handle->new_from_fd($fd, 'r');
  $reactor->io($second => sub {
    my ($reactor, $writable) = @_;
    say $writable ? 'Second handle is writable' : 'Second handle is readable';
  })->watch($second, 1, 0);

  # Add a timer
  $reactor->timer(15 => sub {
    my $reactor = shift;
    $reactor->remove($first);
    $reactor->remove($second);
    say 'Timeout!';
  });

  # Start reactor if necessary
  $reactor->start unless $reactor->is_running;

  # Or in an application using Mojo::IOLoop
  use Mojo::Reactor::Epoll;
  use Mojo::IOLoop;
  
  # Or in a Mojolicious application
  $ MOJO_REACTOR=Mojo::Reactor::Epoll hypnotoad script/myapp

=head1 DESCRIPTION

L<Mojo::Reactor::Epoll> is an event reactor for L<Mojo::IOLoop> that uses the
L<epoll(7)> Linux subsystem. The usage is exactly the same as other
L<Mojo::Reactor> implementations such as L<Mojo::Reactor::Poll>.
L<Mojo::Reactor::Epoll> will be used as the default backend for L<Mojo::IOLoop>
if it is loaded before L<Mojo::IOLoop> or any module using the loop. However,
when invoking a L<Mojolicious> application through L<morbo> or L<hypnotoad>,
the reactor must be set as the default by setting the C<MOJO_REACTOR>
environment variable to C<Mojo::Reactor::Epoll>.

=head1 EVENTS

L<Mojo::Reactor::Epoll> inherits all events from L<Mojo::Reactor>.

=head1 METHODS

L<Mojo::Reactor::Epoll> inherits all methods from L<Mojo::Reactor> and
implements the following new ones.

=head2 again

  $reactor->again($id);

Restart timer. Note that this method requires an active timer.

=head2 io

  $reactor = $reactor->io($handle => sub {...});

Watch handle for I/O events, invoking the callback whenever handle becomes
readable or writable.

  # Callback will be invoked twice if handle becomes readable and writable
  $reactor->io($handle => sub {
    my ($reactor, $writable) = @_;
    say $writable ? 'Handle is writable' : 'Handle is readable';
  });

=head2 is_running

  my $bool = $reactor->is_running;

Check if reactor is running.

=head2 next_tick

  my $undef = $reactor->next_tick(sub {...});

Invoke callback as soon as possible, but not before returning or before other
callbacks that have been registered with this method. Always returns C<undef>.

=head2 one_tick

  $reactor->one_tick;

Run reactor until an event occurs or no events are being watched anymore. Note
that this method can recurse back into the reactor, so you need to be careful.

  # Don't block longer than 0.5 seconds
  my $id = $reactor->timer(0.5 => sub {});
  $reactor->one_tick;
  $reactor->remove($id);

=head2 recurring

  my $id = $reactor->recurring(0.25 => sub {...});

Create a new recurring timer, invoking the callback repeatedly after a given
amount of time in seconds.

=head2 remove

  my $bool = $reactor->remove($handle);
  my $bool = $reactor->remove($id);

Remove handle or timer.

=head2 reset

  $reactor->reset;

Remove all handles and timers.

=head2 start

  $reactor->start;

Start watching for I/O and timer events, this will block until L</"stop"> is
called or no events are being watched anymore.

  # Start reactor only if it is not running already
  $reactor->start unless $reactor->is_running;

=head2 stop

  $reactor->stop;

Stop watching for I/O and timer events.

=head2 timer

  my $id = $reactor->timer(0.5 => sub {...});

Create a new timer, invoking the callback after a given amount of time in
seconds.

=head2 watch

  $reactor = $reactor->watch($handle, $readable, $writable);

Change which I/O events the handle is watched for, using true and false values
to enable or disable readable and writable events. Note that this method
requires an active I/O watcher.

  # Watch only for readable events
  $reactor->watch($handle, 1, 0);

  # Watch only for writable events
  $reactor->watch($handle, 0, 1);

  # Watch for readable and writable events
  $reactor->watch($handle, 1, 1);

  # Pause watching for events
  $reactor->watch($handle, 0, 0);

=head1 BUGS

Report any issues on the public bugtracker.

=head1 AUTHOR

Dan Book, C<dbook@cpan.org>

=head1 COPYRIGHT AND LICENSE

Copyright 2015, Dan Book.

This library is free software; you may redistribute it and/or modify it under
the terms of the Artistic License version 2.0.

=head1 SEE ALSO

L<Mojolicious>, L<Mojo::IOLoop>, L<Linux::Epoll>

=cut

1;