NAME

AnyEvent::InfluxDB - An asynchronous library for the InfluxDB time-series database

VERSION

version 0.01

SYNOPSIS

use EV;
use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;
use AnyEvent::InfluxDB;

my $db = AnyEvent::InfluxDB->new(
    server => 'http://localhost:8086',
    username => 'admin',
    password => 'password',
);

my $hdl;
tcp_server undef, 8888, sub {
    my ($fh, $host, $port) = @_;

    $hdl = AnyEvent::Handle->new(
        fh => $fh,
    );

    $hdl->push_read(
        line => sub {
            my (undef, $line) = @_;

            $db->write(
                database => 'mydb',
                data => $line,
                on_success => sub { print "$line written\n"; },
                on_error => sub { print "$line error: @_\n"; },
            );

            $hdl->on_drain(
                sub {
                    $hdl->fh->close;
                    undef $hdl;
                }
            );
        },
    );
};

EV::run;

DESCRIPTION

Asynchronous client library for the InfluxDB time-series database v0.9.2 (https://influxdb.com).

METHODS

new

my $db = AnyEvent::InfluxDB->new(
    server => 'http://localhost:8086',
    username => 'admin',
    password => 'password',
);

Returns an object representing the given server, connected using the optionally provided username and password.

The default value of server is http://localhost:8086.

If the server protocol is https, no validation of the remote host certificate is performed by default. This can be changed by setting the ssl_options parameter to any options accepted by AnyEvent::TLS.

my $db = AnyEvent::InfluxDB->new(
    server => 'https://localhost:8086',
    username => 'admin',
    password => 'password',
    ssl_options => {
        verify => 1,
        verify_peername => 'https',
        ca_file => '/path/to/cacert.pem',
    }
);

As a debugging aid, an on_request code reference may also be provided. It is executed before each request with the method name, the URL and, if set, the POST data.

my $db = AnyEvent::InfluxDB->new(
    on_request => sub {
        my ($method, $url, $post_data) = @_;
        print "$method $url\n";
        print "$post_data\n" if $post_data;
    }
);

Database Management

create_database

$cv = AE::cv;
$db->create_database(
    database => "mydb",
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to create database: @_");
    }
);
$cv->recv;

Creates the database named by the database argument.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.
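The examples in this document all repeat the same pattern: create a condition variable, pass it as on_success, and croak on it from on_error. A minimal sketch of wrapping that pattern in one function follows. The name call_sync is hypothetical and not part of AnyEvent::InfluxDB; the sketch assumes only the on_success/on_error convention described above.

```perl
use strict;
use warnings;
use AnyEvent;

# Hypothetical helper (not provided by AnyEvent::InfluxDB): call any method
# that follows the on_success/on_error convention and block until it is done.
sub call_sync {
    my ($db, $method, %args) = @_;

    my $cv = AE::cv;
    $db->$method(
        %args,
        on_success => sub { $cv->send(@_) },
        on_error   => sub { $cv->croak("$method failed: @_") },
    );

    # recv returns whatever was passed to on_success, or dies with the
    # message given to croak.
    return $cv->recv;
}

# Usage, assuming $db was created with AnyEvent::InfluxDB->new:
#   my @db_names = call_sync($db, 'show_databases');
#   call_sync($db, 'create_database', database => 'mydb');
```

Because recv blocks, a helper like this suits scripts and tests rather than code that already runs inside an event-loop callback.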

drop_database

$cv = AE::cv;
$db->drop_database(
    database => "mydb",
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to drop database: @_");
    }
);
$cv->recv;

Drops the database named by the database argument.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

show_databases

$cv = AE::cv;
$db->show_databases(
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list databases: @_");
    }
);
my @db_names = $cv->recv;
print "$_\n" for @db_names;

Returns a list of known database names.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

Retention Policy Management

create_retention_policy

$cv = AE::cv;
$db->create_retention_policy(
    name => 'last_day',
    database => 'mydb',
    duration => '1d',
    replication => 1,
    default => 0,

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to create retention policy: @_");
    }
);
$cv->recv;

Creates a new retention policy, identified by the name argument, on the database given by the database argument, with the duration and replication factor given by the duration and replication arguments. If default is provided and true, the created retention policy becomes the default one.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

alter_retention_policy

$cv = AE::cv;
$db->alter_retention_policy(
    name => 'last_day',
    database => 'mydb',

    duration => '1d',
    replication => 1,
    default => 0,

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to alter retention policy: @_");
    }
);
$cv->recv;

Modifies the retention policy identified by the name argument on the database given by the database argument. At least one of duration, replication (the replication factor) or the default flag must be set.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

show_retention_policies

$cv = AE::cv;
$db->show_retention_policies(
    database => 'mydb',

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list retention policies: @_");
    }
);
my @retention_policies = $cv->recv;
for my $rp ( @retention_policies ) {
    print "Name: $rp->{name}\n";
    print "Duration: $rp->{duration}\n";
    print "Replication factor: $rp->{replicaN}\n";
    print "Default?: $rp->{default}\n";
}

Returns a list of hash references with the keys name, duration, replicaN and default, one for each retention policy defined on the database given by the database argument.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

User Management

create_user

$cv = AE::cv;
$db->create_user(
    username => 'jdoe',
    password => 'mypassword',
    all_privileges => 1,

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to create user: @_");
    }
);
$cv->recv;

Creates a user with the given username and password. If the all_privileges flag is set to true, the created user is granted cluster administration privileges.

Note: the password is automatically enclosed in single quotes.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

set_user_password

$cv = AE::cv;
$db->set_user_password(
    username => 'jdoe',
    password => 'otherpassword',

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to set password: @_");
    }
);
$cv->recv;

Sets the password of the user identified by username to the value of the password argument.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

show_users

$cv = AE::cv;
$db->show_users(
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list users: @_");
    }
);
my @users = $cv->recv;
for my $u ( @users ) {
    print "Name: $u->{user}\n";
    print "Admin?: $u->{admin}\n";
}

Returns a list of hash references with the keys user and admin, one for each defined user.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

grant_privileges

$cv = AE::cv;
$db->grant_privileges(
    username => 'jdoe',

    # privileges at single database
    database => 'mydb',
    access => 'ALL',

    # or to grant cluster administration privileges
    all_privileges => 1,

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to grant privileges: @_");
    }
);
$cv->recv;

Grants the user identified by username the access level given by access on the database given by database. If the all_privileges flag is set, cluster administration privileges are granted instead.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

revoke_privileges

$cv = AE::cv;
$db->revoke_privileges(
    username => 'jdoe',

    # privileges at single database
    database => 'mydb',
    access => 'WRITE',

    # or to revoke cluster administration privileges
    all_privileges => 1,

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to revoke privileges: @_");
    }
);
$cv->recv;

Revokes from the user identified by username the access level given by access on the database given by database. If the all_privileges flag is set, cluster administration privileges are revoked instead.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

drop_user

$cv = AE::cv;
$db->drop_user(
    username => 'jdoe',

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to drop user: @_");
    }
);
$cv->recv;

Drops the user identified by username.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

Schema Exploration

show_measurements

$cv = AE::cv;
$db->show_measurements(
    database => 'mydb',
    where => "host = 'server02'",

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list measurements: @_");
    }
);
my @measurements = $cv->recv;
print "$_\n" for @measurements;

Returns the names of measurements in the database given by database, filtered by the optional where clause.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

drop_measurement

$cv = AE::cv;
$db->drop_measurement(
    database => 'mydb',
    measurement => 'cpu_load',

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to drop measurement: @_");
    }
);
$cv->recv;

Drops the measurement named by the measurement argument from the database given by database.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

show_series

$cv = AE::cv;
$db->show_series(
    database => 'mydb',

    measurement => 'cpu_load',
    where => "host = 'server02'",

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list series: @_");
    }
);
my $series = $cv->recv;
for my $measurement ( sort keys %{ $series } ) {
    print "Measurement: $measurement\n";
    for my $s ( @{ $series->{$measurement} } ) {
        print " * $_: $s->{$_}\n" for sort keys %{ $s };
    }
}

Returns a hash reference with measurements as keys and their unique tag sets as values, read from the database given by database. The result can be restricted to a single measurement with the optional measurement argument and filtered by the optional where clause.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

drop_series

$cv = AE::cv;
$db->drop_series(
    database => 'mydb',
    measurement => 'cpu_load',
    where => "host = 'server02'",

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to drop series: @_");
    }
);
$cv->recv;

Drops series from the measurement given by measurement in the database given by database, filtered by the where clause.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

show_tag_keys

$cv = AE::cv;
$db->show_tag_keys(
    database => 'mydb',

    measurement => 'cpu_load',

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list tag keys: @_");
    }
);
my $tag_keys = $cv->recv;
for my $measurement ( sort keys %{ $tag_keys } ) {
    print "Measurement: $measurement\n";
    print " * $_\n" for @{ $tag_keys->{$measurement} };
}

Returns a hash reference with measurements as keys and their unique tag keys as values, read from the database given by database. The result can be restricted to a single measurement with the optional measurement argument and filtered by the optional where clause.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

show_tag_values

$cv = AE::cv;
$db->show_tag_values(
    database => 'mydb',

    measurement => 'cpu_load',

    # single key
    key => 'host',
    # or a list of keys
    keys => [qw( host region )],

    where => "host = 'server02'",

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list tag values: @_");
    }
);
my $tag_values = $cv->recv;
for my $tag_key ( sort keys %{ $tag_values } ) {
    print "Tag key: $tag_key\n";
    print " * $_\n" for @{ $tag_values->{$tag_key} };
}

Returns a hash reference with tag keys as keys and their unique tag values as values, read from the database given by database. The tag keys are selected with either a single key (key) or a list of keys (keys). The result can be restricted to a single measurement with the optional measurement argument and filtered by the optional where clause.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

Continuous Queries

create_continuous_query

$cv = AE::cv;
$db->create_continuous_query(
    database => 'mydb',
    name => 'per5minutes',
    query => 'SELECT MEAN(value) INTO "cpu_load_per5m" FROM cpu_load GROUP BY time(5m)',

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to create continuous query: @_");
    }
);
$cv->recv;

Creates a new continuous query, identified by the name argument, on the database given by database, using the query given by the query argument.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

drop_continuous_query

$cv = AE::cv;
$db->drop_continuous_query(
    database => 'mydb',
    name => 'per5minutes',

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to drop continuous query: @_");
    }
);
$cv->recv;

Drops the continuous query identified by the name argument from the database given by database.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

show_continuous_queries

$cv = AE::cv;
$db->show_continuous_queries(
    database => 'mydb',

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list continuous queries: @_");
    }
);
my @continuous_queries = $cv->recv;
for my $cq ( @continuous_queries ) {
    print "Name: $cq->{name}\n";
    print "Query: $cq->{query}\n";
}

Returns a list of hash references with the keys name and query, one for each continuous query defined on the database given by database.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

Writing Data

write

$cv = AE::cv;
$db->write(
    database => 'mydb',
    precision => 'n',
    rp => 'last_day',

    data => [
        # line protocol formatted
        'cpu_load,host=server02,region=eu-east sensor="top",value=0.64 1437868012260500137',

        # or as a hash
        {
            measurement => 'cpu_load',
            tags => {
                host => 'server02',
                region => 'eu-east',
            },
            fields => {
                value => '0.64',
                sensor => '"top"',
            },
            time => time() * 10**9
        }
    ],

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to write data: @_");
    }
);
$cv->recv;

Writes the time-series data given by data to the database given by database, optionally using the retention policy rp and the timestamp precision given by precision. The data can be specified as a single value or as an array reference. In either case each item is expected to be either a scalar formatted using the line protocol, or a hash reference with the required keys measurement and fields and the optional keys tags and time.

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.
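To illustrate how the hash form relates to the line protocol form, here is a rough sketch that builds a line-protocol string from a point hash. It is not the module's own implementation, and all function names are hypothetical. Unlike the write example above, it takes raw field values and applies the quoting itself, treating anything that looks like a number as numeric and everything else as a string; booleans and other special cases are not handled.

```perl
use strict;
use warnings;
use Scalar::Util qw(looks_like_number);

# Escape commas, equals signs and spaces in tag keys, tag values and field keys.
sub escape_key {
    my ($s) = @_;
    $s =~ s/([,= ])/\\$1/g;
    return $s;
}

# Measurement names only need commas and spaces escaped.
sub escape_measurement {
    my ($s) = @_;
    $s =~ s/([, ])/\\$1/g;
    return $s;
}

# Numbers pass through unchanged; anything else becomes a double-quoted
# string with embedded double quotes and backslashes escaped.
sub format_field_value {
    my ($v) = @_;
    return $v if looks_like_number($v);
    $v =~ s/(["\\])/\\$1/g;
    return qq{"$v"};
}

# Hypothetical helper: build one line-protocol string from a point hash.
sub point_to_line {
    my ($p) = @_;
    my $tags   = $p->{tags} || {};
    my $fields = $p->{fields};

    my $line = escape_measurement($p->{measurement});
    $line .= ',' . escape_key($_) . '=' . escape_key($tags->{$_})
        for sort keys %$tags;
    $line .= ' ' . join ',',
        map { escape_key($_) . '=' . format_field_value($fields->{$_}) }
        sort keys %$fields;
    $line .= " $p->{time}" if defined $p->{time};
    return $line;
}

print point_to_line({
    measurement => 'cpu_load',
    tags        => { host => 'server02', region => 'eu-east' },
    fields      => { value => '0.64', sensor => 'top' },
    time        => '1437868012260500137',  # kept as a string to avoid rounding
}), "\n";
```

Tags and fields are sorted by key only to make the output deterministic. A string that happens to look like a number (for example a numeric ID) would be written unquoted by this sketch, so real code may need an explicit type hint per field.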

Querying Data

select

$cv = AE::cv;
$db->select(
    database => 'mydb',
    measurement => 'cpu_load',
    fields => 'host, count(value)',
    where => "region = 'eu-east' AND time > now() - 7d",

    group_by => 'time(5m), host',
    fill => 'previous',

    order_by => 'ASC',

    limit => 10,

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to select data: @_");
    }
);
my $results = $cv->recv;
for my $row ( @{ $results } ) {
    print "Measurement: $row->{name}\n";
    print "Tags:\n";
    print " * $_ = $row->{tags}->{$_}\n" for keys %{ $row->{tags} || {} };
    print "Values:\n";
    for my $value ( @{ $row->{values} || [] } ) {
        print " * $_ = $value->{$_}\n" for keys %{ $value || {} };
    }
}

Executes a select query on the database given by database, built from the provided arguments: the measurement (measurement), the fields to select (fields), an optional where clause (where), grouping (group_by) with empty values filled according to fill, ordering (order_by) and a limit on the number of results (limit).

The required on_success code reference is executed if the request was successful; otherwise the required on_error code reference is executed.

query

$cv = AE::cv;
$db->query(
    query => {
        db => 'mydb',
        q => 'SELECT * FROM cpu_load',
    },
    on_response => $cv,
);
my ($response_data, $response_headers) = $cv->recv;

Executes an arbitrary query using the arguments provided in query.

The required on_response code reference is executed with the raw response data and headers as parameters.
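Since query hands back the raw response, the caller has to decode it. The following is a minimal sketch of one way to do that, assuming the response body is a JSON document with the results/series/columns/values layout used by the InfluxDB HTTP API. The helper name rows_from_response and the sample body are made up for illustration.

```perl
use strict;
use warnings;
use JSON::PP qw(decode_json);

# Hypothetical helper: turn a raw JSON response body into a flat list of
# row hashes, one per entry in each series' "values" array.
sub rows_from_response {
    my ($raw) = @_;
    my $decoded = decode_json($raw);

    my @rows;
    for my $result ( @{ $decoded->{results} || [] } ) {
        # Assumption: a failed statement is reported under an "error" key.
        die "Query error: $result->{error}\n" if defined $result->{error};

        for my $series ( @{ $result->{series} || [] } ) {
            my @columns = @{ $series->{columns} || [] };
            for my $values ( @{ $series->{values} || [] } ) {
                my %row;
                @row{@columns} = @$values;    # pair column names with values
                push @rows, { measurement => $series->{name}, %row };
            }
        }
    }
    return @rows;
}

# Hand-written sample body, for illustration only.
my $sample = '{"results":[{"series":[{"name":"cpu_load",'
           . '"columns":["time","value"],'
           . '"values":[["2015-07-26T00:00:00Z",0.64],["2015-07-26T00:05:00Z",0.71]]}]}]}';

printf "%s %s %s\n", @{$_}{qw(measurement time value)}
    for rows_from_response($sample);
```

In a real program the first element returned by $cv->recv in the query example would take the place of $sample.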

CAVEATS

Following the optimistic nature of InfluxDB, this module does not validate any parameters. Quoting and escaping of special characters is likewise left to the user of this library.
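Because quoting is left to the caller, values interpolated into where clauses and similar arguments need to be escaped by hand. A small sketch of two quoting helpers follows. The names quote_string and quote_ident are hypothetical, and the sketch assumes InfluxQL's convention of single-quoted string literals and double-quoted identifiers with backslash escapes, which should be checked against the server version in use.

```perl
use strict;
use warnings;

# Quote a value as an InfluxQL string literal (single quotes),
# escaping backslashes and single quotes.
sub quote_string {
    my ($s) = @_;
    $s =~ s/([\\'])/\\$1/g;
    return "'$s'";
}

# Quote a name as an InfluxQL identifier (double quotes),
# escaping backslashes and double quotes.
sub quote_ident {
    my ($s) = @_;
    $s =~ s/([\\"])/\\$1/g;
    return qq{"$s"};
}

# Build a where clause from an untrusted tag value.
my $where = sprintf '%s = %s', quote_ident('host'), quote_string("db'01");
print "$where\n";
```

The resulting string can be passed as the where argument of methods such as show_series or select.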

AUTHOR

Alex J. G. Burzyński <ajgb@cpan.org>

COPYRIGHT AND LICENSE

This software is copyright (c) 2015 by Alex J. G. Burzyński <ajgb@cpan.org>.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.