NAME
AnyEvent::InfluxDB - An asynchronous library for InfluxDB time-series database
VERSION
version 0.01
SYNOPSIS
use EV;
use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;
use AnyEvent::InfluxDB;
my $db = AnyEvent::InfluxDB->new(
server => 'http://localhost:8086',
username => 'admin',
password => 'password',
);
my $hdl;
tcp_server undef, 8888, sub {
my ($fh, $host, $port) = @_;
$hdl = AnyEvent::Handle->new(
fh => $fh,
);
$hdl->push_read(
line => sub {
my (undef, $line) = @_;
$db->write(
database => 'mydb',
data => $line,
on_success => sub { print "$line written\n"; },
on_error => sub { print "$line error: @_\n"; },
);
$hdl->on_drain(
sub {
$hdl->fh->close;
undef $hdl;
}
);
},
);
};
EV::run;
DESCRIPTION
Asynchronous client library for InfluxDB time-series database v0.9.2 https://influxdb.com.
METHODS
new
my $db = AnyEvent::InfluxDB->new(
server => 'http://localhost:8086',
username => 'admin',
password => 'password',
);
Returns object representing given server server
connected using optionally provided username username
and password password
.
Default value of server
is http://localhost:8086
.
If the server protocol is https
then by default no validation of remote host certificate is performed. This can be changed by setting ssl_options
parameter with any options accepted by AnyEvent::TLS.
my $db = AnyEvent::InfluxDB->new(
server => 'https://localhost:8086',
username => 'admin',
password => 'password',
ssl_options => {
verify => 1,
verify_peername => 'https',
ca_file => '/path/to/cacert.pem',
}
);
As an debugging aid the on_request
code reference may also be provided. It will be executed before each request with the method name, url and POST data if set.
my $db = AnyEvent::InfluxDB->new(
on_request => sub {
my ($method, $url, $post_data) = @_;
print "$method $url\n";
print "$post_data\n" if $post_data;
}
);
Database Management
create_database
$cv = AE::cv;
$db->create_database(
database => "mydb",
on_success => $cv,
on_error => sub {
$cv->croak("Failed to create database: @_");
}
);
$cv->recv;
Creates specified by database
argument database.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
drop_database
$cv = AE::cv;
$db->drop_database(
database => "mydb",
on_success => $cv,
on_error => sub {
$cv->croak("Failed to drop database: @_");
}
);
$cv->recv;
Drops specified by database
argument database.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_databases
$cv = AE::cv;
$db->show_databases(
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list databases: @_");
}
);
my @db_names = $cv->recv;
print "$_\n" for @db_names;
Returns list of known database names.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
Retention Policy Management
create_retention_policy
$cv = AE::cv;
$db->create_retention_policy(
name => 'last_day',
database => 'mydb',
duration => '1d',
replication => 1,
default => 0,
on_success => $cv,
on_error => sub {
$cv->croak("Failed to create retention policy: @_");
}
);
$cv->recv;
Creates new retention policy named by name
on database database
with duration duration
and replication factor replication
. If default
is provided and true the created retention policy becomes the default one.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
alter_retention_policy
$cv = AE::cv;
$db->alter_retention_policy(
name => 'last_day',
database => 'mydb',
duration => '1d',
replication => 1,
default => 0,
on_success => $cv,
on_error => sub {
$cv->croak("Failed to alter retention policy: @_");
}
);
$cv->recv;
Modifies retention policy named by name
on database database
. At least one of duration duration
, replication factor replication
or flag default
must be set.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_retention_policies
$cv = AE::cv;
$db->show_retention_policies(
database => 'mydb',
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list retention policies: @_");
}
);
my @retention_policies = $cv->recv;
for my $rp ( @retention_policies ) {
print "Name: $rp->{name}\n";
print "Duration: $rp->{duration}\n";
print "Replication factor: $rp->{replicaN}\n";
print "Default?: $rp->{default}\n";
}
Returns a list of hash references with keys name
, duration
, replicaN
and default
for each replication policy defined on database database
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
User Management
create_user
$cv = AE::cv;
$db->create_user(
username => 'jdoe',
password => 'mypassword',
all_privileges => 1,
on_success => $cv,
on_error => sub {
$cv->croak("Failed to create user: @_");
}
);
$cv->recv;
Creates user with username
and password
. If flag all_privileges
is set to true created user will be granted cluster administration privileges.
Note: password
will be automatically enclosed in single quotes.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
set_user_password
$cv = AE::cv;
$db->set_user_password(
username => 'jdoe',
password => 'otherpassword',
on_success => $cv,
on_error => sub {
$cv->croak("Failed to set password: @_");
}
);
$cv->recv;
Sets password to password
for the user identified by username
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_users
$cv = AE::cv;
$db->show_users(
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list users: @_");
}
);
my @users = $cv->recv;
for my $u ( @users ) {
print "Name: $u->{user}\n";
print "Admin?: $u->{admin}\n";
}
Returns a list of hash references with keys user
and admin
for each defined user.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
grant_privileges
$cv = AE::cv;
$db->grant_privileges(
username => 'jdoe',
# privileges at single database
database => 'mydb',
access => 'ALL',
# or to grant cluster administration privileges
all_privileges => 1,
on_success => $cv,
on_error => sub {
$cv->croak("Failed to grant privileges: @_");
}
);
$cv->recv;
Grants to user username
access access
on database database
. If flag all_privileges
is set it grants cluster administration privileges instead.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
revoke_privileges
$cv = AE::cv;
$db->revoke_privileges(
username => 'jdoe',
# privileges at single database
database => 'mydb',
access => 'WRITE',
# or to revoke cluster administration privileges
all_privileges => 1,
on_success => $cv,
on_error => sub {
$cv->croak("Failed to revoke privileges: @_");
}
);
$cv->recv;
Revokes from user username
access access
on database database
. If flag all_privileges
is set it revokes cluster administration privileges instead.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
drop_user
$cv = AE::cv;
$db->drop_user(
username => 'jdoe',
on_success => $cv,
on_error => sub {
$cv->croak("Failed to drop user: @_");
}
);
$cv->recv;
Drops user username
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
Schema Exploration
show_measurements
$cv = AE::cv;
$db->show_measurements(
database => 'mydb',
where => "host = 'server02'",
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list measurements: @_");
}
);
my @measurements = $cv->recv;
print "$_\n" for @measurements;
Returns names of measurements from database database
, filtered by optional where
clause.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
drop_measurement
$cv = AE::cv;
$db->drop_measurement(
database => 'mydb',
measurement => 'cpu_load',
on_success => $cv,
on_error => sub {
$cv->croak("Failed to drop measurement: @_");
}
);
$cv->recv;
Drops measurement measurement
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_series
$cv = AE::cv;
$db->show_series(
database => 'mydb',
measurement => 'cpu_load',
where => "host = 'server02'",
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list series: @_");
}
);
my $series = $cv->recv;
for my $measurement ( sort keys %{ $series } ) {
print "Measurement: $measurement\n";
for my $s ( @{ $series->{$measurement} } ) {
print " * $_: $s->{$_}\n" for sort keys %{ $s };
}
}
Returns from database database
and optional measurement measurement
, optionally filtered by the where
clause, an hash reference with measurements as keys and their unique tag sets as values.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
drop_series
$cv = AE::cv;
$db->drop_series(
database => 'mydb',
measurement => 'cpu_load',
where => "host = 'server02'",
on_success => $cv,
on_error => sub {
$cv->croak("Failed to drop measurement: @_");
}
);
$cv->recv;
Drops series from measurement measurement
filtered by where
clause from database database
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_tag_keys
$cv = AE::cv;
$db->show_tag_keys(
database => 'mydb',
measurement => 'cpu_load',
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list tag keys: @_");
}
);
my $tag_keys = $cv->recv;
for my $measurement ( sort keys %{ $tag_keys } ) {
print "Measurement: $measurement\n";
print " * $_\n" for @{ $tag_keys->{$measurement} };
}
Returns from database database
and optional measurement measurement
, optionally filtered by the where
clause, an hash reference with measurements as keys and their unique tag keys as values.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_tag_values
$cv = AE::cv;
$db->show_tag_values(
database => 'mydb',
measurement => 'cpu_load',
# single key
key => 'host',
# or a list of keys
keys => [qw( host region )],
where => "host = 'server02'",
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list tag values: @_");
}
);
my $tag_values = $cv->recv;
for my $tag_key ( sort keys %{ $tag_values } ) {
print "Tag key: $tag_key\n";
print " * $_\n" for @{ $tag_values->{$tag_key} };
}
Returns from database database
and optional measurement measurement
, values from a single tag key key
or a list of tag keys keys
, optionally filtered by the where
clause, an hash reference with tag keys as keys and their unique tag values as values.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
Continuous Queries
create_continuous_query
$cv = AE::cv;
$db->create_continuous_query(
database => 'mydb',
name => 'per5minutes',
query => 'SELECT MEAN(value) INTO "cpu_load_per5m" FROM cpu_load GROUP BY time(5m)',
on_success => $cv,
on_error => sub {
$cv->croak("Failed to create continuous query: @_");
}
);
$cv->recv;
Creates new continuous query named by name
on database database
using query query
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
drop_continuous_query
$cv = AE::cv;
$db->drop_continuous_query(
database => 'mydb',
name => 'per5minutes',
on_success => $cv,
on_error => sub {
$cv->croak("Failed to drop continuous query: @_");
}
);
$cv->recv;
Drops continuous query named by name
on database database
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_continuous_queries
$cv = AE::cv;
$db->show_continuous_queries(
database => 'mydb',
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list continuous queries: @_");
}
);
my @continuous_queries = $cv->recv;
for my $cq ( @continuous_queries ) {
print "Name: $cq->{name}\n";
print "Query: $cq->{query}\n";
}
Returns a list of hash references with keys name
and query
for each continuous query defined on database database
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
Writing Data
write
$cv = AE::cv;
$db->write(
database => 'mydb',
precision => 'n',
rp => 'last_day',
data => [
# line protocol formatted
'cpu_load,host=server02,region=eu-east sensor="top",value=0.64 1437868012260500137',
# or as an hash
{
measurement => 'cpu_load',
tags => {
host => 'server02',
region => 'eu-east',
},
fields => {
value => '0.64',
sensor => '"top"',
},
time => time() * 10**9
}
],
on_success => $cv,
on_error => sub {
$cv->croak("Failed to write data: @_");
}
);
$cv->recv;
Writes to database database
and optional retention policy rp
, time-series data data
with optional precision precision
. The data
can be specified as single scalar value or as array reference. In either case the scalar variables are expected to be an formatted using line protocol or if hash with required keys measurement
and fields
and optional tags
and time
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
Querying Data
select
$cv = AE::cv;
$db->select(
database => 'mydb',
measurement => 'cpu_load',
fields => 'host, count(value)',
where => "region = 'eu-east' AND time > now() - 7d",
group_by => 'time(5m), host',
fill => 'previous',
order_by => 'ASC',
limit => 10,
on_success => $cv,
on_error => sub {
$cv->croak("Failed to select data: @_");
}
);
my $results = $cv->recv;
for my $row ( @{ $results } ) {
print "Measurement: $row->{name}\n";
print "Tags:\n";
print " * $_ = $row->{tags}->{$_}\n" for keys %{ $row->{tags} || {} };
print "Values:\n";
for my $value ( @{ $row->{values} || [] } ) {
print " * $_ = $value->{$_}\n" for keys %{ $value || {} };
}
}
Executes an select query on database database
created from provided arguments measurement measurement
, fields to select fields
, optional where
clause, grouped by group_by
and empty values filled with fill
, ordered by order_by
and number of results limited to limit
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
query
$cv = AE::cv;
$db->query(
query => {
db => 'mydb',
q => 'SELECT * FROM cpu_load',
},
on_response => $cv,
);
my ($response_data, $response_headers) = $cv->recv;
Executes an arbitrary query using provided in query
arguments.
The required on_response
code reference is executed with the raw response data and headers as parameters.
CAVEATS
Following the optimistic nature of InfluxDB this modules does not validate any parameters. Also quoting and escaping special characters is to be done by the user of this library.
AUTHOR
Alex J. G. Burzyński <ajgb@cpan.org>
COPYRIGHT AND LICENSE
This software is copyright (c) 2015 by Alex J. G. Burzyński <ajgb@cpan.org>.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.