/++
	$(PITFALL
		Please note: the API and behavior of this module are not externally stable at this time. See the documentation on specific functions for details.
	)

	Shared core functionality including exception helpers, library loader, event loop, and possibly more. Maybe command line processor and UDA helper and some basic shared annotation types.

	I'll probably move the url, websocket, and ssl stuff in here too as they are often shared. Maybe a small internationalization helper type (a hook for external implementation) and COM helpers too. I might move the process helpers out to their own module - even things in here are not considered stable to library users at this time!

	If you use this directly outside the arsd library despite its current instability caveats, you might consider using `static import` since names in here are likely to clash with Phobos if you use them together. `static import` will let you easily disambiguate and avoid name conflict errors if I add more here. Some names even clash deliberately to remind me to avoid some antipatterns inside the arsd modules!

	## Contributor notes

	arsd.core should be focused on things that enable interoperability primarily and secondarily increased code quality between other, otherwise independent arsd modules. As a foundational library, it is not permitted to import anything outside the druntime `core` namespace, except in templates and examples not normally compiled in. This keeps it independent and avoids transitive dependency spillover to end users while also keeping compile speeds fast. To help keep builds snappy, also avoid significant use of CTFE inside this module.

	On my Linux computer, `dmd -unittest -main core.d` takes about a quarter second to run. We do not want this to grow.

	`@safe` compatibility is ok when it isn't too big of a hassle. `@nogc` is a non-goal.
I might accept it on some of the trivial functions but if it means changing the logic in any way to support, you will need a compelling argument to justify it. The arsd libs are supposed to be reliable and easy to use. That said, of course, don't be unnecessarily wasteful - if you can easily provide a reliable and easy to use way to let advanced users do their thing without hurting the other cases, let's discuss it. 19 20 If functionality is not needed by multiple existing arsd modules, consider adding a new module instead of adding it to the core. 21 22 Unittests should generally be hidden behind a special version guard so they don't interfere with end user tests. 23 24 History: 25 Added March 2023 (dub v11.0). Several functions were migrated in here at that time, noted individually. Members without a note were added with the module. 26 +/ 27 module arsd.core; 28 29 // FIXME: add callbacks on file open for tracing dependencies dynamically 30 31 // see for useful info: https://devblogs.microsoft.com/dotnet/how-async-await-really-works/ 32 33 // see: https://wiki.openssl.org/index.php/Simple_TLS_Server 34 35 import core.thread; 36 import core..volatile; 37 import core.atomic; 38 import core.time; 39 40 import core.stdc.errno; 41 42 import core.attribute; 43 static if(!__traits(hasMember, core.attribute, "mustuse")) 44 enum mustuse; 45 46 // FIXME: add an arena allocator? can do task local destruction maybe. 

// the three implementations are windows, epoll, and kqueue
version(Windows) {
	version=Arsd_core_windows;

	// import core.sys.windows.windows;
	import core.sys.windows.winbase;
	import core.sys.windows.windef;
	import core.sys.windows.winnls;
	import core.sys.windows.winuser;
	import core.sys.windows.winsock2;

	pragma(lib, "user32");
	pragma(lib, "ws2_32");
} else version(linux) {
	version=Arsd_core_epoll;

	static if(__VERSION__ >= 2098) {
		version=Arsd_core_has_cloexec;
	}
} else version(FreeBSD) {
	version=Arsd_core_kqueue;

	import core.sys.freebsd.sys.event;
} else version(DragonFlyBSD) {
	// NOT ACTUALLY TESTED
	version=Arsd_core_kqueue;

	import core.sys.dragonflybsd.sys.event;
} else version(NetBSD) {
	// NOT ACTUALLY TESTED
	version=Arsd_core_kqueue;

	import core.sys.netbsd.sys.event;
} else version(OpenBSD) {
	version=Arsd_core_kqueue;

	// THIS FILE DOESN'T ACTUALLY EXIST, WE NEED TO MAKE IT
	import core.sys.openbsd.sys.event;
} else version(OSX) {
	version=Arsd_core_kqueue;

	import core.sys.darwin.sys.event;
}

version(Posix) {
	import core.sys.posix.signal;
	import core.sys.posix.unistd;

	import core.sys.posix.sys.un;
	import core.sys.posix.sys.socket;
	import core.sys.posix.netinet.in_;
}

// FIXME: the exceptions should actually give some explanatory text too (at least sometimes)

/+
	=========================
	GENERAL UTILITY FUNCTIONS
	=========================
+/

// enum stringz : const(char)* { init = null }

/++
	A wrapper around a `const(char)*` to indicate that it is a zero-terminated C string.
+/
struct stringz {
	private const(char)* raw;

	/++
		Wraps the given pointer in the struct. Note that it retains a copy of the pointer.
	+/
	this(const(char)* raw) {
		this.raw = raw;
	}

	/++
		Returns the original raw pointer back out.
	+/
	const(char)* ptr() const {
		return raw;
	}

	/++
		Borrows a slice of the pointer up to (but not including) the zero terminator.

		Returns:
			`null` if the wrapped pointer is `null`, otherwise a slice that aliases the original memory (nothing is copied).
	+/
	const(char)[] borrow() const {
		if(raw is null)
			return null;

		// size_t rather than int so the count cannot overflow on very long strings
		size_t length;
		while(raw[length] != 0)
			length++;

		return raw[0 .. length];
	}
}

/++
	A limited variant to hold just a few types. It is made for the use of packing a small amount of extra data into error messages.
+/
/+
	* if length and ptr are both 0, it is null
	* if ptr == 1, length is an integer
	* if ptr == 2, length is an unsigned integer (suggest printing in hex)
	* if ptr == 3, length is a combination of flags (suggest printing in binary)
	* if ptr == 4, length is a unix permission thing (suggest printing in octal)
	* if ptr == 5, length is a double float
	* if ptr == 15, length must be 0. this holds an empty, non-null, SSO string.
	* if ptr >= 16 && < 24, length is reinterpret-casted a small string of length of (ptr & 0x7) + 1
	* if length == size_t.max, ptr is interpreted as a stringz
	* if ptr >= 1024, it is a non-null D string or byte array. It is a string if the length high bit is clear, a byte array if it is set. the length is what is left after you mask that out.

	All other ptr values are reserved for future expansion.
+/
struct LimitedVariant {

	/++
		The kinds of content a `LimitedVariant` can report, as decoded from its two internal words by [contains].
	+/
	enum Contains {
		null_,
		intDecimal,
		intHex,
		intBinary,
		intOctal,
		double_,
		emptySso,
		stringSso,
		stringz,
		string,
		bytes,

		invalid,
	}

	/++
		Decodes the internal tag and tells you what is currently stored. The `containsX` helpers are shortcuts that group several of these values together.
	+/
	Contains contains() const {
		auto tag = cast(size_t) ptr;
		if(ptr is null && length is null)
			return Contains.null_;
		else switch(tag) {
			case 1: return Contains.intDecimal;
			case 2: return Contains.intHex;
			case 3: return Contains.intBinary;
			case 4: return Contains.intOctal;
			case 5: return Contains.double_;
			case 15: return length is null ? Contains.emptySso : Contains.invalid;
			default:
				if(tag >= 16 && tag < 24) {
					return Contains.stringSso;
				} else if(tag >= 1024) {
					// a real pointer: either a stringz (length is all ones) or a D slice
					if(cast(size_t) length == size_t.max)
						return Contains.stringz;
					else
						return isHighBitSet ? Contains.bytes : Contains.string;
				} else {
					return Contains.invalid;
				}
		}
	}

	/// ditto
	bool containsInt() const {
		with(Contains)
		switch(contains) {
			case intDecimal, intHex, intBinary, intOctal:
				return true;
			default:
				return false;
		}
	}

	/// ditto
	bool containsString() const {
		with(Contains)
		switch(contains) {
			case null_, emptySso, stringSso, string:
			// case stringz:
				return true;
			default:
				return false;
		}
	}

	/// ditto
	bool containsDouble() const {
		with(Contains)
		switch(contains) {
			case double_:
				return true;
			default:
				return false;
		}
	}

	/// ditto
	bool containsBytes() const {
		with(Contains)
		switch(contains) {
			case bytes, null_:
				return true;
			default:
				return false;
		}
	}

	// `length` doubles as the payload word for the integer/SSO encodings, `ptr` doubles as the tag
	private const(void)* length;
	private const(ubyte)* ptr;

	// reports a wrong-type access, passing both raw words along for diagnostics
	private void Throw() const {
		throw ArsdException!"LimitedVariant"(cast(size_t) length, cast(size_t) ptr);
	}

	// the top bit of the length word distinguishes byte arrays (set) from strings (clear)
	private bool isHighBitSet() const {
		return (cast(size_t) length >> (size_t.sizeof * 8 - 1) & 0x1) != 0;
	}

	/++
		getString gets a reference to the string stored internally, see [toString] to get a string representation or whatever is inside.

		The typed getters throw (via the private `Throw` helper) if the variant does not currently hold a compatible value. A null variant is accepted by `getString` and `getBytes`, which then return `null`.
	+/
	const(char)[] getString() const return {
		with(Contains)
		switch(contains()) {
			case null_:
				return null;
			case emptySso:
				return (cast(const(char)*) ptr)[0 .. 0]; // zero length, non-null
			case stringSso:
				// the characters live inside the length word itself
				auto len = ((cast(size_t) ptr) & 0x7) + 1;
				return (cast(char*) &length)[0 .. len];
			case string:
				return (cast(const(char)*) ptr)[0 .. cast(size_t) length];
			default:
				Throw(); assert(0);
		}
	}

	/// ditto
	long getInt() const {
		if(containsInt)
			return cast(long) length;
		else
			Throw();
		assert(0);
	}

	/// ditto
	double getDouble() const {
		if(containsDouble)
			return *cast(double*) &length;
		else
			Throw();
		assert(0);
	}

	/// ditto
	const(ubyte)[] getBytes() const {
		with(Contains)
		switch(contains()) {
			case null_:
				return null;
			case bytes:
				// mask the marker bit back out to recover the real length
				return ptr[0 .. (cast(size_t) length) & ((1UL << (size_t.sizeof * 8 - 1)) - 1)];
			default:
				Throw(); assert(0);
		}
	}

	/++
		Gives a printable representation of whatever is inside. Integers are printed in the base they were constructed with (prefixed `0x`, `0b` or `0o` when not decimal).

		Bugs:
			Byte arrays are only printed as the placeholder `<bytes>`, and the `double_` and `stringz` cases are not implemented yet (they hit `assert(0)`).
	+/
	string toString() const {

		string intHelper(string prefix, int radix) {
			char[128] buffer;
			buffer[0 .. prefix.length] = prefix[];
			char[] toUse = buffer[prefix.length .. $];

			auto got = intToString(getInt(), toUse[], IntToStringArgs().withRadix(radix));

			return buffer[0 .. prefix.length + got.length].idup;
		}

		with(Contains)
		final switch(contains()) {
			case null_:
				return "<null>";
			case intDecimal:
				return intHelper("", 10);
			case intHex:
				return intHelper("0x", 16);
			case intBinary:
				return intHelper("0b", 2);
			case intOctal:
				return intHelper("0o", 8);
			case emptySso, stringSso, string:
				return getString().idup;
			case bytes:
				return "<bytes>"; // FIXME

			case double_:
				assert(0); // FIXME
			case stringz:
				assert(0); // FIXME
			case invalid:
				return "<invalid>";
		}
	}

	/++
		Constructs the variant from a value. The string and byte array overloads keep a reference to the data you pass rather than copying it.
	+/
	this(string s) {
		ptr = cast(const(ubyte)*) s.ptr;
		length = cast(void*) s.length;
	}

	/// ditto
	this(const(ubyte)[] b) {
		// a null slice must stay in the null state; tagging it would leave a
		// null pointer with a non-zero length word, which decodes as `invalid`
		if(b.ptr is null)
			return;
		ptr = cast(const(ubyte)*) b.ptr;
		length = cast(void*) (b.length | (1UL << (size_t.sizeof * 8 - 1)));
	}

	/// ditto
	this(long l, int base = 10) {
		int tag;
		switch(base) {
			case 10: tag = 1; break;
			case 16: tag = 2; break;
			case 2: tag = 3; break;
			case 8: tag = 4; break;
			default: assert(0, "You passed an invalid base to LimitedVariant");
		}
		ptr = cast(ubyte*) tag;
		length = cast(void*) l;
	}

	/// ditto
	version(none)
	this(double d) {
		// this crashes dmd! omg
		assert(0);
		// NOTE(review): `contains` decodes tag 5 as double_ and 15 as the empty SSO string, so the 15 below looks wrong - confirm before enabling
		// ptr = cast(ubyte*) 15;
		// length = cast(void*) *cast(size_t*) &d;
	}
}

unittest {
	LimitedVariant v = LimitedVariant("foo");
	assert(v.containsString());
	assert(!v.containsInt());
	assert(v.getString() == "foo");

	LimitedVariant v2 = LimitedVariant(4);
	assert(v2.containsInt());
	assert(!v2.containsString());
	assert(v2.getInt() == 4);

	LimitedVariant v3 = LimitedVariant(cast(ubyte[]) [1, 2, 3]);
	assert(v3.containsBytes());
	assert(!v3.containsString());
	assert(v3.getBytes() == [1, 2, 3]);

	// a null byte array is the null variant, not an invalid one
	LimitedVariant v4 = LimitedVariant(cast(const(ubyte)[]) null);
	assert(v4.contains == LimitedVariant.Contains.null_);
	assert(v4.getBytes() is null);
}

/++
	This is a dummy type to indicate the end of normal arguments and the beginning of the file/line inferred args. It is meant to ensure you don't accidentally send a string that is interpreted as a filename when it was meant to be a normal argument to the function and trigger the wrong overload.
+/
struct ArgSentinel {}

/++
	A trivial wrapper around C's malloc that creates a D slice. It multiplies n by T.sizeof and returns the slice of the pointer from 0 to n.

	Please note that the ptr might be null - it is your responsibility to check that, same as normal malloc. Check `ret.ptr is null` specifically: `ret.length` will always be `n`, even if the `malloc` failed, so `ret is null` (which also compares the length) is NOT a valid check unless `n` is zero.

	If `n * T.sizeof` would overflow `size_t`, nothing is allocated and the result has a `null` pointer, just like a failed `malloc`.

	Remember to `free` the returned pointer with `core.stdc.stdlib.free(ret.ptr);`

	$(TIP
		I strongly recommend you simply use the normal garbage collector unless you have a very specific reason not to.
	)

	See_Also:
		[mallocedStringz]
+/
T[] mallocSlice(T)(size_t n) {
	import c = core.stdc.stdlib;

	// refuse requests whose byte size cannot be represented instead of
	// silently allocating a wrapped-around (too small) block
	if(n > size_t.max / T.sizeof)
		return (cast(T*) null)[0 .. n];

	return (cast(T*) c.malloc(n * T.sizeof))[0 .. n];
}

/++
	Uses C's malloc to allocate a copy of `original` with an attached zero terminator. It may return a slice with a `null` pointer (but non-zero length!) if `malloc` fails and you are responsible for freeing the returned pointer with `core.stdc.stdlib.free(ret.ptr)`.
445 446 $(TIP 447 I strongly recommend you use [CharzBuffer] or Phobos' [std.string.toStringz] instead unless there's a special reason not to. 448 ) 449 450 See_Also: 451 [CharzBuffer] for a generally better alternative. You should only use `mallocedStringz` where `CharzBuffer` cannot be used (e.g. when druntime is not usable or you have no stack space for the temporary buffer). 452 453 [mallocSlice] is the function this function calls, so the notes in its documentation applies here too. 454 +/ 455 char[] mallocedStringz(in char[] original) { 456 auto slice = mallocSlice!char(original.length + 1); 457 if(slice is null) 458 return null; 459 slice[0 .. original.length] = original[]; 460 slice[original.length] = 0; 461 return slice; 462 } 463 464 /++ 465 Basically a `scope class` you can return from a function or embed in another aggregate. 466 +/ 467 struct OwnedClass(Class) { 468 ubyte[__traits(classInstanceSize, Class)] rawData; 469 470 static OwnedClass!Class defaultConstructed() { 471 OwnedClass!Class i = OwnedClass!Class.init; 472 i.initializeRawData(); 473 return i; 474 } 475 476 private void initializeRawData() @trusted { 477 if(!this) 478 rawData[] = cast(ubyte[]) typeid(Class).initializer[]; 479 } 480 481 this(T...)(T t) { 482 initializeRawData(); 483 rawInstance.__ctor(t); 484 } 485 486 bool opCast(T : bool)() @trusted { 487 return !(*(cast(void**) rawData.ptr) is null); 488 } 489 490 @disable this(); 491 @disable this(this); 492 493 Class rawInstance() return @trusted { 494 if(!this) 495 throw new Exception("null"); 496 return cast(Class) rawData.ptr; 497 } 498 499 alias rawInstance this; 500 501 ~this() @trusted { 502 if(this) 503 .destroy(rawInstance()); 504 } 505 } 506 507 508 509 version(Posix) 510 package(arsd) void makeNonBlocking(int fd) { 511 import core.sys.posix.fcntl; 512 auto flags = fcntl(fd, F_GETFL, 0); 513 if(flags == -1) 514 throw new ErrnoApiException("fcntl get", errno); 515 flags |= O_NONBLOCK; 516 auto s = fcntl(fd, F_SETFL, flags); 
517 if(s == -1) 518 throw new ErrnoApiException("fcntl set", errno); 519 } 520 521 version(Posix) 522 package(arsd) void setCloExec(int fd) { 523 import core.sys.posix.fcntl; 524 auto flags = fcntl(fd, F_GETFD, 0); 525 if(flags == -1) 526 throw new ErrnoApiException("fcntl get", errno); 527 flags |= FD_CLOEXEC; 528 auto s = fcntl(fd, F_SETFD, flags); 529 if(s == -1) 530 throw new ErrnoApiException("fcntl set", errno); 531 } 532 533 534 /++ 535 A helper object for temporarily constructing a string appropriate for the Windows API from a D UTF-8 string. 536 537 538 It will use a small internal static buffer is possible, and allocate a new buffer if the string is too big. 539 540 History: 541 Moved from simpledisplay.d to core.d in March 2023 (dub v11.0). 542 +/ 543 version(Windows) 544 struct WCharzBuffer { 545 private wchar[] buffer; 546 private wchar[128] staticBuffer = void; 547 548 /// Length of the string, excluding the zero terminator. 549 size_t length() { 550 return buffer.length; 551 } 552 553 // Returns the pointer to the internal buffer. You must assume its lifetime is less than that of the WCharzBuffer. It is zero-terminated. 554 wchar* ptr() { 555 return buffer.ptr; 556 } 557 558 /// Returns the slice of the internal buffer, excluding the zero terminator (though there is one present right off the end of the slice). You must assume its lifetime is less than that of the WCharzBuffer. 559 wchar[] slice() { 560 return buffer; 561 } 562 563 /// Copies it into a static array of wchars 564 void copyInto(R)(ref R r) { 565 static if(is(R == wchar[N], size_t N)) { 566 r[0 .. 
this.length] = slice[]; 567 r[this.length] = 0; 568 } else static assert(0, "can only copy into wchar[n], not " ~ R.stringof); 569 } 570 571 /++ 572 conversionFlags = [WindowsStringConversionFlags] 573 +/ 574 this(in char[] data, int conversionFlags = 0) { 575 conversionFlags |= WindowsStringConversionFlags.zeroTerminate; // this ALWAYS zero terminates cuz of its name 576 auto sz = sizeOfConvertedWstring(data, conversionFlags); 577 if(sz > staticBuffer.length) 578 buffer = new wchar[](sz); 579 else 580 buffer = staticBuffer[]; 581 582 buffer = makeWindowsString(data, buffer, conversionFlags); 583 } 584 } 585 586 /++ 587 Alternative for toStringz 588 589 History: 590 Added March 18, 2023 (dub v11.0) 591 +/ 592 struct CharzBuffer { 593 private char[] buffer; 594 private char[128] staticBuffer = void; 595 596 /// Length of the string, excluding the zero terminator. 597 size_t length() { 598 assert(buffer.length > 0); 599 return buffer.length - 1; 600 } 601 602 // Returns the pointer to the internal buffer. You must assume its lifetime is less than that of the CharzBuffer. It is zero-terminated. 603 char* ptr() { 604 return buffer.ptr; 605 } 606 607 /// Returns the slice of the internal buffer, excluding the zero terminator (though there is one present right off the end of the slice). You must assume its lifetime is less than that of the CharzBuffer. 608 char[] slice() { 609 assert(buffer.length > 0); 610 return buffer[0 .. $-1]; 611 } 612 613 /// Copies it into a static array of chars 614 void copyInto(R)(ref R r) { 615 static if(is(R == char[N], size_t N)) { 616 r[0 .. this.length] = slice[]; 617 r[this.length] = 0; 618 } else static assert(0, "can only copy into char[n], not " ~ R.stringof); 619 } 620 621 @disable this(); 622 @disable this(this); 623 624 /++ 625 Copies `data` into the CharzBuffer, allocating a new one if needed, and zero-terminates it. 
626 +/ 627 this(in char[] data) { 628 if(data.length + 1 > staticBuffer.length) 629 buffer = new char[](data.length + 1); 630 else 631 buffer = staticBuffer[]; 632 633 buffer[0 .. data.length] = data[]; 634 buffer[data.length] = 0; 635 } 636 } 637 638 /++ 639 Given the string `str`, converts it to a string compatible with the Windows API and puts the result in `buffer`, returning the slice of `buffer` actually used. `buffer` must be at least [sizeOfConvertedWstring] elements long. 640 641 History: 642 Moved from simpledisplay.d to core.d in March 2023 (dub v11.0). 643 +/ 644 version(Windows) 645 wchar[] makeWindowsString(in char[] str, wchar[] buffer, int conversionFlags = WindowsStringConversionFlags.zeroTerminate) { 646 if(str.length == 0) 647 return null; 648 649 int pos = 0; 650 dchar last; 651 foreach(dchar c; str) { 652 if(c <= 0xFFFF) { 653 if((conversionFlags & WindowsStringConversionFlags.convertNewLines) && c == 10 && last != 13) 654 buffer[pos++] = 13; 655 buffer[pos++] = cast(wchar) c; 656 } else if(c <= 0x10FFFF) { 657 buffer[pos++] = cast(wchar)((((c - 0x10000) >> 10) & 0x3FF) + 0xD800); 658 buffer[pos++] = cast(wchar)(((c - 0x10000) & 0x3FF) + 0xDC00); 659 } 660 661 last = c; 662 } 663 664 if(conversionFlags & WindowsStringConversionFlags.zeroTerminate) { 665 buffer[pos] = 0; 666 } 667 668 return buffer[0 .. pos]; 669 } 670 671 /++ 672 Converts the Windows API string `str` to a D UTF-8 string, storing it in `buffer`. Returns the slice of `buffer` actually used. 673 674 History: 675 Moved from simpledisplay.d to core.d in March 2023 (dub v11.0). 
+/
version(Windows)
char[] makeUtf8StringFromWindowsString(in wchar[] str, char[] buffer) {
	if(str.length == 0)
		return null;

	auto got = WideCharToMultiByte(CP_UTF8, 0, str.ptr, cast(int) str.length, buffer.ptr, cast(int) buffer.length, null, null);
	if(got == 0) {
		if(GetLastError() == ERROR_INSUFFICIENT_BUFFER)
			throw new object.Exception("not enough buffer");
		else
			throw new object.Exception("conversion"); // FIXME: GetLastError
	}
	return buffer[0 .. got];
}

/++
	Converts the Windows API string `str` to a newly-allocated D UTF-8 string.

	The `wchar*` overload reads up to the zero terminator; the terminator itself is not part of the returned string.

	History:
		Moved from simpledisplay.d to core.d in March 2023 (dub v11.0).
+/
version(Windows)
string makeUtf8StringFromWindowsString(in wchar[] str) {
	char[] buffer;
	// first call with a null output buffer just measures the required size
	auto got = WideCharToMultiByte(CP_UTF8, 0, str.ptr, cast(int) str.length, null, 0, null, null);
	buffer.length = got;

	// it is unique because we just allocated it above!
	return cast(string) makeUtf8StringFromWindowsString(str, buffer);
}

/// ditto
version(Windows)
string makeUtf8StringFromWindowsString(wchar* str) {
	char[] buffer;
	auto got = WideCharToMultiByte(CP_UTF8, 0, str, -1, null, 0, null, null);
	buffer.length = got;

	got = WideCharToMultiByte(CP_UTF8, 0, str, -1, buffer.ptr, cast(int) buffer.length, null, null);
	if(got == 0) {
		if(GetLastError() == ERROR_INSUFFICIENT_BUFFER)
			throw new object.Exception("not enough buffer");
		else
			throw new object.Exception("conversion"); // FIXME: GetLastError
	}
	// with an input length of -1 the API converts and counts the terminating
	// zero as well; it must not become part of the D string
	return cast(string) buffer[0 .. got - 1];
}

// only used from minigui rn
/// Returns the index of the first zero code unit in `str`, or `str.length` if there is none.
package int findIndexOfZero(in wchar[] str) {
	foreach(idx, wchar ch; str)
		if(ch == 0)
			return cast(int) idx;
	return cast(int) str.length;
}
/// ditto
package int findIndexOfZero(in char[] str) {
	foreach(idx, char ch; str)
		if(ch == 0)
			return cast(int) idx;
	return cast(int) str.length;
}

/++
	Returns a minimum buffer length to hold the string `s` with the given conversions. It might be slightly larger than necessary, but is guaranteed to be big enough to hold it.

	History:
		Moved from simpledisplay.d to core.d in March 2023 (dub v11.0).
+/
version(Windows)
int sizeOfConvertedWstring(in char[] s, int conversionFlags) {
	int size = 0;

	if(conversionFlags & WindowsStringConversionFlags.convertNewLines) {
		// need to convert line endings, which means the length will get bigger.

		// BTW I betcha this could be faster with some simd stuff.
		char last;
		foreach(char ch; s) {
			if(ch == 10 && last != 13)
				size++; // will add a 13 before it...
			size++;
			last = ch;
		}
	} else {
		// no conversion necessary, just estimate based on length
		/*
			I don't think there's any string with a longer length
			in code units when encoded in UTF-16 than it has in UTF-8.
			This will probably over allocate, but that's OK.
		*/
		size = cast(int) s.length;
	}

	if(conversionFlags & WindowsStringConversionFlags.zeroTerminate)
		size++;

	return size;
}

/++
	Used by [makeWindowsString] and [WCharzBuffer]

	History:
		Moved from simpledisplay.d to core.d in March 2023 (dub v11.0).
+/
version(Windows)
enum WindowsStringConversionFlags : int {
	/++
		Append a zero terminator to the string.
	+/
	zeroTerminate = 1,
	/++
		Converts newlines from \n to \r\n.
	+/
	convertNewLines = 2,
}

/++
	An int printing function that doesn't need to import Phobos. Can do some of the things std.conv.to and std.format.format do.

	The buffer must be sized to hold the converted number. 32 chars is enough for most anything in base 10 or 16; a binary rendering of a negative 64 bit value needs 65.

	Negative values are printed as a `-` followed by the magnitude in the requested radix. Padding applies to the digits only, the sign is not counted.

	Bugs:
		The group separator settings of [IntToStringArgs] are accepted but not applied to the output yet.

	Returns: the slice of `buffer` containing the converted number.
+/
char[] intToString(long value, char[] buffer, IntToStringArgs args = IntToStringArgs.init) {
	const int radix = args.radix ? args.radix : 10;
	const int digitsPad = args.padTo;

	int pos;

	// Convert through the unsigned magnitude. Negating `long.min` as a long
	// overflows right back to `long.min`, which made every digit come out of a
	// negative remainder.
	ulong magnitude;
	if(value < 0) {
		buffer[pos++] = '-';
		magnitude = 0UL - cast(ulong) value;
	} else {
		magnitude = cast(ulong) value;
	}

	const int start = pos;
	int digitCount;

	// digits are produced least significant first and reversed afterwards
	do {
		const remainder = magnitude % radix;
		magnitude /= radix;

		buffer[pos++] = cast(char) (remainder < 10 ? (remainder + '0') : (remainder - 10 + args.ten));
		digitCount++;
	} while(magnitude);

	if(digitsPad > 0) {
		while(digitCount < digitsPad) {
			buffer[pos++] = args.padWith;
			digitCount++;
		}
	}

	assert(pos >= 1);
	assert(pos - start > 0);

	// reverse everything after the sign in place
	auto reverseSlice = buffer[start .. pos];
	for(int i = 0; i < reverseSlice.length / 2; i++) {
		auto paired = cast(int) reverseSlice.length - i - 1;
		char tmp = reverseSlice[i];
		reverseSlice[i] = reverseSlice[paired];
		reverseSlice[paired] = tmp;
	}

	return buffer[0 .. pos];
}

/// ditto
struct IntToStringArgs {
	private {
		ubyte padTo;
		char padWith;
		ubyte radix;
		char ten;
		ubyte groupSize;
		char separator;
	}

	/// Pads the digits out to at least `padTo` characters using `padWith`.
	IntToStringArgs withPadding(int padTo, char padWith = '0') {
		IntToStringArgs args = this;
		args.padTo = cast(ubyte) padTo;
		args.padWith = padWith;
		return args;
	}

	/// Sets the number base. `ten` is the character used for the digit worth ten; pass `'A'` for upper case hex.
	IntToStringArgs withRadix(int radix, char ten = 'a') {
		IntToStringArgs args = this;
		args.radix = cast(ubyte) radix;
		args.ten = ten;
		return args;
	}

	/// Records a digit group size and separator character.
	IntToStringArgs withGroupSeparator(int groupSize, char separator = '_') {
		IntToStringArgs args = this;
		args.groupSize = cast(ubyte) groupSize;
		args.separator = separator;
		return args;
	}
}

unittest {
	char[32] buffer;
	assert(intToString(0, buffer[]) == "0");
	assert(intToString(-1, buffer[]) == "-1");
	assert(intToString(-132, buffer[]) == "-132");
	assert(intToString(-1932, buffer[]) == "-1932");
	assert(intToString(1, buffer[]) == "1");
	assert(intToString(132, buffer[]) == "132");
	assert(intToString(1932, buffer[]) == "1932");

	assert(intToString(long.max, buffer[]) == "9223372036854775807");
	assert(intToString(long.min, buffer[]) == "-9223372036854775808");

	assert(intToString(0x1, buffer[], IntToStringArgs().withRadix(16)) == "1");
	assert(intToString(0x1b, buffer[], IntToStringArgs().withRadix(16)) == "1b");
	assert(intToString(0xef1, buffer[], IntToStringArgs().withRadix(16)) == "ef1");

	assert(intToString(0xef1, buffer[], IntToStringArgs().withRadix(16).withPadding(8)) == "00000ef1");
	assert(intToString(-0xef1, buffer[], IntToStringArgs().withRadix(16).withPadding(8)) == "-00000ef1");
	// three digits padded to eight with spaces: five spaces between the sign and the digits
	assert(intToString(-0xef1, buffer[], IntToStringArgs().withRadix(16, 'A').withPadding(8, ' ')) == "-     EF1");
}

/++
	History:
		Moved from color.d to core.d in March 2023 (dub v11.0).
+/
nothrow @safe @nogc pure
inout(char)[] stripInternal(return inout(char)[] s) {
	// Trims spaces, tabs, \n and \r from both ends and returns a slice of the
	// input (nothing is copied). A string that is empty or entirely whitespace
	// yields the empty slice at the end of the input.
	static bool isBlank(char c) nothrow @safe @nogc pure {
		return c == ' ' || c == '\t' || c == '\n' || c == '\r';
	}

	size_t start = 0;
	while(start < s.length && isBlank(s[start]))
		start++;

	if(start == s.length)
		return s[$..$];

	// s[start] is known to be non-blank, so this stops at start + 1 at the
	// latest. The scan has to be allowed to get that far: stopping one step
	// early left input like "a   " completely untrimmed on the right.
	size_t end = s.length;
	while(end > start && isBlank(s[end - 1]))
		end--;

	return s[start .. end];
}

/// ditto
nothrow @safe @nogc pure
inout(char)[] stripRightInternal(return inout(char)[] s) {
	// Right side only: walk back from the end until something that is not
	// whitespace is found. All-whitespace (or empty) input gives s[0 .. 0].
	size_t end = s.length;
	while(end > 0) {
		const char c = s[end - 1];
		if(c != ' ' && c != '\t' && c != '\n' && c != '\r')
			break;
		end--;
	}

	return s[0 .. end];
}

/++
	Shortcut for converting some types to string without invoking Phobos (but it will as a last resort).

	History:
		Moved from color.d to core.d in March 2023 (dub v11.0).
+/
string toStringInternal(T)(T t) {
	// Branch order matters: anything implicitly convertible to `long` is
	// printed as a number before the enum branch is considered, so the
	// member-name lookup is only reached by enums that do not convert to long.
	static if(is(T : string)) {
		return t;
	} else static if(is(T : long)) {
		char[32] digits;
		return intToString(t, digits[]).idup;
	} else static if(is(T == enum)) {
		switch(t) {
			foreach(memberName; __traits(allMembers, T)) {
				case __traits(getMember, T, memberName):
					return memberName;
			}
			default:
				return "<unknown>";
		}
	} else {
		import std.conv;
		return to!string(t);
	}
}

/++
	Builds a readable list of the members of the enum `Flags` that are set in `value`, joined with `" | "`.

	A member only counts as set if all of its bits are present in `value`. If nothing matches, the result is the name of the enum's zero-valued member when it has one, or `"<none>"` otherwise.
+/
string flagsToString(Flags)(ulong value) {
	string joined;
	string zeroName = "<none>";

	foreach(memberName; __traits(allMembers, Flags)) {
		const ulong bits = cast(ulong) __traits(getMember, Flags, memberName);
		if(bits == 0) {
			// remember what to call "no flags at all"
			zeroName = memberName;
		} else if((value & bits) == bits) {
			if(joined.length)
				joined ~= " | ";
			joined ~= memberName;
		}
	}

	return joined.length ? joined : zeroName;
}

unittest {
	enum MyFlags {
		none = 0,
		a = 1,
		b = 2
	}

	assert(flagsToString!MyFlags(3) == "a | b");
	assert(flagsToString!MyFlags(0) == "none");
	assert(flagsToString!MyFlags(2) == "b");
}

/++
	This populates a struct from a list of values (or other expressions, but it only looks at the values) based on types of the members, with one exception: `bool` members.. maybe.
1014 1015 It is intended for collecting a record of relevant UDAs off a symbol in a single call like this: 1016 1017 --- 1018 struct Name { 1019 string n; 1020 } 1021 1022 struct Validator { 1023 string regex; 1024 } 1025 1026 struct FormInfo { 1027 Name name; 1028 Validator validator; 1029 } 1030 1031 @Name("foo") @Validator(".*") 1032 void foo() {} 1033 1034 auto info = populateFromUdas!(FormInfo, __traits(getAttributes, foo)); 1035 assert(info.name == Name("foo")); 1036 assert(info.validator == Validator(".*")); 1037 --- 1038 1039 Note that instead of UDAs, you can also pass a variadic argument list and get the same result, but the function is `populateFromArgs` and you pass them as the runtime list to bypass "args cannot be evaluated at compile time" errors: 1040 1041 --- 1042 void foo(T...)(T t) { 1043 auto info = populateFromArgs!(FormInfo)(t); 1044 // assuming the call below 1045 assert(info.name == Name("foo")); 1046 assert(info.validator == Validator(".*")); 1047 } 1048 1049 foo(Name("foo"), Validator(".*")); 1050 --- 1051 1052 The benefit of this over constructing the struct directly is that the arguments can be reordered or missing. Its value is diminished with named arguments in the language. 1053 +/ 1054 template populateFromUdas(Struct, UDAs...) 
{ 1055 enum Struct populateFromUdas = () { 1056 Struct ret; 1057 foreach(memberName; __traits(allMembers, Struct)) { 1058 alias memberType = typeof(__traits(getMember, Struct, memberName)); 1059 foreach(uda; UDAs) { 1060 static if(is(memberType == PresenceOf!a, a)) { 1061 static if(__traits(isSame, a, uda)) 1062 __traits(getMember, ret, memberName) = true; 1063 } 1064 else 1065 static if(is(typeof(uda) : memberType)) { 1066 __traits(getMember, ret, memberName) = uda; 1067 } 1068 } 1069 } 1070 1071 return ret; 1072 }(); 1073 } 1074 1075 /// ditto 1076 Struct populateFromArgs(Struct, Args...)(Args args) { 1077 Struct ret; 1078 foreach(memberName; __traits(allMembers, Struct)) { 1079 alias memberType = typeof(__traits(getMember, Struct, memberName)); 1080 foreach(arg; args) { 1081 static if(is(typeof(arg == memberType))) { 1082 __traits(getMember, ret, memberName) = arg; 1083 } 1084 } 1085 } 1086 1087 return ret; 1088 } 1089 1090 /// ditto 1091 struct PresenceOf(alias a) { 1092 bool there; 1093 alias there this; 1094 } 1095 1096 /// 1097 unittest { 1098 enum a; 1099 enum b; 1100 struct Name { string name; } 1101 struct Info { 1102 Name n; 1103 PresenceOf!a athere; 1104 PresenceOf!b bthere; 1105 int c; 1106 } 1107 1108 void test() @a @Name("test") {} 1109 1110 auto info = populateFromUdas!(Info, __traits(getAttributes, test)); 1111 assert(info.n == Name("test")); // but present ones are in there 1112 assert(info.athere == true); // non-values can be tested with PresenceOf!it, which works like a bool 1113 assert(info.bthere == false); 1114 assert(info.c == 0); // absent thing will keep the default value 1115 } 1116 1117 /++ 1118 Declares a delegate property with several setters to allow for handlers that don't care about the arguments. 1119 1120 Throughout the arsd library, you will often see types of these to indicate that you can set listeners with or without arguments. If you care about the details of the callback event, you can set a delegate that declares them. 
And if you don't, you can set one that doesn't even declare them and it will be ignored.
+/
struct FlexibleDelegate(DelegateType) {
	// please note that Parameters and ReturnType are public now!
	// The three nested static ifs pull the function type, the parameter tuple and the
	// return type out of DelegateType; each has a matching `else static assert` at the
	// bottom of the struct that names the check that failed.
	static if(is(DelegateType FunctionType == delegate))
	static if(is(FunctionType Parameters == __parameters))
	static if(is(DelegateType ReturnType == return)) {

		/++
			Calls the currently set delegate.

			Diagnostics:
				If the callback delegate has not been set, this may cause a null pointer dereference.
		+/
		ReturnType opCall(Parameters args) {
			return dg(args);
		}

		/++
			Use `if(thing)` to check if the delegate is null or not.
		+/
		bool opCast(T : bool)() {
			return dg !is null;
		}

		/++
			These opAssign overloads are what puts the flexibility in the flexible delegate.

			Bugs:
				The other overloads do not keep attributes like `nothrow` on the `dg` parameter, making them unusable if `DelegateType` requires them. I consider the attributes more trouble than they're worth anyway, and the language's poor support for composing them doesn't help any. I have no need for them and thus no plans to add them in the overloads at this time.
		+/
		void opAssign(DelegateType dg) {
			this.dg = dg;
		}

		/// ditto
		void opAssign(ReturnType delegate() dg) {
			// wrap the argument-less delegate in one that accepts and discards the arguments
			this.dg = (Parameters ignored) => dg();
		}

		/// ditto
		void opAssign(ReturnType function(Parameters params) dg) {
			// wrap the plain function in a delegate that forwards the arguments to it
			this.dg = (Parameters params) => dg(params);
		}

		/// ditto
		void opAssign(ReturnType function() dg) {
			// plain function that takes nothing: the arguments are accepted and discarded
			this.dg = (Parameters ignored) => dg();
		}

		/// ditto
		void opAssign(typeof(null) explicitNull) {
			this.dg = null;
		}

		// the stored callback; null until one of the opAssign overloads sets it
		private DelegateType dg;
	}
	else static assert(0, DelegateType.stringof ~ " failed return value check");
	else static assert(0, DelegateType.stringof ~ " failed parameters check");
	else static assert(0, DelegateType.stringof ~ " failed delegate check");
}

/++
	This example shows how to declare a [FlexibleDelegate], assign handlers of the various accepted shapes to it, test it for null, and call it.
+/
unittest {
	// you don't have to put the arguments in a struct, but i recommend
	// you do as it is more future proof - you can add more info to the
	// struct without breaking user code that consumes it.
	struct MyEventArguments {

	}

	// then you declare it just adding FlexibleDelegate!() around the
	// plain delegate type you'd normally use
	FlexibleDelegate!(void delegate(MyEventArguments args)) callback;

	// until you set it, it will be null and thus be false in any boolean check
	assert(!callback);

	// can set it to the properly typed thing
	callback = delegate(MyEventArguments args) {};

	// and now it is no longer null
	assert(callback);

	// or if you don't care about the args, you can leave them off
	callback = () {};

	// and it works if the compiler types you as a function instead of delegate too
	// (which happens automatically if you don't access any local state or if you
	// explicitly define it as a function)

	callback = function(MyEventArguments args) { };

	// can set it back to null explicitly if you ever wanted
	callback = null;

	// the reflection info used internally also happens to be exposed publicly
	// which can actually sometimes be nice so if the language changes, i'll change
	// the code to keep this working.
	static assert(is(callback.ReturnType == void));

	// which can be convenient if the params is an annoying type since you can
	// consistently use something like this too
	callback = (callback.Parameters params) {};

	// check for null and call it pretty normally
	if(callback)
		callback(MyEventArguments());
}

/+
	======================
	ERROR HANDLING HELPERS
	======================
+/

/+ +
	arsd code shouldn't be using Exception. Really, I don't think any code should be - instead, construct an appropriate object with structured information.

	If you want to catch someone else's Exception, use `catch(object.Exception e)`.
+/
//package deprecated struct Exception {}


/++
	Base class representing my exceptions. You should almost never work with this directly, but you might catch it as a generic thing. Catch it before generic `object.Exception` or `object.Throwable` in any catch chains.


	$(H3 General guidelines for exceptions)

	The purpose of an exception is to cancel a task that has proven to be impossible and give the programmer enough information to use at a higher level to decide what to do about it.

	Cancelling a task is accomplished with the `throw` keyword. The transmission of information to a higher level is done by the language runtime. The decision point is marked by the `catch` keyword. The part missing - the job of the `Exception` class you construct and throw - is to gather the information that will be useful at a later decision point.

	It is thus important that you gather as much useful information as possible and keep it in a way that the code catching the exception can still interpret it when constructing an exception. Other concerns are secondary to this primary goal.

	With this in mind, here's some guidelines for exception handling in arsd code.

	$(H4 Allocations and lifetimes)

	Don't get clever with exception allocations. You don't know what the catcher is going to do with an exception and you don't want the error handling scheme to introduce its own tricky bugs. Remember, an exception object's first job is to deliver useful information up the call chain in a way this code can use it. You don't know what this code is or what it is going to do.

	Keep your memory management schemes simple and let the garbage collector do its job.

	$(LIST
		* All thrown exceptions should be allocated with the `new` keyword.

		* Members inside the exception should be value types or have infinite lifetime (that is, be GC managed).

		* While this document is concerned with throwing, you might want to add additional information to an in-flight exception, and this is done by catching, so you need to know how that works too, and there is a global compiler switch that can change things, so even inside arsd we can't completely avoid its implications.

		DIP1008's presence complicates things a bit on the catch side - if you catch an exception and return it from a function, remember to `ex.refcount = ex.refcount + 1;` so you don't introduce more use-after-free woes for those unfortunate souls.
	)

	$(H4 Error strings)

	Strings can deliver useful information to people reading the message, but are often suboptimal for delivering useful information to other chunks of code. Remember, an exception's first job is to be caught by another block of code. Printing to users is a last resort; even if you want a user-readable error message, an exception is not the ideal way to deliver one since it is constructed in the guts of a failed task, without the higher level context of what the user was actually trying to do. User error messages ought to be made from information in the exception, combined with higher level knowledge. This is best done in a `catch` block, not a `throw` statement.

	As such, I recommend that you:

	$(LIST
		* Don't concatenate error strings at the throw site. Instead, pass the data you would have used to build the string as actual data to the constructor. This lets catchers see the original data without having to try to extract it from a string. For unique data, you will likely need a unique exception type. More on this in the next section.

		* Don't construct error strings in a constructor either, for the same reason. Pass the useful data up the call chain, as exception members, to the maximum extent possible. Exception: if you are passed some data with a temporary lifetime that is important enough to pass up the chain. You may `.idup` or `to!string` to preserve as much data as you can before it is lost, but still store it in a separate member of the Exception subclass object.

		* $(I Do) construct strings out of public members in [getAdditionalPrintableInformation]. When this is called, the user has requested as much relevant information as reasonable in string format. Still, avoid concatenation - it lets you pass as many key/value pairs as you like to the caller. They can concatenate as needed. However, note the words "public members" - everything you do in `getAdditionalPrintableInformation` ought to also be possible for code that caught your exception via your public methods and properties.
	)

	$(H4 Subclasses)

	Any exception with unique data types should be a unique class. Whenever practical, this should be one you write and document at the top-level of a module. But I know we get lazy - me too - and this is why in standard D we'd often fall back to `throw new Exception("some string " ~ some info)`. To help resist these urges, I offer some helper functions to use instead that better achieve the key goal of exceptions - passing structured data up a call chain - while still being convenient to write.

	See: [ArsdException], [Win32Enforce]

+/
class ArsdExceptionBase : object.Exception {
	/++
		Don't call this except from other exceptions; this is essentially an abstract class.

		Params:
			operation = the specific operation that failed, throwing the exception
			file = source file recorded with the exception; defaults to the location of the call
			line = source line recorded with the exception; defaults to the location of the call
			next = the next exception in the chain, if any
	+/
	package this(string operation, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		super(operation, file, line, next);
	}

	/++
		The toString method will print out several components:

		$(LIST
			* The file, line, static message, and object class name from the constructor. You can access these independently with the members `file`, `line`, [message], and [printableExceptionName].
			* The generic category codes stored with this exception
			* Additional members stored with the exception child classes (e.g. platform error codes, associated function arguments)
			* The stack trace associated with the exception. You can access these lines independently with `foreach` over the `info` member.
		)

		This is meant to be read by the developer, not end users. You should wrap your user-relevant tasks in a try/catch block and construct more appropriate error messages from context available there, using the individual properties of the exception to add richness.
	+/
	final override void toString(scope void delegate(in char[]) sink) const {
		// class name and info from constructor
		sink(printableExceptionName);
		sink("@");
		sink(file);
		sink("(");
		// scratch space for the textual form of the line number
		char[16] buffer;
		sink(intToString(line, buffer[]));
		sink("): ");
		sink(message);

		// one "name: value" line per item reported by the (possibly overridden) hook
		getAdditionalPrintableInformation((string name, in char[] value) {
			sink("\n");
			sink(name);
			sink(": ");
			sink(value);
		});

		// full stack trace
		sink("\n----------------\n");
		foreach(str; info) {
			sink(str);
			sink("\n");
		}
	}
	/// ditto
	final override string toString() {
		// collect everything the sink overload emits into one string
		string s;
		toString((in char[] chunk) { s ~= chunk; });
		return s;
	}

	/++
		Users might like to see additional information with the exception. API consumers should pull this out of properties on your child class, but the parent class might not be able to deal with the arbitrary types at runtime the children can introduce, so bringing them all down to strings simplifies that.

		Overrides should always call `super.getAdditionalPrintableInformation(sink);` before adding additional information by calling the sink with other arguments afterward.
You should spare no expense in preparing this information - translate error codes, build rich strings, whatever it takes - to make the information here useful to the reader.
	+/
	void getAdditionalPrintableInformation(scope void delegate(string name, in char[] value) sink) const {

	}

	/++
		This is the name of the exception class, suitable for printing. This should be static data (e.g. a string literal). Override it in subclasses.
	+/
	string printableExceptionName() const {
		return typeid(this).name;
	}

	/// deliberately hiding `Throwable.msg`. Use [message] and [toString] instead.
	@disable final void msg() {}

	/// Returns the static message given to the constructor (the inherited `msg`).
	override const(char)[] message() const {
		return super.msg;
	}
}

/++
	Thrown to report one or more arguments that were not acceptable. Each offending argument is recorded in [invalidArguments] with its name, a description of the problem, and optionally the value that was given.
+/
class InvalidArgumentsException : ArsdExceptionBase {
	/// A record of a single rejected argument.
	static struct InvalidArgument {
		string name; /// the name of the argument
		string description; /// a description of what is wrong with it
		LimitedVariant givenValue; /// the value that was passed, if it was recorded
	}

	/// All the arguments reported by this exception.
	InvalidArgument[] invalidArguments;

	/++
		Params:
			invalidArguments = the rejected arguments
			functionName = stored as the exception's operation/message; defaults to the calling function
	+/
	this(InvalidArgument[] invalidArguments, string functionName = __PRETTY_FUNCTION__, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		this.invalidArguments = invalidArguments;
		super(functionName, file, line, next);
	}

	/++
		Convenience constructors for reporting a single argument.

		`givenArgumentValue` intentionally has no default value in this overload: when it did, a call
		passing only the name and description matched this overload and the one below equally well and
		could not be resolved. Such calls now go to the overload below, which forwards
		`LimitedVariant.init` here, so the result is what the default was meant to produce.
	+/
	this(string argumentName, string argumentDescription, LimitedVariant givenArgumentValue, string functionName = __PRETTY_FUNCTION__, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		this([
			InvalidArgument(argumentName, argumentDescription, givenArgumentValue)
		], functionName, file, line, next);
	}

	/// ditto
	this(string argumentName, string argumentDescription, string functionName = __PRETTY_FUNCTION__, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		this(argumentName, argumentDescription, LimitedVariant.init, functionName, file, line, next);
	}

	override void getAdditionalPrintableInformation(scope void delegate(string name, in char[] value) sink) const {
		// the base class asks every override to report the parent's information first
		super.getAdditionalPrintableInformation(sink);

		// FIXME: print the details better
		foreach(arg; invalidArguments)
			sink("invalidArguments[]", arg.name ~ " " ~ arg.description);
	}
}

/++
	Base class for when you've requested a feature that is not available. It may not be available because it is possible, but not yet implemented, or it might be because it is impossible on your operating system.
+/
class FeatureUnavailableException : ArsdExceptionBase {
	this(string featureName = __PRETTY_FUNCTION__, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		super(featureName, file, line, next);
	}
}

/++
	This means the feature could be done, but I haven't gotten around to implementing it yet. If you email me, I might be able to add it somewhat quickly and get back to you.
+/
class NotYetImplementedException : FeatureUnavailableException {
	this(string featureName = __PRETTY_FUNCTION__, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		super(featureName, file, line, next);
	}

}

/++
	This means the feature is not supported by your current operating system. You might be able to get it in an update, but you might just have to find an alternate way of doing things.
+/
class NotSupportedException : FeatureUnavailableException {
	this(string featureName, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		super(featureName, file, line, next);
	}
}

/++
	This is a generic exception with attached arguments. It is used when I had to throw something but didn't want to write a new class.

	You can catch an ArsdException to get its passed arguments out.

	You can pass either a base class or a string as `Type`.

	See the examples for how to use it.
+/
template ArsdException(alias Type, DataTuple...) {
	// Each additional data type adds one more class on top of the chain for the shorter
	// type list, so an exception carrying more data can still be caught as (or converted
	// to) the type that carries fewer. The chain bottoms out at ArsdExceptionBase.
	static if(DataTuple.length)
		alias Parent = ArsdException!(Type, DataTuple[0 .. $-1]);
	else
		alias Parent = ArsdExceptionBase;

	class ArsdException : Parent {
		/// The values passed at the throw site, in order.
		DataTuple data;

		this(DataTuple data, string file = __FILE__, size_t line = __LINE__) {
			this.data = data;
			static if(is(Parent == ArsdExceptionBase))
				super(null, file, line);
			else
				// the parent keeps its own copy of everything but the last value
				super(data[0 .. $-1], file, line);
		}

		/// Convenience constructor: appends the types of `r` to this type's list and news up that exception.
		static opCall(R...)(R r, string file = __FILE__, size_t line = __LINE__) {
			return new ArsdException!(Type, DataTuple, R)(r, file, line);
		}

		override string printableExceptionName() const {
			static if(DataTuple.length)
				// the slice drops the first and last character of the tuple's stringof (presumably its surrounding parentheses)
				enum str = "ArsdException!(" ~ Type.stringof ~ ", " ~ DataTuple.stringof[1 .. $-1] ~ ")";
			else
				enum str = "ArsdException!" ~ Type.stringof;
			return str;
		}

		override void getAdditionalPrintableInformation(scope void delegate(string name, in char[] value) sink) const {
			// goes straight to the root implementation rather than `super`: this class already
			// holds the complete data tuple, so the intermediate parents are not asked to print theirs
			ArsdExceptionBase.getAdditionalPrintableInformation(sink);

			foreach(idx, datum; data) {
				enum int lol = cast(int) idx;
				// key is built from the index and the type of the value at that index
				enum key = "[" ~ lol.stringof ~ "] " ~ DataTuple[idx].stringof;
				sink(key, toStringInternal(datum));
			}
		}
	}
}

/// This example shows how you can throw and catch the ad-hoc exception types.
unittest {
	// you can throw and catch by matching the string and argument types
	try {
		// throw it with parenthesis after the template args (it uses opCall to construct)
		throw ArsdException!"Test"();
		// you could also `throw new ArsdException!"test";`, but that gets harder with args
		// as we'll see in the following example
		assert(0); // remove from docs
	} catch(ArsdException!"Test" e) { // catch it without them
		// this has no useful information except for the type
		// but you can catch it like this and it is still more than generic Exception
	}

	// an exception's job is to deliver useful information up the chain
	// and you can do that easily by passing arguments:

	try {
		throw ArsdException!"Test"(4, "four");
		// you could also `throw new ArsdException!("Test", int, string)(4, "four")`
		// but now you start to see how the opCall convenience constructor simplifies things
		assert(0); // remove from docs
	} catch(ArsdException!("Test", int, string) e) { // catch it and use info by specifying types
		assert(e.data[0] == 4); // and extract arguments like this
		assert(e.data[1] == "four");
	}

	// a throw site can add additional information without breaking code that catches just some
	// generally speaking, each additional argument creates a new subclass on top of the previous args
	// so you can cast

	try {
		throw ArsdException!"Test"(4, "four", 9);
		assert(0); // remove from docs
	} catch(ArsdException!("Test", int, string) e) { // this catch still works
		assert(e.data[0] == 4);
		assert(e.data[1] == "four");
		// but if you were to print it, all the members would be there
		// import std.stdio; writeln(e); // would show something like:
		/+
			ArsdException!("Test", int, string, int)@file.d(line):
			[0] int: 4
			[1] string: four
			[2] int: 9
		+/
		// indicating that there's additional
// information available if you wanted to process it

		// and meanwhile:
		ArsdException!("Test", int) e2 = e; // this implicit cast works thanks to the parent-child relationship
		ArsdException!"Test" e3 = e; // this works too, the base type/string still matches

		// so catching those types would work too
	}
}

/++
	A tagged union that holds an error code from system apis, meaning one from Windows GetLastError() or C's errno.

	You construct it with `SystemErrorCode(thing)` and the overloaded constructor tags and stores it.
+/
struct SystemErrorCode {
	///
	enum Type {
		errno, ///
		win32 ///
	}

	const Type type; ///
	const int code; /// You should technically cast it back to DWORD if it is a win32 code

	/++
		C/unix errors are typed as signed ints...
		Windows' errors are typed DWORD, aka unsigned...

		so just passing them straight up will pick the right overload here to set the tag.
	+/
	this(int errno) {
		this.type = Type.errno;
		this.code = errno;
	}

	/// ditto
	this(uint win32) {
		this.type = Type.win32;
		this.code = win32;
	}

	/++
		Returns if the code indicated success.

		Please note that many calls do not actually set a code to success, but rather just don't touch it. Thus this may only be true on `init`.
	+/
	bool wasSuccessful() const {
		final switch(type) {
			case Type.errno:
				return this.code == 0;
			case Type.win32:
				return this.code == 0;
		}
	}

	/++
		Constructs a string containing both the code and the explanation string.
	+/
	string toString() const {
		return codeAsString ~ " " ~ errorString;
	}

	/++
		The numeric code itself as a string: plain decimal for errno codes, `0x` followed by zero-padded hexadecimal for win32 codes.

		See [errorString] for a text explanation of the code.
	+/
	string codeAsString() const {
		char[16] buffer;
		final switch(type) {
			case Type.errno:
				return intToString(code, buffer[]).idup;
			case Type.win32:
				buffer[0 .. 2] = "0x";
				// The code is stored as a signed int but a win32 code is really an unsigned DWORD.
				// Convert it back before formatting so that a code with the high bit set is not
				// formatted from a negative number.
				// NOTE(review): relies on intToString accepting a type wider than int - it is
				// handed a size_t line number elsewhere in this file; confirm against its declaration.
				return buffer[0 .. 2 + intToString(cast(uint) code, buffer[2 .. $], IntToStringArgs().withRadix(16).withPadding(8)).length].idup;
		}
	}

	/++
		A text explanation of the code. See [codeAsString] for a string representation of the numeric representation.

		Returns:
			A copy of the system's message for the code. It is `null` for a win32 code on a non-Windows build, or if the C library gives back no message for an errno code.
	+/
	string errorString() const {
		final switch(type) {
			case Type.errno:
				import core.stdc.string;
				auto strptr = strerror(code);
				// defensive: don't walk a null pointer if the C library hands none back
				if(strptr is null)
					return null;

				// copy it out right away; the returned buffer belongs to the C library
				return strptr[0 .. strlen(strptr)].idup;
			case Type.win32:
				version(Windows) {
					wchar[256] buffer;
					auto size = FormatMessageW(
						FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
						null,
						code,
						MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
						buffer.ptr,
						buffer.length,
						null
					);

					return makeUtf8StringFromWindowsString(buffer[0 .. size]).stripInternal;
				} else {
					return null;
				}
		}
	}
}

/++
	A name/value pair recording one argument that was passed to a failed system call. See [SystemApiException.args].
+/
struct SavedArgument {
	string name;
	LimitedVariant value;
}

/++
	Exception representing a failed system api call. It carries the [SystemErrorCode] and up to eight [SavedArgument]s from the call.
+/
class SystemApiException : ArsdExceptionBase {
	/// Constructs from a C `errno` value.
	this(string msg, int originalErrorNo, scope SavedArgument[] args = null, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		this(msg, SystemErrorCode(originalErrorNo), args, file, line, next);
	}

	/// Constructs from a Windows `GetLastError()` value.
	version(Windows)
	this(string msg, DWORD windowsError, scope SavedArgument[] args = null, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		this(msg, SystemErrorCode(windowsError), args, file, line, next);
	}

	/// Constructs from an already tagged code. Only as many of `args` as fit in [args] are kept.
	this(string msg, SystemErrorCode code, SavedArgument[] args = null, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		this.errorCode = code;

		// discard stuff that won't fit
		if(args.length > this.args.length)
			args = args[0 .. this.args.length];

		this.args[0 .. args.length] = args[];

		super(msg, file, line, next);
	}

	/++
		The error code reported by the system for the failed call.
	+/
	const SystemErrorCode errorCode;

	/++
		The saved arguments of the failed call. Unused slots are left at their initial value, with a `null` name.
	+/
	const SavedArgument[8] args;

	override void getAdditionalPrintableInformation(scope void delegate(string name, in char[] value) sink) const {
		super.getAdditionalPrintableInformation(sink);
		sink("Error code", errorCode.toString());

		// slots with a null name were never filled in
		foreach(arg; args)
			if(arg.name !is null)
				sink(arg.name, arg.value.toString());
	}

}

/++
	The low level use of this would look like `throw new WindowsApiException("MsgWaitForMultipleObjectsEx", GetLastError())` but it is meant to be used from higher level things like [Win32Enforce].

	History:
		Moved from simpledisplay.d to core.d in March 2023 (dub v11.0).
+/
alias WindowsApiException = SystemApiException;

/++
	History:
		Moved from simpledisplay.d to core.d in March 2023 (dub v11.0).
+/
alias ErrnoApiException = SystemApiException;

/++
	Calls the C API function `fn`. If it returns an error value, it throws an [ErrnoApiException] (or subclass) after getting `errno`.

	If you do not pass `errorValue`, `null` is used when the return type accepts it and `-1` when the return type converts to `long`. Any other return type requires you to pass the error value explicitly.
+/
template ErrnoEnforce(alias fn, alias errorValue = void) {
	static if(is(typeof(fn) Return == return))
	static if(is(typeof(fn) Params == __parameters)) {
		static if(is(errorValue == void)) {
			static if(is(typeof(null) : Return))
				enum errorValueToUse = null;
			else static if(is(Return : long))
				enum errorValueToUse = -1;
			else
				static assert(0, "Please pass the error value");
		} else {
			enum errorValueToUse = errorValue;
		}

		Return ErrnoEnforce(Params params, ArgSentinel sentinel = ArgSentinel.init, string file = __FILE__, size_t line = __LINE__) {
			import core.stdc.errno;

			Return value = fn(params);

			if(value == errorValueToUse) {
				// Grab errno before doing anything else, same as Win32Enforce does with
				// GetLastError(), so no later work in this branch (such as building the
				// saved argument list) can overwrite it.
				auto error = errno;
				SavedArgument[] args; // FIXME
				/+
				static foreach(idx; 0 .. Params.length)
					args ~= SavedArgument(
						__traits(identifier, Params[idx .. idx + 1]),
						params[idx]
					);
				+/
				throw new ErrnoApiException(__traits(identifier, fn), error, args, file, line);
			}

			return value;
		}
	}
}

version(Windows) {
	/++
		Calls the Windows API function `fn`. If it returns an error value, it throws a [WindowsApiException] (or subclass) after calling `GetLastError()`.

		If you do not pass `errorValue`, `false` is used for `BOOL` returns, `NULL` for returns that convert to `HANDLE`, and `0xffffffff` for `DWORD` returns. Any other return type requires you to pass the error value explicitly.
	+/
	template Win32Enforce(alias fn, alias errorValue = void) {
		static if(is(typeof(fn) Return == return))
		static if(is(typeof(fn) Params == __parameters)) {
			static if(is(errorValue == void)) {
				static if(is(Return == BOOL))
					enum errorValueToUse = false;
				else static if(is(Return : HANDLE))
					enum errorValueToUse = NULL;
				else static if(is(Return == DWORD))
					enum errorValueToUse = cast(DWORD) 0xffffffff;
				else
					static assert(0, "Please pass the error value");
			} else {
				enum errorValueToUse = errorValue;
			}

			Return Win32Enforce(Params params, ArgSentinel sentinel = ArgSentinel.init, string file = __FILE__, size_t line = __LINE__) {
				Return value = fn(params);

				if(value == errorValueToUse) {
					// read the error code first, before anything else can replace it
					auto error = GetLastError();
					SavedArgument[] args; // FIXME
					throw new WindowsApiException(__traits(identifier, fn), error, args, file, line);
				}

				return value;
			}
		}
	}

}

/+
	===============
	EVENT LOOP CORE
	===============
+/

/+
	UI threads
		need to get window messages in addition to all the other jobs
	I/O Worker threads
		need to get commands for read/writes, run them, and send the reply back. not necessary on Windows
		if interrupted, check cancel flags.
	CPU Worker threads
		gets functions, runs them, send reply back. should send a cancel flag to periodically check
	Task worker threads
		runs fibers and multiplexes them


	General procedure:
		issue the read/write command
		if it would block on linux, epoll associate it. otherwise do the callback immediately

		callbacks have default affinity to the current thread, meaning their callbacks always run here
		accepts can usually be dispatched to any available thread tho

	//  In other words, a single thread can be associated with, at most, one I/O completion port.
Realistically, IOCP is only used if there is no thread affinity. If there is, just do overlapped w/ sleepex.


	case study: http server

	1) main thread starts the server. it does an accept loop with no thread affinity. the main thread does NOT check the global queue (the iocp/global epoll)
	2) connections come in and are assigned to first available thread via the iocp/global epoll
	3) these run local event loops until the connection task is finished

	EVENT LOOP TYPES:
		1) main ui thread - MsgWaitForMultipleObjectsEx / epoll on the local ui. it does NOT check any of the worker thread things!
			The main ui thread should never terminate until the program is ready to close.
			You can have additional ui threads in theory but im not really gonna support that in full; most things will assume there is just the one. simpledisplay's gui thread is the primary if it exists. (and sdpy will prolly continue to be threaded the way it is now)

			The biggest complication is the TerminalDirectToEmulator, where the primary ui thread is NOT the thread that runs `main`
		2) worker thread GetQueuedCompletionStatusEx / epoll on the local thread fd and the global epoll fd
		3) local event loop - check local things only. SleepEx / epoll on local thread fd. This is more of a compatibility hack for `waitForCompletion` outside a fiber.

	i'll use:
		* QueueUserAPC to send interruptions to a worker thread
		* PostQueuedCompletionStatus to send interruptions to any available thread.
		* PostMessage to a window
		* ??? to a fiber task

	I also need a way to de-duplicate events in the queue so if you try to push the same thing it won't trigger multiple times.... I might want to keep a duplicate of the thing... really, what I'd do is post the "event wake up" message and keep the queue in my own thing.
(WM_PAINT auto-coalesces)

	Destructors need to be able to post messages back to a specific task to queue thread-affinity cleanup. This must be GC safe.

	A task might want to wait on certain events. If the task is a fiber, it yields and gets called upon the event. If the task is a thread, it really has to call the event loop... which can be a loop of loops we want to avoid. `waitForCompletion` is more often gonna be used just to run the loop at top level tho... it might not even check for the global info availability so it'd run the local thing only.

	APCs should not themselves enter an alertable wait cuz it can stack overflow. So generally speaking, they should avoid calling fibers or other event loops.
+/

/++
	You can also pass a handle to a specific thread, if you have one.
+/
enum ThreadToRunIn {
	/++
		The callback should be only run by the same thread that set it.
	+/
	CurrentThread,
	/++
		The UI thread is a special one - it is the supervisor of the workers and the controller of gui and console handles. It is the first thread to call [arsd_core_init] actively running an event loop unless there is a thread that has actively asserted the ui supervisor role. FIXME is this true after i implement it?

		A ui thread should be always quickly responsive to new events.

		There should only be one main ui thread, in which simpledisplay and minigui can be used.

		Other threads can run like ui threads, but are considered temporary and only concerned with their own needs (it is the default style of loop
		for an undeclared thread but will not receive messages from other threads unless there is no other option)


		Ad-Hoc thread - something running an event loop that isn't another thing
		Controller thread - running an explicit event loop instance set as not a task runner or blocking worker
		UI thread - simpledisplay's event loop, which it will require remain live for the duration of the program (running two .eventLoops without a parent EventLoop instance will become illegal, throwing at runtime if it happens telling people to change their code)

		Windows HANDLES will always be listened on the thread itself that is requesting, UNLESS it is a worker/helper thread, in which case it goes to a coordinator thread. since it prolly can't rely on the parent per se this will have to be one created by arsd core init, UNLESS the parent is inside an explicit EventLoop structure.

		All use the MsgWaitForMultipleObjectsEx pattern


	+/
	UiThread,
	/++
		The callback can be called from any available worker thread. It will be added to a global queue and the first thread to see it will run it.

		These will not run on the UI thread unless there is no other option on the platform (and all platforms this lib supports have other options).

		These are expected to run cooperatively multitasked things; functions that frequently yield as they wait on other tasks. Think a fiber.

		A task runner should be generally responsive to new events.
	+/
	AnyAvailableTaskRunnerThread,
	/++
		These are expected to run longer blocking, but independent operations. Think an individual function with no context.

		A blocking worker can wait hundreds of milliseconds between checking for new events.
	+/
	AnyAvailableBlockingWorkerThread,
	/++
		The callback will be duplicated across all threads known to the arsd.core event loop.

		It adds it to an immutable queue that each thread will go through... might just replace with an exit() function.


		so to cancel all associated tasks for like a web server, it could just have the tasks atomicAdd to a counter and subtract when they are finished. Then you have a single semaphore you signal the number of times you have an active thing and wait for them to acknowledge it.

		threads should report when they start running the loop and they really should report when they terminate but that isn't reliable


		hmmm what if: all user-created threads (the public api) count as ui threads. only ones created in here are task runners or helpers. ui threads can wait on a global event to exit.

		there'd still prolly be one "the" ui thread, which does the handle listening on windows and is the one sdpy wants.
	+/
	BroadcastToAllThreads,
}

/++
	Initializes the arsd core event loop and creates its worker threads. You don't actually have to call this, since the first use of an arsd.core function that requires it will call it implicitly, but calling it yourself gives you a chance to control the configuration more explicitly if you want to.
+/
void arsd_core_init(int numberOfWorkers = 0) {
	// NOTE: the body is currently empty; `numberOfWorkers` is accepted but not used here.
}

version(Windows)
class WindowsHandleReader_ex {
	// Windows handles are always dispatched to the main ui thread, which can then send a command back to a worker thread to run the callback if needed
	this(HANDLE handle) {}
}

version(Posix)
class PosixFdReader_ex {
	// posix readers can just register with whatever instance we want to handle the callback
}

/++
	The event loop interface handed out by [getThisThreadEventLoop].

	[run] and [runOnce] drive the loop. On Posix it additionally lets you register
	callbacks for file descriptor readiness; each registration returns a token you
	must later use to unregister (or rearm) it.
+/
interface ICoreEventLoop {
	/++
		Keeps calling [runOnce] for this thread until the `until` delegate returns `true`.

		`until` is checked before each iteration, including the first one.
	+/
	final void run(scope bool delegate() until) {
		for(;;) {
			if(until())
				break;
			runOnce();
		}
	}

	/++
		Runs a single iteration of the event loop for this thread. It will return when the first thing happens, but that thing might be totally uninteresting to anyone, or it might trigger significant work you'll wait on.
	+/
	void runOnce();

	// to send messages between threads, i'll queue up a function that just call dispatchMessage. can embed the arg inside the callback helper prolly.
	// tho i might prefer to actually do messages w/ run payloads so it is easier to deduplicate i can still dedupe by inspecting the call args so idk

	version(Posix) {
		/++
			Handle to a persistent fd registration, returned by [addCallbackOnFdReadable].
		+/
		@mustuse
		static struct UnregisterToken {
			// field order matters: tokens may be built positionally elsewhere in the module
			private CoreEventLoopImplementation impl;
			private int fd;
			private CallbackHelper cb;

			/++
				Unregisters the file descriptor from the event loop and releases the reference to the callback held by the event loop (which will probably free it).

				You must call this when you're done. Normally, this will be right before you close the fd (Which is often after the other side closes it, meaning you got a 0 length read).

				The token resets itself to its `.init` state afterward and cannot be used again.
			+/
			void unregister() {
				assert(impl !is null, "Cannot reuse unregister token");

				version(Arsd_core_epoll) {
					impl.unregisterFd(fd);
				} else version(Arsd_core_kqueue) {
					// intentionally blank - all registrations are one-shot there
					// FIXME: actually it might not have gone off yet, in that case we do need to delete the filter
				} else static assert(0);

				cb.release();
				this = typeof(this).init;
			}
		}

		/++
			Handle to a one-shot fd registration, returned by the `OneShot` registration functions.
		+/
		@mustuse
		static struct RearmToken {
			// field order matters: tokens may be built positionally elsewhere in the module
			private bool readable;
			private CoreEventLoopImplementation impl;
			private int fd;
			private CallbackHelper cb;
			private uint flags;

			/++
				Calls [UnregisterToken.unregister]
			+/
			void unregister() {
				assert(impl !is null, "cannot reuse rearm token after unregistering it");

				// same teardown as the persistent token: drop the fd, release the callback
				auto teardown = UnregisterToken(impl, fd, cb);
				teardown.unregister();

				this = typeof(this).init;
			}

			/++
				Rearms the event so you will get another callback next time it is ready.
			+/
			void rearm() {
				assert(impl !is null, "cannot reuse rearm token after unregistering it");
				impl.rearmFd(this);
			}
		}

		UnregisterToken addCallbackOnFdReadable(int fd, CallbackHelper cb);
		RearmToken addCallbackOnFdReadableOneShot(int fd, CallbackHelper cb);
		RearmToken addCallbackOnFdWritableOneShot(int fd, CallbackHelper cb);
	}
}

/++
	Get the event loop associated with this thread

	The loop object is created on the first call and the same instance is returned afterward.
	NOTE: the `type` argument is not consulted by this implementation.
+/
ICoreEventLoop getThisThreadEventLoop(EventLoopType type = EventLoopType.AdHoc) {
	// a `static` local without `shared`/`__gshared` is thread-local in D, hence one loop per thread
	static ICoreEventLoop cached;
	if(cached !is null)
		return cached;

	cached = new CoreEventLoopImplementation();
	return cached;
}

/++
	The internal types that will be exposed through other api things.
+/
package(arsd) enum EventLoopType {
	/++
		The event loop is being run temporarily and the thread doesn't promise to keep running it.
	+/
	AdHoc,
	/++
		The event loop struct has been instantiated at top level. Its destructor will run when the
		function exits, which is only at the end of the entire block of work it is responsible for.

		It must be in scope for the whole time the arsd event loop functions are expected to be used
		(meaning it should generally be top-level in `main`)
	+/
	Explicit,
	/++
		A specialization of `Explicit`, so all the same rules apply there, but this is specifically the event loop coming from simpledisplay or minigui. It will run for the duration of the UI's existence.
	+/
	Ui,
	/++
		A special event loop specifically for threads that listen to the task runner queue and handle I/O events from running tasks. Typically, a task runner runs cooperatively multitasked coroutines (so they prefer not to block the whole thread).
	+/
	TaskRunner,
	/++
		A special event loop specifically for threads that listen to the helper function request queue. Helper functions are expected to run independently for a somewhat long time (them blocking the thread for some time is normal) and send a reply message back to the requester.
	+/
	HelperWorker
}

/+
	Tasks are given an object to talk to their parent... can be a dialog where it is like

	sendBuffer
	waitForWordToProceed

	in a loop


	Tasks are assigned to a worker thread and may share it with other tasks.
+/


// the GC may not be able to see this! remember, it can be hidden inside kernel buffers
//
// A reference-counted wrapper around a callback delegate. When constructed with
// `addRoot` it registers itself as a GC root and unregisters once the count drops
// to zero in `release`.
private class CallbackHelper {
	import core.memory;

	// Invokes the stored callback, if there is one.
	void call() {
		if(callback)
			callback();
	}

	void delegate() callback;
	void*[3] argsStore;

	// Atomically takes one more reference.
	void addref() {
		atomicOp!"+="(refcount, 1);
	}

	// Atomically drops one reference; the last one out removes the GC root (if one was added).
	void release() {
		const remaining = atomicOp!"-="(refcount, 1);
		if(remaining > 0)
			return;

		if(flags & 1)
			GC.removeRoot(cast(void*) this);
	}

	private shared(int) refcount;
	private uint flags; // bit 1: this object was registered with GC.addRoot

	// Wraps a plain function pointer by forwarding to the delegate constructor.
	this(void function() callback) {
		this(delegate void() { callback(); });
	}

	// Stores the delegate and starts with a reference count of one.
	this(void delegate() callback, bool addRoot = true) {
		if(addRoot) {
			GC.addRoot(cast(void*) this);
			this.flags |= 1;
		}

		this.addref();
		this.callback = callback;
	}
}

/++
	This represents a file. Technically, file paths aren't actually strings (for example, on Linux, they need not be valid utf-8, while a D string is supposed to be), even though we almost always use them like that.

	This type is meant to represent a filename / path. I might not keep it around.
+/
struct FilePath {
	string path;

	/// `true` when no path has been set (`path is null`).
	bool isNull() {
		return path is null;
	}

	/// Converts to `true` when a path is set.
	bool opCast(T:bool)() {
		return path !is null;
	}

	/// Returns the stored path unchanged.
	string toString() {
		return path;
	}

	//alias toString this;
}

/++
	Represents a generic async, waitable request.
+/
class AsyncOperationRequest {
	/++
		Actually issues the request, starting the operation.
	+/
	abstract void start();
	/++
		Cancels the request. This will cause `isComplete` to return true once the cancellation has been processed, but [AsyncOperationResponse.wasSuccessful] will return `false` (unless it completed before the cancellation was processed, in which case it is still allowed to finish successfully).

		After cancelling a request, you should still wait for it to complete to ensure that the task has actually released its resources before doing anything else on it.

		Once a cancellation request has been sent, it cannot be undone.
	+/
	abstract void cancel();

	/++
		Returns `true` if the operation has been completed. It may be completed successfully, cancelled, or have errored out - to check this, call [waitForCompletion] and check the members on the response object.
	+/
	abstract bool isComplete();
	/++
		Waits until the request has completed - successfully or otherwise - and returns the response object. It will run an ad-hoc event loop that may call other callbacks while waiting.

		The response object may be embedded in the request object - do not reuse the request until you are finished with the response and do not keep the response around longer than you keep the request.


		Note to implementers: all subclasses should override this and return their specific response object. You can use the top-level `waitForFirstToCompleteByIndex` function with a single-element static array to help with the implementation.
	+/
	abstract AsyncOperationResponse waitForCompletion();

	/++

	+/
	// abstract void repeat();
}

/++
	The outcome of an [AsyncOperationRequest], as returned by [AsyncOperationRequest.waitForCompletion].
+/
interface AsyncOperationResponse {
	/++
		Returns true if the request completed successfully, finishing what it was supposed to.

		Should be set to `false` if the request was cancelled before completing or encountered an error.
	+/
	bool wasSuccessful();
}

/++
	It returns the $(I request) so you can identify it more easily. `request.waitForCompletion()` is guaranteed to return the response without any actual wait, since it is already complete when this function returns.

	Please note that "completion" is not necessarily successful completion; a request being cancelled or encountering an error also counts as it being completed.

	The `waitForFirstToCompleteByIndex` version instead returns the index of the array entry that completed first.

	It is your responsibility to remove the completed request from the array before calling the function again, since any request already completed will always be immediately returned.

	You might prefer using [asTheyComplete], which will give each request as it completes and loop over until all of them are complete.

	Returns:
		`null` or `requests.length` if none completed before returning. In particular, an empty `requests` list returns immediately with that value instead of waiting.
+/
AsyncOperationRequest waitForFirstToComplete(AsyncOperationRequest[] requests...) {
	auto idx = waitForFirstToCompleteByIndex(requests);
	if(idx == requests.length)
		return null;
	return requests[idx];
}
/// ditto
size_t waitForFirstToCompleteByIndex(AsyncOperationRequest[] requests...) {
	// with nothing to wait on, the loop below could never terminate
	if(requests.length == 0)
		return requests.length;

	size_t helper() {
		foreach(idx, request; requests)
			if(request.isComplete())
				return idx;
		return requests.length;
	}

	auto idx = helper();
	// if one is already done, return it
	if(idx != requests.length)
		return idx;

	// otherwise, run the ad-hoc event loop until one is
	// FIXME: what if we are inside a fiber?
	auto el = getThisThreadEventLoop();
	el.run(() => (idx = helper()) != requests.length);

	return idx;
}

/++
	Waits for all the `requests` to complete, giving each one through the range interface as it completes.

	This is meant to be used in a foreach loop.

	The `requests` array and its contents must remain valid for the lifetime of the returned range. Its contents may be shuffled as the requests complete (the implementation works through an unstable sort+remove).
+/
AsTheyCompleteRange asTheyComplete(AsyncOperationRequest[] requests...) {
	return AsTheyCompleteRange(requests);
}
/// ditto
struct AsTheyCompleteRange {
	AsyncOperationRequest[] requests;

	this(AsyncOperationRequest[] requests) {
		this.requests = requests;

		if(requests.length == 0)
			return;

		// wait for first one to complete, then move it to the front of the array
		moveFirstCompleteToFront();
	}

	// Blocks until some request is complete, then swaps it into slot 0.
	private void moveFirstCompleteToFront() {
		auto idx = waitForFirstToCompleteByIndex(requests);

		auto tmp = requests[0];
		requests[0] = requests[idx];
		requests[idx] = tmp;
	}

	bool empty() {
		return requests.length == 0;
	}

	void popFront() {
		assert(!empty);
		/+
			this needs to
			1) remove the front of the array as being already processed (unless it is the initial priming call)
			2) wait for one of them to complete
			3) move the complete one to the front of the array
		+/

		requests[0] = requests[$-1];
		requests = requests[0 .. $-1];

		if(requests.length)
			moveFirstCompleteToFront();
	}

	AsyncOperationRequest front() {
		return requests[0];
	}
}

version(Windows) {
	alias NativeFileHandle = HANDLE; ///
	alias NativeSocketHandle = SOCKET; ///
	alias NativePipeHandle = HANDLE; ///
} else version(Posix) {
	alias NativeFileHandle = int; ///
	alias NativeSocketHandle = int; ///
	alias NativePipeHandle = int; ///
}

/++
	An `AbstractFile` represents a file handle on the operating system level. You cannot do much with it.
+/
class AbstractFile {
	private {
		NativeFileHandle handle;
	}

	/++
	+/
	enum OpenMode {
		readOnly, /// C's "r", the file is read
		writeWithTruncation, /// C's "w", the file is blanked upon opening so it only holds what you write
		appendOnly, /// C's "a", writes will always be appended to the file
		readAndWrite /// C's "r+", writes will overwrite existing parts of the file based on where you seek (default is at the beginning)
	}

	/++
	+/
	enum RequirePreexisting {
		no,
		yes
	}

	/+
	enum SpecialFlags {
		randomAccessExpected, /// FILE_FLAG_SEQUENTIAL_SCAN is turned off and posix_fadvise(POSIX_FADV_SEQUENTIAL)
		skipCache, /// O_DSYNC, FILE_FLAG_NO_BUFFERING and maybe WRITE_THROUGH. note that metadata still goes through the cache, FlushFileBuffers and fsync can still do those
		temporary, /// FILE_ATTRIBUTE_TEMPORARY on Windows, idk how to specify on linux. also FILE_FLAG_DELETE_ON_CLOSE can be combined to make a (almost) all memory file. kinda like a private anonymous mmap i believe.
		deleteWhenClosed, /// Windows has a flag for this but idk if it is of any real use
		async, /// open it in overlapped mode, all reads and writes must then provide an offset. Only implemented on Windows
	}
	+/

	/++
		Opens `filename` with the operating system and stores the resulting native handle.

		Params:
			async = open for asynchronous use (`FILE_FLAG_OVERLAPPED` on Windows, `O_NONBLOCK` on Posix)
			filename = path of the file to open
			mode = see [OpenMode]
			require = with `RequirePreexisting.no`, the writing modes create the file if it is missing; with `yes` they fail instead. `readOnly` on Windows always requires the file to exist.
			specialFlags = not used by this implementation

		Throws:
			`WindowsApiException` (Windows) or `ErrnoApiException` (Posix) if the file cannot be opened.
	+/
	protected this(bool async, FilePath filename, OpenMode mode = OpenMode.readOnly, RequirePreexisting require = RequirePreexisting.no, uint specialFlags = 0) {
		version(Windows) {
			DWORD access;
			DWORD creation;

			final switch(mode) {
				case OpenMode.readOnly:
					access = GENERIC_READ;
					creation = OPEN_EXISTING;
				break;
				case OpenMode.writeWithTruncation:
					access = GENERIC_WRITE;

					final switch(require) {
						case RequirePreexisting.no:
							creation = CREATE_ALWAYS;
						break;
						case RequirePreexisting.yes:
							creation = TRUNCATE_EXISTING;
						break;
					}
				break;
				case OpenMode.appendOnly:
					access = FILE_APPEND_DATA;

					final switch(require) {
						case RequirePreexisting.no:
							// open-or-create; CREATE_ALWAYS would wipe the very file we are asked to append to
							creation = OPEN_ALWAYS;
						break;
						case RequirePreexisting.yes:
							creation = OPEN_EXISTING;
						break;
					}
				break;
				case OpenMode.readAndWrite:
					access = GENERIC_READ | GENERIC_WRITE;

					final switch(require) {
						case RequirePreexisting.no:
							// open-or-create, matching the Posix branch; CREATE_NEW would fail on every existing file
							creation = OPEN_ALWAYS;
						break;
						case RequirePreexisting.yes:
							creation = OPEN_EXISTING;
						break;
					}
				break;
			}

			WCharzBuffer wname = WCharzBuffer(filename.path);

			auto handle = CreateFileW(
				wname.ptr,
				access,
				FILE_SHARE_READ,
				null,
				creation,
				FILE_ATTRIBUTE_NORMAL | (async ? FILE_FLAG_OVERLAPPED : 0),
				null
			);

			if(handle == INVALID_HANDLE_VALUE) {
				// grab the error code first, before anything below has a chance to overwrite it
				auto errorCode = GetLastError();

				// FIXME: throw the filename and other params here too
				SavedArgument[3] args;
				args[0] = SavedArgument("filename", LimitedVariant(filename.path));
				args[1] = SavedArgument("access", LimitedVariant(access, 2));
				args[2] = SavedArgument("requirePreexisting", LimitedVariant(require == RequirePreexisting.yes));
				throw new WindowsApiException("CreateFileW", errorCode, args[]);
			}

			this.handle = handle;
		} else version(Posix) {
			import core.sys.posix.unistd;
			import core.sys.posix.fcntl;

			CharzBuffer namez = CharzBuffer(filename.path);
			int flags;

			// FIXME does mac not have cloexec for real or is this just a druntime problem?????
			version(Arsd_core_has_cloexec) {
				flags = O_CLOEXEC;
			} else {
				scope(success)
					setCloExec(this.handle);
			}

			if(async)
				flags |= O_NONBLOCK;

			final switch(mode) {
				case OpenMode.readOnly:
					flags |= O_RDONLY;
				break;
				case OpenMode.writeWithTruncation:
					flags |= O_WRONLY | O_TRUNC;

					final switch(require) {
						case RequirePreexisting.no:
							flags |= O_CREAT;
						break;
						case RequirePreexisting.yes:
						break;
					}
				break;
				case OpenMode.appendOnly:
					// O_APPEND alone leaves the access mode at O_RDONLY, so every write would fail
					flags |= O_WRONLY | O_APPEND;

					final switch(require) {
						case RequirePreexisting.no:
							flags |= O_CREAT;
						break;
						case RequirePreexisting.yes:
						break;
					}
				break;
				case OpenMode.readAndWrite:
					flags |= O_RDWR;

					final switch(require) {
						case RequirePreexisting.no:
							flags |= O_CREAT;
						break;
						case RequirePreexisting.yes:
						break;
					}
				break;
			}

			auto perms = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
			int fd = open(namez.ptr, flags, perms);
			if(fd == -1) {
				// grab errno first, before anything below has a chance to overwrite it
				int openError = errno;

				SavedArgument[3] args;
				args[0] = SavedArgument("filename", LimitedVariant(filename.path));
				args[1] = SavedArgument("flags", LimitedVariant(flags, 2));
				args[2] = SavedArgument("perms", LimitedVariant(perms, 8));
				throw new ErrnoApiException("open", openError, args[]);
			}

			this.handle = fd;
		}
	}

	/++
		Wraps a native handle that was already opened elsewhere.
	+/
	private this(NativeFileHandle handleToWrap) {
		this.handle = handleToWrap;
	}

	// only available on some types of file
	// NOTE: this base implementation always returns 0
	long size() { return 0; }

	// note that there is no fsync thing, instead use the special flag.

	/++
		Closes the native handle and resets the stored handle to its invalid value (`null` on Windows, `-1` on Posix).
	+/
	void close() {
		version(Windows) {
			Win32Enforce!CloseHandle(handle);
			handle = null;
		} else version(Posix) {
			import unix = core.sys.posix.unistd;
			import core.sys.posix.fcntl;

			ErrnoEnforce!(unix.close)(handle);
			handle = -1;
		}
	}
}

/++
	A file opened in synchronous access mode.

	NOTE: `read`, `write`, `seek`, and `tell` are placeholders here; they do no I/O yet.
+/
class File : AbstractFile {

	/++
		Opens a file in synchronous access mode.

		The permission mask is only used on posix systems FIXME: implement it
	+/
	this(FilePath filename, OpenMode mode = OpenMode.readOnly, RequirePreexisting require = RequirePreexisting.no, uint specialFlags = 0, uint permMask = 0) {
		super(false, filename, mode, require, specialFlags);
	}

	/++
		NOT IMPLEMENTED: currently always returns `null`.
	+/
	ubyte[] read(scope ubyte[] buffer) {
		return null;
	}

	/++
		NOT IMPLEMENTED: currently does nothing.
	+/
	void write(in void[] buffer) {
	}

	enum Seek {
		current,
		fromBeginning,
		fromEnd
	}

	// Seeking/telling/sizing is not permitted when appending and some files don't support it
	// also not permitted in async mode
	void seek(long where, Seek fromWhence) {}
	long tell() { return 0; }
}

/++
	Only one operation can be pending at any time in the current implementation.
+/
class AsyncFile : AbstractFile {
	/++
		Opens a file in asynchronous access mode.
	+/
	this(FilePath filename, OpenMode mode = OpenMode.readOnly, RequirePreexisting require = RequirePreexisting.no, uint specialFlags = 0, uint permissionMask = 0) {
		// FIXME: implement permissionMask
		super(true, filename, mode, require, specialFlags);
	}

	package(arsd) this(NativeFileHandle adoptPreSetup) {
		super(adoptPreSetup);
	}

	///
	AsyncReadRequest read(ubyte[] buffer, long offset = 0) {
		return new AsyncReadRequest(this, buffer, offset);
	}

	///
	AsyncWriteRequest write(const(void)[] buffer, long offset = 0) {
		return new AsyncWriteRequest(this, cast(ubyte[]) buffer, offset);
	}

}

/++
	Reads or writes a file in one call. It might internally yield, but is generally blocking if it returns values. The callback ones depend on the implementation.

	Tip: prefer the callback ones. In settings where async is possible, it will do async, and if not, it will sync.

	NOT IMPLEMENTED
+/
void writeFile(string filename, const(void)[] contents) {

}

/// ditto
string readTextFile(string filename, string fileEncoding = null) {
	return null;
}

/// ditto
const(ubyte[]) readBinaryFile(string filename) {
	return null;
}

/+
private Class recycleObject(Class, Args...)(Class objectToRecycle, Args args) {
	if(objectToRecycle is null)
		return new Class(args);
	// destroy nulls out the vtable which is the first thing in the object
	// so if it hasn't already been destroyed, we'll do it here
	if((*cast(void**) objectToRecycle) !is null) {
		assert(typeid(objectToRecycle) is typeid(Class)); // to make sure we're actually recycling the right kind of object
		.destroy(objectToRecycle);
	}

	// then go ahead and reinitialize it
	ubyte[] rawData = (cast(ubyte*) cast(void*) objectToRecycle)[0 ..
__traits(classInstanceSize, Class)];
	rawData[] = (cast(ubyte[]) typeid(Class).initializer)[];

	objectToRecycle.__ctor(args);

	return objectToRecycle;
}
+/

/+
/++
	Preallocates a class object without initializing it.

	This is suitable *only* for passing to one of the functions in here that takes a preallocated object for recycling.
+/
Class preallocate(Class)() {
	import core.memory;
	// FIXME: can i pass NO_SCAN here?
	return cast(Class) GC.calloc(__traits(classInstanceSize, Class), 0, typeid(Class));
}

OwnedClass!Class preallocateOnStack(Class)() {

}
+/

// thanks for a random person on stack overflow for this function
//
// Creates a connected pipe pair out of a uniquely named pipe, so that each end
// can be opened with its own mode flags.
//
// Params:
//   lpReadPipe, lpWritePipe = receive the two handles on success
//   lpPipeAttributes = security attributes applied to both ends
//   nSize = buffer size for the pipe; 0 selects 4096
//   dwReadMode = extra flags OR'd into the read end's open mode
//   dwWriteMode = extra flags OR'd into the write end's file attributes
//
// Returns: TRUE on success. On failure returns FALSE, leaves the out parameters
// untouched, and the error is available through GetLastError().
version(Windows)
BOOL MyCreatePipeEx(
	PHANDLE lpReadPipe,
	PHANDLE lpWritePipe,
	LPSECURITY_ATTRIBUTES lpPipeAttributes,
	DWORD nSize,
	DWORD dwReadMode,
	DWORD dwWriteMode
)
{
	HANDLE ReadPipeHandle, WritePipeHandle;
	DWORD dwError;
	CHAR[MAX_PATH] PipeNameBuffer;

	if (nSize == 0) {
		nSize = 4096;
	}

	// shared counter, bumped atomically below, so each call in this process gets its own pipe name
	static shared(int) PipeSerialNumber = 0;

	import core.stdc.string;
	import core.stdc.stdio;

	sprintf(PipeNameBuffer.ptr,
		"\\\\.\\Pipe\\ArsdCoreAnonymousPipe.%08x.%08x".ptr,
		GetCurrentProcessId(),
		atomicOp!"+="(PipeSerialNumber, 1)
	);

	ReadPipeHandle = CreateNamedPipeA(
		PipeNameBuffer.ptr,
		1/*PIPE_ACCESS_INBOUND*/ | dwReadMode,
		0/*PIPE_TYPE_BYTE*/ | 0/*PIPE_WAIT*/,
		1,             // Number of pipes
		nSize,         // Out buffer size
		nSize,         // In buffer size
		120 * 1000,    // Timeout in ms
		lpPipeAttributes
	);

	// CreateNamedPipe reports failure as INVALID_HANDLE_VALUE, not null;
	// the null test is kept only so the previous check still holds too
	if (ReadPipeHandle == INVALID_HANDLE_VALUE || ReadPipeHandle is null) {
		return FALSE;
	}

	WritePipeHandle = CreateFileA(
		PipeNameBuffer.ptr,
		GENERIC_WRITE,
		0,                         // No sharing
		lpPipeAttributes,
		OPEN_EXISTING,
		FILE_ATTRIBUTE_NORMAL | dwWriteMode,
		null                       // Template file
	);

	if (INVALID_HANDLE_VALUE == WritePipeHandle) {
		// CloseHandle may overwrite the last error, so save and restore it for the caller
		dwError = GetLastError();
		CloseHandle( ReadPipeHandle );
		SetLastError(dwError);
		return FALSE;
	}

	*lpReadPipe = ReadPipeHandle;
	*lpWritePipe = WritePipeHandle;
	return( TRUE );
}



/+

	// this is probably useless.

/++
	Creates a pair of anonymous pipes ready for async operations.

	You can pass some preallocated objects to recycle if you like.
+/
AsyncAnonymousPipe[2] anonymousPipePair(AsyncAnonymousPipe[2] preallocatedObjects = [null, null], bool inheritable = false) {
	version(Posix) {
		int[2] fds;
		auto ret = pipe(fds);

		if(ret == -1)
			throw new SystemApiException("pipe", errno);

		// FIXME: do we want them inheritable? and do we want both sides to be async?
		if(!inheritable) {
			setCloExec(fds[0]);
			setCloExec(fds[1]);
		}
		// if it is inherited, do we actually want it non-blocking?
		makeNonBlocking(fds[0]);
		makeNonBlocking(fds[1]);

		return [
			recycleObject(preallocatedObjects[0], fds[0]),
			recycleObject(preallocatedObjects[1], fds[1]),
		];
	} else version(Windows) {
		HANDLE rp, wp;
		// FIXME: do we want them inheritable? and do we want both sides to be async?
		if(!MyCreatePipeEx(&rp, &wp, null, 0, FILE_FLAG_OVERLAPPED, FILE_FLAG_OVERLAPPED))
			throw new SystemApiException("MyCreatePipeEx", GetLastError());
		return [
			recycleObject(preallocatedObjects[0], rp),
			recycleObject(preallocatedObjects[1], wp),
		];
	} else throw ArsdException!"NotYetImplemented"();
}
	// on posix, just do pipe() w/ non block
	// on windows, do an overlapped named pipe server, connect, stop listening, return pair.
+/

/+
class NamedPipe : AsyncFile {

}
+/

/++
	A named pipe ready to accept connections.

	A Windows named pipe is an IPC mechanism usable on local machines or across a Windows network.

	NOTE: the class currently has no members; the comments inside are design notes.
+/
version(Windows)
class NamedPipeServer {
	// unix domain socket or windows named pipe

	// Promise!AsyncAnonymousPipe connect;
	// Promise!AsyncAnonymousPipe accept;

	// when a new connection arrives, it calls your callback
	// can be on a specific thread or on any thread
}

// Hand-written binding for the Windows `inet_ntop`.
// NOTE(review): presumably declared here because the imported Windows headers do not provide it - confirm.
private version(Windows) extern(Windows) {
	const(char)* inet_ntop(int, const void*, char*, socklen_t);
}

/++
	Some functions that return arrays allow you to provide your own buffer. These are indicated in the type system as `UserProvidedBuffer!Type`, and you get to decide what you want to happen if the buffer is too small via the [OnOutOfSpace] parameter.

	These are usually optional, since an empty user provided buffer with the default policy of reallocate will also work fine for whatever needs to be returned, thanks to the garbage collector taking care of it for you.

	The API inside `UserProvidedBuffer` is all private to the arsd library implementation; your job is just to provide the buffer to it with [provideBuffer] or a constructor call and decide on your on-out-of-space policy.

	$(TIP
		To properly size a buffer, I suggest looking at what covers about 80% of cases.
Trying to cover everything often leads to wasted buffer space, and if you use a reallocate policy it can cover the rest. You might be surprised how far just two elements can go!
	)

	History:
		Added August 4, 2023 (dub v11.0)
+/
struct UserProvidedBuffer(T) {
	private T[] buffer;
	private int actualLength; // number of leading elements of `buffer` that hold appended items
	private OnOutOfSpace policy;

	/++
		Wraps `buffer` as the storage to fill, with `policy` deciding what happens once it is full.
	+/
	public this(scope T[] buffer, OnOutOfSpace policy = OnOutOfSpace.reallocate) {
		this.buffer = buffer;
		this.policy = policy;
	}

	/+
		Stores `item` in the next free slot.

		Returns `true` if the item was stored, and `false` if it was dropped under the
		`discard` policy. Throws under the `exception` policy when the buffer is full.
	+/
	package(arsd) bool append(T item) {
		// common case first: there is still a free slot
		if(actualLength < buffer.length) {
			const slot = actualLength++;
			buffer[slot] = item;
			return true;
		}

		// full: the policy decides
		final switch(policy) {
			case OnOutOfSpace.reallocate:
				buffer ~= item;
				actualLength++;
				return true;
			case OnOutOfSpace.discard:
				return false;
			case OnOutOfSpace.exception:
				throw ArsdException!"Buffer out of space"(buffer.length, actualLength);
		}
	}

	/+
		The filled portion of the buffer, in append order.
	+/
	package(arsd) T[] slice() return {
		return buffer[0 .. actualLength];
	}
}

/// ditto
UserProvidedBuffer!T provideBuffer(T)(scope T[] buffer, OnOutOfSpace policy = OnOutOfSpace.reallocate) {
	return UserProvidedBuffer!T(buffer, policy);
}

/++
	Possible policies for [UserProvidedBuffer]s that run out of space.
+/
enum OnOutOfSpace {
	reallocate, /// reallocate the buffer with the GC to make room
	discard, /// discard all contents that do not fit in your provided buffer
	exception, /// throw an exception if there is data that would not fit in your provided buffer
}


/++
	For functions that give you an unknown address, you can use this to hold it.

	Can get:
		ip4
		ip6
		unix
		abstract_

	name lookup for connect (stream or dgram)
	request canonical name?
interface lookup for bind (stream or dgram)
+/
struct SocketAddress {
	import core.sys.posix.netdb;

	/++
		Provides the set of addresses to listen on all supported protocols on the machine for the given interfaces. `localhost` only listens on the loopback interface, whereas `allInterfaces` will listen on loopback as well as the others on the system (meaning it may be publicly exposed to the internet).

		If you provide a buffer, I recommend using one of length two, so `SocketAddress[2]`, since this usually provides one address for ipv4 and one for ipv6.
	+/
	static SocketAddress[] localhost(ushort port, return UserProvidedBuffer!SocketAddress buffer = null) {
		buffer.append(ip6("::1", port));
		buffer.append(ip4("127.0.0.1", port));
		return buffer.slice;
	}

	/// ditto
	static SocketAddress[] allInterfaces(ushort port, return UserProvidedBuffer!SocketAddress buffer = null) {
		// getaddrinfo wants the service as a string, so format the port number first
		char[16] str;
		return allInterfaces(intToString(port, str[]), buffer);
	}

	/// ditto
	static SocketAddress[] allInterfaces(scope const char[] serviceOrPort, return UserProvidedBuffer!SocketAddress buffer = null) {
		addrinfo hints;
		hints.ai_flags = AI_PASSIVE;
		hints.ai_socktype = SOCK_STREAM; // just to filter it down a little tbh
		return get(null, serviceOrPort, &hints, buffer);
	}

	/++
		Returns a single address object for the given protocol and parameters.

		You probably should generally prefer [get], [localhost], or [allInterfaces] to have more flexible code.
	+/
	static SocketAddress ip4(scope const char[] address, ushort port, bool forListening = false) {
		return getSingleAddress(AF_INET, AI_NUMERICHOST | (forListening ? AI_PASSIVE : 0), address, port);
	}

	/// ditto
	static SocketAddress ip4(ushort port) {
		return ip4(null, port, true);
	}

	/// ditto
	static SocketAddress ip6(scope const char[] address, ushort port, bool forListening = false) {
		return getSingleAddress(AF_INET6, AI_NUMERICHOST | (forListening ? AI_PASSIVE : 0), address, port);
	}

	/// ditto
	static SocketAddress ip6(ushort port) {
		return ip6(null, port, true);
	}

	/// ditto
	static SocketAddress unix(scope const char[] path) {
		// FIXME: not implemented; currently returns a default-initialized address and ignores `path`
		SocketAddress addr;
		return addr;
	}

	/// ditto
	static SocketAddress abstract_(scope const char[] path) {
		// an abstract name is passed to `unix` as a zero byte followed by the name itself
		char[190] buffer = void;
		buffer[0] = 0;
		// the destination slice must be exactly path.length long, starting after the zero byte
		buffer[1 .. 1 + path.length] = path[];
		return unix(buffer[0 .. 1 + path.length]);
	}

	// Looks up exactly one address of the given family; throws if the lookup produced nothing.
	private static SocketAddress getSingleAddress(int family, int flags, scope const char[] address, ushort port) {
		addrinfo hints;
		hints.ai_family = family;
		hints.ai_flags = flags;

		char[16] portBuffer;
		char[] portString = intToString(port, portBuffer[]);

		SocketAddress[1] addr;
		auto res = get(address, portString, &hints, provideBuffer(addr[]));
		if(res.length == 0)
			throw ArsdException!"bad address"(address.idup, port);
		return res[0];
	}

	/++
		Calls `getaddrinfo` and returns the array of results. It will populate the data into the buffer you provide, if you provide one, otherwise it will allocate its own.

		If a `filter` is given, only results it returns `true` for are kept. Results whose address family is not ipv4, ipv6, or unix are skipped.
	+/
	static SocketAddress[] get(scope const char[] nodeName, scope const char[] serviceOrPort, addrinfo* hints = null, return UserProvidedBuffer!SocketAddress buffer = null, scope bool delegate(scope addrinfo* ai) filter = null) @trusted {
		addrinfo* res;
		CharzBuffer node = nodeName;
		CharzBuffer service = serviceOrPort;
		auto ret = getaddrinfo(nodeName is null ? null : node.ptr, serviceOrPort is null ? null : service.ptr, hints, &res);
		if(ret == 0) {
			auto current = res;
			while(current) {
				if(filter is null || filter(current)) {
					SocketAddress addr;
					addr.addrlen = cast(socklen_t) current.ai_addrlen;

					// only families we know how to copy end up in the result
					bool supported = true;
					switch(current.ai_family) {
						case AF_INET:
							addr.in4 = * cast(sockaddr_in*) current.ai_addr;
						break;
						case AF_INET6:
							addr.in6 = * cast(sockaddr_in6*) current.ai_addr;
						break;
						case AF_UNIX:
							addr.unix_address = * cast(sockaddr_un*) current.ai_addr;
						break;
						default:
							// skip: previously an empty address was appended here
							supported = false;
					}

					// append returns false when the buffer is full under the discard policy; stop early then
					if(supported && !buffer.append(addr))
						break;
				}

				current = current.ai_next;
			}

			freeaddrinfo(res);
		} else {
			version(Windows) {
				throw new WindowsApiException("getaddrinfo", ret);
			} else {
				// NOTE(review): the error text is fetched but never reported, so on this
				// branch a failed lookup just yields whatever is already in the buffer.
				// Left as-is since getSingleAddress reports the empty result itself.
				const char* error = gai_strerror(ret);
			}
		}

		return buffer.slice;
	}

	/++
		Returns a string representation of the address that identifies it in a custom format.

		$(LIST
			* Unix domain socket addresses are their path prefixed with "unix:", unless they are in the abstract namespace, in which case it is prefixed with "abstract:" and the zero is trimmed out. For example, "unix:/tmp/pipe".

			* IPv4 addresses are written in dotted decimal followed by a colon and the port number. For example, "127.0.0.1:8080".

			* IPv6 addresses are written in colon separated hex format, but enclosed in brackets, then followed by the colon and port number. For example, "[::1]:8080".
		)
	+/
	string toString() const @trusted {
		char[200] buffer;
		switch(address.sa_family) {
			case AF_INET:
				auto writable = stringz(inet_ntop(address.sa_family, &in4.sin_addr, buffer.ptr, buffer.length));
				auto it = writable.borrow;
				buffer[it.length] = ':';
				auto numbers = intToString(port, buffer[it.length + 1 .. $]);
				return buffer[0 .. it.length + 1 + numbers.length].idup;
			case AF_INET6:
				buffer[0] = '[';
				auto writable = stringz(inet_ntop(address.sa_family, &in6.sin6_addr, buffer.ptr + 1, buffer.length - 1));
				auto it = writable.borrow;
				buffer[it.length + 1] = ']';
				buffer[it.length + 2] = ':';
				auto numbers = intToString(port, buffer[it.length + 3 .. $]);
				return buffer[0 .. it.length + 3 + numbers.length].idup;
			case AF_UNIX:
				// FIXME: it might be abstract in which case stringz is wrong!!!!!
				auto writable = stringz(cast(char*) unix_address.sun_path.ptr).borrow;
				if(writable.length == 0)
					return "unix:";
				string prefix = writable[0] == 0 ? "abstract:" : "unix:";
				auto name = writable[writable[0] == 0 ? 1 : 0 .. $];
				buffer[0 .. prefix.length] = prefix[];
				buffer[prefix.length .. prefix.length + name.length] = name[];
				// only the part that was written, not the whole scratch buffer
				return buffer[0 .. prefix.length + name.length].idup;
			case AF_UNSPEC:
				return "<unspecified address>";
			default:
				return "<unsupported address>"; // FIXME
		}
	}

	/++
		The port number in host byte order for ipv4 and ipv6 addresses; 0 for anything else.
	+/
	ushort port() const @trusted {
		switch(address.sa_family) {
			case AF_INET:
				return ntohs(in4.sin_port);
			case AF_INET6:
				return ntohs(in6.sin6_port);
			default:
				return 0;
		}
	}

	/+
	@safe unittest {
		SocketAddress[4] buffer;
		foreach(addr; SocketAddress.get("arsdnet.net", "http", null, provideBuffer(buffer[])))
			writeln(addr.toString());
	}
	+/

	/+
	unittest {
		// writeln(SocketAddress.ip4(null, 4444, true));
		// writeln(SocketAddress.ip4("400.3.2.1", 4444));
		// writeln(SocketAddress.ip4("bar", 4444));
		foreach(addr; localhost(4444))
			writeln(addr.toString());
	}
	+/

	socklen_t addrlen = typeof(this).sizeof - socklen_t.sizeof; // the size of the union below

	// all views share the same storage; `address.sa_family` is what toString and port switch on
	union {
		sockaddr address;

		sockaddr_storage storage;

		sockaddr_in in4;
		sockaddr_in6 in6;

		sockaddr_un unix_address;
	}

	/+
	this(string node, string serviceOrPort, int family = 0) {
		// need to populate the approrpiate address and the length and make sure you set sa_family
	}
	+/

	/// The address family stored in this address (`sa_family`).
	int domain() {
		return address.sa_family;
	}
	/// Pointer to the stored address, viewed as a generic `sockaddr`.
	sockaddr* rawAddr() return {
		return &address;
	}
	/// The length that goes along with [rawAddr].
	socklen_t rawAddrLength() {
		return addrlen;
	}

	// FIXME it is AF_BLUETOOTH
	// see: https://people.csail.mit.edu/albert/bluez-intro/x79.html
	// see: https://learn.microsoft.com/en-us/windows/win32/Bluetooth/bluetooth-programming-with-windows-sockets
}

private version(Windows) {
	// local declaration so the union in SocketAddress has a sockaddr_un to use on Windows
	struct sockaddr_un {
		ushort sun_family;
		char[108] sun_path;
	}
}

class AsyncSocket : AsyncFile {
	// otherwise: accept, bind, connect, shutdown, close.

	/// The most recent socket error code: `WSAGetLastError()` on Windows, `errno` elsewhere.
	static auto lastError() {
		version(Windows)
			return WSAGetLastError();
		else
			return errno;
	}

	/// Whether [lastError] indicates the operation failed only because it would have blocked.
	static bool wouldHaveBlocked() {
		auto error = lastError;
		version(Windows) {
			return error == WSAEWOULDBLOCK || error == WSAETIMEDOUT;
		} else {
			return error == EAGAIN || error == EWOULDBLOCK;
		}
	}

	// the handle value that means "no socket"
	version(Windows)
		enum INVALID = INVALID_SOCKET;
	else
		enum INVALID = -1;

	// type is mostly SOCK_STREAM or SOCK_DGRAM
	/++
		Creates a socket compatible with the given address. It does not actually connect or bind, nor store the address. You will want to pass it again to those functions:

		---
		auto socket = new Socket(address, Socket.Type.Stream);
		socket.connect(address).waitForCompletion();
		---
	+/
	this(SocketAddress address, int type, int protocol = 0) {
		// need to look up these values for linux
		// type |= SOCK_NONBLOCK | SOCK_CLOEXEC;

		handle_ = socket(address.domain(), type, protocol);
		if(handle == INVALID)
			throw new SystemApiException("socket", lastError());

		super(cast(NativeFileHandle) handle); // I think that cast is ok on Windows... i think

		version(Posix) {
			makeNonBlocking(handle);
			setCloExec(handle);
		}

		// ipv6 sockets are restricted to ipv6 only; the result of setsockopt is not checked
		if(address.domain == AF_INET6) {
			int opt = 1;
			setsockopt(handle, IPPROTO_IPV6 /*SOL_IPV6*/, IPV6_V6ONLY, &opt, opt.sizeof);
		}

		// FIXME: chekc for broadcast

		// FIXME: REUSEADDR ?

		// FIXME: also set NO_DELAY prolly
		// int opt = 1;
		// setsockopt(handle, IPPROTO_TCP, TCP_NODELAY, &opt, opt.sizeof);
	}

	/++
		Enabling NODELAY can give latency improvements if you are managing buffers on your end

		Currently a no-op: the body is empty.
	+/
	void setNoDelay(bool enabled) {

	}

	/++

		`allowQuickRestart` will set the SO_REUSEADDR on unix and SO_DONTLINGER on Windows,
		allowing the application to be quickly restarted despite there still potentially being
		pending data in the tcp stack.

		See https://stackoverflow.com/questions/3229860/what-is-the-meaning-of-so-reuseaddr-setsockopt-option-linux for more information.

		If you already set your appropriate socket options or value correctness and reliability of the network stream over restart speed, leave this at the default `false`.
	+/
	void bind(SocketAddress address, bool allowQuickRestart = false) {
		if(allowQuickRestart) {
			// FIXME: not implemented yet, the flag currently has no effect
		}

		auto ret = .bind(handle, address.rawAddr, address.rawAddrLength);
		if(ret == -1)
			throw new SystemApiException("bind", lastError);
	}

	/++
		You must call [bind] before this.

		The backlog should be set to a value where your application can reliably catch up on the backlog in a reasonable amount of time under average load. It is meant to smooth over short duration bursts and making it too big will leave clients hanging - which might cause them to try to reconnect, thinking things got lost in transit, adding to your impossible backlog.

		I personally tend to set this to be two per worker thread unless I have actual real world measurements saying to do something else. It is a bit arbitrary and not based on legitimate reasoning, it just seems to work for me (perhaps just because it has never really been put to the test).
	+/
	void listen(int backlog) {
		auto ret = .listen(handle, backlog);
		if(ret == -1)
			throw new SystemApiException("listen", lastError);
	}

	/++
		Calls the system `shutdown` on the socket, passing `how` through unchanged. Throws if it fails.
	+/
	void shutdown(int how) {
		auto ret = .shutdown(handle, how);
		if(ret == -1)
			throw new SystemApiException("shutdown", lastError);
	}

	/++
		Closes the socket handle and marks it invalid. Calling it again afterward does nothing.
	+/
	override void close() {
		// nothing to do if it was already closed; avoids handing an invalid handle to the OS
		if(handle_ == INVALID)
			return;

		version(Windows)
			closesocket(handle);
		else
			.close(handle);
		handle_ = INVALID;
	}

	/++
		You can also construct your own request externally to control the memory more.
	+/
	AsyncConnectRequest connect(SocketAddress address, ubyte[] bufferToSend = null) {
		return new AsyncConnectRequest(this, address, bufferToSend);
	}

	/++
		You can also construct your own request externally to control the memory more.
	+/
	AsyncAcceptRequest accept() {
		return new AsyncAcceptRequest(this);
	}

	// note that send is just sendto w/ a null address
	// and receive is just receivefrom w/ a null address
	/++
		You can also construct your own request externally to control the memory more.
	+/
	AsyncSendRequest send(const(ubyte)[] buffer, int flags = 0) {
		return new AsyncSendRequest(this, buffer, null, flags);
	}

	/++
		You can also construct your own request externally to control the memory more.
	+/
	AsyncReceiveRequest receive(ubyte[] buffer, int flags = 0) {
		return new AsyncReceiveRequest(this, buffer, null, flags);
	}

	/++
		You can also construct your own request externally to control the memory more.
	+/
	AsyncSendRequest sendTo(const(ubyte)[] buffer, SocketAddress* address, int flags = 0) {
		return new AsyncSendRequest(this, buffer, address, flags);
	}
	/++
		You can also construct your own request externally to control the memory more.
	+/
	AsyncReceiveRequest receiveFrom(ubyte[] buffer, SocketAddress* address, int flags = 0) {
		return new AsyncReceiveRequest(this, buffer, address, flags);
	}

	/++
		The address the socket is bound to, as reported by `getsockname`.

		NOTE(review): the return value of `getsockname` is not checked, so on failure this returns the default-initialized address.
	+/
	SocketAddress localAddress() {
		SocketAddress addr;
		getsockname(handle, &addr.address, &addr.addrlen);
		return addr;
	}
	/++
		The address of the connected peer, as reported by `getpeername`.

		NOTE(review): the return value of `getpeername` is not checked, so on failure this returns the default-initialized address.
	+/
	SocketAddress peerAddress() {
		SocketAddress addr;
		getpeername(handle, &addr.address, &addr.addrlen);
		return addr;
	}

	// for unix sockets on unix only: send/receive fd, get peer creds

	/++
		The underlying native socket handle.
	+/
	final NativeSocketHandle handle() {
		return handle_;
	}

	private NativeSocketHandle handle_;
}

/++
	Initiates a connection request and optionally sends initial data as soon as possible.

	Calls `ConnectEx` on Windows and emulates it on other systems.
The entire buffer is sent before the operation is considered complete.

	NOT IMPLEMENTED / NOT STABLE
+/
class AsyncConnectRequest : AsyncOperationRequest {
	// FIXME: i should take a list of addresses and take the first one that succeeds, so a getaddrinfo can be sent straight in.
	// Placeholder: none of the arguments are stored or used yet.
	this(AsyncSocket socket, SocketAddress address, ubyte[] dataToWrite) {

	}

	// Placeholder overrides: start and cancel do nothing, isComplete always reports true,
	// and waitForCompletion must not be called (it hits assert(0)).
	override void start() {}
	override void cancel() {}
	override bool isComplete() { return true; }
	override AsyncConnectResponse waitForCompletion() { assert(0); }
}
/++
	Result of an [AsyncConnectRequest]; carries the error code of the attempt.
+/
class AsyncConnectResponse : AsyncOperationResponse {
	const SystemErrorCode errorCode;

	this(SystemErrorCode errorCode) {
		this.errorCode = errorCode;
	}

	// success is whatever the stored error code reports
	override bool wasSuccessful() {
		return errorCode.wasSuccessful;
	}

}

// FIXME: TransmitFile/sendfile support

/++
	Calls `AcceptEx` on Windows and emulates it on other systems.
NOT IMPLEMENTED / NOT STABLE
+/
class AsyncAcceptRequest : AsyncOperationRequest {
	// NOTE(review): this field is never assigned in the constructor below; the socket is only stored inside `llo`.
	AsyncSocket socket;

	// Placeholder overrides; waitForCompletion must not be called (it hits assert(0)).
	// NOTE(review): its declared return type is AsyncConnectResponse, not AsyncAcceptResponse - confirm.
	override void start() {}
	override void cancel() {}
	override bool isComplete() { return true; }
	override AsyncConnectResponse waitForCompletion() { assert(0); }


	// The operation handed to the OverlappedIoRequest mixin.
	// NOTE(review): as written it performs a receive (recv/recvfrom, WSARecv/WSARecvFrom), not an accept.
	struct LowLevelOperation {
		AsyncSocket file;
		ubyte[] buffer;
		SocketAddress* address;

		// takes the fields in declaration order
		this(typeof(this.tupleof) args) {
			this.tupleof = args;
		}

		version(Windows) {
			auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
				WSABUF buf;
				buf.len = cast(int) buffer.length;
				buf.buf = cast(typeof(buf.buf)) buffer.ptr;

				uint flags;

				// a null address means the sender's address is not requested
				if(address is null)
					return WSARecv(file.handle, &buf, 1, null, &flags, overlapped, ocr);
				else {
					return WSARecvFrom(file.handle, &buf, 1, null, &flags, &(address.address), &(address.addrlen), overlapped, ocr);
				}
			}
		} else {
			auto opCall() {
				int flags;
				// a null address means the sender's address is not requested
				if(address is null)
					return core.sys.posix.sys.socket.recv(file.handle, buffer.ptr, buffer.length, flags);
				else
					return core.sys.posix.sys.socket.recvfrom(file.handle, buffer.ptr, buffer.length, flags, &(address.address), &(address.addrlen));
			}
		}

		string errorString() {
			return "Receive";
		}
	}
	mixin OverlappedIoRequest!(AsyncAcceptResponse, LowLevelOperation);

	this(AsyncSocket socket, ubyte[] buffer = null, SocketAddress* address = null) {
		llo = LowLevelOperation(socket, buffer, address);
		this.response = typeof(this.response).defaultConstructed;
	}

	// can also look up the local address
}
/++
	Result of an [AsyncAcceptRequest].
+/
class AsyncAcceptResponse : AsyncOperationResponse {
	AsyncSocket newSocket;
	const SystemErrorCode errorCode;

	// the buffer argument is accepted but not stored
	this(SystemErrorCode errorCode, ubyte[] buffer) {
		this.errorCode = errorCode;
	}

	this(AsyncSocket newSocket, SystemErrorCode errorCode) {
		this.newSocket = newSocket;
		this.errorCode = errorCode;
	}

	// success is whatever the stored error code reports
	override bool wasSuccessful() {
		return errorCode.wasSuccessful;
	}
}

/++
	A receive on a socket; receives from a specific address slot when `address` is not null.
+/
class AsyncReceiveRequest : AsyncOperationRequest {
	// The operation handed to the OverlappedIoRequest mixin.
	struct LowLevelOperation {
		AsyncSocket file;
		ubyte[] buffer;
		int flags;
		SocketAddress* address;

		// takes the fields in declaration order
		this(typeof(this.tupleof) args) {
			this.tupleof = args;
		}

		version(Windows) {
			auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
				WSABUF buf;
				buf.len = cast(int) buffer.length;
				buf.buf = cast(typeof(buf.buf)) buffer.ptr;

				// the WSA functions take the flags by pointer, so copy into a local
				uint flags = this.flags;

				// a null address means plain receive; otherwise the sender's address is written through the pointer
				if(address is null)
					return WSARecv(file.handle, &buf, 1, null, &flags, overlapped, ocr);
				else {
					return WSARecvFrom(file.handle, &buf, 1, null, &flags, &(address.address), &(address.addrlen), overlapped, ocr);
				}
			}
		} else {
			auto opCall() {
				// a null address means plain receive; otherwise the sender's address is written through the pointer
				if(address is null)
					return core.sys.posix.sys.socket.recv(file.handle, buffer.ptr, buffer.length, flags);
				else
					return core.sys.posix.sys.socket.recvfrom(file.handle, buffer.ptr, buffer.length, flags, &(address.address), &(address.addrlen));
			}
		}

		string errorString() {
			return "Receive";
		}
	}
	mixin OverlappedIoRequest!(AsyncReceiveResponse, LowLevelOperation);

	// note the argument order differs from the field order of LowLevelOperation (flags before address)
	this(AsyncSocket socket, ubyte[] buffer, SocketAddress* address, int flags) {
		llo = LowLevelOperation(socket, buffer, flags, address);
		this.response = typeof(this.response).defaultConstructed;
	}

}
/++
	Result of an [AsyncReceiveRequest]: the error code and a buffer slice.
+/
class AsyncReceiveResponse : AsyncOperationResponse {
	const ubyte[] bufferWritten;
	const SystemErrorCode errorCode;

	this(SystemErrorCode errorCode, const(ubyte)[] bufferWritten) {
		this.errorCode = errorCode;
		this.bufferWritten = bufferWritten;
	}

	// success is whatever the stored error code reports
	override bool wasSuccessful() {
		return errorCode.wasSuccessful;
	}
}

/++
	A send on a socket; sends to a specific address when `address` is not null.
+/
class AsyncSendRequest : AsyncOperationRequest {
	// The operation handed to the OverlappedIoRequest mixin.
	struct LowLevelOperation {
		AsyncSocket file;
		const(ubyte)[] buffer;
		int flags;
		SocketAddress* address;

		// takes the fields in declaration order
		this(typeof(this.tupleof) args) {
			this.tupleof = args;
		}

		version(Windows) {
			auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
				WSABUF buf;
				buf.len = cast(int) buffer.length;
				buf.buf = cast(typeof(buf.buf)) buffer.ptr;

				// a null address means plain send on the socket
				if(address is null)
					return WSASend(file.handle, &buf, 1, null, flags, overlapped, ocr);
				else {
					return WSASendTo(file.handle, &buf, 1, null, flags, address.rawAddr, address.rawAddrLength, overlapped, ocr);
				}
			}
		} else {
			auto opCall() {
				// a null address means plain send on the socket
				if(address is null)
					return core.sys.posix.sys.socket.send(file.handle, buffer.ptr, buffer.length, flags);
				else
					return core.sys.posix.sys.socket.sendto(file.handle, buffer.ptr, buffer.length, flags, address.rawAddr, address.rawAddrLength);
			}
		}

		string errorString() {
			return "Send";
		}
	}
	mixin OverlappedIoRequest!(AsyncSendResponse, LowLevelOperation);

	// note the argument order differs from the field order of LowLevelOperation (flags before address)
	this(AsyncSocket socket, const(ubyte)[] buffer, SocketAddress* address, int flags) {
		llo = LowLevelOperation(socket, buffer, flags, address);
		this.response = typeof(this.response).defaultConstructed;
	}
}

/++
	Result of an [AsyncSendRequest]: the error code and a buffer slice.
+/
class AsyncSendResponse : AsyncOperationResponse {
	const ubyte[] bufferWritten;
	const SystemErrorCode errorCode;

	this(SystemErrorCode errorCode, const(ubyte)[] bufferWritten) {
		this.errorCode = errorCode;
		this.bufferWritten = bufferWritten;
	}

	// success is whatever the stored error code reports
	override bool wasSuccessful() {
		return errorCode.wasSuccessful;
	}

}

/++
	A set of sockets bound and ready to accept connections on worker threads.
Depending on the specified address, it can be tcp, tcpv6, unix domain, or all of the above.

	NOT IMPLEMENTED / NOT STABLE
+/
class StreamServer {
	AsyncSocket[] sockets;

	/++
		Creates one stream socket per address, binds it, and puts it in the listening state with the given backlog. Nothing is accepted until [start] is called.
	+/
	this(SocketAddress[] listenTo, int backlog = 8) {
		foreach(address; listenTo) {
			auto listener = new AsyncSocket(address, SOCK_STREAM);

			// FIXME: allInterfaces for ipv6 also covers ipv4 so the bind can fail...
			// so we have to permit it to fail w/ address in use if we know we already
			// are listening to ipv6

			// or there is a setsockopt ipv6 only thing i could set.

			listener.bind(address);
			listener.listen(backlog);
			sockets ~= listener;

			// writeln(listener.localAddress.port);
		}

		// i have to start accepting on each thread for each socket...
	}
	// when a new connection arrives, it calls your callback
	// can be on a specific thread or on any thread


	/++
		Creates and starts an accept request on each socket.
	+/
	void start() {
		foreach(listener; sockets)
			listener.accept().start();
	}
}

/+
unittest {
	auto ss = new StreamServer(SocketAddress.localhost(0));
}
+/

/++
	A socket bound and ready to use receiveFrom

	Depending on the address, it can be udp or unix domain.

	NOT IMPLEMENTED / NOT STABLE
+/
class DatagramListener {
	// whenever a udp message arrives, it calls your callback
	// can be on a specific thread or on any thread

	// UDP is realistically just an async read on the bound socket
	// just it can get the "from" data out and might need the "more in packet" flag
}

/++
	Just in case I decide to change the implementation some day.
+/
alias AsyncAnonymousPipe = AsyncFile;


// AsyncAnonymousPipe connectNamedPipe(AsyncAnonymousPipe preallocated, string name)

// unix fifos are considered just non-seekable files and have no special support in the lib; open them as a regular file w/ the async flag.

// DIRECTORY LISTINGS
// not async, so if you want that, do it in a helper thread
// just a convenient function to have (tho phobos has a decent one too, importing it expensive af)

/++
	Note that the order of items called for your delegate is undefined; if you want it sorted, you'll have to collect and sort yourself. But it *might* be sorted by the OS (on Windows, it almost always is), so consider that when choosing a sorting algorithm.

	History:
		previously in minigui as a private function. Moved to arsd.core on April 3, 2023
+/
GetFilesResult getFiles(string directory, scope void delegate(string name, bool isDirectory) dg) {
	// FIXME: my buffers here aren't great lol

	// the directory name is attached to any exception thrown below
	SavedArgument[1] argsForException() {
		return [
			SavedArgument("directory", LimitedVariant(directory)),
		];
	}

	version(Windows) {
		WIN32_FIND_DATA data;
		// FIXME: if directory ends with / or \\ ?
		WCharzBuffer search = WCharzBuffer(directory ~ "/*");
		auto handle = FindFirstFileW(search.ptr, &data);
		scope(exit) if(handle !is INVALID_HANDLE_VALUE) FindClose(handle);
		if(handle is INVALID_HANDLE_VALUE) {
			if(GetLastError() == ERROR_FILE_NOT_FOUND)
				return GetFilesResult.fileNotFound;
			throw new WindowsApiException("FindFirstFileW", GetLastError(), argsForException()[]);
		}

		// `data` already holds the first entry; report it, then ask for the next one
		for(;;) {
			string name = makeUtf8StringFromWindowsString(data.cFileName[0 .. findIndexOfZero(data.cFileName[])]);

			dg(name, (data.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) != 0);

			if(FindNextFileW(handle, &data) == 0) {
				if(GetLastError() == ERROR_NO_MORE_FILES)
					return GetFilesResult.success;
				throw new WindowsApiException("FindNextFileW", GetLastError(), argsForException()[]);
			}
		}

	} else version(Posix) {
		import core.sys.posix.dirent;
		import core.stdc.errno;
		auto dir = opendir((directory ~ "\0").ptr);
		scope(exit)
			if(dir) closedir(dir);
		if(dir is null)
			throw new ErrnoApiException("opendir", errno, argsForException());

		// no entries at all on the very first read is reported as fileNotFound
		auto entry = readdir(dir);
		if(entry is null)
			return GetFilesResult.fileNotFound;

		do {
			string name = entry.d_name[0 .. findIndexOfZero(entry.d_name[])].idup;

			dg(name, entry.d_type == DT_DIR);

			entry = readdir(dir);
		} while(entry !is null);

		return GetFilesResult.success;
	} else static assert(0);
}

/// ditto
enum GetFilesResult {
	success,
	fileNotFound
}

/++
	This is currently a simplified glob where only the * wildcard in the first or last position gets special treatment or a single * in the middle.

	More things may be added later to be more like what Phobos supports.
+/
bool matchesFilePattern(scope const(char)[] name, scope const(char)[] pattern) {
	if(pattern.length == 0)
		return false;
	if(pattern == "*")
		return true;

	const startsWithStar = pattern[0] == '*';
	const endsWithStar = pattern[$-1] == '*';

	// *text* : the text may appear anywhere in the name
	if(pattern.length > 2 && startsWithStar && endsWithStar)
		return name.indexOf(pattern[1 .. $-1]) != -1;

	// *text : the name must end with the text
	if(startsWithStar)
		return name.endsWith(pattern[1 .. $]);

	// text* : the name must start with the text
	if(endsWithStar)
		return name.startsWith(pattern[0 .. $-1]);

	// pre*post : the name must start with pre and end with post, without the two overlapping
	if(pattern.length >= 3) {
		auto star = pattern.indexOf("*");
		if(star != -1) {
			auto prefix = pattern[0 .. star];
			auto suffix = pattern[star + 1 .. $];
			return name.length >= prefix.length + suffix.length
				&& name.startsWith(prefix)
				&& name.endsWith(suffix);
		}
	}

	// no wildcard handled above, so it has to match exactly
	return name == pattern;
}

unittest {
	assert("test.html".matchesFilePattern("*"));
	assert("test.html".matchesFilePattern("*.html"));
	assert("test.html".matchesFilePattern("*.*"));
	assert("test.html".matchesFilePattern("test.*"));
	assert(!"test.html".matchesFilePattern("pest.*"));
	assert(!"test.html".matchesFilePattern("*.dhtml"));

	assert("test.html".matchesFilePattern("t*.html"));
	assert(!"test.html".matchesFilePattern("e*.html"));
}

// Index of the first occurrence of `needle` in `haystack`, or -1 if there is none.
package(arsd) int indexOf(scope const(char)[] haystack, scope const(char)[] needle) {
	if(needle.length > haystack.length)
		return -1;

	// the last position at which the needle could still fit
	const lastStart = haystack.length - needle.length;
	for(size_t start = 0; start <= lastStart; start++) {
		if(haystack[start .. start + needle.length] == needle)
			return cast(int) start;
	}
	return -1;
}

unittest {
	assert("foo".indexOf("f") == 0);
	assert("foo".indexOf("o") == 1);
	assert("foo".indexOf("foo") == 0);
	assert("foo".indexOf("oo") == 1);
	assert("foo".indexOf("fo") == 0);
	assert("foo".indexOf("boo") == -1);
	assert("foo".indexOf("food") == -1);
}

// Whether the last needle.length characters of `haystack` equal `needle`.
package(arsd) bool endsWith(scope const(char)[] haystack, scope const(char)[] needle) {
	return needle.length <= haystack.length
		&& haystack[$ - needle.length .. $] == needle;
}

unittest {
	assert("foo".endsWith("o"));
	assert("foo".endsWith("oo"));
	assert("foo".endsWith("foo"));
	assert(!"foo".endsWith("food"));
	assert(!"foo".endsWith("d"));
}

// Whether the first needle.length characters of `haystack` equal `needle`.
package(arsd) bool startsWith(scope const(char)[] haystack, scope const(char)[] needle) {
	return needle.length <= haystack.length
		&& haystack[0 .. needle.length] == needle;
}

unittest {
	assert("foo".startsWith("f"));
	assert("foo".startsWith("fo"));
	assert("foo".startsWith("foo"));
	assert(!"foo".startsWith("food"));
	assert(!"foo".startsWith("d"));
}


// FILE/DIR WATCHES
// linux does it by name, windows and bsd do it by handle/descriptor
// dispatches change event to either your thread or maybe the any task` queue.

/++
	PARTIALLY IMPLEMENTED / NOT STABLE

+/
class DirectoryWatcher {
	private {
		version(Arsd_core_windows) {
			OVERLAPPED overlapped;
			HANDLE hDirectory;
			ubyte[] buffer;

			extern(Windows)
			static void overlappedCompletionRoutine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransferred, LPOVERLAPPED lpOverlapped) {
				typeof(this) rr = cast(typeof(this)) (cast(void*) lpOverlapped - typeof(this).overlapped.offsetof);

				// dwErrorCode
				auto response = rr.buffer[0 .. dwNumberOfBytesTransferred];

				while(response.length) {
					auto fni = cast(FILE_NOTIFY_INFORMATION*) response.ptr;
					auto filename = fni.FileName[0 .. fni.FileNameLength];

					if(fni.NextEntryOffset)
						response = response[fni.NextEntryOffset .. $];
					else
						response = response[$..$];

					// FIXME: I think I need to pin every overlapped op while it is pending
					// and unpin it when it is returned. GC.addRoot... but i don't wanna do that
					// every op so i guess i should do a refcount scheme similar to the other callback helper.
3808 3809 rr.changeHandler( 3810 FilePath(makeUtf8StringFromWindowsString(filename)), // FIXME: this is a relative path 3811 ChangeOperation.unknown // FIXME this is fni.Action 3812 ); 3813 } 3814 3815 rr.requestRead(); 3816 } 3817 3818 void requestRead() { 3819 DWORD ignored; 3820 if(!ReadDirectoryChangesW( 3821 hDirectory, 3822 buffer.ptr, 3823 cast(int) buffer.length, 3824 recursive, 3825 FILE_NOTIFY_CHANGE_LAST_WRITE | FILE_NOTIFY_CHANGE_CREATION | FILE_NOTIFY_CHANGE_FILE_NAME, 3826 &ignored, 3827 &overlapped, 3828 &overlappedCompletionRoutine 3829 )) { 3830 auto error = GetLastError(); 3831 /+ 3832 if(error == ERROR_IO_PENDING) { 3833 // not expected here, the docs say it returns true when queued 3834 } 3835 +/ 3836 3837 throw new SystemApiException("ReadDirectoryChangesW", error); 3838 } 3839 } 3840 } else version(Arsd_core_epoll) { 3841 static int inotifyfd = -1; // this is TLS since it is associated with the thread's event loop 3842 static ICoreEventLoop.UnregisterToken inotifyToken; 3843 static CallbackHelper inotifycb; 3844 static DirectoryWatcher[int] watchMappings; 3845 3846 static ~this() { 3847 if(inotifyfd != -1) { 3848 close(inotifyfd); 3849 inotifyfd = -1; 3850 } 3851 } 3852 3853 import core.sys.linux.sys.inotify; 3854 3855 int watchId = -1; 3856 3857 static void inotifyReady() { 3858 // read from it 3859 ubyte[256 /* NAME_MAX + 1 */ + inotify_event.sizeof] sbuffer; 3860 3861 auto ret = read(inotifyfd, sbuffer.ptr, sbuffer.length); 3862 if(ret == -1) { 3863 auto errno = errno; 3864 if(errno == EAGAIN || errno == EWOULDBLOCK) 3865 return; 3866 throw new SystemApiException("read inotify", errno); 3867 } else if(ret == 0) { 3868 assert(0, "I don't think this is ever supposed to happen"); 3869 } 3870 3871 auto buffer = sbuffer[0 .. ret]; 3872 3873 while(buffer.length > 0) { 3874 inotify_event* event = cast(inotify_event*) buffer.ptr; 3875 buffer = buffer[inotify_event.sizeof .. $]; 3876 char[] filename = cast(char[]) buffer[0 .. 
event.len]; 3877 buffer = buffer[event.len .. $]; 3878 3879 // note that filename is padded with zeroes, so it is actually a stringz 3880 3881 if(auto obj = event.wd in watchMappings) { 3882 (*obj).changeHandler( 3883 FilePath(stringz(filename.ptr).borrow.idup), // FIXME: this is a relative path 3884 ChangeOperation.unknown // FIXME 3885 ); 3886 } else { 3887 // it has probably already been removed 3888 } 3889 } 3890 } 3891 } else version(Arsd_core_kqueue) { 3892 int fd; 3893 CallbackHelper cb; 3894 } 3895 3896 FilePath path; 3897 string globPattern; 3898 bool recursive; 3899 void delegate(FilePath filename, ChangeOperation op) changeHandler; 3900 } 3901 3902 enum ChangeOperation { 3903 unknown, 3904 deleted, // NOTE_DELETE, IN_DELETE, FILE_NOTIFY_CHANGE_FILE_NAME 3905 written, // NOTE_WRITE / NOTE_EXTEND / NOTE_TRUNCATE, IN_MODIFY, FILE_NOTIFY_CHANGE_LAST_WRITE / FILE_NOTIFY_CHANGE_SIZE 3906 renamed, // NOTE_RENAME, the moved from/to in linux, FILE_NOTIFY_CHANGE_FILE_NAME 3907 metadataChanged // NOTE_ATTRIB, IN_ATTRIB, FILE_NOTIFY_CHANGE_ATTRIBUTES 3908 3909 // there is a NOTE_OPEN on freebsd 13, and the access change on Windows. and an open thing on linux. so maybe i can do note open/note_read too. 3910 } 3911 3912 /+ 3913 Windows and Linux work best when you watch directories. The operating system tells you the name of files as they change. 3914 3915 BSD doesn't support this. You can only get names and reports when a file is modified by watching specific files. AS such, when you watch a directory on those systems, your delegate will be called with a null path. Cross-platform applications should check for this and not assume the name is always usable. 3916 3917 inotify is kinda clearly the best of the bunch, with Windows in second place, and kqueue dead last. 3918 3919 3920 If path to watch is a directory, it signals when a file inside the directory (only one layer deep) is created or modified. This is the most efficient on Windows and Linux. 
3921 3922 If a path is a file, it only signals when that specific file is written. This is most efficient on BSD. 3923 3924 3925 The delegate is called when something happens. Note that the path modified may not be accurate on all systems when you are watching a directory. 3926 +/ 3927 3928 /++ 3929 Watches a directory and its contents. If the `globPattern` is `null`, it will not attempt to add child items but also will not filter it, meaning you will be left with platform-specific behavior. 3930 3931 On Windows, the globPattern is just used to filter events. 3932 3933 On Linux, the `recursive` flag, if set, will cause it to add additional OS-level watches for each subdirectory. 3934 3935 On BSD, anything other than a null pattern will cause a directory scan to add files to the watch list. 3936 3937 For best results, use the most limited thing you need, as watches can get quite involved on the bsd systems. 3938 3939 Newly added files and subdirectories may not be automatically added in all cases, meaning if it is added and then subsequently modified, you might miss a notification. 3940 3941 If the event queue is too busy, the OS may skip a notification. 3942 3943 You should always offer some way for the user to force a refresh and not rely on notifications being present; they are a convenience when they work, not an always reliable method. 
+/
	this(FilePath directoryToWatch, string globPattern, bool recursive, void delegate(FilePath pathModified, ChangeOperation op) dg) {
		this.path = directoryToWatch;
		this.globPattern = globPattern;
		this.recursive = recursive;
		this.changeHandler = dg;

		version(Arsd_core_windows) {
			WCharzBuffer wname = directoryToWatch.path;
			buffer = new ubyte[](1024);
			hDirectory = CreateFileW(
				wname.ptr,
				GENERIC_READ,
				FILE_SHARE_READ,
				null,
				OPEN_EXISTING,
				FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED | FILE_FLAG_BACKUP_SEMANTICS,
				null
			);
			if(hDirectory == INVALID_HANDLE_VALUE)
				throw new SystemApiException("CreateFileW", GetLastError());

			requestRead();
		} else version(Arsd_core_epoll) {
			auto el = getThisThreadEventLoop();

			// no need for sync because it is thread-local
			if(inotifyfd == -1) {
				inotifyfd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
				if(inotifyfd == -1)
					throw new SystemApiException("inotify_init1", errno);

				inotifycb = new CallbackHelper(&inotifyReady);
				inotifyToken = el.addCallbackOnFdReadable(inotifyfd, inotifycb);
			}

			uint event_mask = IN_CREATE | IN_MODIFY | IN_DELETE; // FIXME
			CharzBuffer dtw = directoryToWatch.path;
			// store the descriptor in the member (not a shadowing local) so dispose()
			// can remove the right watch later
			this.watchId = inotify_add_watch(inotifyfd, dtw.ptr, event_mask);
			// inotify_add_watch reports failure as exactly -1
			if(this.watchId == -1)
				throw new SystemApiException("inotify_add_watch", errno, [SavedArgument("path", LimitedVariant(directoryToWatch.path))]);

			watchMappings[this.watchId] = this;

			// FIXME: recursive needs to add child things individually

		} else version(Arsd_core_kqueue) {
			auto el = cast(CoreEventLoopImplementation) getThisThreadEventLoop();

			// FIXME: need to scan for globPattern
			// when a new file is added, i'll have to diff my list to detect it and open it too
			// and recursive might need to scan down too.

			kevent_t ev;

			import core.sys.posix.fcntl;
			CharzBuffer buffer = CharzBuffer(directoryToWatch.path);
			fd = ErrnoEnforce!open(buffer.ptr, O_RDONLY);
			setCloExec(fd);

			cb = new CallbackHelper(&triggered);

			EV_SET(&ev, fd, EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_WRITE, 0, cast(void*) cb);
			ErrnoEnforce!kevent(el.kqueuefd, &ev, 1, null, 0, null);
		} else assert(0, "Not yet implemented for this platform");
	}

	// kqueue callback; currently only prints a debug line
	private void triggered() {
		writeln("triggered");
	}

	// Releases the OS-level watch owned by this object.
	void dispose() {
		version(Arsd_core_windows) {
			CloseHandle(hDirectory);
		} else version(Arsd_core_epoll) {
			if(watchId == -1)
				return; // never registered, or already disposed

			watchMappings.remove(watchId); // I could also do this on the IN_IGNORE notification but idk
			inotify_rm_watch(inotifyfd, watchId);
			watchId = -1;
		} else version(Arsd_core_kqueue) {
			ErrnoEnforce!close(fd);
			fd = -1;
		}
	}
}

version(none)
void main() {

	// auto file = new AsyncFile(FilePath("test.txt"), AsyncFile.OpenMode.writeWithTruncation, AsyncFile.RequirePreexisting.yes);

	/+
	getFiles("c:/windows\\", (string filename, bool isDirectory) {
		writeln(filename, " ", isDirectory ? "[dir]": "[file]");
	});
	+/

	auto w = new DirectoryWatcher(FilePath("."), "*", false, (path, op) {
		writeln(path.path);
	});
	getThisThreadEventLoop().run(() => false);
}

/++
	This starts up a local pipe. If it is already claimed, it just communicates with the existing one through the interface.
+/
class SingleInstanceApplication {
	// FIXME
}

version(none)
void main() {

	auto file = new AsyncFile(FilePath("test.txt"), AsyncFile.OpenMode.writeWithTruncation, AsyncFile.RequirePreexisting.yes);

	auto buffer = cast(ubyte[]) "hello";
	auto wr = new AsyncWriteRequest(file, buffer, 0);
	wr.start();

	wr.waitForCompletion();

	file.close();
}

/++
	Implementation details of some requests. You shouldn't need to know any of this, the interface is all public.

	`LowLevelOperation` supplies the actual system call through `opCall`, plus `file`, `buffer` and `errorString`. `Response` is built from a [SystemErrorCode] and the slice of the buffer that was actually transferred.
+/
mixin template OverlappedIoRequest(Response, LowLevelOperation) {
	private {
		LowLevelOperation llo;

		OwnedClass!Response response;

		version(Windows) {
			OVERLAPPED overlapped;

			extern(Windows)
			static void overlappedCompletionRoutine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransferred, LPOVERLAPPED lpOverlapped) {
				// the OVERLAPPED is embedded in the request object, so step back by its offset to recover the object
				typeof(this) rr = cast(typeof(this)) (cast(void*) lpOverlapped - typeof(this).overlapped.offsetof);

				rr.response = typeof(rr.response)(SystemErrorCode(dwErrorCode), rr.llo.buffer[0 .. dwNumberOfBytesTransferred]);
				rr.state_ = State.complete;

				// FIXME: on complete?

				// this will queue our CallbackHelper and that should be run at the end of the event loop after it is woken up by the APC run
			}
		}

		version(Posix) {
			ICoreEventLoop.RearmToken eventRegistration;
			CallbackHelper cb;

			// lazily creates the callback object handed to the event loop
			final CallbackHelper getCb() {
				if(cb is null)
					cb = new CallbackHelper(&cbImpl);
				return cb;
			}

			final void cbImpl() {
				// it is ready to complete, time to do it
				auto ret = llo();
				markCompleted(ret, errno);
			}

			// Stores the result (error code, or the transferred slice of the buffer) and flips the state to complete.
			void markCompleted(long ret, int errno) {
				// maybe i should queue an apc to actually do it, to ensure the event loop has cycled... FIXME
				if(ret == -1)
					response = typeof(response)(SystemErrorCode(errno), null);
				else
					response = typeof(response)(SystemErrorCode(0), llo.buffer[0 .. cast(size_t) ret]);
				state_ = State.complete;
			}
		}
	}

	enum State {
		unused,
		started,
		inProgress,
		complete
	}
	private State state_;

	/++
		Issues the operation. Must only be called on an unused request.

		Throws: [SystemApiException] if the operation fails immediately. The request is marked complete before the exception is thrown, so a later `waitForCompletion` returns instead of waiting on something that was never queued.
	+/
	override void start() {
		assert(state_ == State.unused);

		state_ = State.started;

		version(Windows) {
			if(llo(&overlapped, &overlappedCompletionRoutine)) {
				// all good, though GetLastError() might have some informative info
			} else {
				// operation failed, the operation is always ReadFileEx or WriteFileEx so it won't give the io pending thing here
				// should i issue error async? idk
				state_ = State.complete;
				throw new SystemApiException(llo.errorString(), GetLastError());
			}

			// ReadFileEx always queues, even if it completed synchronously. I *could* check the get overlapped result and sleepex here but i'm prolly better off just letting the event loop do its thing anyway.
		} else version(Posix) {

			// first try to just do it
			auto ret = llo();

			auto errno = errno;
			if(ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // unable to complete right now, register and try when it is ready
				eventRegistration = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(this.llo.file.handle, this.getCb);
			} else {
				// it finished synchronously, one way or the other. Record the outcome first so
				// the state never stays at `started` with nothing registered (same as the Windows branch)
				markCompleted(ret, errno);

				// i could set errors sync or async and since it couldn't even start, i think a sync exception is the right way
				if(ret == -1)
					throw new SystemApiException(llo.errorString(), errno);
			}
		}
	}


	/++
		Cancels a request that has not finished yet; a completed request is left alone.
	+/
	override void cancel() {
		if(state_ == State.complete)
			return; // it has already finished, just leave it alone, no point discarding what is already done
		version(Windows) {
			if(state_ != State.unused)
				Win32Enforce!CancelIoEx(llo.file.AbstractFile.handle, &overlapped);
			// Windows will notify us when the cancellation is complete, so we need to wait for that before updating the state
		} else version(Posix) {
			if(state_ != State.unused)
				eventRegistration.unregister();
			markCompleted(-1, ECANCELED);
		}
	}

	override bool isComplete() {
		// just always let the event loop do it instead
		return state_ == State.complete;

		/+
		version(Windows) {
			return HasOverlappedIoCompleted(&overlapped);
		} else version(Posix) {
			return state_ == State.complete;

		}
		+/
	}

	/++
		Starts the request if needed, then runs this thread's event loop until it is complete and returns the response.
	+/
	override Response waitForCompletion() {
		if(state_ == State.unused)
			start();

		// FIXME: if we are inside a fiber, we can set a oncomplete callback and then yield instead...
		if(state_ != State.complete)
			getThisThreadEventLoop().run(&isComplete);

		/+
		version(Windows) {
			SleepEx(INFINITE, true);

			//DWORD numberTransferred;
			//Win32Enforce!GetOverlappedResult(file.handle, &overlapped, &numberTransferred, true);
		} else version(Posix) {
			getThisThreadEventLoop().run(&isComplete);
		}
		+/

		return response;
	}
}

/++
	You can write to a file asynchronously by creating one of these.
+/
final class AsyncWriteRequest : AsyncOperationRequest {
	struct LowLevelOperation {
		AsyncFile file;
		ubyte[] buffer;
		long offset;

		this(typeof(this.tupleof) args) {
			this.tupleof = args;
		}

		version(Windows) {
			auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
				// the 64 bit offset is split across the two 32 bit OVERLAPPED fields
				overlapped.Offset = (cast(ulong) offset) & 0xffff_ffff;
				overlapped.OffsetHigh = ((cast(ulong) offset) >> 32) & 0xffff_ffff;
				return WriteFileEx(file.handle, buffer.ptr, cast(int) buffer.length, overlapped, ocr);
			}
		} else {
			auto opCall() {
				return core.sys.posix.unistd.write(file.handle, buffer.ptr, buffer.length);
			}
		}

		string errorString() {
			return "Write";
		}
	}
	mixin OverlappedIoRequest!(AsyncWriteResponse, LowLevelOperation);

	this(AsyncFile file, ubyte[] buffer, long offset) {
		this.llo = LowLevelOperation(file, buffer, offset);
		response = typeof(response).defaultConstructed;
	}
}

/++
	Result of an [AsyncWriteRequest]: the error code and the part of the buffer that was written.
+/
class AsyncWriteResponse : AsyncOperationResponse {
	const ubyte[] bufferWritten;
	const SystemErrorCode errorCode;

	this(SystemErrorCode errorCode, const(ubyte)[] bufferWritten) {
		this.errorCode = errorCode;
		this.bufferWritten = bufferWritten;
	}

	override bool wasSuccessful() {
		return errorCode.wasSuccessful;
	}
}

/++
	You can read from a file asynchronously by creating one of these.
+/
final class AsyncReadRequest : AsyncOperationRequest {
	struct LowLevelOperation {
		AsyncFile file;
		ubyte[] buffer;
		long offset;

		this(typeof(this.tupleof) args) {
			this.tupleof = args;
		}

		version(Windows) {
			auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
				// the 64 bit offset is split across the two 32 bit OVERLAPPED fields
				overlapped.Offset = (cast(ulong) offset) & 0xffff_ffff;
				overlapped.OffsetHigh = ((cast(ulong) offset) >> 32) & 0xffff_ffff;
				return ReadFileEx(file.handle, buffer.ptr, cast(int) buffer.length, overlapped, ocr);
			}
		} else {
			auto opCall() {
				return core.sys.posix.unistd.read(file.handle, buffer.ptr, buffer.length);
			}
		}

		string errorString() {
			return "Read";
		}
	}
	mixin OverlappedIoRequest!(AsyncReadResponse, LowLevelOperation);

	/++
		The file must have the overlapped flag enabled on Windows and the nonblock flag set on Posix.

		The buffer MUST NOT be touched by you - not used by another request, modified, read, or freed, including letting a static array going out of scope - until this request's `isComplete` returns `true`.

		The offset is where to start reading a disk file. For all other types of files, pass 0.
	+/
	this(AsyncFile file, ubyte[] buffer, long offset) {
		this.llo = LowLevelOperation(file, buffer, offset);
		response = typeof(response).defaultConstructed;
	}

	/++

	+/
	// abstract void repeat();
}

/++
	Result of an [AsyncReadRequest]: the error code and the part of the buffer that was filled.
+/
class AsyncReadResponse : AsyncOperationResponse {
	const ubyte[] bufferRead;
	const SystemErrorCode errorCode;

	this(SystemErrorCode errorCode, const(ubyte)[] bufferRead) {
		this.errorCode = errorCode;
		this.bufferRead = bufferRead;
	}

	override bool wasSuccessful() {
		return errorCode.wasSuccessful;
	}
}

/+
	Tasks:
		startTask()
		startSubTask() - what if it just did this when it knows it is being run from inside a task?
4335 runHelperFunction() - whomever it reports to is the parent 4336 +/ 4337 4338 class ScheduableTask : Fiber { 4339 private void delegate() dg; 4340 4341 // linked list stuff 4342 private static ScheduableTask taskRoot; 4343 private ScheduableTask previous; 4344 private ScheduableTask next; 4345 4346 // need the controlling thread to know how to wake it up if it receives a message 4347 private Thread controllingThread; 4348 4349 // the api 4350 4351 this(void delegate() dg) { 4352 assert(dg !is null); 4353 4354 this.dg = dg; 4355 super(&taskRunner); 4356 4357 if(taskRoot !is null) { 4358 this.next = taskRoot; 4359 taskRoot.previous = this; 4360 } 4361 taskRoot = this; 4362 } 4363 4364 /+ 4365 enum BehaviorOnCtrlC { 4366 ignore, 4367 cancel, 4368 deliverMessage 4369 } 4370 +/ 4371 4372 private bool cancelled; 4373 4374 public void cancel() { 4375 this.cancelled = true; 4376 // if this is running, we can throw immediately 4377 // otherwise if we're calling from an appropriate thread, we can call it immediately 4378 // otherwise we need to queue a wakeup to its own thread. 4379 // tbh we should prolly just queue it every time 4380 } 4381 4382 private void taskRunner() { 4383 try { 4384 dg(); 4385 } catch(TaskCancelledException tce) { 4386 // this space intentionally left blank; 4387 // the purpose of this exception is to just 4388 // let the fiber's destructors run before we 4389 // let it die. 
4390 } catch(Throwable t) { 4391 if(taskUncaughtException is null) { 4392 throw t; 4393 } else { 4394 taskUncaughtException(t); 4395 } 4396 } finally { 4397 if(this is taskRoot) { 4398 taskRoot = taskRoot.next; 4399 if(taskRoot !is null) 4400 taskRoot.previous = null; 4401 } else { 4402 assert(this.previous !is null); 4403 assert(this.previous.next is this); 4404 this.previous.next = this.next; 4405 if(this.next !is null) 4406 this.next.previous = this.previous; 4407 } 4408 } 4409 } 4410 } 4411 4412 /++ 4413 4414 +/ 4415 void delegate(Throwable t) taskUncaughtException; 4416 4417 /++ 4418 Gets an object that lets you control a schedulable task (which is a specialization of a fiber) and can be used in an `if` statement. 4419 4420 --- 4421 if(auto controller = inSchedulableTask()) { 4422 controller.yieldUntilReadable(...); 4423 } 4424 --- 4425 4426 History: 4427 Added August 11, 2023 (dub v11.1) 4428 +/ 4429 SchedulableTaskController inSchedulableTask() { 4430 import core.thread.fiber; 4431 4432 if(auto fiber = Fiber.getThis) { 4433 return SchedulableTaskController(cast(ScheduableTask) fiber); 4434 } 4435 4436 return SchedulableTaskController(null); 4437 } 4438 4439 /// ditto 4440 struct SchedulableTaskController { 4441 private this(ScheduableTask fiber) { 4442 this.fiber = fiber; 4443 } 4444 4445 private ScheduableTask fiber; 4446 4447 /++ 4448 4449 +/ 4450 bool opCast(T : bool)() { 4451 return fiber !is null; 4452 } 4453 4454 /++ 4455 4456 +/ 4457 version(Posix) 4458 void yieldUntilReadable(NativeFileHandle handle) { 4459 assert(fiber !is null); 4460 4461 auto cb = new CallbackHelper(() { fiber.call(); }); 4462 4463 // FIXME: if the fd is already registered in this thread it can throw... 
4464 version(Windows) 4465 auto rearmToken = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(handle, cb); 4466 else 4467 auto rearmToken = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(handle, cb); 4468 4469 // FIXME: this is only valid if the fiber is only ever going to run in this thread! 4470 fiber.yield(); 4471 4472 rearmToken.unregister(); 4473 4474 // what if there are other messages, like a ctrl+c? 4475 if(fiber.cancelled) 4476 throw new TaskCancelledException(); 4477 } 4478 4479 version(Windows) 4480 void yieldUntilSignaled(NativeFileHandle handle) { 4481 // add it to the WaitForMultipleObjects thing w/ a cb 4482 } 4483 } 4484 4485 class TaskCancelledException : object.Exception { 4486 this() { 4487 super("Task cancelled"); 4488 } 4489 } 4490 4491 private class CoreWorkerThread : Thread { 4492 this(EventLoopType type) { 4493 this.type = type; 4494 4495 // task runners are supposed to have smallish stacks since they either just run a single callback or call into fibers 4496 // the helper runners might be a bit bigger tho 4497 super(&run); 4498 } 4499 void run() { 4500 eventLoop = getThisThreadEventLoop(this.type); 4501 atomicOp!"+="(startedCount, 1); 4502 atomicOp!"+="(runningCount, 1); 4503 scope(exit) { 4504 atomicOp!"-="(runningCount, 1); 4505 } 4506 4507 eventLoop.run(() => cancelled); 4508 } 4509 4510 private bool cancelled; 4511 4512 void cancel() { 4513 cancelled = true; 4514 } 4515 4516 EventLoopType type; 4517 ICoreEventLoop eventLoop; 4518 4519 __gshared static { 4520 CoreWorkerThread[] taskRunners; 4521 CoreWorkerThread[] helperRunners; 4522 ICoreEventLoop mainThreadLoop; 4523 4524 // for the helper function thing on the bsds i could have my own little circular buffer of availability 4525 4526 shared(int) startedCount; 4527 shared(int) runningCount; 4528 4529 bool started; 4530 4531 void setup(int numberOfTaskRunners, int numberOfHelpers) { 4532 assert(!started); 4533 synchronized { 4534 mainThreadLoop = getThisThreadEventLoop(); 
4535 4536 foreach(i; 0 .. numberOfTaskRunners) { 4537 auto nt = new CoreWorkerThread(EventLoopType.TaskRunner); 4538 taskRunners ~= nt; 4539 nt.start(); 4540 } 4541 foreach(i; 0 .. numberOfHelpers) { 4542 auto nt = new CoreWorkerThread(EventLoopType.HelperWorker); 4543 helperRunners ~= nt; 4544 nt.start(); 4545 } 4546 4547 const expectedCount = numberOfHelpers + numberOfTaskRunners; 4548 4549 while(startedCount < expectedCount) { 4550 Thread.yield(); 4551 } 4552 4553 started = true; 4554 } 4555 } 4556 4557 void cancelAll() { 4558 foreach(runner; taskRunners) 4559 runner.cancel(); 4560 foreach(runner; helperRunners) 4561 runner.cancel(); 4562 4563 } 4564 } 4565 } 4566 4567 private int numberOfCpus() { 4568 return 4; // FIXME 4569 } 4570 4571 /++ 4572 To opt in to the full functionality of this module with customization opportunity, create one and only one of these objects that is valid for exactly the lifetime of the application. 4573 4574 Normally, this means writing a main like this: 4575 4576 --- 4577 import arsd.core; 4578 void main() { 4579 ArsdCoreApplication app = ArsdCoreApplication("Your app name"); 4580 4581 // do your setup here 4582 4583 // the rest of your code here 4584 } 4585 --- 4586 4587 Its destructor runs the event loop then waits to for the workers to finish to clean them up. 4588 +/ 4589 // FIXME: single instance? 
struct ArsdCoreApplication {
	private ICoreEventLoop impl;

	/++
		default number of threads is to split your cpus between blocking function runners and task runners
	+/
	this(string applicationName) {
		auto num = numberOfCpus();
		num /= 2;
		if(num <= 0)
			num = 1;
		this(applicationName, num, num);
	}

	/++
		Same as above, but with an explicit count for each kind of worker thread.
	+/
	this(string applicationName, int numberOfTaskRunners, int numberOfHelpers) {
		impl = getThisThreadEventLoop(EventLoopType.Explicit);
		CoreWorkerThread.setup(numberOfTaskRunners, numberOfHelpers);
	}

	@disable this();
	@disable this(this);
	/++
		This must be deterministically destroyed.
	+/
	@disable new();

	~this() {
		if(!alreadyRun)
			run();
		exitApplication();
		waitForWorkersToExit(3000);
	}

	void exitApplication() {
		CoreWorkerThread.cancelAll();
	}

	void waitForWorkersToExit(int timeoutMilliseconds) {

	}

	private bool alreadyRun;

	void run() {
		// set the flag first: if the loop exits by exception, the destructor
		// must not start the loop a second time while unwinding
		alreadyRun = true;
		impl.run(() => false);
	}
}


private class CoreEventLoopImplementation : ICoreEventLoop {

	version(Arsd_core_kqueue) {
		// this thread apc dispatches go as a custom event to the queue
		// the other queues go through one byte at a time pipes (barf). freebsd 13 and newest nbsd have eventfd too tho so maybe i can use them but the other kqueue systems don't.

		// Waits for one batch of kqueue events and runs the callback attached to each.
		void runOnce() {
			kevent_t[16] ev;
			//timespec tout = timespec(1, 0);
			auto nev = kevent(kqueuefd, null, 0, ev.ptr, ev.length, null/*&tout*/);
			if(nev == -1) {
				// being interrupted by a signal is not a failure; just return,
				// the same way the epoll runOnce does
				if(errno == EINTR)
					return;
				throw new SystemApiException("kevent", errno);
			} else if(nev == 0) {
				// timeout
			} else {
				foreach(event; ev[0 .. nev]) {
					if(event.filter == EVFILT_SIGNAL) {
						// FIXME: I could prolly do this better tbh
						markSignalOccurred(cast(int) event.ident);
						signalChecker();
					} else {
						// FIXME: event.filter more specific?
						CallbackHelper cb = cast(CallbackHelper) event.udata;
						cb.call();
					}
				}
			}
		}

		// FIXME: idk how to make one event that multiple kqueues can listen to w/o being shared
		// maybe a shared kqueue could work that the thread kqueue listen to (which i rejected for
		// epoll cuz it caused thundering herd problems but maybe it'd work here)

		UnregisterToken addCallbackOnFdReadable(int fd, CallbackHelper cb) {
			kevent_t ev;

			EV_SET(&ev, fd, EVFILT_READ, EV_ADD | EV_ENABLE/* | EV_ONESHOT*/, 0, 0, cast(void*) cb);

			ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null);

			return UnregisterToken(this, fd, cb);
		}

		RearmToken addCallbackOnFdReadableOneShot(int fd, CallbackHelper cb) {
			kevent_t ev;

			EV_SET(&ev, fd, EVFILT_READ, EV_ADD | EV_ENABLE/* | EV_ONESHOT*/, 0, 0, cast(void*) cb);

			ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null);

			return RearmToken(true, this, fd, cb, 0);
		}

		RearmToken addCallbackOnFdWritableOneShot(int fd, CallbackHelper cb) {
			kevent_t ev;

			EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE/* | EV_ONESHOT*/, 0, 0, cast(void*) cb);

			ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null);

			return RearmToken(false, this, fd, cb, 0);
		}

		// re-registers the fd for the direction recorded in the token
		private void rearmFd(RearmToken token) {
			if(token.readable)
				cast(void) addCallbackOnFdReadableOneShot(token.fd, token.cb);
			else
				cast(void) addCallbackOnFdWritableOneShot(token.fd, token.cb);
		}

		// writes one byte to the process-wide pipe that every kqueue loop listens on
		private void triggerGlobalEvent() {
			ubyte a;
			import core.sys.posix.unistd;
			write(kqueueGlobalFd[1], &a, 1);
		}

		private this() {
			kqueuefd = ErrnoEnforce!kqueue();
			setCloExec(kqueuefd); // FIXME O_CLOEXEC

			if(kqueueGlobalFd[0] == 0) {
				import core.sys.posix.unistd;
				// do not carry on with unusable descriptors if the pipe could not be made
				if(pipe(kqueueGlobalFd) == -1)
					throw new SystemApiException("pipe", errno);
				setCloExec(kqueueGlobalFd[0]);
				setCloExec(kqueueGlobalFd[1]);

				signal(SIGINT, SIG_IGN); // FIXME
			}

			kevent_t ev;

			EV_SET(&ev, SIGCHLD, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, null);
			ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null);
			EV_SET(&ev, SIGINT, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, null);
			ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null);

			globalEventSent = new CallbackHelper(&readGlobalEvent);
			EV_SET(&ev, kqueueGlobalFd[0], EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, cast(void*) globalEventSent);
			ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null);
		}

		private int kqueuefd = -1;

		private CallbackHelper globalEventSent;
		// consumes the byte written by triggerGlobalEvent
		void readGlobalEvent() {
			import core.sys.posix.unistd;
			ubyte a;
			read(kqueueGlobalFd[0], &a, 1);

			// FIXME: the thread is woken up, now we need to check the circualr buffer queue
		}

		private __gshared int[2] kqueueGlobalFd;
	}

	/+
		// this setup needs no extra allocation
		auto op = read(file, buffer);
		op.oncomplete = &thisfiber.call;
		op.start();
		thisfiber.yield();
		auto result = op.waitForCompletion(); // guaranteed to return instantly thanks to previous setup

		can generically abstract that into:

		auto result = thisTask.await(read(file, buffer));


		You MUST NOT use buffer in any way - not read, modify, deallocate, reuse, anything - until the PendingOperation is complete.

		Note that PendingOperation may just be a wrapper around an internally allocated object reference... but then if you do a waitForFirstToComplete what happens?
4777 4778 those could of course just take the value type things 4779 +/ 4780 4781 4782 version(Arsd_core_windows) { 4783 // all event loops share the one iocp, Windows 4784 // manages how to do it 4785 __gshared HANDLE iocpTaskRunners; 4786 __gshared HANDLE iocpWorkers; 4787 4788 HANDLE[] handles; 4789 4790 // i think to terminate i just have to post the message at least once for every thread i know about, maybe a few more times for threads i don't know about. 4791 4792 bool isWorker; // if it is a worker we wait on the iocp, if not we wait on msg 4793 4794 void runOnce() { 4795 if(isWorker) { 4796 // this function is only supported on Windows Vista and up, so using this 4797 // means dropping support for XP. 4798 //GetQueuedCompletionStatusEx(); 4799 assert(0); // FIXME 4800 } else { 4801 auto wto = 0; 4802 4803 auto waitResult = MsgWaitForMultipleObjectsEx( 4804 cast(int) handles.length, handles.ptr, 4805 (wto == 0 ? INFINITE : wto), /* timeout */ 4806 0x04FF, /* QS_ALLINPUT */ 4807 0x0002 /* MWMO_ALERTABLE */ | 0x0004 /* MWMO_INPUTAVAILABLE */); 4808 4809 enum WAIT_OBJECT_0 = 0; 4810 if(waitResult >= WAIT_OBJECT_0 && waitResult < handles.length + WAIT_OBJECT_0) { 4811 auto h = handles[waitResult - WAIT_OBJECT_0]; 4812 // FIXME: run the handle ready callback 4813 } else if(waitResult == handles.length + WAIT_OBJECT_0) { 4814 // message ready 4815 int count; 4816 MSG message; 4817 while(PeekMessage(&message, null, 0, 0, PM_NOREMOVE)) { // need to peek since sometimes MsgWaitForMultipleObjectsEx returns even though GetMessage can block. 
tbh i don't fully understand it but the docs say it is foreground activation 4818 auto ret = GetMessage(&message, null, 0, 0); 4819 if(ret == -1) 4820 throw new WindowsApiException("GetMessage", GetLastError()); 4821 TranslateMessage(&message); 4822 DispatchMessage(&message); 4823 4824 count++; 4825 if(count > 10) 4826 break; // take the opportunity to catch up on other events 4827 4828 if(ret == 0) { // WM_QUIT 4829 // EventLoop.quitApplication(); 4830 assert(0); // FIXME 4831 //break; 4832 } 4833 } 4834 } else if(waitResult == 0x000000C0L /* WAIT_IO_COMPLETION */) { 4835 SleepEx(0, true); // I call this to give it a chance to do stuff like async io 4836 } else if(waitResult == 258L /* WAIT_TIMEOUT */) { 4837 // timeout, should never happen since we aren't using it 4838 } else if(waitResult == 0xFFFFFFFF) { 4839 // failed 4840 throw new WindowsApiException("MsgWaitForMultipleObjectsEx", GetLastError()); 4841 } else { 4842 // idk.... 4843 } 4844 } 4845 } 4846 } 4847 4848 version(Posix) { 4849 private __gshared uint sigChildHappened = 0; 4850 private __gshared uint sigIntrHappened = 0; 4851 4852 static void signalChecker() { 4853 if(cas(&sigChildHappened, 1, 0)) { 4854 while(true) { // multiple children could have exited before we processed the notification 4855 4856 import core.sys.posix.sys.wait; 4857 4858 int status; 4859 auto pid = waitpid(-1, &status, WNOHANG); 4860 if(pid == -1) { 4861 import core.stdc.errno; 4862 auto errno = errno; 4863 if(errno == ECHILD) 4864 break; // also all done, there are no children left 4865 // no need to check EINTR since we set WNOHANG 4866 throw new ErrnoApiException("waitpid", errno); 4867 } 4868 if(pid == 0) 4869 break; // all done, all children are still running 4870 4871 // look up the pid for one of our objects 4872 // if it is found, inform it of its status 4873 // and then inform its controlling thread 4874 // to wake up so it can check its waitForCompletion, 4875 // trigger its callbacks, etc. 
ExternalProcess.recordChildTerminated(pid, status);
				}

			}
			if(cas(&sigIntrHappened, 1, 0)) {
				// FIXME
				import core.stdc.stdlib;
				exit(0);
			}
		}

		/++
			Informs the arsd.core system that the given signal happened. You can call this from inside a signal handler.

			`SIGCHLD` and `SIGINT` each set their own flag; other signal numbers set nothing. On the
			epoll build every call also adds one to the signal eventfd so a loop waiting on it wakes up.
			The `errno` value seen by the interrupted code is preserved.
		+/
		public static void markSignalOccurred(int sigNumber) nothrow {
			import core.sys.posix.unistd;

			if(sigNumber == SIGCHLD)
				volatileStore(&sigChildHappened, 1);
			if(sigNumber == SIGINT)
				volatileStore(&sigIntrHappened, 1);

			version(Arsd_core_epoll) {
				import core.stdc.errno;

				// a handler can run between a failed system call and the code that inspects
				// errno for it, so the write below must not leak its own error code out
				const savedErrno = errno;

				ulong writeValue = 1;
				// the result is deliberately ignored; there is nothing useful to do about a failure here
				write(signalPipeFd, &writeValue, writeValue.sizeof);

				errno = savedErrno;
			}
		}
	}

	version(Arsd_core_epoll) {

		import core.sys.linux.epoll;
		import core.sys.linux.sys.eventfd;

		/++
			Creates the loop for the current thread: sets up the process-wide pieces
			(signal eventfd, shared queues, signal handlers) the first time through, then
			creates this loop's own epoll instance and registers the listeners on it.

			Throws:
				`ErrnoApiException` if the epoll instance cannot be created or a listener cannot be registered.
		+/
		private this() {

			if(!globalsInitialized) {
				synchronized {
					// checked again now that the lock is held, since another thread may have done the setup meanwhile
					if(!globalsInitialized) {
						// blocking signals is problematic because it is inherited by child processes
						// and that can be problematic for general purpose stuff so i use a self pipe
						// here. though since it is linux, im using an eventfd instead just to notify
						signalPipeFd = ErrnoEnforce!eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
						signalReaderCallback = new CallbackHelper(&signalReader);

						runInTaskRunnerQueue = new CallbackQueue("task runners", true);
						runInHelperThreadQueue = new CallbackQueue("helper threads", true);

						setSignalHandlers();

						globalsInitialized = true;
					}
				}
			}

			epollfd = epoll_create1(EPOLL_CLOEXEC);
			// without this check a failure would only surface later as a confusing epoll_ctl error
			if(epollfd == -1)
				throw new ErrnoApiException("epoll_create1", errno);

			// FIXME: ensure UI events get top priority

			// global listeners

			// FIXME: i should prolly keep the tokens and release them when tearing down.

			cast(void) addCallbackOnFdReadable(signalPipeFd, signalReaderCallback);
			if(true) { // FIXME: if this is a task runner vs helper thread vs ui thread
				cast(void) addCallbackOnFdReadable(runInTaskRunnerQueue.fd, runInTaskRunnerQueue.callback);
				runInTaskRunnerQueue.callback.addref();
			} else {
				cast(void) addCallbackOnFdReadable(runInHelperThreadQueue.fd, runInHelperThreadQueue.callback);
				runInHelperThreadQueue.callback.addref();
			}

			// local listener
			thisThreadQueue = new CallbackQueue("this thread", false);
			cast(void) addCallbackOnFdReadable(thisThreadQueue.fd, thisThreadQueue.callback);

			// what are we going to do about timers?
		}

		/++
			Closes this loop's epoll instance and tears down its thread-local queue.
			The process-wide state is handled separately by [teardownGlobals].
		+/
		void teardown() {
			import core.sys.posix.fcntl;
			import core.sys.posix.unistd;

			close(epollfd);
			epollfd = -1;

			thisThreadQueue.teardown();

			// FIXME: should prolly free anything left in the callback queue, tho those could also be GC managed tbh.
		}

		/+ // i call it explicitly at the thread exit instead, but worker threads aren't really supposed to exit generally speaking till process done anyway
		static ~this() {
			teardown();
		}
		+/

		/++
			Undoes the process-wide setup done by the first constructor call: restores the
			previous signal handlers, closes the signal eventfd, and tears down the shared queues.
		+/
		static void teardownGlobals() {
			import core.sys.posix.fcntl;
			import core.sys.posix.unistd;

			synchronized {
				restoreSignalHandlers();
				close(signalPipeFd);
				// the descriptor number can be reused by an unrelated file after close, so make
				// sure a late markSignalOccurred call cannot write into whatever gets it next
				signalPipeFd = -1;
				signalReaderCallback.release();

				runInTaskRunnerQueue.teardown();
				runInHelperThreadQueue.teardown();

				globalsInitialized = false;
			}

		}


		/++
			A ring buffer of callbacks paired with an eventfd that is written once per
			enqueued item, so the fd can be watched by an epoll loop.

			When `dequeueIsShared` is set the eventfd is created with `EFD_SEMAPHORE` and each
			wakeup takes exactly one item; otherwise a wakeup drains up to 16 items.
		+/
		private static final class CallbackQueue {
			int fd = -1;
			string name;
			CallbackHelper callback;
			SynchronizedCircularBuffer!CallbackHelper queue;

			this(string name, bool dequeueIsShared) {
				this.name = name;
				// the queue synchronizes on this object's monitor
				queue = typeof(queue)(this);

				fd = ErrnoEnforce!eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | (dequeueIsShared ? EFD_SEMAPHORE : 0));

				callback = new CallbackHelper(dequeueIsShared ? &sharedDequeueCb : &threadLocalDequeueCb);
			}

			/// Reads the eventfd counter. Returns true if a full 8 byte value was read.
			bool resetEvent() {
				import core.sys.posix.unistd;
				ulong count;
				return read(fd, &count, count.sizeof) == count.sizeof;
			}

			/// Takes one item per successful eventfd read and runs it.
			void sharedDequeueCb() {
				if(resetEvent()) {
					auto cb = queue.dequeue();
					cb.call();
					cb.release();
				}
			}

			/// Drains up to 16 items; the eventfd is only read (inside the queue lock) once the queue is empty.
			void threadLocalDequeueCb() {
				CallbackHelper[16] buffer;
				foreach(cb; queue.dequeueSeveral(buffer[], () { resetEvent(); })) {
					cb.call();
					cb.release();
				}
			}

			/++
				Adds `cb` to the queue and signals the eventfd.

				Throws:
					`ArsdException!"queue is full"` if the ring buffer had no room.
			+/
			void enqueue(CallbackHelper cb) {
				if(queue.enqueue(cb)) {
					import core.sys.posix.unistd;
					ulong count = 1;
					ErrnoEnforce!write(fd, &count, count.sizeof);
				} else {
					throw new ArsdException!"queue is full"(name);
				}
			}

			/// Closes the eventfd and releases the dequeue callback.
			void teardown() {
				import core.sys.posix.fcntl;
				import core.sys.posix.unistd;

				close(fd);
				fd = -1;

				callback.release();
			}
		}

		// there's a global instance of this we refer back to
		private __gshared {
			bool globalsInitialized;

			CallbackHelper signalReaderCallback;

			CallbackQueue runInTaskRunnerQueue;
			CallbackQueue runInHelperThreadQueue;

			int exitEventFd = -1; // FIXME: implement
		}

		// and then the local loop
		private {
			int epollfd = -1;

			CallbackQueue thisThreadQueue;
		}

		// signal stuff {
		import core.sys.posix.signal;

		private __gshared sigaction_t oldSigIntr;
		private __gshared sigaction_t oldSigChld;
		private __gshared sigaction_t oldSigPipe;

		private __gshared int signalPipeFd = -1;
		// sigpipe not important, i handle errors on the writes

		/++
			Installs the arsd.core handler for `SIGINT` and `SIGCHLD` and ignores `SIGPIPE`,
			remembering the previous dispositions for [restoreSignalHandlers].
		+/
		public static void setSignalHandlers() {
			static extern(C) void interruptHandler(int sigNumber) nothrow {
				markSignalOccurred(sigNumber);

				/+
				// calling the old handler is non-trivial since there can be ignore
				// or default or a plain handler or a sigaction 3 arg handler and i
				// i don't think it is worth the complication
				sigaction_t* oldHandler;
				if(sigNumber == SIGCHLD)
					oldHandler = &oldSigChld;
				else if(sigNumber == SIGINT)
					oldHandler = &oldSigIntr;
				if(oldHandler && oldHandler.sa_handler)
					oldHandler
				+/
			}

			sigaction_t n;
			n.sa_handler = &interruptHandler;
			n.sa_mask = cast(sigset_t) 0;
			n.sa_flags = 0;
			sigaction(SIGINT, &n, &oldSigIntr);
			sigaction(SIGCHLD, &n, &oldSigChld);

			n.sa_handler = SIG_IGN;
			sigaction(SIGPIPE, &n, &oldSigPipe);
		}

		/// Puts back the dispositions saved by [setSignalHandlers].
		public static void restoreSignalHandlers() {
			sigaction(SIGINT, &oldSigIntr, null);
			sigaction(SIGCHLD, &oldSigChld, null);
			sigaction(SIGPIPE, &oldSigPipe, null);
		}

		/// Loop callback for the signal eventfd: reads the counter, then runs the signal checker.
		private static void signalReader() {
			import core.sys.posix.unistd;
			ulong number;
			read(signalPipeFd, &number, number.sizeof);

			signalChecker();
		}
		// signal stuff done }

		// the any thread poll is just registered in the this thread poll w/ exclusive. nobody actually epoll_waits
		// on the global one directly.

		/++
			Waits (without a timeout) for up to 16 events and calls the callback object
			registered with each one. Returns without doing anything if the wait is
			interrupted by a signal.

			Throws:
				`ErrnoApiException` if `epoll_wait` fails for any reason other than `EINTR`.
		+/
		void runOnce() {
			epoll_event[16] events;
			auto ret = epoll_wait(epollfd, events.ptr, cast(int) events.length, -1); // FIXME: timeout
			if(ret == -1) {
				import core.stdc.errno;
				if(errno == EINTR) {
					return;
				}
				throw new ErrnoApiException("epoll_wait", errno);
			} else if(ret == 0) {
				// timeout
			} else {
				// loop events and call associated callbacks
				foreach(event; events[0 .. ret]) {
					auto flags = event.events;
					// the registration functions store the callback object's reference in data.ptr
					auto cbObject = cast(CallbackHelper) event.data.ptr;

					// FIXME: or if it is an error...
					// EPOLLERR - write end of pipe when read end closed or other error. and EPOLLHUP - terminal hangup or read end when write end close (but it will give 0 reading after that soon anyway)

					cbObject.call();
				}
			}
		}

		// building blocks for low-level integration with the loop

		/++
			Registers `cb` to be called whenever `fd` is readable, using `EPOLLEXCLUSIVE`.

			Throws:
				`ErrnoApiException` if `epoll_ctl` fails.
		+/
		UnregisterToken addCallbackOnFdReadable(int fd, CallbackHelper cb) {
			epoll_event event;
			event.data.ptr = cast(void*) cb;
			event.events = EPOLLIN | EPOLLEXCLUSIVE;
			if(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1)
				throw new ErrnoApiException("epoll_ctl", errno);

			return UnregisterToken(this, fd, cb);
		}

		/++
			Adds a one-off callback that you can optionally rearm when it happens.
		+/
		RearmToken addCallbackOnFdReadableOneShot(int fd, CallbackHelper cb) {
			epoll_event event;
			event.data.ptr = cast(void*) cb;
			event.events = EPOLLIN | EPOLLONESHOT;
			if(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1)
				throw new ErrnoApiException("epoll_ctl", errno);

			return RearmToken(true, this, fd, cb, EPOLLIN | EPOLLONESHOT);
		}

		/++
			Adds a one-off callback that you can optionally rearm when it happens.
		+/
		RearmToken addCallbackOnFdWritableOneShot(int fd, CallbackHelper cb) {
			epoll_event event;
			event.data.ptr = cast(void*) cb;
			event.events = EPOLLOUT | EPOLLONESHOT;
			if(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1)
				throw new ErrnoApiException("epoll_ctl", errno);

			return RearmToken(false, this, fd, cb, EPOLLOUT | EPOLLONESHOT);
		}

		/// Removes `fd` from this loop's epoll set. Throws `ErrnoApiException` on failure.
		private void unregisterFd(int fd) {
			epoll_event event;
			if(epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &event) == -1)
				throw new ErrnoApiException("epoll_ctl", errno);
		}

		/// Re-enables a one-shot registration with the callback and flags stored in the token.
		private void rearmFd(RearmToken token) {
			epoll_event event;
			event.data.ptr = cast(void*) token.cb;
			event.events = token.flags;
			if(epoll_ctl(epollfd, EPOLL_CTL_MOD, token.fd, &event) == -1)
				throw new ErrnoApiException("epoll_ctl", errno);
		}

		// Disk files will have to be sent as messages to a worker to do the read and report back a completion packet.
	}

	version(Arsd_core_kqueue) {
		// FIXME
	}

	// cross platform adapters
	void setTimeout() {}
	void addFileOrDirectoryChangeListener(FilePath name, uint flags, bool recursive = false) {}
}

// deduplication???????//
/// Not implemented yet: always returns false without posting anything.
bool postMessage(ThreadToRunIn destination, void delegate() code) {
	return false;
}
/// ditto
bool postMessage(ThreadToRunIn destination, Object message) {
	return false;
}

/+
void main() {
	// FIXME: the offset doesn't seem to be done right
	auto file = new AsyncFile(FilePath("test.txt"), AsyncFile.OpenMode.writeWithTruncation);
	file.write("hello", 10).waitForCompletion();
}
+/

// to test the mailboxes
/+
void main() {
	/+
	import std.stdio;
	Thread[4] pool;

	bool shouldExit;

	static int received;

	static void tester() {
		received++;
		//writeln(cast(void*) Thread.getThis, " ", received);
	}

	foreach(ref 
thread; pool) { 5248 thread = new Thread(() { 5249 getThisThreadEventLoop().run(() { 5250 return shouldExit; 5251 }); 5252 }); 5253 thread.start(); 5254 } 5255 5256 getThisThreadEventLoop(); // ensure it is all initialized before proceeding. FIXME: i should have an ensure initialized function i do on most the public apis. 5257 5258 int lol; 5259 5260 try 5261 foreach(i; 0 .. 6000) { 5262 CoreEventLoopImplementation.runInTaskRunnerQueue.enqueue(new CallbackHelper(&tester)); 5263 lol = cast(int) i; 5264 } 5265 catch(ArsdExceptionBase e) { 5266 Thread.sleep(50.msecs); 5267 writeln(e); 5268 writeln(lol); 5269 } 5270 5271 import core.stdc.stdlib; 5272 exit(0); 5273 5274 version(none) 5275 foreach(i; 0 .. 100) 5276 CoreEventLoopImplementation.runInTaskRunnerQueue.enqueue(new CallbackHelper(&tester)); 5277 5278 5279 foreach(ref thread; pool) { 5280 thread.join(); 5281 } 5282 +/ 5283 5284 5285 static int received; 5286 5287 static void tester() { 5288 received++; 5289 //writeln(cast(void*) Thread.getThis, " ", received); 5290 } 5291 5292 5293 5294 auto ev = cast(CoreEventLoopImplementation) getThisThreadEventLoop(); 5295 foreach(i; 0 .. 100) 5296 ev.thisThreadQueue.enqueue(new CallbackHelper(&tester)); 5297 foreach(i; 0 .. 100 / 16 + 1) 5298 ev.runOnce(); 5299 import std.conv; 5300 assert(received == 100, to!string(received)); 5301 5302 } 5303 +/ 5304 5305 /++ 5306 This is primarily a helper for the event queues. It is public in the hope it might be useful, 5307 but subject to change without notice; I will treat breaking it the same as if it is private. 5308 (That said, it is a simple little utility that does its job, so it is unlikely to change much. 5309 The biggest change would probably be letting it grow and changing from inline to dynamic array.) 5310 5311 It is a fixed-size ring buffer that synchronizes on a given object you give it in the constructor. 5312 5313 After enqueuing something, you should probably set an event to notify the other threads. 
// This is left as an exercise to you (or another wrapper). +/
/++
	Fixed-size ring buffer whose methods lock the monitor of the object passed to the constructor.

	One slot is always kept free to tell "full" from "empty", so at most `maxSize - 1`
	items can be pending at a time.
+/
struct SynchronizedCircularBuffer(T, size_t maxSize = 128) {
	private T[maxSize] ring;
	// both indexes always stay inside [0, maxSize): `front` is the next slot to read, `back` the next to write
	private int front;
	private int back;

	private Object synchronizedOn;

	@disable this();

	/++
		The Object's monitor is used to synchronize the methods in here.
	+/
	this(Object synchronizedOn) {
		this.synchronizedOn = synchronizedOn;
	}

	/++
		Note the potential race condition between calling this and actually dequeuing something. You might
		want to acquire the lock on the object before calling this (nested synchronized things are allowed
		as long as the same thread is the one doing it).
	+/
	bool isEmpty() {
		synchronized(this.synchronizedOn) {
			return front == back;
		}
	}

	/++
		Note the potential race condition between calling this and actually queuing something.
	+/
	bool isFull() {
		synchronized(this.synchronizedOn) {
			return isFullUnsynchronized();
		}
	}

	// the slot after `index`, wrapping back to the start of the ring
	private static int nextIndex(int index) nothrow {
		return cast(int) ((index + 1) % maxSize);
	}

	private bool isFullUnsynchronized() nothrow const {
		// both sides are wrapped indexes, so this keeps working no matter how many
		// items have passed through the buffer
		return nextIndex(back) == front;
	}

	/++
		If this returns true, you should signal listening threads (with an event or a semaphore,
		depending on how you dequeue it). If it returns false, the queue was full and your thing
		was NOT added. You might wait and retry later (you could set up another event to signal it
		has been read and wait for that, or maybe try on a timer), or just fail and throw an exception
		or to abandon the message.
	+/
	bool enqueue(T what) {
		synchronized(this.synchronizedOn) {
			if(isFullUnsynchronized())
				return false;
			ring[back] = what;
			back = nextIndex(back);
			return true;
		}
	}

	private T dequeueUnsynchronized() nothrow {
		assert(front != back);
		const slot = front;
		front = nextIndex(front);
		return ring[slot];
	}

	/++
		If you are using a semaphore to signal, you can call this once for each count of it
		and you can do that separately from this call (though they should be paired).

		If you are using an event, you should use [dequeueSeveral] instead to drain it.
	+/
	T dequeue() {
		synchronized(this.synchronizedOn) {
			return dequeueUnsynchronized();
		}
	}

	/++
		Note that if you use a semaphore to signal waiting threads, you should probably not call this.

		If you use a set/reset event, there's a potential race condition between the dequeue and event
		reset. This is why the `runInsideLockIfEmpty` delegate is there - when it is empty, before it
		unlocks, it will give you a chance to reset the event. Otherwise, it can remain set to indicate
		that there's still pending data in the queue.

		Returns:
			The front part of `buffer` that was filled, in queue order.
	+/
	T[] dequeueSeveral(return T[] buffer, scope void delegate() runInsideLockIfEmpty = null) {
		int pos;
		synchronized(this.synchronizedOn) {
			while(pos < buffer.length && front != back) {
				buffer[pos++] = dequeueUnsynchronized();
			}
			if(front == back && runInsideLockIfEmpty !is null)
				runInsideLockIfEmpty();
		}
		return buffer[0 .. pos];
	}
}

unittest {
	// plain integers are used as the items so the test does not have to forge object references
	Object object = new Object();
	auto queue = SynchronizedCircularBuffer!size_t(object);
	assert(queue.isEmpty);
	foreach(i; 0 .. queue.ring.length - 1)
		queue.enqueue(i);
	assert(queue.isFull);

	foreach(i; 0 .. queue.ring.length - 1)
		assert(queue.dequeue() == i);
	assert(queue.isEmpty);

	foreach(i; 0 .. queue.ring.length - 1)
		queue.enqueue(i);
	assert(queue.isFull);

	size_t[] buffer = new size_t[](300);
	auto got = queue.dequeueSeveral(buffer);
	assert(got.length == queue.ring.length - 1);
	assert(queue.isEmpty);
	foreach(i, item; got)
		assert(item == i);

	foreach(i; 0 .. 8)
		queue.enqueue(i);
	buffer = new size_t[](4);
	got = queue.dequeueSeveral(buffer);
	assert(got.length == 4);
	foreach(i, item; got)
		assert(item == i);
	got = queue.dequeueSeveral(buffer);
	assert(got.length == 4);
	foreach(i, item; got)
		assert(item == i + 4);
	got = queue.dequeueSeveral(buffer);
	assert(got.length == 0);
	assert(queue.isEmpty);

	// regression: the full check has to keep working after the indexes have wrapped around
	buffer = new size_t[](300);
	foreach(round; 0 .. 3) {
		foreach(i; 0 .. queue.ring.length - 1)
			assert(queue.enqueue(i));
		assert(queue.isFull);
		assert(!queue.enqueue(0));
		got = queue.dequeueSeveral(buffer);
		assert(got.length == queue.ring.length - 1);
		foreach(i, item; got)
			assert(item == i);
		assert(queue.isEmpty);
	}
}

/++
	Byte order of a multi-byte value on a stream. `irrelevant` is only accepted by the
	stream `put`/`get` functions for items that are a single byte in size.
+/
enum ByteOrder {
	irrelevant,
	littleEndian,
	bigEndian,
}

/++
	A class to help write a stream of binary data to some target.
+/
// NOT YET FUNCTIONAL +/
/++
	Collects bytes written with [put] into a fixed buffer.

	NOT YET FUNCTIONAL: [flush] does nothing in this class, so once the buffer has
	filled up there is nowhere for further bytes to go.
+/
class WritableStream {
	/++
		Creates a stream with a newly allocated buffer of `bufferSize` bytes, or one that
		writes into the `buffer` you pass in.
	+/
	this(size_t bufferSize) {
		this(new ubyte[](bufferSize));
	}

	/// ditto
	this(ubyte[] buffer) {
		this.buffer = buffer;
	}

	/++
		Writes the bytes of `value` to the stream in the requested order. Arrays are written
		element by element.

		Params:
			value = a 1, 2, 4, or 8 byte value (or an array of them); its bits are written as-is
			byteOrder = required for anything bigger than one byte
			file = reported in the exception if the arguments are invalid
			line = ditto

		Throws:
			`InvalidArgumentsException` if `byteOrder` is `irrelevant` for a multi-byte type.
	+/
	final void put(T)(T value, ByteOrder byteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
		static if(T.sizeof == 8)
			alias Bits = ulong;
		else static if(T.sizeof == 4)
			alias Bits = uint;
		else static if(T.sizeof == 2)
			alias Bits = ushort;
		else static if(T.sizeof == 1)
			alias Bits = ubyte;
		else static assert(0, "unimplemented type, try using just the basic types");

		if(byteOrder == ByteOrder.irrelevant && T.sizeof > 1)
			throw new InvalidArgumentsException("byteOrder", "byte order must be specified for type " ~ T.stringof ~ " because it is bigger than one byte", "WritableStream.put", file, line);

		// reinterpret the value as an unsigned integer of the same size so it can be shifted apart
		Bits b = *cast(Bits*) &value;

		static if(T.sizeof == 1) {
			// a single byte has no order
			writeOneByte(b);
		} else {
			final switch(byteOrder) {
				case ByteOrder.irrelevant:
					assert(0); // rejected above
				case ByteOrder.littleEndian:
					foreach(i; 0 .. T.sizeof) {
						writeOneByte(cast(ubyte) (b & 0xff));
						b >>= 8;
					}
				break;
				case ByteOrder.bigEndian:
					int amount = T.sizeof * 8 - 8;
					foreach(i; 0 .. T.sizeof) {
						writeOneByte(cast(ubyte) ((b >> amount) & 0xff));
						amount -= 8;
					}
				break;
			}
		}
	}

	/// ditto
	final void put(T : E[], E)(T value, ByteOrder elementByteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
		foreach(item; value)
			put(item, elementByteOrder, file, line);
	}

	/++
		Performs a final flush() call, then marks the stream as closed, meaning no further data will be written to it.
	+/
	void close() {
		if(!isClosed_)
			flush();
		isClosed_ = true;
	}

	/++
		Writes what is currently in the buffer to the target and waits for the target to accept it.
		Please note: if you are subclassing this to go to a different target

		In this class it does nothing.
	+/
	void flush() {}

	/++
		Returns true if either you closed it or if the receiving end closed their side, indicating they
		don't want any more data.
	+/
	bool isClosed() {
		return isClosed_;
	}

	// hasRoomInBuffer
	// canFlush
	// waitUntilCanFlush

	// flushImpl
	// markFinished / close - tells the other end you're done

	// Appends one byte to the buffer, asking for a flush first when the buffer is full.
	// NOTE(review): nothing visible here resets bufferPosition after a flush - confirm how a
	// working flush is meant to make room.
	private final void writeOneByte(ubyte value) {
		if(bufferPosition == buffer.length)
			flush();

		buffer[bufferPosition++] = value;
	}


	private {
		ubyte[] buffer;
		int bufferPosition;
		bool isClosed_;
	}
}

/++
	A stream can be used by just one task at a time, but one task can consume multiple streams.

	Streams may be populated by async sources (in which case they must be called from a fiber task),
	from a function generating the data on demand (including an input range), from memory, or from a synchronous file.

	A stream of heterogeneous types is compatible with input ranges.

	It reads binary data.
+/
class ReadableStream {

	this() {

	}

	/++
		Gets data of the specified type `T` off the stream. The byte order of the T on the stream must be specified unless it is irrelevant (e.g. single byte entries).

		---
		// get an int out of a big endian stream
		int i = stream.get!int(ByteOrder.bigEndian);

		// get i bytes off the stream
		ubyte[] data = stream.get!(ubyte[])(i);
		---

		If not enough data is buffered yet, the calling fiber is suspended until more has been fed in.

		Throws:
			`InvalidArgumentsException` if the byte order is `irrelevant` for a multi-byte type.
	+/
	final T get(T)(ByteOrder byteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
		if(byteOrder == ByteOrder.irrelevant && T.sizeof > 1)
			throw new InvalidArgumentsException("byteOrder", "byte order must be specified for type " ~ T.stringof ~ " because it is bigger than one byte", "ReadableStream.get", file, line);

		// FIXME: what if it is a struct?

		while(bufferedLength() < T.sizeof)
			waitForAdditionalData();

		static if(T.sizeof == 1) {
			ubyte ret = consumeOneByte();
			return *cast(T*) &ret;
		} else {
			static if(T.sizeof == 8)
				ulong ret;
			else static if(T.sizeof == 4)
				uint ret;
			else static if(T.sizeof == 2)
				ushort ret;
			else static assert(0, "unimplemented type, try using just the basic types");

			if(byteOrder == ByteOrder.littleEndian) {
				// each new byte goes in at the top while the earlier ones shift down, so the
				// first byte read ends up as the lowest one
				typeof(ret) buffer;
				foreach(b; 0 .. T.sizeof) {
					buffer = consumeOneByte();
					buffer <<= T.sizeof * 8 - 8;

					ret >>= 8;
					ret |= buffer;
				}
			} else {
				foreach(b; 0 .. T.sizeof) {
					ret <<= 8;
					ret |= consumeOneByte();
				}
			}

			return *cast(T*) &ret;
		}
	}

	/// ditto
	final T get(T : E[], E)(size_t length, ByteOrder elementByteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
		if(elementByteOrder == ByteOrder.irrelevant && E.sizeof > 1)
			throw new InvalidArgumentsException("elementByteOrder", "byte order must be specified for type " ~ E.stringof ~ " because it is bigger than one byte", "ReadableStream.get", file, line);

		// if the stream is closed before getting the length or the terminator, should we send partial stuff
		// or just throw?

		while(bufferedLength() < length * E.sizeof)
			waitForAdditionalData();

		T ret;

		ret.length = length;

		if(false && elementByteOrder == ByteOrder.irrelevant) {
			// ret[] =
			// FIXME: can prolly optimize
		} else {
			foreach(i; 0 .. length)
				ret[i] = get!E(elementByteOrder);
		}

		return ret;

	}

	/// ditto
	final T get(T : E[], E)(scope bool delegate(E e) isTerminatingSentinel, ByteOrder elementByteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
		if(elementByteOrder == ByteOrder.irrelevant && E.sizeof > 1)
			throw new InvalidArgumentsException("elementByteOrder", "byte order must be specified for type " ~ E.stringof ~ " because it is bigger than one byte", "ReadableStream.get", file, line);

		assert(0, "Not implemented");
	}

	/++
		Returns true once empty data has been fed to the stream.
	+/
	bool isClosed() {
		return isClosed_;
	}

	// Control side of things

	private bool isClosed_;

	/++
		Feeds data into the stream, which can be consumed by `get`. If a task is waiting for more
		data to satisfy its get requests, this will trigger those tasks to resume.

		If you feed it empty data, it will mark the stream as closed.

		The buffer is only borrowed for the duration of the call; whatever has not been consumed
		by the time it returns is copied so a later `get` can still see it.
	+/
	void feedData(ubyte[] data) {
		if(data.length == 0)
			isClosed_ = true;

		currentBuffer = data;
		// this is a borrowed buffer, so we won't keep the reference long term
		scope(exit) {
			// keep a copy of whatever nobody consumed, otherwise it would silently disappear
			if(currentBuffer.length)
				leftoverBuffer ~= currentBuffer;
			currentBuffer = null;
		}

		if(waitingTask !is null) {
			waitingTask.call();
		}
	}

	/++
		You basically have to use this thing from a task

		Throws:
			`ArsdException!"streams can only have one waiting task"` if a different task is already waiting.
	+/
	protected void waitForAdditionalData() {
		Fiber task = Fiber.getThis;

		assert(task !is null);

		if(waitingTask !is null && waitingTask !is task)
			throw new ArsdException!"streams can only have one waiting task";

		// copy any pending data in our buffer to the longer-term buffer
		if(currentBuffer.length) {
			leftoverBuffer ~= currentBuffer;
			// it is owned by leftoverBuffer now; clearing it keeps feedData from copying it again
			currentBuffer = null;
		}

		waitingTask = task;
		task.yield();
		// running again, so nobody is waiting until the next call; this also keeps feedData
		// from trying to resume a task that has since finished
		waitingTask = null;
	}

	private Fiber waitingTask;
	private ubyte[] leftoverBuffer;
	private ubyte[] currentBuffer;

	private size_t bufferedLength() {
		return leftoverBuffer.length + currentBuffer.length;
	}

	// older (copied) data is handed out before the buffer currently being fed
	private ubyte consumeOneByte() {
		ubyte b;
		if(leftoverBuffer.length) {
			b = leftoverBuffer[0];
			leftoverBuffer = leftoverBuffer[1 .. $];
		} else if(currentBuffer.length) {
			b = currentBuffer[0];
			currentBuffer = currentBuffer[1 .. $];
		} else {
			assert(0, "consuming off an empty buffer is impossible");
		}

		return b;
	}
}

// FIXME: do a stringstream too

unittest {
	auto stream = new ReadableStream();

	int position;
	char[16] errorBuffer;

	auto fiber = new Fiber(() {
		position = 1;
		int a = stream.get!int(ByteOrder.littleEndian);
		assert(a == 10, intToString(a, errorBuffer[]));
		position = 2;
		ubyte b = stream.get!ubyte;
		assert(b == 33);
		position = 3;

		// ubyte[] c = stream.get!(ubyte[])(3);
		// int[] d = stream.get!(int[])(3);
	});

	fiber.call();
	assert(position == 1);
	stream.feedData([10, 0, 0, 0]);
	assert(position == 2);
	stream.feedData([33]);
	assert(position == 3);

	// stream.feedData([1,2,3]);
	// stream.feedData([1,2,3,4,1,2,3,4,1,2,3,4]);

	// data fed while no task is waiting has to still be there for a later get
	auto early = new ReadableStream();
	early.feedData([7, 8]);
	ubyte first, second;
	auto reader = new Fiber(() {
		first = early.get!ubyte;
		second = early.get!ubyte;
	});
	reader.call();
	assert(first == 7);
	assert(second == 8);
}

/++
	UNSTABLE, NOT FULLY IMPLEMENTED. DO NOT USE YET.

	You might use this like:

	---
	auto proc = new ExternalProcess();
	auto stdoutStream = new ReadableStream();

	// to use a stream you can make one and have a task consume it
	runTask({
		while(!stdoutStream.isClosed) {
			auto line = stdoutStream.get!string(e => e == '\n');
		}
	});

	// then make the process feed into the stream
	proc.onStdoutAvailable = (got) {
		stdoutStream.feedData(got); // send it to the stream for processing
		stdout.rawWrite(got); // forward it through to our own thing
		// could also append it to a buffer to return it on complete
	};
	proc.start();
	---

	Please note that this does not currently and I have no plans as of this writing to add support for any kind of direct file descriptor passing. It always pipes them back to the parent for processing. If you don't want this, call the lower level functions yourself; the reason this class is here is to aid integration in the arsd.core event loop.
+/
Of course, I might change my mind on this. 5798 5799 Bugs: 5800 Not implemented at all on Windows yet. 5801 +/ 5802 class ExternalProcess /*: AsyncOperationRequest*/ { 5803 5804 private static version(Posix) { 5805 __gshared ExternalProcess[pid_t] activeChildren; 5806 5807 void recordChildCreated(pid_t pid, ExternalProcess proc) { 5808 synchronized(typeid(ExternalProcess)) { 5809 activeChildren[pid] = proc; 5810 } 5811 } 5812 5813 void recordChildTerminated(pid_t pid, int status) { 5814 synchronized(typeid(ExternalProcess)) { 5815 if(pid in activeChildren) { 5816 auto ac = activeChildren[pid]; 5817 ac.completed = true; 5818 ac.status = status; 5819 activeChildren.remove(pid); 5820 } 5821 } 5822 } 5823 } 5824 5825 // FIXME: config to pass through a shell or not 5826 5827 /++ 5828 This is the native version for Windows. 5829 +/ 5830 this(string program, string commandLine) { 5831 version(Posix) { 5832 assert(0, "not implemented command line to posix args yet"); 5833 } 5834 else throw new NotYetImplementedException(); 5835 } 5836 5837 this(string commandLine) { 5838 version(Posix) { 5839 assert(0, "not implemented command line to posix args yet"); 5840 } 5841 else throw new NotYetImplementedException(); 5842 } 5843 5844 this(string[] args) { 5845 version(Posix) { 5846 this.program = FilePath(args[0]); 5847 this.args = args; 5848 } 5849 else throw new NotYetImplementedException(); 5850 } 5851 5852 /++ 5853 This is the native version for Posix. 
5854 +/ 5855 this(FilePath program, string[] args) { 5856 version(Posix) { 5857 this.program = program; 5858 this.args = args; 5859 } 5860 else throw new NotYetImplementedException(); 5861 } 5862 5863 // you can modify these before calling start 5864 int stdoutBufferSize = 32 * 1024; 5865 int stderrBufferSize = 8 * 1024; 5866 5867 void start() { 5868 version(Posix) { 5869 int ret; 5870 5871 int[2] stdinPipes; 5872 ret = pipe(stdinPipes); 5873 if(ret == -1) 5874 throw new ErrnoApiException("stdin pipe", errno); 5875 5876 scope(failure) { 5877 close(stdinPipes[0]); 5878 close(stdinPipes[1]); 5879 } 5880 5881 stdinFd = stdinPipes[1]; 5882 5883 int[2] stdoutPipes; 5884 ret = pipe(stdoutPipes); 5885 if(ret == -1) 5886 throw new ErrnoApiException("stdout pipe", errno); 5887 5888 scope(failure) { 5889 close(stdoutPipes[0]); 5890 close(stdoutPipes[1]); 5891 } 5892 5893 stdoutFd = stdoutPipes[0]; 5894 5895 int[2] stderrPipes; 5896 ret = pipe(stderrPipes); 5897 if(ret == -1) 5898 throw new ErrnoApiException("stderr pipe", errno); 5899 5900 scope(failure) { 5901 close(stderrPipes[0]); 5902 close(stderrPipes[1]); 5903 } 5904 5905 stderrFd = stderrPipes[0]; 5906 5907 5908 int[2] errorReportPipes; 5909 ret = pipe(errorReportPipes); 5910 if(ret == -1) 5911 throw new ErrnoApiException("error reporting pipe", errno); 5912 5913 scope(failure) { 5914 close(errorReportPipes[0]); 5915 close(errorReportPipes[1]); 5916 } 5917 5918 setCloExec(errorReportPipes[0]); 5919 setCloExec(errorReportPipes[1]); 5920 5921 auto forkRet = fork(); 5922 if(forkRet == -1) 5923 throw new ErrnoApiException("fork", errno); 5924 5925 if(forkRet == 0) { 5926 // child side 5927 5928 // FIXME can we do more error checking that is actually useful here? 5929 // these operations are virtually guaranteed to succeed given the setup anyway. 
5930 5931 // FIXME pty too 5932 5933 void fail(int step) { 5934 import core.stdc.errno; 5935 auto code = errno; 5936 5937 // report the info back to the parent then exit 5938 5939 int[2] msg = [step, code]; 5940 auto ret = write(errorReportPipes[1], msg.ptr, msg.sizeof); 5941 5942 // but if this fails there's not much we can do... 5943 5944 import core.stdc.stdlib; 5945 exit(1); 5946 } 5947 5948 // dup2 closes the fd it is replacing automatically 5949 dup2(stdinPipes[0], 0); 5950 dup2(stdoutPipes[1], 1); 5951 dup2(stderrPipes[1], 2); 5952 5953 // don't need either of the original pipe fds anymore 5954 close(stdinPipes[0]); 5955 close(stdinPipes[1]); 5956 close(stdoutPipes[0]); 5957 close(stdoutPipes[1]); 5958 close(stderrPipes[0]); 5959 close(stderrPipes[1]); 5960 5961 // the error reporting pipe will be closed upon exec since we set cloexec before fork 5962 // and everything else should have cloexec set too hopefully. 5963 5964 if(beforeExec) 5965 beforeExec(); 5966 5967 // i'm not sure that a fully-initialized druntime is still usable 5968 // after a fork(), so i'm gonna stick to the C lib in here. 
5969 5970 const(char)* file = mallocedStringz(program.path).ptr; 5971 if(file is null) 5972 fail(1); 5973 const(char)*[] argv = mallocSlice!(const(char)*)(args.length + 1); 5974 if(argv is null) 5975 fail(2); 5976 foreach(idx, arg; args) { 5977 argv[idx] = mallocedStringz(args[idx]).ptr; 5978 if(argv[idx] is null) 5979 fail(3); 5980 } 5981 argv[args.length] = null; 5982 5983 auto rete = execvp/*e*/(file, argv.ptr/*, envp*/); 5984 if(rete == -1) { 5985 fail(4); 5986 } else { 5987 // unreachable code, exec never returns if it succeeds 5988 assert(0); 5989 } 5990 } else { 5991 pid = forkRet; 5992 5993 recordChildCreated(pid, this); 5994 5995 // close our copy of the write side of the error reporting pipe 5996 // so the read will immediately give eof when the fork closes it too 5997 ErrnoEnforce!close(errorReportPipes[1]); 5998 5999 int[2] msg; 6000 // this will block to wait for it to actually either start up or fail to exec (which should be near instant) 6001 auto val = read(errorReportPipes[0], msg.ptr, msg.sizeof); 6002 6003 if(val == -1) 6004 throw new ErrnoApiException("read error report", errno); 6005 6006 if(val == msg.sizeof) { 6007 // error happened 6008 // FIXME: keep the step part of the error report too 6009 throw new ErrnoApiException("exec", msg[1]); 6010 } else if(val == 0) { 6011 // pipe closed, meaning exec succeeded 6012 } else { 6013 assert(0); // never supposed to happen 6014 } 6015 6016 // set the ones we keep to close upon future execs 6017 // FIXME should i set NOBLOCK at this time too? 
prolly should 6018 setCloExec(stdinPipes[1]); 6019 setCloExec(stdoutPipes[0]); 6020 setCloExec(stderrPipes[0]); 6021 6022 // and close the others 6023 ErrnoEnforce!close(stdinPipes[0]); 6024 ErrnoEnforce!close(stdoutPipes[1]); 6025 ErrnoEnforce!close(stderrPipes[1]); 6026 6027 ErrnoEnforce!close(errorReportPipes[0]); 6028 6029 // and now register the ones we need to read with the event loop so it can call the callbacks 6030 // also need to listen to SIGCHLD to queue up the terminated callback. FIXME 6031 6032 stdoutUnregisterToken = getThisThreadEventLoop().addCallbackOnFdReadable(stdoutFd, new CallbackHelper(&stdoutReadable)); 6033 stderrUnregisterToken = getThisThreadEventLoop().addCallbackOnFdReadable(stderrFd, new CallbackHelper(&stderrReadable)); 6034 } 6035 } 6036 } 6037 6038 private version(Posix) { 6039 import core.sys.posix.unistd; 6040 import core.sys.posix.fcntl; 6041 6042 int stdinFd = -1; 6043 int stdoutFd = -1; 6044 int stderrFd = -1; 6045 6046 ICoreEventLoop.UnregisterToken stdoutUnregisterToken; 6047 ICoreEventLoop.UnregisterToken stderrUnregisterToken; 6048 6049 pid_t pid = -1; 6050 6051 public void delegate() beforeExec; 6052 6053 FilePath program; 6054 string[] args; 6055 6056 void stdoutReadable() { 6057 if(stdoutReadBuffer is null) 6058 stdoutReadBuffer = new ubyte[](stdoutBufferSize); 6059 auto ret = read(stdoutFd, stdoutReadBuffer.ptr, stdoutReadBuffer.length); 6060 if(ret == -1) 6061 throw new ErrnoApiException("read", errno); 6062 if(onStdoutAvailable) { 6063 onStdoutAvailable(stdoutReadBuffer[0 .. 
ret]); 6064 } 6065 6066 if(ret == 0) { 6067 stdoutUnregisterToken.unregister(); 6068 6069 close(stdoutFd); 6070 stdoutFd = -1; 6071 } 6072 } 6073 6074 void stderrReadable() { 6075 if(stderrReadBuffer is null) 6076 stderrReadBuffer = new ubyte[](stderrBufferSize); 6077 auto ret = read(stderrFd, stderrReadBuffer.ptr, stderrReadBuffer.length); 6078 if(ret == -1) 6079 throw new ErrnoApiException("read", errno); 6080 if(onStderrAvailable) { 6081 onStderrAvailable(stderrReadBuffer[0 .. ret]); 6082 } 6083 6084 if(ret == 0) { 6085 stderrUnregisterToken.unregister(); 6086 6087 close(stderrFd); 6088 stderrFd = -1; 6089 } 6090 } 6091 } 6092 6093 private ubyte[] stdoutReadBuffer; 6094 private ubyte[] stderrReadBuffer; 6095 6096 void waitForCompletion() { 6097 getThisThreadEventLoop().run(&this.isComplete); 6098 } 6099 6100 bool isComplete() { 6101 return completed; 6102 } 6103 6104 bool completed; 6105 int status = int.min; 6106 6107 /++ 6108 If blocking, it will block the current task until the write succeeds. 6109 6110 Write `null` as data to close the pipe. Once the pipe is closed, you must not try to write to it again. 6111 +/ 6112 void writeToStdin(in void[] data) { 6113 version(Posix) { 6114 if(data is null) { 6115 close(stdinFd); 6116 stdinFd = -1; 6117 } else { 6118 // FIXME: check the return value again and queue async writes 6119 auto ret = write(stdinFd, data.ptr, data.length); 6120 if(ret == -1) 6121 throw new ErrnoApiException("write", errno); 6122 } 6123 } 6124 6125 } 6126 6127 void delegate(ubyte[] got) onStdoutAvailable; 6128 void delegate(ubyte[] got) onStderrAvailable; 6129 void delegate(int code) onTermination; 6130 6131 // pty? 
}

// FIXME: comment this out
/+
unittest {
	auto proc = new ExternalProcess(FilePath("/bin/cat"), ["/bin/cat"]);

	getThisThreadEventLoop(); // initialize it

	int c = 0;
	proc.onStdoutAvailable = delegate(ubyte[] got) {
		if(c == 0)
			assert(cast(string) got == "hello!");
		else
			assert(got.length == 0);
			// import std.stdio; writeln(got);
		c++;
	};

	proc.start();

	assert(proc.pid != -1);


	import std.stdio;
	Thread[4] pool;

	bool shouldExit;

	static int received;

	proc.writeToStdin("hello!");
	proc.writeToStdin(null); // closes the pipe

	proc.waitForCompletion();

	assert(proc.status == 0);

	assert(c == 2);

	// writeln("here");
}
+/

// to test the thundering herd on signal handling
version(none)
unittest {
	Thread[4] pool;
	foreach(ref thread; pool) {
		thread = new class Thread {
			this() {
				super({
					int count;
					getThisThreadEventLoop().run(() {
						if(count > 4) return true;
						count++;
						return false;
					});
				});
			}
		};
		thread.start();
	}
	foreach(ref thread; pool) {
		thread.join();
	}
}

/+
	=================
	STDIO REPLACEMENT
	=================
+/

/+
	Copies `what` into `buffer` starting at index `pos`, growing `buffer` if it is too short,
	then advances `pos` past the copied text.
+/
private void appendToBuffer(ref char[] buffer, ref int pos, scope const(char)[] what) {
	auto required = pos + what.length;
	if(buffer.length < required)
		buffer.length = required;
	buffer[pos .. pos + what.length] = what[];
	pos += what.length;
}

/+
	Formats `what` with `intToString` into `buffer` starting at index `pos`, growing `buffer`
	first so the formatter always has enough room, then advances `pos` by the length of
	the slice `intToString` returned.
+/
private void appendToBuffer(ref char[] buffer, ref int pos, long what) {
	// long.min is "-9223372036854775808", which is 20 characters, so reserving
	// only 16 could leave the slice too short. 24 covers it with a little slack.
	enum spaceForLong = 24;
	if(buffer.length < pos + spaceForLong)
		buffer.length = pos + spaceForLong;
	auto sliced = intToString(what, buffer[pos .. $]);
	pos += sliced.length;
}

/++
	A `writeln` that actually works, at least for some basic types.

	It works correctly on Windows, using the correct functions to write unicode to the console. even allocating a console if needed. If the output has been redirected to a file or pipe, it writes UTF-8.

	This always does text. See also WritableStream and WritableTextStream when they are implemented.

	Each argument is appended in order, chosen by the first rule that matches its type:
	$(LIST
		* implicitly converts to `const char[]`: appended as text
		* implicitly converts to `stringz`: its `borrow` slice is appended
		* implicitly converts to `long`: appended as formatted by `intToString`
		* has a `toString()` returning text: that text is appended
		* anything else: the placeholder `<TypeName>` is appended
	)

	A single `"\n"` is added after the last argument and the whole line is written in one go.
+/
void writeln(T...)(T t) {
	// starts out on the stack; appendToBuffer grows it if the line does not fit
	char[256] bufferBacking;
	char[] buffer = bufferBacking[];
	int pos;

	foreach(arg; t) {
		static if(is(typeof(arg) : const char[])) {
			appendToBuffer(buffer, pos, arg);
		} else static if(is(typeof(arg) : stringz)) {
			appendToBuffer(buffer, pos, arg.borrow);
		} else static if(is(typeof(arg) : long)) {
			appendToBuffer(buffer, pos, arg);
		} else static if(is(typeof(arg.toString()) : const char[])) {
			appendToBuffer(buffer, pos, arg.toString());
		} else {
			appendToBuffer(buffer, pos, "<" ~ typeof(arg).stringof ~ ">");
		}
	}

	appendToBuffer(buffer, pos, "\n");

	actuallyWriteToStdout(buffer[0 .. pos]);
}

/+
	Sends `buffer` to the process's standard output.

	On Windows: if there is no usable stdout handle, a console is allocated. If the handle is
	a character device the text goes through `makeWindowsString` and `WriteConsoleW`,
	otherwise the bytes are passed to `WriteFile` as they are.

	Elsewhere: the bytes are written to file descriptor 1. Short writes are continued and
	writes interrupted by a signal are retried so a line is not cut off part way through.

	Output is best effort; failures are not reported to the caller.
+/
private void actuallyWriteToStdout(scope char[] buffer) @trusted {
	version(Windows) {
		import core.sys.windows.wincon;

		auto hStdOut = GetStdHandle(STD_OUTPUT_HANDLE);
		if(hStdOut == null || hStdOut == INVALID_HANDLE_VALUE) {
			AllocConsole();
			hStdOut = GetStdHandle(STD_OUTPUT_HANDLE);
		}

		if(GetFileType(hStdOut) == FILE_TYPE_CHAR) {
			wchar[256] wbuffer;
			auto toWrite = makeWindowsString(buffer, wbuffer, WindowsStringConversionFlags.convertNewLines);

			DWORD written;
			WriteConsoleW(hStdOut, toWrite.ptr, cast(DWORD) toWrite.length, &written, null);
		} else {
			DWORD written;
			WriteFile(hStdOut, buffer.ptr, cast(DWORD) buffer.length, &written, null);
		}
	} else {
		import unix = core.sys.posix.unistd;

		const(char)[] remaining = buffer;
		while(remaining.length) {
			auto written = unix.write(1, remaining.ptr, remaining.length);
			if(written < 0) {
				if(errno == EINTR)
					continue; // interrupted before anything was written, try again
				break; // best effort: give up quietly on real errors
			}
			if(written == 0)
				break; // no progress possible, avoid spinning
			remaining = remaining[written .. $];
		}
	}
}

/+

STDIO

	/++
		Please note using this will create a compile-time dependency on [arsd.terminal]



so my writeln replacement:

1) if the std output handle is null, alloc one
2) if it is a character device, write out the proper Unicode text.
3) otherwise write out UTF-8.... maybe with a BOM but maybe not. it is tricky to know what the other end of a pipe expects...
[8:15 AM]
im actually tempted to make the write binary to stdout functions throw an exception if it is a character console / interactive terminal instead of letting you spam it right out
[8:16 AM]
of course you can still cheat by casting binary data to string and using the write string function (and this might be appropriate sometimes) but there kinda is a legit difference between a text output and a binary output device

Stdout can represent either

	+/
	void writeln(){} {

	}

	stderr?
6305 6306 /++ 6307 Please note using this will create a compile-time dependency on [arsd.terminal] 6308 6309 It can be called from a task. 6310 6311 It works correctly on Windows and is user friendly on Linux (using arsd.terminal.getline) 6312 while also working if stdin has been redirected (where arsd.terminal itself would throw) 6313 6314 6315 so say you run a program on an interactive terminal. the program tries to open the stdin binary stream 6316 6317 instead of throwing, the prompt could change to indicate the binary data is expected and you can feed it in either by typing it up,,,, or running some command like maybe <file.bin to have the library do what the shell would have done and feed that to the rest of the program 6318 6319 +/ 6320 string readln()() { 6321 6322 } 6323 6324 6325 // if using stdio as a binary output thing you can pretend it is a file w/ stream capability 6326 struct File { 6327 WritableStream ostream; 6328 ReadableStream istream; 6329 6330 ulong tell; 6331 void seek(ulong to) {} 6332 6333 void sync(); 6334 void close(); 6335 } 6336 6337 // these are a bit special because if it actually is an interactive character device, it might be different than other files and even different than other pipes. 6338 WritableStream stdoutStream() { return null; } 6339 WritableStream stderrStream() { return null; } 6340 ReadableStream stdinStream() { return null; } 6341 6342 +/ 6343 6344 6345 /+ 6346 6347 6348 /+ 6349 Druntime appears to have stuff for darwin, freebsd. I might have to add some for openbsd here and maybe netbsd if i care to test it. 
6350 +/ 6351 6352 /+ 6353 6354 arsd_core_init(number_of_worker_threads) 6355 6356 Building-block things wanted for the event loop integration: 6357 * ui 6358 * windows 6359 * terminal / console 6360 * generic 6361 * adopt fd 6362 * adopt windows handle 6363 * shared lib 6364 * load 6365 * timers (relative and real time) 6366 * create 6367 * update 6368 * cancel 6369 * file/directory watches 6370 * file created 6371 * file deleted 6372 * file modified 6373 * file ops 6374 * open 6375 * close 6376 * read 6377 * write 6378 * seek 6379 * sendfile on linux, TransmitFile on Windows 6380 * let completion handlers run in the io worker thread instead of signaling back 6381 * pipe ops (anonymous or named) 6382 * create 6383 * read 6384 * write 6385 * get info about other side of the pipe 6386 * network ops (stream + datagram, ip, ipv6, unix) 6387 * address look up 6388 * connect 6389 * start tls 6390 * listen 6391 * send 6392 * receive 6393 * get peer info 6394 * process ops 6395 * spawn 6396 * notifications when it is terminated or fork or execs 6397 * send signal 6398 * i/o pipes 6399 * thread ops (isDaemon?) 6400 * spawn 6401 * talk to its event loop 6402 * termination notification 6403 * signals 6404 * ctrl+c is the only one i really care about but the others might be made available too. sigchld needs to be done as an impl detail of process ops. 6405 * custom messages 6406 * should be able to send messages from finalizers... 6407 6408 * want to make sure i can stream stuff on top of it all too. 6409 6410 ======== 6411 6412 These things all refer back to a task-local thing that queues the tasks. If it is a fiber, it uses that 6413 and if it is a thread it uses that... 6414 6415 tls IArsdCoreEventLoop currentTaskInterface; // this yields on the wait for calls. the fiber swapper will swap this too. 
6416 tls IArsdCoreEventLoop currentThreadInterface; // this blocks on the event loop 6417 6418 shared IArsdCoreEventLoop currentProcessInterface; // this dispatches to any available thread 6419 +/ 6420 6421 6422 /+ 6423 You might have configurable tasks that do not auto-start, e.g. httprequest. maybe @mustUse on those 6424 6425 then some that do auto-start, e.g. setTimeout 6426 6427 6428 timeouts: duration, MonoTime, or SysTime? duration is just a timer monotime auto-adjusts the when, systime sets a real time timerfd 6429 6430 tasks can be set to: 6431 thread affinity - this, any, specific reference 6432 reports to - defaults to this, can also pass down a parent reference. if reports to dies, all its subordinates are cancelled. 6433 6434 6435 you can send a message to a task... maybe maybe just to a task runner (which is itself a task?) 6436 6437 auto file = readFile(x); 6438 auto timeout = setTimeout(y); 6439 auto completed = waitForFirstToCompleteThenCancelOthers(file, timeout); 6440 if(completed == 0) { 6441 file.... 6442 } else { 6443 timeout.... 6444 } 6445 6446 /+ 6447 A task will run on a thread (with possible migration), and report to a task. 6448 +/ 6449 6450 // a compute task is run on a helper thread 6451 auto task = computeTask((shared(bool)* cancellationRequested) { 6452 // or pass in a yield thing... prolly a TaskController which has cancellationRequested and yield controls as well as send message to parent (sync or async) 6453 6454 // you'd periodically send messages back to the parent 6455 }, RunOn.AnyAvailable, Affinity.CanMigrate); 6456 6457 auto task = Task((TaskController controller) { 6458 foreach(x, 0 .. 
1000) { 6459 if(x % 10 == 0) 6460 controller.yield(); // periodically yield control, which also checks for cancellation for us 6461 // do some work 6462 6463 controller.sendMessage(...); 6464 controller.sendProgress(x); // yields it for a foreach stream kind of thing 6465 } 6466 6467 return something; // automatically sends the something as the result in a TaskFinished message 6468 }); 6469 6470 foreach(item; task) // waitsForProgress, sendProgress sends an item and the final return sends an item 6471 {} 6472 6473 6474 see ~/test/task.d 6475 6476 // an io task is run locally via the event loops 6477 auto task2 = ioTask(() { 6478 6479 }); 6480 6481 6482 6483 waitForEvent 6484 +/ 6485 6486 /+ 6487 Most functions should prolly take a thread arg too, which defaults 6488 to this thread, but you can also pass it a reference, or a "any available" thing. 6489 6490 This can be a ufcs overload 6491 +/ 6492 6493 interface SemiSynchronousTask { 6494 6495 } 6496 6497 struct TimeoutCompletionResult { 6498 bool completed; 6499 6500 bool opCast(T : bool)() { 6501 return completed; 6502 } 6503 } 6504 6505 struct Timeout { 6506 void reschedule(Duration when) { 6507 6508 } 6509 6510 void cancel() { 6511 6512 } 6513 6514 TimeoutCompletionResult waitForCompletion() { 6515 return TimeoutCompletionResult(false); 6516 } 6517 } 6518 6519 Timeout setTimeout(void delegate() dg, int msecs, int permittedJitter = 20) { 6520 return Timeout.init; 6521 } 6522 6523 void clearTimeout(Timeout timeout) { 6524 timeout.cancel(); 6525 } 6526 6527 void createInterval() {} 6528 void clearInterval() {} 6529 6530 /++ 6531 Schedules a task at the given wall clock time. 
6532 +/ 6533 void scheduleTask() {} 6534 6535 struct IoOperationCompletionResult { 6536 enum Status { 6537 cancelled, 6538 completed 6539 } 6540 6541 Status status; 6542 6543 int error; 6544 int bytesWritten; 6545 6546 bool opCast(T : bool)() { 6547 return status == Status.completed; 6548 } 6549 } 6550 6551 struct IoOperation { 6552 void cancel() {} 6553 6554 IoOperationCompletionResult waitForCompletion() { 6555 return IoOperationCompletionResult.init; 6556 } 6557 6558 // could contain a scoped class in here too so it stack allocated 6559 } 6560 6561 // Should return both the object and the index in the array! 6562 Result waitForFirstToComplete(Operation[]...) {} 6563 6564 IoOperation read(IoHandle handle, ubyte[] buffer 6565 6566 /+ 6567 class IoOperation {} 6568 6569 // an io operation and its buffer must not be modified or freed 6570 // in between a call to enqueue and a call to waitForCompletion 6571 // if you used the whenComplete callback, make sure it is NOT gc'd or scope thing goes out of scope in the mean time 6572 // if its dtor runs, it'd be forced to be cancelled... 6573 6574 scope IoOperation op = new IoOperation(buffer_size); 6575 op.start(); 6576 op.waitForCompletion(); 6577 +/ 6578 6579 /+ 6580 will want: 6581 read, write 6582 send, recv 6583 6584 cancel 6585 6586 open file, open (named or anonymous) pipe, open process 6587 connect, accept 6588 SSL 6589 close 6590 6591 postEvent 6592 postAPC? like run in gui thread / async 6593 waitForEvent ? needs to handle a timeout and a cancellation. would only work in the fiber task api. 
6594 6595 waitForSuccess 6596 6597 interrupt handler 6598 6599 onPosixReadReadiness 6600 onPosixWriteReadiness 6601 6602 onWindowsHandleReadiness 6603 - but they're one-offs so you gotta reregister for each event 6604 +/ 6605 6606 6607 6608 /+ 6609 arsd.core.uda 6610 6611 you define a model struct with the types you want to extract 6612 6613 you get it with like Model extract(Model, UDAs...)(Model default) 6614 6615 defaultModel!alias > defaultModel!Type(defaultModel("identifier")) 6616 6617 6618 6619 6620 6621 6622 6623 6624 6625 6626 so while i laid there sleep deprived i did think a lil more on some uda stuff. it isn't especially novel but a combination of a few other techniques 6627 6628 you might be like 6629 6630 struct MyUdas { 6631 DbName name; 6632 DbIgnore ignore; 6633 } 6634 6635 elsewhere 6636 6637 foreach(alias; allMembers) { 6638 auto udas = getUdas!(MyUdas, __traits(getAttributes, alias))(MyUdas(DbName(__traits(identifier, alias)))); 6639 } 6640 6641 6642 so you pass the expected type and the attributes as the template params, then the runtime params are the default values for the given types 6643 6644 so what the thing does essentially is just sets the values of the given thing to the udas based on type then returns the modified instance 6645 6646 so the end result is you keep the last ones. it wouldn't report errors if multiple things added but it p simple to understand, simple to document (even though the default values are not in the struct itself, you can put ddocs in them), and uses the tricks to minimize generated code size 6647 +/ 6648 6649 +/ 6650 6651 private version(Windows) extern(Windows) { 6652 BOOL CancelIoEx(HANDLE, LPOVERLAPPED); 6653 6654 struct WSABUF { 6655 ULONG len; 6656 ubyte* buf; 6657 } 6658 alias LPWSABUF = WSABUF*; 6659 6660 // https://learn.microsoft.com/en-us/windows/win32/api/winsock2/ns-winsock2-wsaoverlapped 6661 // "The WSAOVERLAPPED structure is compatible with the Windows OVERLAPPED structure." 
// so these bindings take the shortcut of declaring the overlapped parameters as plain LPOVERLAPPED.

	// Overlapped-capable send on a connected socket.
	int WSASend(
		SOCKET s,
		LPWSABUF lpBuffers,
		DWORD dwBufferCount,
		LPDWORD lpNumberOfBytesSent,
		DWORD dwFlags,
		LPOVERLAPPED lpOverlapped,
		LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
	);

	// Same as WSASend, plus an explicit destination address and its length.
	int WSASendTo(
		SOCKET s,
		LPWSABUF lpBuffers,
		DWORD dwBufferCount,
		LPDWORD lpNumberOfBytesSent,
		DWORD dwFlags,
		const sockaddr* lpTo,
		int iTolen,
		LPOVERLAPPED lpOverlapped,
		LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
	);

	// Overlapped-capable receive; note that the flags are passed by pointer here.
	int WSARecv(
		SOCKET s,
		LPWSABUF lpBuffers,
		DWORD dwBufferCount,
		LPDWORD lpNumberOfBytesRecvd,
		LPDWORD lpFlags,
		LPOVERLAPPED lpOverlapped,
		LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
	);

	// Same as WSARecv, plus out parameters for the sender's address and its length.
	int WSARecvFrom(
		SOCKET s,
		LPWSABUF lpBuffers,
		DWORD dwBufferCount,
		LPDWORD lpNumberOfBytesRecvd,
		LPDWORD lpFlags,
		sockaddr* lpFrom,
		LPINT lpFromlen,
		LPOVERLAPPED lpOverlapped,
		LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
	);
}