Sponsoring The Perl Toolchain Summit 2025: Help make this important event another success Learn more

use strict;
use Coro;
use DBIx::ThinSQL qw/qv/;
use Log::Any '$log';
our $VERSION = '0.1.0_16';
my %import_functions = (
NEW => {
issue => 'func_import_issue',
issue_status => 'func_import_issue_status',
project => 'func_import_project',
project_status => 'func_import_project_status',
hub => 'func_new_hub',
hub_location => 'func_import_hub_location',
task => 'func_import_task',
task_status => 'func_import_task_status',
update => 'func_import_update',
},
UPDATE => {
issue => 'func_import_issue_update',
issue_status => 'func_import_issue_status_update',
project => 'func_import_project_update',
project_status => 'func_import_project_status_update',
hub => 'func_import_hub_update',
hub_location => 'func_import_hub_location_update',
task => 'func_import_task_update',
task_status => 'func_import_task_status_update',
},
QUIT => {},
CANCEL => {},
);
sub recv_hub_updates {
my $self = shift;
my $db = $self->db;
my ( $action, $total ) = $self->read;
$total //= '*undef*';
if ( $action ne 'TOTAL' or $total !~ m/^\d+$/ ) {
return "expected TOTAL <int> (not $action $total)";
}
my $ucount;
my $i = $total;
my $got = 0;
$self->updates_recv("$got/$total");
$self->trigger_on_update;
while ( $got < $total ) {
my ( $action, $type, $ref ) = $self->read;
if ( !exists $import_functions{$action} ) {
return "not implemented: $action";
}
if ( !exists $import_functions{$action}->{$type} ) {
return "not implemented: $action $type";
}
if ( $action eq 'NEW' and $type eq 'update' ) {
$ucount = delete $ref->{ucount};
}
my $func = $import_functions{$action}->{$type};
# This should be a savepoint?
$db->xdo(
insert_into => $func,
values => $ref,
);
$ucount--;
if ( 0 == $ucount ) {
$db->xdo(
insert_into => 'func_merge_updates',
values => { merge => 1 },
);
}
$got++;
$self->updates_recv("$got/$total");
$self->trigger_on_update;
}
$self->updates_recv( ( ' ' x length("$got/") ) . $total );
$self->trigger_on_update;
return $total;
}
sub real_import_hub {
my $self = shift;
my $result = $self->recv_hub_updates;
if ( $result =~ m/^\d+$/ ) {
$self->write( 'Recv', $result );
return 'RepoImported';
}
$self->write( 'ProtocolError', $result );
return $result;
}
sub real_sync_hub {
my $self = shift;
my $id = shift || die caller;
my $prefix = shift;
my $tmp = shift || 'sync_' . sprintf( "%08x", rand(0xFFFFFFFF) );
$prefix = '' unless defined $prefix;
my $prefix2 = $prefix . '_';
my $db = $self->db;
my $on_update = $self->on_update;
$db->do("CREATE TEMPORARY TABLE $tmp(id INTEGER, ucount INTEGER)")
if ( $prefix eq '' );
$on_update->( 'matching: ' . $prefix2 ) if $on_update;
my @refs = $db->xarrays(
select => [qw/rm.prefix rm.hash/],
from => 'hubs_merkle rm',
where =>
[ 'rm.hub_id = ', qv($id), ' AND rm.prefix LIKE ', qv($prefix2) ],
);
my $here = { map { $_->[0] => $_->[1] } @refs };
$self->write( 'MATCH', $prefix2, $here );
my ( $action, $mprefix, $there ) = $self->read;
return "expected MATCH $prefix2 {} (not $action $mprefix ...)"
unless $action eq 'MATCH'
and $mprefix eq $prefix2
and ref $there eq 'HASH';
my @next;
my @missing;
while ( my ( $k, $v ) = each %$here ) {
if ( !exists $there->{$k} ) {
push( @missing, $k );
}
elsif ( $there->{$k} ne $v ) {
push( @next, $k );
}
}
if (@missing) {
my @where;
foreach my $miss (@missing) {
push( @where, ' OR ' ) if @where;
push( @where, "u.prefix LIKE ", qv( $miss . '%' ) ),;
}
$self->db->xdo(
insert_into => "$tmp(id,ucount)",
select => [ 'u.id', 'u.ucount' ],
from => 'updates u',
inner_join => 'hub_related_updates rru',
on => {
'rru.update_id' => \'u.id',
'rru.hub_id' => $id,
},
where => \@where,
);
}
if (@next) {
foreach my $next ( sort @next ) {
$self->real_sync_hub( $id, $next, $tmp );
}
}
return unless $prefix eq '';
my $send = async {
my ($total) = $self->db->xarray(
select => 'COALESCE(sum(t.ucount), 0)',
from => "$tmp t",
);
$self->write( 'TOTAL', $total );
my $update_list = $db->xprepare(
select => [
'u.id', 'u.uuid',
'p.uuid AS parent_uuid', 'u.mtime',
'u.mtimetz', 'u.author',
'u.email', 'u.lang',
'u.message', 'u.ucount',
],
from => "$tmp t",
inner_join => 'updates u',
on => 'u.id = t.id',
left_join => 'updates p',
on => 'p.id = u.parent_id',
order_by => 'u.id ASC',
);
$update_list->execute;
return $self->send_updates( $update_list, $total );
};
my $r1 = $self->recv_hub_updates;
my $r2 = $send->join;
if ( $r1 =~ m/^\d+$/ ) {
$self->write( 'Recv', $r1 );
my ( $recv, $count ) = $self->read;
return 'RepoSync' if $recv eq 'Recv' and $count == $r2;
$log->debug("MEH: $count $r2");
return $recv;
}
$self->write( 'ProtocolError', $r1 );
return $r1;
}
sub real_export_hub {
my $self = shift;
my $id = shift;
my ($total) = $self->db->xarray(
select => 'sum(u.ucount)',
from => 'hub_related_updates rru',
inner_join => 'updates u',
on => 'u.id = rru.update_id',
where => { 'rru.hub_id' => $id },
);
$self->write( 'TOTAL', $total );
my $sth = $self->db->xprepare(
select => [
'updates.id', 'updates.uuid',
'parents.uuid AS parent_uuid', 'updates.mtime',
'updates.mtimetz', 'updates.author',
'updates.email', 'updates.lang',
'updates.message', 'updates.ucount',
],
from => 'hub_related_updates AS rru',
inner_join => 'updates',
on => 'updates.id = rru.update_id',
left_join => 'updates AS parents',
on => 'parents.id = updates.parent_id',
where => { 'rru.hub_id' => $id },
order_by => 'updates.id ASC',
);
$sth->execute;
$self->send_updates( $sth, $total );
my ( $recv, $count ) = $self->read;
return 'RepoExported' if $recv eq 'Recv' and $count == $total;
return $recv;
}
1;