[vibe.d] websocket and worker thread / task

madkote schroeder.compling at googlemail.com
Sun Jul 30 09:53:10 UTC 2023


Hi all,

I am moving some C++ code to D as an experiment for a production 
service. So far, I like D, its bindings to C/C++ and the web framework 
vibe.d - but there are some issues which I cannot solve.

What:
- a websocket service
- it receives a huge amount of data on the socket in small packets
- each data packet is processed by a "decoder" (a CPU-intensive 
task)
- data should be processed in a separate thread to avoid 
blocking the service
- keep in mind there could be up to "cpu*2" clients in parallel

Problems
- I cannot send results from a thread through the WS, see the "XXX" 
markers in the code
- Sometimes casting works, sometimes it does not (1 client==NO, 4 
clients==YES, but not for all of them)
- What is also strange: the first time, the cast of the WS works and 
it can send results, but when a client connects the next time, it 
does not work anymore.

Questions
- How do I implement such a pattern (it worked well in C++)? WS + 
worker thread to compute and send results
- How can I also pass input and results through queues back to the WS 
(in case I want to use a task)? The problem here is that there is no 
async queue implementation for D, and again it might not work due to 
scope

Thankful for any hints and help!

D code
```
import vibe.d;

import std.process;
import std.random;
import std.conv;

// Module-level random number generator seeded with 42; nothing else in this
// listing references it.
auto rnd = Random(42);

import std.concurrency : receiveOnly, send, spawn, Tid, thisTid;

/**
 * Computes the n-th Fibonacci number iteratively.
 *
 * Used to simulate CPU load: pass high values to keep a core busy.
 *
 * Params:
 *   n = index into the sequence (fibonacci(0) == 0, fibonacci(1) == 1)
 *
 * Returns: the n-th Fibonacci number. For n <= 1 (including negative
 *   values) n itself is returned unchanged.
 *
 * Note: the true value no longer fits into a `long` for n > 92, so larger
 *   indices yield a wrapped-around number (still fine for load simulation).
 */
long fibonacci(long n) {
	if (n <= 1) return n;
	long a = 0;
	long b = 1;
	// The counter must be as wide as `n`: with an `int` counter the loop
	// never terminates once n exceeds int.max, because the counter wraps
	// around before it can reach n.
	for (long i = 2; i <= n; i++) {
		immutable next = a + b;
		a = b;
		b = next;
	}
	return b;
}

/// Minimal model object that carries nothing but a name.
class Model {
	private string name_;

	/// Creates a model labelled with `name`.
	this(string name) {
		name_ = name;
	}

	/// Returns the name this model was constructed with.
	string get_name() {
		return name_;
	}
}

/// Stand-in for the CPU-intensive decoder: bound to a Model, it exposes the
/// Fibonacci computation and the model's name.
class Decoder {
	private Model model_;

	/// Binds the decoder to `model`.
	this(Model model) {
		model_ = model;
	}

	/// Forwards to the free function `fibonacci`.
	long fib(long n) {
		return fibonacci(n);
	}

	/// Returns the name of the model this decoder was created with.
	string get_name() {
		return model_.get_name();
	}
}

/**
 * Task body that the "/ws" handler runs on a new thread through
 * std.parallelism (`task!mytask(...).executeInNewThread()`).
 *
 * It builds a Decoder for the given model, tries to send one greeting over
 * the web socket, sleeps for five seconds and then finishes.
 *
 * Params:
 *   parentId = Tid of the spawning side; only printed
 *   m        = model handed to the Decoder
 *   ws       = web socket of the connected client
 *
 * Returns: always 0.
 */
int mytask(Tid parentId, Model m, WebSocket ws) {
	import std.stdio;
	Decoder d = new Decoder(m);
	writeln("T :: Starting", thisTid, "...", "parent", parentId,
		d.get_name());

	// XXX how to receive data? there is no queue, which would block on get
	// => use sleep to pop data periodically?

	try {
		// XXX issue: works only once, then Error writing data to socket.
		// NOTE(review): `ws` is used here from a different thread than the
		// connection handler that received it - presumably this is what
		// makes the send fail; confirm against the vibe.d documentation.
		ws.send("Hello from task");
	} catch (Exception o) {
		logInfo(">T>> exception=%s", o);
	}

	sleep(5.seconds);
	writeln("T :: Finished ", thisTid, "...", "parent", parentId);

	d.destroy();
	return 0;
}

/**
 * Thread entry point that the "/ws" handler starts with
 * `std.concurrency.spawn`.
 *
 * The model and the web socket arrive as `shared` and are cast back to
 * their unshared types. The worker then tries to send one greeting over the
 * socket, sleeps for five seconds and reports completion to the parent by
 * sending it the integer 0.
 *
 * Params:
 *   parentId = Tid that receives the completion message
 *   m        = model handed to the Decoder (cast from shared)
 *   s        = web socket of the connected client (cast from shared)
 */
void myworker(Tid parentId, shared Model m, shared WebSocket s) {
	WebSocket ws = cast(WebSocket) s;
	Model mm = cast(Model) m;
	Decoder d = new Decoder(mm);

	import std.stdio;
	writeln("W :: Starting", thisTid, "...", "parent", parentId,
		d.get_name());
	logInfo("W > ws=%s code=%s", &ws, ws.closeCode);

	// XXX data are received from socket by send/receive?

	try {
		// XXX issue: works only once, then Error writing data to socket.
		// NOTE(review): `ws` is used here from a different thread than the
		// connection handler that received it - presumably this is what
		// makes the send fail; confirm against the vibe.d documentation.
		ws.send("Hello from thread");
	} catch (Exception o) {
		logInfo(">W>> exception=%s", o);
	}

	sleep(5.seconds);
	writeln("W :: Finished ", thisTid, "...", "parent", parentId);

	d.destroy();
	// Tell the parent that this worker is done.
	send(parentId, 0);
}


/**
 * Web interface with two web socket endpoints:
 *
 * - "/ws"  starts a worker thread and a task for the connection and then
 *          reads text messages until the client sends "stop" or disconnects.
 * - "/ws2" computes the Fibonacci number for every received integer directly
 *          in the handler and sends the result back.
 */
class WebsocketService {
	/**
	 * Handler for "/ws".
	 *
	 * Spawns `myworker` (std.concurrency) and `mytask` (std.parallelism, in a
	 * new thread), then logs every received text message until "stop" arrives
	 * or the socket is no longer connected. Afterwards it waits for both the
	 * task result and the worker's completion message.
	 *
	 * The method name must differ from the "/ws2" handler; the served URL is
	 * determined by the @path attribute.
	 */
	@path("/ws") void getWebsocket(scope WebSocket ws){
		logInfo("Got new web socket connection.");
		logInfo("X> connected=%s, ws=%s code=%s", ws.connected, &ws,
			ws.closeCode);
		Model m = new Model("fibmodel");

		import std.concurrency;
		auto tw = std.concurrency.spawn(&myworker, thisTid,
			cast(shared) m, cast(shared) ws);

		import std.parallelism : task, taskPool, TaskPool;
		auto ts = task!mytask(thisTid, m, ws);
		ts.executeInNewThread();

		bool runit = true;
		while (runit) {
			try {
				auto txt = ws.receiveText;
				logInfo("Receive '%s'.", txt);

				if (txt == "stop") {
					break;
				}

				// XXX which queue to use to send to the task?

				// XXX sent to thread?
				// send(tw, std.conv.to!int(std.conv.to!string(txt)));
			}
			catch (Exception e) {
				// Only receiving happens in the try block above.
				logInfo("can not receive from client: %s", e.msg);
			}
			finally {
				// Leave the loop once the client is gone.
				if (!ws.connected) runit = false;
			}
		}
		logInfo("Client disconnected - wait for worker.");
		// Wait for the task's return value and for the worker's completion
		// message before releasing the model.
		auto tres = ts.yieldForce;
		receiveOnly!int;
		logInfo("Task result=%s", tres);
		logInfo("Client disconnected - worker is done too.");
		m.destroy();
	}

	/**
	 * Handler for "/ws2".
	 *
	 * Parses every received text message as an integer, computes the
	 * Fibonacci number through a Decoder and sends the result back as text.
	 * The loop ends on "stop" or when the socket is no longer connected.
	 */
	@path("/ws2") void getWebsocket2(scope WebSocket ws){
		logInfo("Got new web socket connection.");
		Model m = new Model("fibmodel");
		Decoder d = new Decoder(m);
		logInfo("X> connected=%s, ws=%s code=%s", ws.connected, &ws,
			ws.closeCode);

		bool runit = true;
		while (runit) {
			try {
				auto txt = ws.receiveText;
				logInfo("Receive '%s'.", txt);

				if (txt == "stop") {
					break;
				}
				int num = std.conv.to!int(std.conv.to!string(txt));

				// XXX issue - here we block the main thread, and the service
				// has no chance to respond to other clients (e.g. ping pong)
				long res = d.fib(num);

				try {
					ws.send(to!string(res));
				} catch (Exception o) {
					logInfo(">WS>> exception=%s", o);
				}
			}
			catch (Exception e) {
				// Reached when receiving fails or the text is not an integer;
				// send failures are handled by the inner try/catch.
				logInfo("can not process client message: %s", e.msg);
			}
			finally {
				// Leave the loop once the client is gone.
				if (!ws.connected) runit = false;
			}
		}
		d.destroy();
		m.destroy();
		logInfo("Client disconnected - worker is done too.");
	}
}

/// Request handler for "/hello": writes the body "Hello" to the response.
void helloWorld(HTTPServerRequest req, HTTPServerResponse res)
{
	res.writeBody("Hello");
}


/// Entry point: configures the HTTP server for port 8080 on the loopback
/// addresses, registers the web socket service and the "/hello" route, and
/// runs the application until it exits.
void main()
{
	// Server configuration.
	auto cfg = new HTTPServerSettings;
	cfg.port = 8080;
	cfg.bindAddresses = ["::1", "127.0.0.1"];

	// Routing table.
	auto routes = new URLRouter;
	routes.registerWebInterface(new WebsocketService);
	routes.get("/hello", &helloWorld);

	auto server = listenHTTP(cfg, routes);
	// Stop accepting connections once main is left.
	scope (exit) server.stopListening();

	runApplication();
}

```




More information about the Digitalmars-d mailing list