module fcgi;

import std.socket, std.stdio, std.socketstream;
import std.datetime;
import std.concurrency;
import core.thread;
import std.process;
import std.conv;
private import std.c.windows.windows, std.c.windows.winsock;

////////////////////////////////////////////////////////////////////////////////
// Server implementation.
// The FastCGI wire-format definitions (RecordHead, RecordType, ...) are at the
// end of this file.
////////////////////////////////////////////////////////////////////////////////

/**
 * Copies the first S.sizeof elements of buf into a scratch buffer and casts
 * that buffer to S.
 *
 * Params:
 *   buf = source data; must hold at least S.sizeof elements (asserted).
 * Returns: the value of type S produced by the cast.
 */
S toStruct(S, T)(T[] buf)
{
    static ubyte[S.sizeof] buf2;
    assert(buf.length >= S.sizeof);
    buf2 = buf[0..S.sizeof];
    return cast(S)(buf2);
}

/// Who is responsible for closing the socket?
enum SocketLifetimeOwner
{
    application = 1,
    server = 2
}

/// Callback invoked with a fully read request.
alias void delegate(Request) RequestCallback;
/// Callback used by Request.end: (request id, application status, protocol status).
alias void delegate(ushort, int, ProtocolStatus) EndRequestHandler;

/**
 * One FastCGI request: its id, role, parameters and stdin payload, plus the
 * means to write a response on the server's currently active socket.
 */
class Request
{
    RequestId _id;
    Params _params;
    Role _role;
    SocketLifetimeOwner _socketOwner;
    EndRequestHandler _endRequestHandler;
    ubyte[] _input;
    ListenThread _server;
    bool _ended;

    /// Raw bytes of the params stream collected so far; parsed by finishParams.
    private ubyte[] _rawParams;
    /// Output streams (indexed by RecordType) that received data and therefore
    /// still need their terminating empty record.
    private bool[RecordType.max + 1] _openStreams;

    /**
     * Params:
     *   id = request id taken from the record header.
     *   server = listener whose active socket is used for output.
     *   endRequestHandler = invoked by end() to emit the end-request record.
     */
    this(RequestId id, ListenThread server, EndRequestHandler endRequestHandler)
    {
        _id = id;
        _endRequestHandler = endRequestHandler;
        _server = server;
    }

    /// Bytes received on the stdin stream so far.
    @property ubyte[] input() pure nothrow @safe { return _input; }

    /// Request id.
    @property RequestId id() pure const nothrow @safe { return _id; }
    /// ditto
    @property void id(RequestId id) pure nothrow @safe { _id = id; }

    /// Parsed request parameters; null until a params stream has been parsed.
    @property const(Params) params() pure const nothrow @safe { return _params; }
    private @property void params(Params params) pure nothrow @safe { _params = params; }

    /// Role announced in the begin-request record.
    @property Role role() pure const nothrow @safe { return _role; }
    private @property void role(Role role) pure nothrow @safe { _role = role; }

    /// Which side closes the connection.
    @property SocketLifetimeOwner socketOwner() pure const nothrow @safe { return _socketOwner; }
    /// ditto
    @property void socketOwner(SocketLifetimeOwner owner) pure nothrow @safe { _socketOwner = owner; }

    /**
     * Sends data on the given output stream of this request.
     *
     * The payload is split into records of at most ushort.max bytes, because
     * the record header stores the content length in 16 bits. No terminating
     * empty record is sent here (an empty record marks end-of-stream, so it
     * must only be sent once); end() sends it.
     *
     * Params:
     *   data = payload; sent as its raw bytes.
     *   method = record type (stream) to write to, stdout by default.
     */
    void write(T)(const(T[]) data, RecordType method = RecordType.stdOut)
    {
        assert(!_ended);
        auto socket = _server._active;
        // Work on bytes so that the record length is correct for any T.
        auto bytes = cast(const(ubyte)[])data;
        _openStreams[method] = true;
        while(bytes.length)
        {
            auto chunkLen = bytes.length > ushort.max ? ushort.max : bytes.length;
            auto head = RecordHead(method, _id, cast(ushort)chunkLen);
            socket.send(cast(ubyte[RecordHead.sizeof])head);
            socket.send(bytes[0 .. chunkLen]);
            bytes = bytes[chunkLen .. $];
        }
    }

    /**
     * Finishes the request: terminates every stream that write() was called
     * on with an empty record, then invokes the end-request handler with
     * application status 0 and ProtocolStatus.requestComplete.
     * Calling end() more than once has no further effect.
     */
    void end()
    {
        if(_ended)
            return;
        auto socket = _server._active;
        foreach(type, open; _openStreams)
        {
            if(!open)
                continue;
            // Zero-length record == end of that stream.
            auto head = RecordHead(cast(RecordType)type, _id, 0);
            socket.send(cast(ubyte[RecordHead.sizeof])head);
        }
        _endRequestHandler(_id, 0, ProtocolStatus.requestComplete);
        _ended = true;
    }

    /// Appends the content of one params record to the pending params data.
    private void addParamData(in ubyte[] chunk)
    {
        _rawParams ~= chunk;
    }

    /// Parses the accumulated params data (if any) into a Params object.
    private void finishParams()
    {
        if(_rawParams.length)
        {
            // Params keeps slices of this buffer, so hand it over and drop
            // our reference instead of reusing it.
            params = new Params(_rawParams);
            _rawParams = null;
        }
    }
}

/**
 * Name/value pairs of a request, looked up by member name, e.g.
 * `request.params.REQUEST_URI`.
 */
class Params
{
    private string[string] _params;

    private this(string[string] params)
    {
        _params = params;
    }

    /**
     * Parses a FastCGI name-value pair stream.
     *
     * Each pair is: name length, value length, name bytes, value bytes. A
     * length is one byte when its high bit is clear, otherwise four bytes
     * (big-endian, high bit masked off). Parsing stops quietly at the first
     * truncated pair; empty input yields no parameters.
     *
     * Params:
     *   raw = the concatenated content of the params records. Keys and values
     *         are slices of this buffer cast to string, so the caller must
     *         not modify it afterwards.
     */
    private this(in ubyte[] raw) pure nothrow @trusted
    {
        size_t pos = 0;
        while(pos < raw.length)
        {
            size_t keyLen, valLen;
            if(!readLength(raw, pos, keyLen) || !readLength(raw, pos, valLen))
                break;
            // Written to avoid overflow in the comparison.
            if(keyLen > raw.length - pos || valLen > raw.length - pos - keyLen)
                break;
            auto key = cast(string)raw[pos .. pos+keyLen];
            pos += keyLen;
            auto value = cast(string)raw[pos .. pos+valLen];
            pos += valLen;
            _params[key] = value;
        }
    }

    /**
     * Decodes one length field at raw[pos] and advances pos past it.
     * Returns: false (pos unchanged) when the buffer ends before the field does.
     */
    private static bool readLength(in ubyte[] raw, ref size_t pos, out size_t len) pure nothrow @safe
    {
        if(pos >= raw.length)
            return false;
        if((raw[pos] & 0x80) == 0)
        {
            len = raw[pos];
            pos += 1;
            return true;
        }
        if(raw.length - pos < 4)
            return false;
        len = (cast(size_t)(raw[pos] & 0x7F) << 24)
            | (cast(size_t)raw[pos+1] << 16)
            | (cast(size_t)raw[pos+2] << 8)
            |  cast(size_t)raw[pos+3];
        pos += 4;
        return true;
    }

    /// Returns the value stored under the member name used, or "" when absent.
    string opDispatch(string key)() const pure nothrow @safe
    {
        auto v = key in _params;
        return v ? *v : "";
    }

    override string toString()
    {
        return to!string(_params);
    }
}

/**
 * Thread that listens on a TCP socket, reads FastCGI requests from the
 * connected clients and hands each complete request to a callback.
 */
class ListenThread : Thread
{
    /// Listens on host:port and dispatches requests to processRequest.
    this(string host, ushort port)
    {
        this(host, port, &processRequest);
    }

    /// Listens on host:port and dispatches requests to callback.
    this(string host, ushort port, RequestCallback callback)
    {
        _callback = callback;
        _listener = new TcpSocket();
        _listener.blocking = false;
        _listener.bind(new InternetAddress(host, port));
        debug writeln("listening to ", host, ":", port, " blocking:", _listener.blocking);
        super(&run);
    }

    /// Default request handler; subclasses using the two-argument
    /// constructor are expected to override it.
    protected void processRequest(Request request)
    {
        assert(false, " this should be overridden");
    }

private:
    Socket _listener;
    bool _quitting;
    Socket _active;       // socket of the request currently being handled
    Socket[] clients;
    RequestCallback _callback;
    enum MAX_CONNECTIONS = ubyte.max;

    /// Thread entry point: select loop over the listener and all clients.
    void run()
    {
        ulong requests, reqPrSec, totMs, totHnReq, reqTmThisSec;
        ulong lastReq;
        auto sw = StopWatch(AutoStart.yes);
        clients.reserve(MAX_CONNECTIONS);
        auto ss = new SocketSet(MAX_CONNECTIONS+1);
        _listener.listen(MAX_CONNECTIONS);
        while(clients.length || !_quitting)
        {
            // Wait for incoming
            int select;
            do
            {
                ss.reset();
                ss.add(_listener);
                foreach(client; clients)
                {
                    ss.add(client);
                }
                // we'll time-out to check for quitting
                select = Socket.select(ss, null, null, 1000);
            } while(select <= 0 && !_quitting);

            if(_quitting)
            {
                _listener.shutdown(SocketShutdown.RECEIVE);
                continue;
            }

            // Process pending requests
            // NOTE(review): the text of this loop is truncated in the source
            // under review: everything between the start of the loop
            // condition and the ">= 1_000" comparison below is missing
            // (presumably the per-client readRequest/_callback handling and
            // the request counters). It is reproduced verbatim because it
            // cannot be reconstructed from what is visible; restore it from
            // version control before building.
            for(int i;i= 1_000) {
                    auto rps = cast(double)reqPrSec/(sw.peek.hnsecs/10000000.0);
                    totMs += sw.peek.hnsecs;
                    auto trps = cast(double)requests/(totMs/10000000.0);
                    sw.reset();
                    writeln("ReqPrSec: ", rps, " AvgPrCB: ", (cast(double)reqTmThisSec/cast(double)reqPrSec)/1000.0);
                    writeln("TOT: ", requests, " HNS: ", 10000000.0/trps);
                    reqPrSec = 0;
                    reqTmThisSec = 0;
                }
            }
            _active = null;

            // Accept new connections
            if(ss.isSet(_listener))
            {
                assert(clients.capacity);
                auto client = _listener.accept();
                debug writeln(client.handle(), " accepted");
                client.setOption(SocketOptionLevel.SOCKET, SocketOption.LINGER, std.socket.linger(1, 60));
                client.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
                clients ~= client;
            }
        }
        _listener.shutdown(SocketShutdown.RECEIVE);
        _listener.close();
    }

    /// Shuts down both directions of the socket and closes it.
    void closeClient(Socket socket)
    {
        debug writeln(socket.handle(), " closing");
        socket.shutdown(SocketShutdown.BOTH);
        socket.close();
    }

    /**
     * Sends an end-request record (header + EndRequestBody) for requestId on
     * the currently active socket, which must be set.
     */
    void endRequest(
        RequestId requestId,
        int applicationStatus,
        ProtocolStatus protocolStatus)
    {
        auto socket = _active;
        assert(socket);
        debug writeln(socket.handle(), " ending request ", requestId, " app: ", applicationStatus, " prot: ", protocolStatus);
        auto head = RecordHead(RecordType.endRequest, requestId, EndRequestBody.sizeof);
        socket.send(cast(ubyte[RecordHead.sizeof])head);
        auto endRequest = EndRequestBody(applicationStatus, protocolStatus);
        socket.send(cast(ubyte[EndRequestBody.sizeof])endRequest);
    }

    /**
     * Receives until buf is completely filled.
     *
     * A single receive call may deliver fewer bytes than requested, so the
     * call is repeated for the remainder.
     *
     * Returns: false when the peer closes the connection or receive reports
     *          an error before buf is full.
     */
    bool receiveExact(Socket socket, ubyte[] buf)
    {
        size_t got = 0;
        while(got < buf.length)
        {
            auto len = socket.receive(buf[got .. $]);
            if(len == Socket.ERROR || len == 0)
                return false;
            got += len;
        }
        return true;
    }

    /**
     * Reads records from _socket until an empty stdin record arrives and
     * returns the request those records describe.
     *
     * Requests under construction are kept in a static table keyed by request
     * id, so a request survives between calls until its stdin stream ends.
     *
     * Returns: the completed request, or null when the connection was closed
     *          or a receive failed.
     */
    Request readRequest(Socket _socket)
    {
        static Request[RequestId] partialRequests;
        Request request;
        RecordHead head;
        static ubyte[RecordHead.sizeof] headBuf = void;
        static ubyte[ushort.max] content = void;
        do
        {
            // Receive header
            assert(_socket.isAlive);
            debug writeln(_socket.handle(), " receive record");
            if(!receiveExact(_socket, headBuf[]))
            {
                debug writeln(_socket.handle(), " finished or error, numPartialRequests: ", partialRequests.length);
                return null;
            }

            // Get (partial) request
            head = cast(RecordHead)headBuf;
            debug writeln(_socket.handle(), " ", head);
            auto existing = head.requestId in partialRequests;
            if(!existing || head.type == RecordType.beginRequest)
            {
                partialRequests[head.requestId] = request = new Request(head.requestId, this, &endRequest);
            }
            else
            {
                request = *existing;
            }

            // Read request data
            if(head.contentLength && !receiveExact(_socket, content[0..head.contentLength]))
            {
                debug writeln(_socket.handle(), " finished or error, numPartialRequests: ", partialRequests.length);
                return null;
            }

            // Handle Record
            final switch(head.type)
            {
                case RecordType.beginRequest:
                    debug writeln(_socket.handle(), " BEGINREQUEST");
                    auto beginRequest = toStruct!(BeginRequestBody)(content);
                    request.role = beginRequest.role;
                    request.socketOwner = beginRequest.keepConnection ? SocketLifetimeOwner.server : SocketLifetimeOwner.application;
                    debug writeln(_socket.handle(), " Request: ", request.id, " role: ", request.role, " owner: ", request.socketOwner);
                    break;
                case RecordType.abortRequest:
                    writeln("ABORTREQUEST!");
                    assert(false);
                    break;
                case RecordType.endRequest:
                    writeln("ENDREQUEST!");
                    assert(false);
                    break;
                case RecordType.params:
                    debug writeln(_socket.handle(), " PARAMS");
                    if(head.contentLength)
                    {
                        // The params stream may span several records (and a
                        // pair may straddle two of them), so only collect
                        // the bytes here and parse once the stream ends.
                        debug writeln(_socket.handle(), " Parsing params");
                        request.addParamData(content[0..head.contentLength]);
                    }
                    else
                    {
                        // params finished
                        request.finishParams();
                        debug writeln(_socket.handle(), " -> DONE");
                    }
                    break;
                case RecordType.stdIn:
                    if(head.contentLength)
                    {
                        // TODO: use some appender
                        request._input ~= content[0..head.contentLength];
                        debug writeln(_socket.handle(), " STDIN! LEN:", head.contentLength, " - ", cast(char[])request._input);
                    }
                    else
                    {
                        debug writeln(_socket.handle(), " STDIN DONE! - ", cast(char[])request._input);
                    }
                    break;
                case RecordType.stdOut:
                    writeln("STDOUT!");
                    assert(false);
                    break;
                case RecordType.stdErr:
                    writeln("STDERR!");
                    assert(false);
                    break;
                case RecordType.data:
                    writeln("DATA!");
                    assert(false);
                    break;
                case RecordType.getValues:
                    writeln("GETVALUES!");
                    assert(false);
                    break;
                case RecordType.getValuesResult:
                    writeln("GETVALUESRESULT!");
                    assert(false);
                    break;
                case RecordType.unknownType:
                    writeln("UNKNOWNTYPE!");
                    assert(false);
                    break;
            }

            // Padding carries no information; read and discard it.
            if(head.paddingLength && !receiveExact(_socket, content[0..head.paddingLength]))
            {
                debug writeln(_socket.handle(), " finished or error, numPartialRequests: ", partialRequests.length);
                return null;
            }
        } while(!(head.type == RecordType.stdIn && head.contentLength == 0));

        debug writeln(_socket.handle(), " finished reading to stream");
        assert(request);
        // Covers a peer that ended stdin without terminating the params stream.
        request.finishParams();
        partialRequests.remove(request.id);
        return request;
    }
}

/// Example handler: answers every request with a fixed "Hello World" page.
void onRequest(Request request)
{
    debug writeln("request: ", request.id, " creating response");
    //auto content = "request: "~to!string(cast(string[string])request.params._params)~"";
    auto content = "Hello World";
    auto http = "HTTP/1.1 200 OK\r\nConnection: Keep-Alive\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length:"~to!string(content.length)~"\r\n\r\n";
    request.write(http);
    request.write(content);
    request.end();
}

/// Starts a listener on localhost; the port is the first command-line argument.
void main(string[] args)
{
    if(args.length < 2)
    {
        stderr.writeln("usage: ", args.length ? args[0] : "fcgi", " <port>");
        return;
    }
    auto listener = new ListenThread("localhost", to!ushort(args[1]), (Request r){ onRequest(r);});
    listener.start();
}

/********************************************************************************
 * FastCGI Protocol
 * http://www.fastcgi.com/drupal/node/6?q=node/22
 ********************************************************************************/

enum ubyte FastCGIVersion = 1;
enum RequestId FastCGIManagementRequestId = 0;

/**
 * Combines the arguments into one integer, first argument in the most
 * significant position, each following argument 8 bits lower.
 *
 * The result type is the 1/2/4/8-byte integer whose size is
 * A.length * A[0].sizeof, signed when A[0] is a signed integer type.
 */
auto pack(A...)(in A arr) pure nothrow @safe
{
    enum requiredSize = A.length * A[0].sizeof;
    static assert(requiredSize <= long.sizeof);
    enum signed = is(A[0] == byte) || is(A[0] == short) || is(A[0] == int) || is(A[0] == long);

    static if(requiredSize == 8 && signed) alias long R;
    else static if(requiredSize == 8 && !signed) alias ulong R;
    else static if(requiredSize == 4 && signed) alias int R;
    else static if(requiredSize == 4 && !signed) alias uint R;
    else static if(requiredSize == 2 && signed) alias short R;
    else static if(requiredSize == 2 && !signed) alias ushort R;
    else static if(requiredSize == 1 && signed) alias byte R;
    else static if(requiredSize == 1 && !signed) alias ubyte R;
    else static assert(false);

    R result;
    // Compile-time recursion over the argument indices.
    void add(int N)()
    {
        result |= arr[N] << ((A.length-N-1)*8);
        static if(N < A.length-1)
            add!(N+1)();
    }
    add!(0)();
    return result;
}

/**
 * Splits v into its bytes. Element 0 of the result is the least significant
 * byte, element T.sizeof-1 the most significant.
 */
auto unpack(T)(const T v) pure nothrow @safe
{
    enum N = T.sizeof/ubyte.sizeof;
    ubyte[N] result = void;
    // Compile-time recursion over the byte indices.
    void add(int I)()
    {
        result[I] = (v >> (I*8)) & 0xFF;
        static if(I < N-1)
            add!(I+1)();
    }
    add!(0)();
    return result;
}

/// Record type byte of a record header.
enum RecordType : ubyte
{
    beginRequest = 1,
    abortRequest = 2,
    endRequest = 3,
    params = 4,
    stdIn = 5,
    stdOut = 6,
    stdErr = 7,
    data = 8,
    getValues = 9,
    getValuesResult = 10,
    unknownType = 11,
}

alias ushort RequestId;

/// 8-byte header preceding every record; multi-byte fields are stored high byte first.
struct RecordHead
{
    ubyte protocolVersion = FastCGIVersion;
    RecordType type;
    ubyte requestIdB1;
    ubyte requestIdB0;
    ubyte contentLengthB1;
    ubyte contentLengthB0;
    ubyte paddingLength;
    ubyte reserved;

    this(RecordType type, RequestId requestId, ushort contentLength) pure nothrow @safe
    {
        this.type = type;
        this.requestId = requestId;
        this.contentLength = contentLength;
        this.protocolVersion = FastCGIVersion;
    }

    pure @safe const nothrow invariant()
    {
        assert(protocolVersion == FastCGIVersion);
    }

    /// Request id assembled from / split into its two header bytes.
    @property RequestId requestId() const pure nothrow @safe
    {
        return pack(requestIdB1, requestIdB0);
    }

    /// ditto
    @property void requestId(RequestId id) pure nothrow @safe
    {
        auto unpacked = unpack(id);
        requestIdB1 = unpacked[1];
        requestIdB0 = unpacked[0];
    }

    /// Content length assembled from / split into its two header bytes.
    @property ushort contentLength() const pure nothrow @safe
    {
        return pack(contentLengthB1, contentLengthB0);
    }

    /// ditto
    @property void contentLength(ushort len) pure nothrow @safe
    {
        auto unpacked = unpack(len);
        contentLengthB1 = unpacked[1];
        contentLengthB0 = unpacked[0];
    }

    /// True when the request id equals FastCGIManagementRequestId.
    @property bool isManagementRecord() const pure nothrow @safe
    {
        return requestId == FastCGIManagementRequestId;
    }

    /// Negation of isManagementRecord.
    @property bool isApplicationRecord() const pure nothrow @safe
    {
        return !isManagementRecord;
    }
}

enum Role : short
{
    responder = 1,
    authorizer = 2,
    filter = 3
}

enum Flags
{
    keepConnection = 1
}

/// Body of a begin-request record.
struct BeginRequestBody
{
    ubyte roleB1;
    ubyte roleB0;
    ubyte flags;
    ubyte[5] reserved;

    /// Role assembled from / split into its two bytes.
    @property Role role() const pure nothrow @safe
    {
        return cast(Role)pack(roleB1, roleB0);
    }

    /// ditto
    @property void role(Role role) pure nothrow @safe
    {
        auto unpacked = unpack(role);
        roleB1 = unpacked[1];
        roleB0 = unpacked[0];
    }

    /// True when the Flags.keepConnection bit is set in flags.
    @property bool keepConnection() const pure nothrow @safe
    {
        return (flags & Flags.keepConnection) != 0;
    }
}

enum ProtocolStatus : ubyte
{
    requestComplete = 0,
    cannotMultiplexConnection = 1,
    overloaded = 2,
    unknownRole = 3,
}

/// Body of an end-request record.
struct EndRequestBody
{
    ubyte appStatusB3;
    ubyte appStatusB2;
    ubyte appStatusB1;
    ubyte appStatusB0;
    ProtocolStatus protocolStatus;
    ubyte[3] reserved;

    this(int appStatus, ProtocolStatus status) pure nothrow @safe
    {
        this.appStatus = appStatus;
        this.protocolStatus = status;
    }

    /// Application status assembled from / split into its four bytes.
    @property int appStatus() const pure nothrow @safe
    {
        return pack(appStatusB3, appStatusB2, appStatusB1, appStatusB0);
    }

    /// ditto
    @property void appStatus(int status) pure nothrow @safe
    {
        auto unpacked = unpack(status);
        appStatusB3 = unpacked[3];
        appStatusB2 = unpacked[2];
        appStatusB1 = unpacked[1];
        appStatusB0 = unpacked[0];
    }
}

/// Body of an unknown-type record.
struct UnknownTypeBody
{
    ubyte type;
    ubyte[7] reserved;
}