Amu-chan/Source/amu.pl

From SDA Knowledge Base

< Amu-chan‎ | Source
Revision as of 00:37, 12 October 2008 by Njahnke (Talk | contribs)

(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to: navigation, search
#!/usr/bin/perl
#nathan jahnke <njahnke@gmail.com>

### ATASHI NO KOKORO, UNLOCK! ###

# Amu is distributed under the terms of the GNU General Public License 3.0:
# http://www.gnu.org/licenses/gpl-3.0.txt

#TODO
##delimiter is whatever the first character in a new queue is
##alternative jobs based on os
##encryption?

use warnings;
use strict 'subs';
package amu;

use IO::Socket;
use Sys::Hostname;
use Sys::HostIP;
use threads;
use threads::shared;
#use Thread::Queue;
use List::Util qw(shuffle);
use Cwd qw(abs_path);

#---- global configuration ----
#These are package globals (the file only enables strict 'subs'); the subs
#further down read them by name.
$myversion = 44;
$VERBOSE = 1;
$amudelim = ";"; #this currently delimits only jobs and their indices (keys as a hash) in the queue
$port = 56677;
$myip = Sys::HostIP->ip;
$mypath = abs_path($0);
#host => last reported status; overwritten by status_threads_infinite_loop
my %amustatus :shared = ();

#Keep a copy of our own source in memory: server_thread sends it to peers
#that ask for AMUPL.
@amupl = ();
if (open(my $amupl_fh, '<', $mypath)) {
	@amupl = <$amupl_fh>;
	close($amupl_fh);
} else {
	#best effort: we can still run jobs, we just can't hand out our source
	warn "Couldn't read $mypath: $!\n";
}

#Optional list of peers, one host per line. Lines starting with '#' are
#comments; blank lines and our own address are skipped.
@remote_hosts = ();
if (-e "amu_remotehosts.txt") {
	if (open(my $hosts_fh, '<', "amu_remotehosts.txt")) {
		while (my $line = <$hosts_fh>) {
			$line =~ s/\s+\z//; #drops the newline and also a stray \r or trailing blanks
			next if !$line;
			next if $line eq $myip;
			next if $line =~ /^#/;
			push(@remote_hosts,$line);
		}
		close($hosts_fh);
	} else {
		warn "Couldn't read amu_remotehosts.txt: $!\n";
	}
}

#intercept ctrl-c or kill from cli
$SIG{INT} = $SIG{TERM} = \&amushutdown;
$SIG{HUP} = \&amurestart;

#Shared state read and written by the main loop, the server thread and the
#status thread. The :shared attribute has to be attached in a my/our
#declaration, which is why this one is not a plain global like the rest.
my %amuhash :shared = ();
{ #lock and load
	lock(%amuhash);
	$amuhash{"STATUS"} = "OK";
	$amuhash{"VERSION"} = $myversion;
	$amuhash{"QUEUE"} = "";
	$amuhash{"JOB"} = "";
	$amuhash{"REMOTE_HOSTS"} = join(" ",@remote_hosts);
}
print "starting server_thread ...\n" if $VERBOSE;
$server_thread = threads->create(\&server_thread);

#status updates every 5 seconds
print "starting status_threads_infinite_loop ...\n" if $VERBOSE;
threads->create(\&status_threads_infinite_loop);

print "Amu version $myversion started successfully!\n";


#Main state machine. $amuhash{"STATUS"} moves through:
#  OK       - idle; server_thread flips it to QUEUEING when a "DO" queue arrives
#  QUEUEING - pick a job from the queue and announce the claim to the peers
#  JANKEN   - settle conflicting claims by comparing random numbers
#             (janken = rock-paper-scissors)
#  WORKING  - run the claimed job, then go back to QUEUEING for the next one
#Any other status (RESTART / SHUTDOWN, set by the signal handlers or by a
#successful self-upgrade in status_threads) ends the loop.
while ($amuhash{"STATUS"} =~ /OK|QUEUEING|JANKEN|WORKING/) {
	#update @remote_hosts - don't want to have to lock %amuhash every time we contact people
	{
		lock(%amuhash);
		@remote_hosts = split(" ",$amuhash{"REMOTE_HOSTS"});
	}

	if ($amuhash{"STATUS"} =~ /QUEUEING/) {
		print "entering status mode block QUEUEING\n" if $VERBOSE;
		if ($amuhash{"QUEUE"} ne '') {
			#distribute the queue to all known other amus ... a lot will get it and ignore because they're already in QUEUEING mode but this will help ensure everyone gets it - BEFORE we claim a job
			threads->create(\&status_threads, "DO ".$amuhash{"QUEUE"}, @remote_hosts)->join();
			my $jobnum;
			if ($amuhash{"QUEUE"} ne '') { #check again to make sure our queue hasn't been emptied since we sent the queue to others
				{
					print "about to lock the hash...\n" if $VERBOSE;
					lock(%amuhash);
					print "locked the hash\n" if $VERBOSE;
					#the queue string is index;command;index;command;... so splitting it yields a hash of index => command
					my %jobs = split($amudelim,$amuhash{"QUEUE"});
					#pick random queue item
					#@joblist is the flattened (key, value, key, value, ...) list, so the keys sit at the even positions
					my @joblist = (%jobs);
					$jobnum = int(rand(keys %jobs));
					$jobnum = $joblist[$jobnum*2];
					#claim it
					print "claiming job $jobnum\n" if $VERBOSE;
					$amuhash{"JOB"} = $jobnum;
					#and get ready to fight for it
					$amuhash{"STATUS"} = "JANKEN";
				}
				#sleep 5;
				#a peer only answers a CLAIM when it holds the same job (see server_thread), so %objections maps each objecting host to its reply
				%objections = &status_threads("CLAIM ".$jobnum, @remote_hosts);
			}
		}
		if ($amuhash{"STATUS"} ne "JANKEN") { #no more jobs for us to do
			print "no more jobs for us to do\n" if $VERBOSE;
			lock(%amuhash);
			$amuhash{"STATUS"} = "OK";
		}
	}
	if ($amuhash{"STATUS"} =~ /JANKEN/) {
		print "entering status mode block JANKEN\n" if $VERBOSE;
		if (%objections) {
			#play one round against every host that objected to our claim
			foreach $evilhost (keys %objections) {
				last if $amuhash{"STATUS"} eq "QUEUEING"; #i've already lost
				#generate my random number
				$random = rand();
				#turn the fraction into a string of digits by dropping the leading "0."
				#NOTE(review): the '.' in the pattern is unescaped, so it matches any character after a leading 0
				$random =~ s/^0.//;
				{
					lock(%amuhash);
					#publishing the number under the JANKEN key lets server_thread answer the opponent's "JANKEN" request with it
					$amuhash{"JANKEN"} = $random;
				}
				#broadcast
				print "asking for janken - job $amuhash{JOB}\n" if $VERBOSE;
				#status_threads returns a host => value hash; flattened into an array, element 1 is the opponent's number
				my @janken = &status_threads("JANKEN", $evilhost);
				if (@janken) { #did anyone play with me?
					#the larger number wins
					if ($janken[1] > $random) { #i lost
						print "i lost at janken\n" if $VERBOSE;
						lock(%amuhash);
						my %jobs = split($amudelim,$amuhash{"QUEUE"});
						if (exists($jobs{$amuhash{"JOB"}})) {
							#give this job up
							delete($jobs{$amuhash{"JOB"}});
							$amuhash{"QUEUE"} = join($amudelim,%jobs);
						}
						print "trying to get another job; changing status back to QUEUEING ...\n" if $VERBOSE;
						$amuhash{"STATUS"} = "QUEUEING"; #try to get another job
					} elsif ($janken[1] == $random) { #a tie
						print "a tie at janken!?\n" if $VERBOSE;
						#replay against the same host with a fresh random number
						redo;
					}
				} else {
					print "no responses for janken\n" if $VERBOSE;
				}
			}
			{
				lock(%amuhash);
				#with the value undefined, server_thread's defined() lookup no longer answers JANKEN requests
				undef $amuhash{"JANKEN"};
			}
		}
		{
			lock(%amuhash);
			#nobody objected, or we won every round: the job is ours
			$amuhash{"STATUS"} = "WORKING" if $amuhash{"STATUS"} ne "QUEUEING";
		}
	}
	if ($amuhash{"STATUS"} =~ /WORKING/) {
		print "entering status mode block WORKING\n" if $VERBOSE;
		my ($jobnum, %jobs);
		{
			#snapshot the queue and our job index under the lock, then release it so the other threads are not blocked while the job runs
			lock(%amuhash);
			%jobs = split($amudelim,$amuhash{"QUEUE"});
			$jobnum = $amuhash{"JOB"};
		}
		#run the command
		#NOTE(review): the command text comes from the queue, which server_thread accepts from the network via "DO" - it is passed to the shell as-is
		print "executing $jobs{$jobnum} ...\n" if $VERBOSE;
		system($jobs{$jobnum});
		#all dun
		{
			lock(%amuhash);
			#re-read the queue: server_thread may have removed other jobs while ours was running
			my %jobs = split($amudelim,$amuhash{"QUEUE"});
			delete($jobs{$amuhash{"JOB"}});
			$amuhash{"QUEUE"} = join($amudelim,%jobs);
			$amuhash{"JOB"} = "";
			$amuhash{"STATUS"} = "QUEUEING";
		}
	}
	sleep 1;
}
#the loop has ended: wait the 15 seconds announced in the shutdown / upgrade messages, then either re-exec this script or exit
sleep 15;
exec($mypath) if ($amuhash{"STATUS"} =~ /RESTART/);
exit 0 if ($amuhash{"STATUS"} =~ /SHUTDOWN/);


#Ask every amu recorded in %amustatus for its version and, if one of them
#runs a newer version than $myversion, request its amu.pl from the host that
#reported it. The "AMUPL" request goes through status_threads, which is the
#code that actually writes the new script and flags the restart.
#Takes no arguments; the return value is not meaningful.
sub version_check {
	print "Checking the network for a newer version of Amu ...\n" if $VERBOSE;
	my @knowngoodamus;
	{
		lock(%amustatus);
		@knowngoodamus = (keys %amustatus);
	}
	#for "VERSION" queries status_threads returns version => host
	my %hostversions = &status_threads("VERSION", @knowngoodamus);
	#newest first, so the first version that beats ours is also the best one
	#on offer; the comparator has to be <=> (a plain < never returns a
	#negative value and is not a valid sort comparator)
	foreach my $amuversion (sort { $b <=> $a } keys %hostversions) {
		print "$amuversion : $hostversions{$amuversion}\n" if $VERBOSE;
		if ($amuversion > $myversion) {
			&status_threads("AMUPL", $hostversions{$amuversion});
			last;
		}
	}
}

#Background poller, run in its own thread. Every 5 seconds, while this amu
#is idle (STATUS contains OK), it asks all known remote hosts for their
#status, replaces %amustatus with the answers and then runs version_check.
#Returns once STATUS is no longer one of OK/QUEUEING/JANKEN/WORKING.
sub status_threads_infinite_loop {
	POLL: while ($amuhash{"STATUS"} =~ /OK|QUEUEING|JANKEN|WORKING/) {
		#nothing to do this round unless we are idle
		next POLL unless $amuhash{"STATUS"} =~ /OK/;

		{
			lock(%amuhash);
			@remote_hosts = split(" ",$amuhash{"REMOTE_HOSTS"});
		}
		@remote_hosts = shuffle(@remote_hosts);

		#create() is called in list context on purpose: that is what makes
		#join() hand back the whole host => status list
		my ($prober) = threads->create(\&status_threads, "STATUS", @remote_hosts);
		my %replies = $prober->join();
		{
			lock(%amustatus);
			%amustatus = %replies;
		}
		version_check();
	}
	continue {
		sleep 5;
	}
	print "status_threads_infinite_loop shutting down\n" if $VERBOSE;
	return;
}

#Send one command to several hosts in parallel (one hostname_contact thread
#per host) and collect the answers.
#  arguments: the command string, followed by the list of hosts to contact
#  returns:   a hash (as a list)
#               "VERSION" replies -> version => first host seen with it
#               "AMUPL" replies   -> nothing is added; a complete script is
#                                    written over $mypath and STATUS is set
#                                    to RESTART
#               anything else     -> host => reply payload
#             Hosts that returned an error (fewer than three elements) are
#             left out.
sub status_threads {
	my $command = shift;
	my %threads;
	my %foundamus; #temporary hash to avoid locking the shared one while we build up responses
	foreach my $remote_host (@_) {
		#a host listed twice would overwrite its own thread handle and the first thread would never be joined
		next if exists($threads{$remote_host});
		#list context, so that join() below returns hostname_contact's whole list
		($threads{$remote_host}) = threads->create(\&hostname_contact, $command, $remote_host);
	}
	foreach my $remote_host (keys %threads) {
		my @remote_response = $threads{$remote_host}->join();
		if (!defined $remote_response[2]) {
			#handle errors
			next;
		}
		if ($remote_response[1] =~ /AMUPL/) {
			shift(@remote_response); #kill the remote host
			shift(@remote_response); #kill the command
			#make sure this is the entire amu.pl; judged per response, not remembered from an earlier one
			my $amuplgood = grep { m/^__END__/ } @remote_response;
			next if !$amuplgood;
			my $newamupl;
			if (!open($newamupl, '>', $mypath)) {
				warn "Couldn't open $mypath for writing, not upgrading: $!\n";
				next;
			}
			#and print the new script
			my $written = print {$newamupl} @remote_response;
			my $closed = close($newamupl);
			if (!($written && $closed)) {
				#don't restart into a script we could not write completely
				warn "Couldn't write the new $mypath, not restarting: $!\n";
				next;
			}
			print "Upgraded to a new version of Amu! Restarting Amu in 15 seconds ...\n";
			{
				lock(%amuhash);
				$amuhash{"STATUS"} = "RESTART";
			}
		} elsif ($remote_response[1] =~ /VERSION/) {
			#backwards so that the version is the key and the host is the value: we want only one host - the first to reply - per version
			if (!(exists($foundamus{$remote_response[2]}))) {
				$foundamus{$remote_response[2]} = $remote_response[0];
			}
		} else {
			$foundamus{$remote_response[0]} = $remote_response[2];
		}
	}
	return %foundamus;
}

#Send "AMU <command>" to one host on $port and parse the first line of the
#reply.
#  arguments: command string, remote host
#  returns:   a list whose first element is always the remote host, then
#               (host, KEYWORD, payload)       for STATUS / VERSION / CLAIM / JANKEN
#               (host, "AMUPL", line1, line2, ...)  for a script download
#               (host, ERROR_CODE)             for a bad reply or failed connection
#               (host)                         when the peer sent nothing back
#             Callers tell success from failure by whether element 2 is defined.
sub hostname_contact {
	my $command = shift;
	my $remote_host = shift;
	my @retval = ($remote_host);
	my $bad_argument = "SYNTAX_ERROR_BAD_ARGUMENT";
	print "trying $remote_host ...\n" if $VERBOSE;
	$| = 1 if $VERBOSE; #flush output buffer
	my $socket = IO::Socket::INET->new(PeerAddr => $remote_host,
					PeerPort => $port,
					Proto    => "tcp",
					Type     => SOCK_STREAM,
					Timeout  => 5 );
	if (!$socket) {
		print "\n" if $VERBOSE;
		push(@retval, "CONNECTION_REFUSED_OR_TIMED_OUT");
		return @retval;
	}

	print $socket "AMU $command\n";
	#read until the peer closes the connection; an AMUPL reply spans many lines
	my @answer = <$socket>;
	close($socket);
	print "contact with $remote_host\n" if $VERBOSE;

	my $firstline = $answer[0];
	return @retval if !$firstline; #the peer had nothing to say

	chomp($firstline);
	my @fields = split(" ",$firstline);
	my ($magic, $keyword, $payload) = @fields;
	#a usable reply is "AMU <keyword> <payload...>"; $payload being defined implies the other two are
	if (!((defined $payload) && ($magic =~ /AMU/))) {
		push(@retval, "NOT_AMU");
		return @retval;
	}

	#STATUS has to be tested first: "JANKEN" is also a legal status value
	if ($keyword =~ /STATUS/) {
		if ($payload =~ /OK|QUEUEING|JANKEN|WORKING/) {
			push(@retval, $keyword, $payload);
		} else {
			push(@retval, $bad_argument);
		}
	} elsif ($keyword =~ /VERSION|CLAIM|JANKEN/) {
		#the payload may contain spaces, so glue the remaining fields back together
		push(@retval, $keyword, join(" ",@fields[2..$#fields]));
	} elsif ($keyword =~ /AMUPL/) {
		#the script's first line shares a line with "AMU AMUPL", so it has to be its shebang
		if ($payload =~ m,#!/usr/bin/perl,) {
			push(@retval, $keyword, $payload."\n");
			shift(@answer); #don't need the first line again
			push(@retval, @answer);
		} else {
			push(@retval, $bad_argument);
		}
	} else {
		push(@retval, "SYNTAX_ERROR_BAD_COMMAND");
	}
	return @retval;
}

#TCP server side of the amu protocol, run in its own thread. Accepts one
#connection at a time on $port, reads a single request of the form
#"AMU <command> [arguments]" and handles it while holding the lock on
#%amuhash:
#  <key of %amuhash>  - reply "AMU <key> <value>" (STATUS, VERSION, JANKEN, ...)
#  AMUPL              - reply with the in-memory copy of this script
#  HOSTS <list>       - adopt a new host list and pass it on to hosts that
#                       have not signed it yet
#  DO <queue>         - accept a job queue (only while STATUS is OK)
#  CLAIM <job>        - object if that is our job, otherwise drop it from
#                       our queue
#Anything else is ignored. The loop ends once STATUS is no longer one of
#OK/QUEUEING/JANKEN/WORKING; dies if the listening socket can't be created.
sub server_thread {
	my $server = IO::Socket::INET->new(LocalPort => $port,
					Type      => SOCK_STREAM,
					Reuse     => 1,
					Listen    => 10)
		or die "Couldn't be a tcp server on port $port : $@\n";

	while ($amuhash{"STATUS"} =~ /OK|QUEUEING|JANKEN|WORKING/) {
		my $client = $server->accept();
		#accept can come back empty-handed; re-check STATUS and try again rather than talking to an undefined handle
		next if !$client;
		my $question = '';
		recv($client, $question, 1024, 0);
		$question = '' if !defined $question;
		chomp($question);
		print "$question\n" if $VERBOSE;
		my @remote_question = split(" ",$question);
		#need at least "AMU <command>"; an empty or one-word request (such as a bare connect) is dropped
		if ((defined $remote_question[1]) && ($remote_question[0] =~ /AMU/)) {
			lock(%amuhash);
			if (defined $amuhash{$remote_question[1]}) {
				print $client "AMU $remote_question[1] $amuhash{$remote_question[1]}\n";
			} elsif ($remote_question[1] =~ /AMUPL/) {
				print $client "AMU $remote_question[1] ";
				for (@amupl) {
					print $client $_;
				}
				print $client "\n"; #just in case there's no carriage return at the end of amu.pl
			} elsif (($remote_question[1] =~ /HOSTS/) && (defined $remote_question[2])) {
				print "received new host list\n" if $VERBOSE;
				#the list is host;flag;host;flag;... where a true flag means that host has already seen it
				my %newhosts = split(";",$remote_question[2]);
				$newhosts{$myip} = 1; #sign off that i have seen this list
				my @ignoranthosts;
				my @otherhosts;
				foreach my $host (keys %newhosts) {
					push(@otherhosts,$host) if $host ne $myip;
					if (!($newhosts{$host})) { #these haven't heard about the new host list yet
						print "$host hasn't yet heard about the new host list\n" if $VERBOSE;
						push(@ignoranthosts,$host);
					}
				}
				$amuhash{"REMOTE_HOSTS"} = join(" ",@otherhosts);
				#NOTE(review): this network round trip happens while %amuhash is still locked
				&status_threads("HOSTS ".join(";",%newhosts), @ignoranthosts);
			} elsif (($remote_question[1] =~ /DO/) && ($amuhash{"STATUS"} eq "OK") && (defined $remote_question[2])) {
				my $remote_jobs = join(" ",@remote_question[2..$#remote_question]);
				print "new job queue '$remote_jobs' has arrived\n" if $VERBOSE;
				#new job queue has arrived
				$amuhash{"QUEUE"} = $remote_jobs;
				$amuhash{"STATUS"} = "QUEUEING";
			} elsif (($remote_question[1] =~ /CLAIM/) && ($amuhash{"QUEUE"} ne '') && (defined $remote_question[2])) {
				my $remote_job = join(" ",@remote_question[2..$#remote_question]);
				if ($amuhash{"JOB"} eq $remote_job) {
					#no! that's my job!
					print "objecting to job $remote_job\n" if $VERBOSE;
					#echoing the claim back is the objection; staying silent means consent
					print $client "AMU ".join(" ",@remote_question[1..$#remote_question])."\n";
				} else {
					my %jobs = split($amudelim,$amuhash{"QUEUE"});
					if (exists($jobs{$remote_job})) {
						#since i have no objections about others claiming this job...
						print "deleting job $remote_job\n" if $VERBOSE;
						delete($jobs{$remote_job});
						$amuhash{"QUEUE"} = join($amudelim,%jobs);
					}
				}
			} else {
				#cool and spicy
			}
		}
		close($client);
	}
	close($server);
	print "server_thread shutting down\n" if $VERBOSE;
	return;
}

#HUP handler: flag a restart, then poke our own server so it notices.
sub amurestart {
	my $wakeup_command = "SHUTDOWN";
	{
		#keep the lock confined to this block: server_thread needs it to process the wake-up request sent below
		lock(%amuhash);
		$amuhash{"STATUS"} = "RESTART";
	}
	#to knock the server thread out of its infinite loop
	hostname_contact($wakeup_command, $myip);
}

#INT/TERM handler: announce the shutdown, flag it in the shared state, then
#poke our own server so it notices.
sub amushutdown {
	my $wakeup_command = "SHUTDOWN";
	$| = 1; #unbuffered, so the message (which has no newline) shows up right away
	print "Shutting down in about 15 seconds ...";
	{
		#keep the lock confined to this block: server_thread needs it to process the wake-up request sent below
		lock(%amuhash);
		$amuhash{"STATUS"} = $wakeup_command;
	}
	#to knock the server thread out of its infinite loop
	hostname_contact($wakeup_command, $myip);
}

__END__
Personal tools