#!/usr/bin/env perl
package _PARALELL_JOBS;
use strict;
use warnings;
use IPC::Open3;
use Time::HiRes qw(usleep time);
use Fcntl;
use Carp;
use Getopt::Long;
use Log::Log4perl;
use IO::Handle;

my $Log = undef;
$|++;

use POSIX qw(:sys_wait_h);

sub _processes         { @_ > 1 ? $_[0]->{_processes}=$_[1]         : $_[0]->{_processes}}
sub _commandsToProcess { @_ > 1 ? $_[0]->{_commandsToProcess}=$_[1] : $_[0]->{_commandsToProcess}}
sub verbose            { @_ > 1 ? $_[0]->{_verbose}=$_[1]           : $_[0]->{_verbose}}
sub norun              { @_ > 1 ? $_[0]->{_norun}=$_[1]             : $_[0]->{_norun}}
sub cmdfile            { @_ > 1 ? $_[0]->{_cmdfile}=$_[1]           : $_[0]->{_cmdfile}}
sub maxjobs            { @_ > 1 ? $_[0]->{_maxjobs}=$_[1]           : $_[0]->{_maxjobs}}
sub quiet              { @_ > 1 ? $_[0]->{_quiet}=$_[1]             : $_[0]->{_quiet}}
sub jobnum             { @_ > 1 ? $_[0]->{_jobnum}=$_[1]            : $_[0]->{_jobnum}}
sub sleep              { @_ > 1 ? $_[0]->{_sleep}=$_[1]             : $_[0]->{_sleep}}

sub autoCmdLineArgs {
  return (
           qw(
              verbose
              norun
              cmdfile=s
              maxjobs=s
              sleep=s
              quiet
             )
         );
}

sub main {
  my($pkg) = @_;

  Log::Log4perl->init( $ENV{LOG4PERL_CONFIG} ) if $ENV{LOG4PERL_CONFIG};
  $Log = Log::Log4perl->get_logger( __PACKAGE__ );

  my $app = bless {}, $pkg;
  my %opts;
  foreach my $arg ($app->autoCmdLineArgs) {
    my $m = $arg;
    $m = substr($m,0,index($m,'=')) if -1 != index($m,'=');
    $opts{$arg} = \$app->{"_${m}"};
  }

  unless (GetOptions(%opts)) {
    confess "Error parsing command line arguments!\n";
  }

  if ($app->run) {
    print STDERR "EXIT SUCCESS\n";
    exit 0;
  }

  print STDERR "EXIT ERROR\n";
  exit 1;
}

sub run {
  my($self) = @_;

  $self->verbose($self->quiet ? 0 : 1);
  $self->sleep(1) unless $self->sleep;

  $self->jobnum(0);
  $self->_processes({});
  $self->maxjobs(2) unless $self->maxjobs;

  unless ( $self->quiet ) {
    print STDERR "Argument Summary:\n";
    foreach my $arg ($self->autoCmdLineArgs) {
      my $m = $arg;
      $m = substr($m,0,index($m,'=')) if -1 != index($m,'=');
      printf STDERR "%15s: %s\n", $m, $self->$m||'';
    }
  }

  $self->loadCommandFile;
  my($jobsRun,$errors) = $self->runCommandFile;
  print STDERR "FINISH: Out of $jobsRun jobs run, $errors had error exit codes\n";
  return 0 == $errors;
}

sub runCommandFile {
  my($self) = @_;
  my $totalErrors = 0;
  my $totalReaped = 0;
  while ($self->jobsLeftToProcess || $self->numJobsRunning ) {
    my $status = 0;
    my ($reaped,$errors) = $self->reapCompletedJobs;
    if ($reaped) {
      $status++;
      $totalReaped += $reaped;
      $totalErrors += $errors;
    }

    print STDERR "STATUS: ",scalar($self->jobsLeftToProcess),
	" jobs left to run, ",
	$self->numJobsRunning,
	" are running out of ",
	$self->maxjobs," max jobs\n";

    while ( $self->jobsLeftToProcess && $self->numJobsRunning < $self->maxjobs ) {
      $self->spawnNextJob;
      ++$status;
    }

    next if $status; # we either started or reaped a job
    $Log->info("Waiting for background processes...") if $Log->is_info;
    CORE::sleep($self->sleep);
  }

  return ($totalReaped,$totalErrors);
}

sub numJobsRunning {
  my($self) = @_;
  return scalar keys %{ $self->_processes };
}

sub jobsLeftToProcess {
  my($self) = @_;
  return scalar @{ $self->_commandsToProcess };
}

sub loadCommandFile {
  my($self) = @_;

  my $fh;
  confess "Error, you must pass a command file with --cmdfile=file" unless $self->cmdfile;
  confess "Error, can't read command file: ".$self->cmdfile unless -r $self->cmdfile;
  unless (open $fh, "<", $self->cmdfile) {
    confess "Error opening file: ",$self->cmdfile," : $!";
  }

  my @cmds = <$fh>;
  close $fh;
  chomp @cmds;
  $self->_commandsToProcess(\@cmds);
}

sub reapCompletedJobs {
  my($self) = @_;
  my($reaped,$errors) = (0,0);

  print STDERR "checking job statuses ",scalar(keys %{$self->_processes})," are running...\n" if $self->verbose;

  foreach my $childPid (keys %{ $self->_processes } ) {
    my $child = $self->_processes->{$childPid};
    my $pid = waitpid( $child->{PID}, WNOHANG );
    $Log->debug( " polled $child->{PID} => $pid\n" ) if $Log->is_debug;

    if ( -1 == $pid ) {
      print STDERR "ERROR on waitpid($child->{PID}), invalid pid? : $!\n";
      $child->{FINISHED} = 1;
      print STDERR $child->{ERROR} if $child->{ERROR};
      ++$reaped;
      ++$errors;
      next;
    }

    $self->_readChildHandles($child);

    if ( $pid == $child->{PID} ) {
      ++$reaped;
      $child->{FINISHED} = 1;
      print STDERR $child->{ERROR} if $child->{ERROR};
      $child->{EXIT_STATUS} = ($? >> 8);
      if ( $self->verbose ) {
        print "child($child->{CMD})==>\n",$child->{BUFF},"\n<==child($child->{CMD})\n";
        print "child($child->{CMD})==>\n",$child->{ERR_BUFF},"\n<==child($child->{CMD})\n" if $child->{ERR_BUFF};
      }
      else {
        print $child->{BUFF};
        print $child->{ERR_BUFF} if $child->{ERR_BUFF};
      }
    }
  }

  foreach my $pid ( keys %{$self->_processes} ) {
    my $child = $self->_processes->{$pid};
    if ( $child->{FINISHED} ) {
      $Log->debug("Removing completed process: $pid") if $Log->is_debug;
      delete $self->_processes->{$pid};
      if ( $child->{EXIT_STATUS} ) {
        ++$errors;
        print STDERR "CHILD: $child->{CMD} exited with $child->{EXIT_STATUS}\n";
      }
    }
  }

  return($reaped,$errors);
}

sub _readChildHandles {
  my($self,$child) = @_;
  my $nr = undef;
  $Log->debug( " reading input from $child->{CMD}\n" ) if $Log->is_debug;
  while( $nr = sysread($child->{READ_FH},$child->{BUFF},1024,length($child->{BUFF})) ) {
    $Log->debug( " reading input from $child->{CMD} => '$child->{BUFF}'\n" ) if $Log->is_debug;
  }

  while( $nr = sysread($child->{ERROR_FH},$child->{ERR_BUFF},1024,length($child->{ERR_BUFF})) ) {
    $Log->debug( " reading input from $child->{CMD} => '$child->{ERR_BUFF}'\n" ) if $Log->is_debug;
    print STDERR $child->{CMD}," =>",$child->{ERR_BUFF},"\n";
  }

  return $nr;
}

sub spawnNextJob {
  my($self) = @_;
  my $jobnum = $self->jobnum($self->jobnum+1);
  my $cmd = shift @{ $self->_commandsToProcess };
  return undef unless $cmd;
  print STDERR "RUN[$jobnum]: $cmd\n";
  return if $self->norun;
  my($in,$out,$rerr) = ('','');
  my $pid;
  eval {
    print STDERR "spawning new job: $cmd\n" if $self->verbose;
    $Log->debug("execing: ($cmd)") if $Log->is_debug;
    my $werr;
    pipe $rerr,$werr;
    close $werr;
    unless ($rerr) {
      print STDERR " *** error creating pipe for error read handle! : $!\n";
      exit;
    }
    $pid = open3($in,$out,$rerr,$cmd);
    unless( $pid ) {
      print STDERR " *** exec failure in child: $!\n";
      exit;
    }
  };

  if ($@) {
    die $@;
  }

  my $child_info = {
                    BUFF => '',
                    ERR_BUFF => '',
                    CMD => $cmd,
                    PID => $pid,
                    ERROR => undef,
                    WRITE_FH => $in,
                    READ_FH => $out,
                    ERROR_FH => $rerr,
                    FINISHED => undef,
                    EXIT_STATUS => undef,
                   };
  $self->_processes->{$pid} = $child_info;

  if($@) {
    $child_info->{ERROR} = "Error calling open3('$cmd'): $@";
    $child_info->{FINISHED} = 1;
  }

  foreach my $fh ($in,$out,$rerr) {
    next unless $fh;
    unless( $fh->fcntl( F_SETFL, O_NONBLOCK )) {
      confess "Error, unable to set nonblock on file handle: $!\n";
    }
  }

  return 1;
}

1;

package main;
_PARALELL_JOBS->main;


__DATA__

=head1 NAME

parllel-jobs

=head1 SNYOPSIS

  parllel-jobs --cmdfile=file 
    [--maxjobs=<number>] 
    [--quiet] [--sleep=<number>]
    [--norun] [--verbose]

=head1 DESCRIPTION

This is a simple shell utility that allows you to run several other
processes in parallel.

=head1 COMMAND LINE OPTIONS

=over 4

=item

--verbose

Be extra verbose.  This will print out more information about the
currently running processes.

=item

--norun

Don't actualy execute the commands - just do a dry run.

=item

--cmdfile=s

Specify the command file.

=item

--maxjobs=s

Specify the maximum number of parallel jobs to run at one time.

=item

--sleep=s

Set the sleep time (default is 1 second) to wait between polling for
process status changes.

=item

--quiet

Be quiet.  This supresses both messages that --verbose would print as
well as other messages.

=back

=head1 COMMAND FILE FORMAT

Command files should be one command per line in the file.  Each line
is just a single string that is executed the shell.

Example:

  # specify several downloads to be run in parallel
  wget http://some.host/software.tar.gz
  wget http://some.host/database.mdb
  wget http://some.host/movie-trailer.mpeg
  wget http://some.host/linux-distrubtion.iso

  # run several remove ssh commands (you'd 
  # really want to set up ssh-agent before
  # trying this though)
  ssh user@host process_job 1
  ssh user@host process_job 2
  ssh user@host process_job 3
  ssh user@host process_job 4
  ssh user@host process_job 5
  ssh user@host process_job 6


=head1 KNOWN BUGS

Processes that never exit will cause parallel-jobs to run forever :)

There are no facilities for adding more jobs to the queue while the
program is running.

=head1 AUTHORS

Kyle R. Burton <krburton@cpan.org> <mortis@neverlight.com>

=cut
