https://kb.speeddemosarchive.com/index.php?title=Amu-chan/Source/amu.pl&feed=atom&action=history
Amu-chan/Source/amu.pl - Revision history
2024-03-28T19:10:45Z
Revision history for this page on the wiki
MediaWiki 1.23.9
https://kb.speeddemosarchive.com/index.php?title=Amu-chan/Source/amu.pl&diff=2582&oldid=prev
Njahnke: New page: <pre><nowiki> #!/usr/bin/perl #nathan jahnke <njahnke@gmail.com> ### ATASHI NO KOKORO, UNLOCK! ### # Amu is distributed under the terms of the Gnu General Public License 3.0: # http://ww...
2008-10-12T05:37:49Z
<p>New page: <pre><nowiki> #!/usr/bin/perl #nathan jahnke <njahnke@gmail.com> ### ATASHI NO KOKORO, UNLOCK! ### # Amu is distributed under the terms of the Gnu General Public License 3.0: # http://ww...</p>
<p><b>New page</b></p><div><pre><nowiki><br />
#!/usr/bin/perl<br />
#nathan jahnke <njahnke@gmail.com><br />
<br />
### ATASHI NO KOKORO, UNLOCK! ###<br />
<br />
# Amu is distributed under the terms of the Gnu General Public License 3.0:<br />
# http://www.gnu.org/licenses/gpl-3.0.txt<br />
<br />
#TODO<br />
##delimiter is whatever the first character in a new queue is<br />
##alternative jobs based on os<br />
##encryption?<br />
<br />
use warnings;<br />
use strict 'subs';<br />
package amu;<br />
<br />
use IO::Socket;<br />
use Sys::Hostname;<br />
use Sys::HostIP;<br />
use threads;<br />
use threads::shared;<br />
#use Thread::Queue;<br />
use List::Util qw(shuffle);<br />
use Cwd qw(abs_path);<br />
<br />
$myversion = 44;<br />
$VERBOSE = 1;<br />
$amudelim = ";"; #this currently delimits only jobs and their indices (keys as a hash) in the queue<br />
$port = 56677;<br />
$myip = Sys::HostIP->ip;<br />
$mypath = abs_path($0);<br />
my %amustatus :shared = ();<br />
<br />
open(AMUPL,$mypath);<br />
@amupl = <AMUPL>;<br />
close(AMUPL);<br />
<br />
@remote_hosts = ();<br />
if (-e "amu_remotehosts.txt") {<br />
open(REMOTEHOSTS,"amu_remotehosts.txt");<br />
while (<REMOTEHOSTS>) {<br />
chomp;<br />
if (($_ ne $myip) && (!($_ =~ /^#/))) {<br />
push(@remote_hosts,$_) if $_;<br />
}<br />
}<br />
close(REMOTEHOSTS);<br />
}<br />
<br />
#intercept ctrl-c or kill from cli<br />
$SIG{INT} = $SIG{TERM} = \&amushutdown;<br />
$SIG{HUP} = \&amurestart;<br />
<br />
my %amuhash :shared = (); #it needs the my for some reason!?<br />
{ #lock and load<br />
lock(%amuhash);<br />
$amuhash{"STATUS"} = "OK";<br />
$amuhash{"VERSION"} = $myversion;<br />
$amuhash{"QUEUE"} = "";<br />
$amuhash{"JOB"} = "";<br />
$amuhash{"REMOTE_HOSTS"} = join(" ",@remote_hosts);<br />
}<br />
print "starting server_thread ...\n" if $VERBOSE;<br />
$server_thread = threads->create(\&server_thread);<br />
<br />
#status updates every 5 seconds<br />
print "starting status_threads_infinite_loop ...\n" if $VERBOSE;<br />
threads->create(\&status_threads_infinite_loop);<br />
<br />
print "Amu version $myversion started successfully!\n";<br />
<br />
<br />
while ($amuhash{"STATUS"} =~ /OK|QUEUEING|JANKEN|WORKING/) {<br />
#update @remote_hosts - don't want to have to lock %amuhash every time we contact people<br />
{<br />
lock(%amuhash);<br />
@remote_hosts = split(" ",$amuhash{"REMOTE_HOSTS"});<br />
}<br />
<br />
if ($amuhash{"STATUS"} =~ /QUEUEING/) {<br />
print "entering status mode block QUEUEING\n" if $VERBOSE;<br />
if ($amuhash{"QUEUE"} ne '') {<br />
#distribute the queue to all known other amus ... a lot will get it and ignore because they're already in QUEUEING mode but this will help ensure everyone gets it - BEFORE we claim a job<br />
threads->create(\&status_threads, "DO ".$amuhash{"QUEUE"}, @remote_hosts)->join();<br />
my $jobnum;<br />
if ($amuhash{"QUEUE"} ne '') { #check again to make sure our queue hasn't been emptied since we sent the queue to others<br />
{<br />
print "about to lock the hash...\n" if $VERBOSE;<br />
lock(%amuhash);<br />
print "locked the hash\n" if $VERBOSE;<br />
my %jobs = split($amudelim,$amuhash{"QUEUE"});<br />
#pick random queue item<br />
my @joblist = (%jobs);<br />
$jobnum = int(rand(keys %jobs));<br />
$jobnum = $joblist[$jobnum*2];<br />
#claim it<br />
print "claiming job $jobnum\n" if $VERBOSE;<br />
$amuhash{"JOB"} = $jobnum;<br />
#and get ready to fight for it<br />
$amuhash{"STATUS"} = "JANKEN";<br />
}<br />
#sleep 5;<br />
%objections = &status_threads("CLAIM ".$jobnum, @remote_hosts);<br />
}<br />
}<br />
if ($amuhash{"STATUS"} ne "JANKEN") { #no more jobs for us to do<br />
print "no more jobs for us to do\n" if $VERBOSE;<br />
lock(%amuhash);<br />
$amuhash{"STATUS"} = "OK";<br />
}<br />
}<br />
if ($amuhash{"STATUS"} =~ /JANKEN/) {<br />
print "entering status mode block JANKEN\n" if $VERBOSE;<br />
if (%objections) {<br />
foreach $evilhost (keys %objections) {<br />
last if $amuhash{"STATUS"} eq "QUEUEING"; #i've already lost<br />
#generate my random number<br />
$random = rand();<br />
$random =~ s/^0.//;<br />
{<br />
lock(%amuhash);<br />
$amuhash{"JANKEN"} = $random;<br />
}<br />
#broadcast<br />
print "asking for janken - job $amuhash{JOB}\n" if $VERBOSE;<br />
my @janken = &status_threads("JANKEN", $evilhost);<br />
if (@janken) { #did anyone play with me?<br />
if ($janken[1] > $random) { #i lost<br />
print "i lost at janken\n" if $VERBOSE;<br />
lock(%amuhash);<br />
my %jobs = split($amudelim,$amuhash{"QUEUE"});<br />
if (exists($jobs{$amuhash{"JOB"}})) {<br />
#give this job up<br />
delete($jobs{$amuhash{"JOB"}});<br />
$amuhash{"QUEUE"} = join($amudelim,%jobs);<br />
}<br />
print "trying to get another job; changing status back to QUEUEING ...\n" if $VERBOSE;<br />
$amuhash{"STATUS"} = "QUEUEING"; #try to get another job<br />
} elsif ($janken[1] == $random) { #a tie<br />
print "a tie at janken!?\n" if $VERBOSE;<br />
redo;<br />
}<br />
} else {<br />
print "no responses for janken\n" if $VERBOSE;<br />
}<br />
}<br />
{<br />
lock(%amuhash);<br />
undef $amuhash{"JANKEN"};<br />
}<br />
}<br />
{<br />
lock(%amuhash);<br />
$amuhash{"STATUS"} = "WORKING" if $amuhash{"STATUS"} ne "QUEUEING";<br />
}<br />
}<br />
if ($amuhash{"STATUS"} =~ /WORKING/) {<br />
print "entering status mode block WORKING\n" if $VERBOSE;<br />
my ($jobnum, %jobs);<br />
{<br />
lock(%amuhash);<br />
%jobs = split($amudelim,$amuhash{"QUEUE"});<br />
$jobnum = $amuhash{"JOB"};<br />
}<br />
#run the command<br />
print "executing $jobs{$jobnum} ...\n" if $VERBOSE;<br />
system($jobs{$jobnum});<br />
#all dun<br />
{<br />
lock(%amuhash);<br />
my %jobs = split($amudelim,$amuhash{"QUEUE"});<br />
delete($jobs{$amuhash{"JOB"}});<br />
$amuhash{"QUEUE"} = join($amudelim,%jobs);<br />
$amuhash{"JOB"} = "";<br />
$amuhash{"STATUS"} = "QUEUEING";<br />
}<br />
}<br />
sleep 1;<br />
}<br />
sleep 15;<br />
exec($mypath) if ($amuhash{"STATUS"} =~ /RESTART/);<br />
exit 0 if ($amuhash{"STATUS"} =~ /SHUTDOWN/);<br />
<br />
<br />
sub version_check {<br />
print "Checking the network for a newer version of Amu ...\n" if $VERBOSE;<br />
{<br />
lock(%amustatus);<br />
@knowngoodamus = (keys %amustatus);<br />
}<br />
%hostversions = &status_threads("VERSION", @knowngoodamus);<br />
foreach $amuversion (sort { $a < $b } keys %hostversions) {<br />
print "$amuversion : $hostversions{$amuversion}\n" if $VERBOSE;<br />
if ($amuversion > $myversion) {<br />
&status_threads("AMUPL", $hostversions{$amuversion});<br />
last;<br />
}<br />
}<br />
}<br />
<br />
sub status_threads_infinite_loop {<br />
while ($amuhash{"STATUS"} =~ /OK|QUEUEING|JANKEN|WORKING/) {<br />
if ($amuhash{"STATUS"} =~ /OK/) {<br />
{<br />
lock(%amuhash);<br />
@remote_hosts = split(" ",$amuhash{"REMOTE_HOSTS"});<br />
}<br />
@remote_hosts = shuffle(@remote_hosts);<br />
my ($status_thread) = threads->create(\&status_threads, "STATUS", @remote_hosts);<br />
my %foundamus = $status_thread->join();<br />
{<br />
lock(%amustatus);<br />
%amustatus = %foundamus;<br />
}<br />
&version_check;<br />
}<br />
sleep 5;<br />
}<br />
print "status_threads_infinite_loop shutting down\n" if $VERBOSE;<br />
return;<br />
}<br />
<br />
sub status_threads {<br />
my $command = shift;<br />
my %threads;<br />
my %foundamus; #temporary hash to avoid locking the shared one while we build up responses<br />
foreach $remote_host (@_) {<br />
($threads{$remote_host}) = threads->create(\&hostname_contact, $command, $remote_host);<br />
}<br />
foreach $remote_host (keys %threads) {<br />
@remote_response = $threads{$remote_host}->join();<br />
if (defined $remote_response[2]) { #not an error<br />
if ($remote_response[1] =~ /AMUPL/) {<br />
shift(@remote_response); #kill the remote host<br />
shift(@remote_response); #kill the command<br />
for (@remote_response) { #make sure this is the entire amu.pl<br />
$amuplgood = 1 if m/^__END__/;<br />
}<br />
if ($amuplgood) {<br />
open(NEWAMUPL,">$mypath");<br />
for (@remote_response) { #and print the new script<br />
print NEWAMUPL;<br />
}<br />
close(NEWAMUPL);<br />
print "Upgraded to a new version of Amu! Restarting Amu in 15 seconds ...\n";<br />
{<br />
lock(%amuhash);<br />
$amuhash{"STATUS"} = "RESTART";<br />
}<br />
}<br />
} elsif ($remote_response[1] =~ /VERSION/) {<br />
#backwards so that the version is the key and the host is the value: we want only one host - the first to reply - per version<br />
if (!(exists($foundamus{$remote_response[2]}))) {<br />
$foundamus{$remote_response[2]} = $remote_response[0];<br />
}<br />
} else {<br />
$foundamus{$remote_response[0]} = $remote_response[2];<br />
}<br />
} else {<br />
#handle errors<br />
}<br />
}<br />
return %foundamus;<br />
}<br />
<br />
sub hostname_contact {<br />
my $command = shift;<br />
my $remote_host = shift;<br />
my @retval = ($remote_host);<br />
print "trying $remote_host ...\n" if $VERBOSE;<br />
$| = 1 if $VERBOSE; #flush output buffer<br />
if (my $socket = IO::Socket::INET->new(PeerAddr => $remote_host,<br />
PeerPort => $port,<br />
Proto => "tcp",<br />
Type => SOCK_STREAM,<br />
Timeout => 5 )) {<br />
print $socket "AMU $command\n";<br />
my @answer = <$socket>;<br />
print "contact with $remote_host\n" if $VERBOSE;<br />
my $firstline = $answer[0];<br />
if ($firstline) {<br />
chomp($firstline);<br />
@remote_host_status = split(" ",$firstline);<br />
$SYNTAX_ERROR_BAD_ARGUMENT = "SYNTAX_ERROR_BAD_ARGUMENT";<br />
if (($remote_host_status[0] =~ /AMU/) && (defined $remote_host_status[1]) && (defined $remote_host_status[2])) {<br />
if ($remote_host_status[1] =~ /STATUS/) {<br />
if ($remote_host_status[2] =~ /OK|QUEUEING|JANKEN|WORKING/) {<br />
push(@retval, $remote_host_status[1], $remote_host_status[2]);<br />
} else {<br />
push(@retval, $SYNTAX_ERROR_BAD_ARGUMENT);<br />
}<br />
} elsif ($remote_host_status[1] =~ /VERSION|CLAIM|JANKEN/) {<br />
if (defined $remote_host_status[2]) {<br />
push(@retval, $remote_host_status[1], join(" ",@remote_host_status[2..$#remote_host_status]));<br />
} else {<br />
push(@retval, $SYNTAX_ERROR_BAD_ARGUMENT);<br />
}<br />
} elsif ($remote_host_status[1] =~ /AMUPL/) {<br />
if ($remote_host_status[2] =~ m,#!/usr/bin/perl,) {<br />
push(@retval, $remote_host_status[1], $remote_host_status[2]."\n");<br />
shift(@answer); #don't need the first line again<br />
push(@retval, @answer);<br />
} else {<br />
push(@retval, $SYNTAX_ERROR_BAD_ARGUMENT);<br />
}<br />
} else {<br />
push(@retval, "SYNTAX_ERROR_BAD_COMMAND");<br />
}<br />
} else {<br />
push(@retval, "NOT_AMU");<br />
}<br />
}<br />
close($socket);<br />
} else {<br />
print "\n" if $VERBOSE;<br />
push(@retval, "CONNECTION_REFUSED_OR_TIMED_OUT");<br />
}<br />
return @retval;<br />
}<br />
<br />
sub server_thread {<br />
$server = IO::Socket::INET->new(LocalPort => $port,<br />
Type => SOCK_STREAM,<br />
Reuse => 1,<br />
Listen => 10)<br />
or die "Couldn't be a tcp server on port $port : $@\n";<br />
<br />
while ($amuhash{"STATUS"} =~ /OK|QUEUEING|JANKEN|WORKING/) {<br />
$client = $server->accept();<br />
recv($client, $question, 1024, 0);<br />
chomp($question);<br />
print "$question\n" if $VERBOSE;<br />
@remote_question = split(" ",$question);<br />
if ($remote_question[0] =~ /AMU/) {<br />
lock(%amuhash);<br />
if (defined $amuhash{$remote_question[1]}) {<br />
print $client "AMU $remote_question[1] $amuhash{$remote_question[1]}\n";<br />
} elsif ($remote_question[1] =~ /AMUPL/) {<br />
print $client "AMU $remote_question[1] ";<br />
for (@amupl) {<br />
print $client $_;<br />
}<br />
print $client "\n"; #just in case there's no carriage return at the end of amu.pl<br />
} elsif (($remote_question[1] =~ /HOSTS/) && (defined $remote_question[2])) {<br />
print "received new host list\n" if $VERBOSE;<br />
my %newhosts = split(";",$remote_question[2]);<br />
$newhosts{$myip} = 1; #sign off that i have seen this list<br />
my @ignoranthosts;<br />
@remote_hosts = ();<br />
foreach $host (keys %newhosts) {<br />
push(@remote_hosts,$host) if $host ne $myip;<br />
if (!($newhosts{$host})) { #these haven't heard about the new host list yet<br />
print "$host hasn't yet heard about the new host list\n" if $VERBOSE;<br />
push(@ignoranthosts,$host);<br />
}<br />
}<br />
$amuhash{"REMOTE_HOSTS"} = join(" ",@remote_hosts);<br />
&status_threads("HOSTS ".join(";",%newhosts), @ignoranthosts);<br />
} elsif (($remote_question[1] =~ /DO/) && ($amuhash{"STATUS"} eq "OK") && (defined $remote_question[2])) {<br />
my $remote_jobs = join(" ",@remote_question[2..$#remote_question]);<br />
print "new job queue '$remote_jobs' has arrived\n" if $VERBOSE;<br />
#new job queue has arrived<br />
$amuhash{"QUEUE"} = $remote_jobs;<br />
$amuhash{"STATUS"} = "QUEUEING";<br />
} elsif (($remote_question[1] =~ /CLAIM/) && ($amuhash{"QUEUE"} ne '') && (defined $remote_question[2])) {<br />
my $remote_job = join(" ",@remote_question[2..$#remote_question]);<br />
if ($amuhash{"JOB"} eq $remote_job) {<br />
#no! that's my job!<br />
print "objecting to job $remote_job\n" if $VERBOSE;<br />
print $client "AMU ".join(" ",@remote_question[1..$#remote_question])."\n";<br />
} else {<br />
my %jobs = split($amudelim,$amuhash{"QUEUE"});<br />
if (exists($jobs{$remote_job})) {<br />
#since i have no objections about others claiming this job...<br />
print "deleting job $remote_job\n" if $VERBOSE;<br />
delete($jobs{$remote_job});<br />
$amuhash{"QUEUE"} = join($amudelim,%jobs);<br />
}<br />
}<br />
} else {<br />
#cool and spicy<br />
}<br />
}<br />
close($client);<br />
}<br />
close($server);<br />
print "server_thread shutting down\n" if $VERBOSE;<br />
return;<br />
}<br />
<br />
sub amurestart {<br />
{<br />
lock(%amuhash);<br />
$amuhash{"STATUS"} = "RESTART";<br />
}<br />
&hostname_contact("SHUTDOWN",$myip); #to knock the server thread out of its infinite loop<br />
}<br />
<br />
sub amushutdown {<br />
$| = 1;<br />
print "Shutting down in about 15 seconds ...";<br />
{<br />
lock(%amuhash);<br />
$amuhash{"STATUS"} = "SHUTDOWN";<br />
}<br />
&hostname_contact("SHUTDOWN",$myip); #to knock the server thread out of its infinite loop<br />
}<br />
<br />
__END__<br />
</nowiki></pre></div>
Njahnke