Difference between revisions of "Amu-chan/Source/amu.pl"
From SDA Knowledge Base
(New page: <pre><nowiki> #!/usr/bin/perl #nathan jahnke <njahnke@gmail.com> ### ATASHI NO KOKORO, UNLOCK! ### # Amu is distributed under the terms of the Gnu General Public License 3.0: # http://ww...) |
(No difference)
|
Latest revision as of 23:37, 11 October 2008
#!/usr/bin/perl #nathan jahnke <njahnke@gmail.com> ### ATASHI NO KOKORO, UNLOCK! ### # Amu is distributed under the terms of the Gnu General Public License 3.0: # http://www.gnu.org/licenses/gpl-3.0.txt #TODO ##delimiter is whatever the first character in a new queue is ##alternative jobs based on os ##encryption? use warnings; use strict 'subs'; package amu; use IO::Socket; use Sys::Hostname; use Sys::HostIP; use threads; use threads::shared; #use Thread::Queue; use List::Util qw(shuffle); use Cwd qw(abs_path); $myversion = 44; $VERBOSE = 1; $amudelim = ";"; #this currently delimits only jobs and their indices (keys as a hash) in the queue $port = 56677; $myip = Sys::HostIP->ip; $mypath = abs_path($0); my %amustatus :shared = (); open(AMUPL,$mypath); @amupl = <AMUPL>; close(AMUPL); @remote_hosts = (); if (-e "amu_remotehosts.txt") { open(REMOTEHOSTS,"amu_remotehosts.txt"); while (<REMOTEHOSTS>) { chomp; if (($_ ne $myip) && (!($_ =~ /^#/))) { push(@remote_hosts,$_) if $_; } } close(REMOTEHOSTS); } #intercept ctrl-c or kill from cli $SIG{INT} = $SIG{TERM} = \&amushutdown; $SIG{HUP} = \&amurestart; my %amuhash :shared = (); #it needs the my for some reason!? { #lock and load lock(%amuhash); $amuhash{"STATUS"} = "OK"; $amuhash{"VERSION"} = $myversion; $amuhash{"QUEUE"} = ""; $amuhash{"JOB"} = ""; $amuhash{"REMOTE_HOSTS"} = join(" ",@remote_hosts); } print "starting server_thread ...\n" if $VERBOSE; $server_thread = threads->create(\&server_thread); #status updates every 5 seconds print "starting status_threads_infinite_loop ...\n" if $VERBOSE; threads->create(\&status_threads_infinite_loop); print "Amu version $myversion started successfully!\n"; while ($amuhash{"STATUS"} =~ /OK|QUEUEING|JANKEN|WORKING/) { #update @remote_hosts - don't want to have to lock %amuhash every time we contact people { lock(%amuhash); @remote_hosts = split(" ",$amuhash{"REMOTE_HOSTS"}); } if ($amuhash{"STATUS"} =~ /QUEUEING/) { print "entering status mode block QUEUEING\n" if $VERBOSE; if ($amuhash{"QUEUE"} ne '') { #distribute the queue to all known other amus ... a lot will get it and ignore because they're already in QUEUEING mode but this will help ensure everyone gets it - BEFORE we claim a job threads->create(\&status_threads, "DO ".$amuhash{"QUEUE"}, @remote_hosts)->join(); my $jobnum; if ($amuhash{"QUEUE"} ne '') { #check again to make sure our queue hasn't been emptied since we sent the queue to others { print "about to lock the hash...\n" if $VERBOSE; lock(%amuhash); print "locked the hash\n" if $VERBOSE; my %jobs = split($amudelim,$amuhash{"QUEUE"}); #pick random queue item my @joblist = (%jobs); $jobnum = int(rand(keys %jobs)); $jobnum = $joblist[$jobnum*2]; #claim it print "claiming job $jobnum\n" if $VERBOSE; $amuhash{"JOB"} = $jobnum; #and get ready to fight for it $amuhash{"STATUS"} = "JANKEN"; } #sleep 5; %objections = &status_threads("CLAIM ".$jobnum, @remote_hosts); } } if ($amuhash{"STATUS"} ne "JANKEN") { #no more jobs for us to do print "no more jobs for us to do\n" if $VERBOSE; lock(%amuhash); $amuhash{"STATUS"} = "OK"; } } if ($amuhash{"STATUS"} =~ /JANKEN/) { print "entering status mode block JANKEN\n" if $VERBOSE; if (%objections) { foreach $evilhost (keys %objections) { last if $amuhash{"STATUS"} eq "QUEUEING"; #i've already lost #generate my random number $random = rand(); $random =~ s/^0.//; { lock(%amuhash); $amuhash{"JANKEN"} = $random; } #broadcast print "asking for janken - job $amuhash{JOB}\n" if $VERBOSE; my @janken = &status_threads("JANKEN", $evilhost); if (@janken) { #did anyone play with me? if ($janken[1] > $random) { #i lost print "i lost at janken\n" if $VERBOSE; lock(%amuhash); my %jobs = split($amudelim,$amuhash{"QUEUE"}); if (exists($jobs{$amuhash{"JOB"}})) { #give this job up delete($jobs{$amuhash{"JOB"}}); $amuhash{"QUEUE"} = join($amudelim,%jobs); } print "trying to get another job; changing status back to QUEUEING ...\n" if $VERBOSE; $amuhash{"STATUS"} = "QUEUEING"; #try to get another job } elsif ($janken[1] == $random) { #a tie print "a tie at janken!?\n" if $VERBOSE; redo; } } else { print "no responses for janken\n" if $VERBOSE; } } { lock(%amuhash); undef $amuhash{"JANKEN"}; } } { lock(%amuhash); $amuhash{"STATUS"} = "WORKING" if $amuhash{"STATUS"} ne "QUEUEING"; } } if ($amuhash{"STATUS"} =~ /WORKING/) { print "entering status mode block WORKING\n" if $VERBOSE; my ($jobnum, %jobs); { lock(%amuhash); %jobs = split($amudelim,$amuhash{"QUEUE"}); $jobnum = $amuhash{"JOB"}; } #run the command print "executing $jobs{$jobnum} ...\n" if $VERBOSE; system($jobs{$jobnum}); #all dun { lock(%amuhash); my %jobs = split($amudelim,$amuhash{"QUEUE"}); delete($jobs{$amuhash{"JOB"}}); $amuhash{"QUEUE"} = join($amudelim,%jobs); $amuhash{"JOB"} = ""; $amuhash{"STATUS"} = "QUEUEING"; } } sleep 1; } sleep 15; exec($mypath) if ($amuhash{"STATUS"} =~ /RESTART/); exit 0 if ($amuhash{"STATUS"} =~ /SHUTDOWN/); sub version_check { print "Checking the network for a newer version of Amu ...\n" if $VERBOSE; { lock(%amustatus); @knowngoodamus = (keys %amustatus); } %hostversions = &status_threads("VERSION", @knowngoodamus); foreach $amuversion (sort { $a < $b } keys %hostversions) { print "$amuversion : $hostversions{$amuversion}\n" if $VERBOSE; if ($amuversion > $myversion) { &status_threads("AMUPL", $hostversions{$amuversion}); last; } } } sub status_threads_infinite_loop { while ($amuhash{"STATUS"} =~ /OK|QUEUEING|JANKEN|WORKING/) { if ($amuhash{"STATUS"} =~ /OK/) { { lock(%amuhash); @remote_hosts = split(" ",$amuhash{"REMOTE_HOSTS"}); } @remote_hosts = shuffle(@remote_hosts); my ($status_thread) = threads->create(\&status_threads, "STATUS", @remote_hosts); my %foundamus = $status_thread->join(); { lock(%amustatus); %amustatus = %foundamus; } &version_check; } sleep 5; } print "status_threads_infinite_loop shutting down\n" if $VERBOSE; return; } sub status_threads { my $command = shift; my %threads; my %foundamus; #temporary hash to avoid locking the shared one while we build up responses foreach $remote_host (@_) { ($threads{$remote_host}) = threads->create(\&hostname_contact, $command, $remote_host); } foreach $remote_host (keys %threads) { @remote_response = $threads{$remote_host}->join(); if (defined $remote_response[2]) { #not an error if ($remote_response[1] =~ /AMUPL/) { shift(@remote_response); #kill the remote host shift(@remote_response); #kill the command for (@remote_response) { #make sure this is the entire amu.pl $amuplgood = 1 if m/^__END__/; } if ($amuplgood) { open(NEWAMUPL,">$mypath"); for (@remote_response) { #and print the new script print NEWAMUPL; } close(NEWAMUPL); print "Upgraded to a new version of Amu! Restarting Amu in 15 seconds ...\n"; { lock(%amuhash); $amuhash{"STATUS"} = "RESTART"; } } } elsif ($remote_response[1] =~ /VERSION/) { #backwards so that the version is the key and the host is the value: we want only one host - the first to reply - per version if (!(exists($foundamus{$remote_response[2]}))) { $foundamus{$remote_response[2]} = $remote_response[0]; } } else { $foundamus{$remote_response[0]} = $remote_response[2]; } } else { #handle errors } } return %foundamus; } sub hostname_contact { my $command = shift; my $remote_host = shift; my @retval = ($remote_host); print "trying $remote_host ...\n" if $VERBOSE; $| = 1 if $VERBOSE; #flush output buffer if (my $socket = IO::Socket::INET->new(PeerAddr => $remote_host, PeerPort => $port, Proto => "tcp", Type => SOCK_STREAM, Timeout => 5 )) { print $socket "AMU $command\n"; my @answer = <$socket>; print "contact with $remote_host\n" if $VERBOSE; my $firstline = $answer[0]; if ($firstline) { chomp($firstline); @remote_host_status = split(" ",$firstline); $SYNTAX_ERROR_BAD_ARGUMENT = "SYNTAX_ERROR_BAD_ARGUMENT"; if (($remote_host_status[0] =~ /AMU/) && (defined $remote_host_status[1]) && (defined $remote_host_status[2])) { if ($remote_host_status[1] =~ /STATUS/) { if ($remote_host_status[2] =~ /OK|QUEUEING|JANKEN|WORKING/) { push(@retval, $remote_host_status[1], $remote_host_status[2]); } else { push(@retval, $SYNTAX_ERROR_BAD_ARGUMENT); } } elsif ($remote_host_status[1] =~ /VERSION|CLAIM|JANKEN/) { if (defined $remote_host_status[2]) { push(@retval, $remote_host_status[1], join(" ",@remote_host_status[2..$#remote_host_status])); } else { push(@retval, $SYNTAX_ERROR_BAD_ARGUMENT); } } elsif ($remote_host_status[1] =~ /AMUPL/) { if ($remote_host_status[2] =~ m,#!/usr/bin/perl,) { push(@retval, $remote_host_status[1], $remote_host_status[2]."\n"); shift(@answer); #don't need the first line again push(@retval, @answer); } else { push(@retval, $SYNTAX_ERROR_BAD_ARGUMENT); } } else { push(@retval, "SYNTAX_ERROR_BAD_COMMAND"); } } else { push(@retval, "NOT_AMU"); } } close($socket); } else { print "\n" if $VERBOSE; push(@retval, "CONNECTION_REFUSED_OR_TIMED_OUT"); } return @retval; } sub server_thread { $server = IO::Socket::INET->new(LocalPort => $port, Type => SOCK_STREAM, Reuse => 1, Listen => 10) or die "Couldn't be a tcp server on port $port : $@\n"; while ($amuhash{"STATUS"} =~ /OK|QUEUEING|JANKEN|WORKING/) { $client = $server->accept(); recv($client, $question, 1024, 0); chomp($question); print "$question\n" if $VERBOSE; @remote_question = split(" ",$question); if ($remote_question[0] =~ /AMU/) { lock(%amuhash); if (defined $amuhash{$remote_question[1]}) { print $client "AMU $remote_question[1] $amuhash{$remote_question[1]}\n"; } elsif ($remote_question[1] =~ /AMUPL/) { print $client "AMU $remote_question[1] "; for (@amupl) { print $client $_; } print $client "\n"; #just in case there's no carriage return at the end of amu.pl } elsif (($remote_question[1] =~ /HOSTS/) && (defined $remote_question[2])) { print "received new host list\n" if $VERBOSE; my %newhosts = split(";",$remote_question[2]); $newhosts{$myip} = 1; #sign off that i have seen this list my @ignoranthosts; @remote_hosts = (); foreach $host (keys %newhosts) { push(@remote_hosts,$host) if $host ne $myip; if (!($newhosts{$host})) { #these haven't heard about the new host list yet print "$host hasn't yet heard about the new host list\n" if $VERBOSE; push(@ignoranthosts,$host); } } $amuhash{"REMOTE_HOSTS"} = join(" ",@remote_hosts); &status_threads("HOSTS ".join(";",%newhosts), @ignoranthosts); } elsif (($remote_question[1] =~ /DO/) && ($amuhash{"STATUS"} eq "OK") && (defined $remote_question[2])) { my $remote_jobs = join(" ",@remote_question[2..$#remote_question]); print "new job queue '$remote_jobs' has arrived\n" if $VERBOSE; #new job queue has arrived $amuhash{"QUEUE"} = $remote_jobs; $amuhash{"STATUS"} = "QUEUEING"; } elsif (($remote_question[1] =~ /CLAIM/) && ($amuhash{"QUEUE"} ne '') && (defined $remote_question[2])) { my $remote_job = join(" ",@remote_question[2..$#remote_question]); if ($amuhash{"JOB"} eq $remote_job) { #no! that's my job! print "objecting to job $remote_job\n" if $VERBOSE; print $client "AMU ".join(" ",@remote_question[1..$#remote_question])."\n"; } else { my %jobs = split($amudelim,$amuhash{"QUEUE"}); if (exists($jobs{$remote_job})) { #since i have no objections about others claiming this job... print "deleting job $remote_job\n" if $VERBOSE; delete($jobs{$remote_job}); $amuhash{"QUEUE"} = join($amudelim,%jobs); } } } else { #cool and spicy } } close($client); } close($server); print "server_thread shutting down\n" if $VERBOSE; return; } sub amurestart { { lock(%amuhash); $amuhash{"STATUS"} = "RESTART"; } &hostname_contact("SHUTDOWN",$myip); #to knock the server thread out of its infinite loop } sub amushutdown { $| = 1; print "Shutting down in about 15 seconds ..."; { lock(%amuhash); $amuhash{"STATUS"} = "SHUTDOWN"; } &hostname_contact("SHUTDOWN",$myip); #to knock the server thread out of its infinite loop } __END__