BEGIN
{
use
vars
qw( $SUPPORTED_RE $SYSV_SUPPORTED $SEMOP_ARGS $SHEM_REPO $ID2OBJ $N $HAS_B64 )
;
$SUPPORTED_RE
=
qr/IPC\/
SysV/;
if
(
$Config
{extensions} =~ /
$SUPPORTED_RE
/ &&
$^O !~ /^(?:Android|dos|MSWin32|os2|VMS|riscos)/i &&
$Config
{d_msg} eq
'define'
&&
$Config
{d_sem} eq
'define'
&&
$Config
{d_semctl} eq
'define'
&&
$Config
{d_semget} eq
'define'
&&
$Config
{d_semop} eq
'define'
&&
$Config
{d_shm} eq
'define'
&&
$Config
{d_shmat} eq
'define'
&&
$Config
{d_shmctl} eq
'define'
&&
$Config
{d_shmdt} eq
'define'
&&
$Config
{d_shmget} eq
'define'
)
{
IPC::SysV->
import
(
qw( IPC_RMID IPC_PRIVATE IPC_SET IPC_STAT IPC_CREAT IPC_EXCL IPC_NOWAIT
SEM_UNDO S_IRWXU S_IRWXG S_IRWXO S_IRUSR S_IWUSR
GETNCNT GETZCNT GETVAL SETVAL GETPID GETALL SETALL
shmat shmdt memread memwrite ftok )
);
$SYSV_SUPPORTED
= 1;
no
strict
'subs'
;
eval
(
<<'EOT' );
our $SEMOP_ARGS =
{
(LOCK_EX) =>
[
1, 0, 0, # wait for readers to finish
2, 0, 0, # wait for writers to finish
2, 1, SEM_UNDO, # assert write lock
],
(LOCK_EX | LOCK_NB) =>
[
1, 0, IPC_NOWAIT, # wait for readers to finish
2, 0, IPC_NOWAIT, # wait for writers to finish
2, 1, (SEM_UNDO | IPC_NOWAIT), # assert write lock
],
(LOCK_EX | LOCK_UN) =>
[
2, -1, (SEM_UNDO | IPC_NOWAIT),
],
(LOCK_SH) =>
[
2, 0, 0, # wait for writers to finish
1, 1, SEM_UNDO, # assert shared read lock
],
(LOCK_SH | LOCK_NB) =>
[
2, 0, IPC_NOWAIT, # wait for writers to finish
1, 1, (SEM_UNDO | IPC_NOWAIT), # assert shared read lock
],
(LOCK_SH | LOCK_UN) =>
[
1, -1, (SEM_UNDO | IPC_NOWAIT), # remove shared read lock
],
};
EOT
if
( $@ )
{
warn
(
"Error while trying to eval \$SEMOP_ARGS: $@\n"
);
}
}
else
{
$SYSV_SUPPORTED
= 0;
}
$N
=
do
{
my
$foo
=
eval
{
pack
"L!"
, 0 }; $@ ?
''
:
'!'
};
$SHEM_REPO
= [];
$ID2OBJ
= {};
our
@EXPORT_OK
=
qw(LOCK_EX LOCK_SH LOCK_NB LOCK_UN)
;
our
%EXPORT_TAGS
= (
all
=> [
qw( LOCK_EX LOCK_SH LOCK_NB LOCK_UN )
],
lock
=> [
qw( LOCK_EX LOCK_SH LOCK_NB LOCK_UN )
],
'flock'
=> [
qw( LOCK_EX LOCK_SH LOCK_NB LOCK_UN )
],
);
our
$VERSION
=
'v0.4.2'
;
};
no
warnings
'redefine'
;
sub
init
{
my
$self
=
shift
(
@_
);
$self
->{base64} =
undef
;
$self
->{create} = 0;
$self
->{destroy} = 0;
$self
->{destroy_semaphore} = 0;
$self
->{exclusive} = 0;
no
strict
'subs'
;
$self
->{key} =
&IPC::SysV::IPC_PRIVATE
if
(
$SYSV_SUPPORTED
);
$self
->{mode} = 0666;
$self
->{serial} =
''
;
$self
->{size} = SHM_BUFSIZ;
$self
->{_init_strict_use_sub} = 1;
$self
->{_packing_method} =
'json'
;
$self
->SUPER::init(
@_
) ||
return
(
$self
->pass_error );
$self
->{addr} =
undef
();
$self
->{id} =
undef
();
$self
->{locked} = 0;
$self
->{owner} = $$;
$self
->{removed} = 0;
$self
->{removed_semaphore} = 0;
$self
->{semid} =
undef
();
return
(
$self
);
}
sub
addr {
return
(
shift
->_set_get_scalar(
'addr'
,
@_
) ); }
sub
as_hash {
return
(
$_
[0] ); }
sub
attach
{
my
$self
=
shift
(
@_
);
my
$flags
=
shift
(
@_
);
$flags
=
$self
->flags
if
( !
defined
(
$flags
) );
my
$addr
=
$self
->addr;
return
(
$addr
)
if
(
defined
(
$addr
) );
my
$id
=
$self
->id;
return
(
$self
->error(
"No shared memory id! Have you opened it first?"
) )
if
( !
length
(
$id
) );
$addr
= shmat(
$id
,
undef
(),
$flags
);
return
(
$self
->error(
"Unable to attach to shared memory: $!"
) )
if
( !
defined
(
$addr
) );
$self
->addr(
$addr
);
return
(
$addr
);
}
sub
base64 {
return
(
shift
->_set_get_scalar(
'base64'
,
@_
) ); }
sub
cbor {
return
(
shift
->_packing_method(
'cbor'
) ); }
sub
create {
return
(
shift
->_set_get_boolean(
'create'
,
@_
) ); }
sub
delete
{
return
(
shift
->remove(
@_
) ); }
sub
destroy
{
my
$self
=
shift
(
@_
);
if
(
@_
)
{
my
$val
=
shift
(
@_
);
$self
->_set_get_boolean(
'destroy'
,
$val
);
$self
->_set_get_boolean(
'destroy_semaphore'
,
$val
);
}
return
(
$self
->_set_get_boolean(
'destroy'
) );
}
sub
destroy_semaphore {
return
(
shift
->_set_get_boolean(
'destroy_semaphore'
,
@_
) ); }
sub
detach
{
my
$self
=
shift
(
@_
);
my
$addr
=
$self
->addr;
return
if
( !
defined
(
$addr
) );
my
$rv
= shmdt(
$addr
);
return
(
$self
->error(
"Unable to detach from shared memory: $!"
) )
if
( !
defined
(
$rv
) );
$self
->addr(
undef
() );
return
(
$self
);
}
sub
exclusive {
return
(
shift
->_set_get_boolean(
'exclusive'
,
@_
) ); }
sub
exists
{
my
$self
=
shift
(
@_
);
my
$opts
= {};
if
(
ref
(
$_
[0] ) eq
'HASH'
)
{
$opts
=
shift
(
@_
);
}
else
{
@$opts
{
qw( key mode size )
} =
@_
;
}
$opts
->{size} =
$self
->size
unless
(
length
(
$opts
->{size} ) );
$opts
->{size} =
int
(
$opts
->{size} );
my
$serial
;
if
(
length
(
$opts
->{key} ) )
{
$serial
=
$self
->_str2key(
$opts
->{key} );
}
else
{
$serial
=
$self
->serial;
}
my
$flags
=
$self
->flags({
mode
=> 0644 });
no
strict
'subs'
;
$flags
= (
$flags
^
&IPC::SysV::IPC_CREAT
);
my
$semid
;
local
$@;
my
@rv
=
eval
{
$semid
=
semget
(
$serial
, 0,
$flags
);
if
(
defined
(
$semid
) )
{
my
$arg
= 0;
my
$found
=
semctl
(
$semid
, SEM_MARKER,
&IPC::SysV::GETVAL
,
$arg
);
$arg
= 0;
semctl
(
$semid
, 0,
&IPC::SysV::IPC_RMID
,
$arg
);
return
(
$found
== SHM_EXISTS ? 1 : 0 );
}
else
{
return
(0)
if
( $! =~ /\bNo[[:blank:]]+such[[:blank:]]+file\b/ );
return
;
}
};
if
( $@ )
{
my
$arg
= 0;
if
(
$semid
)
{
local
$@;
if
( !
eval
{
semctl
(
$semid
, 0,
&IPC::SysV::IPC_RMID
,
$arg
);
})
{
warn
(
"Error trying to remove semaphore id ${semid} after checking if shared memory existed: $@"
)
if
(
$self
->_warnings_is_enabled );
}
}
return
(0);
}
else
{
return
(
$rv
[0] );
}
}
sub
flags
{
my
$self
=
shift
(
@_
);
my
$opts
=
$self
->_get_args_as_hash(
@_
);
no
warnings
'uninitialized'
;
no
strict
'subs'
;
$opts
->{create} =
$self
->create
unless
(
length
(
$opts
->{create} ) );
$opts
->{exclusive} =
$self
->exclusive
unless
(
length
(
$opts
->{exclusive} ) );
$opts
->{mode} =
$self
->mode
unless
(
length
(
$opts
->{mode} ) );
my
$flags
= 0;
$flags
|=
&IPC::SysV::IPC_CREAT
if
(
$opts
->{create} );
$flags
|=
&IPC::SysV::IPC_EXCL
if
(
$opts
->{exclusive} );
$flags
|= (
$opts
->{mode} || 0666 );
return
(
$flags
);
}
sub
id
{
my
$self
=
shift
(
@_
);
my
@callinfo
=
caller
;
no
warnings
'uninitialized'
;
if
(
@_
)
{
$self
->{id} =
shift
(
@_
);
}
return
(
$self
->{id} );
}
sub
json {
return
(
shift
->_packing_method(
'json'
) ); }
sub
key
{
my
$self
=
shift
(
@_
);
if
(
@_
)
{
$self
->{key} =
shift
(
@_
);
$self
->{serial} =
$self
->_str2key(
$self
->{key} );
}
return
(
$self
->{key} );
}
sub
lock
{
my
$self
=
shift
(
@_
);
my
$type
=
shift
(
@_
);
my
$timeout
=
shift
(
@_
);
$type
= LOCK_SH
if
( !
defined
(
$type
) );
return
(
$self
->unlock )
if
( (
$type
& LOCK_UN ) );
return
(1)
if
(
$self
->locked &
$type
);
$timeout
= 0
if
( !
defined
(
$timeout
) ||
$timeout
!~ /^\d+$/ );
$self
->unlock
if
(
$self
->locked );
my
$semid
=
$self
->semid;
return
(
$self
->error(
"No semaphore id set yet."
) )
if
( !
defined
(
$semid
) );
local
$@;
my
$rc
;
eval
{
local
$SIG
{ALRM} =
sub
{
die
(
"timeout"
); };
alarm
(
$timeout
);
$rc
=
$self
->op( @{
$SEMOP_ARGS
->{
$type
}} );
alarm
(0);
};
if
( $@ )
{
return
(
$self
->error(
"Unable to set a lock: $@"
) );
}
else
{
if
(
$rc
)
{
$self
->locked(
$type
);
}
else
{
return
(
$self
->error(
"Failed to set a lock on semaphore id \"$semid\" for lock type $type: $!"
) );
}
}
return
(
$self
);
}
sub
locked {
return
(
shift
->_set_get_scalar(
'locked'
,
@_
) ); }
sub
mode {
return
(
shift
->_set_get_scalar(
'mode'
,
@_
) ); }
sub
op
{
my
$self
=
shift
(
@_
);
return
(
$self
->error(
"No argument was provided!"
) )
if
( !
scalar
(
@_
) );
return
(
$self
->error(
"Invalid number of argument: '"
,
join
( ',
', @_ ), "'
." ) )
if
(
@_
% 3 );
my
$id
=
$self
->semid;
return
(
$self
->error(
"No semaphore set yet. You must open the shared memory first to set the semaphore."
) )
if
( !
length
(
$id
) );
my
$data
=
pack
(
"s$N*"
,
@_
);
my
$rv
;
no
strict
'subs'
;
$rv
=
semop
(
$id
,
$data
) ||
do
{
my
$serial
=
$self
->serial;
my
$semid
=
semget
(
$serial
, 3, IPC_CREAT | 0666 );
$rv
=
semop
(
$semid
,
$data
);
};
return
(
$rv
);
}
sub
open
{
my
$self
=
shift
(
@_
);
my
$opts
= {};
if
(
ref
(
$_
[0] ) eq
'HASH'
)
{
$opts
=
shift
(
@_
);
}
else
{
@$opts
{
qw( key mode size )
} =
@_
;
}
$opts
->{size} =
$self
->size
unless
(
length
(
$opts
->{size} ) );
$opts
->{size} =
int
(
$opts
->{size} );
$opts
->{mode} //=
''
;
$opts
->{key} //=
''
;
no
strict
'subs'
;
my
$serial
;
if
(
length
(
$opts
->{key} ) )
{
$serial
=
$self
->_str2key(
$opts
->{key} ) ||
return
(
$self
->error(
"Cannot get serial from key '$opts->{key}': "
,
$self
->error ) );
}
else
{
$serial
=
$self
->serial;
}
die
(
"There is no serial!!\n"
)
if
( !CORE::
length
(
$serial
) );
my
$create
= 0;
if
(
$opts
->{mode} eq
'w'
||
$opts
->{key} =~ s/^>// )
{
$create
++;
}
elsif
(
$opts
->{mode} eq
'r'
||
$opts
->{key} =~ s/^<// )
{
$create
= 0;
}
else
{
$create
=
$self
->create;
}
my
$flags
=
$self
->flags(
create
=>
$create
, (
$opts
->{mode} =~ /^\d+$/ ?
$opts
->{mode} : () ) );
my
$id
;
local
$@;
eval
{
$id
=
shmget
(
$serial
,
$opts
->{size},
$flags
);
if
(
defined
(
$id
) )
{
}
else
{
my
$newflags
= (
$flags
&
&IPC::SysV::IPC_CREAT
) ?
$flags
: (
$flags
|
&IPC::SysV::IPC_CREAT
);
my
$limit
= (
$serial
+ 10 );
while
(
$serial
<=
$limit
)
{
$id
=
shmget
(
$serial
,
$opts
->{size},
$newflags
|
&IPC::SysV::IPC_CREAT
);
$serial
++;
last
if
(
defined
(
$id
) );
}
}
};
if
( $@ )
{
if
( $@ =~ /
shmget
[[:blank:]\h]+not[[:blank:]\h]+implemented/i )
{
return
(
$self
->error(
"IPC SysV is supported, but somehow shmget is not implemented: $@"
) );
}
else
{
return
(
$self
->error(
"Error while trying to get the shared memory id: $@"
) );
}
}
if
( !
defined
(
$id
) )
{
return
(
$self
->error(
"Unable to create shared memory id with key \"$serial\" and flags \"$flags\": $!"
) );
}
$self
->serial(
$serial
);
my
$semid
;
local
$@;
eval
{
$semid
=
semget
(
$serial
, (
$create
? 3 : 0 ),
$flags
);
if
( !
defined
(
$semid
) )
{
my
$newflags
= (
$flags
|
&IPC::SysV::IPC_CREAT
);
$semid
=
semget
(
$serial
, 3,
$newflags
);
}
};
if
( $@ )
{
if
( $@ =~ /
semget
[[:blank:]\h]+not[[:blank:]\h]+implemented/i )
{
return
(
$self
->error(
"IPC SysV is supported, but somehow semget is not implemented: $@"
) );
}
else
{
return
(
$self
->error(
"Error while trying to get the semaphore for shared memory id: $@"
) );
}
}
return
(
$self
->error(
"Unable to get a semaphore for shared memory key \""
, (
$opts
->{key} ||
$self
->key ),
"\" wth id \"$id\": $!"
) )
if
( !
defined
(
$semid
) );
my
$new
=
$self
->new(
key
=> (
$opts
->{key} ||
$self
->key ),
debug
=>
$self
->debug,
mode
=>
$self
->mode,
destroy
=>
$self
->destroy,
destroy_semaphore
=>
$self
->destroy_semaphore,
_packing_method
=>
$self
->_packing_method,
) ||
return
(
$self
->error(
"Cannot create object with key '"
, (
$opts
->{key} ||
$self
->key ),
"': "
,
$self
->error ) );
$new
->key(
$self
->key );
$new
->serial(
$self
->serial );
$new
->id(
$id
);
$new
->semid(
$semid
);
CORE::
push
(
@$SHEM_REPO
,
$id
);
$ID2OBJ
->{
$id
} =
$new
;
if
( !
defined
(
$new
->op( @{
$SEMOP_ARGS
->{(LOCK_SH)}} ) ) )
{
return
(
$self
->error(
"Unable to set lock on sempahore: $!"
) );
}
my
$there
=
$new
->
stat
( SEM_MARKER );
$new
->size(
$opts
->{size} );
if
(
$there
== SHM_EXISTS )
{
}
else
{
$new
->
stat
( SEM_MARKER, SHM_EXISTS ) ||
return
(
$new
->error(
"Unable to set semaphore during object creation: "
,
$new
->error ) );
}
$new
->op( @{
$SEMOP_ARGS
->{(LOCK_SH | LOCK_UN)}} );
return
(
$new
);
}
sub
owner {
return
(
shift
->_set_get_scalar(
'owner'
,
@_
) ); }
sub
pid
{
my
$self
=
shift
(
@_
);
my
$sem
=
shift
(
@_
);
my
$semid
=
$self
->semid ||
return
(
$self
->error(
"No semaphore set yet. You must open the shared memory first to remove semaphore."
) );
no
strict
'subs'
;
my
$arg
= 0;
local
$@;
my
@rv
=
eval
{
my
$v
=
semctl
(
$semid
,
$sem
,
&IPC::SysV::GETPID
,
$arg
);
return
(
$v
? 0 +
$v
:
undef
() );
};
if
( $@ )
{
return
(
$self
->error(
"Error trying to get semaphore pid using semaphore id ${semid}: $@"
) );
}
return
(
$rv
[0] );
}
sub
rand
{
my
$self
=
shift
(
@_
);
my
$size
=
$self
->size || 1024;
no
strict
'subs'
;
local
$@;
my
$key
;
eval
{
$key
=
shmget
(
&IPC::SysV::IPC_PRIVATE
,
$size
,
&IPC::SysV::S_IRWXU
|
&IPC::SysV::S_IRWXG
|
&IPC::SysV::S_IRWXO
);
};
if
( $@ )
{
return
(
$self
->error(
"Error trying to get a random private key using shmget and IPC_PRIVATE: $@"
) );
}
if
( !
defined
(
$key
) )
{
return
(
$self
->error(
"Unable to generate a share memory key: $!"
) )
}
else
{
return
(
$key
);
}
}
sub
read
{
my
(
$self
,
$buf
) =
@_
;
my
$size
;
$size
=
int
(
$_
[2] )
if
(
scalar
(
@_
) > 2 );
$size
//=
int
(
$self
->size || SHM_BUFSIZ );
my
$id
=
$self
->id;
return
(
$self
->error(
"No shared memory id! Have you opened it first?"
) )
if
( !
length
(
$id
) );
my
$buffer
=
''
;
my
$addr
=
$self
->addr;
if
(
$addr
)
{
memread(
$addr
,
$buffer
, 0,
$size
) ||
return
(
$self
->error(
"Unable to read data from shared memory address \"$addr\" using memread: $!"
) );
}
else
{
shmread
(
$id
,
$buffer
, 0,
$size
) ||
return
(
$self
->error(
"Unable to read data from shared memory id \"$id\": $!"
) );
}
my
$packing
=
$self
->_packing_method;
my
$data
;
if
( CORE::
length
(
$buffer
) )
{
if
(
index
(
$buffer
,
'MG['
) == 0 )
{
my
$def
=
substr
(
$buffer
, 0,
index
(
$buffer
,
']'
) + 1,
''
);
my
$len
=
int
(
substr
(
$def
, 3, -1 ) );
substr
(
$buffer
,
$len
,
length
(
$buffer
),
''
);
}
local
$@;
eval
{
if
(
$packing
eq
'json'
)
{
$data
=
$self
->_decode_json(
$buffer
);
}
elsif
(
$packing
eq
'cbor'
)
{
$data
=
$self
->deserialise(
data
=>
$buffer
,
serialiser
=>
'CBOR::XS'
,
allow_sharing
=> 1,
(
defined
(
$self
->{base64} ) ? (
base64
=>
$self
->{base64} ) : () ),
);
}
elsif
(
$packing
eq
'sereal'
)
{
$data
=
$self
->deserialise(
data
=>
$buffer
,
serialiser
=>
'Sereal'
,
freeze_callbacks
=> 1,
(
defined
(
$self
->{base64} ) ? (
base64
=>
$self
->{base64} ) : () ),
);
}
else
{
$data
=
$self
->deserialise(
data
=>
$buffer
,
serialiser
=>
'Storable::Improved'
,
(
defined
(
$self
->{base64} ) ? (
base64
=>
$self
->{base64} ) : () ),
);
}
};
if
( $@ )
{
return
(
$self
->error(
"An error occured while decoding data using $packing with base64 set to '"
, (
$self
->{base64} // '
' ), "'
: $@" ) );
}
}
else
{
$data
=
$buffer
;
}
if
(
scalar
(
@_
) > 1 )
{
$_
[1] =
$data
;
return
( CORE::
length
(
$_
[1] ) ||
"0E0"
);
}
else
{
return
(
$data
);
}
}
sub
remove
{
my
$self
=
shift
(
@_
);
return
(1)
if
(
$self
->removed );
my
$id
=
$self
->id();
return
(
$self
->error(
"No shared memory id! Have you opened it first?"
) )
if
( !
length
(
$id
) );
my
$semid
=
$self
->semid;
return
(
$self
->error(
"No semaphore set yet. You must open the shared memory first to remove semaphore."
) )
if
( !
length
(
$semid
) );
$self
->unlock();
no
strict
'subs'
;
if
( !
defined
(
shmctl
(
$id
,
&IPC::SysV::IPC_RMID
, 0 ) ) )
{
return
(
$self
->error(
"Unable to remove share memory segement id '$id' (IPC_RMID is '"
,
&IPC::SysV::IPC_RMID
,
"'): $!"
) );
}
my
$rv
;
my
$arg
= 0;
if
( !
defined
(
$rv
=
semctl
(
$semid
, 0,
&IPC::SysV::IPC_RMID
,
$arg
) ) )
{
warn
(
"Warning only: could not remove the semaphore id \"$semid\": $!"
)
if
(
$self
->_warnings_is_enabled );
}
$self
->removed(
$rv
? 1 : 0 );
if
(
$rv
)
{
for
(
my
$i
= 0;
$i
<
scalar
(
@$SHEM_REPO
);
$i
++ )
{
my
$this_id
=
$SHEM_REPO
->[
$i
];
my
$obj
=
$ID2OBJ
->{
$this_id
};
if
( Scalar::Util::blessed(
$obj
) &&
$this_id
eq
$id
)
{
CORE::
splice
(
@$SHEM_REPO
,
$i
, 1 );
CORE::
delete
(
$ID2OBJ
->{
$this_id
} );
last
;
}
}
$self
->id(
undef
() );
$self
->semid(
undef
() );
}
return
(
$rv
? 1 : 0 );
}
sub
remove_semaphore
{
my
$self
=
shift
(
@_
);
return
(1)
if
(
$self
->removed_semaphore );
my
$semid
=
$self
->semid;
return
(
$self
->error(
"No semaphore set yet. You must open the shared memory first to remove semaphore."
) )
if
( !
length
(
$semid
) );
$self
->unlock();
my
$rv
;
no
strict
'subs'
;
my
$arg
= 0;
if
( !
defined
(
$rv
=
semctl
(
$semid
, 0,
&IPC::SysV::IPC_RMID
,
$arg
) ) )
{
warn
(
"Warning only: could not remove the semaphore id \"$semid\" with IPC::SysV::IPC_RMID value '"
,
&IPC::SysV::IPC_RMID
,
"': $!"
)
if
(
$self
->_warnings_is_enabled );
}
$self
->removed_semaphore(
$rv
? 1 : 0 );
$self
->semid(
undef
() );
return
(
$rv
? 1 : 0 );
}
sub
removed {
return
(
shift
->_set_get_boolean(
'removed'
,
@_
) ); }
sub
removed_semaphore {
return
(
shift
->_set_get_boolean(
'removed_semaphore'
,
@_
) ); }
sub
reset
{
my
$self
=
shift
(
@_
);
my
$default
;
if
(
@_
)
{
$default
=
shift
(
@_
);
}
else
{
$default
=
''
;
}
$self
->
lock
( LOCK_EX );
$self
->
write
(
$default
);
$self
->unlock;
return
(
$self
);
}
sub
semid {
return
(
shift
->_set_get_scalar(
'semid'
,
@_
) ); }
sub
sereal {
return
(
shift
->_packing_method(
'sereal'
) ); }
sub
serial {
return
(
shift
->_set_get_scalar(
'serial'
,
@_
) ); }
sub
serialiser {
return
(
shift
->_set_get_scalar(
'_packing_method'
,
@_
) ); }
{
no
warnings
'once'
;
*serializer
= \
&serialiser
;
}
sub
shmstat
{
my
$self
=
shift
(
@_
);
my
$data
=
''
;
my
$id
=
$self
->id ||
return
(
$self
->error(
"No shared memory id set!"
) );
no
strict
'subs'
;
shmctl
(
$id
,
&IPC::SysV::IPC_STAT
,
$data
) or
return
(
$self
->error(
"Unable to stat shared memory with id '$id': $!"
) );
return
( Module::Generic::SharedStat->new->
unpack
(
$data
) );
}
sub
size {
return
(
shift
->_set_get_scalar(
'size'
,
@_
) ); }
sub
stat
{
my
$self
=
shift
(
@_
);
my
$id
=
$self
->semid;
return
(
$self
->error(
"No semaphore set yet. You must open the shared memory first to set the semaphore."
) )
if
( !
length
(
$id
) );
no
strict
'subs'
;
if
(
@_
)
{
if
(
@_
== 1 )
{
my
$sem
=
shift
(
@_
);
my
$arg
= 0;
my
$v
=
semctl
(
$id
,
$sem
,
&IPC::SysV::GETVAL
,
$arg
);
return
(
$v
? 0 +
$v
:
undef
() );
}
else
{
my
(
$sem
,
$val
) =
@_
;
semctl
(
$id
,
$sem
,
&IPC::SysV::SETVAL
,
$val
) ||
return
(
$self
->error(
"Unable to semctl with semaphore id '$id', semaphore '$sem', SETVAL='"
,
&IPC::SysV::SETVAL
,
"' and value='$val': $!"
) );
}
}
else
{
my
$data
=
''
;
if
(
wantarray
() )
{
semctl
(
$id
, 0,
&IPC::SysV::GETALL
,
$data
) ||
return
( () );
return
( (
unpack
(
"s$N*"
,
$data
) ) );
}
else
{
semctl
(
$id
, 0,
&IPC::SysV::IPC_STAT
,
$data
) ||
return
(
$self
->error(
"Unable to stat semaphore with id '$id': $!"
) );
return
( Module::Generic::SemStat->new->
unpack
(
$data
) );
}
}
}
sub
storable {
return
(
shift
->_packing_method(
'storable'
) ); }
sub
supported {
return
(
$SYSV_SUPPORTED
); }
sub
unlock
{
my
$self
=
shift
(
@_
);
return
(1)
if
( !
$self
->locked );
my
$semid
=
$self
->semid;
return
(
$self
->error(
"No semaphore set yet. You must open the shared memory first to unlock semaphore."
) )
if
( !
length
(
$semid
) );
my
$type
= (
$self
->locked | LOCK_UN );
$type
^= LOCK_NB
if
(
$type
& LOCK_NB );
if
(
defined
(
$self
->op( @{
$SEMOP_ARGS
->{
$type
}} ) ) )
{
$self
->locked(0);
}
return
(
$self
);
}
sub
write
{
my
$self
=
shift
(
@_
);
my
$data
;
if
(
scalar
(
@_
) == 1 &&
ref
(
$_
[0] ) )
{
$data
=
shift
(
@_
);
}
else
{
$data
= \
join
(
''
,
@_
);
}
my
$id
=
$self
->id();
my
$size
=
int
(
$self
->size() ) || SHM_BUFSIZ;
my
$packing
=
$self
->_packing_method;
my
$encoded
;
if
(
$packing
eq
'json'
)
{
local
$@;
eval
{
$encoded
=
$self
->_encode_json(
$data
);
};
if
( $@ )
{
return
(
$self
->error(
"An error occured encoding data provided using $packing with base64 set to '"
, (
$self
->{base64} // '
' ), ": $@. Data was: '
$data
'" ) );
}
}
elsif
(
$packing
eq
'cbor'
)
{
local
$@;
eval
{
$encoded
=
$self
->serialise(
$data
,
serialiser
=>
'CBOR::XS'
,
allow_sharing
=> 1,
(
defined
(
$self
->{base64} ) ? (
base64
=>
$self
->{base64} ) : () ),
);
};
if
( $@ )
{
return
(
$self
->error(
"An error occured encoding data provided using $packing with base64 set to '"
, (
$self
->{base64} // '
' ), ": $@. Data was: '
$data
'" ) );
}
return
(
$self
->error(
"Unable to serialise "
, CORE::
length
(
$data
),
" bytes of data using CBOR::XS with base64 set to '"
, (
$self
->{base64} // '' ),
": "
,
$self
->error ) )
if
( !
defined
(
$encoded
) );
}
elsif
(
$packing
eq
'sereal'
)
{
$self
->_load_class(
'Sereal::Encoder'
) ||
return
(
$self
->pass_error );
my
$const
;
$const
= \&{
"Sereal\::Encoder::SRL_ZLIB"
}
if
(
defined
( &{
"Sereal\::Encoder::SRL_ZLIB"
} ) );
local
$@;
eval
{
$encoded
=
$self
->serialise(
$data
,
serialiser
=>
'Sereal'
,
freeze_callbacks
=> 1,
(
defined
(
$const
) ? (
compress
=>
$const
->() ) : () ),
(
defined
(
$self
->{base64} ) ? (
base64
=>
$self
->{base64} ) : () ),
);
};
if
( $@ )
{
return
(
$self
->error(
"An error occured encoding data provided using $packing with base64 set to '"
, (
$self
->{base64} // '
' ), ": $@. Data was: '
$data
'" ) );
}
return
(
$self
->error(
"Unable to serialise "
, CORE::
length
(
$data
),
" bytes of data using Sereal with base64 set to '"
, (
$self
->{base64} // '' ),
": "
,
$self
->error ) )
if
( !
defined
(
$encoded
) );
}
else
{
eval
{
$encoded
=
$self
->serialise(
$data
,
serialiser
=>
'Storable::Improved'
,
(
defined
(
$self
->{base64} ) ? (
base64
=>
$self
->{base64} ) : () ),
);
};
if
( $@ )
{
return
(
$self
->error(
"An error occured encoding data provided using $packing with base64 set to '"
, (
$self
->{base64} // '
' ), ": $@. Data was: '
$data
'" ) );
}
return
(
$self
->error(
"Unable to serialise "
, CORE::
length
(
$data
),
" bytes of data using Storable with base64 set to '"
, (
$self
->{base64} // '' ),
": "
,
$self
->error ) )
if
( !
defined
(
$encoded
) );
}
substr
(
$encoded
, 0, 0,
'MG['
.
length
(
$encoded
) .
']'
);
my
$len
=
length
(
$encoded
);
if
(
$len
>
$size
)
{
return
(
$self
->error(
"Data to write are "
,
length
(
$encoded
),
" bytes long and exceed the maximum you have set of '$size'."
) );
}
my
$addr
=
$self
->addr;
if
(
$addr
)
{
memwrite(
$addr
,
$encoded
, 0,
$len
) ||
return
(
$self
->error(
"Unable to write to shared memory address '$addr' using memwrite: $!"
) );
}
else
{
shmwrite
(
$id
,
$encoded
, 0,
$len
) ||
return
(
$self
->error(
"Unable to write to shared memory id '$id' with ${len} bytes of data encoded and memory size of $size: $!"
) );
}
return
(
$self
);
}
sub
_decode_json
{
my
$self
=
shift
(
@_
);
my
$data
=
shift
(
@_
);
return
(
$data
)
if
( !
defined
(
$data
) || !CORE::
length
(
$data
) );
my
$j
= JSON->new->utf8->relaxed->allow_nonref;
my
$seen
= {};
my
$crawl
;
$crawl
=
sub
{
my
$this
=
shift
(
@_
);
my
$type
= Scalar::Util::reftype(
$this
);
return
(
$this
)
if
( (
$type
eq
'HASH'
||
$type
eq
'ARRAY'
) && ++
$seen
->{ Scalar::Util::refaddr(
$this
) } > 1 );
if
(
$type
eq
'HASH'
)
{
if
( CORE::
exists
(
$this
->{__scalar_gen_shm} ) )
{
return
( \
$this
->{__scalar_gen_shm} );
}
foreach
my
$k
(
keys
(
%$this
) )
{
next
if
( !
ref
(
$this
->{
$k
} ) );
$this
->{
$k
} =
$crawl
->(
$this
->{
$k
} );
}
}
elsif
(
$type
eq
'ARRAY'
)
{
for
(
my
$i
= 0;
$i
<
scalar
(
@$this
);
$i
++ )
{
next
if
( !
ref
(
$this
->[
$i
] ) );
$this
->[
$i
] =
$crawl
->(
$this
->[
$i
] );
}
}
return
(
$this
);
};
my
$result
;
local
$@;
eval
{
my
$decoded
=
$j
->decode(
$data
);
$result
=
$crawl
->(
$decoded
);
};
if
( $@ )
{
return
(
$self
->error(
"An error occurred while trying to decode JSON data: $@"
) );
}
return
(
$result
);
}
sub
_encode_json
{
my
$self
=
shift
(
@_
);
my
$data
=
shift
(
@_
);
my
$seen
= {};
my
$crawl
;
$crawl
=
sub
{
my
$this
=
shift
(
@_
);
my
$type
= Scalar::Util::reftype(
$this
);
return
(
$this
)
if
( (
$type
eq
'HASH'
||
$type
eq
'ARRAY'
) && ++
$seen
->{ Scalar::Util::refaddr(
$this
) } > 1 );
if
(
$type
eq
'HASH'
)
{
foreach
my
$k
(
keys
(
%$this
) )
{
next
if
( !
ref
(
$this
->{
$k
} ) );
$this
->{
$k
} =
$crawl
->(
$this
->{
$k
} );
}
}
elsif
(
$type
eq
'ARRAY'
)
{
for
(
my
$i
= 0;
$i
<
scalar
(
@$this
);
$i
++ )
{
next
if
( !
ref
(
$this
->[
$i
] ) );
$this
->[
$i
] =
$crawl
->(
$this
->[
$i
] );
}
}
elsif
(
$type
eq
'SCALAR'
)
{
return
(
$this
)
if
(
$$this
eq
"1"
or
$$this
eq
"0"
);
my
$pkg
;
if
( (
$pkg
= Scalar::Util::blessed(
$this
) ) )
{
if
( overload::Method(
$this
=>
'""'
) )
{
$this
= {
__scalar_gen_shm
=>
"$this"
,
__package
=>
$pkg
};
}
else
{
$this
= {
__scalar_gen_shm
=>
$$this
,
__package
=>
$pkg
};
}
}
else
{
$this
= {
__scalar_gen_shm
=>
$$this
};
}
}
return
(
$this
);
};
my
$ref
=
$crawl
->(
$data
);
my
$j
= JSON->new->utf8->relaxed->allow_nonref->convert_blessed;
my
$encoded
;
local
$@;
eval
{
$encoded
=
$j
->encode(
$ref
);
};
if
( $@ )
{
return
(
$self
->error(
"An error occurred while trying to JSON encode data: $@"
) );
}
return
(
$encoded
);
}
sub
_packing_method {
return
(
shift
->_set_get_scalar(
'_packing_method'
,
@_
) ); }
sub
_str2key
{
my
$self
=
shift
(
@_
);
my
$key
=
shift
(
@_
);
no
strict
'subs'
;
if
( !
defined
(
$key
) ||
$key
eq
''
)
{
return
(
&IPC::SysV::IPC_PRIVATE
);
}
my
$path
;
(
$key
,
$path
) =
ref
(
$key
) eq
'ARRAY'
?
@$key
: (
$key
, [
getpwuid
($>)]->[7] );
$path
= [
getpwuid
(
$path
)]->[7]
if
(
$path
=~ /^\d+$/ );
$path
||= File::Spec->rootdir();
if
(
$key
=~ /^\d+$/ )
{
my
$id
=
&IPC::SysV::ftok
(
$path
,
$key
) ||
return
(
$self
->error(
"Unable to get a key using IPC::SysV::ftok: $!"
) );
return
(
$id
);
}
else
{
my
$hash
= Digest::SHA::sha1_base64(
$key
);
my
$id
=
ord
(
substr
(
$hash
, 0, 1 ) );
my
$val
=
&IPC::SysV::ftok
(
$path
,
$id
);
return
(
$val
);
}
}
sub
DESTROY
{
my
$self
=
shift
(
@_
);
return
unless
(
$self
->{id} );
$self
->unlock;
$self
->detach;
my
$rv
=
$self
->remove_semaphore;
if
(
$self
->destroy )
{
my
$stat
=
$self
->shmstat();
if
(
defined
(
$stat
) && (
$stat
->nattch() == 0 ) )
{
$self
->remove;
}
}
};
sub
FREEZE
{
my
$self
= CORE::
shift
(
@_
);
my
$serialiser
= CORE::
shift
(
@_
) //
''
;
my
$class
= CORE::
ref
(
$self
);
my
%hash
=
%$self
;
CORE::
delete
(
@hash
{
qw( addr id locked owner removed removed_semaphore semid )
} );
CORE::
return
( [
$class
, \
%hash
] )
if
(
$serialiser
eq
'Sereal'
&& Sereal::Encoder->VERSION <= version->parse(
'4.023'
) );
CORE::
return
(
$class
, \
%hash
);
}
sub
STORABLE_freeze { CORE::
return
( CORE::
shift
->FREEZE(
@_
) ); }
sub
STORABLE_thaw { CORE::
return
( CORE::
shift
->THAW(
@_
) ); }
sub
THAW
{
my
(
$self
,
undef
,
@args
) =
@_
;
my
$ref
= ( CORE::
scalar
(
@args
) == 1 && CORE::
ref
(
$args
[0] ) eq
'ARRAY'
) ? CORE::
shift
(
@args
) : \
@args
;
my
$class
= ( CORE::
defined
(
$ref
) && CORE::
ref
(
$ref
) eq
'ARRAY'
&& CORE::
scalar
(
@$ref
) > 1 ) ? CORE::
shift
(
@$ref
) : ( CORE::
ref
(
$self
) ||
$self
);
my
$hash
= CORE::
ref
(
$ref
) eq
'ARRAY'
? CORE::
shift
(
@$ref
) : {};
my
$new
;
if
( CORE::
ref
(
$self
) )
{
foreach
( CORE::
keys
(
%$hash
) )
{
$self
->{
$_
} = CORE::
delete
(
$hash
->{
$_
} );
}
$new
=
$self
;
}
else
{
$new
= CORE::
bless
(
$hash
=>
$class
);
}
CORE::
return
(
$new
);
}
END
{
foreach
my
$id
(
@$SHEM_REPO
)
{
my
$s
=
$ID2OBJ
->{
$id
} ||
next
;
next
if
(
$s
->removed || !
$s
->id || !
$s
->destroy );
$s
->detach;
$s
->remove;
}
};
{
package
Module::Generic::SharedStat;
our
$VERSION
=
'v0.1.0'
;
sub
new
{
my
$this
=
shift
(
@_
);
my
@vals
=
@_
;
return
(
bless
( [
@vals
] =>
ref
(
$this
) ||
$this
) );
}
sub
unpack
{
my
$self
=
shift
(
@_
);
my
$data
=
shift
(
@_
);
my
$d
= IPC::SharedMem::
stat
->new->
unpack
(
$data
);
return
(
$self
->new(
@$d
) );
}
sub
atime {
return
(
shift
->[ATIME] ); }
sub
cgid {
return
(
shift
->[CGID] ); }
sub
cpid {
return
(
shift
->[CPID] ); }
sub
ctime {
return
(
shift
->[CTIME] ); }
sub
cuid {
return
(
shift
->[CUID] ); }
sub
dtime {
return
(
shift
->[DTIME] ); }
sub
gid {
return
(
shift
->[GID] ); }
sub
lpid {
return
(
shift
->[LPID] ); }
sub
mode {
return
(
shift
->[MODE] ); }
sub
nattch {
return
(
shift
->[NATTCH] ); }
sub
segsz {
return
(
shift
->[SEGSZ] ); }
sub
uid {
return
(
shift
->[UID] ); }
sub
FREEZE
{
my
$self
= CORE::
shift
(
@_
);
my
$serialiser
= CORE::
shift
(
@_
) //
''
;
my
$class
= CORE::
ref
(
$self
);
my
@array
=
@$self
;
CORE::
return
( [
$class
, \
@array
] )
if
(
$serialiser
eq
'Sereal'
&& Sereal::Encoder->VERSION <= version->parse(
'4.023'
) );
CORE::
return
(
$class
, \
@array
);
}
sub
STORABLE_freeze { CORE::
return
( CORE::
shift
->FREEZE(
@_
) ); }
sub
STORABLE_thaw { CORE::
return
( CORE::
shift
->THAW(
@_
) ); }
sub
THAW
{
my
(
$self
,
undef
,
@args
) =
@_
;
my
$ref
= ( CORE::
scalar
(
@args
) == 1 && CORE::
ref
(
$args
[0] ) eq
'ARRAY'
) ? CORE::
shift
(
@args
) : \
@args
;
my
$class
= ( CORE::
defined
(
$ref
) && CORE::
ref
(
$ref
) eq
'ARRAY'
&& CORE::
scalar
(
@$ref
) > 1 ) ? CORE::
shift
(
@$ref
) : ( CORE::
ref
(
$self
) ||
$self
);
my
$array
= CORE::
ref
(
$ref
) eq
'ARRAY'
?
$ref
: [];
if
( CORE::
ref
(
$self
) )
{
@$self
=
@$array
;
CORE::
return
(
$self
);
}
else
{
my
$new
=
bless
(
$array
=>
$class
);
CORE::
return
(
$new
);
}
}
sub
TO_JSON { CORE::
return
( [ @{
$_
[0]} ] ); }
}
{
package
Module::Generic::SemStat;
our
$VERSION
=
'v0.1.0'
;
sub
new
{
my
$this
=
shift
(
@_
);
my
@vals
=
@_
;
return
(
bless
( [
@vals
] =>
ref
(
$this
) ||
$this
) );
}
sub
unpack
{
my
$self
=
shift
(
@_
);
my
$data
=
shift
(
@_
);
my
$d
= IPC::Semaphore::
stat
->new->
unpack
(
$data
);
return
(
$self
->new(
@$d
) );
}
sub
cgid {
return
(
shift
->[CGID] ); }
sub
ctime {
return
(
shift
->[CTIME] ); }
sub
cuid {
return
(
shift
->[CUID] ); }
sub
gid {
return
(
shift
->[GID] ); }
sub
mode {
return
(
shift
->[MODE] ); }
sub
nsems {
return
(
shift
->[NSEMS] ); }
sub
otime {
return
(
shift
->[OTIME] ); }
sub
uid {
return
(
shift
->[UID] ); }
sub
FREEZE
{
my
$self
= CORE::
shift
(
@_
);
my
$serialiser
= CORE::
shift
(
@_
) //
''
;
my
$class
= CORE::
ref
(
$self
);
my
@array
=
@$self
;
CORE::
return
( [
$class
, \
@array
] )
if
(
$serialiser
eq
'Sereal'
&& Sereal::Encoder->VERSION <= version->parse(
'4.023'
) );
CORE::
return
(
$class
, \
@array
);
}
sub
STORABLE_freeze { CORE::
return
( CORE::
shift
->FREEZE(
@_
) ); }
sub
STORABLE_thaw { CORE::
return
( CORE::
shift
->THAW(
@_
) ); }
sub
THAW
{
my
(
$self
,
undef
,
@args
) =
@_
;
my
$ref
= ( CORE::
scalar
(
@args
) == 1 && CORE::
ref
(
$args
[0] ) eq
'ARRAY'
) ? CORE::
shift
(
@args
) : \
@args
;
my
$class
= ( CORE::
defined
(
$ref
) && CORE::
ref
(
$ref
) eq
'ARRAY'
&& CORE::
scalar
(
@$ref
) > 1 ) ? CORE::
shift
(
@$ref
) : ( CORE::
ref
(
$self
) ||
$self
);
my
$array
= CORE::
ref
(
$ref
) eq
'ARRAY'
?
$ref
: [];
if
( CORE::
ref
(
$self
) )
{
@$self
=
@$array
;
CORE::
return
(
$self
);
}
else
{
my
$new
=
bless
(
$array
=>
$class
);
CORE::
return
(
$new
);
}
}
sub
TO_JSON { CORE::
return
( [ @{
$_
[0]} ] ); }
}
1;