/++
	Fiber-based socket i/o built on Phobos' std.socket and Socket.select without any other dependencies.


	This is meant to be a single-threaded event-driven basic network server.

	---
	void main() {
		auto fm = new FiberManager();
		// little tcp echo server
		// exits when it gets "QUIT" on the socket.
		Socket listener;
		listener = fm.listenTcp6(6660, (Socket conn) {
			while(true) {
				char[128] buffer;
				auto ret = conn.receive(buffer[]);
				// keeps the Phobos interface so...
				if(ret <= 0) // ...still need to check return values
					break;
				auto got = buffer[0 .. ret];
				if(got.length >= 4 && got[0 .. 4] == "QUIT") {
					listener.close();
					break;
				} else {
					conn.send(got);
				}
			}
			conn.close();
		});

		// simultaneously listen for and echo UDP packets
		fm.makeFiber( () {
			auto sock = fm.bindUdp4(9999);
			char[128] buffer;
			Address addr;
			while(true) {
				auto ret = sock.receiveFrom(buffer[], addr);
				if(ret <= 0)
					break;
				import std.stdio;
				auto got = buffer[0 .. ret];
				// print it to the console
				writeln("Received UDP ", got);
				// send the echo
				sock.sendTo(got, addr);

				// same length check as the tcp handler, so a bare "QUIT" counts too
				if(got.length >= 4 && got[0 .. 4] == "QUIT") {
					break; // stop processing udp when told to quit too
				}
			}
		}).call(); // need to call it the first time ourselves to get it started

		// run the events. This keeps going until there are no more registered events;
		// so when all registered sockets are closed or abandoned.
		//
		// So this will return when both QUIT messages are received and all clients disconnect.
		import std.stdio;
		writeln("Entering.");

		fm.run();

		writeln("Exiting.");
	}
	---

	Note that DNS address lookups here may still block the whole thread, but other methods on `Socket` are overridden in the subclass ([FiberSocket]) to `yield` appropriately, so you should be able to reuse most existing code that uses Phobos' Socket with little to no modification.
	However, since it keeps the same interface as the original object, remember you still need to check your return values!

	There are two big differences:

	$(NUMBERED_LIST
		* You should not modify the `blocking` flag on the Sockets. It is already set for you and changing it will... probably not hurt, but definitely won't help.

		* You shouldn't construct the Sockets yourself, nor call `connect` or `listen` on them. Instead, use the methods in the [FiberManager] class. It will ensure you get the right objects initialized in the right way with the minimum amount of blocking.

		The `listen` family of functions accepts a delegate that is called for each connection in a fresh fiber. The `connect` family of functions can only be used from inside an existing fiber - if you do it in a connection handler from listening, it is already set up. If it is from your main thread though, you'll get an assert error unless you make your own fiber ahead of time. [FiberManager.makeFiber] can construct one for you, or you can call `new Fiber(...)` from `import core.thread.fiber` yourself. Put all the work with the connection inside that fiber so the manager can do its work most efficiently.
	)

	There are several convenience functions to construct addresses for you too, or you may simply do `getAddress` or `new InternetAddress` and friends from `std.socket` yourself.

	$(H2 Conceptual Overview)

	A socket is a common programming object for communication over a network. Phobos has support for the basics and you can read more about that in my blog socket tutorial: http://dpldocs.info/this-week-in-d/Blog.Posted_2019_11_11.html

	A lot of things describe [core.thread.fiber.Fiber|fibers] as lightweight threads, and that's not wrong, but I think that actually overcomplicates them. I prefer to think of a fiber as a function that can pause itself.
	You call it like a function, you write it like a function, but instead of always completing and returning, it can [core.thread.fiber.Fiber.yield|yield], which is putting itself on pause and returning to the caller. The caller then has a chance to resume the function when it chooses to simply by [core.thread.fiber.Fiber.call|calling] it again, and it picks up where it left off, or the caller can [core.thread.fiber.Fiber.reset|reset] the fiber function to the beginning and start over.

	Fiber-based async i/o thus isn't as complicated as it sounds. The basic idea is you just write an ordinary function in the same style as if you were doing linear, blocking i/o calls, but instead of actually blocking, you register a callback to be woken up when the call can succeed, then yield yourself. This callback you register is simply your own fiber resume method; the event loop picks up where you left off.

	With Phobos sockets (and most Unix i/o functions), you then retry the operation that would have blocked and carry on because the callback is triggered when the operation is ready. If you're using another async system, like Windows' Overlapped I/O callbacks, it is actually even easier, since that callback happens when the operation has already completed. In those cases, you register the fiber's resume function as the event callback, then yield. When you wake up, you can immediately carry on.

	When a fiber is woken up, it continues executing from the last `yield` call. Just think of `yield` as being a pause button you press.

	Understanding how it works means you can translate any callback-based i/o system to use fibers, since it would always follow that same pattern: register the fiber resume method, then yield.
	If it is a callback when the operation is ready, try it again when you wake up (so right after yield, you can loop back to the call), or if it is a callback when the operation is complete, you can immediately use the result when you wake up (so right after yield, you use it).

	How does the event loop work? How do you know what fiber runs next? See, this is where the "lightweight thread" explanation complicates things. With a thread, the operating system is responsible for scheduling them and might even run several simultaneously. Fibers are much simpler: again, think of them as just being a function that can pause itself. Like with an ordinary function, just one runs at a time (in your thread anyway, of course adding threads can complicate fibers like it can complicate any other function). Like with an ordinary function, YOU choose which one you want to call and when. And when a fiber `yield`s, it is very much like an ordinary function `return`ing - it passes control back to you, the caller. The only difference is the Fiber object remembers where the function was when it yielded, so you can ask it to pick up where it left off.

	The event loop therefore doesn't look all that special. If you've used `Socket.select` before, you'll recognize most of it. (`select` can be tricky to use though, `epoll` based code is actually simpler and more efficient... but this module only wanted to use Phobos' std.socket on its own. Besides, `select` still isn't that complicated, is cross-platform, and performs well enough for most tasks anyway.) It has a list of active sockets that it adds to either a read or write set, it calls the select function, then it loops back over and handles the events, if set. The only special thing is the event handler resumes the fiber instead of some other action.

	I encourage you to view the source of this file and try to follow along. It isn't terribly long and can hopefully help to introduce you to a new world of possibilities.
	You can use Fibers in other cases too, for example, the game I'm working on uses them in enemy scripts. It sets up their action, then yields and lets the player take their turn. When it is the computer's turn again, the script fiber resumes. Same principle, simple code once you get to know it.

	$(H2 Limitations)
	`Socket.select` has a limit on the number of pending sockets at any time, and since you have to loop through them each iteration, it can get slow with huge numbers of concurrent connections. I'd note that you probably will not see this problem, but it certainly can happen. Similarly, there are `new` allocations for each socket and virtual calls throughout, which, again, probably will be good enough for you, but this module is not C10K+ "web scale".

	It also cannot be combined with other event loops in the same thread. But, since the [FiberManager] only uses the thread you give it, you might consider running it here and other things alongside in their own threads.

	Credits:
		vibe.d is the first time I recall even hearing of fibers and is the direct inspiration for this.

	History:
		Written December 26, 2020. First included in arsd-official dub release 9.1.

	License:
		BSL-1.0, same as Phobos
+/
module arsd.fibersocket; // previously known as "centivibe" since it provides like 1/100th the functionality of vibe.d

public import std.socket;
import core.thread.fiber;

/// just because I forget how to enable this, trivial helper function
void allowBroadcast(Socket socket) {
	socket.setOption(SocketOptionLevel.SOCKET, SocketOption.BROADCAST, 1);
}

/// Convenience function to loop and send until it is all sent or an error occurs.
ptrdiff_t sendAll(Socket s, scope const(void)[] data) {
	auto ol = data.length;
	while(data.length) {
		auto ret = s.send(data);
		if(ret <= 0)
			return ret;
		data = data[ret .. $];
	}
	return ol;
}

/++
	Subclass of Phobos' socket that basically works the same way, except it yields back to the [FiberManager] when it would have blocked.

	You should not modify the `blocking` flag on these and generally not construct them, connect them, or listen on them yourself (let [FiberManager] do the setup for you), but otherwise they work the same as the original Phobos [std.socket.Socket] and implement the very same interface. You can call the exact same functions with original Sockets or FiberSockets.
+/
class FiberSocket : Socket {
	enum PendingOperation {
		none, read, write
	}

	protected this(FiberManager fm) pure nothrow @safe {
		this.fm = fm;
		super();
	}

	/// You should probably call the helper functions in [FiberManager] instead.
	this(FiberManager fm, AddressFamily af, SocketType st, Fiber fiber) {
		assert(fm !is null);

		this.fm = fm;
		this.fiber = fiber;
		super(af, st);
		this.blocking = false;
	}

	void callFiber() {
		fiber.call();
	}

	private FiberManager fm;
	private Fiber fiber;
	private PendingOperation pendingOperation;

	private void queue(PendingOperation op) @trusted nothrow {
		pendingOperation = op;
		fm.pendingSockets ~= this;
		fiber.yield();
	}

	protected override Socket accepting() pure nothrow {
		return new FiberSocket(fm);
	}

	private ptrdiff_t magic(scope ptrdiff_t delegate() @safe what, PendingOperation op) @trusted {
		try_again:
		auto r = what();
		if(r == -1 && wouldHaveBlocked()) {
			queue(op);
			goto try_again;
		}
		return r;
	}

	/// Yielding override of the Phobos interface
	override ptrdiff_t send(scope const(void)[] buf, SocketFlags flags) {
		return magic( () { return super.send(buf, flags); }, PendingOperation.write);
	}
	/// ditto
	override ptrdiff_t receive(scope void[] buf, SocketFlags flags) {
		return magic( () { return super.receive(buf, flags); }, PendingOperation.read);
	}

	/// ditto
	override ptrdiff_t receiveFrom(scope void[] buf, SocketFlags flags, ref Address from) @trusted {
		return magic( () { return super.receiveFrom(buf, flags, from); }, PendingOperation.read);
	}
	/// ditto
	override ptrdiff_t receiveFrom(scope void[] buf, SocketFlags flags) @trusted {
		return magic( () { return super.receiveFrom(buf, flags); }, PendingOperation.read);
	}
	/// ditto
	override ptrdiff_t sendTo(scope const(void)[] buf, SocketFlags flags, Address to) @trusted {
		return magic( () { return super.sendTo(buf, flags, to); }, PendingOperation.write);
	}
	/// ditto
	override ptrdiff_t sendTo(scope const(void)[] buf, SocketFlags flags) @trusted {
		return magic( () { return super.sendTo(buf, flags); }, PendingOperation.write);
	}

	// lol overload sets
	/// The Phobos overloads are still available too, they forward to the overrides in this class and thus work the same way.
	alias send = typeof(super).send;
	/// ditto
	alias receive = typeof(super).receive;
	/// ditto
	alias sendTo = typeof(super).sendTo;
	/// ditto
	alias receiveFrom = typeof(super).receiveFrom;
}

/++
	The FiberManager is responsible for running your socket event loop and dispatching events to your fibers. It is your main point of interaction with this library.

	Generally, a `FiberManager` will exist in your `main` function and take over that thread when you call [run]. You construct one, set up your listeners, etc., then call `run` and let it do its thing.
+/
class FiberManager {
	private FiberSocket[] pendingSockets;

	private size_t defaultFiberStackSize;

	/++
		Params:
			defaultFiberStackSize = size, in bytes, of the fiber stacks [makeFiber] returns. If 0 (the default), use the druntime default.
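
		A minimal sketch of asking for a non-default stack size. The 64 KB figure is only an illustrative assumption, not a recommendation; pick whatever your handlers actually need.

		---
		// fibers made by makeFiber, including the per-connection ones, get this stack size
		auto fm = new FiberManager(64 * 1024);
		---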
	+/
	this(size_t defaultFiberStackSize = 0) {
		this.defaultFiberStackSize = defaultFiberStackSize;
	}

	/++
		Convenience function to make a worker fiber based on the manager's configuration.

		This is used internally when connections come in.
	+/
	public Fiber makeFiber(void delegate() fn) {
		return defaultFiberStackSize ? new Fiber(fn, defaultFiberStackSize) : new Fiber(fn);
	}

	/++
		Convenience functions for creating listening sockets. These are trivial forwarders to [listenStream], constructing the appropriate [std.socket.Address] object for you. Note the address lookup does NOT at this time use the fiber io and may thus block your thread.

		You can `close` the returned socket when you want to stop listening, or just ignore it if you want to listen for the whole duration of the program.
	+/
	final Socket listenTcp6(ushort port, void delegate(Socket) connectionHandler, int backlog = 8) {
		return listenStream(new Internet6Address(port), connectionHandler, backlog);
	}

	/// ditto
	final Socket listenTcp6(string address, ushort port, void delegate(Socket) connectionHandler, int backlog = 8) {
		return listenStream(new Internet6Address(address, port), connectionHandler, backlog);
	}

	/// ditto
	final Socket listenTcp4(ushort port, void delegate(Socket) connectionHandler, int backlog = 8) {
		return listenStream(new InternetAddress(port), connectionHandler, backlog);
	}

	/// ditto
	final Socket listenTcp4(string address, ushort port, void delegate(Socket) connectionHandler, int backlog = 8) {
		return listenStream(new InternetAddress(address, port), connectionHandler, backlog);
	}

	/// ditto
	version(Posix)
	final Socket listenUnix(string path, void delegate(Socket) connectionHandler, int backlog = 8) {
		return listenStream(new UnixAddress(path), connectionHandler, backlog);
	}

	/++
		Core listen function for streaming connection-oriented sockets (TCP, etc.)


		It will:

		$(LIST
			* Create a [FiberSocket]
			* Create fibers on it for each incoming connection which call your `connectionHandler`
			* Bind to the given `Address`
			* Call `socket.listen(backlog)`
			* Start `accept`ing connections.
		)

		Returns: the listening socket. You shouldn't do much with this except maybe `close` it when you are done.
	+/
	Socket listenStream(Address addr, void delegate(Socket) connectionHandler, int backlog) {
		assert(connectionHandler !is null, "null connectionHandler passed to a listenTcp function");

		FiberSocket socket;

		socket = new FiberSocket(this, addr.addressFamily, SocketType.STREAM, makeFiber(
			delegate() {
				while(socket.isAlive()) {
					socket.queue(FiberSocket.PendingOperation.read); // put fiber on hold until ready to accept

					auto ns = cast(FiberSocket) socket.accept();
					ns.blocking = false;
					ns.fiber = makeFiber(delegate() {
						connectionHandler(ns);
					});
					// need to get the new connection started
					ns.fiber.call();
				}
			}
		));
		socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
		socket.bind(addr);
		socket.blocking = false;
		socket.listen(backlog);

		socket.callFiber();

		return socket;
	}

	/++
		Convenience functions that forward to [connectStream] for the given protocol. They connect, send, and receive in an async manner, but do not create their own fibers - you must already be in one when you call this function.


		Connections only work if you are already in a fiber. This is the case in a connectionHandler, but not from your main function. You'll have to make your own worker fiber. (But tbh if you only have one connection anyway, you might as well use a standard Socket.)

		If you are already in a connection handler set in the listen family of functions, you're all set - those are automatically in fibers.
		If you are in main though, you need to make a worker fiber.

		Making a worker fiber is simple enough. You can do it with `new Fiber` or with [FiberManager.makeFiber] (the latter just calls the former with a size argument set up in the FiberManager constructor).

		---
		auto fm = new FiberManager();
		fm.makeFiber(() {
			auto socket = fm.connectTcp4(...);

			socket.send(...);
		}).call(); // you must call it the first time yourself so it self-registers
		---

		OR

		---
		import core.thread.fiber;

		// keep the Fiber itself in the variable; `call` returns a Throwable, not the fiber
		auto fiber = new Fiber(() {
			auto socket = fm.connectTcp4(...);
			// do stuff in here
		});
		fiber.call(); // same deal, still need to call it the first time yourself to give it a chance to self-register
		---
	+/
	final Socket connectTcp4(string address, ushort port) {
		return connectStream(new InternetAddress(address, port));
	}

	/// ditto
	final Socket connectTcp6(string address, ushort port) {
		return connectStream(new Internet6Address(address, port));
	}

	/// ditto
	version(Posix)
	final Socket connectUnix(string path) {
		return connectStream(new UnixAddress(path));
	}

	/++
		Connects a streaming socket to the given address that will yield to this FiberManager instead of blocking.
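
		A minimal sketch of calling it from a worker fiber, following the same pattern as the connect helpers. The address, port, and message are placeholders made up for illustration.

		---
		auto fm = new FiberManager();
		fm.makeFiber(() {
			// throws if the connection fails, so wrap it in try/catch if you want to handle that
			auto socket = fm.connectStream(new InternetAddress("127.0.0.1", 8080));
			scope(exit) socket.close();

			socket.send("hello\n");

			char[128] buffer;
			auto ret = socket.receive(buffer[]);
			if(ret > 0) {
				// buffer[0 .. ret] holds whatever the other side sent back
			}
		}).call(); // start it once yourself; the manager resumes it after that

		fm.run(); // returns once the fiber above is done with its socket
		---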

	+/
	Socket connectStream(Address address) {
		assert(Fiber.getThis !is null, "connect functions can only be used from inside preexisting fibers");
		FiberSocket socket = new FiberSocket(this, address.addressFamily, SocketType.STREAM, Fiber.getThis);
		socket.connect(address);
		socket.queue(FiberSocket.PendingOperation.write); // wait for it to connect
		scope(failure)
			socket.close();
		// and ensure the connection was successful before proceeding
		int result;
		if(socket.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, result) < 0)
			throw new Exception("get socket error failed");
		if(result != 0)
			throw new Exception("Connect failed");
		return socket;
	}

	/++
		These are convenience functions that forward to [bindDatagram].

		UDP sockets don't connect per se, but they basically work the same as [connectStream]. See the caveat about requiring a premade Fiber from that page.
	+/
	Socket bindUdp4(string address, ushort port) {
		return bindDatagram(new InternetAddress(address, port));
	}
	/// ditto
	Socket bindUdp4(ushort port) {
		return bindDatagram(new InternetAddress(port));
	}
	/// ditto
	Socket bindUdp6(string address, ushort port) {
		return bindDatagram(new Internet6Address(address, port));
	}
	/// ditto
	Socket bindUdp6(ushort port) {
		return bindDatagram(new Internet6Address(port));
	}

	/++
		Only valid from inside a worker fiber, see [makeFiber].

		---
		fm.makeFiber(() {
			auto sock = fm.bindDatagram(new InternetAddress(5555));
			sock.receiveFrom(....);
		}).call(); // remember to call it the first time or it will never start!
		---
	+/
	Socket bindDatagram(Address address) {
		assert(Fiber.getThis !is null, "bind datagram functions can only be used from inside preexisting fibers");
		FiberSocket socket = new FiberSocket(this, address.addressFamily, SocketType.DGRAM, Fiber.getThis);
		socket.bind(address);
		return socket;
	}

	/++
		Runs the program and manages the fibers and connections for you, calling the appropriate functions when new events arrive.

		Returns when no connections are left open.
	+/
	void run() {
		auto readSet = new SocketSet;
		auto writeSet = new SocketSet;
		while(true) {
			readSet.reset();
			writeSet.reset();
			int added;
			for(int idx = 0; idx < pendingSockets.length; idx++) {
				auto pending = pendingSockets[idx];
				if(!pending.isAlive()) {
					// order not important here since we haven't done any real work yet
					// really it shouldn't even be on the list.
					pendingSockets[idx] = pendingSockets[$-1];
					pendingSockets = pendingSockets[0 .. $-1];
					pendingSockets.assumeSafeAppend();
					idx--;
					continue;
				}
				final switch(pending.pendingOperation) {
					case FiberSocket.PendingOperation.none:
						assert(0); // why is this object on this list?!
					case FiberSocket.PendingOperation.write:
						writeSet.add(pending);
						added++;
						break;
					case FiberSocket.PendingOperation.read:
						readSet.add(pending);
						added++;
						break;
				}
			}
			if(added == 0)
				return; // no work to do, all connections closed
			auto eventCount = Socket.select(readSet, writeSet, null);//, 5.seconds);
			if(eventCount == -1)
				continue;
			for(int idx = 0; idx < pendingSockets.length && eventCount > 0; idx++) {
				auto pending = pendingSockets[idx];
				SocketSet toCheck;
				final switch(pending.pendingOperation) {
					case FiberSocket.PendingOperation.none:
						break;
					case FiberSocket.PendingOperation.write:
						toCheck = writeSet;
						break;
					case FiberSocket.PendingOperation.read:
						toCheck = readSet;
						break;
				}
				if(toCheck is null)
					continue;

				if(toCheck.isSet(pending)) {
					eventCount--;
					import std.algorithm.mutation;
					// the order is fairly important since previous calls can append to
					// this again, and we want to be sure we process the ones in this batch
					// before seeing anything from the next batch.
					pendingSockets = remove!(SwapStrategy.stable)(pendingSockets, idx);
					pendingSockets.assumeSafeAppend();
					idx--; // the slot we used to have is now different, so it needs to be reprocessed
					pending.fiber.call();
				}
			}
		}
	}
}
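
/+
	Two small sketches of the ideas from the conceptual overview, using only
	core.thread.fiber and no sockets. They are illustrations under stated
	assumptions, not how the manager above is implemented: the names `steps`,
	`log`, and `readyCallbacks` are made up for these examples and are not
	part of this module's API.
+/

// A fiber is a function that can pause itself: `yield` returns to the caller,
// and the next `call` picks up right after that yield.
unittest {
	int steps;
	auto fiber = new Fiber(() {
		steps = 1;
		Fiber.yield(); // press pause and hand control back to the caller
		steps = 2;     // execution continues here on the next call
	});

	fiber.call(); // runs until the yield
	assert(steps == 1);
	assert(fiber.state == Fiber.State.HOLD);

	fiber.call(); // resumes after the yield and runs to completion
	assert(steps == 2);
	assert(fiber.state == Fiber.State.TERM);
}

// The general pattern: register the fiber's resume as the callback, then yield.
// Here a plain array of delegates stands in for a real event source.
unittest {
	void delegate()[] readyCallbacks; // hypothetical stand-in for "wake me when ready"
	string log;

	Fiber worker;
	worker = new Fiber(() {
		log ~= "start;";
		readyCallbacks ~= () { worker.call(); }; // register our own resume...
		Fiber.yield();                           // ...then pause until it fires
		log ~= "resumed;";
	});

	worker.call(); // the first call is on us, just like with makeFiber above
	assert(log == "start;");

	// the "event loop": fire whatever got registered
	foreach(cb; readyCallbacks)
		cb();
	assert(log == "start;resumed;");
	assert(worker.state == Fiber.State.TERM);
}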