Wednesday, 5 October 2022

How we can learn parallel computing from computer games

I really like computer games like Factorio or Satisfactory. In those games you have to mine your planet's resources (e.g. iron/copper/uranium/coal) and build an intricate supply chain of factory machines and belts until more complicated products come out: copper wire, engines, fuel, etc.

Here are a few screenshots:

[screenshots of large factory builds from Factorio and Satisfactory]
Those games are really fun, and they often feel like programming. They can bring your creative side to life when reality sometimes doesn't.

There are many games like that, but what most have in common is performance issues once you get into "megabase" territory. A megabase is when your factory grows really large, and usually everything starts to slow down once you get there.

Mostly, those games are single-threaded; at least the simulation of the factory is single-threaded. Up until recently I thought that was for programming simplicity, but then I read a Factorio forum post about their performance issues, and it seems they tried multi-threading but said it did not improve things drastically (about 15%), and claimed the simulation was memory-bottlenecked.

That seemed weird to me. How can my 32/48-thread ThreadRippers hit a memory bottleneck so easily when using just two or more cores? Does it really make sense?
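To build an intuition for what "memory bottlenecked" means, here is a minimal Go sketch (my own illustration, nothing to do with Factorio's actual code): each worker only streams over its slice of a big array, so on a bandwidth-limited machine the timings stop improving long before you run out of cores.

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

func main() {
	const size = 1 << 27 // 128M int64s (~1 GB); shrink if needed
	data := make([]int64, size)
	for workers := 1; workers <= runtime.NumCPU(); workers *= 2 {
		start := time.Now()
		var wg sync.WaitGroup
		chunk := size / workers
		for w := 0; w < workers; w++ {
			wg.Add(1)
			go func(lo int) {
				defer wg.Done()
				var sum int64
				for _, v := range data[lo : lo+chunk] {
					sum += v // pure streaming reads: bound by memory bandwidth, not CPU
				}
				_ = sum
			}(w * chunk)
		}
		wg.Wait()
		fmt.Printf("%2d workers: %v\n", workers, time.Since(start))
	}
}

If memory bandwidth is the limit, doubling the workers stops halving the time well before the core count is exhausted.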



Let's try to implement the concept of those games in Go, a very simple language with very convenient parallelism. Let's make each factory/entity a goroutine, with channels as the inputs/outputs of each factory.

package main

import (
	"fmt"
	"time"
)

const Iron = 15 // resource id for "Iron"

type FactoryMessage struct {
	messageValue int16
}

// calcBlock models one factory: take an item from input, "process" it for
// processTime, and push the result to output. It also reports whenever the
// sleep overshot by more than 20ms, i.e. scheduling latency.
func calcBlock(input <-chan FactoryMessage, output chan<- FactoryMessage, id int, processTime time.Duration) {
	for {
		msg := <-input
		currentTime := time.Now()
		<-time.After(processTime)
		if time.Since(currentTime)-processTime > time.Millisecond*20 {
			fmt.Println("Extra delay...", id, time.Since(currentTime)-processTime)
		}
		output <- FactoryMessage{msg.messageValue}
	}
}

func main() {
	inputChannel := make(chan FactoryMessage)
	outputChannel := make(chan FactoryMessage)
	// Build 1000 production lines, each a chain of 100 factories connected
	// by unbuffered channels.
	for j := 0; j < 1000; j++ {
		tmpChannel1 := make(chan FactoryMessage)
		tmpChannel2 := make(chan FactoryMessage)
		for i := 0; i < 100; i++ {
			if i == 0 {
				go calcBlock(inputChannel, tmpChannel2, i, time.Millisecond*600)
			} else if i == 99 {
				go calcBlock(tmpChannel1, outputChannel, i, time.Millisecond*600)
			} else {
				go calcBlock(tmpChannel1, tmpChannel2, i, time.Millisecond*600)
			}
			tmpChannel1 = tmpChannel2
			tmpChannel2 = make(chan FactoryMessage)
		}
	}
	// Unlimited source of raw iron.
	go func() {
		for {
			inputChannel <- FactoryMessage{Iron}
		}
	}()
	// Sink: count finished items (the unsynchronized counter is racy, but
	// good enough for a rough benchmark)...
	counter := 0
	go func(localCounter *int) {
		for {
			<-outputChannel
			*localCounter = *localCounter + 1
		}
	}(&counter)
	// ...and print/reset the count every 5 seconds.
	go func(localCounter *int) {
		for {
			fmt.Println("Elems count", *localCounter)
			*localCounter = 0
			time.Sleep(time.Second * 5)
		}
	}(&counter)
	time.Sleep(time.Hour * 9999)
}


This is basically an implementation of what we described earlier. And well, it behaves pretty badly: CPU utilization is quite low (though much higher than a single thread), and even on a very powerful ThreadRipper it does not really get good performance, probably not even better than a single thread. But why? How can that be?

Using AMD uProf to profile the process, it seems it gets stuck a lot in some timer function and opcode. It might be a Windows thing, so I switched to Linux.
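As a side note, Go ships its own profiler, which can show the same runtime hot spots on both platforms. A minimal sketch of wiring the standard net/http/pprof handler into the factory program:

// Minimal sketch: expose Go's built-in profiler next to the simulation,
// then run `go tool pprof http://localhost:6060/debug/pprof/profile`
// or browse to http://localhost:6060/debug/pprof/.
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof handlers as a side effect
)

func main() {
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()
	// ... start the factory goroutines from the program above here ...
	select {} // block forever
}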

On Linux things still seem bad, but different.


For example, the findrunnable function belongs to the Go runtime scheduler:

https://github.com/google/gvisor/issues/1942

It might be related to how the Go runtime combines (or doesn't combine) timers.
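One thing worth trying on the Go side (a sketch I did not benchmark at the time): time.After allocates a fresh runtime timer for every message, so a hundred thousand busy goroutines create constant timer churn. A single reusable time.Timer per goroutine, reset in place, would be a drop-in replacement for calcBlock above:

// A drop-in replacement for calcBlock: one reusable timer per goroutine
// instead of a fresh time.After timer for every message.
func calcBlock(input <-chan FactoryMessage, output chan<- FactoryMessage, id int, processTime time.Duration) {
	timer := time.NewTimer(processTime)
	if !timer.Stop() {
		<-timer.C // drain the initial firing so the first Reset is safe
	}
	for {
		msg := <-input
		timer.Reset(processTime)
		<-timer.C // the timer is always fired and drained here, so Reset stays safe
		output <- FactoryMessage{msg.messageValue}
	}
}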

I remember Akka having some kind of timer scheduler, so let's try it:

package com.example

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.language.postfixOps

case class AddInput(ref: ActorRef)
case class AddOutput(ref: ActorRef)

// One factory machine: pulls a resource from its inputs, "processes" it for
// `duration` via the Akka scheduler, then offers the result to its outputs.
class FactoryActor(duration: FiniteDuration) extends Actor {
  var busy = false
  var waitingOutput = false
  var inputs = Set[ActorRef]()
  var outputs = Set[ActorRef]()

  override def receive: Receive = {
    case "WantResource" =>
      if (!busy && !waitingOutput) {
        sender() ! "RequestResource"
      }
    case "GotResource" =>
      if (!busy && !waitingOutput) {
        busy = true
        val self = this.self
        val runnable = new Runnable() {
          // NOTE: this runs on the scheduler thread and mutates actor state
          // directly, which is racy in general; kept here for simplicity.
          override def run(): Unit = {
            busy = false
            waitingOutput = true
            outputs.foreach(_.tell("WantResource", self))
          }
        }
        context.system.scheduler.scheduleOnce(duration, runnable)(context.dispatcher)
      }
    case "RequestResource" =>
      if (waitingOutput) {
        sender() ! "GotResource"
        waitingOutput = false
        val self = this.self
        inputs.foreach(_.tell("RequestResource", self))
      }
    case AddInput(ref) =>
      ref ! "RequestResource"
      inputs = inputs ++ Set(ref)
    case AddOutput(ref) =>
      outputs = outputs ++ Set(ref)
  }
}

// Infinite source: always answers with a resource (unlimited raw material).
case class FactorySource() extends Actor {
  override def receive: Receive = {
    case "RequestResource" =>
      sender() ! "GotResource"
    case "Processing" => // ignored
  }
}

// Sink: counts finished items and prints/resets the count every 10 seconds.
case class FactorySink() extends Actor {
  var amountOfMessages = 0
  context.system.scheduler.scheduleAtFixedRate(1 second, 10 second, this.self, "ResetTimer")(context.dispatcher)
  override def receive: Receive = {
    case "GotResource" =>
      amountOfMessages = amountOfMessages + 1
      sender() ! "Processing"
    case "WantResource" =>
      sender() ! "RequestResource"
    case "ResetTimer" =>
      println("Timer", amountOfMessages)
      amountOfMessages = 0
  }
}

object AkkaQuickstart extends App {
  val system = ActorSystem("system")
  system.logConfiguration()
  val du = 500 millisecond
  val output = system.actorOf(Props[FactorySink], "output")
  // Build 1000 chains of 100 FactoryActors, all ending in the same sink.
  for (j <- 1 to 1000) {
    var tmpActor = system.actorOf(Props(classOf[FactoryActor], du))
    val input = system.actorOf(Props[FactorySource], "input" + j)
    val rowLength = 100
    for (a <- 1 to rowLength) {
      if (a == 1) {
        tmpActor ! AddInput(input)
      } else if (a == rowLength) {
        tmpActor ! AddOutput(output)
      } else {
        val newActor = system.actorOf(Props(classOf[FactoryActor], du))
        tmpActor ! AddOutput(newActor)
        newActor ! AddInput(tmpActor)
        tmpActor = newActor
      }
    }
  }
}

In the profile we don't really see the JVM, only kernel stuff, which is good.
It probably means Akka handles timers better:
https://doc.akka.io/docs/akka/2.5/scheduler.html
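Akka's default scheduler is the LightArrayRevolverScheduler, which is basically a timing wheel: one thread ticks at a fixed resolution and fires whole buckets of tasks at once. Here is a toy Go sketch of the idea (my own simplification, not Akka's actual implementation; it ignores delays longer than one revolution and is not thread-safe):

package main

import (
	"fmt"
	"time"
)

// Wheel is a toy timing wheel: a ring of buckets, one ticker, and whole
// buckets of callbacks fired per tick. A million pending timers cost one
// runtime timer plus array scans.
type Wheel struct {
	buckets [][]func()
	pos     int
	tick    time.Duration
}

func NewWheel(slots int, tick time.Duration) *Wheel {
	return &Wheel{buckets: make([][]func(), slots), tick: tick}
}

// Schedule fires fn after roughly d, rounded up to the tick resolution.
// Toy limitations: d must be under slots*tick, and there is no locking,
// so Schedule and Run must not race (Akka's real wheel handles both).
func (w *Wheel) Schedule(d time.Duration, fn func()) {
	ticks := int(d / w.tick)
	if ticks == 0 {
		ticks = 1
	}
	slot := (w.pos + ticks) % len(w.buckets)
	w.buckets[slot] = append(w.buckets[slot], fn)
}

// Run advances the wheel forever, firing and clearing one bucket per tick.
func (w *Wheel) Run() {
	for range time.Tick(w.tick) {
		w.pos = (w.pos + 1) % len(w.buckets)
		for _, fn := range w.buckets[w.pos] {
			fn()
		}
		w.buckets[w.pos] = nil
	}
}

func main() {
	w := NewWheel(512, 10*time.Millisecond)
	done := make(chan struct{}, 1000)
	for i := 0; i < 1000; i++ {
		w.Schedule(500*time.Millisecond, func() { done <- struct{}{} })
	}
	go w.Run()
	for i := 0; i < 1000; i++ {
		<-done
	}
	fmt.Println("all 1000 timers fired off a single ticker")
}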

The surprise was the M1 (both regular and Pro), which behaved much better and got much better performance (for both Go and Akka) than my 3960X/2950X ThreadRippers. I tried to explain this further here: https://breaking-the-system.blogspot.com/2022/10/m1-is-much-faster-than-what-people.html

It seems those benchmarks mostly profile timer/scheduler/event-loop performance in one way or another, which is not ideal.

I also wanted to test CUDA: 


#include <iostream>
#include <stdio.h>
#include <sys/time.h>
#include <unistd.h>

#define PROCESS_TIME 500 // ms each machine "works" on an item
#define BUSY 0b0010
#define GOT_OUTPUT 0b0001
#define MAX_INPUTS 2
#define MAX_OUTPUTS 2
#define MAX_ROW 100 // machines per production line
#define NUMBER_OF_DEVICES (1000 * 1000 * 100)
#define DEVIDE 256 // threads per block (NUMBER_OF_DEVICES is a multiple of it, so no bounds checks)

// A machine that is idle (not busy, no pending output) tries to pull an item
// from one of its inputs and starts working on it.
__global__ void checkInput(int *machineStatus, int *inputs, long int currentTime, long int *machineTime) {
    auto threadId = (blockIdx.x * blockDim.x) + threadIdx.x;
    if ((machineStatus[threadId] & GOT_OUTPUT) || (machineStatus[threadId] & BUSY)) {
        return;
    }
    // The first machine of each row is a source: it always has raw material.
    if (threadId % MAX_ROW == 0) {
        machineStatus[threadId] ^= BUSY;
        machineTime[threadId] = currentTime;
        return;
    }
    for (int i = 0; i < MAX_INPUTS; i++) {
        auto currentInput = inputs[(threadId * MAX_INPUTS) + i];
        if (currentInput > -1 && (machineStatus[currentInput] & GOT_OUTPUT)) {
            machineStatus[currentInput] ^= GOT_OUTPUT; // consume the upstream item
            machineStatus[threadId] ^= BUSY;           // start working
            machineTime[threadId] = currentTime;
            break;
        }
    }
}

// A machine holding a finished item tries to push it to a free output.
__global__ void checkOutput(int *machineStatus, int *outputs, long int currentTime, long int *machineTime, int *wantOutput) {
    auto threadId = (blockIdx.x * blockDim.x) + threadIdx.x;
    if (!(machineStatus[threadId] & GOT_OUTPUT) || (machineStatus[threadId] & BUSY)) {
        return;
    }
    // The last machine of each row is a sink: just count finished items.
    // (Fixed: the original tested threadId % (MAX_ROW - 1), which drifted
    // off the row boundaries.)
    if (threadId % MAX_ROW == MAX_ROW - 1) {
        machineStatus[threadId] ^= GOT_OUTPUT;
        wantOutput[threadId] = wantOutput[threadId] + 1;
        return;
    }
    for (int i = 0; i < MAX_OUTPUTS; i++) {
        auto currentOutput = outputs[(threadId * MAX_OUTPUTS) + i];
        // (Fixed: the original was missing parentheses around the & tests.)
        if (currentOutput > -1 && !(machineStatus[currentOutput] & GOT_OUTPUT) && !(machineStatus[currentOutput] & BUSY)) {
            machineStatus[currentOutput] ^= BUSY;
            machineStatus[threadId] ^= GOT_OUTPUT;
            machineTime[currentOutput] = currentTime;
            break;
        }
    }
}

// A busy machine whose PROCESS_TIME has elapsed finishes its item.
__global__ void checkWorkIsDone(long int currentTime, long int *machineTime, int *machineStatus) {
    auto threadId = (blockIdx.x * blockDim.x) + threadIdx.x;
    if ((machineStatus[threadId] & GOT_OUTPUT) || !(machineStatus[threadId] & BUSY)) {
        return;
    }
    if (currentTime - machineTime[threadId] > PROCESS_TIME) {
        machineStatus[threadId] ^= GOT_OUTPUT;
        machineStatus[threadId] ^= BUSY;
    }
}

int main() {
    // Host + device copies of the per-machine state arrays.
    auto *machineTime = (long int *) malloc(NUMBER_OF_DEVICES * sizeof(long int));
    long int *machineTimeD;
    cudaMalloc(&machineTimeD, NUMBER_OF_DEVICES * sizeof(long int));
    auto *machineStatus = (int *) malloc(NUMBER_OF_DEVICES * sizeof(int));
    int *machineStatusD;
    cudaMalloc(&machineStatusD, NUMBER_OF_DEVICES * sizeof(int));
    auto *wantOutput = (int *) malloc(NUMBER_OF_DEVICES * sizeof(int));
    int *wantOutputD;
    cudaMalloc(&wantOutputD, NUMBER_OF_DEVICES * sizeof(int));
    auto *inputs = (int *) malloc(NUMBER_OF_DEVICES * sizeof(int) * MAX_INPUTS);
    int *inputsD;
    cudaMalloc(&inputsD, NUMBER_OF_DEVICES * sizeof(int) * MAX_INPUTS);
    auto *outputs = (int *) malloc(NUMBER_OF_DEVICES * sizeof(int) * MAX_OUTPUTS);
    int *outputsD;
    cudaMalloc(&outputsD, NUMBER_OF_DEVICES * sizeof(int) * MAX_OUTPUTS);

    struct timeval tp{};
    gettimeofday(&tp, NULL);
    long int ms = tp.tv_sec * 1000 + tp.tv_usec / 1000;

    // Wire the machines into rows: machine i-1 feeds machine i, the first
    // machine of each row is a source and the last one is a sink.
    for (int i = 0; i < NUMBER_OF_DEVICES; i++) {
        machineStatus[i] = 0;
        machineTime[i] = 0;
        if (i % MAX_ROW == 0) {
            machineStatus[i] ^= BUSY; // sources start out working
            machineTime[i] = ms;
        }
        wantOutput[i] = 0;
        for (int j = 0; j < MAX_INPUTS; j++) {
            if (j == 0 && i % MAX_ROW != 0) {
                inputs[(i * MAX_INPUTS) + j] = i - 1; // previous machine in the row
            } else {
                inputs[(i * MAX_INPUTS) + j] = -1;    // unused input slot
            }
        }
        for (int j = 0; j < MAX_OUTPUTS; j++) {
            // (Fixed: sinks are the last machine of each row.)
            if (j == 0 && i % MAX_ROW != MAX_ROW - 1) {
                outputs[(i * MAX_OUTPUTS) + j] = i + 1; // next machine in the row
            } else {
                outputs[(i * MAX_OUTPUTS) + j] = -1;    // unused output slot
            }
        }
    }
    cudaMemcpy(machineTimeD, machineTime, NUMBER_OF_DEVICES * sizeof(long int), cudaMemcpyHostToDevice);
    cudaMemcpy(machineStatusD, machineStatus, NUMBER_OF_DEVICES * sizeof(int), cudaMemcpyHostToDevice);
    cudaMemcpy(wantOutputD, wantOutput, NUMBER_OF_DEVICES * sizeof(int), cudaMemcpyHostToDevice);
    cudaMemcpy(inputsD, inputs, NUMBER_OF_DEVICES * sizeof(int) * MAX_INPUTS, cudaMemcpyHostToDevice);
    cudaMemcpy(outputsD, outputs, NUMBER_OF_DEVICES * sizeof(int) * MAX_OUTPUTS, cudaMemcpyHostToDevice);

    long int lastMs = 0;
    while (true) {
        gettimeofday(&tp, NULL);
        ms = tp.tv_sec * 1000 + tp.tv_usec / 1000;
        // Every 10 seconds, copy the sink counters back and print a few of them.
        if (ms - lastMs > 1000 * 10) {
            cudaMemcpy(wantOutput, wantOutputD, NUMBER_OF_DEVICES * sizeof(int), cudaMemcpyDeviceToHost);
            printf("sink %d count %d time %ld\n", MAX_ROW - 1, wantOutput[MAX_ROW - 1], ms);
            printf("sink %d count %d time %ld\n", (MAX_ROW * 2) - 1, wantOutput[(MAX_ROW * 2) - 1], ms);
            printf("sink %d count %d time %ld\n", (MAX_ROW * 10) - 1, wantOutput[(MAX_ROW * 10) - 1], ms);
            printf("sink %d count %d time %ld\n", (MAX_ROW * 20) - 1, wantOutput[(MAX_ROW * 20) - 1], ms);
            lastMs = ms;
        }
        // One simulation step: pull inputs, finish work whose time has
        // elapsed, then push outputs. Kernel launches are asynchronous; the
        // periodic cudaMemcpy above is what synchronizes with the device.
        checkInput<<<(NUMBER_OF_DEVICES + DEVIDE - 1) / DEVIDE, DEVIDE>>>(machineStatusD, inputsD, ms, machineTimeD);
        checkWorkIsDone<<<(NUMBER_OF_DEVICES + DEVIDE - 1) / DEVIDE, DEVIDE>>>(ms, machineTimeD, machineStatusD);
        checkOutput<<<(NUMBER_OF_DEVICES + DEVIDE - 1) / DEVIDE, DEVIDE>>>(machineStatusD, outputsD, ms, machineTimeD, wantOutputD);
        //usleep(1000 * 10); // optional throttle
    }
    return 0;
}

The CUDA approach is quite different. We basically have arrays that we sweep over in a loop all the time, checking for each factory whether enough time has passed. This means there is no insane scheduling at all. Combined with the fact that GPUs are really good at traversing lots of data, my 1080 Ti could really crush this test and get a decent result (100M "manufacturers").
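For contrast, the same fixed-tick, sweep-the-arrays style works on the CPU too. Here is a minimal Go sketch of it (my own illustration, ignoring the input/output wiring between machines for brevity): one pass over plain arrays per tick, no per-machine timers or scheduling, and the inner loop could be split across cores by slicing the arrays.

package main

import (
	"fmt"
	"time"
)

const (
	numMachines = 1_000_000
	processTime = 500 * time.Millisecond
)

func main() {
	busy := make([]bool, numMachines)
	started := make([]time.Time, numMachines)
	done := 0
	lastReport := time.Now()
	// One coarse ticker drives the whole simulation.
	for now := range time.Tick(10 * time.Millisecond) {
		for i := range busy {
			switch {
			case busy[i] && now.Sub(started[i]) >= processTime:
				busy[i] = false // finished an item
				done++
			case !busy[i]:
				busy[i] = true // immediately start the next item
				started[i] = now
			}
		}
		if now.Sub(lastReport) >= 5*time.Second {
			fmt.Println("items finished:", done)
			done = 0
			lastReport = now
		}
	}
}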

The limit of the 1080 Ti might be memory. I wanted to test this on a 32GB M1 Max (with Apple Metal) but still haven't managed to get to it.
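A rough back-of-the-envelope from the allocations in the code above: each machine carries an 8-byte machineTime, a 4-byte machineStatus, a 4-byte wantOutput, and two 4-byte slots in each of inputs and outputs, i.e. 32 bytes per machine. So 100M machines take about 3.2 GB of device memory, and by that estimate the 1080 Ti's 11 GB would top out somewhere around 300M machines.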


I learned a lot in this process (and there is probably much more to learn). I hope you did as well!

I am not sure I can fully answer the original question I asked. CUDA can certainly do it, but it might not really be what I was looking for.
