Dave Cross: Still Munging Data With Perl: Online event - Mar 17 Learn more

use strict;
use Coro;
our $VERSION = '0.1.0_16';
sub run {
my $opts = shift;
$opts->{no_pager}++; # causes problems with something in Coro?
my $ctx = App::bif::Context->new($opts);
my $dbw = $ctx->dbw;
if ( $opts->{path} ) {
foreach my $path ( @{ $opts->{path} } ) {
my @p = $dbw->get_projects($path);
$ctx->err( 'ProjectNotFound', 'project not found: ' . $path )
unless @p;
if ( $opts->{hub} ) {
foreach my $alias ( @{ $opts->{hub} } ) {
my @rl = $dbw->get_hub_locations($alias);
$ctx->err( 'HubNotFound', 'hub not found: ' . $alias )
unless @rl;
# Consider upping PRAGMA cache_size? Or handle that in Bif::Role::Sync?
my @hubs = $dbw->xhashes(
select => [ 'h.id', 'h.alias', 'hl.location', 't.uuid' ],
from => 'hubs h',
inner_join => 'topics t',
on => 't.id = h.id',
inner_join => 'hub_locations hl',
on => 'hl.id = h.default_location_id',
where => {
'h.local' => undef,
$opts->{hub} ? ( 'h.alias' => $opts->{hub} ) : (),
return $ctx->err( 'SyncNone', 'no (matching) hubs registered' )
unless @hubs;
$|++; # no buffering
foreach my $hub (@hubs) {
my $error;
my $cv = AE::cv;
$ctx->lprint("$hub->{alias}: connecting...");
my $client = Bif::Client->new(
db => $dbw,
hub => $hub->{location},
debug => $ctx->{debug},
debug_bs => $ctx->{debug_bs},
on_error => sub {
$error = shift;
my $stderr = $client->child->stderr;
my $stderr_watcher;
$stderr_watcher = AE::io $stderr, 0, sub {
my $line = $stderr->getline;
if ( !defined $line ) {
undef $stderr_watcher;
print STDERR "$hub->{alias}: $line";
my $coro = async {
eval {
sub {
sub {
$ctx->lprint("$hub->{alias} (meta): $_[0]");
my $previous = $dbw->get_max_update_id;
my $status = $client->sync_hub( $hub->{id} );
print "\n";
if ( $status eq 'RepoMatch'
or $status eq 'RepoSync' )
my @projects = $dbw->xhashes(
select => [ 'p.id', 'p.path' ],
from => 'projects p',
where => {
'p.hub_id' => $hub->{id},
'p.local' => 1,
? ( 'p.path' => $opts->{path} )
: (),
order_by => 'p.path',
foreach my $p (@projects) {
sub {
"$hub->{alias} [$p->{path}]: $_[0]"
$status = $client->sync_project( $p->{id} );
print "\n";
unless ( $status eq 'ProjectMatch'
or $status eq 'ProjectSync' )
$error = $status;
else {
$error = $status;
# Catch up on errors
undef $stderr_watcher;
while ( my $line = $stderr->getline ) {
print STDERR "$hub->{alias}: $line";
return if $error;
my $current = $dbw->get_max_update_id;
my $delta = $current - $previous;
author => $ctx->{user}->{name},
email => $ctx->{user}->{email},
message => "sync $hub->{location} "
. "[+$delta] $ctx->{message}",
$error .= $@ if $@;
return $cv->send( !$error );
if ( $cv->recv ) {
return $ctx->err( 'Unknown', $error );
# TODO make this dependent on how many updates received/made since the last
# analyze
return $ctx->ok('Sync');
=head1 NAME
bif-sync - exchange updates with hubs
=head1 VERSION
0.1.0_16 (2014-05-01)
bif sync [OPTIONS...]
The C<bif sync> command connects to all remote repositories registered
as hubs in the local repository and exchanges updates.
=item --hub, -H HUB
Limit the hubs to sync with. This option can be used multiple times.
=item --path, -p PATH
Limit the projects to sync. This option can be used multiple times.
=head1 SEE ALSO
=head1 AUTHOR
Mark Lawrence E<lt>nomad@null.netE<gt>
Copyright 2013-2014 Mark Lawrence <nomad@null.net>
This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 3 of the License, or (at your
option) any later version.