[vibe.d] websocket and worker thread / task
madkote
schroeder.compling at googlemail.com
Sun Jul 30 09:53:10 UTC 2023
Hi all,
I am moving some C++ code to D as an experiment for a production
service. So far, I like D, its bindings to C/C++ and the web framework
vibe.d - but there are some issues which I cannot solve.
What:
- websocket service
- receive a huge amount of data on the socket in small packets
- each data packet is processed by a "decoder" (a CPU-intensive
task)
- data should be processed in a separate thread to avoid
blocking the service
- keep in mind there could be up to "cpu*2" clients in parallel
Problems
- I cannot send results from a thread through WS, see markers in
the code "XXX"
- But sometimes casting works, sometimes not (1 client==NO, 4
clients==YES, but not for all of them)
- what is also strange: the first time, the cast of the WS works and I
can send results, but when a client connects the next time, it does not
work anymore.
Questions
- how to implement such a pattern (it worked well in C++)? ws +
worker thread to compute and send results
- how to also pass input and results through queues back to the WS
(in case I want to use a task)? The problem here is that there is no
async queue implementation for D, and again it might not work due to
scope
Thankful for any hints and help!
D code
```
import vibe.d;
import std.process;
import std.random;
import std.conv;
// Module-level pseudo-random generator constructed with the fixed seed 42.
// NOTE(review): nothing in the code shown here reads `rnd` - confirm whether it is still needed.
auto rnd = Random(42);
import std.concurrency : receiveOnly, send, spawn, Tid, thisTid;
/**
 * Computes the n-th Fibonacci number iteratively (F(0) = 0, F(1) = 1).
 *
 * Used in this example only to simulate CPU load; pass high values of `n`
 * to keep the CPU busy for longer.
 *
 * Params:
 *   n = index of the Fibonacci number; values <= 1 are returned unchanged
 *
 * Returns: F(n). For n > 92 the value no longer fits into a `long` and
 * silently wraps around.
 */
long fibonacci(long n) {
    if (n <= 1) return n;
    long prev = 0;
    long curr = 1;
    // The counter has the same type as `n`: an `int` counter would wrap
    // around before reaching an `n` above int.max and never terminate.
    for (long i = 2; i <= n; i++) {
        immutable next = prev + curr;
        prev = curr;
        curr = next;
    }
    return curr;
}
/// Minimal stand-in for a decoder model: it only carries a name.
class Model {
    private string name_;

    /// Creates a model labelled with `name`.
    this(string name) {
        name_ = name;
    }

    /// Returns the name that was passed to the constructor.
    string get_name() {
        return name_;
    }
}
/// Stand-in for the CPU-intensive decoder: wraps a Model and delegates the
/// actual work to fibonacci().
class Decoder {
    private Model model_;

    /// Creates a decoder operating on `model`.
    this(Model model) {
        model_ = model;
    }

    /// Runs the simulated computation for input `n` and returns its result.
    long fib(long n) {
        immutable result = fibonacci(n);
        return result;
    }

    /// Returns the name of the wrapped model.
    string get_name() {
        return model_.get_name();
    }
}
/**
 * Body of the std.parallelism task that the "/ws" handler starts for each
 * client via `task!mytask(...)` followed by `executeInNewThread()`.
 *
 * It builds a Decoder for `m`, tries to send one greeting over `ws`, sleeps
 * for five seconds and finishes.
 *
 * Params:
 *   parentId = Tid of the spawning handler; only printed for diagnostics
 *   m        = model handed to the Decoder
 *   ws       = WebSocket of the client connection
 *
 * Returns: always 0.
 */
int mytask(Tid parentId, Model m, WebSocket ws) {
    import std.stdio;
    Decoder d = new Decoder(m);
    writeln("T :: Starting", thisTid, "...", "parent", parentId,
        d.get_name());
    // XXX how to receive data? there is no queue, which would block on get
    // => use sleep to pop data periodically?
    try {
        // XXX issue: works only once, then Error writing data to socket.
        // NOTE(review): this function runs on its own thread while `ws` was
        // created by the connection handler. vibe.d connections are presumably
        // bound to the event loop of the thread that accepted them, which would
        // explain the write error - confirm against the vibe.d documentation.
        ws.send("Hello from task");
    } catch (Exception o) {
        logInfo(">T>> exception=%s", o);
    }
    sleep(5.seconds);
    writeln("T :: Finished ", thisTid, "...", "parent", parentId);
    d.destroy();
    return 0;
}
/**
 * Thread function that the "/ws" handler starts per client through
 * `std.concurrency.spawn`.
 *
 * It strips `shared` from its arguments, builds a Decoder, tries to send one
 * greeting over the socket, sleeps for five seconds and finally reports
 * completion to the parent by sending the integer 0.
 *
 * Params:
 *   parentId = Tid that receives the completion message
 *   m        = model, passed as `shared` only to satisfy `spawn`
 *   s        = client WebSocket, passed as `shared` only to satisfy `spawn`
 */
void myworker(Tid parentId, shared Model m, shared WebSocket s) {
    // Always notify the parent, even when this function is left through an
    // exception; otherwise the parent's `receiveOnly!int` would wait forever.
    scope (exit) send(parentId, 0);

    WebSocket ws = cast(WebSocket) s;
    Model mm = cast(Model) m;
    Decoder d = new Decoder(mm);
    import std.stdio;
    writeln("W :: Starting", thisTid, "...", "parent", parentId,
        d.get_name());
    logInfo("W > ws=%s code=%s", &ws, ws.closeCode);
    // XXX data are received from socket by send/receive?
    try {
        // XXX issue: works only once, then Error writing data to socket.
        // NOTE(review): casting away `shared` only silences the type system;
        // `ws` is still used from a thread other than the one that created
        // it. vibe.d connections are presumably bound to their creating
        // thread's event loop - confirm against the vibe.d documentation.
        ws.send("Hello from thread");
    } catch (Exception o) {
        logInfo(">W>> exception=%s", o);
    }
    sleep(5.seconds);
    writeln("W :: Finished ", thisTid, "...", "parent", parentId);
    d.destroy();
}
/**
 * Web interface with two WebSocket endpoints:
 *
 * $(UL
 *   $(LI "/ws": starts a `myworker` thread and a `mytask` task per client
 *        and then only reads incoming text until "stop" or disconnect.)
 *   $(LI "/ws2": parses each incoming text as an integer, computes the
 *        result inside the handler and sends it back.)
 * )
 *
 * The routes are fixed by the `@path` attributes, so the method names only
 * have to be unique and keep their "get" prefix.
 */
class WebsocketService {
    /// Handles "/ws": delegates work to a thread and a task and waits for both.
    @path("/ws") void getWebsocket(scope WebSocket ws) {
        logInfo("Got new web socket connection.");
        logInfo("X> connected=%s, ws=%s code=%s", ws.connected, &ws,
            ws.closeCode);
        Model m = new Model("fibmodel");

        // NOTE(review): `ws` is handed to two other threads below. This is
        // where the "works only once" problem described in myworker/mytask
        // starts; vibe.d connections are presumably only usable from the
        // thread that accepted them - confirm against the vibe.d docs.
        import std.concurrency;
        auto tw = std.concurrency.spawn(&myworker, thisTid,
            cast(shared) m, cast(shared) ws);
        import std.parallelism : task, taskPool, TaskPool;
        auto ts = task!mytask(thisTid, m, ws);
        ts.executeInNewThread();

        bool runit = true;
        while (runit) {
            try {
                auto txt = ws.receiveText;
                logInfo("Receive '%s'.", txt);
                if (txt == "stop") {
                    break;
                }
                // XXX which queue to use to send to the task?
                // XXX sent to thread?
                // send(tw, std.conv.to!int(std.conv.to!string(txt)));
            }
            catch (Exception e) {
                logInfo("can not send to client.");
            }
            finally {
                // Leave the loop once the peer is gone, whatever happened above.
                if (!ws.connected) runit = false;
            }
        }

        logInfo("Client disconnected - wait for worker.");
        // Wait for the task result first, then for myworker's completion message.
        auto tres = ts.yieldForce;
        receiveOnly!int();
        logInfo("Task result=%s", tres);
        logInfo("Client disconnected - worker is done too.");
        m.destroy();
    }

    /// Handles "/ws2": computes every request directly in the handler.
    @path("/ws2") void getWebsocket2(scope WebSocket ws) {
        logInfo("Got new web socket connection.");
        Model m = new Model("fibmodel");
        Decoder d = new Decoder(m);
        logInfo("X> connected=%s, ws=%s code=%s", ws.connected, &ws,
            ws.closeCode);

        bool runit = true;
        while (runit) {
            try {
                auto txt = ws.receiveText;
                logInfo("Receive '%s'.", txt);
                if (txt == "stop") {
                    break;
                }
                // A non-numeric message throws here and is reported by the
                // outer catch below.
                int num = txt.to!int;
                // XXX issue - here we block the main thread, and the service
                // has no chance to respond to other clients (e.g. ping pong)
                long res = d.fib(num);
                try {
                    ws.send(to!string(res));
                } catch (Exception o) {
                    logInfo(">WS>> exception=%s", o);
                }
            }
            catch (Exception e) {
                logInfo("can not send to client.");
            }
            finally {
                // Leave the loop once the peer is gone, whatever happened above.
                if (!ws.connected) runit = false;
            }
        }

        d.destroy();
        m.destroy();
        logInfo("Client disconnected - worker is done too.");
    }
}
/// Plain HTTP handler: answers the request by writing the body "Hello".
void helloWorld(HTTPServerRequest req, HTTPServerResponse res) {
    res.writeBody("Hello");
}
/// Program entry point: configures the HTTP server on port 8080 for the
/// local IPv6 and IPv4 loopback addresses, registers the routes and hands
/// control to `runApplication()`.
void main() {
    // Listener configuration.
    auto settings = new HTTPServerSettings;
    settings.bindAddresses = ["::1", "127.0.0.1"];
    settings.port = 8080;

    // Routes: the WebSocket endpoints of WebsocketService plus "/hello".
    auto router = new URLRouter;
    router.registerWebInterface(new WebsocketService);
    router.get("/hello", &helloWorld);

    auto listener = listenHTTP(settings, router);
    // Stop accepting connections when main is left, however that happens.
    scope (exit) listener.stopListening();

    runApplication();
}
```
More information about the Digitalmars-d
mailing list