The Perl and Raku Conference 2025: Greenville, South Carolina - June 27-29 Learn more

##----------------------------------------------------------------------------
## Module Generic - ~/lib/Module/Generic/SharedMem.pm
## Version v0.4.2
## Copyright(c) 2024 DEGUEST Pte. Ltd.
## Author: Jacques Deguest <jack@deguest.jp>
## Created 2021/01/18
## Modified 2025/03/12
## All rights reserved
##
## This program is free software; you can redistribute it and/or modify it
## under the same terms as Perl itself.
##----------------------------------------------------------------------------
BEGIN
{
use strict;
use warnings;
use vars qw( $SUPPORTED_RE $SYSV_SUPPORTED $SEMOP_ARGS $SHEM_REPO $ID2OBJ $N $HAS_B64 );
use Config;
use File::Spec ();
# use Nice::Try;
use Scalar::Util ();
# This is disruptive for everybody. Bad idea.
# use JSON 4.03 qw( -convert_blessed_universally );
use JSON 4.03;
use constant SHM_BUFSIZ => 65536;
use constant SEM_LOCKER => 0;
use constant SEM_MARKER => 0;
use constant SHM_LOCK_WAIT => 0;
use constant SHM_LOCK_EX => 1;
use constant SHM_LOCK_UN => -1;
use constant SHM_EXISTS => 1;
use constant LOCK_SH => 1;
use constant LOCK_EX => 2;
use constant LOCK_NB => 4;
use constant LOCK_UN => 8;
# if( $^O =~ /^(?:Android|cygwin|dos|MSWin32|os2|VMS|riscos)/ )
# Even better
$SUPPORTED_RE = qr/IPC\/SysV/;
if( $Config{extensions} =~ /$SUPPORTED_RE/ &&
$^O !~ /^(?:Android|dos|MSWin32|os2|VMS|riscos)/i &&
# we need semaphore and messages
$Config{d_msg} eq 'define' &&
$Config{d_sem} eq 'define' &&
$Config{d_semctl} eq 'define' &&
$Config{d_semget} eq 'define' &&
$Config{d_semop} eq 'define' &&
$Config{d_shm} eq 'define' &&
$Config{d_shmat} eq 'define' &&
$Config{d_shmctl} eq 'define' &&
$Config{d_shmdt} eq 'define' &&
$Config{d_shmget} eq 'define'
)
{
require IPC::SysV;
IPC::SysV->import( qw( IPC_RMID IPC_PRIVATE IPC_SET IPC_STAT IPC_CREAT IPC_EXCL IPC_NOWAIT
SEM_UNDO S_IRWXU S_IRWXG S_IRWXO S_IRUSR S_IWUSR
GETNCNT GETZCNT GETVAL SETVAL GETPID GETALL SETALL
shmat shmdt memread memwrite ftok ) );
$SYSV_SUPPORTED = 1;
no strict 'subs';
eval( <<'EOT' );
our $SEMOP_ARGS =
{
(LOCK_EX) =>
[
1, 0, 0, # wait for readers to finish
2, 0, 0, # wait for writers to finish
2, 1, SEM_UNDO, # assert write lock
],
(LOCK_EX | LOCK_NB) =>
[
1, 0, IPC_NOWAIT, # wait for readers to finish
2, 0, IPC_NOWAIT, # wait for writers to finish
2, 1, (SEM_UNDO | IPC_NOWAIT), # assert write lock
],
(LOCK_EX | LOCK_UN) =>
[
2, -1, (SEM_UNDO | IPC_NOWAIT),
],
(LOCK_SH) =>
[
2, 0, 0, # wait for writers to finish
1, 1, SEM_UNDO, # assert shared read lock
],
(LOCK_SH | LOCK_NB) =>
[
2, 0, IPC_NOWAIT, # wait for writers to finish
1, 1, (SEM_UNDO | IPC_NOWAIT), # assert shared read lock
],
(LOCK_SH | LOCK_UN) =>
[
1, -1, (SEM_UNDO | IPC_NOWAIT), # remove shared read lock
],
};
EOT
if( $@ )
{
warn( "Error while trying to eval \$SEMOP_ARGS: $@\n" );
}
}
else
{
$SYSV_SUPPORTED = 0;
}
# Credits IPC::SysV
$N = do { my $foo = eval { pack "L!", 0 }; $@ ? '' : '!' };
# Array to maintain the order in which shared memory object were created, so they can
# be removed in that order
$SHEM_REPO = [];
$ID2OBJ = {};
our @EXPORT_OK = qw(LOCK_EX LOCK_SH LOCK_NB LOCK_UN);
our %EXPORT_TAGS = (
all => [qw( LOCK_EX LOCK_SH LOCK_NB LOCK_UN )],
lock => [qw( LOCK_EX LOCK_SH LOCK_NB LOCK_UN )],
'flock' => [qw( LOCK_EX LOCK_SH LOCK_NB LOCK_UN )],
);
our $VERSION = 'v0.4.2';
};
# use strict;
no warnings 'redefine';
sub init
{
my $self = shift( @_ );
$self->{base64} = undef;
# Default action when accessing a shared memory? If 1, it will create it if it does not exist already
$self->{create} = 0;
# If true, this will destroy both the shared memory and the semaphore upon end
$self->{destroy} = 0;
# If true, this will destroy only the semaphore upon end
$self->{destroy_semaphore} = 0;
$self->{exclusive} = 0;
no strict 'subs';
$self->{key} = &IPC::SysV::IPC_PRIVATE if( $SYSV_SUPPORTED );
$self->{mode} = 0666;
$self->{serial} = '';
# SHM_BUFSIZ
$self->{size} = SHM_BUFSIZ;
$self->{_init_strict_use_sub} = 1;
# $self->{_packing_method} = 'storable';
# Storable keps breaking :(
# I leave the feature of using it as a choice to the user, but defaults to JSON
$self->{_packing_method} = 'json';
$self->SUPER::init( @_ ) || return( $self->pass_error );
$self->{addr} = undef();
$self->{id} = undef();
$self->{locked} = 0;
$self->{owner} = $$;
$self->{removed} = 0;
$self->{removed_semaphore} = 0;
$self->{semid} = undef();
return( $self );
}
sub addr { return( shift->_set_get_scalar( 'addr', @_ ) ); }
# This class does not convert to an HASH
sub as_hash { return( $_[0] ); }
sub attach
{
my $self = shift( @_ );
my $flags = shift( @_ );
$flags = $self->flags if( !defined( $flags ) );
my $addr = $self->addr;
return( $addr ) if( defined( $addr ) );
my $id = $self->id;
return( $self->error( "No shared memory id! Have you opened it first?" ) ) if( !length( $id ) );
$addr = shmat( $id, undef(), $flags );
return( $self->error( "Unable to attach to shared memory: $!" ) ) if( !defined( $addr ) );
$self->addr( $addr );
return( $addr );
}
sub base64 { return( shift->_set_get_scalar( 'base64', @_ ) ); }
sub cbor { return( shift->_packing_method( 'cbor' ) ); }
sub create { return( shift->_set_get_boolean( 'create', @_ ) ); }
sub delete { return( shift->remove( @_ ) ); }
sub destroy
{
my $self = shift( @_ );
if( @_ )
{
my $val = shift( @_ );
$self->_set_get_boolean( 'destroy', $val );
$self->_set_get_boolean( 'destroy_semaphore', $val );
}
return( $self->_set_get_boolean( 'destroy' ) );
}
sub destroy_semaphore { return( shift->_set_get_boolean( 'destroy_semaphore', @_ ) ); }
sub detach
{
my $self = shift( @_ );
my $addr = $self->addr;
return if( !defined( $addr ) );
my $rv = shmdt( $addr );
return( $self->error( "Unable to detach from shared memory: $!" ) ) if( !defined( $rv ) );
$self->addr( undef() );
return( $self );
}
sub exclusive { return( shift->_set_get_boolean( 'exclusive', @_ ) ); }
sub exists
{
my $self = shift( @_ );
my $opts = {};
if( ref( $_[0] ) eq 'HASH' )
{
$opts = shift( @_ );
}
else
{
@$opts{ qw( key mode size ) } = @_;
}
$opts->{size} = $self->size unless( length( $opts->{size} ) );
$opts->{size} = int( $opts->{size} );
my $serial;
if( length( $opts->{key} ) )
{
$serial = $self->_str2key( $opts->{key} );
# $serial = $opts->{key};
}
else
{
$serial = $self->serial;
# $serial = $self->key;
}
my $flags = $self->flags({ mode => 0644 });
# Remove the create bit
no strict 'subs';
$flags = ( $flags ^ &IPC::SysV::IPC_CREAT );
my $semid;
# try-catch
local $@;
my @rv = eval
{
$semid = semget( $serial, 0, $flags );
if( defined( $semid ) )
{
my $arg = 0;
my $found = semctl( $semid, SEM_MARKER, &IPC::SysV::GETVAL, $arg );
$arg = 0;
semctl( $semid, 0, &IPC::SysV::IPC_RMID, $arg );
return( $found == SHM_EXISTS ? 1 : 0 );
}
else
{
return(0) if( $! =~ /\bNo[[:blank:]]+such[[:blank:]]+file\b/ );
return;
}
};
if( $@ )
{
# warn( "Trying to access shared memory triggered error: $e" ) if( $self->_warnings_is_enabled );
my $arg = 0;
if( $semid )
{
# try-catch
local $@;
if( !eval
{
semctl( $semid, 0, &IPC::SysV::IPC_RMID, $arg );
})
{
warn( "Error trying to remove semaphore id ${semid} after checking if shared memory existed: $@" ) if( $self->_warnings_is_enabled );
}
}
return(0);
}
else
{
return( $rv[0] );
}
}
sub flags
{
my $self = shift( @_ );
my $opts = $self->_get_args_as_hash( @_ );
no warnings 'uninitialized';
no strict 'subs';
$opts->{create} = $self->create unless( length( $opts->{create} ) );
$opts->{exclusive} = $self->exclusive unless( length( $opts->{exclusive} ) );
$opts->{mode} = $self->mode unless( length( $opts->{mode} ) );
my $flags = 0;
$flags |= &IPC::SysV::IPC_CREAT if( $opts->{create} );
$flags |= &IPC::SysV::IPC_EXCL if( $opts->{exclusive} );
$flags |= ( $opts->{mode} || 0666 );
return( $flags );
}
# sub id { return( shift->_set_get_scalar( 'id', @_ ) ); }
sub id
{
my $self = shift( @_ );
my @callinfo = caller;
no warnings 'uninitialized';
if( @_ )
{
$self->{id} = shift( @_ );
}
return( $self->{id} );
}
sub json { return( shift->_packing_method( 'json' ) ); }
sub key
{
my $self = shift( @_ );
if( @_ )
{
$self->{key} = shift( @_ );
$self->{serial} = $self->_str2key( $self->{key} );
}
return( $self->{key} );
}
sub lock
{
my $self = shift( @_ );
my $type = shift( @_ );
my $timeout = shift( @_ );
# $type = LOCK_EX if( !defined( $type ) );
$type = LOCK_SH if( !defined( $type ) );
return( $self->unlock ) if( ( $type & LOCK_UN ) );
return(1) if( $self->locked & $type );
$timeout = 0 if( !defined( $timeout ) || $timeout !~ /^\d+$/ );
# If the lock is different, release it first
$self->unlock if( $self->locked );
my $semid = $self->semid;
return( $self->error( "No semaphore id set yet." ) ) if( !defined( $semid ) );
# try-catch
local $@;
my $rc;
eval
{
local $SIG{ALRM} = sub{ die( "timeout" ); };
alarm( $timeout );
$rc = $self->op( @{$SEMOP_ARGS->{ $type }} );
alarm(0);
};
if( $@ )
{
return( $self->error( "Unable to set a lock: $@" ) );
}
else
{
if( $rc )
{
$self->locked( $type );
}
else
{
return( $self->error( "Failed to set a lock on semaphore id \"$semid\" for lock type $type: $!" ) );
}
}
return( $self );
}
sub locked { return( shift->_set_get_scalar( 'locked', @_ ) ); }
sub mode { return( shift->_set_get_scalar( 'mode', @_ ) ); }
sub op
{
my $self = shift( @_ );
return( $self->error( "No argument was provided!" ) ) if( !scalar( @_ ) );
return( $self->error( "Invalid number of argument: '", join( ', ', @_ ), "'." ) ) if( @_ % 3 );
my $id = $self->semid;
return( $self->error( "No semaphore set yet. You must open the shared memory first to set the semaphore." ) ) if( !length( $id ) );
my $data = pack( "s$N*", @_ );
my $rv;
no strict 'subs';
$rv = semop( $id, $data ) || do
{
my $serial = $self->serial;
my $semid = semget( $serial, 3, IPC_CREAT | 0666 );
$rv = semop( $semid, $data );
};
return( $rv );
}
sub open
{
my $self = shift( @_ );
my $opts = {};
if( ref( $_[0] ) eq 'HASH' )
{
$opts = shift( @_ );
}
else
{
@$opts{ qw( key mode size ) } = @_;
}
$opts->{size} = $self->size unless( length( $opts->{size} ) );
$opts->{size} = int( $opts->{size} );
$opts->{mode} //= '';
$opts->{key} //= '';
no strict 'subs';
my $serial;
if( length( $opts->{key} ) )
{
$serial = $self->_str2key( $opts->{key} ) ||
return( $self->error( "Cannot get serial from key '$opts->{key}': ", $self->error ) );
# $serial = $opts->{key};
}
else
{
$serial = $self->serial;
# $serial = $self->key;
}
die( "There is no serial!!\n" ) if( !CORE::length( $serial ) );
my $create = 0;
if( $opts->{mode} eq 'w' || $opts->{key} =~ s/^>// )
{
$create++;
}
elsif( $opts->{mode} eq 'r' || $opts->{key} =~ s/^<// )
{
$create = 0;
}
else
{
$create = $self->create;
}
my $flags = $self->flags( create => $create, ( $opts->{mode} =~ /^\d+$/ ? $opts->{mode} : () ) );
my $id;
# try-catch
local $@;
eval
{
$id = shmget( $serial, $opts->{size}, $flags );
if( defined( $id ) )
{
# All is ok.
}
else
{
my $newflags = ( $flags & &IPC::SysV::IPC_CREAT ) ? $flags : ( $flags | &IPC::SysV::IPC_CREAT );
my $limit = ( $serial + 10 );
# &IPC::SysV::ftok has likely made the serial unique, but as stated in the manual page, there is no guarantee
while( $serial <= $limit )
{
$id = shmget( $serial, $opts->{size}, $newflags | &IPC::SysV::IPC_CREAT );
$serial++;
last if( defined( $id ) );
}
}
};
if( $@ )
{
if( $@ =~ /shmget[[:blank:]\h]+not[[:blank:]\h]+implemented/i )
{
return( $self->error( "IPC SysV is supported, but somehow shmget is not implemented: $@" ) );
}
else
{
return( $self->error( "Error while trying to get the shared memory id: $@" ) );
}
}
if( !defined( $id ) )
{
return( $self->error( "Unable to create shared memory id with key \"$serial\" and flags \"$flags\": $!" ) );
}
$self->serial( $serial );
# The value 3 can be anything above 0 and below the limit set by SEMMSL. On Linux system, this is usually 32,000. Seem semget(2) man page:
# "The argument nsems can be 0 (a don't care) when a semaphore set is not being created. Otherwise, nsems must be greater than 0 and less than or equal to the maximum number of semaphores per semaphore set (SEMMSL)."
my $semid;
# try-catch
local $@;
eval
{
$semid = semget( $serial, ( $create ? 3 : 0 ), $flags );
if( !defined( $semid ) )
{
my $newflags = ( $flags | &IPC::SysV::IPC_CREAT );
$semid = semget( $serial, 3, $newflags );
}
};
if( $@ )
{
if( $@ =~ /semget[[:blank:]\h]+not[[:blank:]\h]+implemented/i )
{
return( $self->error( "IPC SysV is supported, but somehow semget is not implemented: $@" ) );
}
else
{
return( $self->error( "Error while trying to get the semaphore for shared memory id: $@" ) );
}
}
return( $self->error( "Unable to get a semaphore for shared memory key \"", ( $opts->{key} || $self->key ), "\" wth id \"$id\": $!" ) ) if( !defined( $semid ) );
my $new = $self->new(
key => ( $opts->{key} || $self->key ),
debug => $self->debug,
mode => $self->mode,
destroy => $self->destroy,
destroy_semaphore => $self->destroy_semaphore,
_packing_method => $self->_packing_method,
) || return( $self->error( "Cannot create object with key '", ( $opts->{key} || $self->key ), "': ", $self->error ) );
$new->key( $self->key );
$new->serial( $self->serial );
$new->id( $id );
$new->semid( $semid );
CORE::push( @$SHEM_REPO, $id );
$ID2OBJ->{ $id } = $new;
if( !defined( $new->op( @{$SEMOP_ARGS->{(LOCK_SH)}} ) ) )
{
return( $self->error( "Unable to set lock on sempahore: $!" ) );
}
my $there = $new->stat( SEM_MARKER );
$new->size( $opts->{size} );
# $new->flags( $flags );
if( $there == SHM_EXISTS )
{
}
else
{
# We initialise the semaphore with value of 1
$new->stat( SEM_MARKER, SHM_EXISTS ) ||
return( $new->error( "Unable to set semaphore during object creation: ", $new->error ) );
}
$new->op( @{$SEMOP_ARGS->{(LOCK_SH | LOCK_UN)}} );
return( $new );
}
sub owner { return( shift->_set_get_scalar( 'owner', @_ ) ); }
sub pid
{
my $self = shift( @_ );
my $sem = shift( @_ );
my $semid = $self->semid ||
return( $self->error( "No semaphore set yet. You must open the shared memory first to remove semaphore." ) );
no strict 'subs';
my $arg = 0;
# try-catch
local $@;
my @rv = eval
{
my $v = semctl( $semid, $sem, &IPC::SysV::GETPID, $arg );
return( $v ? 0 + $v : undef() );
};
if( $@ )
{
return( $self->error( "Error trying to get semaphore pid using semaphore id ${semid}: $@" ) );
}
return( $rv[0] );
}
sub rand
{
my $self = shift( @_ );
my $size = $self->size || 1024;
no strict 'subs';
# try-catch
local $@;
my $key;
eval
{
$key = shmget( &IPC::SysV::IPC_PRIVATE, $size, &IPC::SysV::S_IRWXU | &IPC::SysV::S_IRWXG | &IPC::SysV::S_IRWXO );
};
if( $@ )
{
return( $self->error( "Error trying to get a random private key using shmget and IPC_PRIVATE: $@" ) );
}
if( !defined( $key ) )
{
return( $self->error( "Unable to generate a share memory key: $!" ) )
}
else
{
return( $key );
}
}
# $self->read( $buffer, $size );
# $self->read( $buffer );
# my $data = $self->read;
sub read
{
my( $self, $buf ) = @_;
my $size;
$size = int( $_[2] ) if( scalar( @_ ) > 2 );
# Optional length parameter for non-reference data only
$size //= int( $self->size || SHM_BUFSIZ );
my $id = $self->id;
return( $self->error( "No shared memory id! Have you opened it first?" ) ) if( !length( $id ) );
my $buffer = '';
my $addr = $self->addr;
if( $addr )
{
memread( $addr, $buffer, 0, $size ) ||
return( $self->error( "Unable to read data from shared memory address \"$addr\" using memread: $!" ) );
}
else
{
shmread( $id, $buffer, 0, $size ) ||
return( $self->error( "Unable to read data from shared memory id \"$id\": $!" ) );
}
# Get rid of nulls end padded
# 2022-08-03: Ok, null bytes are added to Storable and CBOR::XS serialised data,
# so we cannot just remove them. Instead we encapsulate the serialised data
# $buffer = unpack( "A*", $buffer );
my $packing = $self->_packing_method;
my $data;
if( CORE::length( $buffer ) )
{
# There may be encapsulation of data before writing data to memory.
# e.g.: MG[14]something here
if( index( $buffer, 'MG[' ) == 0 )
{
my $def = substr( $buffer, 0, index( $buffer, ']' ) + 1, '' );
# Get the string length stored
my $len = int( substr( $def, 3, -1 ) );
# Remove any possible remaining unwanted data
substr( $buffer, $len, length( $buffer ), '' );
}
# try-catch
local $@;
eval
{
if( $packing eq 'json' )
{
$data = $self->_decode_json( $buffer );
}
elsif( $packing eq 'cbor' )
{
$data = $self->deserialise(
data => $buffer,
serialiser => 'CBOR::XS',
allow_sharing => 1,
( defined( $self->{base64} ) ? ( base64 => $self->{base64} ) : () ),
);
}
elsif( $packing eq 'sereal' )
{
$data = $self->deserialise(
data => $buffer,
serialiser => 'Sereal',
freeze_callbacks => 1,
( defined( $self->{base64} ) ? ( base64 => $self->{base64} ) : () ),
);
}
# By default Storable::Improved
else
{
# $data = Storable::Improved::thaw( $buffer );
$data = $self->deserialise(
data => $buffer,
serialiser => 'Storable::Improved',
( defined( $self->{base64} ) ? ( base64 => $self->{base64} ) : () ),
);
}
};
if( $@ )
{
return( $self->error( "An error occured while decoding data using $packing with base64 set to '", ( $self->{base64} // '' ), "': $@" ) );
}
}
else
{
$data = $buffer;
}
if( scalar( @_ ) > 1 )
{
$_[1] = $data;
return( CORE::length( $_[1] ) || "0E0" );
}
else
{
return( $data );
}
}
sub remove
{
my $self = shift( @_ );
return(1) if( $self->removed );
my $id = $self->id();
return( $self->error( "No shared memory id! Have you opened it first?" ) ) if( !length( $id ) );
my $semid = $self->semid;
return( $self->error( "No semaphore set yet. You must open the shared memory first to remove semaphore." ) ) if( !length( $semid ) );
$self->unlock();
no strict 'subs';
# Remove share memory segment
if( !defined( shmctl( $id, &IPC::SysV::IPC_RMID, 0 ) ) )
{
return( $self->error( "Unable to remove share memory segement id '$id' (IPC_RMID is '", &IPC::SysV::IPC_RMID, "'): $!" ) );
}
# Remove semaphore
my $rv;
my $arg = 0;
if( !defined( $rv = semctl( $semid, 0, &IPC::SysV::IPC_RMID, $arg ) ) )
{
warn( "Warning only: could not remove the semaphore id \"$semid\": $!" ) if( $self->_warnings_is_enabled );
}
$self->removed( $rv ? 1 : 0 );
if( $rv )
{
for( my $i = 0; $i < scalar( @$SHEM_REPO ); $i++ )
{
my $this_id = $SHEM_REPO->[$i];
my $obj = $ID2OBJ->{ $this_id };
if( Scalar::Util::blessed( $obj ) && $this_id eq $id )
{
CORE::splice( @$SHEM_REPO, $i, 1 );
CORE::delete( $ID2OBJ->{ $this_id } );
last;
}
}
$self->id( undef() );
$self->semid( undef() );
}
return( $rv ? 1 : 0 );
}
sub remove_semaphore
{
my $self = shift( @_ );
return(1) if( $self->removed_semaphore );
my $semid = $self->semid;
return( $self->error( "No semaphore set yet. You must open the shared memory first to remove semaphore." ) ) if( !length( $semid ) );
$self->unlock();
my $rv;
no strict 'subs';
my $arg = 0;
if( !defined( $rv = semctl( $semid, 0, &IPC::SysV::IPC_RMID, $arg ) ) )
{
warn( "Warning only: could not remove the semaphore id \"$semid\" with IPC::SysV::IPC_RMID value '", &IPC::SysV::IPC_RMID, "': $!" ) if( $self->_warnings_is_enabled );
}
$self->removed_semaphore( $rv ? 1 : 0 );
$self->semid( undef() );
return( $rv ? 1 : 0 );
}
sub removed { return( shift->_set_get_boolean( 'removed', @_ ) ); }
sub removed_semaphore { return( shift->_set_get_boolean( 'removed_semaphore', @_ ) ); }
sub reset
{
my $self = shift( @_ );
my $default;
if( @_ )
{
$default = shift( @_ );
}
else
{
$default = '';
}
$self->lock( LOCK_EX );
$self->write( $default );
$self->unlock;
return( $self );
}
sub semid { return( shift->_set_get_scalar( 'semid', @_ ) ); }
sub sereal { return( shift->_packing_method( 'sereal' ) ); }
sub serial { return( shift->_set_get_scalar( 'serial', @_ ) ); }
sub serialiser { return( shift->_set_get_scalar( '_packing_method', @_ ) ); }
{
no warnings 'once';
*serializer = \&serialiser;
}
sub shmstat
{
my $self = shift( @_ );
my $data = '';
my $id = $self->id || return( $self->error( "No shared memory id set!" ) );
no strict 'subs';
shmctl( $id, &IPC::SysV::IPC_STAT, $data ) or
return( $self->error( "Unable to stat shared memory with id '$id': $!" ) );
return( Module::Generic::SharedStat->new->unpack( $data ) );
}
sub size { return( shift->_set_get_scalar( 'size', @_ ) ); }
sub stat
{
my $self = shift( @_ );
my $id = $self->semid;
return( $self->error( "No semaphore set yet. You must open the shared memory first to set the semaphore." ) ) if( !length( $id ) );
no strict 'subs';
if( @_ )
{
if( @_ == 1 )
{
my $sem = shift( @_ );
my $arg = 0;
my $v = semctl( $id, $sem, &IPC::SysV::GETVAL, $arg );
return( $v ? 0 + $v : undef() );
}
else
{
my( $sem, $val ) = @_;
semctl( $id, $sem, &IPC::SysV::SETVAL, $val ) ||
return( $self->error( "Unable to semctl with semaphore id '$id', semaphore '$sem', SETVAL='", &IPC::SysV::SETVAL, "' and value='$val': $!" ) );
}
}
else
{
my $data = '';
if( wantarray() )
{
semctl( $id, 0, &IPC::SysV::GETALL, $data ) || return( () );
return( ( unpack( "s$N*", $data ) ) );
}
else
{
semctl( $id, 0, &IPC::SysV::IPC_STAT, $data ) ||
return( $self->error( "Unable to stat semaphore with id '$id': $!" ) );
return( Module::Generic::SemStat->new->unpack( $data ) );
}
}
}
sub storable { return( shift->_packing_method( 'storable' ) ); }
sub supported { return( $SYSV_SUPPORTED ); }
sub unlock
{
my $self = shift( @_ );
return(1) if( !$self->locked );
my $semid = $self->semid;
return( $self->error( "No semaphore set yet. You must open the shared memory first to unlock semaphore." ) ) if( !length( $semid ) );
my $type = ( $self->locked | LOCK_UN );
$type ^= LOCK_NB if( $type & LOCK_NB );
if( defined( $self->op( @{$SEMOP_ARGS->{ $type }} ) ) )
{
$self->locked(0);
}
return( $self );
}
sub write
{
my $self = shift( @_ );
my $data;
if( scalar( @_ ) == 1 && ref( $_[0] ) )
{
$data = shift( @_ );
}
else
{
$data = \join( '', @_ );
}
my $id = $self->id();
my $size = int( $self->size() ) || SHM_BUFSIZ;
# my @callinfo = caller;
my $packing = $self->_packing_method;
my $encoded;
if( $packing eq 'json' )
{
# try-catch
local $@;
eval
{
$encoded = $self->_encode_json( $data );
};
if( $@ )
{
return( $self->error( "An error occured encoding data provided using $packing with base64 set to '", ( $self->{base64} // '' ), ": $@. Data was: '$data'" ) );
}
}
elsif( $packing eq 'cbor' )
{
# try-catch
local $@;
eval
{
$encoded = $self->serialise( $data,
serialiser => 'CBOR::XS',
allow_sharing => 1,
( defined( $self->{base64} ) ? ( base64 => $self->{base64} ) : () ),
);
};
if( $@ )
{
return( $self->error( "An error occured encoding data provided using $packing with base64 set to '", ( $self->{base64} // '' ), ": $@. Data was: '$data'" ) );
}
return( $self->error( "Unable to serialise ", CORE::length( $data ), " bytes of data using CBOR::XS with base64 set to '", ( $self->{base64} // '' ), ": ", $self->error ) ) if( !defined( $encoded ) );
}
elsif( $packing eq 'sereal' )
{
$self->_load_class( 'Sereal::Encoder' ) || return( $self->pass_error );
my $const;
$const = \&{"Sereal\::Encoder::SRL_ZLIB"} if( defined( &{"Sereal\::Encoder::SRL_ZLIB"} ) );
# try-catch
local $@;
eval
{
$encoded = $self->serialise( $data,
serialiser => 'Sereal',
freeze_callbacks => 1,
( defined( $const ) ? ( compress => $const->() ) : () ),
( defined( $self->{base64} ) ? ( base64 => $self->{base64} ) : () ),
);
};
if( $@ )
{
return( $self->error( "An error occured encoding data provided using $packing with base64 set to '", ( $self->{base64} // '' ), ": $@. Data was: '$data'" ) );
}
return( $self->error( "Unable to serialise ", CORE::length( $data ), " bytes of data using Sereal with base64 set to '", ( $self->{base64} // '' ), ": ", $self->error ) ) if( !defined( $encoded ) );
}
# Default to Storable::Improved
else
{
# local $Storable::forgive_me = 1;
# $encoded = Storable::Improved::freeze( $data );
eval
{
$encoded = $self->serialise( $data,
serialiser => 'Storable::Improved',
( defined( $self->{base64} ) ? ( base64 => $self->{base64} ) : () ),
);
};
if( $@ )
{
return( $self->error( "An error occured encoding data provided using $packing with base64 set to '", ( $self->{base64} // '' ), ": $@. Data was: '$data'" ) );
}
return( $self->error( "Unable to serialise ", CORE::length( $data ), " bytes of data using Storable with base64 set to '", ( $self->{base64} // '' ), ": ", $self->error ) ) if( !defined( $encoded ) );
}
# Simple encapsulation
# FYI: MG = Module::Generic
substr( $encoded, 0, 0, 'MG[' . length( $encoded ) . ']' );
my $len = length( $encoded );
if( $len > $size )
{
return( $self->error( "Data to write are ", length( $encoded ), " bytes long and exceed the maximum you have set of '$size'." ) );
}
my $addr = $self->addr;
if( $addr )
{
memwrite( $addr, $encoded, 0, $len ) ||
return( $self->error( "Unable to write to shared memory address '$addr' using memwrite: $!" ) );
}
else
{
# id, data, position, size
shmwrite( $id, $encoded, 0, $len ) ||
return( $self->error( "Unable to write to shared memory id '$id' with ${len} bytes of data encoded and memory size of $size: $!" ) );
}
return( $self );
}
sub _decode_json
{
my $self = shift( @_ );
my $data = shift( @_ );
# Nothing to do
return( $data ) if( !defined( $data ) || !CORE::length( $data ) );
my $j = JSON->new->utf8->relaxed->allow_nonref;
my $seen = {};
my $crawl;
$crawl = sub
{
my $this = shift( @_ );
my $type = Scalar::Util::reftype( $this );
return( $this ) if( ( $type eq 'HASH' || $type eq 'ARRAY' ) && ++$seen->{ Scalar::Util::refaddr( $this ) } > 1 );
if( $type eq 'HASH' )
{
# Found a former scalar reference, restore it
if( CORE::exists( $this->{__scalar_gen_shm} ) )
{
return( \$this->{__scalar_gen_shm} );
}
foreach my $k ( keys( %$this ) )
{
next if( !ref( $this->{ $k } ) );
$this->{ $k } = $crawl->( $this->{ $k } );
}
}
elsif( $type eq 'ARRAY' )
{
for( my $i = 0; $i < scalar( @$this ); $i++ )
{
next if( !ref( $this->[$i] ) );
$this->[$i] = $crawl->( $this->[$i] );
}
}
return( $this );
};
my $result;
# try-catch
local $@;
eval
{
my $decoded = $j->decode( $data );
$result = $crawl->( $decoded );
};
if( $@ )
{
return( $self->error( "An error occurred while trying to decode JSON data: $@" ) );
}
return( $result );
}
# Purpose of this method is to recursively check the given data and change scalar reference if they are anything else than 1 or 0, otherwise JSON would complain
sub _encode_json
{
my $self = shift( @_ );
my $data = shift( @_ );
my $seen = {};
my $crawl;
$crawl = sub
{
my $this = shift( @_ );
my $type = Scalar::Util::reftype( $this );
# Skip this reference if it is either hash or array and we have already seen it in order to avoid looping.
return( $this ) if( ( $type eq 'HASH' || $type eq 'ARRAY' ) && ++$seen->{ Scalar::Util::refaddr( $this ) } > 1 );
if( $type eq 'HASH' )
{
foreach my $k ( keys( %$this ) )
{
next if( !ref( $this->{ $k } ) );
$this->{ $k } = $crawl->( $this->{ $k } );
}
}
elsif( $type eq 'ARRAY' )
{
for( my $i = 0; $i < scalar( @$this ); $i++ )
{
next if( !ref( $this->[$i] ) );
$this->[$i] = $crawl->( $this->[$i] );
}
}
elsif( $type eq 'SCALAR' )
{
# The only supported value by JSON for a scalar reference
return( $this ) if( $$this eq "1" or $$this eq "0" );
my $pkg;
if( ( $pkg = Scalar::Util::blessed( $this ) ) )
{
if( overload::Method( $this => '""' ) )
{
$this = { __scalar_gen_shm => "$this", __package => $pkg };
}
else
{
$this = { __scalar_gen_shm => $$this, __package => $pkg };
}
}
else
{
$this = { __scalar_gen_shm => $$this };
}
}
return( $this );
};
my $ref = $crawl->( $data );
my $j = JSON->new->utf8->relaxed->allow_nonref->convert_blessed;
my $encoded;
# try-catch
local $@;
eval
{
$encoded = $j->encode( $ref );
};
if( $@ )
{
return( $self->error( "An error occurred while trying to JSON encode data: $@" ) );
}
return( $encoded );
}
sub _packing_method { return( shift->_set_get_scalar( '_packing_method', @_ ) ); }
sub _str2key
{
my $self = shift( @_ );
my $key = shift( @_ );
no strict 'subs';
if( !defined( $key ) || $key eq '' )
{
return( &IPC::SysV::IPC_PRIVATE );
}
my $path;
( $key, $path ) = ref( $key ) eq 'ARRAY' ? @$key : ( $key, [getpwuid($>)]->[7] );
$path = [getpwuid($path)]->[7] if( $path =~ /^\d+$/ );
$path ||= File::Spec->rootdir();
if( $key =~ /^\d+$/ )
{
my $id = &IPC::SysV::ftok( $path, $key ) ||
return( $self->error( "Unable to get a key using IPC::SysV::ftok: $!" ) );
return( $id );
}
else
{
# my $id = 0;
# $id += $_ for( unpack( "C*", $key ) );
require Digest::SHA;
my $hash = Digest::SHA::sha1_base64( $key );
my $id = ord( substr( $hash, 0, 1 ) );
# We use the root as a reliable and stable path.
# I initially though about using __FILE__, but during testing this would be in ./blib/lib and beside one user might use a version of this module somewhere while the one used under Apache/mod_perl2 could be somewhere else and this would render the generation of the IPC key unreliable and unrepeatable
# my $val = &IPC::SysV::ftok( File::Spec->rootdir(), $id );
my $val = &IPC::SysV::ftok( $path, $id );
return( $val );
}
}
sub DESTROY
{
my $self = shift( @_ );
return unless( $self->{id} );
$self->unlock;
$self->detach;
my $rv = $self->remove_semaphore;
if( $self->destroy )
{
my $stat = $self->shmstat();
# number of processes attached to the associated shared memory segment.
if( defined( $stat ) && ( $stat->nattch() == 0 ) )
{
$self->remove;
}
}
};
sub FREEZE
{
my $self = CORE::shift( @_ );
my $serialiser = CORE::shift( @_ ) // '';
my $class = CORE::ref( $self );
my %hash = %$self;
CORE::delete( @hash{ qw( addr id locked owner removed removed_semaphore semid ) } );
# Return an array reference rather than a list so this works with Sereal and CBOR
# On or before Sereal version 4.023, Sereal did not support multiple values returned
CORE::return( [$class, \%hash] ) if( $serialiser eq 'Sereal' && Sereal::Encoder->VERSION <= version->parse( '4.023' ) );
# But Storable want a list with the first element being the serialised element
CORE::return( $class, \%hash );
}
sub STORABLE_freeze { CORE::return( CORE::shift->FREEZE( @_ ) ); }
sub STORABLE_thaw { CORE::return( CORE::shift->THAW( @_ ) ); }
# NOTE: CBOR will call the THAW method with the stored classname as first argument, the constant string CBOR as second argument, and all values returned by FREEZE as remaining arguments.
# NOTE: Storable calls it with a blessed object it created followed with $cloning and any other arguments initially provided by STORABLE_freeze
sub THAW
{
my( $self, undef, @args ) = @_;
my $ref = ( CORE::scalar( @args ) == 1 && CORE::ref( $args[0] ) eq 'ARRAY' ) ? CORE::shift( @args ) : \@args;
my $class = ( CORE::defined( $ref ) && CORE::ref( $ref ) eq 'ARRAY' && CORE::scalar( @$ref ) > 1 ) ? CORE::shift( @$ref ) : ( CORE::ref( $self ) || $self );
my $hash = CORE::ref( $ref ) eq 'ARRAY' ? CORE::shift( @$ref ) : {};
my $new;
# Storable pattern requires to modify the object it created rather than returning a new one
if( CORE::ref( $self ) )
{
foreach( CORE::keys( %$hash ) )
{
$self->{ $_ } = CORE::delete( $hash->{ $_ } );
}
$new = $self;
}
else
{
$new = CORE::bless( $hash => $class );
}
CORE::return( $new );
}
END
{
foreach my $id ( @$SHEM_REPO )
{
my $s = $ID2OBJ->{ $id } || next;
next if( $s->removed || !$s->id || !$s->destroy );
$s->detach;
$s->remove;
}
};
# NOTE: Module::Generic::SharedStat class
{
package
Module::Generic::SharedStat;
use IPC::SysV;
require IPC::SharedMem;
our $VERSION = 'v0.1.0';
use constant UID => 0;
use constant GID => 1;
use constant CUID => 2;
use constant CGID => 3;
use constant MODE => 4;
use constant SEGSZ => 5;
use constant LPID => 6;
use constant CPID => 7;
use constant NATTCH => 8;
use constant ATIME => 9;
use constant DTIME => 10;
use constant CTIME => 11;
sub new
{
my $this = shift( @_ );
my @vals = @_;
return( bless( [ @vals ] => ref( $this ) || $this ) );
}
sub unpack
{
my $self = shift( @_ );
my $data = shift( @_ );
# XS method
my $d = IPC::SharedMem::stat->new->unpack( $data );
# my @unpacked = unpack( "i*", $data );
return( $self->new( @$d ) );
}
# time when the last attach was completed to the associated shared memory segment.
sub atime { return( shift->[ATIME] ); }
sub cgid { return( shift->[CGID] ); }
# process ID of the creator of the shared memory entry.
sub cpid { return( shift->[CPID] ); }
# time when the associated entry was created or changed.
sub ctime { return( shift->[CTIME] ); }
sub cuid { return( shift->[CUID] ); }
# time the last detach was completed on the associated shared memory segment.
sub dtime { return( shift->[DTIME] ); }
sub gid { return( shift->[GID] ); }
# process ID of the last process to attach or detach the shared memory segment.
sub lpid { return( shift->[LPID] ); }
sub mode { return( shift->[MODE] ); }
# number of processes attached to the associated shared memory segment.
sub nattch { return( shift->[NATTCH] ); }
# size of the associated shared memory segment in bytes.
sub segsz { return( shift->[SEGSZ] ); }
sub uid { return( shift->[UID] ); }
sub FREEZE
{
my $self = CORE::shift( @_ );
my $serialiser = CORE::shift( @_ ) // '';
my $class = CORE::ref( $self );
my @array = @$self;
# Return an array reference rather than a list so this works with Sereal
# On or before Sereal version 4.023, Sereal did not support multiple values returned
CORE::return( [$class, \@array] ) if( $serialiser eq 'Sereal' && Sereal::Encoder->VERSION <= version->parse( '4.023' ) );
# But CBOR and Storable want a list with the first element being the serialised element
CORE::return( $class, \@array );
}
sub STORABLE_freeze { CORE::return( CORE::shift->FREEZE( @_ ) ); }
sub STORABLE_thaw { CORE::return( CORE::shift->THAW( @_ ) ); }
# NOTE: CBOR will call the THAW method with the stored classname as first argument, the constant string CBOR as second argument, and all values returned by FREEZE as remaining arguments.
# NOTE: Storable calls it with a blessed object it created followed with $cloning and any other arguments initially provided by STORABLE_freeze
sub THAW
{
my( $self, undef, @args ) = @_;
my $ref = ( CORE::scalar( @args ) == 1 && CORE::ref( $args[0] ) eq 'ARRAY' ) ? CORE::shift( @args ) : \@args;
my $class = ( CORE::defined( $ref ) && CORE::ref( $ref ) eq 'ARRAY' && CORE::scalar( @$ref ) > 1 ) ? CORE::shift( @$ref ) : ( CORE::ref( $self ) || $self );
my $array = CORE::ref( $ref ) eq 'ARRAY' ? $ref : [];
# Storable pattern requires to modify the object it created rather than returning a new one
if( CORE::ref( $self ) )
{
@$self = @$array;
CORE::return( $self );
}
else
{
my $new = bless( $array => $class );
CORE::return( $new );
}
}
sub TO_JSON { CORE::return( [ @{$_[0]} ] ); }
}
# NOTE: Module::Generic::SemStat class
{
package
Module::Generic::SemStat;
use IPC::SysV;
require IPC::Semaphore;
our $VERSION = 'v0.1.0';
use constant UID => 0;
use constant GID => 1;
use constant CUID => 2;
use constant CGID => 3;
use constant MODE => 4;
use constant CTIME => 5;
use constant OTIME => 6;
use constant NSEMS => 7;
sub new
{
my $this = shift( @_ );
my @vals = @_;
return( bless( [ @vals ] => ref( $this ) || $this ) );
}
sub unpack
{
my $self = shift( @_ );
my $data = shift( @_ );
# my @unpacked = unpack( "i*", $data );
# XS method
my $d = IPC::Semaphore::stat->new->unpack( $data );
return( $self->new( @$d ) );
}
sub cgid { return( shift->[CGID] ); }
sub ctime { return( shift->[CTIME] ); }
sub cuid { return( shift->[CUID] ); }
sub gid { return( shift->[GID] ); }
sub mode { return( shift->[MODE] ); }
# number of semaphores in the set associated with the semaphore entry.
sub nsems { return( shift->[NSEMS] ); }
# time the last semaphore operation was completed on the set associated with the semaphore entry.
sub otime { return( shift->[OTIME] ); }
sub uid { return( shift->[UID] ); }
sub FREEZE
{
my $self = CORE::shift( @_ );
my $serialiser = CORE::shift( @_ ) // '';
my $class = CORE::ref( $self );
my @array = @$self;
# Return an array reference rather than a list so this works with Sereal
# On or before Sereal version 4.023, Sereal did not support multiple values returned
CORE::return( [$class, \@array] ) if( $serialiser eq 'Sereal' && Sereal::Encoder->VERSION <= version->parse( '4.023' ) );
# But CBOR and Storable want a list with the first element being the serialised element
CORE::return( $class, \@array );
}
sub STORABLE_freeze { CORE::return( CORE::shift->FREEZE( @_ ) ); }
sub STORABLE_thaw { CORE::return( CORE::shift->THAW( @_ ) ); }
# NOTE: CBOR will call the THAW method with the stored classname as first argument, the constant string CBOR as second argument, and all values returned by FREEZE as remaining arguments.
# NOTE: Storable calls it with a blessed object it created followed with $cloning and any other arguments initially provided by STORABLE_freeze
sub THAW
{
my( $self, undef, @args ) = @_;
my $ref = ( CORE::scalar( @args ) == 1 && CORE::ref( $args[0] ) eq 'ARRAY' ) ? CORE::shift( @args ) : \@args;
my $class = ( CORE::defined( $ref ) && CORE::ref( $ref ) eq 'ARRAY' && CORE::scalar( @$ref ) > 1 ) ? CORE::shift( @$ref ) : ( CORE::ref( $self ) || $self );
my $array = CORE::ref( $ref ) eq 'ARRAY' ? $ref : [];
# Storable pattern requires to modify the object it created rather than returning a new one
if( CORE::ref( $self ) )
{
@$self = @$array;
CORE::return( $self );
}
else
{
my $new = bless( $array => $class );
CORE::return( $new );
}
}
sub TO_JSON { CORE::return( [ @{$_[0]} ] ); }
}
1;
__END__