< Amu-chan | Source
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
#!/usr/bin/perl
#nathan jahnke <njahnke@gmail.com>
### ATASHI NO KOKORO, UNLOCK! ###
# Amu is distributed under the terms of the Gnu General Public License 3.0:
# http://www.gnu.org/licenses/gpl-3.0.txt
#TODO
##delimiter is whatever the first character in a new queue is
##alternative jobs based on os
##encryption?
use warnings;
use strict 'subs';
package amu;
use IO::Socket;
use Sys::Hostname;
use Sys::HostIP;
use threads;
use threads::shared;
#use Thread::Queue;
use List::Util qw(shuffle);
use Cwd qw(abs_path);
$myversion = 44;
$VERBOSE = 1;
$amudelim = ";"; #this currently delimits only jobs and their indices (keys as a hash) in the queue
$port = 56677;
$myip = Sys::HostIP->ip;
$mypath = abs_path($0);
my %amustatus :shared = ();
open(AMUPL,$mypath);
@amupl = <AMUPL>;
close(AMUPL);
@remote_hosts = ();
if (-e "amu_remotehosts.txt") {
open(REMOTEHOSTS,"amu_remotehosts.txt");
while (<REMOTEHOSTS>) {
chomp;
if (($_ ne $myip) && (!($_ =~ /^#/))) {
push(@remote_hosts,$_) if $_;
}
}
close(REMOTEHOSTS);
}
#intercept ctrl-c or kill from cli
$SIG{INT} = $SIG{TERM} = \&amushutdown;
$SIG{HUP} = \&amurestart;
my %amuhash :shared = (); #it needs the my for some reason!?
{ #lock and load
lock(%amuhash);
$amuhash{"STATUS"} = "OK";
$amuhash{"VERSION"} = $myversion;
$amuhash{"QUEUE"} = "";
$amuhash{"JOB"} = "";
$amuhash{"REMOTE_HOSTS"} = join(" ",@remote_hosts);
}
print "starting server_thread ...\n" if $VERBOSE;
$server_thread = threads->create(\&server_thread);
#status updates every 5 seconds
print "starting status_threads_infinite_loop ...\n" if $VERBOSE;
threads->create(\&status_threads_infinite_loop);
print "Amu version $myversion started successfully!\n";
while ($amuhash{"STATUS"} =~ /OK|QUEUEING|JANKEN|WORKING/) {
#update @remote_hosts - don't want to have to lock %amuhash every time we contact people
{
lock(%amuhash);
@remote_hosts = split(" ",$amuhash{"REMOTE_HOSTS"});
}
if ($amuhash{"STATUS"} =~ /QUEUEING/) {
print "entering status mode block QUEUEING\n" if $VERBOSE;
if ($amuhash{"QUEUE"} ne '') {
#distribute the queue to all known other amus ... a lot will get it and ignore because they're already in QUEUEING mode but this will help ensure everyone gets it - BEFORE we claim a job
threads->create(\&status_threads, "DO ".$amuhash{"QUEUE"}, @remote_hosts)->join();
my $jobnum;
if ($amuhash{"QUEUE"} ne '') { #check again to make sure our queue hasn't been emptied since we sent the queue to others
{
print "about to lock the hash...\n" if $VERBOSE;
lock(%amuhash);
print "locked the hash\n" if $VERBOSE;
my %jobs = split($amudelim,$amuhash{"QUEUE"});
#pick random queue item
my @joblist = (%jobs);
$jobnum = int(rand(keys %jobs));
$jobnum = $joblist[$jobnum*2];
#claim it
print "claiming job $jobnum\n" if $VERBOSE;
$amuhash{"JOB"} = $jobnum;
#and get ready to fight for it
$amuhash{"STATUS"} = "JANKEN";
}
#sleep 5;
%objections = &status_threads("CLAIM ".$jobnum, @remote_hosts);
}
}
if ($amuhash{"STATUS"} ne "JANKEN") { #no more jobs for us to do
print "no more jobs for us to do\n" if $VERBOSE;
lock(%amuhash);
$amuhash{"STATUS"} = "OK";
}
}
if ($amuhash{"STATUS"} =~ /JANKEN/) {
print "entering status mode block JANKEN\n" if $VERBOSE;
if (%objections) {
foreach $evilhost (keys %objections) {
last if $amuhash{"STATUS"} eq "QUEUEING"; #i've already lost
#generate my random number
$random = rand();
$random =~ s/^0.//;
{
lock(%amuhash);
$amuhash{"JANKEN"} = $random;
}
#broadcast
print "asking for janken - job $amuhash{JOB}\n" if $VERBOSE;
my @janken = &status_threads("JANKEN", $evilhost);
if (@janken) { #did anyone play with me?
if ($janken[1] > $random) { #i lost
print "i lost at janken\n" if $VERBOSE;
lock(%amuhash);
my %jobs = split($amudelim,$amuhash{"QUEUE"});
if (exists($jobs{$amuhash{"JOB"}})) {
#give this job up
delete($jobs{$amuhash{"JOB"}});
$amuhash{"QUEUE"} = join($amudelim,%jobs);
}
print "trying to get another job; changing status back to QUEUEING ...\n" if $VERBOSE;
$amuhash{"STATUS"} = "QUEUEING"; #try to get another job
} elsif ($janken[1] == $random) { #a tie
print "a tie at janken!?\n" if $VERBOSE;
redo;
}
} else {
print "no responses for janken\n" if $VERBOSE;
}
}
{
lock(%amuhash);
undef $amuhash{"JANKEN"};
}
}
{
lock(%amuhash);
$amuhash{"STATUS"} = "WORKING" if $amuhash{"STATUS"} ne "QUEUEING";
}
}
if ($amuhash{"STATUS"} =~ /WORKING/) {
print "entering status mode block WORKING\n" if $VERBOSE;
my ($jobnum, %jobs);
{
lock(%amuhash);
%jobs = split($amudelim,$amuhash{"QUEUE"});
$jobnum = $amuhash{"JOB"};
}
#run the command
print "executing $jobs{$jobnum} ...\n" if $VERBOSE;
system($jobs{$jobnum});
#all dun
{
lock(%amuhash);
my %jobs = split($amudelim,$amuhash{"QUEUE"});
delete($jobs{$amuhash{"JOB"}});
$amuhash{"QUEUE"} = join($amudelim,%jobs);
$amuhash{"JOB"} = "";
$amuhash{"STATUS"} = "QUEUEING";
}
}
sleep 1;
}
sleep 15;
exec($mypath) if ($amuhash{"STATUS"} =~ /RESTART/);
exit 0 if ($amuhash{"STATUS"} =~ /SHUTDOWN/);
sub version_check {
print "Checking the network for a newer version of Amu ...\n" if $VERBOSE;
{
lock(%amustatus);
@knowngoodamus = (keys %amustatus);
}
%hostversions = &status_threads("VERSION", @knowngoodamus);
foreach $amuversion (sort { $a < $b } keys %hostversions) {
print "$amuversion : $hostversions{$amuversion}\n" if $VERBOSE;
if ($amuversion > $myversion) {
&status_threads("AMUPL", $hostversions{$amuversion});
last;
}
}
}
sub status_threads_infinite_loop {
while ($amuhash{"STATUS"} =~ /OK|QUEUEING|JANKEN|WORKING/) {
if ($amuhash{"STATUS"} =~ /OK/) {
{
lock(%amuhash);
@remote_hosts = split(" ",$amuhash{"REMOTE_HOSTS"});
}
@remote_hosts = shuffle(@remote_hosts);
my ($status_thread) = threads->create(\&status_threads, "STATUS", @remote_hosts);
my %foundamus = $status_thread->join();
{
lock(%amustatus);
%amustatus = %foundamus;
}
&version_check;
}
sleep 5;
}
print "status_threads_infinite_loop shutting down\n" if $VERBOSE;
return;
}
sub status_threads {
my $command = shift;
my %threads;
my %foundamus; #temporary hash to avoid locking the shared one while we build up responses
foreach $remote_host (@_) {
($threads{$remote_host}) = threads->create(\&hostname_contact, $command, $remote_host);
}
foreach $remote_host (keys %threads) {
@remote_response = $threads{$remote_host}->join();
if (defined $remote_response[2]) { #not an error
if ($remote_response[1] =~ /AMUPL/) {
shift(@remote_response); #kill the remote host
shift(@remote_response); #kill the command
for (@remote_response) { #make sure this is the entire amu.pl
$amuplgood = 1 if m/^__END__/;
}
if ($amuplgood) {
open(NEWAMUPL,">$mypath");
for (@remote_response) { #and print the new script
print NEWAMUPL;
}
close(NEWAMUPL);
print "Upgraded to a new version of Amu! Restarting Amu in 15 seconds ...\n";
{
lock(%amuhash);
$amuhash{"STATUS"} = "RESTART";
}
}
} elsif ($remote_response[1] =~ /VERSION/) {
#backwards so that the version is the key and the host is the value: we want only one host - the first to reply - per version
if (!(exists($foundamus{$remote_response[2]}))) {
$foundamus{$remote_response[2]} = $remote_response[0];
}
} else {
$foundamus{$remote_response[0]} = $remote_response[2];
}
} else {
#handle errors
}
}
return %foundamus;
}
sub hostname_contact {
my $command = shift;
my $remote_host = shift;
my @retval = ($remote_host);
print "trying $remote_host ...\n" if $VERBOSE;
$| = 1 if $VERBOSE; #flush output buffer
if (my $socket = IO::Socket::INET->new(PeerAddr => $remote_host,
PeerPort => $port,
Proto => "tcp",
Type => SOCK_STREAM,
Timeout => 5 )) {
print $socket "AMU $command\n";
my @answer = <$socket>;
print "contact with $remote_host\n" if $VERBOSE;
my $firstline = $answer[0];
if ($firstline) {
chomp($firstline);
@remote_host_status = split(" ",$firstline);
$SYNTAX_ERROR_BAD_ARGUMENT = "SYNTAX_ERROR_BAD_ARGUMENT";
if (($remote_host_status[0] =~ /AMU/) && (defined $remote_host_status[1]) && (defined $remote_host_status[2])) {
if ($remote_host_status[1] =~ /STATUS/) {
if ($remote_host_status[2] =~ /OK|QUEUEING|JANKEN|WORKING/) {
push(@retval, $remote_host_status[1], $remote_host_status[2]);
} else {
push(@retval, $SYNTAX_ERROR_BAD_ARGUMENT);
}
} elsif ($remote_host_status[1] =~ /VERSION|CLAIM|JANKEN/) {
if (defined $remote_host_status[2]) {
push(@retval, $remote_host_status[1], join(" ",@remote_host_status[2..$#remote_host_status]));
} else {
push(@retval, $SYNTAX_ERROR_BAD_ARGUMENT);
}
} elsif ($remote_host_status[1] =~ /AMUPL/) {
if ($remote_host_status[2] =~ m,#!/usr/bin/perl,) {
push(@retval, $remote_host_status[1], $remote_host_status[2]."\n");
shift(@answer); #don't need the first line again
push(@retval, @answer);
} else {
push(@retval, $SYNTAX_ERROR_BAD_ARGUMENT);
}
} else {
push(@retval, "SYNTAX_ERROR_BAD_COMMAND");
}
} else {
push(@retval, "NOT_AMU");
}
}
close($socket);
} else {
print "\n" if $VERBOSE;
push(@retval, "CONNECTION_REFUSED_OR_TIMED_OUT");
}
return @retval;
}
sub server_thread {
$server = IO::Socket::INET->new(LocalPort => $port,
Type => SOCK_STREAM,
Reuse => 1,
Listen => 10)
or die "Couldn't be a tcp server on port $port : $@\n";
while ($amuhash{"STATUS"} =~ /OK|QUEUEING|JANKEN|WORKING/) {
$client = $server->accept();
recv($client, $question, 1024, 0);
chomp($question);
print "$question\n" if $VERBOSE;
@remote_question = split(" ",$question);
if ($remote_question[0] =~ /AMU/) {
lock(%amuhash);
if (defined $amuhash{$remote_question[1]}) {
print $client "AMU $remote_question[1] $amuhash{$remote_question[1]}\n";
} elsif ($remote_question[1] =~ /AMUPL/) {
print $client "AMU $remote_question[1] ";
for (@amupl) {
print $client $_;
}
print $client "\n"; #just in case there's no carriage return at the end of amu.pl
} elsif (($remote_question[1] =~ /HOSTS/) && (defined $remote_question[2])) {
print "received new host list\n" if $VERBOSE;
my %newhosts = split(";",$remote_question[2]);
$newhosts{$myip} = 1; #sign off that i have seen this list
my @ignoranthosts;
@remote_hosts = ();
foreach $host (keys %newhosts) {
push(@remote_hosts,$host) if $host ne $myip;
if (!($newhosts{$host})) { #these haven't heard about the new host list yet
print "$host hasn't yet heard about the new host list\n" if $VERBOSE;
push(@ignoranthosts,$host);
}
}
$amuhash{"REMOTE_HOSTS"} = join(" ",@remote_hosts);
&status_threads("HOSTS ".join(";",%newhosts), @ignoranthosts);
} elsif (($remote_question[1] =~ /DO/) && ($amuhash{"STATUS"} eq "OK") && (defined $remote_question[2])) {
my $remote_jobs = join(" ",@remote_question[2..$#remote_question]);
print "new job queue '$remote_jobs' has arrived\n" if $VERBOSE;
#new job queue has arrived
$amuhash{"QUEUE"} = $remote_jobs;
$amuhash{"STATUS"} = "QUEUEING";
} elsif (($remote_question[1] =~ /CLAIM/) && ($amuhash{"QUEUE"} ne '') && (defined $remote_question[2])) {
my $remote_job = join(" ",@remote_question[2..$#remote_question]);
if ($amuhash{"JOB"} eq $remote_job) {
#no! that's my job!
print "objecting to job $remote_job\n" if $VERBOSE;
print $client "AMU ".join(" ",@remote_question[1..$#remote_question])."\n";
} else {
my %jobs = split($amudelim,$amuhash{"QUEUE"});
if (exists($jobs{$remote_job})) {
#since i have no objections about others claiming this job...
print "deleting job $remote_job\n" if $VERBOSE;
delete($jobs{$remote_job});
$amuhash{"QUEUE"} = join($amudelim,%jobs);
}
}
} else {
#cool and spicy
}
}
close($client);
}
close($server);
print "server_thread shutting down\n" if $VERBOSE;
return;
}
sub amurestart {
{
lock(%amuhash);
$amuhash{"STATUS"} = "RESTART";
}
&hostname_contact("SHUTDOWN",$myip); #to knock the server thread out of its infinite loop
}
sub amushutdown {
$| = 1;
print "Shutting down in about 15 seconds ...";
{
lock(%amuhash);
$amuhash{"STATUS"} = "SHUTDOWN";
}
&hostname_contact("SHUTDOWN",$myip); #to knock the server thread out of its infinite loop
}
__END__