#!/usr/bin/env perl
package _PARALELL_JOBS;

# Runs the shell commands listed in a command file, keeping at most
# --maxjobs of them running at any one time, and reports how many of
# them finished with an error.

use strict;
use warnings;

use Carp;
use Fcntl;
use Getopt::Long;
use IO::Handle;
use IPC::Open3;
use Log::Log4perl;
use POSIX qw(:sys_wait_h);
use Symbol qw(gensym);
use Time::HiRes qw(usleep time);

my $Log = undef;
$|++;

# --------------------------------------------------------------------
# Accessors.  Each one is a combined getter/setter: called with a value
# it stores and returns that value, called without it returns the
# current value.
# --------------------------------------------------------------------
sub _processes         { @_ > 1 ? $_[0]->{_processes}         = $_[1] : $_[0]->{_processes} }
sub _commandsToProcess { @_ > 1 ? $_[0]->{_commandsToProcess} = $_[1] : $_[0]->{_commandsToProcess} }
sub verbose            { @_ > 1 ? $_[0]->{_verbose}           = $_[1] : $_[0]->{_verbose} }
sub norun              { @_ > 1 ? $_[0]->{_norun}             = $_[1] : $_[0]->{_norun} }
sub cmdfile            { @_ > 1 ? $_[0]->{_cmdfile}           = $_[1] : $_[0]->{_cmdfile} }
sub maxjobs            { @_ > 1 ? $_[0]->{_maxjobs}           = $_[1] : $_[0]->{_maxjobs} }
sub quiet              { @_ > 1 ? $_[0]->{_quiet}             = $_[1] : $_[0]->{_quiet} }
sub jobnum             { @_ > 1 ? $_[0]->{_jobnum}            = $_[1] : $_[0]->{_jobnum} }
sub sleep              { @_ > 1 ? $_[0]->{_sleep}             = $_[1] : $_[0]->{_sleep} }

# Returns the Getopt::Long option specifications understood by the
# program.  The part of each spec before any '=' is also the name of
# the accessor / object field the option value is stored in.
sub autoCmdLineArgs {
    return ( qw( verbose norun cmdfile=s maxjobs=s sleep=s quiet ) );
}

# Strips the '=type' suffix from a Getopt::Long spec, leaving the bare
# option name ("cmdfile=s" => "cmdfile", "quiet" => "quiet").
sub _optionName {
    my ($spec) = @_;
    my $eq = index( $spec, '=' );
    return -1 == $eq ? $spec : substr( $spec, 0, $eq );
}

# Program entry point (class method).  Sets up logging, parses the
# command line into a new object, runs the jobs and exits with status 0
# when every job succeeded, 1 otherwise.  Never returns.
sub main {
    my ($pkg) = @_;

    Log::Log4perl->init( $ENV{LOG4PERL_CONFIG} ) if $ENV{LOG4PERL_CONFIG};
    $Log = Log::Log4perl->get_logger(__PACKAGE__);

    my $app = bless {}, $pkg;

    # Point every option straight at the matching "_name" object field.
    my %opts;
    foreach my $arg ( $app->autoCmdLineArgs ) {
        my $m = _optionName($arg);
        $opts{$arg} = \$app->{"_${m}"};
    }

    unless ( GetOptions(%opts) ) {
        confess "Error parsing command line arguments!\n";
    }

    if ( $app->run ) {
        print STDERR "EXIT SUCCESS\n";
        exit 0;
    }

    print STDERR "EXIT ERROR\n";
    exit 1;
}

# Applies defaults, prints the argument summary (unless --quiet), loads
# the command file and runs it.  Returns true when no job reported an
# error.
sub run {
    my ($self) = @_;

    # NOTE(review): this overwrites whatever --verbose was given on the
    # command line; verbosity is effectively "on unless --quiet".  Left
    # as is because changing it would alter the default output format.
    $self->verbose( $self->quiet ? 0 : 1 );
    $self->sleep(1) unless $self->sleep;
    $self->jobnum(0);
    $self->_processes( {} );
    $self->maxjobs(2) unless $self->maxjobs;

    unless ( $self->quiet ) {
        print STDERR "Argument Summary:\n";
        foreach my $arg ( $self->autoCmdLineArgs ) {
            my $m = _optionName($arg);
            printf STDERR "%15s: %s\n", $m, $self->$m() || '';
        }
    }

    $self->loadCommandFile;
    my ( $jobsRun, $errors ) = $self->runCommandFile;
    print STDERR "FINISH: Out of $jobsRun jobs run, $errors had error exit codes\n";

    return 0 == $errors;
}

# Main scheduling loop: reaps finished jobs and starts new ones until
# the queue is empty and nothing is running.  Returns the list
# (number of jobs reaped, number of those that failed).
sub runCommandFile {
    my ($self) = @_;
    my $totalErrors = 0;
    my $totalReaped = 0;

    while ( $self->jobsLeftToProcess || $self->numJobsRunning ) {
        my $status = 0;

        my ( $reaped, $errors ) = $self->reapCompletedJobs;
        if ($reaped) {
            $status++;
            $totalReaped += $reaped;
            $totalErrors += $errors;
        }

        print STDERR "STATUS: ", scalar( $self->jobsLeftToProcess ),
            " jobs left to run, ", $self->numJobsRunning,
            " are running out of ", $self->maxjobs, " max jobs\n";

        while ( $self->jobsLeftToProcess && $self->numJobsRunning < $self->maxjobs ) {
            $self->spawnNextJob;
            ++$status;
        }

        next if $status;    # we either started or reaped a job

        $Log->info("Waiting for background processes...") if $Log->is_info;

        # Time::HiRes::sleep honours fractional --sleep values; the
        # builtin would truncate e.g. 0.5 to 0 and spin.
        Time::HiRes::sleep( $self->sleep );
    }

    return ( $totalReaped, $totalErrors );
}

# Number of entries currently in the process table.
sub numJobsRunning {
    my ($self) = @_;
    return scalar keys %{ $self->_processes };
}

# Number of commands still waiting to be started.
sub jobsLeftToProcess {
    my ($self) = @_;
    return scalar @{ $self->_commandsToProcess };
}

# Reads the --cmdfile file into the queue of commands to run, one
# command per line.  Blank lines and lines whose first non-blank
# character is '#' are skipped, matching the command file example in
# the documentation below.  Dies (confess) when no file was given or it
# cannot be read/opened.
sub loadCommandFile {
    my ($self) = @_;
    my $fh;

    confess "Error, you must pass a command file with --cmdfile=file"
        unless $self->cmdfile;
    confess "Error, can't read command file: " . $self->cmdfile
        unless -r $self->cmdfile;

    unless ( open $fh, "<", $self->cmdfile ) {
        confess "Error opening file: ", $self->cmdfile, " : $!";
    }

    my @cmds = grep { /\S/ && !/\A\s*#/ } <$fh>;
    close $fh;
    chomp @cmds;

    $self->_commandsToProcess( \@cmds );
}

# Polls every entry of the process table without blocking, collects any
# pending output, prints the output of children that have exited and
# removes finished entries from the table.  Returns the list
# (number of jobs reaped, number of those that failed).
sub reapCompletedJobs {
    my ($self) = @_;
    my ( $reaped, $errors ) = ( 0, 0 );

    print STDERR "checking job statuses ", scalar( keys %{ $self->_processes } ),
        " are running...\n"
        if $self->verbose;

    foreach my $childPid ( keys %{ $self->_processes } ) {
        my $child = $self->_processes->{$childPid};

        # Entries that are already finished were recorded by
        # spawnNextJob for commands that could not be started; there is
        # no process to wait for.
        if ( $child->{FINISHED} ) {
            print STDERR $child->{ERROR} if $child->{ERROR};
            ++$reaped;
            ++$errors;
            next;
        }

        my $pid        = waitpid( $child->{PID}, WNOHANG );
        my $waitStatus = $?;    # capture before anything can clobber it
        $Log->debug("  polled $child->{PID} => $pid\n") if $Log->is_debug;

        if ( -1 == $pid ) {
            print STDERR "ERROR on waitpid($child->{PID}), invalid pid? : $!\n";
            $child->{FINISHED} = 1;
            print STDERR $child->{ERROR} if $child->{ERROR};
            ++$reaped;
            ++$errors;
            next;
        }

        $self->_readChildHandles($child);

        if ( $pid == $child->{PID} ) {
            ++$reaped;
            $child->{FINISHED} = 1;
            print STDERR $child->{ERROR} if $child->{ERROR};

            # High byte is the exit code, low 7 bits the terminating
            # signal.  A child killed by a signal has exit code 0, so
            # the signal must be checked separately.
            $child->{EXIT_STATUS} = ( $waitStatus >> 8 );
            $child->{SIGNAL}      = ( $waitStatus & 127 );

            if ( $self->verbose ) {
                print "child($child->{CMD})==>\n", $child->{BUFF}, "\n<==child($child->{CMD})\n";
                print "child($child->{CMD})==>\n", $child->{ERR_BUFF}, "\n<==child($child->{CMD})\n"
                    if $child->{ERR_BUFF};
            }
            else {
                print $child->{BUFF};
                print $child->{ERR_BUFF} if $child->{ERR_BUFF};
            }
        }
    }

    foreach my $pid ( keys %{ $self->_processes } ) {
        my $child = $self->_processes->{$pid};
        next unless $child->{FINISHED};

        $Log->debug("Removing completed process: $pid") if $Log->is_debug;
        delete $self->_processes->{$pid};

        if ( $child->{EXIT_STATUS} ) {
            ++$errors;
            print STDERR "CHILD: $child->{CMD} exited with $child->{EXIT_STATUS}\n";
        }
        elsif ( $child->{SIGNAL} ) {
            ++$errors;
            print STDERR "CHILD: $child->{CMD} was killed by signal $child->{SIGNAL}\n";
        }
    }

    return ( $reaped, $errors );
}

# Drains whatever is currently readable from a child's stdout and
# stderr handles into its BUFF / ERR_BUFF buffers.  Newly read stderr
# data is also echoed to STDERR right away.  Returns the result of the
# last sysread on the stderr handle.
sub _readChildHandles {
    my ( $self, $child ) = @_;
    my $nr = undef;

    $Log->debug("  reading input from $child->{CMD}\n") if $Log->is_debug;

    while ( $nr = sysread( $child->{READ_FH}, $child->{BUFF}, 1024, length( $child->{BUFF} ) ) ) {
        $Log->debug("  reading input from $child->{CMD} => '$child->{BUFF}'\n")
            if $Log->is_debug;
    }

    my $offset = length( $child->{ERR_BUFF} );
    while ( $nr = sysread( $child->{ERROR_FH}, $child->{ERR_BUFF}, 1024, $offset ) ) {
        $Log->debug("  reading input from $child->{CMD} => '$child->{ERR_BUFF}'\n")
            if $Log->is_debug;

        # Echo only the chunk just read; echoing the whole buffer would
        # repeat everything already shown on every read.
        print STDERR $child->{CMD}, " =>", substr( $child->{ERR_BUFF}, $offset ), "\n";
        $offset += $nr;
    }

    return $nr;
}

# Takes the next command off the queue and starts it through open3 with
# non-blocking handles, recording it in the process table.
#
# Returns 1 when a process was started.  Returns nothing when the queue
# was empty, when --norun is in effect, or when the command could not
# be started; in the last case a finished entry carrying the error is
# put in the process table so that reapCompletedJobs reports the job as
# failed instead of the whole run dying.
sub spawnNextJob {
    my ($self) = @_;

    my $cmd = shift @{ $self->_commandsToProcess };
    return unless defined $cmd && $cmd =~ /\S/;

    my $jobnum = $self->jobnum( $self->jobnum + 1 );
    print STDERR "RUN[$jobnum]: $cmd\n";
    return if $self->norun;

    print STDERR "spawning new job: $cmd\n" if $self->verbose;
    $Log->debug("execing: ($cmd)") if $Log->is_debug;

    # open3 creates the stdin/stdout handles itself, but merges stderr
    # into stdout unless it is handed a real glob for stderr.
    my ( $in, $out );
    my $rerr      = gensym;
    my $parentPid = $$;

    my $pid        = eval { open3( $in, $out, $rerr, $cmd ) };
    my $spawnError = $@;

    unless ($pid) {

        # Old IPC::Open3 versions report an exec failure by croaking
        # inside the forked child, which the eval above would catch.
        # Such a child must not carry on acting as a second job runner.
        if ( $$ != $parentPid ) {
            print STDERR " *** exec failure in child: $spawnError\n";
            POSIX::_exit(255);
        }

        $spawnError = "unknown error\n" unless $spawnError;
        $self->_processes->{"spawn-failed-$jobnum"} = {
            BUFF        => '',
            ERR_BUFF    => '',
            CMD         => $cmd,
            PID         => undef,
            ERROR       => "Error calling open3('$cmd'): $spawnError",
            WRITE_FH    => undef,
            READ_FH     => undef,
            ERROR_FH    => undef,
            FINISHED    => 1,
            EXIT_STATUS => undef,
        };
        return;
    }

    $self->_processes->{$pid} = {
        BUFF        => '',
        ERR_BUFF    => '',
        CMD         => $cmd,
        PID         => $pid,
        ERROR       => undef,
        WRITE_FH    => $in,
        READ_FH     => $out,
        ERROR_FH    => $rerr,
        FINISHED    => undef,
        EXIT_STATUS => undef,
    };

    # Polling must never block on a child that has nothing to say.
    foreach my $fh ( $in, $out, $rerr ) {
        next unless $fh;
        unless ( $fh->fcntl( F_SETFL, O_NONBLOCK ) ) {
            confess "Error, unable to set nonblock on file handle: $!\n";
        }
    }

    return 1;
}

1;

package main;

_PARALELL_JOBS->main;

__DATA__

=head1 NAME

parallel-jobs

=head1 SYNOPSIS

  parallel-jobs --cmdfile=file [--maxjobs=N] [--quiet] [--sleep=SECONDS] [--norun] [--verbose]

=head1 DESCRIPTION

This is a simple shell utility that allows you to run several other
processes in parallel.

=head1 COMMAND LINE OPTIONS

=over 4

=item --verbose

Be extra verbose.  This will print out more information about the
currently running processes.

=item --norun

Don't actually execute the commands - just do a dry run.

=item --cmdfile=s

Specify the command file.

=item --maxjobs=s

Specify the maximum number of parallel jobs to run at one time.

=item --sleep=s

Set the sleep time (default is 1 second) to wait between polling for
process status changes.

=item --quiet

Be quiet.  This suppresses both messages that --verbose would print as
well as other messages.

=back

=head1 COMMAND FILE FORMAT

Command files should be one command per line in the file.  Each line is
just a single string that is executed by the shell.  Blank lines and
lines beginning with C<#> are ignored.

Example:

  # specify several downloads to be run in parallel
  wget http://some.host/software.tar.gz
  wget http://some.host/database.mdb
  wget http://some.host/movie-trailer.mpeg
  wget http://some.host/linux-distribution.iso

  # run several remote ssh commands (you'd
  # really want to set up ssh-agent before
  # trying this though)
  ssh user@host process_job 1
  ssh user@host process_job 2
  ssh user@host process_job 3
  ssh user@host process_job 4
  ssh user@host process_job 5
  ssh user@host process_job 6

=head1 KNOWN BUGS

Processes that never exit will cause parallel-jobs to run forever :)

There are no facilities for adding more jobs to the queue while the
program is running.

=head1 AUTHORS

Kyle R. Burton

=cut