package XAS::Spooler::Connector;

our $VERSION = '0.02';

use POE;
use Try::Tiny;
use XAS::Lib::POE::PubSub;

use XAS::Class
  debug      => 0,
  version    => $VERSION,
  base       => 'XAS::Lib::Stomp::POE::Client',
  accessors  => 'events',
  constants  => 'TRUE FALSE ARRAY',
  codec      => 'JSON',
  filesystem => 'File',
  vars => {
    PARAMS => {
      -hostname  => { optional => 1, default => undef },
    }
  }
;

#use Data::Dumper;

# ---------------------------------------------------------------------
# Public Events
# ---------------------------------------------------------------------

sub handle_receipt {
    my ($self, $frame) = @_[OBJECT, ARG0];

    my $alias = $self->alias;
    my ($palias, $filename) = split(';', $frame->header->receipt_id);

    $self->log->debug("$alias: alias = $palias, file = $filename");

    $poe_kernel->post($palias, 'unlink_file', $filename);

}

sub connection_down {
    my ($self) = $_[OBJECT];

    my $alias = $self->alias;

    $self->log->debug("$alias: entering connection_down()");

    $self->events->publish(
        -event => 'pause_processing'
    );

    $self->log->debug("$alias: leaving connection_down()");

}

sub connection_up {
    my ($self) = $_[OBJECT];

    my $alias = $self->alias;

    $self->log->debug("$alias: entering connection_up()");

    $self->events->publish(
        -event => 'resume_processing'
    );

    $self->log->debug("$alias: leaving connection_up()");

}

sub send_packet {
    my ($self, $palias, $type, $queue, $data, $file) = @_[OBJECT,ARG0..ARG4];

    my $alias = $self->alias;

    try {

        my $message = {
            hostname  => $self->hostname,
            timestamp => time(),
            type      => $type,
            data      => decode($data),
        };

        my $packet = encode($message);

        my $frame = $self->stomp->send(
            -destination => $queue, 
            -message     => $packet, 
            -receipt     => sprintf("%s;%s", $palias, $file->name),
            -persistent  => 'true'
        );

        $self->log->info("$alias: sending $file to $queue");

        $poe_kernel->call($alias, 'write_data', $frame);

    } catch {

        my $ex = $_;

        $self->log->error("$alias: unable to encode/decode packet, reason: $ex");
        $self->log->debug("$alias: alias = $palias, file = $file");

        $poe_kernel->post($palias, 'unlink_file', $file);

    };

}

# ---------------------------------------------------------------------
# Public Methods
# ---------------------------------------------------------------------

sub session_initialize {
    my $self = shift;

    my $alias = $self->alias;

    $self->log->debug("$alias: entering session_initialize()");

    $poe_kernel->state('send_packet', $self);

    # walk the chain

    $self->SUPER::session_initialize();

    $self->log->debug("$alias: leaving session_initialize()");

}

# ---------------------------------------------------------------------
# Private Methods
# ---------------------------------------------------------------------

sub init {
    my $class = shift;

    my $self = $class->SUPER::init(@_);

    unless (defined($self->{'hostname'})) {

        $self->{'hostname'} = $self->env->host;

    }

    $self->{'events'} = XAS::Lib::POE::PubSub->new();

    return $self;

}

1;

__END__

=head1 NAME

XAS::Spooler::Connector - Perl extension for the XAS environment

=head1 SYNOPSIS

  use XAS::Spooler::Connector;

  my $connection = XAS::Spooler::Connector->new(
      -alias           => 'connector',
      -host            => $hostname,
      -port            => $port,
      -retry_reconnect => TRUE,
      -tcp_keepalive   => TRUE,
      -hostname        => $env->host,
  );

=head1 DESCRIPTION

This module connects to a message queue server for spoolers. All of the spool
processors funnel messages thru this module. If the connection is lost to
the server, it signals the processor to stop processing until it is able to
reconnect to the server.

=head1 METHODS

=head2 new

This method inherits from L<XAS::Lib::Stomp::POE::Client|XAS::Lib::Stomp::POE::Client>
and takes these additional parameters:

=over

=item B<-hostname>

An optional name for the host that is processing these spool files.

=back

=head1 PUBLIC EVENTS

=head2 connection_down(OBJECT)

This event broadcasts that the connection had been dropped.

=over 4

=item B<OBJECT>

The handle for the current self.

=back

=head2 connection_up(OBJECT)

This event broadcasts when the connection is established.

=over 4

=item B<OBJECT>

The handle for the current self.

=back

=head2 send_packet(OBJECT,ARG0, ARG1, ARG2, ARG3, ARG4)

Process the data received from the processors. This processing includes
creating the standard message header, decoding the data and creating a
serialized message using JSON. This message is then sent to message queue
server.

=over 4

=item B<OBJECT>

The handle for the current self.

=item B<ARG0>

The alias of the processor.

=item B<ARG1>

The type of data.

=item B<ARG2>

The queue to send the message too.

=item B<ARG3>

The actual data to process. This is usually a JSON formated string.

=item B<ARG4>

The full qualified name of the file that was processed. This, along with
the processor alias, is used for the STOMP receipt.

=back

=head1 SEE ALSO

=over 4

=item L<XAS::Spooler::Processor|XAS::Spooler::Processor>

=item L<XAS::Spooler|XAS::Spooler>

=item L<XAS|XAS>

=back

=head1 AUTHOR

Kevin L. Esteb, E<lt>kevin@kesteb.usE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright (c) 2012-2015 Kevin L. Esteb

This is free software; you can redistribute it and/or modify it under
the terms of the Artistic License 2.0. For details, see the full text
of the license at http://www.perlfoundation.org/artistic_license_2_0.

=cut