I really like computer games like Factorio or Satisfactory. In those games you have to mine your planet's resources (e.g. iron/copper/uranium/coal) and build an intricate supply chain of factory machines and belts until more complicated products come out (e.g. copper wire, engines, fuel, etc.).
Those games are really fun; playing them often feels like programming. They can bring your creative side to life when reality sometimes does not.
There are many games like that, but what most have in common is performance issues once you get into "megabase" territory: the point where your factory grows very large and everything usually starts to slow down.
Mostly, those games are single-threaded, or at least the simulation of the factory is. Up until recently I thought they were single-threaded for the sake of programming simplicity, but then I read a Factorio forum post about their performance issues: it seems they tried multi-threading, found it did not improve things drastically (about 15%), and claimed the issue was a memory bottleneck.
That seems weird to me: how can my 32/48-thread ThreadRippers hit a memory bottleneck so easily when using only two or more cores? Does it really make sense?
Let's try to write the core concept of those games in Go, a very simple language with very cheap parallelism. Let's make each factory/entity a goroutine, with Go channels as the inputs/outputs of each factory.
package main

import (
    "fmt"
    "time"
)

const Iron = 15 // "Iron"

type FactoryMessage struct {
    messageValue int16
}

// calcBlock simulates one factory: take an item from input, "process" it for
// processTime, then push the result to output. It also reports when the timer
// overshoots by more than 20ms.
func calcBlock(input <-chan FactoryMessage, output chan<- FactoryMessage, id int, processTime time.Duration) {
    for {
        msg := <-input
        currentTime := time.Now()
        <-time.After(processTime)
        if time.Since(currentTime)-processTime > time.Millisecond*20 {
            fmt.Println("Extra delay...", id, time.Since(currentTime)-processTime)
        }
        output <- FactoryMessage{msg.messageValue}
    }
}

func main() {
    inputChannel := make(chan FactoryMessage)
    outputChannel := make(chan FactoryMessage)
    // Build 1000 independent rows of 100 chained factories each:
    // inputChannel -> factory 0 -> ... -> factory 99 -> outputChannel.
    for j := 0; j < 1000; j++ {
        tmpChannel1 := make(chan FactoryMessage)
        tmpChannel2 := make(chan FactoryMessage)
        for i := 0; i < 100; i++ {
            if i == 0 {
                go calcBlock(inputChannel, tmpChannel2, i, time.Millisecond*600)
            } else if i == 99 {
                go calcBlock(tmpChannel1, outputChannel, i, time.Millisecond*600)
            } else {
                go calcBlock(tmpChannel1, tmpChannel2, i, time.Millisecond*600)
            }
            tmpChannel1 = tmpChannel2
            tmpChannel2 = make(chan FactoryMessage)
        }
    }
    // Source: endlessly feed raw iron into the rows.
    go func() {
        for {
            inputChannel <- FactoryMessage{Iron}
        }
    }()
    // Sink: count finished items (unsynchronized, but good enough for a rough
    // rate) and print the count every 5 seconds.
    counter := 0
    go func(localCounter *int) {
        for {
            <-outputChannel
            *localCounter = *localCounter + 1
        }
    }(&counter)
    go func(localCounter *int) {
        for {
            fmt.Println("Elems count", *localCounter)
            *localCounter = 0
            time.Sleep(time.Second * 5)
        }
    }(&counter)
    time.Sleep(time.Hour * 9999)
}
This is basically an implementation of what we described earlier, and well... it behaves pretty poorly. CPU usage is quite low (though much higher than a single thread), but even on a very powerful ThreadRipper it does not really give us good performance, probably not even better than a single thread. But why? How can that be?
Profiling the process with AMD uProf, it seems to spend a lot of time stuck in some timer function and opcode. It might be a Windows thing, so I switched to Linux.
On Linux things still seem bad, but different.
For example, the findrunnable function belongs to the Go runtime's scheduler:
https://github.com/google/gvisor/issues/1942
It might be related to the fact that they combine (or don't combine) timers?
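To see where the time goes without AMD uProf, we can use Go's built-in profiler. Here is a minimal standalone sketch (my own; the worker count and port are arbitrary) that just parks a lot of goroutines on timers and exposes pprof, which is enough to make the scheduler/timer work show up in a CPU profile:

package main

// Minimal sketch (not the simulation itself): park many goroutines on timers
// and expose Go's built-in profiler, so scheduler/timer functions such as
// runtime.findrunnable show up in the CPU profile.
// net/http/pprof is the standard-library profiler; port 6060 and the worker
// count are arbitrary choices.

import (
    "log"
    "net/http"
    _ "net/http/pprof" // registers the /debug/pprof/* handlers
    "time"
)

func main() {
    for i := 0; i < 100000; i++ {
        go func() {
            for {
                <-time.After(500 * time.Millisecond)
            }
        }()
    }
    // While this is running:
    //   go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30
    log.Fatal(http.ListenAndServe("localhost:6060", nil))
}

The same pprof import and ListenAndServe call can be dropped into the simulation above to profile it directly.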
I remember Akka having its own kind of timer scheduler, so let's try it:
package com.example

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.language.postfixOps

case class AddInput(ref: ActorRef)
case class AddOutput(ref: ActorRef)

// One factory machine: pulls a resource from an input, "processes" it for
// `duration` via the Akka scheduler, then offers the result to its outputs.
class FactoryActor(duration: FiniteDuration) extends Actor {
  var busy = false
  var waitingOutput = false
  var inputs = Set[ActorRef]()
  var outputs = Set[ActorRef]()

  override def receive: Receive = {
    case "WantResource" =>
      if (!busy && !waitingOutput) {
        sender() ! "RequestResource"
      }
    case "GotResource" =>
      if (!busy && !waitingOutput) {
        busy = true
        val self = this.self
        val runnable = new Runnable() {
          // Note: this runs on the dispatcher thread, not through the actor's
          // mailbox, so it mutates actor state from outside the actor.
          override def run(): Unit = {
            busy = false
            waitingOutput = true
            outputs.foreach(_.tell("WantResource", self))
          }
        }
        context.system.scheduler.scheduleOnce(duration, runnable)(context.dispatcher)
      }
    case "RequestResource" =>
      if (waitingOutput) {
        sender() ! "GotResource"
        waitingOutput = false
        val self = this.self
        inputs.foreach(_.tell("RequestResource", self))
      }
    case AddInput(ref) =>
      ref ! "RequestResource"
      inputs = inputs ++ Set(ref)
    case AddOutput(ref) =>
      outputs = outputs ++ Set(ref)
  }
}

// Source: always has a raw resource available.
case class FactorySource() extends Actor {
  override def receive: Receive = {
    case "RequestResource" =>
      sender() ! "GotResource"
    case "Processing" => // ignored
  }
}

// Sink: counts finished items and prints the count every 10 seconds.
case class FactorySink() extends Actor {
  var amountOfMessages = 0
  context.system.scheduler.scheduleAtFixedRate(1 second, 10 second, this.self, "ResetTimer")(context.dispatcher)
  override def receive: Receive = {
    case "GotResource" =>
      amountOfMessages = amountOfMessages + 1
      sender() ! "Processing"
    case "WantResource" =>
      sender() ! "RequestResource"
    case "ResetTimer" =>
      println("Timer", amountOfMessages)
      amountOfMessages = 0
  }
}

object AkkaQuickstart extends App {
  val system = ActorSystem("system")
  system.logConfiguration()
  val du = 500 millisecond
  val output = system.actorOf(Props[FactorySink], "output")
  // 1000 rows of 100 chained FactoryActors, each row fed by its own source,
  // all rows draining into the single sink.
  for (j <- 1 to 1000) {
    var tmpActor = system.actorOf(Props(classOf[FactoryActor], du))
    val input = system.actorOf(Props[FactorySource], "input" + j)
    val rowLength = 100
    for (a <- 1 to rowLength) {
      if (a == 1) {
        tmpActor ! AddInput(input)
      } else if (a == rowLength) {
        tmpActor ! AddOutput(output)
      } else {
        val newActor = system.actorOf(Props(classOf[FactoryActor], du))
        tmpActor ! AddOutput(newActor)
        newActor ! AddInput(tmpActor)
        tmpActor = newActor
      }
    }
  }
}
In the profile we don't really see the JVM, only kernel stuff, which is good.
It probably means Akka handles timers better:
https://doc.akka.io/docs/akka/2.5/scheduler.html
The surprise was the M1 (both regular and Pro), which behaved much better and got much better performance (for both Go and Akka) than my 3960X/2950X ThreadRippers, which I further tried to explain here: https://breaking-the-system.blogspot.com/2022/10/m1-is-much-faster-than-what-people.html
It seems that these experiments basically end up profiling timer/scheduler/event-loop performance in one way or another, which is not ideal.
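One crude way to check that is to drop the factory logic entirely and measure timer throughput on its own. A rough sketch (my own; the worker count is arbitrary) that counts how many 500 ms waits actually complete every 5 seconds:

package main

// Rough sketch: no factory logic at all, just N goroutines each sleeping on a
// 500 ms timer in a loop, and a counter of how many timer waits complete.
// If the factory simulations above are really bound by the timer/scheduler,
// this number should look a lot like their throughput.

import (
    "fmt"
    "sync/atomic"
    "time"
)

func main() {
    const workers = 100000 // arbitrary; the Go example above has 100,000 factories
    var fired int64
    for i := 0; i < workers; i++ {
        go func() {
            for {
                <-time.After(500 * time.Millisecond)
                atomic.AddInt64(&fired, 1)
            }
        }()
    }
    for {
        time.Sleep(5 * time.Second)
        fmt.Println("timer waits completed in the last 5s:", atomic.SwapInt64(&fired, 0))
    }
}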
I also wanted to test CUDA:
#include <iostream>
#include <stdio.h>
#include <sys/time.h>
#include <unistd.h>

#define PROCESS_TIME 500          // ms each machine takes to process an item
#define BUSY 0b0010
#define GOT_OUTPUT 0b0001
#define MAX_INPUTS 2
#define MAX_OUTPUTS 2
#define MAX_ROW 100               // machines per row (the first one is a source)
#define NUMBER_OF_DEVICES (1000 * 1000 * 100)
#define BLOCK_SIZE 256            // CUDA threads per block

// Pull phase: an idle machine grabs a finished item from one of its inputs
// (sources, at the start of each row, always have raw material available).
__global__ void checkInput(int *machineStatus, int *inputs, long int currentTime, long int *machineTime) {
    auto threadId = (blockIdx.x * blockDim.x) + threadIdx.x;
    if ((machineStatus[threadId] & GOT_OUTPUT) || (machineStatus[threadId] & BUSY)) {
        return;
    }
    if (threadId % MAX_ROW == 0) {
        machineStatus[threadId] ^= BUSY;
        machineTime[threadId] = currentTime;
        return;
    }
    for (int i = 0; i < MAX_INPUTS; i++) {
        auto currentInput = inputs[(threadId * MAX_INPUTS) + i];
        if (currentInput > -1 && (machineStatus[currentInput] & GOT_OUTPUT)) {
            machineStatus[currentInput] ^= GOT_OUTPUT;
            machineStatus[threadId] ^= BUSY;
            machineTime[threadId] = currentTime;
            break;
        }
    }
}

// Push phase: a machine holding a finished item hands it to an idle output;
// sink machines just count how many items reached them.
__global__ void checkOutput(int *machineStatus, int *outputs, long int currentTime, long int *machineTime, int *wantOutput) {
    auto threadId = (blockIdx.x * blockDim.x) + threadIdx.x;
    if (!(machineStatus[threadId] & GOT_OUTPUT) || (machineStatus[threadId] & BUSY)) {
        return;
    }
    if (threadId % (MAX_ROW - 1) == 0 && threadId > 0) {
        machineStatus[threadId] ^= GOT_OUTPUT;
        wantOutput[threadId] = wantOutput[threadId] + 1;
        return;
    }
    for (int i = 0; i < MAX_OUTPUTS; i++) {
        auto currentOutput = outputs[(threadId * MAX_OUTPUTS) + i];
        if (currentOutput > -1 && !(machineStatus[currentOutput] & GOT_OUTPUT) && !(machineStatus[currentOutput] & BUSY)) {
            machineStatus[currentOutput] ^= BUSY;
            machineStatus[threadId] ^= GOT_OUTPUT;
            machineTime[currentOutput] = currentTime;
            break;
        }
    }
}

// Completion phase: a busy machine whose PROCESS_TIME has elapsed now holds a
// finished item.
__global__ void checkWorkIsDone(long int currentTime, long int *machineTime, int *machineStatus) {
    auto threadId = (blockIdx.x * blockDim.x) + threadIdx.x;
    if ((machineStatus[threadId] & GOT_OUTPUT) || !(machineStatus[threadId] & BUSY)) {
        return;
    }
    if (currentTime - machineTime[threadId] > PROCESS_TIME) {
        machineStatus[threadId] ^= GOT_OUTPUT;
        machineStatus[threadId] ^= BUSY;
    }
}

int main() {
    auto *machineTime = (long *) malloc(NUMBER_OF_DEVICES * sizeof(long int));
    long int *machineTimeD;
    cudaMalloc(&machineTimeD, NUMBER_OF_DEVICES * sizeof(long int));
    auto *machineStatus = (int *) malloc(NUMBER_OF_DEVICES * sizeof(int));
    int *machineStatusD;
    cudaMalloc(&machineStatusD, NUMBER_OF_DEVICES * sizeof(int));
    auto *wantOutput = (int *) malloc(NUMBER_OF_DEVICES * sizeof(int));
    int *wantOutputD;
    cudaMalloc(&wantOutputD, NUMBER_OF_DEVICES * sizeof(int));
    auto *inputs = (int *) malloc(NUMBER_OF_DEVICES * sizeof(int) * MAX_INPUTS);
    int *inputsD;
    cudaMalloc(&inputsD, NUMBER_OF_DEVICES * sizeof(int) * MAX_INPUTS);
    auto *outputs = (int *) malloc(NUMBER_OF_DEVICES * sizeof(int) * MAX_OUTPUTS);
    int *outputsD;
    cudaMalloc(&outputsD, NUMBER_OF_DEVICES * sizeof(int) * MAX_OUTPUTS);
    struct timeval tp{};
    gettimeofday(&tp, NULL);
    long int ms = tp.tv_sec * 1000 + tp.tv_usec / 1000;
    // Wire the machines into chains: machine i takes input from i-1 and sends
    // output to i+1; machines at multiples of MAX_ROW are sources, machines at
    // multiples of (MAX_ROW - 1) act as sinks (matching the kernels above).
    for (int i = 0; i < NUMBER_OF_DEVICES; i++) {
        machineStatus[i] = 0;
        machineTime[i] = 0;
        if (i % MAX_ROW == 0) {
            machineStatus[i] ^= BUSY;
            machineTime[i] = ms;
        }
        wantOutput[i] = 0;
        for (int j = 0; j < MAX_INPUTS; j++) {
            if (i % MAX_ROW == 0 && j == 0) {
                inputs[(i * MAX_INPUTS) + j] = -1;
            } else if (j == 0) {
                inputs[(i * MAX_INPUTS) + j] = i - 1;
            } else {
                inputs[(i * MAX_INPUTS) + j] = -1;
            }
        }
        for (int j = 0; j < MAX_OUTPUTS; j++) {
            if (i % (MAX_ROW - 1) == 0 && i > 0 && j == 0) {
                outputs[(i * MAX_OUTPUTS) + j] = -1;
            } else if (j == 0) {
                outputs[(i * MAX_OUTPUTS) + j] = i + 1;
            } else {
                outputs[(i * MAX_OUTPUTS) + j] = -1;
            }
        }
    }
    cudaMemcpy(machineTimeD, machineTime, NUMBER_OF_DEVICES * sizeof(long int), cudaMemcpyHostToDevice);
    cudaMemcpy(machineStatusD, machineStatus, NUMBER_OF_DEVICES * sizeof(int), cudaMemcpyHostToDevice);
    cudaMemcpy(wantOutputD, wantOutput, NUMBER_OF_DEVICES * sizeof(int), cudaMemcpyHostToDevice);
    cudaMemcpy(inputsD, inputs, NUMBER_OF_DEVICES * sizeof(int) * MAX_INPUTS, cudaMemcpyHostToDevice);
    cudaMemcpy(outputsD, outputs, NUMBER_OF_DEVICES * sizeof(int) * MAX_OUTPUTS, cudaMemcpyHostToDevice);
    long int lastMs = 0;
    while (true) {
        gettimeofday(&tp, NULL);
        ms = tp.tv_sec * 1000 + tp.tv_usec / 1000;
        // Every 10 seconds, copy the counters back and print a few of the sinks.
        if (ms - lastMs > 1000 * 10) {
            cudaMemcpy(wantOutput, wantOutputD, NUMBER_OF_DEVICES * sizeof(int), cudaMemcpyDeviceToHost);
            printf("count %d at %ld ms (sink %d)\n", wantOutput[MAX_ROW - 1], ms, 0);
            printf("count %d at %ld ms (sink %d)\n", wantOutput[(MAX_ROW - 1) * 2], ms, 2);
            printf("count %d at %ld ms (sink %d)\n", wantOutput[(MAX_ROW - 1) * 10], ms, 10);
            printf("count %d at %ld ms (sink %d)\n", wantOutput[(MAX_ROW - 1) * 20], ms, 20);
            lastMs = ms;
        }
        checkInput<<<(NUMBER_OF_DEVICES + BLOCK_SIZE - 1) / BLOCK_SIZE, BLOCK_SIZE>>>(machineStatusD, inputsD, ms, machineTimeD);
        checkWorkIsDone<<<(NUMBER_OF_DEVICES + BLOCK_SIZE - 1) / BLOCK_SIZE, BLOCK_SIZE>>>(ms, machineTimeD, machineStatusD);
        checkOutput<<<(NUMBER_OF_DEVICES + BLOCK_SIZE - 1) / BLOCK_SIZE, BLOCK_SIZE>>>(machineStatusD, outputsD, ms, machineTimeD, wantOutputD);
    }
    return 0;
}
The CUDA approach is quite different. We basically have flat arrays that we sweep over in a loop, checking for each factory whether enough time has passed. This means there is no elaborate scheduling at all. Combined with the fact that GPUs are really good at traversing a lot of data, my 1080 Ti could crush this test and get a decent result (100M "manufacturers").
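For contrast, the same sweep-the-arrays idea can be sketched on the CPU in plain Go, with no goroutines and no timers. This is my own simplified single-chain version, not a port of the CUDA code:

package main

// Sketch of the CUDA-style approach on the CPU: no goroutines, no timers,
// just flat arrays swept in a loop, checking elapsed wall-clock time.
// Simplified to a single long chain of machines; the counts and process time
// are arbitrary.

import (
    "fmt"
    "time"
)

const (
    numMachines = 1_000_000
    processTime = 500 * time.Millisecond
)

func main() {
    busy := make([]bool, numMachines)
    ready := make([]bool, numMachines) // finished item waiting to be passed on
    started := make([]time.Time, numMachines)
    produced := 0

    last := time.Now()
    for {
        now := time.Now()
        for i := 0; i < numMachines; i++ {
            switch {
            case busy[i] && now.Sub(started[i]) >= processTime:
                // processing finished, hold the item
                busy[i], ready[i] = false, true
            case ready[i] && i+1 < numMachines && !busy[i+1] && !ready[i+1]:
                // pass the item to the next idle machine in the chain
                ready[i] = false
                busy[i+1], started[i+1] = true, now
            case ready[i] && i == numMachines-1:
                // last machine: count the finished item
                ready[i] = false
                produced++
            case !busy[i] && !ready[i] && i == 0:
                // first machine: raw input is always available
                busy[i], started[i] = true, now
            }
        }
        if now.Sub(last) > 10*time.Second {
            fmt.Println("items produced in the last 10s:", produced)
            produced, last = 0, now
        }
    }
}

It still has to touch every machine on every pass, but there is no scheduler, no channel, and no per-machine timer involved.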
Back on the GPU, the limit of the 1080 Ti might be memory: the arrays above take roughly 32 bytes per machine, so 100M machines is already about 3.2 GB out of the card's 11 GB. I wanted to test it on a 32GB M1 Max (with Apple Metal) but have not managed to get to it yet.
I learned a lot in this process (and there is probably much more to learn). I hope you did as well!
I am not sure I can fully answer the original question I asked. CUDA can certainly do it, but it might not really be what I was looking for.