# Package version of Hadoop::HDFS::Command.
# NOTE(review): kept as a single-line assignment; version scanners such as
# ExtUtils::MakeMaker's parse_version usually match the whole assignment on
# one line - confirm against the build tooling before reformatting.
$Hadoop::HDFS::Command::VERSION = '0.007';
use
5.010;
# Path of the `hdfs` executable used for every command (lazy, defaults to
# /usr/bin/hdfs). The value must name an existing file that the current user
# can execute; anything else is rejected with Carp::confess.
has cmd_hdfs => (
    is      => 'rw',
    lazy    => 1,
    default => sub { '/usr/bin/hdfs' },
    isa     => sub {
        my ($candidate) = @_;
        # `-x _` reuses the stat buffer filled by the preceding `-e`.
        my $usable = $candidate && -e $candidate && -x _;
        Carp::confess(
            sprintf "The command `%s` either does not exist or not an executable!",
            $candidate
        ) if !$usable;
        return;
    },
);
# Boolean diagnostics switches, both off by default:
#   enable_log - when true, _log() writes messages to STDERR.
#   trace_logs - when true, _parse_options() emits extra trace messages.
has $_ => (
    is      => 'rw',
    isa     => Bool,
    lazy    => 1,
    default => sub { 0 },
) for qw( enable_log trace_logs );
# User to run the hdfs commands as. The method modifier on
# _capture/_capture_with_stdin compares it with the name of the real user of
# this process ($<) to decide whether `sudo -u <runas>` is required.
#
# The default is a coderef so it is resolved lazily when first read. A plain
# scalar default would be computed once at module load time, making
# `lazy => 1` meaningless. This also matches how the other attributes declare
# their defaults.
has runas => (
    is      => 'rw',
    isa     => Str,
    default => sub { scalar getpwuid $< },
    lazy    => 1,
);
# Wrap both command runners. When the configured `runas` user differs from
# the real user of this process, the command is executed through
# `sudo -u <runas>`.
#
# This must be an `around` modifier. A `before` modifier is called with its
# own @_ array, so reassigning @_ there (the previous implementation) never
# reaches the wrapped method and the sudo prefix was silently lost.
around [ '_capture', '_capture_with_stdin' ] => sub {
    my ( $orig, $self, $options, @cmd ) = @_;

    my $current_user = getpwuid $<;
    if ( !defined $current_user || $self->runas ne $current_user ) {
        unshift @cmd, 'sudo', '-u', $self->runas;
    }

    # Propagate the caller's context to the wrapped runner.
    return $self->$orig( $options, @cmd );
};
# Public entry point: run an `hdfs dfs` sub-command.
#
#   $hdfs->dfs( [ \%options, ] $command, @params )
#
# Leading dashes on $command are ignored ("ls", "-ls" and "--ls" are the
# same). The call is dispatched to the matching _dfs_<command> method, which
# receives \%options (a fresh empty hashref when none was given) followed by
# @params. Options read downstream include ignore_fail and silent (command
# runners), and want_epoch and callback (ls).
#
# Dies when no command is given; croaks when the command is not implemented.
# Returns whatever the handler returns, in the caller's context.
sub dfs {
    my $self = shift;

    my $options = {};
    if ( Ref::Util::is_hashref( $_[0] ) ) {
        $options = shift @_;
    }

    my $cmd = shift || die "No dfs command specified";
    $cmd =~ s{ \A [-]+ }{}xms;

    my $method = "_dfs_$cmd";
    Carp::croak "'$cmd' is not implemented!" unless $self->can($method);

    return $self->$method( $options, @_ );
}
# Run `hdfs dfs -ls` on the given paths and parse every output line into a
# record.
#
# Parameters after $options: optional switches -d, -h, -R followed by at
# least one path.
#
# $options keys read here:
#   want_epoch - when true, also add an `epoch` field parsed from the date
#                column.
#   callback   - optional CODE ref. It is deleted from $options, so the
#                caller's hashref is modified. It receives each record hashref;
#                a false return value stops processing. With a callback this
#                method must be called in void context and returns nothing.
# The hashref is also passed on to _capture.
#
# Returns a list of hashrefs with keys mode, replication, user, group, size,
# date, path and type ('dir' or 'file'), plus epoch when requested and
# parseable. Returns an empty list when the command produced no output.
sub _dfs_ls {
    my $self = shift;
    # Persists across calls; the date parser is built once, on first use.
    state $strp;
    my $options = shift;
    my @params = @_;
    my @flags = qw( d h R );
    my ( $arg, $paths ) = $self->_parse_options(
        \@params,
        \@flags,
        undef,
        {
            require_params => 1,
        },
    );

    my $want_epoch = $options->{want_epoch};
    my $cb = delete $options->{callback};

    if ( $cb ) {
        die "callback needs to be a CODE" if ! Ref::Util::is_coderef $cb;
        # Records go to the callback instead of being collected, so a caller
        # expecting a return value is a usage error.
        if ( defined wantarray ) {
            Carp::croak "You need to call this function in void context when callback is specified";
        }
    }

    my @response = $self->_capture(
        $options,
        $self->cmd_hdfs,
        qw( dfs -ls ),
        ( map { '-' . $_ } grep { $arg->{ $_ } } @flags ),
        @{ $paths },
    );

    # No output: nothing to parse.
    return if ! @response;

    # Drop the leading "Found N items" summary line.
    if ( $response[0] && $response[0] =~ m{ \A Found \s+ [0-9] }xms ) {
        shift @response;
    }

    my $space = q{ };
    my @rv;
    for my $line ( @response ) {
        # The first four whitespace-separated columns are fixed. The limit of 5
        # keeps everything after them together in @unknown, re-split below.
        my ( $mode, $replication, $user, $group, @unknown ) = split m{ \s+ }xms, $line, 5;
        # NOTE(review): `split $space` uses a variable holding a single space.
        # On perl >= 5.18 that triggers awk-style splitting on runs of
        # whitespace; older perls split on each single space. Confirm the
        # supported perl range.
        my @rest = map { split $space, $_ } @unknown;
        my $size;
        if ( $arg->{h} ) {
            # With -h the size may span two tokens (presumably number + unit).
            # It is one token when it is "0" or when the next token contains no
            # letter/underscore.
            if ( $rest[0] eq '0' || $rest[1] !~ m{ [a-zA-Z_] }xms ) {
                $size = shift @rest;
            }
            else {
                $size = join $space, shift @rest, shift @rest;
            }
        }
        else {
            $size = shift @rest;
        }
        # Date and time are two separate tokens.
        my $date = join ' ', shift @rest, shift @rest;
        my $path = shift( @rest ) || die "Unable to parse $line to gather the path";
        my $is_dir = $mode =~ m{ \A [d] }xms ? 1 : 0;

        my %record = (
            mode        => $mode,
            replication => $replication,
            user        => $user,
            group       => $group,
            size        => $size,
            date        => $date,
            path        => $path,
            type        => $is_dir ? 'dir' : 'file',
        );

        if ( $want_epoch ) {
            # NOTE(review): dates are interpreted in a hard-coded 'CET' time
            # zone; confirm this matches the timestamps the hdfs client prints.
            $strp ||= DateTime::Format::Strptime->new(
                pattern   => '%Y-%m-%d %H:%M',
                time_zone => 'CET',
                on_error  => 'croak',
            );
            # Best effort: a failed conversion is only logged at debug level
            # and the record is kept without an epoch.
            eval {
                $record{epoch} = $strp->parse_datetime( $date )->epoch;
                1;
            } or do {
                my $eval_error = $@ || 'Zombie error';
                $self->_log(
                    debug => 'Failed to convert %s into an epoch: %s',
                    $date,
                    $eval_error,
                );
            };
        }

        # Leftover tokens are treated as part of a path containing spaces and
        # are rejoined onto it.
        if ( @rest ) {
            $record{path} = join $space, $record{path}, @rest;
        }

        if ( $cb ) {
            if ( ! $cb->( \%record ) ) {
                $self->_log( info => 'Terminating the ls processing as the user callback did not return a true value.' );
                last;
            }
            next;
        }

        push @rv, { %record };
    }

    return if $cb;
    return @rv;
}
# Report space usage via `hdfs dfs -du`.
#
# Switches: -h and -s, forwarded as given. At least one path is required.
# Each output line is split on runs of two or more whitespace characters:
#   first column     -> size
#   last column      -> name
#   a middle column, if any -> disk_space_consumed
# Dies when the command produced no output. Returns one hashref per line.
sub _dfs_du {
    my ( $self, $options, @params ) = @_;

    my @flags = qw( h s );
    my ( $arg, $paths ) = $self->_parse_options(
        \@params, \@flags, undef, { require_params => 1 },
    );

    my @lines = $self->_capture(
        $options,
        $self->cmd_hdfs,
        qw( dfs -du ),
        ( map { "-$_" } grep { $arg->{$_} } @flags ),
        @{$paths},
    );
    die "No output collected from -du command" if !@lines;

    my @records;
    for my $line (@lines) {
        my @columns = split m{ \s{2,} }xms, $line;
        my %entry;
        $entry{size} = shift @columns;
        $entry{name} = pop @columns;
        $entry{disk_space_consumed} = shift @columns if @columns;
        push @records, \%entry;
    }
    return @records;
}
# Move/rename an HDFS path via `hdfs dfs -mv`.
#
# Takes no switches; the first two positional parameters are the source and
# the target, and any further ones are ignored. Dies when either is missing
# or false. Returns nothing.
sub _dfs_mv {
    my ( $self, $options, @params ) = @_;

    my ( undef, $paths ) = $self->_parse_options(
        \@params, [], undef, { require_params => 1 },
    );

    my $source = shift @{$paths};
    die "Source path not specified" if !$source;

    my $target = shift @{$paths};
    die "Target path not specified" if !$target;

    $self->_capture( $options, $self->cmd_hdfs, qw( dfs -mv ), $source, $target );
    return;
}
# Delete HDFS paths via `hdfs dfs -rm`.
#
# Switches: -f, -r, -skipTrash, forwarded in that order when set. At least
# one path is required. Returns the command's stdout split into lines.
sub _dfs_rm {
    my ( $self, $options, @params ) = @_;

    my @flags = qw( f r skipTrash );
    my ( $arg, $paths ) = $self->_parse_options(
        \@params, \@flags, undef, { require_params => 1 },
    );

    my @switches;
    for my $flag (@flags) {
        push @switches, "-$flag" if $arg->{$flag};
    }

    my @response = $self->_capture(
        $options, $self->cmd_hdfs, qw( dfs -rm ), @switches, @{$paths},
    );
    return @response;
}
# Upload to HDFS via `hdfs dfs -put`.
#
# Two calling styles:
#   * file upload:  ( [switches], $local_path, ..., $hdfs_target )
#   * stdin upload: ( [switches], '-', $hdfs_target, $content )
#     _parse_options rewrites the lone "-" to the marker '\-'. That marker
#     selects stdin mode, and the LAST parameter becomes the content piped to
#     the command (stored in $options->{stdin}, which mutates the caller's
#     hashref).
#
# Switches: -f, -p, -l, forwarded as given.
# Dies with "stdin content not specified!" when stdin mode has no (or a false)
# content value, and with "Missing arguments!" when too few paths remain.
# Returns the command's stdout split into lines.
sub _dfs_put {
    my $self    = shift;
    my $options = shift;
    my @params  = @_;

    # "-" must NOT appear in this list. Getopt::Long rejects it as an option
    # spec ("Error in option spec"), which made every put die. The lone dash is
    # handled through the '\-' marker below instead.
    my @flags = qw( f p l );
    my ( $arg, $paths ) = $self->_parse_options(
        \@params,
        \@flags,
        undef,
        {
            require_params => 1,
        },
    );

    if ( $paths->[0] && $paths->[0] eq '\-' ) {
        shift @{$paths};
        $options->{stdin} = pop( @{$paths} ) || die "stdin content not specified!";
    }

    # stdin mode needs only the target; file mode needs source(s) + target.
    if ( @{$paths} < ( $options->{stdin} ? 1 : 2 ) ) {
        die "Missing arguments!";
    }

    my @response = $self->_capture_with_stdin(
        $options,
        $self->cmd_hdfs,
        qw( dfs -put ),
        ( map { '-' . $_ } grep { $arg->{$_} } @flags ),
        ( $options->{stdin} ? '-' : () ),
        @{$paths},
    );

    return @response;
}
# Check a property of an HDFS path with `hdfs dfs -test`.
#
# Switches: -d, -e, -f, -s, -z, forwarded as given. At least one path is
# required. Returns 1 when running the command succeeds and 0 when anything
# inside the attempt dies (including _capture on a non-zero exit).
sub _dfs_test {
    my ( $self, $options, @params ) = @_;

    my @flags = qw( d e f s z );
    my ( $arg, $paths ) = $self->_parse_options(
        \@params, \@flags, undef, { require_params => 1 },
    );
    my @switches = map { '-' . $_ } grep { $arg->{$_} } @flags;

    my $passed = eval {
        $self->_capture( $options, $self->cmd_hdfs, qw( dfs -test ), @switches, @{$paths} );
        1;
    };

    return $passed ? 1 : 0;
}
# Create HDFS directories via `hdfs dfs -mkdir`.
#
# Switch: -p, forwarded when given. Every other parameter is treated as a path
# (at least one required). Returns the command's stdout split into lines.
sub _dfs_mkdir {
    my ( $self, $options, @params ) = @_;

    my @known = qw( p );
    my ( $parsed, $targets ) = $self->_parse_options(
        \@params, \@known, undef, { require_params => 1 },
    );

    my @switches = map { "-$_" } grep { $parsed->{$_} } @known;

    my @output = $self->_capture(
        $options,
        $self->cmd_hdfs,
        qw( dfs -mkdir ),
        @switches,
        @{$targets},
    );
    return @output;
}
# Change permissions via `hdfs dfs -chmod`.
#
# Switch: -R (recursive), forwarded when given. The remaining parameters (the
# mode followed by one or more paths) are passed through unchanged.
# Returns the command's stdout split into lines.
sub _dfs_chmod {
    my $self    = shift;
    my $options = shift;
    my @params  = @_;

    # `hdfs dfs -chmod` documents only -R. The former `p` flag was copied from
    # mkdir: it made -R impossible to request and forwarded an unsupported -p.
    my @flags = qw( R );
    my ( $arg, $paths ) = $self->_parse_options(
        \@params,
        \@flags,
        undef,
        {
            require_params => 1,
        },
    );

    my @response = $self->_capture(
        $options,
        $self->cmd_hdfs,
        qw( dfs -chmod ),
        ( map { '-' . $_ } grep { $arg->{$_} } @flags ),
        @{$paths},
    );
    return @response;
}
# Change ownership via `hdfs dfs -chown`.
#
# Switch: -R (recursive), forwarded when given. The remaining parameters (the
# OWNER[:GROUP] spec followed by one or more paths) are passed through
# unchanged. Returns the command's stdout split into lines.
sub _dfs_chown {
    my $self    = shift;
    my $options = shift;
    my @params  = @_;

    # `hdfs dfs -chown` documents only -R. The former `p` flag was copied from
    # mkdir: it made -R impossible to request and forwarded an unsupported -p.
    my @flags = qw( R );
    my ( $arg, $paths ) = $self->_parse_options(
        \@params,
        \@flags,
        undef,
        {
            require_params => 1,
        },
    );

    my @response = $self->_capture(
        $options,
        $self->cmd_hdfs,
        qw( dfs -chown ),
        ( map { '-' . $_ } grep { $arg->{$_} } @flags ),
        @{$paths},
    );
    return @response;
}
# Copy HDFS files to the local filesystem via `hdfs dfs -get`.
#
# Switches: -p, -ignoreCrc, -crc, forwarded in that order when set. The
# remaining parameters (source(s) and destination) are passed through.
# Returns the command's stdout split into lines.
sub _dfs_get {
    my ( $self, $options, @params ) = @_;

    my @flags = qw( p ignoreCrc crc );
    my ( $arg, $paths ) = $self->_parse_options(
        \@params, \@flags, undef, { require_params => 1 },
    );

    my @cmd = ( $self->cmd_hdfs, qw( dfs -get ) );
    for my $flag (@flags) {
        next if !$arg->{$flag};
        push @cmd, '-' . $flag;
    }
    push @cmd, @{$paths};

    my @response = $self->_capture( $options, @cmd );
    return @response;
}
# Read ACLs via `hdfs dfs -getfacl`.
#
# Switch: -R (recursive), forwarded when given. At least one path is
# required.
#
# Output lines starting with "#" followed by whitespace are treated as
# "key: value" headers (expected to look like "# file: /path",
# "# owner: ..."), and stored as $rv{key} = value. Every other line is pushed
# onto $rv{entries}.
# NOTE(review): with -R or several paths, later header values overwrite
# earlier ones and all entries are merged into one list.
#
# Returns a hashref.
sub _dfs_getfacl {
    my $self    = shift;
    my $options = shift;
    my @params  = @_;

    my @flags = qw( R );
    my ( $arg, $paths ) = $self->_parse_options(
        \@params,
        \@flags,
        undef,
        {
            require_params => 1,
        },
    );

    my @response = $self->_capture(
        $options,
        $self->cmd_hdfs,
        qw( dfs -getfacl ),
        ( map { '-' . $_ } grep { $arg->{$_} } @flags ),
        @{$paths},
    );

    my %rv;
    for my $line (@response) {
        # Under /x a bare "#" would start a regex comment, hence the [#] class.
        if ( my ($match) = $line =~ m{ \A [#] \s+ (.+) \z }xms ) {
            my ( $k, $v ) = split m{ [:] \s+ }xms, $match, 2;
            $rv{$k} = $v;
            next;
        }
        push @{ $rv{entries} ||= [] }, $line;
    }

    return \%rv;
}
# Modify ACLs via `hdfs dfs -setfacl`.
#
# Boolean switches -b, -k, -R are forwarded first, in that order, when set.
# Valued options -m <spec>, -x <spec> and --set <spec> follow, in hash-key
# order. Then come the paths (at least one required).
# Returns the command's stdout split into lines.
sub _dfs_setfacl {
    my ( $self, $options, @params ) = @_;

    my @flags = qw( b k R );
    my @args  = qw( m=s x=s set=s );
    my ( $arg, $paths ) = $self->_parse_options(
        \@params, \@flags, \@args, { require_params => 1 },
    );

    # Take the boolean switches out of $arg so only valued options remain.
    my @acl_flags;
    for my $flag (@flags) {
        push @acl_flags, "-$flag" if delete $arg->{$flag};
    }

    my @acl_args;
    for my $name ( keys %{$arg} ) {
        my $switch = $name eq 'set' ? '--set' : "-$name";
        push @acl_args, $switch, $arg->{$name};
    }

    my @response = $self->_capture(
        $options,
        $self->cmd_hdfs,
        qw( dfs -setfacl ),
        @acl_flags,
        @acl_args,
        @{$paths},
    );
    return @response;
}
# Parse command-style parameters with Getopt::Long.
#
#   my ( $arg, $paths ) = $self->_parse_options( $params, $flags, $opt, $conf );
#
# $params - arrayref of raw parameters (switches and positional values); not
#           modified.
# $flags  - arrayref of Getopt::Long specs for boolean switches, or undef.
# $opt    - arrayref of Getopt::Long specs for valued options, or undef.
# $conf   - optional hashref; `require_params => 1` dies when no positional
#           parameter remains after option parsing.
#
# A lone "-" is rewritten to the literal two-character string '\-' before
# parsing, so it survives as a positional parameter (_dfs_put uses it as the
# "read from stdin" marker).
#
# Returns (\%parsed_options, \@remaining_positional_params). Dies when
# Getopt::Long rejects the parameters.
sub _parse_options {
    my $self = shift;
    my ( $params, $flags, $opt, $conf ) = @_;
    $conf ||= {};

    my @params = map { $_ eq '-' ? '\-' : $_ } @{$params};

    # Explicit parens keep the calls correct even if Ref::Util's prototypes
    # are not known when this code is compiled.
    my @getopt_args = (
        \@params,
        \my %arg,
        ( map { Ref::Util::is_arrayref($_) ? @{$_} : () } $flags, $opt ),
    );

    if ( $self->trace_logs ) {
        $self->_log(
            trace => '_parse_options::getopt: %s',
            Data::Dumper::Dumper( \@getopt_args ),
        );
    }

    # The message is kept on a single line; it previously carried a stray
    # newline before the closing quote.
    Getopt::Long::GetOptionsFromArray(@getopt_args)
        || die qq{Unable to parse parameters: '@{$params}'};

    if ( $conf->{require_params} && !@params ) {
        die "No parameters were specified!";
    }

    if ( $self->trace_logs ) {
        $self->_log(
            trace => '_parse_options::rv: %s',
            Data::Dumper::Dumper( [ \%arg, [@params] ] ),
        );
    }

    return \%arg, [@params];
}
# Run an external command (list form, no shell) and capture its output with
# Capture::Tiny.
#
# On success: STDERR output, if any, is re-emitted as a warning, and STDOUT is
# returned split into non-empty lines.
# On failure (non-zero wait status): dies with the exit code and STDERR,
# unless $options->{ignore_fail} is set. In that case it warns (suppressed
# by $options->{silent}) and returns whatever STDOUT was produced.
sub _capture {
    my ( $self, $options, @cmd ) = @_;

    $self->_log( debug => 'Executing command: %s', join( ' ', @cmd ) );

    my $started_at = time;
    my ( $stdout, $stderr, $fail ) = Capture::Tiny::capture { system(@cmd); };
    $self->_log( debug => 'Execution took %.3f seconds', time - $started_at );

    if ( !$fail ) {
        warn "Warning from external command: $stderr" if $stderr;
        return $self->_split_on_newlines($stdout);
    }

    # $fail is the raw wait status from system(); the exit code is its high byte.
    my $code = $fail >> 8;
    $stderr ||= '[no error]';
    my $msg = "External command (@cmd) failed with status=$code: $stderr";
    die $msg if !$options->{ignore_fail};

    warn "[Fatal error downgraded to a warning] $msg" if !$options->{silent};
    return $self->_split_on_newlines( $stdout || '' );
}
# Run an external command via IPC::Cmd::run_forked, optionally piping
# $options->{stdin} to it. The key is deleted from $options.
#
# A run counts as successful only when the exit code is defined, equals 0,
# and no timeout was reported.
# On success: STDERR output, if any, is re-emitted as a warning, and STDOUT is
# returned split into non-empty lines.
# On failure: dies with the exit code and STDERR (falling back to
# run_forked's err_msg), unless $options->{ignore_fail} is set. In that case
# it warns (suppressed by $options->{silent}) and returns whatever STDOUT was
# produced.
sub _capture_with_stdin {
    my $self    = shift;
    my $options = shift;
    my @cmd     = @_;
    my $stdin   = delete $options->{stdin};

    $self->_log( debug => 'Executing command(IPC): %s', join( ' ', @cmd ) );

    my $start = time;
    my $res   = IPC::Cmd::run_forked(
        \@cmd,
        {
            ( $stdin ? ( child_stdin => $stdin ) : () ),
            terminate_on_parent_sudden_death => 1,
        }
    );
    $self->_log( debug => 'Execution took %.3f seconds', time - $start );

    my $stdout    = $res->{stdout};
    my $stderr    = $res->{stderr};
    my $exit_code = $res->{exit_code};

    my $success = defined $exit_code
        && $exit_code == 0
        && !$res->{timeout};

    # Decide on $success directly. The previous code derived a flag from
    # exit_code, so an undefined code or a timeout was treated as success, and
    # it shifted the already-decoded exit code by 8 bits again, which reported
    # status=0.
    if ( !$success ) {
        my $code = defined $exit_code ? $exit_code : 'unknown';
        $stderr ||= $res->{err_msg} || '[no error]';
        my $msg = "External command (@cmd) failed with status=$code: $stderr";
        if ( $options->{ignore_fail} ) {
            if ( !$options->{silent} ) {
                warn "[Fatal error downgraded to a warning] $msg";
            }
            return $self->_split_on_newlines( $stdout || '' );
        }
        die $msg;
    }

    if ($stderr) {
        warn "Warning from external command: $stderr";
    }
    return $self->_split_on_newlines($stdout);
}
# Trim leading and trailing whitespace from command output, then split it
# into lines. Consecutive newlines act as a single separator, so empty lines
# are dropped. The first argument (the invocant) is ignored.
sub _split_on_newlines {
    my ( undef, $text ) = @_;
    $text =~ s{ \A \s+ | \s+ \z }{}gxms;
    return split m{ \n+ }xms, $text;
}
# Print a diagnostic line to STDERR when enable_log is on.
#
#   $self->_log( $level, $template, @values )
#
# Output is "[LEVEL] <template>\n", where the template is used as a printf
# format for @values. Does nothing when enable_log is false.
sub _log {
    my ( $self, $level, $tmpl, @param ) = @_;
    return if !$self->enable_log;

    my $format = '[' . uc($level) . "] $tmpl\n";
    printf STDERR $format, @param;
}

1;