/++
	$(PITFALL
		Please note: the api and behavior of this module are not externally stable at this time. See the documentation on specific functions for details.
	)

	Shared core functionality including exception helpers, library loader, event loop, and possibly more. Maybe command line processor and uda helper and some basic shared annotation types.

	I'll probably move the url, websocket, and ssl stuff in here too as they are often shared. Maybe a small internationalization helper type (a hook for external implementation) and COM helpers too. I might move the process helpers out to their own module - even things in here are not considered stable to library users at this time!

	If you use this directly outside the arsd library despite its current instability caveats, you might consider using `static import` since names in here are likely to clash with Phobos if you use them together. `static import` will let you easily disambiguate and avoid name conflict errors if I add more here. Some names even clash deliberately to remind me to avoid some antipatterns inside the arsd modules!

	## Contributor notes

	arsd.core should be focused on things that enable interoperability primarily and secondarily increased code quality between other, otherwise independent arsd modules. As a foundational library, it is not permitted to import anything outside the druntime `core` namespace, except in templates and examples not normally compiled in. This keeps it independent and avoids transitive dependency spillover to end users while also keeping compile speeds fast. To help keep builds snappy, also avoid significant use of ctfe inside this module.

	On my linux computer, `dmd -unittest -main core.d` takes about a quarter second to run. We do not want this to grow.

	`@safe` compatibility is ok when it isn't too big of a hassle. `@nogc` is a non-goal. I might accept it on some of the trivial functions but if it means changing the logic in any way to support, you will need a compelling argument to justify it. The arsd libs are supposed to be reliable and easy to use. That said, of course, don't be unnecessarily wasteful - if you can easily provide a reliable and easy to use way to let advanced users do their thing without hurting the other cases, let's discuss it.

	If functionality is not needed by multiple existing arsd modules, consider adding a new module instead of adding it to the core.

	Unittests should generally be hidden behind a special version guard so they don't interfere with end user tests.

	History:
		Added March 2023 (dub v11.0). Several functions were migrated in here at that time, noted individually. Members without a note were added with the module.
+/
module arsd.core;

// FIXME: add callbacks on file open for tracing dependencies dynamically

// see for useful info: https://devblogs.microsoft.com/dotnet/how-async-await-really-works/

// see: https://wiki.openssl.org/index.php/Simple_TLS_Server

import core.thread;
import core.volatile;
import core.atomic;
import core.time;

import core.stdc.errno;

import core.attribute;
// older druntime versions lack `mustuse`; declare a stand-in so the attribute can still be written
static if(!__traits(hasMember, core.attribute, "mustuse"))
	enum mustuse;

// FIXME: add an arena allocator? can do task local destruction maybe.

// the three implementations are windows, epoll, and kqueue
version(Windows) {
	version=Arsd_core_windows;

	// import core.sys.windows.windows;
	import core.sys.windows.winbase;
	import core.sys.windows.windef;
	import core.sys.windows.winnls;
	import core.sys.windows.winuser;
	import core.sys.windows.winsock2;

	pragma(lib, "user32");
	pragma(lib, "ws2_32");
} else version(linux) {
	version=Arsd_core_epoll;

	version=Arsd_core_has_cloexec;
} else version(FreeBSD) {
	version=Arsd_core_kqueue;

	import core.sys.freebsd.sys.event;
} else version(DragonFlyBSD) {
	// NOT ACTUALLY TESTED
	version=Arsd_core_kqueue;

	import core.sys.dragonflybsd.sys.event;
} else version(NetBSD) {
	// NOT ACTUALLY TESTED
	version=Arsd_core_kqueue;

	import core.sys.netbsd.sys.event;
} else version(OpenBSD) {
	version=Arsd_core_kqueue;

	// THIS FILE DOESN'T ACTUALLY EXIST, WE NEED TO MAKE IT
	import core.sys.openbsd.sys.event;
} else version(OSX) {
	version=Arsd_core_kqueue;

	import core.sys.darwin.sys.event;
}

version(Posix) {
	import core.sys.posix.signal;
	import core.sys.posix.unistd;

	import core.sys.posix.sys.un;
	import core.sys.posix.sys.socket;
	import core.sys.posix.netinet.in_;
}

// FIXME: the exceptions should actually give some explanatory text too (at least sometimes)

/+
	=========================
	GENERAL UTILITY FUNCTIONS
	=========================
+/

// enum stringz : const(char)* { init = null }

/++
	A wrapper around a `const(char)*` to indicate that it is a zero-terminated C string.
+/
struct stringz {
	private const(char)* raw;

	/++
		Wraps the given pointer in the struct. Note that it retains a copy of the pointer.
	+/
	this(const(char)* raw) {
		this.raw = raw;
	}

	/++
		Returns the original raw pointer back out.
	+/
	const(char)* ptr() const {
		return raw;
	}

	/++
		Borrows a slice of the pointer up to (but not including) the zero terminator.

		Returns:
			`null` if the wrapped pointer is null, otherwise a slice over the same memory the pointer refers to (nothing is copied).
	+/
	const(char)[] borrow() const {
		if(raw is null)
			return null;

		// count with size_t so strings longer than int.max do not overflow the counter
		size_t length = 0;
		while(raw[length] != 0)
			length++;

		return raw[0 .. length];
	}
}

/++
	A limited variant to hold just a few types. It is made for the use of packing a small amount of extra data into error messages.
+/
/+
	* if length and ptr are both 0, it is null
	* if ptr == 1, length is an integer
	* if ptr == 2, length is an unsigned integer (suggest printing in hex)
	* if ptr == 3, length is a combination of flags (suggest printing in binary)
	* if ptr == 4, length is a unix permission thing (suggest printing in octal)
	* if ptr == 5, length is a double float
	* if ptr == 15, length must be 0. this holds an empty, non-null, SSO string.
	* if ptr >= 16 && < 24, length is reinterpret-casted a small string of length of (ptr & 0x7) + 1
	* if length == size_t.max, ptr is interpreted as a stringz
	* if ptr >= 1024, it is a non-null D string or byte array. It is a string if the length high bit is clear, a byte array if it is set. the length is what is left after you mask that out.

	All other ptr values are reserved for future expansion.
+/
struct LimitedVariant {

	/++
		The kinds of value [contains] can report.
	+/
	enum Contains {
		null_,
		intDecimal,
		intHex,
		intBinary,
		intOctal,
		double_,
		emptySso,
		stringSso,
		stringz,
		string,
		bytes,

		invalid,
	}

	/++
		Decodes the internal `ptr`/`length` tag pair and reports what kind of value is stored.
	+/
	Contains contains() const {
		auto tag = cast(size_t) ptr;
		if(ptr is null && length is null)
			return Contains.null_;
		else switch(tag) {
			case 1: return Contains.intDecimal;
			case 2: return Contains.intHex;
			case 3: return Contains.intBinary;
			case 4: return Contains.intOctal;
			case 5: return Contains.double_;
			case 15: return length is null ? Contains.emptySso : Contains.invalid;
			default:
				if(tag >= 16 && tag < 24) {
					return Contains.stringSso;
				} else if(tag >= 1024) {
					if(cast(size_t) length == size_t.max)
						return Contains.stringz;
					else
						return isHighBitSet ? Contains.bytes : Contains.string;
				} else {
					return Contains.invalid;
				}
		}
	}

	/// ditto
	bool containsInt() const {
		with(Contains)
		switch(contains) {
			case intDecimal, intHex, intBinary, intOctal:
				return true;
			default:
				return false;
		}
	}

	/// ditto
	bool containsString() const {
		with(Contains)
		switch(contains) {
			case null_, emptySso, stringSso, string:
			// case stringz:
				return true;
			default:
				return false;
		}
	}

	/// ditto
	bool containsDouble() const {
		with(Contains)
		switch(contains) {
			case double_:
				return true;
			default:
				return false;
		}
	}

	/// ditto
	bool containsBytes() const {
		with(Contains)
		switch(contains) {
			case bytes, null_:
				return true;
			default:
				return false;
		}
	}

	private const(void)* length;
	private const(ubyte)* ptr;

	// the top bit of the length word marks a byte array; built from size_t so it is also correct on 32 bit targets
	private enum size_t highBit = (cast(size_t) 1) << (size_t.sizeof * 8 - 1);

	private void Throw() const {
		throw ArsdException!"LimitedVariant"(cast(size_t) length, cast(size_t) ptr);
	}

	private bool isHighBitSet() const {
		return ((cast(size_t) length) & highBit) != 0;
	}

	/++
		getString gets a reference to the string stored internally, see [toString] to get a string representation of whatever is inside.

		Throws if the variant does not hold a string (or null).
	+/
	const(char)[] getString() const return {
		with(Contains)
		switch(contains()) {
			case null_:
				return null;
			case emptySso:
				return (cast(const(char)*) ptr)[0 .. 0]; // zero length, non-null
			case stringSso:
				auto len = ((cast(size_t) ptr) & 0x7) + 1;
				return (cast(char*) &length)[0 .. len];
			case string:
				return (cast(const(char)*) ptr)[0 .. cast(size_t) length];
			default:
				Throw(); assert(0);
		}
	}

	/// ditto
	long getInt() const {
		if(containsInt)
			return cast(long) length;
		else
			Throw();
		assert(0);
	}

	/// ditto
	double getDouble() const {
		if(containsDouble)
			return *cast(double*) &length;
		else
			Throw();
		assert(0);
	}

	/// ditto
	const(ubyte)[] getBytes() const {
		with(Contains)
		switch(contains()) {
			case null_:
				return null;
			case bytes:
				// strip the marker bit to recover the real length
				return ptr[0 .. (cast(size_t) length) & ~highBit];
			default:
				Throw(); assert(0);
		}
	}

	/++
		Returns a printable representation of the stored value. Integers are printed in the base they were constructed with, using a `0x`, `0b`, or `0o` prefix for the non-decimal ones.
	+/
	string toString() const {

		string intHelper(string prefix, int radix) {
			char[128] buffer;
			buffer[0 .. prefix.length] = prefix[];
			char[] toUse = buffer[prefix.length .. $];

			auto got = intToString(getInt(), toUse[], IntToStringArgs().withRadix(radix));

			return buffer[0 .. prefix.length + got.length].idup;
		}

		with(Contains)
		final switch(contains()) {
			case null_:
				return "<null>";
			case intDecimal:
				return intHelper("", 10);
			case intHex:
				return intHelper("0x", 16);
			case intBinary:
				return intHelper("0b", 2);
			case intOctal:
				return intHelper("0o", 8);
			case emptySso, stringSso, string:
				return getString().idup;
			case bytes:
				return "<bytes>"; // FIXME

			case double_:
				assert(0); // FIXME
			case stringz:
				assert(0); // FIXME
			case invalid:
				return "<invalid>";
		}
	}

	/++
		Constructs the variant. The string and byte array overloads store the pointer and length they are given; they do not copy the data.
	+/
	this(string s) {
		ptr = cast(const(ubyte)*) s.ptr;
		length = cast(void*) s.length;
	}

	/// ditto
	this(const(ubyte)[] b) {
		// a null array is stored as the null variant; tagging it would otherwise decode as `invalid`
		if(b.ptr is null)
			return;
		ptr = cast(const(ubyte)*) b.ptr;
		length = cast(void*) (b.length | highBit);
	}

	/// ditto
	this(long l, int base = 10) {
		int tag;
		switch(base) {
			case 10: tag = 1; break;
			case 16: tag = 2; break;
			case 2: tag = 3; break;
			case 8: tag = 4; break;
			default: assert(0, "You passed an invalid base to LimitedVariant");
		}
		ptr = cast(ubyte*) tag;
		length = cast(void*) l;
	}

	/// ditto
	version(none)
	this(double d) {
		// this crashes dmd! omg
		assert(0);
		// ptr = cast(ubyte*) 15;
		// length = cast(void*) *cast(size_t*) &d;
	}
}

unittest {
	LimitedVariant v = LimitedVariant("foo");
	assert(v.containsString());
	assert(!v.containsInt());
	assert(v.getString() == "foo");

	LimitedVariant v2 = LimitedVariant(4);
	assert(v2.containsInt());
	assert(!v2.containsString());
	assert(v2.getInt() == 4);

	LimitedVariant v3 = LimitedVariant(cast(ubyte[]) [1, 2, 3]);
	assert(v3.containsBytes());
	assert(!v3.containsString());
	assert(v3.getBytes() == [1, 2, 3]);

	LimitedVariant v4 = LimitedVariant(cast(const(ubyte)[]) null);
	assert(v4.containsBytes());
	assert(v4.getBytes() is null);
}

/++
	This is a dummy type to indicate the end of normal arguments and the beginning of the file/line inferred args. It is meant to ensure you don't accidentally send a string that is interpreted as a filename when it was meant to be a normal argument to the function and trigger the wrong overload.
+/
struct ArgSentinel {}

/++
	A trivial wrapper around C's malloc that creates a D slice. It multiplies n by T.sizeof and returns the slice of the pointer from 0 to n.

	Please note that the ptr might be null - it is your responsibility to check that, same as normal malloc. Check `ret.ptr is null` specifically, since `ret.length` will always be `n`, even if the `malloc` failed (which also means `ret is null` is NOT a reliable test: it compares the length too).

	If `n * T.sizeof` would overflow, no allocation is attempted and the result is the same as a failed `malloc`.

	Remember to `free` the returned pointer with `core.stdc.stdlib.free(ret.ptr);`

	$(TIP
		I strongly recommend you simply use the normal garbage collector unless you have a very specific reason not to.
	)

	See_Also:
		[mallocedStringz]
+/
T[] mallocSlice(T)(size_t n) {
	import c = core.stdc.stdlib;

	// a wrapped-around byte count would allocate too little for a slice of length n
	static if(T.sizeof > 1) {
		if(n > size_t.max / T.sizeof)
			return (cast(T*) null)[0 .. n];
	}

	return (cast(T*) c.malloc(n * T.sizeof))[0 .. n];
}

/++
	Uses C's malloc to allocate a copy of `original` with an attached zero terminator. It may return a slice with a `null` pointer (but non-zero length!) if `malloc` fails and you are responsible for freeing the returned pointer with `core.stdc.stdlib.free(ret.ptr)`.
443 444 $(TIP 445 I strongly recommend you use [CharzBuffer] or Phobos' [std.string.toStringz] instead unless there's a special reason not to. 446 ) 447 448 See_Also: 449 [CharzBuffer] for a generally better alternative. You should only use `mallocedStringz` where `CharzBuffer` cannot be used (e.g. when druntime is not usable or you have no stack space for the temporary buffer). 450 451 [mallocSlice] is the function this function calls, so the notes in its documentation applies here too. 452 +/ 453 char[] mallocedStringz(in char[] original) { 454 auto slice = mallocSlice!char(original.length + 1); 455 if(slice is null) 456 return null; 457 slice[0 .. original.length] = original[]; 458 slice[original.length] = 0; 459 return slice; 460 } 461 462 /++ 463 Basically a `scope class` you can return from a function or embed in another aggregate. 464 +/ 465 struct OwnedClass(Class) { 466 ubyte[__traits(classInstanceSize, Class)] rawData; 467 468 static OwnedClass!Class defaultConstructed() { 469 OwnedClass!Class i = OwnedClass!Class.init; 470 i.initializeRawData(); 471 return i; 472 } 473 474 private void initializeRawData() @trusted { 475 if(!this) 476 rawData[] = cast(ubyte[]) typeid(Class).initializer[]; 477 } 478 479 this(T...)(T t) { 480 initializeRawData(); 481 rawInstance.__ctor(t); 482 } 483 484 bool opCast(T : bool)() @trusted { 485 return !(*(cast(void**) rawData.ptr) is null); 486 } 487 488 @disable this(); 489 @disable this(this); 490 491 Class rawInstance() return @trusted { 492 if(!this) 493 throw new Exception("null"); 494 return cast(Class) rawData.ptr; 495 } 496 497 alias rawInstance this; 498 499 ~this() @trusted { 500 if(this) 501 .destroy(rawInstance()); 502 } 503 } 504 505 506 507 version(Posix) 508 package(arsd) void makeNonBlocking(int fd) { 509 import core.sys.posix.fcntl; 510 auto flags = fcntl(fd, F_GETFL, 0); 511 if(flags == -1) 512 throw new ErrnoApiException("fcntl get", errno); 513 flags |= O_NONBLOCK; 514 auto s = fcntl(fd, F_SETFL, flags); 
515 if(s == -1) 516 throw new ErrnoApiException("fcntl set", errno); 517 } 518 519 version(Posix) 520 package(arsd) void setCloExec(int fd) { 521 import core.sys.posix.fcntl; 522 auto flags = fcntl(fd, F_GETFD, 0); 523 if(flags == -1) 524 throw new ErrnoApiException("fcntl get", errno); 525 flags |= FD_CLOEXEC; 526 auto s = fcntl(fd, F_SETFD, flags); 527 if(s == -1) 528 throw new ErrnoApiException("fcntl set", errno); 529 } 530 531 532 /++ 533 A helper object for temporarily constructing a string appropriate for the Windows API from a D UTF-8 string. 534 535 536 It will use a small internal static buffer is possible, and allocate a new buffer if the string is too big. 537 538 History: 539 Moved from simpledisplay.d to core.d in March 2023 (dub v11.0). 540 +/ 541 version(Windows) 542 struct WCharzBuffer { 543 private wchar[] buffer; 544 private wchar[128] staticBuffer = void; 545 546 /// Length of the string, excluding the zero terminator. 547 size_t length() { 548 return buffer.length; 549 } 550 551 // Returns the pointer to the internal buffer. You must assume its lifetime is less than that of the WCharzBuffer. It is zero-terminated. 552 wchar* ptr() { 553 return buffer.ptr; 554 } 555 556 /// Returns the slice of the internal buffer, excluding the zero terminator (though there is one present right off the end of the slice). You must assume its lifetime is less than that of the WCharzBuffer. 557 wchar[] slice() { 558 return buffer; 559 } 560 561 /// Copies it into a static array of wchars 562 void copyInto(R)(ref R r) { 563 static if(is(R == wchar[N], size_t N)) { 564 r[0 .. 
this.length] = slice[]; 565 r[this.length] = 0; 566 } else static assert(0, "can only copy into wchar[n], not " ~ R.stringof); 567 } 568 569 /++ 570 conversionFlags = [WindowsStringConversionFlags] 571 +/ 572 this(in char[] data, int conversionFlags = 0) { 573 conversionFlags |= WindowsStringConversionFlags.zeroTerminate; // this ALWAYS zero terminates cuz of its name 574 auto sz = sizeOfConvertedWstring(data, conversionFlags); 575 if(sz > staticBuffer.length) 576 buffer = new wchar[](sz); 577 else 578 buffer = staticBuffer[]; 579 580 buffer = makeWindowsString(data, buffer, conversionFlags); 581 } 582 } 583 584 /++ 585 Alternative for toStringz 586 587 History: 588 Added March 18, 2023 (dub v11.0) 589 +/ 590 struct CharzBuffer { 591 private char[] buffer; 592 private char[128] staticBuffer = void; 593 594 /// Length of the string, excluding the zero terminator. 595 size_t length() { 596 assert(buffer.length > 0); 597 return buffer.length - 1; 598 } 599 600 // Returns the pointer to the internal buffer. You must assume its lifetime is less than that of the CharzBuffer. It is zero-terminated. 601 char* ptr() { 602 return buffer.ptr; 603 } 604 605 /// Returns the slice of the internal buffer, excluding the zero terminator (though there is one present right off the end of the slice). You must assume its lifetime is less than that of the CharzBuffer. 606 char[] slice() { 607 assert(buffer.length > 0); 608 return buffer[0 .. $-1]; 609 } 610 611 /// Copies it into a static array of chars 612 void copyInto(R)(ref R r) { 613 static if(is(R == char[N], size_t N)) { 614 r[0 .. this.length] = slice[]; 615 r[this.length] = 0; 616 } else static assert(0, "can only copy into char[n], not " ~ R.stringof); 617 } 618 619 @disable this(); 620 @disable this(this); 621 622 /++ 623 Copies `data` into the CharzBuffer, allocating a new one if needed, and zero-terminates it. 
624 +/ 625 this(in char[] data) { 626 if(data.length + 1 > staticBuffer.length) 627 buffer = new char[](data.length + 1); 628 else 629 buffer = staticBuffer[]; 630 631 buffer[0 .. data.length] = data[]; 632 buffer[data.length] = 0; 633 } 634 } 635 636 /++ 637 Given the string `str`, converts it to a string compatible with the Windows API and puts the result in `buffer`, returning the slice of `buffer` actually used. `buffer` must be at least [sizeOfConvertedWstring] elements long. 638 639 History: 640 Moved from simpledisplay.d to core.d in March 2023 (dub v11.0). 641 +/ 642 version(Windows) 643 wchar[] makeWindowsString(in char[] str, wchar[] buffer, int conversionFlags = WindowsStringConversionFlags.zeroTerminate) { 644 if(str.length == 0) 645 return null; 646 647 int pos = 0; 648 dchar last; 649 foreach(dchar c; str) { 650 if(c <= 0xFFFF) { 651 if((conversionFlags & WindowsStringConversionFlags.convertNewLines) && c == 10 && last != 13) 652 buffer[pos++] = 13; 653 buffer[pos++] = cast(wchar) c; 654 } else if(c <= 0x10FFFF) { 655 buffer[pos++] = cast(wchar)((((c - 0x10000) >> 10) & 0x3FF) + 0xD800); 656 buffer[pos++] = cast(wchar)(((c - 0x10000) & 0x3FF) + 0xDC00); 657 } 658 659 last = c; 660 } 661 662 if(conversionFlags & WindowsStringConversionFlags.zeroTerminate) { 663 buffer[pos] = 0; 664 } 665 666 return buffer[0 .. pos]; 667 } 668 669 /++ 670 Converts the Windows API string `str` to a D UTF-8 string, storing it in `buffer`. Returns the slice of `buffer` actually used. 671 672 History: 673 Moved from simpledisplay.d to core.d in March 2023 (dub v11.0). 
+/
version(Windows)
char[] makeUtf8StringFromWindowsString(in wchar[] str, char[] buffer) {
	if(str.length == 0)
		return null;

	auto got = WideCharToMultiByte(CP_UTF8, 0, str.ptr, cast(int) str.length, buffer.ptr, cast(int) buffer.length, null, null);
	if(got == 0) {
		if(GetLastError() == ERROR_INSUFFICIENT_BUFFER)
			throw new object.Exception("not enough buffer");
		else
			throw new object.Exception("conversion"); // FIXME: GetLastError
	}
	return buffer[0 .. got];
}

/++
	Converts the Windows API string `str` to a newly-allocated D UTF-8 string.

	The `wchar*` overload reads up to the zero terminator; the terminator itself is not part of the returned string.

	History:
		Moved from simpledisplay.d to core.d in March 2023 (dub v11.0).
+/
version(Windows)
string makeUtf8StringFromWindowsString(in wchar[] str) {
	char[] buffer;
	// first call only asks how many bytes the conversion needs
	auto got = WideCharToMultiByte(CP_UTF8, 0, str.ptr, cast(int) str.length, null, 0, null, null);
	buffer.length = got;

	// it is unique because we just allocated it above!
	return cast(string) makeUtf8StringFromWindowsString(str, buffer);
}

/// ditto
version(Windows)
string makeUtf8StringFromWindowsString(wchar* str) {
	char[] buffer;
	auto got = WideCharToMultiByte(CP_UTF8, 0, str, -1, null, 0, null, null);
	buffer.length = got;

	got = WideCharToMultiByte(CP_UTF8, 0, str, -1, buffer.ptr, cast(int) buffer.length, null, null);
	if(got == 0) {
		if(GetLastError() == ERROR_INSUFFICIENT_BUFFER)
			throw new object.Exception("not enough buffer");
		else
			throw new object.Exception("conversion"); // FIXME: GetLastError
	}
	// with an input length of -1 the returned count includes the zero terminator; leave it out of the D string
	return cast(string) buffer[0 .. got - 1];
}

// only used from minigui rn
/// Returns the index of the first zero element of `str`, or `str.length` if there is none.
package int findIndexOfZero(in wchar[] str) {
	foreach(idx, wchar ch; str)
		if(ch == 0)
			return cast(int) idx;
	return cast(int) str.length;
}
/// ditto
package int findIndexOfZero(in char[] str) {
	foreach(idx, char ch; str)
		if(ch == 0)
			return cast(int) idx;
	return cast(int) str.length;
}

/++
	Returns a minimum buffer length to hold the string `s` with the given conversions. It might be slightly larger than necessary, but is guaranteed to be big enough to hold it.

	History:
		Moved from simpledisplay.d to core.d in March 2023 (dub v11.0).
+/
version(Windows)
int sizeOfConvertedWstring(in char[] s, int conversionFlags) {
	int size = 0;

	if(conversionFlags & WindowsStringConversionFlags.convertNewLines) {
		// need to convert line endings, which means the length will get bigger.

		// BTW I betcha this could be faster with some simd stuff.
		char last;
		foreach(char ch; s) {
			if(ch == 10 && last != 13)
				size++; // will add a 13 before it...
			size++;
			last = ch;
		}
	} else {
		// no conversion necessary, just estimate based on length
		/*
			I don't think there's any string with a longer length
			in code units when encoded in UTF-16 than it has in UTF-8.
			This will probably over allocate, but that's OK.
		*/
		size = cast(int) s.length;
	}

	if(conversionFlags & WindowsStringConversionFlags.zeroTerminate)
		size++;

	return size;
}

/++
	Used by [makeWindowsString] and [WCharzBuffer]

	History:
		Moved from simpledisplay.d to core.d in March 2023 (dub v11.0).
+/
version(Windows)
enum WindowsStringConversionFlags : int {
	/++
		Append a zero terminator to the string.
	+/
	zeroTerminate = 1,
	/++
		Converts newlines from \n to \r\n.
	+/
	convertNewLines = 2,
}

/++
	An int printing function that doesn't need to import Phobos. Can do some of the things std.conv.to and std.format.format do.

	The buffer must be sized to hold the converted number. 32 chars is enough for most anything (binary output of a large value, or wide padding, needs more).

	Negative values are written as a `-` followed by the digits; padding is inserted between the sign and the digits. The group separator settings of [IntToStringArgs] are not applied by this function at this time.

	Returns: the slice of `buffer` containing the converted number.
+/
char[] intToString(long value, char[] buffer, IntToStringArgs args = IntToStringArgs.init) {
	const int radix = args.radix ? args.radix : 10;
	const int digitsPad = args.padTo;

	// a radix of 1 would never reduce the value and so never terminate
	assert(radix >= 2, "intToString needs a radix of at least 2");

	int pos;

	// work on the unsigned magnitude: negating long.min does not fit in a long
	ulong magnitude;
	if(value < 0) {
		buffer[pos++] = '-';
		magnitude = 0UL - cast(ulong) value;
	} else {
		magnitude = cast(ulong) value;
	}

	int start = pos;
	int digitCount;

	// digits come out least significant first; they get reversed below
	do {
		auto remainder = magnitude % radix;
		magnitude = magnitude / radix;

		buffer[pos++] = cast(char) (remainder < 10 ? (remainder + '0') : (remainder - 10 + args.ten));
		digitCount++;
	} while(magnitude);

	if(digitsPad > 0) {
		while(digitCount < digitsPad) {
			buffer[pos++] = args.padWith;
			digitCount++;
		}
	}

	assert(pos >= 1);
	assert(pos - start > 0);

	// reverse everything after the sign so the most significant digit (or the padding) comes first
	auto reverseSlice = buffer[start .. pos];
	for(int i = 0; i < reverseSlice.length / 2; i++) {
		auto paired = cast(int) reverseSlice.length - i - 1;
		char tmp = reverseSlice[i];
		reverseSlice[i] = reverseSlice[paired];
		reverseSlice[paired] = tmp;
	}

	return buffer[0 .. pos];
}

/// ditto
struct IntToStringArgs {
	private {
		ubyte padTo;
		char padWith;
		ubyte radix;
		char ten;
		ubyte groupSize;
		char separator;
	}

	/// Returns a copy of these args that pads the digits out to `padTo` characters using `padWith`.
	IntToStringArgs withPadding(int padTo, char padWith = '0') {
		IntToStringArgs args = this;
		args.padTo = cast(ubyte) padTo;
		args.padWith = padWith;
		return args;
	}

	/// Returns a copy of these args with the given radix. `ten` is the character used for the digit ten; later digits follow it.
	IntToStringArgs withRadix(int radix, char ten = 'a') {
		IntToStringArgs args = this;
		args.radix = cast(ubyte) radix;
		args.ten = ten;
		return args;
	}

	/// Returns a copy of these args with the group size and separator recorded.
	IntToStringArgs withGroupSeparator(int groupSize, char separator = '_') {
		IntToStringArgs args = this;
		args.groupSize = cast(ubyte) groupSize;
		args.separator = separator;
		return args;
	}
}

unittest {
	char[32] buffer;
	assert(intToString(0, buffer[]) == "0");
	assert(intToString(-1, buffer[]) == "-1");
	assert(intToString(-132, buffer[]) == "-132");
	assert(intToString(-1932, buffer[]) == "-1932");
	assert(intToString(1, buffer[]) == "1");
	assert(intToString(132, buffer[]) == "132");
	assert(intToString(1932, buffer[]) == "1932");

	assert(intToString(long.max, buffer[]) == "9223372036854775807");
	assert(intToString(long.min, buffer[]) == "-9223372036854775808");

	assert(intToString(0x1, buffer[], IntToStringArgs().withRadix(16)) == "1");
	assert(intToString(0x1b, buffer[], IntToStringArgs().withRadix(16)) == "1b");
	assert(intToString(0xef1, buffer[], IntToStringArgs().withRadix(16)) == "ef1");

	assert(intToString(0xef1, buffer[], IntToStringArgs().withRadix(16).withPadding(8)) == "00000ef1");
	assert(intToString(-0xef1, buffer[], IntToStringArgs().withRadix(16).withPadding(8)) == "-00000ef1");
	assert(intToString(-0xef1, buffer[], IntToStringArgs().withRadix(16, 'A').withPadding(8, ' ')) == "-     EF1");
}

/++
	History:
		Moved from color.d to core.d in March 2023 (dub v11.0).
+/
nothrow @safe @nogc pure
inout(char)[] stripInternal(return inout(char)[] s) {
	// strips spaces, tabs, and line breaks from both ends; the result is a slice of `s`
	size_t start = 0;
	while(start < s.length && (s[start] == ' ' || s[start] == '\t' || s[start] == '\n' || s[start] == '\r'))
		start++;

	size_t end = s.length;
	while(end > start && (s[end - 1] == ' ' || s[end - 1] == '\t' || s[end - 1] == '\n' || s[end - 1] == '\r'))
		end--;

	// an all-whitespace input leaves start == end, giving an empty slice
	return s[start .. end];
}

/// Like [stripInternal], but only removes whitespace from the end of the string.
nothrow @safe @nogc pure
inout(char)[] stripRightInternal(return inout(char)[] s) {
	size_t end = s.length;
	// index 0 is examined too, so input with nothing but whitespace becomes empty
	while(end > 0 && (s[end - 1] == ' ' || s[end - 1] == '\t' || s[end - 1] == '\n' || s[end - 1] == '\r'))
		end--;

	return s[0 .. end];
}

/++
	Shortcut for converting some types to string without invoking Phobos (but it will as a last resort).

	History:
		Moved from color.d to core.d in March 2023 (dub v11.0).
+/
string toStringInternal(T)(T t) {
	char[32] buffer;
	static if(is(T : string))
		return t;
	else static if(is(T : long))
		// NOTE(review): enums with an integral base type convert to long and so take this branch, printing the number rather than the member name - confirm that is intended
		return intToString(t, buffer[]).idup;
	else static if(is(T == enum)) {
		switch(t) {
			foreach(memberName; __traits(allMembers, T)) {
				case __traits(getMember, T, memberName):
				return memberName;
			}
			default:
				return "<unknown>";
		}
	} else {
		import std.conv;
		return to!string(t);
	}
}

/++
	Builds a `" | "` separated list of the names of the members of the enum `Flags` whose bits are all set in `value`.

	If no member matches, the name of a member whose value is zero is returned if there is one, otherwise `"<none>"`.
+/
string flagsToString(Flags)(ulong value) {
	string r;

	void add(string memberName) {
		if(r.length)
			r ~= " | ";
		r ~= memberName;
	}

	string none = "<none>";

	foreach(memberName; __traits(allMembers, Flags)) {
		auto flag = cast(ulong) __traits(getMember, Flags, memberName);
		if(flag) {
			if((value & flag) == flag)
				add(memberName);
		} else {
			// a zero-valued member names the "nothing set" state
			none = memberName;
		}
	}

	if(r.length == 0)
		r = none;

	return r;
}

unittest {
	enum MyFlags {
		none = 0,
		a = 1,
		b = 2
	}

	assert(flagsToString!MyFlags(3) == "a | b");
	assert(flagsToString!MyFlags(0) == "none");
	assert(flagsToString!MyFlags(2) == "b");
}

/++
	This populates a struct from a list of values (or other expressions, but it only looks at the values) based on types of the members, with one exception: `bool` members.. maybe.

	It is intended for collecting a record of relevant UDAs off a symbol in a single call like this:

	---
		struct Name {
			string n;
		}

		struct Validator {
			string regex;
		}

		struct FormInfo {
			Name name;
			Validator validator;
		}

		@Name("foo") @Validator(".*")
		void foo() {}

		auto info = populateFromUdas!(FormInfo, __traits(getAttributes, foo));
		assert(info.name == Name("foo"));
		assert(info.validator == Validator(".*"));
	---

	Note that instead of UDAs, you can also pass a variadic argument list and get the same result, but the function is `populateFromArgs` and you pass them as the runtime list to bypass "args cannot be evaluated at compile time" errors:

	---
		void foo(T...)(T t) {
			auto info = populateFromArgs!(FormInfo)(t);
			// assuming the call below
			assert(info.name == Name("foo"));
			assert(info.validator == Validator(".*"));
		}

		foo(Name("foo"), Validator(".*"));
	---

	The benefit of this over constructing the struct directly is that the arguments can be reordered or missing. Its value is diminished with named arguments in the language.
+/
template populateFromUdas(Struct, UDAs...) {
	enum Struct populateFromUdas = () {
		Struct ret;
		foreach(memberName; __traits(allMembers, Struct)) {
			alias memberType = typeof(__traits(getMember, Struct, memberName));
			foreach(uda; UDAs) {
				static if(is(memberType == PresenceOf!a, a)) {
					// PresenceOf members only record whether the symbol itself is among the UDAs
					static if(__traits(isSame, a, uda))
						__traits(getMember, ret, memberName) = true;
				}
				else
				static if(is(typeof(uda) : memberType)) {
					__traits(getMember, ret, memberName) = uda;
				}
			}
		}

		return ret;
	}();
}

/// ditto
Struct populateFromArgs(Struct, Args...)(Args args) {
	Struct ret;
	foreach(memberName; __traits(allMembers, Struct)) {
		alias memberType = typeof(__traits(getMember, Struct, memberName));
		foreach(arg; args) {
			// match on the argument's type, same rule as populateFromUdas
			static if(is(typeof(arg) : memberType)) {
				__traits(getMember, ret, memberName) = arg;
			}
		}
	}

	return ret;
}

/// ditto
struct PresenceOf(alias a) {
	bool there;
	alias there this;
}

///
unittest {
	enum a;
	enum b;
	struct Name { string name; }
	struct Info {
		Name n;
		PresenceOf!a athere;
		PresenceOf!b bthere;
		int c;
	}

	void test() @a @Name("test") {}

	auto info = populateFromUdas!(Info, __traits(getAttributes, test));
	assert(info.n == Name("test")); // but present ones are in there
	assert(info.athere == true); // non-values can be tested with PresenceOf!it, which works like a bool
	assert(info.bthere == false);
	assert(info.c == 0); // absent thing will keep the default value
}

/++
	Declares a delegate property with several setters to allow for handlers that don't care about the arguments.

	Throughout the arsd library, you will often see types of these to indicate that you can set listeners with or without arguments. If you care about the details of the callback event, you can set a delegate that declares them.
And if you don't, you can set one that doesn't even declare them and it will be ignored.
+/
struct FlexibleDelegate(DelegateType) {
	// please note that Parameters and ReturnType are public now!

	// The three nested `static if`s pull the function type, parameter list and
	// return type out of DelegateType. Each one has a matching `else static assert`
	// at the bottom of the struct, in reverse order, reporting which step failed.
	static if(is(DelegateType FunctionType == delegate))
	static if(is(FunctionType Parameters == __parameters))
	static if(is(DelegateType ReturnType == return)) {

		/++
			Calls the currently set delegate.

			Diagnostics:
				If the callback delegate has not been set, this may cause a null pointer dereference.
		+/
		ReturnType opCall(Parameters args) {
			return dg(args);
		}

		/++
			Use `if(thing)` to check if the delegate is null or not.
		+/
		bool opCast(T : bool)() {
			return dg !is null;
		}

		/++
			These opAssign overloads are what puts the flexibility in the flexible delegate.

			Bugs:
				The other overloads do not keep attributes like `nothrow` on the `dg` parameter, making them unusable if `DelegateType` requires them. I consider the attributes more trouble than they're worth anyway, and the language's poor support for composing them doesn't help any. I have no need for them and thus no plans to add them in the overloads at this time.
		+/
		void opAssign(DelegateType dg) {
			// exact type match: stored as-is
			this.dg = dg;
		}

		/// ditto
		void opAssign(ReturnType delegate() dg) {
			// zero-argument delegate: wrapped in a closure that drops the arguments
			this.dg = (Parameters ignored) => dg();
		}

		/// ditto
		void opAssign(ReturnType function(Parameters params) dg) {
			// plain function with the full parameter list: wrapped so it can be stored as a delegate
			this.dg = (Parameters params) => dg(params);
		}

		/// ditto
		void opAssign(ReturnType function() dg) {
			// zero-argument plain function: wrapped and the arguments are dropped
			this.dg = (Parameters ignored) => dg();
		}

		/// ditto
		void opAssign(typeof(null) explicitNull) {
			// clears the handler; `opCast!bool` reports false afterward
			this.dg = null;
		}

		// the currently installed handler, or null if none has been set
		private DelegateType dg;
	}
	else static assert(0, DelegateType.stringof ~ " failed return value check");
	else static assert(0, DelegateType.stringof ~ " failed parameters check");
	else static assert(0, DelegateType.stringof ~ " failed delegate check");
}

/++
	This example walks through declaring a [FlexibleDelegate], assigning each supported kind of handler to it, checking it for null, and calling it.
+/
unittest {
	// you don't have to put the arguments in a struct, but i recommend
	// you do as it is more future proof - you can add more info to the
	// struct without breaking user code that consumes it.
	struct MyEventArguments {

	}

	// then you declare it just adding FlexibleDelegate!() around the
	// plain delegate type you'd normally use
	FlexibleDelegate!(void delegate(MyEventArguments args)) callback;

	// until you set it, it will be null and thus be false in any boolean check
	assert(!callback);

	// can set it to the properly typed thing
	callback = delegate(MyEventArguments args) {};

	// and now it is no longer null
	assert(callback);

	// or if you don't care about the args, you can leave them off
	callback = () {};

	// and it works if the compiler types you as a function instead of delegate too
	// (which happens automatically if you don't access any local state or if you
	// explicitly define it as a function)

	callback = function(MyEventArguments args) { };

	// can set it back to null explicitly if you ever wanted
	callback = null;

	// the reflection info used internally also happens to be exposed publicly
	// which can actually sometimes be nice so if the language changes, i'll change
	// the code to keep this working.
	static assert(is(callback.ReturnType == void));

	// which can be convenient if the params is an annoying type since you can
	// consistently use something like this too
	callback = (callback.Parameters params) {};

	// check for null and call it pretty normally
	if(callback)
		callback(MyEventArguments());
}

/+
	======================
	ERROR HANDLING HELPERS
	======================
+/

/+ +
	arsd code shouldn't be using Exception. Really, I don't think any code should be - instead, construct an appropriate object with structured information.

	If you want to catch someone else's Exception, use `catch(object.Exception e)`.
+/
//package deprecated struct Exception {}


/++
	Base class representing my exceptions. You should almost never work with this directly, but you might catch it as a generic thing. Catch it before generic `object.Exception` or `object.Throwable` in any catch chains.


	$(H3 General guidelines for exceptions)

	The purpose of an exception is to cancel a task that has proven to be impossible and give the programmer enough information to use at a higher level to decide what to do about it.

	Cancelling a task is accomplished with the `throw` keyword. The transmission of information to a higher level is done by the language runtime. The decision point is marked by the `catch` keyword. The part missing - the job of the `Exception` class you construct and throw - is to gather the information that will be useful at a later decision point.

	It is thus important that you gather as much useful information as possible and keep it in a way that the code catching the exception can still interpret it when constructing an exception. Other concerns are secondary to this primary goal.

	With this in mind, here are some guidelines for exception handling in arsd code.

	$(H4 Allocations and lifetimes)

	Don't get clever with exception allocations. You don't know what the catcher is going to do with an exception and you don't want the error handling scheme to introduce its own tricky bugs. Remember, an exception object's first job is to deliver useful information up the call chain in a way this code can use it. You don't know what this code is or what it is going to do.

	Keep your memory management schemes simple and let the garbage collector do its job.

	$(LIST
		* All thrown exceptions should be allocated with the `new` keyword.

		* Members inside the exception should be value types or have infinite lifetime (that is, be GC managed).
1258 1259 * While this document is concerned with throwing, you might want to add additional information to an in-flight exception, and this is done by catching, so you need to know how that works too, and there is a global compiler switch that can change things, so even inside arsd we can't completely avoid its implications. 1260 1261 DIP1008's presence complicates things a bit on the catch side - if you catch an exception and return it from a function, remember to `ex.refcount = ex.refcount + 1;` so you don't introduce more use-after-free woes for those unfortunate souls. 1262 ) 1263 1264 $(H4 Error strings) 1265 1266 Strings can deliver useful information to people reading the message, but are often suboptimal for delivering useful information to other chunks of code. Remember, an exception's first job is to be caught by another block of code. Printing to users is a last resort; even if you want a user-readable error message, an exception is not the ideal way to deliver one since it is constructed in the guts of a failed task, without the higher level context of what the user was actually trying to do. User error messages ought to be made from information in the exception, combined with higher level knowledge. This is best done in a `catch` block, not a `throw` statement. 1267 1268 As such, I recommend that you: 1269 1270 $(LIST 1271 * Don't concatenate error strings at the throw site. Instead, pass the data you would have used to build the string as actual data to the constructor. This lets catchers see the original data without having to try to extract it from a string. For unique data, you will likely need a unique exception type. More on this in the next section. 1272 1273 * Don't construct error strings in a constructor either, for the same reason. Pass the useful data up the call chain, as exception members, to the maximum extent possible. Exception: if you are passed some data with a temporary lifetime that is important enough to pass up the chain. 
You may `.idup` or `to!string` to preserve as much data as you can before it is lost, but still store it in a separate member of the Exception subclass object.

		* $(I Do) construct strings out of public members in [getAdditionalPrintableInformation]. When this is called, the user has requested as much relevant information as reasonable in string format. Still, avoid concatenation - it lets you pass as many key/value pairs as you like to the caller. They can concatenate as needed. However, note the words "public members" - everything you do in `getAdditionalPrintableInformation` ought to also be possible for code that caught your exception via your public methods and properties.
	)

	$(H4 Subclasses)

	Any exception with unique data types should be a unique class. Whenever practical, this should be one you write and document at the top-level of a module. But I know we get lazy - me too - and this is why in standard D we'd often fall back to `throw new Exception("some string " ~ some info)`. To help resist these urges, I offer some helper functions to use instead that better achieve the key goal of exceptions - passing structured data up a call chain - while still being convenient to write.

	See: [ArsdException], [Win32Enforce]

+/
class ArsdExceptionBase : object.Exception {
	/++
		Don't call this except from other exceptions; this is essentially an abstract class.

		Params:
			operation = the specific operation that failed, throwing the exception
	+/
	package this(string operation, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		super(operation, file, line, next);
	}

	/++
		The toString method will print out several components:

		$(LIST
			* The file, line, static message, and object class name from the constructor. You can access these independently with the members `file`, `line`, `msg`, and [printableExceptionName].
			* The generic category codes stored with this exception
			* Additional members stored with the exception child classes (e.g. platform error codes, associated function arguments)
			* The stack trace associated with the exception. You can access these lines independently with `foreach` over the `info` member.
		)

		This is meant to be read by the developer, not end users. You should wrap your user-relevant tasks in a try/catch block and construct more appropriate error messages from context available there, using the individual properties of the exception to add richness.
	+/
	final override void toString(scope void delegate(in char[]) sink) const {
		// class name and info from constructor
		sink(printableExceptionName);
		sink("@");
		sink(file);
		sink("(");
		char[16] buffer;
		sink(intToString(line, buffer[]));
		sink("): ");
		sink(message);

		// one "name: value" line per item the subclasses choose to report
		getAdditionalPrintableInformation((string name, in char[] value) {
			sink("\n");
			sink(name);
			sink(": ");
			sink(value);
		});

		// full stack trace
		sink("\n----------------\n");
		foreach(str; info) {
			sink(str);
			sink("\n");
		}
	}
	/// ditto
	final override string toString() {
		// builds the whole message by appending each chunk from the sink overload
		string s;
		toString((in char[] chunk) { s ~= chunk; });
		return s;
	}

	/++
		Users might like to see additional information with the exception. API consumers should pull this out of properties on your child class, but the parent class might not be able to deal with the arbitrary types at runtime the children can introduce, so bringing them all down to strings simplifies that.

		Overrides should always call `super.getAdditionalPrintableInformation(sink);` before adding additional information by calling the sink with other arguments afterward.

		You should spare no expense in preparing this information - translate error codes, build rich strings, whatever it takes - to make the information here useful to the reader.
	+/
	void getAdditionalPrintableInformation(scope void delegate(string name, in char[] value) sink) const {

	}

	/++
		This is the name of the exception class, suitable for printing. This should be static data (e.g. a string literal). Override it in subclasses.
	+/
	string printableExceptionName() const {
		return typeid(this).name;
	}

	/// deliberately hiding `Throwable.msg`. Use [message] and [toString] instead.
	@disable final void msg() {}

	/// Returns the string that was passed to the constructor (the `msg` stored by the parent class).
	override const(char)[] message() const {
		return super.msg;
	}
}

/++
	An exception carrying a list of [InvalidArgument] records. Each record holds an argument's name, a description, and the value that was given.

	The string passed up as the exception message is `functionName`, which defaults to `__PRETTY_FUNCTION__`.
+/
class InvalidArgumentsException : ArsdExceptionBase {
	/// One rejected argument: its name, a description, and the value that was given.
	static struct InvalidArgument {
		string name;
		string description;
		LimitedVariant givenValue;
	}

	/// The records this exception was constructed with.
	InvalidArgument[] invalidArguments;

	/// Constructs the exception from a prepared list of records.
	this(InvalidArgument[] invalidArguments, string functionName = __PRETTY_FUNCTION__, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		this.invalidArguments = invalidArguments;
		super(functionName, file, line, next);
	}

	/// Convenience constructor for a single argument, with an optional given value.
	this(string argumentName, string argumentDescription, LimitedVariant givenArgumentValue = LimitedVariant.init, string functionName = __PRETTY_FUNCTION__, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		this([
			InvalidArgument(argumentName, argumentDescription, givenArgumentValue)
		], functionName, file, line, next);
	}

	/// Convenience constructor for a single argument with no given value recorded.
	this(string argumentName, string argumentDescription, string functionName = __PRETTY_FUNCTION__, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		this(argumentName, argumentDescription, LimitedVariant.init, functionName, file, line, next);
	}

	override void getAdditionalPrintableInformation(scope void delegate(string name, in char[] value) sink) const {
		// overrides are supposed to report the parent's information first
		super.getAdditionalPrintableInformation(sink);

		// FIXME: print the details better
		foreach(arg; invalidArguments)
			sink("invalidArguments[]", arg.name ~ " " ~ arg.description);
	}
}

/++
	Base class for when you've requested a feature that is not available. It may not be available because it is possible, but not yet implemented, or it might be because it is impossible on your operating system.
+/
class FeatureUnavailableException : ArsdExceptionBase {
	this(string featureName = __PRETTY_FUNCTION__, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		super(featureName, file, line, next);
	}
}

/++
	This means the feature could be done, but I haven't gotten around to implementing it yet. If you email me, I might be able to add it somewhat quickly and get back to you.
+/
class NotYetImplementedException : FeatureUnavailableException {
	this(string featureName = __PRETTY_FUNCTION__, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		super(featureName, file, line, next);
	}

}

/++
	This means the feature is not supported by your current operating system. You might be able to get it in an update, but you might just have to find an alternate way of doing things.
+/
class NotSupportedException : FeatureUnavailableException {
	this(string featureName, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		super(featureName, file, line, next);
	}
}

/++
	This is a generic exception with attached arguments. It is used when I had to throw something but didn't want to write a new class.

	You can catch an ArsdException to get its passed arguments out.

	You can pass either a base class or a string as `Type`.

	See the examples for how to use it.
+/
template ArsdException(alias Type, DataTuple...) {
	// Every extra data type adds one more layer of inheritance: the class for
	// (Type, A, B) derives from the one for (Type, A), which derives from the
	// one for (Type), which derives from ArsdExceptionBase.
	static if(DataTuple.length == 0)
		alias Parent = ArsdExceptionBase;
	else
		alias Parent = ArsdException!(Type, DataTuple[0 .. $-1]);

	class ArsdException : Parent {
		DataTuple data;

		this(DataTuple data, string file = __FILE__, size_t line = __LINE__) {
			this.data = data;
			// hand all but the last datum down to the parent layer, if there is one
			static if(DataTuple.length == 0)
				super(null, file, line);
			else
				super(data[0 .. $-1], file, line);
		}

		// convenience constructor: appends the types of `r` to this class's data types
		static opCall(R...)(R r, string file = __FILE__, size_t line = __LINE__) {
			alias Extended = ArsdException!(Type, DataTuple, R);
			return new Extended(r, file, line);
		}

		override string printableExceptionName() const {
			static if(DataTuple.length == 0)
				enum printable = "ArsdException!" ~ Type.stringof;
			else
				enum printable = "ArsdException!(" ~ Type.stringof ~ ", " ~ DataTuple.stringof[1 .. $-1] ~ ")";
			return printable;
		}

		override void getAdditionalPrintableInformation(scope void delegate(string name, in char[] value) sink) const {
			ArsdExceptionBase.getAdditionalPrintableInformation(sink);

			// one entry per stored datum, labeled with its position and type
			foreach(position, item; data) {
				enum int number = cast(int) position;
				enum label = "[" ~ number.stringof ~ "] " ~ DataTuple[position].stringof;
				sink(label, toStringInternal(item));
			}
		}
	}
}

/// This example shows how you can throw and catch the ad-hoc exception types.
unittest {
	// you can throw and catch by matching the string and argument types
	try {
		// throw it with parenthesis after the template args (it uses opCall to construct)
		throw ArsdException!"Test"();
		// you could also `throw new ArsdException!"test";`, but that gets harder with args
		// as we'll see in the following example
		assert(0); // remove from docs
	} catch(ArsdException!"Test" e) { // catch it without them
		// this has no useful information except for the type
		// but you can catch it like this and it is still more than generic Exception
	}

	// an exception's job is to deliver useful information up the chain
	// and you can do that easily by passing arguments:

	try {
		throw ArsdException!"Test"(4, "four");
		// you could also `throw new ArsdException!("Test", int, string)(4, "four")`
		// but now you start to see how the opCall convenience constructor simplifies things
		assert(0); // remove from docs
	} catch(ArsdException!("Test", int, string) e) { // catch it and use info by specifying types
		assert(e.data[0] == 4); // and extract arguments like this
		assert(e.data[1] == "four");
	}

	// a throw site can add additional information without breaking code that catches just some
	// generally speaking, each additional argument creates a new subclass on top of the previous args
	// so you can cast

	try {
		throw ArsdException!"Test"(4, "four", 9);
		assert(0); // remove from docs
	} catch(ArsdException!("Test", int, string) e) { // this catch still works
		assert(e.data[0] == 4);
		assert(e.data[1] == "four");
		// but if you were to print it, all the members would be there
		// import std.stdio; writeln(e); // would show something like:
		/+
			ArsdException!("Test", int, string, int)@file.d(line):
			[0] int: 4
			[1] string: four
			[2] int: 9
		+/
		// indicating that there's additional information available if you wanted to process it

		// and meanwhile:
		ArsdException!("Test", int) e2 = e; // this implicit cast works thanks to the parent-child relationship
		ArsdException!"Test" e3 = e; // this works too, the base type/string still matches

		// so catching those types would work too
	}
}

/++
	A tagged union that holds an error code from system apis, meaning one from Windows GetLastError() or C's errno.

	You construct it with `SystemErrorCode(thing)` and the overloaded constructor tags and stores it.
+/
struct SystemErrorCode {
	///
	enum Type {
		errno, ///
		win32 ///
	}

	const Type type; ///
	const int code; /// You should technically cast it back to DWORD if it is a win32 code

	/++
		C/unix error are typed as signed ints...
		Windows' errors are typed DWORD, aka unsigned...

		so just passing them straight up will pick the right overload here to set the tag.
	+/
	this(int errno) {
		this.type = Type.errno;
		this.code = errno;
	}

	/// ditto
	this(uint win32) {
		this.type = Type.win32;
		this.code = win32;
	}

	/++
		Returns if the code indicated success.

		Please note that many calls do not actually set a code to success, but rather just don't touch it. Thus this may only be true on `init`.
	+/
	bool wasSuccessful() const {
		final switch(type) {
			case Type.errno:
				return this.code == 0;
			case Type.win32:
				return this.code == 0;
		}
	}

	/++
		Constructs a string containing both the code and the explanation string.
	+/
	string toString() const {
		return codeAsString ~ " " ~ errorString;
	}

	/++
		The numeric code itself as a string.

		An errno code goes through `intToString` with default settings. A win32 code is prefixed with `0x` and formatted with radix 16 and padding 8, from the unsigned (DWORD) value.

		See [errorString] for a text explanation of the code.
	+/
	string codeAsString() const {
		char[16] buffer;
		final switch(type) {
			case Type.errno:
				return intToString(code, buffer[]).idup;
			case Type.win32:
				buffer[0 .. 2] = "0x";
				// the code is stored as a signed int, but win32 codes are DWORDs;
				// cast back so codes with the high bit set are not formatted as negative numbers
				return buffer[0 .. 2 + intToString(cast(uint) code, buffer[2 .. $], IntToStringArgs().withRadix(16).withPadding(8)).length].idup;
		}
	}

	/++
		A text explanation of the code. See [codeAsString] for a string representation of the numeric representation.

		Returns:
			For errno codes, a copy of the string given by C's `strerror`. For win32 codes, the text produced by `FormatMessageW` on Windows, or `null` on other platforms.
	+/
	string errorString() const {
		final switch(type) {
			case Type.errno:
				import core.stdc.string;
				auto strptr = strerror(code);
				if(strptr is null)
					return null;

				// copy it out: the buffer belongs to the C library, not to us
				return strptr[0 .. strlen(strptr)].idup;
			case Type.win32:
				version(Windows) {
					wchar[256] buffer;
					auto size = FormatMessageW(
						FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
						null,
						code,
						MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
						buffer.ptr,
						buffer.length,
						null
					);

					return makeUtf8StringFromWindowsString(buffer[0 .. size]).stripInternal;
				} else {
					return null;
				}
		}
	}
}

/++
	A name/value pair, stored by [SystemApiException] in its `args` member.
+/
struct SavedArgument {
	string name;
	LimitedVariant value;
}

/++
	An exception that carries a [SystemErrorCode] along with up to 8 [SavedArgument]s.

	The `int` and (on Windows) `DWORD` constructors wrap the raw code in a [SystemErrorCode] and forward to the main constructor.
+/
class SystemApiException : ArsdExceptionBase {
	this(string msg, int originalErrorNo, scope SavedArgument[] args = null, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		this(msg, SystemErrorCode(originalErrorNo), args, file, line, next);
	}

	version(Windows)
	this(string msg, DWORD windowsError, scope SavedArgument[] args = null, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		this(msg, SystemErrorCode(windowsError), args, file, line, next);
	}

	this(string msg, SystemErrorCode code, SavedArgument[] args = null, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		this.errorCode = code;

		// discard stuff that won't fit
		if(args.length > this.args.length)
			args = args[0 .. this.args.length];

		this.args[0 .. args.length] = args[];

		super(msg, file, line, next);
	}

	/++
		The error code this exception was constructed with.
	+/
	const SystemErrorCode errorCode;

	/++
		The saved arguments, copied in by the constructor. Unused slots keep their default value (a `null` name); anything past the eighth argument is discarded.
	+/
	const SavedArgument[8] args;

	override void getAdditionalPrintableInformation(scope void delegate(string name, in char[] value) sink) const {
		super.getAdditionalPrintableInformation(sink);
		sink("Error code", errorCode.toString());

		// slots with a null name are skipped
		foreach(arg; args)
			if(arg.name !is null)
				sink(arg.name, arg.value.toString());
	}

}

/++
	The low level use of this would look like `throw new WindowsApiException("MsgWaitForMultipleObjectsEx", GetLastError())` but it is meant to be used from higher level things like [Win32Enforce].

	History:
		Moved from simpledisplay.d to core.d in March 2023 (dub v11.0).
+/
alias WindowsApiException = SystemApiException;

/++
	History:
		Moved from simpledisplay.d to core.d in March 2023 (dub v11.0).
+/
alias ErrnoApiException = SystemApiException;

/++
	Calls the C API function `fn`. If it returns an error value, it throws an [ErrnoApiException] (or subclass) after getting `errno`.
+/
template ErrnoEnforce(alias fn, alias errorValue = void) {
	static if(is(typeof(fn) Return == return))
	static if(is(typeof(fn) Params == __parameters)) {
		// an explicitly passed error value wins; otherwise one is picked from the return type
		static if(!is(errorValue == void)) {
			enum errorValueToUse = errorValue;
		} else {
			static if(is(typeof(null) : Return))
				enum errorValueToUse = null;
			else static if(is(Return : long))
				enum errorValueToUse = -1;
			else
				static assert(0, "Please pass the error value");
		}

		Return ErrnoEnforce(Params params, ArgSentinel sentinel = ArgSentinel.init, string file = __FILE__, size_t line = __LINE__) {
			import core.stdc.errno;

			Return result = fn(params);

			// the common case: no error, hand the result straight back
			if(result != errorValueToUse)
				return result;

			SavedArgument[] savedArgs; // FIXME
			/+
			static foreach(idx; 0 .. Params.length)
				savedArgs ~= SavedArgument(
					__traits(identifier, Params[idx .. idx + 1]),
					params[idx]
				);
			+/
			throw new ErrnoApiException(__traits(identifier, fn), errno, savedArgs, file, line);
		}
	}
}

version(Windows) {
	/++
		Calls the Windows API function `fn`. If it returns an error value, it throws a [WindowsApiException] (or subclass) after calling `GetLastError()`.
	+/
	template Win32Enforce(alias fn, alias errorValue = void) {
		static if(is(typeof(fn) Return == return))
		static if(is(typeof(fn) Params == __parameters)) {
			// an explicitly passed error value wins; otherwise one is picked from the return type
			static if(!is(errorValue == void)) {
				enum errorValueToUse = errorValue;
			} else {
				static if(is(Return == BOOL))
					enum errorValueToUse = false;
				else static if(is(Return : HANDLE))
					enum errorValueToUse = NULL;
				else static if(is(Return == DWORD))
					enum errorValueToUse = cast(DWORD) 0xffffffff;
				else
					static assert(0, "Please pass the error value");
			}

			Return Win32Enforce(Params params, ArgSentinel sentinel = ArgSentinel.init, string file = __FILE__, size_t line = __LINE__) {
				Return result = fn(params);

				// the common case: no error, hand the result straight back
				if(result != errorValueToUse)
					return result;

				// fetch the error code first, before doing anything else
				auto lastError = GetLastError();
				SavedArgument[] savedArgs; // FIXME
				throw new WindowsApiException(__traits(identifier, fn), lastError, savedArgs, file, line);
			}
		}
	}

}

/+
	===============
	EVENT LOOP CORE
	===============
+/

/+
	UI threads
		need to get window messages in addition to all the other jobs
	I/O Worker threads
		need to get commands for read/writes, run them, and send the reply back. not necessary on Windows
		if interrupted, check cancel flags.
	CPU Worker threads
		gets functions, runs them, send reply back. should send a cancel flag to periodically check
	Task worker threads
		runs fibers and multiplexes them


	General procedure:
		issue the read/write command
		if it would block on linux, epoll associate it. otherwise do the callback immediately

		callbacks have default affinity to the current thread, meaning their callbacks always run here
		accepts can usually be dispatched to any available thread tho

	//  In other words, a single thread can be associated with, at most, one I/O completion port.
1807 1808 Realistically, IOCP only used if there is no thread affinity. If there is, just do overlapped w/ sleepex. 1809 1810 1811 case study: http server 1812 1813 1) main thread starts the server. it does an accept loop with no thread affinity. the main thread does NOT check the global queue (the iocp/global epoll) 1814 2) connections come in and are assigned to first available thread via the iocp/global epoll 1815 3) these run local event loops until the connection task is finished 1816 1817 EVENT LOOP TYPES: 1818 1) main ui thread - MsgWaitForMultipleObjectsEx / epoll on the local ui. it does NOT check the any worker thread thing! 1819 The main ui thread should never terminate until the program is ready to close. 1820 You can have additional ui threads in theory but im not really gonna support that in full; most things will assume there is just the one. simpledisplay's gui thread is the primary if it exists. (and sdpy will prolly continue to be threaded the way it is now) 1821 1822 The biggest complication is the TerminalDirectToEmulator, where the primary ui thread is NOT the thread that runs `main` 1823 2) worker thread GetQueuedCompletionStatusEx / epoll on the local thread fd and the global epoll fd 1824 3) local event loop - check local things only. SleepEx / epoll on local thread fd. This more of a compatibility hack for `waitForCompletion` outside a fiber. 1825 1826 i'll use: 1827 * QueueUserAPC to send interruptions to a worker thread 1828 * PostQueuedCompletionStatus is to send interruptions to any available thread. 1829 * PostMessage to a window 1830 * ??? to a fiber task 1831 1832 I also need a way to de-duplicate events in the queue so if you try to push the same thing it won't trigger multiple times.... I might want to keep a duplicate of the thing... really, what I'd do is post the "event wake up" message and keep the queue in my own thing. 
(WM_PAINT auto-coalesces)

	Destructors need to be able to post messages back to a specific task to queue thread-affinity cleanup. This must be GC safe.

	A task might want to wait on certain events. If the task is a fiber, it yields and gets called upon the event. If the task is a thread, it really has to call the event loop... which can be a loop of loops we want to avoid. `waitForCompletion` is more often gonna be used just to run the loop at top level tho... it might not even check for the global info availability so it'd run the local thing only.

	APCs should not themselves enter an alertable wait cuz it can stack overflow. So generally speaking, they should avoid calling fibers or other event loops.
+/

/++
	You can also pass a handle to a specific thread, if you have one.
+/
enum ThreadToRunIn {
	/++
		The callback should be only run by the same thread that set it.
	+/
	CurrentThread,
	/++
		The UI thread is a special one - it is the supervisor of the workers and the controller of gui and console handles. It is the first thread to call [arsd_core_init] actively running an event loop unless there is a thread that has actively asserted the ui supervisor role. FIXME is this true after i implement it?

		A ui thread should be always quickly responsive to new events.

		There should only be one main ui thread, in which simpledisplay and minigui can be used.

		Other threads can run like ui threads, but are considered temporary and only concerned with their own needs (it is the default style of loop
		for an undeclared thread but will not receive messages from other threads unless there is no other option)


		Ad-Hoc thread - something running an event loop that isn't another thing
		Controller thread - running an explicit event loop instance set as not a task runner or blocking worker
		UI thread - simpledisplay's event loop, which it will require remain live for the duration of the program (running two .eventLoops without a parent EventLoop instance will become illegal, throwing at runtime if it happens telling people to change their code)

		Windows HANDLES will always be listened on the thread itself that is requesting, UNLESS it is a worker/helper thread, in which case it goes to a coordinator thread. since it prolly can't rely on the parent per se this will have to be one created by arsd core init, UNLESS the parent is inside an explicit EventLoop structure.

		All use the MsgWaitForMultipleObjectsEx pattern


	+/
	UiThread,
	/++
		The callback can be called from any available worker thread. It will be added to a global queue and the first thread to see it will run it.

		These will not run on the UI thread unless there is no other option on the platform (and all platforms this lib supports have other options).

		These are expected to run cooperatively multitasked things; functions that frequently yield as they wait on other tasks. Think a fiber.

		A task runner should be generally responsive to new events.
	+/
	AnyAvailableTaskRunnerThread,
	/++
		These are expected to run longer blocking, but independent operations. Think an individual function with no context.

		A blocking worker can wait hundreds of milliseconds between checking for new events.
	+/
	AnyAvailableBlockingWorkerThread,
	/++
		The callback will be duplicated across all threads known to the arsd.core event loop.

		It adds it to an immutable queue that each thread will go through... might just replace with an exit() function.


		so to cancel all associated tasks for like a web server, it could just have the tasks atomicAdd to a counter and subtract when they are finished. Then you have a single semaphore you signal the number of times you have an active thing and wait for them to acknowledge it.

		threads should report when they start running the loop and they really should report when they terminate but that isn't reliable


		hmmm what if: all user-created threads (the public api) count as ui threads. only ones created in here are task runners or helpers. ui threads can wait on a global event to exit.

		there's still prolly be one "the" ui thread, which does the handle listening on windows and is the one sdpy wants.
	+/
	BroadcastToAllThreads,
}

/++
	Initializes the arsd core event loop and creates its worker threads. You don't actually have to call this, since the first use of an arsd.core function that requires it will call it implicitly, but calling it yourself gives you a chance to control the configuration more explicitly if you want to.
+/
void arsd_core_init(int numberOfWorkers = 0) {
	// NOTE(review): the body is currently empty; `numberOfWorkers` is accepted but not used yet.

}

version(Windows)
class WindowsHandleReader_ex {
	// Windows handles are always dispatched to the main ui thread, which can then send a command back to a worker thread to run the callback if needed
	this(HANDLE handle) {}
}

version(Posix)
class PosixFdReader_ex {
	// posix readers can just register with whatever instance we want to handle the callback
}

/++
	The interface to a thread's event loop: it can be run (once, or until a condition holds) and,
	on Posix, file descriptor callbacks can be registered with it.

	Obtain an instance with [getThisThreadEventLoop].
+/
interface ICoreEventLoop {
	/++
		Runs the event loop for this thread until the `until` delegate returns `true`.

		The delegate is checked before each iteration, so if it is already `true`, [runOnce] is never called.
	+/
	final void run(scope bool delegate() until) {
		while(!until()) {
			runOnce();
		}
	}

	/++
		Runs a single iteration of the event loop for this thread. It will return when the first thing happens, but that thing might be totally uninteresting to anyone, or it might trigger significant work you'll wait on.
	+/
	void runOnce();

	// to send messages between threads, i'll queue up a function that just call dispatchMessage. can embed the arg inside the callback helper prolly.
	// tho i might prefer to actually do messages w/ run payloads so it is easier to deduplicate i can still dedupe by inspecting the call args so idk

	version(Posix) {
		/++
			Handle for a persistent fd registration. It is marked `@mustuse` so it cannot be silently discarded;
			call [unregister] when you are done with the file descriptor.
		+/
		@mustuse
		static struct UnregisterToken {
			private CoreEventLoopImplementation impl;
			private int fd;
			private CallbackHelper cb;

			/++
				Unregisters the file descriptor from the event loop and releases the reference to the callback held by the event loop (which will probably free it).

				You must call this when you're done. Normally, this will be right before you close the fd (Which is often after the other side closes it, meaning you got a 0 length read).
			+/
			void unregister() {
				assert(impl !is null, "Cannot reuse unregister token");

				version(Arsd_core_epoll) {
					impl.unregisterFd(fd);
				} else version(Arsd_core_kqueue) {
					// intentionally blank - all registrations are one-shot there
					// FIXME: actually it might not have gone off yet, in that case we do need to delete the filter
				} else static assert(0);

				cb.release();
				// reset to .init so `impl` is null and a second call trips the assert above
				this = typeof(this).init;
			}
		}

		/++
			Handle for a one-shot fd registration. It is marked `@mustuse`; call [rearm] to receive
			another callback or [unregister] when you are done.
		+/
		@mustuse
		static struct RearmToken {
			private bool readable;
			private CoreEventLoopImplementation impl;
			private int fd;
			private CallbackHelper cb;
			private uint flags;

			/++
				Performs the same steps as [UnregisterToken.unregister]: removes the fd from the event loop
				(on epoll), releases the callback reference, and resets this token.
			+/
			void unregister() {
				assert(impl !is null, "cannot reuse rearm token after unregistering it");

				version(Arsd_core_epoll) {
					impl.unregisterFd(fd);
				} else version(Arsd_core_kqueue) {
					// intentionally blank - all registrations are one-shot there
					// FIXME: actually it might not have gone off yet, in that case we do need to delete the filter
				} else static assert(0);

				cb.release();
				// reset to .init so `impl` is null and later calls trip the asserts
				this = typeof(this).init;
			}

			/++
				Rearms the event so you will get another callback next time it is ready.
			+/
			void rearm() {
				assert(impl !is null, "cannot reuse rearm token after unregistering it");
				impl.rearmFd(this);
			}
		}

		/// Registers `cb` for when `fd` is readable; the returned token must be used to unregister it.
		UnregisterToken addCallbackOnFdReadable(int fd, CallbackHelper cb);
		/// One-shot variant for readability; use the returned token to rearm or unregister.
		RearmToken addCallbackOnFdReadableOneShot(int fd, CallbackHelper cb);
		/// One-shot variant for writability; use the returned token to rearm or unregister.
		RearmToken addCallbackOnFdWritableOneShot(int fd, CallbackHelper cb);
	}
}

/++
	Get the event loop associated with this thread

	The instance is created on first use and cached in a function-local `static`, which is thread-local in D,
	so each thread gets its own loop object.

	NOTE(review): the `type` argument is not consulted by the current implementation.
+/
ICoreEventLoop getThisThreadEventLoop(EventLoopType type = EventLoopType.AdHoc) {
	static ICoreEventLoop loop;
	if(loop is null)
		loop = new CoreEventLoopImplementation();
	return loop;
}

/++
	The internal types that will be exposed through other api things.
+/
package(arsd) enum EventLoopType {
	/++
		The event loop is being run temporarily and the thread doesn't promise to keep running it.
	+/
	AdHoc,
	/++
		The event loop struct has been instantiated at top level. Its destructor will run when the
		function exits, which is only at the end of the entire block of work it is responsible for.

		It must be in scope for the whole time the arsd event loop functions are expected to be used
		(meaning it should generally be top-level in `main`)
	+/
	Explicit,
	/++
		A specialization of `Explicit`, so all the same rules apply there, but this is specifically the event loop coming from simpledisplay or minigui. It will run for the duration of the UI's existence.
	+/
	Ui,
	/++
		A special event loop specifically for threads that listen to the task runner queue and handle I/O events from running tasks. Typically, a task runner runs cooperatively multitasked coroutines (so they prefer not to block the whole thread).
	+/
	TaskRunner,
	/++
		A special event loop specifically for threads that listen to the helper function request queue. Helper functions are expected to run independently for a somewhat long time (them blocking the thread for some time is normal) and send a reply message back to the requester.
	+/
	HelperWorker
}

/+
	Tasks are given an object to talk to their parent... can be a dialog where it is like

	sendBuffer
	waitForWordToProceed

	in a loop


	Tasks are assigned to a worker thread and may share it with other tasks.
+/


// the GC may not be able to see this! remember, it can be hidden inside kernel buffers
//
// Reference-counted wrapper around a delegate. Because the only reference may live where the GC
// cannot scan, the object is (by default) registered as a GC root on construction and the root is
// removed once the reference count drops to zero.
private class CallbackHelper {
	import core.memory;

	// Invokes the stored delegate, if there is one.
	void call() {
		if(callback)
			callback();
	}

	void delegate() callback;
	void*[3] argsStore;

	// Atomically increments the reference count.
	void addref() {
		atomicOp!"+="(refcount, 1);
	}

	// Atomically decrements the reference count; when it reaches zero (or below) and the object
	// was registered as a GC root (flag bit 1), the root is removed.
	void release() {
		if(atomicOp!"-="(refcount, 1) <= 0) {
			if(flags & 1)
				GC.removeRoot(cast(void*) this);
		}
	}

	private shared(int) refcount;
	private uint flags; // bit 1: this object was added as a GC root by the constructor

	// Wraps a plain function pointer in a delegate and forwards to the delegate constructor.
	this(void function() callback) {
		this( () { callback(); } );
	}

	// Stores the delegate and takes the initial reference (refcount becomes 1).
	// When `addRoot` is true, the object is also added as a GC root and flag bit 1 is set.
	this(void delegate() callback, bool addRoot = true) {
		if(addRoot) {
			GC.addRoot(cast(void*) this);
			this.flags |= 1;
		}

		this.addref();
		this.callback = callback;
	}
}

/++
	This represents a file. Technically, file paths aren't actually strings (for example, on Linux, they need not be valid utf-8, while a D string is supposed to be), even though we almost always use them like that.

	This type is meant to represent a filename / path. I might not keep it around.
+/
struct FilePath {
	string path;

	/// Returns `true` when `path` is the `null` string.
	bool isNull() {
		return path is null;
	}

	/// A `FilePath` converts to `true` when it is not null.
	bool opCast(T:bool)() {
		return !isNull;
	}

	/// Returns the stored path unchanged.
	string toString() {
		return path;
	}

	//alias toString this;
}

/++
	Represents a generic async, waitable request.
+/
class AsyncOperationRequest {
	/++
		Actually issues the request, starting the operation.
	+/
	abstract void start();
	/++
		Cancels the request. This will cause `isComplete` to return true once the cancellation has been processed, but [AsyncOperationResponse.wasSuccessful] will return `false` (unless it completed before the cancellation was processed, in which case it is still allowed to finish successfully).

		After cancelling a request, you should still wait for it to complete to ensure that the task has actually released its resources before doing anything else on it.

		Once a cancellation request has been sent, it cannot be undone.
	+/
	abstract void cancel();

	/++
		Returns `true` if the operation has been completed. It may be completed successfully, cancelled, or have errored out - to check this, call [waitForCompletion] and check the members on the response object.
	+/
	abstract bool isComplete();
	/++
		Waits until the request has completed - successfully or otherwise - and returns the response object. It will run an ad-hoc event loop that may call other callbacks while waiting.

		The response object may be embedded in the request object - do not reuse the request until you are finished with the response and do not keep the response around longer than you keep the request.


		Note to implementers: all subclasses should override this and return their specific response object. You can use the top-level `waitForFirstToCompleteByIndex` function with a single-element static array to help with the implementation.
	+/
	abstract AsyncOperationResponse waitForCompletion();

	/++

	+/
	// abstract void repeat();
}

/++
	The result of an [AsyncOperationRequest], obtained from its `waitForCompletion` method.
+/
interface AsyncOperationResponse {
	/++
		Returns true if the request completed successfully, finishing what it was supposed to.

		Should be set to `false` if the request was cancelled before completing or encountered an error.
	+/
	bool wasSuccessful();
}

/++
	Waits until at least one of the given requests is complete, running this thread's event loop while waiting.

	It returns the $(I request) so you can identify it more easily. `request.waitForCompletion()` is guaranteed to return the response without any actual wait, since it is already complete when this function returns.

	Please note that "completion" is not necessary successful completion; a request being cancelled or encountering an error also counts as it being completed.

	The `waitForFirstToCompleteByIndex` version instead returns the index of the array entry that completed first.

	It is your responsibility to remove the completed request from the array before calling the function again, since any request already completed will always be immediately returned.

	You might prefer using [asTheyComplete], which will give each request as it completes and loop over until all of them are complete.

	Returns:
		`null` or `requests.length` if none completed before returning. With an empty `requests` list there is nothing to wait for, so the function returns that value immediately instead of running the event loop.
+/
AsyncOperationRequest waitForFirstToComplete(AsyncOperationRequest[] requests...) {
	auto idx = waitForFirstToCompleteByIndex(requests);
	if(idx == requests.length)
		return null;
	return requests[idx];
}
/// ditto
size_t waitForFirstToCompleteByIndex(AsyncOperationRequest[] requests...) {
	// Nothing to wait on: without this guard the loop below would never terminate,
	// since the "none complete" sentinel (requests.length) is all `helper` could ever return.
	if(requests.length == 0)
		return requests.length;

	// index of the first completed request, or requests.length if none is complete yet
	size_t helper() {
		foreach(idx, request; requests)
			if(request.isComplete())
				return idx;
		return requests.length;
	}

	auto idx = helper();
	// if one is already done, return it
	if(idx != requests.length)
		return idx;

	// otherwise, run the ad-hoc event loop until one is
	// FIXME: what if we are inside a fiber?
	auto el = getThisThreadEventLoop();
	el.run(() => (idx = helper()) != requests.length);

	return idx;
}

/++
	Waits for all the `requests` to complete, giving each one through the range interface as it completes.

	This meant to be used in a foreach loop.

	The `requests` array and its contents must remain valid for the lifetime of the returned range. Its contents may be shuffled as the requests complete (the implementation works through an unstable sort+remove).
+/
AsTheyCompleteRange asTheyComplete(AsyncOperationRequest[] requests...) {
	return AsTheyCompleteRange(requests);
}
/// ditto
struct AsTheyCompleteRange {
	AsyncOperationRequest[] requests;

	/// Stores the slice and, unless it is empty, waits for the first request to complete so `front` is valid.
	this(AsyncOperationRequest[] requests) {
		this.requests = requests;

		if(requests.length == 0)
			return;

		// wait for first one to complete, then move it to the front of the array
		moveFirstCompleteToFront();
	}

	// Blocks until some request is complete and swaps it into slot 0.
	// Only called with a non-empty array, so the returned index is always valid.
	private void moveFirstCompleteToFront() {
		auto idx = waitForFirstToCompleteByIndex(requests);

		auto tmp = requests[0];
		requests[0] = requests[idx];
		requests[idx] = tmp;
	}

	/// The range is exhausted once every request has been handed out.
	bool empty() {
		return requests.length == 0;
	}

	/// Drops the current (already completed) request and waits for the next one to complete.
	void popFront() {
		assert(!empty);
		/+
			this needs to
			1) remove the front of the array as being already processed (unless it is the initial priming call)
			2) wait for one of them to complete
			3) move the complete one to the front of the array
		+/

		// unstable remove: overwrite the front with the last element, then shrink by one
		requests[0] = requests[$-1];
		requests = requests[0 .. $-1];

		if(requests.length)
			moveFirstCompleteToFront();
	}

	/// The most recently completed request.
	AsyncOperationRequest front() {
		return requests[0];
	}
}

version(Windows) {
	alias NativeFileHandle = HANDLE; ///
	alias NativeSocketHandle = SOCKET; ///
	alias NativePipeHandle = HANDLE; ///
} else version(Posix) {
	alias NativeFileHandle = int; ///
	alias NativeSocketHandle = int; ///
	alias NativePipeHandle = int; ///
}

/++
	An `AbstractFile` represents a file handle on the operating system level. You cannot do much with it.
+/
class AbstractFile {
	private {
		NativeFileHandle handle;
	}

	/++
		How the file is to be accessed once opened.
	+/
	enum OpenMode {
		readOnly, /// C's "r", the file is read
		writeWithTruncation, /// C's "w", the file is blanked upon opening so it only holds what you write
		appendOnly, /// C's "a", writes will always be appended to the file
		readAndWrite /// C's "r+", writes will overwrite existing parts of the file based on where you seek (default is at the beginning)
	}

	/++
		Whether the file must already exist for the open to succeed.
	+/
	enum RequirePreexisting {
		no,
		yes
	}

	/+
	enum SpecialFlags {
		randomAccessExpected, /// FILE_FLAG_SEQUENTIAL_SCAN is turned off and posix_fadvise(POSIX_FADV_SEQUENTIAL)
		skipCache, /// O_DSYNC, FILE_FLAG_NO_BUFFERING and maybe WRITE_THROUGH. note that metadata still goes through the cache, FlushFileBuffers and fsync can still do those
		temporary, /// FILE_ATTRIBUTE_TEMPORARY on Windows, idk how to specify on linux. also FILE_FLAG_DELETE_ON_CLOSE can be combined to make a (almost) all memory file. kinda like a private anonymous mmap i believe.
		deleteWhenClosed, /// Windows has a flag for this but idk if it is of any real use
		async, /// open it in overlapped mode, all reads and writes must then provide an offset.
Only implemented on Windows
	}
	+/

	/++
		Opens `filename` with the operating system's native API (`CreateFileW` on Windows, `open` on Posix) and stores the resulting handle.

		Params:
			async = open for asynchronous use (`FILE_FLAG_OVERLAPPED` on Windows, `O_NONBLOCK` on Posix)
			filename = path of the file to open
			mode = how the file will be accessed
			require = if `yes`, the open fails when the file does not already exist; if `no`, the write-capable modes create it as needed
			specialFlags = currently unused

		Throws:
			`WindowsApiException` or `ErrnoApiException` if the operating system refuses the request.
	+/
	protected this(bool async, FilePath filename, OpenMode mode = OpenMode.readOnly, RequirePreexisting require = RequirePreexisting.no, uint specialFlags = 0) {
		version(Windows) {
			DWORD access;
			DWORD creation;

			final switch(mode) {
				case OpenMode.readOnly:
					access = GENERIC_READ;
					creation = OPEN_EXISTING;
				break;
				case OpenMode.writeWithTruncation:
					access = GENERIC_WRITE;

					final switch(require) {
						case RequirePreexisting.no:
							creation = CREATE_ALWAYS;
						break;
						case RequirePreexisting.yes:
							creation = TRUNCATE_EXISTING;
						break;
					}
				break;
				case OpenMode.appendOnly:
					access = FILE_APPEND_DATA;

					final switch(require) {
						case RequirePreexisting.no:
							// OPEN_ALWAYS opens the existing file or creates a new one.
							// CREATE_ALWAYS would truncate the very data we are meant to append to.
							creation = OPEN_ALWAYS;
						break;
						case RequirePreexisting.yes:
							creation = OPEN_EXISTING;
						break;
					}
				break;
				case OpenMode.readAndWrite:
					access = GENERIC_READ | GENERIC_WRITE;

					final switch(require) {
						case RequirePreexisting.no:
							// open if present, create otherwise - same as O_RDWR | O_CREAT on Posix.
							// CREATE_NEW would fail whenever the file already exists.
							creation = OPEN_ALWAYS;
						break;
						case RequirePreexisting.yes:
							creation = OPEN_EXISTING;
						break;
					}
				break;
			}

			WCharzBuffer wname = WCharzBuffer(filename.path);

			auto handle = CreateFileW(
				wname.ptr,
				access,
				FILE_SHARE_READ,
				null,
				creation,
				FILE_ATTRIBUTE_NORMAL | (async ? FILE_FLAG_OVERLAPPED : 0),
				null
			);

			if(handle == INVALID_HANDLE_VALUE) {
				// attach the parameters of the failed call to the exception
				SavedArgument[3] args;
				args[0] = SavedArgument("filename", LimitedVariant(filename.path));
				args[1] = SavedArgument("access", LimitedVariant(access, 2));
				args[2] = SavedArgument("requirePreexisting", LimitedVariant(require == RequirePreexisting.yes));
				throw new WindowsApiException("CreateFileW", GetLastError(), args[]);
			}

			this.handle = handle;
		} else version(Posix) {
			import core.sys.posix.unistd;
			import core.sys.posix.fcntl;

			CharzBuffer namez = CharzBuffer(filename.path);
			int flags;

			// FIXME does mac not have cloexec for real or is this just a druntime problem?????
			version(Arsd_core_has_cloexec) {
				flags = O_CLOEXEC;
			} else {
				// no O_CLOEXEC available: set the flag on the descriptor once the open succeeded
				scope(success)
					setCloExec(this.handle);
			}

			if(async)
				flags |= O_NONBLOCK;

			final switch(mode) {
				case OpenMode.readOnly:
					flags |= O_RDONLY;
				break;
				case OpenMode.writeWithTruncation:
					flags |= O_WRONLY | O_TRUNC;

					final switch(require) {
						case RequirePreexisting.no:
							flags |= O_CREAT;
						break;
						case RequirePreexisting.yes:
						break;
					}
				break;
				case OpenMode.appendOnly:
					// O_APPEND alone leaves the access mode at O_RDONLY, so every write would fail;
					// the descriptor has to be opened for writing too.
					flags |= O_WRONLY | O_APPEND;

					final switch(require) {
						case RequirePreexisting.no:
							flags |= O_CREAT;
						break;
						case RequirePreexisting.yes:
						break;
					}
				break;
				case OpenMode.readAndWrite:
					flags |= O_RDWR;

					final switch(require) {
						case RequirePreexisting.no:
							flags |= O_CREAT;
						break;
						case RequirePreexisting.yes:
						break;
					}
				break;
			}

			// permissions for newly created files: rw-r--r-- (before the umask is applied)
			auto perms = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
			int fd = open(namez.ptr, flags, perms);
			if(fd == -1) {
				SavedArgument[3] args;
				args[0] = SavedArgument("filename", LimitedVariant(filename.path));
				args[1] = SavedArgument("flags", LimitedVariant(flags, 2));
				args[2] = SavedArgument("perms", LimitedVariant(perms, 8));
				throw new ErrnoApiException("open", errno, args[]);
			}

			this.handle = fd;
		}
	}

	/++
		Wraps a native handle that has already been opened elsewhere.
	+/
	private this(NativeFileHandle handleToWrap) {
		this.handle = handleToWrap;
	}

	// only available on some types of file
	// NOTE(review): currently a stub that always returns 0
	long size() { return 0; }

	// note that there is no fsync thing, instead use the special flag.

	/++
		Closes the underlying native handle and resets the stored handle to the platform's invalid value (`null` on Windows, `-1` on Posix).
	+/
	void close() {
		version(Windows) {
			Win32Enforce!CloseHandle(handle);
			handle = null;
		} else version(Posix) {
			import unix = core.sys.posix.unistd;
			import core.sys.posix.fcntl;

			ErrnoEnforce!(unix.close)(handle);
			handle = -1;
		}
	}
}

/++
	A file opened in synchronous access mode.

	NOTE(review): `read`, `write`, `seek` and `tell` are currently unimplemented stubs.
+/
class File : AbstractFile {

	/++
		Opens a file in synchronous access mode.

		The permission mask is only used on posix systems FIXME: implement it
	+/
	this(FilePath filename, OpenMode mode = OpenMode.readOnly, RequirePreexisting require = RequirePreexisting.no, uint specialFlags = 0, uint permMask = 0) {
		super(false, filename, mode, require, specialFlags);
	}

	/++
		Not yet implemented; always returns `null`.
	+/
	ubyte[] read(scope ubyte[] buffer) {
		return null;
	}

	/++
		Not yet implemented; does nothing.
	+/
	void write(in void[] buffer) {
	}

	/// Reference point for [seek].
	enum Seek {
		current,
		fromBeginning,
		fromEnd
	}

	// Seeking/telling/sizing is not permitted when appending and some files don't support it
	// also not permitted in async mode
	void seek(long where, Seek fromWhence) {}
	long tell() { return 0; }
}

/++
	Only one operation can be pending at any time in the current implementation.
+/
class AsyncFile : AbstractFile {
	/++
		Opens a file in asynchronous access mode.
	+/
	this(FilePath filename, OpenMode mode = OpenMode.readOnly, RequirePreexisting require = RequirePreexisting.no, uint specialFlags = 0, uint permissionMask = 0) {
		// FIXME: implement permissionMask
		super(true, filename, mode, require, specialFlags);
	}

	/// Adopts a native handle that was already set up elsewhere.
	package(arsd) this(NativeFileHandle adoptPreSetup) {
		super(adoptPreSetup);
	}

	/// Creates a read request for this file into `buffer` at `offset`. The request object is only constructed here.
	AsyncReadRequest read(ubyte[] buffer, long offset = 0) {
		return new AsyncReadRequest(this, buffer, offset);
	}

	/// Creates a write request for this file from `buffer` at `offset`. The request object is only constructed here.
	AsyncWriteRequest write(const(void)[] buffer, long offset = 0) {
		return new AsyncWriteRequest(this, cast(ubyte[]) buffer, offset);
	}

}

/++
	Reads or writes a file in one call. It might internally yield, but is generally blocking if it returns values. The callback ones depend on the implementation.

	Tip: prefer the callback ones. In settings where async is possible, it will do async, and if not, it will sync.

	NOTE(review): these are currently unimplemented stubs; the readers return `null` and the writer does nothing.
+/
void writeFile(string filename, const(void)[] contents) {

}

/// ditto
string readTextFile(string filename, string fileEncoding = null) {
	return null;
}

/// ditto
const(ubyte[]) readBinaryFile(string filename) {
	return null;
}

/+
private Class recycleObject(Class, Args...)(Class objectToRecycle, Args args) {
	if(objectToRecycle is null)
		return new Class(args);
	// destroy nulls out the vtable which is the first thing in the object
	// so if it hasn't already been destroyed, we'll do it here
	if((*cast(void**) objectToRecycle) !is null) {
		assert(typeid(objectToRecycle) is typeid(Class)); // to make sure we're actually recycling the right kind of object
		.destroy(objectToRecycle);
	}

	// then go ahead and reinitialize it
	ubyte[] rawData = (cast(ubyte*) cast(void*) objectToRecycle)[0 ..
__traits(classInstanceSize, Class)];
	rawData[] = (cast(ubyte[]) typeid(Class).initializer)[];

	objectToRecycle.__ctor(args);

	return objectToRecycle;
}
+/

/+
/++
	Preallocates a class object without initializing it.

	This is suitable *only* for passing to one of the functions in here that takes a preallocated object for recycling.
+/
Class preallocate(Class)() {
	import core.memory;
	// FIXME: can i pass NO_SCAN here?
	return cast(Class) GC.calloc(__traits(classInstanceSize, Class), 0, typeid(Class));
}

OwnedClass!Class preallocateOnStack(Class)() {

}
+/

// thanks for a random person on stack overflow for this function
//
// Creates a pipe pair like CreatePipe does, but backed by a uniquely named pipe so that extra
// flags (such as FILE_FLAG_OVERLAPPED) can be applied to either end.
//
// Params:
//   lpReadPipe, lpWritePipe = receive the two handles on success
//   lpPipeAttributes = security attributes passed to both the server and the client end
//   nSize = buffer size in bytes; 0 selects a default of 4096
//   dwReadMode = extra open-mode flags OR'd into the read (server) end
//   dwWriteMode = extra flags OR'd into the write (client) end
//
// Returns: TRUE on success. On failure returns FALSE with the error available via GetLastError,
// and the output parameters are left untouched.
version(Windows)
BOOL MyCreatePipeEx(
	PHANDLE lpReadPipe,
	PHANDLE lpWritePipe,
	LPSECURITY_ATTRIBUTES lpPipeAttributes,
	DWORD nSize,
	DWORD dwReadMode,
	DWORD dwWriteMode
)
{
	HANDLE ReadPipeHandle, WritePipeHandle;
	DWORD dwError;
	CHAR[MAX_PATH] PipeNameBuffer;

	if (nSize == 0) {
		nSize = 4096;
	}

	// process-wide counter (shared, bumped atomically) so every pipe gets a distinct name
	static shared(int) PipeSerialNumber = 0;

	import core.stdc.string;
	import core.stdc.stdio;

	sprintf(PipeNameBuffer.ptr,
		"\\\\.\\Pipe\\ArsdCoreAnonymousPipe.%08x.%08x".ptr,
		GetCurrentProcessId(),
		atomicOp!"+="(PipeSerialNumber, 1)
	);

	ReadPipeHandle = CreateNamedPipeA(
		PipeNameBuffer.ptr,
		1/*PIPE_ACCESS_INBOUND*/ | dwReadMode,
		0/*PIPE_TYPE_BYTE*/ | 0/*PIPE_WAIT*/,
		1,             // Number of pipes
		nSize,         // Out buffer size
		nSize,         // In buffer size
		120 * 1000,    // Timeout in ms
		lpPipeAttributes
	);

	// CreateNamedPipe reports failure with INVALID_HANDLE_VALUE, not with a null handle,
	// so a null test would let a failed creation slip through.
	if (INVALID_HANDLE_VALUE == ReadPipeHandle) {
		return FALSE;
	}

	WritePipeHandle = CreateFileA(
		PipeNameBuffer.ptr,
		GENERIC_WRITE,
		0,                         // No sharing
		lpPipeAttributes,
		OPEN_EXISTING,
		FILE_ATTRIBUTE_NORMAL | dwWriteMode,
		null                       // Template file
	);

	if (INVALID_HANDLE_VALUE == WritePipeHandle) {
		// preserve the CreateFile error across the cleanup call
		dwError = GetLastError();
		CloseHandle( ReadPipeHandle );
		SetLastError(dwError);
		return FALSE;
	}

	*lpReadPipe = ReadPipeHandle;
	*lpWritePipe = WritePipeHandle;
	return( TRUE );
}



/+

	// this is probably useless.

/++
	Creates a pair of anonymous pipes ready for async operations.

	You can pass some preallocated objects to recycle if you like.
+/
AsyncAnonymousPipe[2] anonymousPipePair(AsyncAnonymousPipe[2] preallocatedObjects = [null, null], bool inheritable = false) {
	version(Posix) {
		int[2] fds;
		auto ret = pipe(fds);

		if(ret == -1)
			throw new SystemApiException("pipe", errno);

		// FIXME: do we want them inheritable? and do we want both sides to be async?
		if(!inheritable) {
			setCloExec(fds[0]);
			setCloExec(fds[1]);
		}
		// if it is inherited, do we actually want it non-blocking?
		makeNonBlocking(fds[0]);
		makeNonBlocking(fds[1]);

		return [
			recycleObject(preallocatedObjects[0], fds[0]),
			recycleObject(preallocatedObjects[1], fds[1]),
		];
	} else version(Windows) {
		HANDLE rp, wp;
		// FIXME: do we want them inheritable? and do we want both sides to be async?
		if(!MyCreatePipeEx(&rp, &wp, null, 0, FILE_FLAG_OVERLAPPED, FILE_FLAG_OVERLAPPED))
			throw new SystemApiException("MyCreatePipeEx", GetLastError());
		return [
			recycleObject(preallocatedObjects[0], rp),
			recycleObject(preallocatedObjects[1], wp),
		];
	} else throw ArsdException!"NotYetImplemented"();
}
	// on posix, just do pipe() w/ non block
	// on windows, do an overlapped named pipe server, connect, stop listening, return pair.
+/

/+
class NamedPipe : AsyncFile {

}
+/

/++
	A named pipe ready to accept connections.

	A Windows named pipe is an IPC mechanism usable on local machines or across a Windows network.
+/
version(Windows)
class NamedPipeServer {
	// unix domain socket or windows named pipe

	// Promise!AsyncAnonymousPipe connect;
	// Promise!AsyncAnonymousPipe accept;

	// when a new connection arrives, it calls your callback
	// can be on a specific thread or on any thread
}

private version(Windows) extern(Windows) {
	const(char)* inet_ntop(int, const void*, char*, socklen_t);
}

/++
	Some functions that return arrays allow you to provide your own buffer. These are indicated in the type system as `UserProvidedBuffer!Type`, and you get to decide what you want to happen if the buffer is too small via the [OnOutOfSpace] parameter.

	These are usually optional, since an empty user provided buffer with the default policy of reallocate will also work fine for whatever needs to be returned, thanks to the garbage collector taking care of it for you.

	The API inside `UserProvidedBuffer` is all private to the arsd library implementation; your job is just to provide the buffer to it with [provideBuffer] or a constructor call and decide on your on-out-of-space policy.

	$(TIP
		To properly size a buffer, I suggest looking at what covers about 80% of cases.
Trying to cover everything often leads to wasted buffer space, and if you use a reallocate policy it can cover the rest. You might be surprised how far just two elements can go!
	)

	History:
		Added August 4, 2023 (dub v11.0)
+/
struct UserProvidedBuffer(T) {
	private T[] buffer;
	private int actualLength;
	private OnOutOfSpace policy;

	/++
		Wraps the caller's `buffer` and records what to do once it is full.
	+/
	public this(scope T[] buffer, OnOutOfSpace policy = OnOutOfSpace.reallocate) {
		this.buffer = buffer;
		this.policy = policy;
	}

	// Stores `item` in the next free slot. When the buffer is full, the policy decides:
	// discard reports `false`, exception throws, reallocate grows the array and stores the item.
	package(arsd) bool append(T item) {
		const bool isFull = !(actualLength < buffer.length);

		if(isFull) {
			final switch(policy) {
				case OnOutOfSpace.discard:
					return false;
				case OnOutOfSpace.exception:
					throw ArsdException!"Buffer out of space"(buffer.length, actualLength);
				case OnOutOfSpace.reallocate:
					buffer ~= item;
					actualLength++;
					return true;
			}
		}

		buffer[actualLength] = item;
		actualLength++;
		return true;
	}

	// The portion of the buffer that has actually been filled so far.
	package(arsd) T[] slice() return {
		auto filled = buffer[0 .. actualLength];
		return filled;
	}
}

/// ditto
UserProvidedBuffer!T provideBuffer(T)(scope T[] buffer, OnOutOfSpace policy = OnOutOfSpace.reallocate) {
	auto wrapped = UserProvidedBuffer!T(buffer, policy);
	return wrapped;
}

/++
	Possible policies for [UserProvidedBuffer]s that run out of space.
+/
enum OnOutOfSpace {
	reallocate, /// reallocate the buffer with the GC to make room
	discard, /// discard all contents that do not fit in your provided buffer
	exception, /// throw an exception if there is data that would not fit in your provided buffer
}


/++
	For functions that give you an unknown address, you can use this to hold it.

	Can get:
		ip4
		ip6
		unix
		abstract_

		name lookup for connect (stream or dgram)
			request canonical name?
/++
	For functions that give you an unknown address, you can use this to hold it.

	Can get:
		ip4
		ip6
		unix
		abstract_

	name lookup for connect (stream or dgram)
		request canonical name?

	interface lookup for bind (stream or dgram)
+/
struct SocketAddress {
	import core.sys.posix.netdb;

	/++
		Provides the set of addresses to listen on all supported protocols on the machine for the given interfaces. `localhost` only listens on the loopback interface, whereas `allInterfaces` will listen on loopback as well as the others on the system (meaning it may be publicly exposed to the internet).

		If you provide a buffer, I recommend using one of length two, so `SocketAddress[2]`, since this usually provides one address for ipv4 and one for ipv6.
	+/
	static SocketAddress[] localhost(ushort port, return UserProvidedBuffer!SocketAddress buffer = null) {
		buffer.append(ip6("::1", port));
		buffer.append(ip4("127.0.0.1", port));
		return buffer.slice;
	}

	/// ditto
	static SocketAddress[] allInterfaces(ushort port, return UserProvidedBuffer!SocketAddress buffer = null) {
		char[16] str;
		return allInterfaces(intToString(port, str[]), buffer);
	}

	/// ditto
	static SocketAddress[] allInterfaces(scope const char[] serviceOrPort, return UserProvidedBuffer!SocketAddress buffer = null) {
		addrinfo hints;
		hints.ai_flags = AI_PASSIVE;
		hints.ai_socktype = SOCK_STREAM; // just to filter it down a little tbh
		return get(null, serviceOrPort, &hints, buffer);
	}

	/++
		Returns a single address object for the given protocol and parameters.

		You probably should generally prefer [get], [localhost], or [allInterfaces] to have more flexible code.

		Params:
			address = numeric address text; the lookup is done with `AI_NUMERICHOST`, so host names are not resolved
			port = port number in host byte order
			forListening = adds `AI_PASSIVE` to the lookup flags
	+/
	static SocketAddress ip4(scope const char[] address, ushort port, bool forListening = false) {
		return getSingleAddress(AF_INET, AI_NUMERICHOST | (forListening ? AI_PASSIVE : 0), address, port);
	}

	/// ditto
	static SocketAddress ip4(ushort port) {
		return ip4(null, port, true);
	}

	/// ditto
	static SocketAddress ip6(scope const char[] address, ushort port, bool forListening = false) {
		return getSingleAddress(AF_INET6, AI_NUMERICHOST | (forListening ? AI_PASSIVE : 0), address, port);
	}

	/// ditto
	static SocketAddress ip6(ushort port) {
		return ip6(null, port, true);
	}

	/// ditto
	static SocketAddress unix(scope const char[] path) {
		// FIXME: not implemented yet; the path is ignored and a default-initialized address is returned
		SocketAddress addr;
		return addr;
	}

	/++
		Builds the name for a socket in the abstract namespace: a zero byte followed by `path`, which is then handed to [unix].

		Throws:
			an `ArsdException` if the name plus its leading zero byte does not fit the 190 byte scratch buffer.
	+/
	static SocketAddress abstract_(scope const char[] path) {
		char[190] buffer = void;
		if(path.length + 1 > buffer.length)
			throw ArsdException!"abstract socket name too long"(path.length, buffer.length);
		buffer[0] = 0;
		// the destination must be exactly path.length elements long, starting after the leading zero
		buffer[1 .. 1 + path.length] = path[];
		return unix(buffer[0 .. 1 + path.length]);
	}

	/++
		Shared implementation of [ip4] and [ip6]: asks [get] for exactly one address of the given family.

		Throws:
			whatever [get] throws if the lookup fails, or an `ArsdException` if the lookup yields no usable address.
	+/
	private static SocketAddress getSingleAddress(int family, int flags, scope const char[] address, ushort port) {
		addrinfo hints;
		hints.ai_family = family;
		hints.ai_flags = flags;

		char[16] portBuffer;
		char[] portString = intToString(port, portBuffer[]);

		SocketAddress[1] addr;
		auto res = get(address, portString, &hints, provideBuffer(addr[]));
		if(res.length == 0)
			throw ArsdException!"bad address"(address.idup, port);
		return res[0];
	}

	/++
		Calls `getaddrinfo` and returns the array of results. It will populate the data into the buffer you provide, if you provide one, otherwise it will allocate its own.

		Results whose family is not `AF_INET`, `AF_INET6`, or `AF_UNIX` are skipped, as are results rejected by `filter`.

		Params:
			nodeName = host name or numeric address; `null` is passed through to `getaddrinfo` as a null pointer
			serviceOrPort = service name or port number as text; `null` is passed through as a null pointer
			hints = optional `getaddrinfo` hints
			buffer = where to put the results, see [UserProvidedBuffer]
			filter = if not `null`, only results for which it returns `true` are kept

		Throws:
			`WindowsApiException` on Windows or an `ArsdException` carrying the `gai_strerror` text elsewhere if `getaddrinfo` itself fails.
	+/
	static SocketAddress[] get(scope const char[] nodeName, scope const char[] serviceOrPort, addrinfo* hints = null, return UserProvidedBuffer!SocketAddress buffer = null, scope bool delegate(scope addrinfo* ai) filter = null) @trusted {
		addrinfo* res;
		CharzBuffer node = nodeName;
		CharzBuffer service = serviceOrPort;
		auto ret = getaddrinfo(nodeName is null ? null : node.ptr, serviceOrPort is null ? null : service.ptr, hints, &res);
		if(ret != 0) {
			version(Windows) {
				throw new WindowsApiException("getaddrinfo", ret);
			} else {
				// report the failure instead of quietly returning an empty list
				throw ArsdException!"getaddrinfo"(ret, stringz(gai_strerror(ret)).borrow.idup);
			}
		}

		// release the list even if the filter delegate or the buffer policy throws
		scope(exit) freeaddrinfo(res);

		for(auto current = res; current !is null; current = current.ai_next) {
			if(filter !is null && !filter(current))
				continue;

			SocketAddress addr;
			addr.addrlen = cast(socklen_t) current.ai_addrlen;
			switch(current.ai_family) {
				case AF_INET:
					addr.in4 = * cast(sockaddr_in*) current.ai_addr;
				break;
				case AF_INET6:
					addr.in6 = * cast(sockaddr_in6*) current.ai_addr;
				break;
				case AF_UNIX:
					addr.unix_address = * cast(sockaddr_un*) current.ai_addr;
				break;
				default:
					// a family we cannot represent: really skip it rather than appending a blank address
					continue;
			}

			if(!buffer.append(addr))
				break;
		}

		return buffer.slice;
	}

	/++
		Returns a string representation of the address that identifies it in a custom format.

		$(LIST
			* Unix domain socket addresses are their path prefixed with "unix:", unless they are in the abstract namespace, in which case it is prefixed with "abstract:" and the zero is trimmed out. For example, "unix:/tmp/pipe".

			* IPv4 addresses are written in dotted decimal followed by a colon and the port number. For example, "127.0.0.1:8080".

			* IPv6 addresses are written in colon separated hex format, but enclosed in brackets, then followed by the colon and port number. For example, "[::1]:8080".
		)
	+/
	string toString() const @trusted {
		char[200] buffer;
		switch(address.sa_family) {
			case AF_INET:
				auto writable = stringz(inet_ntop(address.sa_family, &in4.sin_addr, buffer.ptr, buffer.length));
				auto it = writable.borrow;
				buffer[it.length] = ':';
				auto numbers = intToString(port, buffer[it.length + 1 .. $]);
				return buffer[0 .. it.length + 1 + numbers.length].idup;
			case AF_INET6:
				buffer[0] = '[';
				auto writable = stringz(inet_ntop(address.sa_family, &in6.sin6_addr, buffer.ptr + 1, buffer.length - 1));
				auto it = writable.borrow;
				buffer[it.length + 1] = ']';
				buffer[it.length + 2] = ':';
				auto numbers = intToString(port, buffer[it.length + 3 .. $]);
				return buffer[0 .. it.length + 3 + numbers.length].idup;
			case AF_UNIX:
				// Only the bytes of sun_path covered by addrlen belong to the address. Clamp to the
				// array size because addrlen starts out as the size of the whole union.
				size_t pathBytes = 0;
				if(addrlen > sockaddr_un.sun_path.offsetof)
					pathBytes = addrlen - sockaddr_un.sun_path.offsetof;
				if(pathBytes > unix_address.sun_path.length)
					pathBytes = unix_address.sun_path.length;

				auto raw = cast(const(char)[]) unix_address.sun_path[0 .. pathBytes];
				if(raw.length == 0)
					return "unix:";

				string prefix;
				const(char)[] name;
				if(raw[0] == 0) {
					// abstract namespace: the name follows the leading zero and is not zero terminated,
					// so a zero-terminated string scan cannot be used here
					prefix = "abstract:";
					name = raw[1 .. $];
				} else {
					prefix = "unix:";
					size_t end = 0;
					while(end < raw.length && raw[end] != 0)
						end++;
					name = raw[0 .. end];
				}

				buffer[0 .. prefix.length] = prefix[];
				buffer[prefix.length .. prefix.length + name.length] = name[];
				// return only what was written, not the whole scratch buffer
				return buffer[0 .. prefix.length + name.length].idup;
			case AF_UNSPEC:
				return "<unspecified address>";
			default:
				return "<unsupported address>"; // FIXME
		}
	}

	/++
		Returns: the port in host byte order for IPv4 and IPv6 addresses, and 0 for every other family.
	+/
	ushort port() const @trusted {
		switch(address.sa_family) {
			case AF_INET:
				return ntohs(in4.sin_port);
			case AF_INET6:
				return ntohs(in6.sin6_port);
			default:
				return 0;
		}
	}

	/+
	@safe unittest {
		SocketAddress[4] buffer;
		foreach(addr; SocketAddress.get("arsdnet.net", "http", null, provideBuffer(buffer[])))
			writeln(addr.toString());
	}
	+/

	/+
	unittest {
		// writeln(SocketAddress.ip4(null, 4444, true));
		// writeln(SocketAddress.ip4("400.3.2.1", 4444));
		// writeln(SocketAddress.ip4("bar", 4444));
		foreach(addr; localhost(4444))
			writeln(addr.toString());
	}
	+/

	socklen_t addrlen = typeof(this).sizeof - socklen_t.sizeof; // the size of the union below

	union {
		sockaddr address;

		sockaddr_storage storage;

		sockaddr_in in4;
		sockaddr_in6 in6;

		sockaddr_un unix_address;
	}

	/+
	this(string node, string serviceOrPort, int family = 0) {
		// need to populate the appropriate address and the length and make sure you set sa_family
	}
	+/

	/// The address family (`sa_family`), suitable as the domain argument when creating a socket.
	int domain() {
		return address.sa_family;
	}
	/// Pointer to the raw `sockaddr` for passing to the OS socket functions, together with [rawAddrLength].
	sockaddr* rawAddr() return {
		return &address;
	}
	/// The length that goes with [rawAddr].
	socklen_t rawAddrLength() {
		return addrlen;
	}

	// FIXME it is AF_BLUETOOTH
	// see: https://people.csail.mit.edu/albert/bluez-intro/x79.html
	// see: https://learn.microsoft.com/en-us/windows/win32/Bluetooth/bluetooth-programming-with-windows-sockets
}

// Windows-only declaration of sockaddr_un so the union inside SocketAddress can name it there too.
private version(Windows) {
	struct sockaddr_un {
		ushort sun_family;
		char[108] sun_path;
	}
}
/++
	A socket wrapped as an [AsyncFile]. The methods here are thin wrappers over the OS socket calls, plus factories for the asynchronous request objects.
+/
class AsyncSocket : AsyncFile {
	// otherwise: accept, bind, connect, shutdown, close.

	/// Returns the most recent socket error code: `WSAGetLastError()` on Windows, `errno` elsewhere.
	static auto lastError() {
		version(Windows) {
			return WSAGetLastError();
		} else {
			return errno;
		}
	}

	/// Returns `true` if [lastError] reports that the operation would have blocked (or, on Windows, timed out).
	static bool wouldHaveBlocked() {
		const code = lastError;
		version(Windows)
			return code == WSAEWOULDBLOCK || code == WSAETIMEDOUT;
		else
			return code == EAGAIN || code == EWOULDBLOCK;
	}

	/// The handle value that means "no socket" on this platform.
	version(Windows)
		enum INVALID = INVALID_SOCKET;
	else
		enum INVALID = -1;

	// type is mostly SOCK_STREAM or SOCK_DGRAM
	/++
		Creates a socket compatible with the given address. It does not actually connect or bind, nor store the address. You will want to pass it again to those functions:

		---
		auto socket = new Socket(address, Socket.Type.Stream);
		socket.connect(address).waitForCompletion();
		---

		Throws:
			`SystemApiException` if the OS refuses to create the socket.
	+/
	this(SocketAddress address, int type, int protocol = 0) {
		// need to look up these values for linux
		// type |= SOCK_NONBLOCK | SOCK_CLOEXEC;

		handle_ = socket(address.domain(), type, protocol);
		if(handle_ == INVALID)
			throw new SystemApiException("socket", lastError());

		super(cast(NativeFileHandle) handle_); // I think that cast is ok on Windows... i think

		version(Posix) {
			makeNonBlocking(handle_);
			setCloExec(handle_);
		}

		// FIXME: check for broadcast

		// FIXME: REUSEADDR ?

		// FIXME: also set NO_DELAY prolly
		// int opt = 1;
		// setsockopt(handle, IPPROTO_TCP, TCP_NODELAY, &opt, opt.sizeof);
	}

	/++
		Enabling NODELAY can give latency improvements if you are managing buffers on your end

		Currently a stub: the body is empty, so calling it changes nothing.
	+/
	void setNoDelay(bool enabled) {

	}

	/++

		`allowQuickRestart` will set the SO_REUSEADDR on unix and SO_DONTLINGER on Windows,
		allowing the application to be quickly restarted despite there still potentially being
		pending data in the tcp stack.

		See https://stackoverflow.com/questions/3229860/what-is-the-meaning-of-so-reuseaddr-setsockopt-option-linux for more information.

		If you already set your appropriate socket options or value correctness and reliability of the network stream over restart speed, leave this at the default `false`.

		Note that the `allowQuickRestart` branch is still a FIXME in the code and does not set any option yet.

		Throws:
			`SystemApiException` if the OS `bind` call fails.
	+/
	void bind(SocketAddress address, bool allowQuickRestart = false) {
		if(allowQuickRestart) {
			// FIXME
		}

		if(.bind(handle, address.rawAddr, address.rawAddrLength) == -1)
			throw new SystemApiException("bind", lastError);
	}

	/++
		You must call [bind] before this.

		The backlog should be set to a value where your application can reliably catch up on the backlog in a reasonable amount of time under average load. It is meant to smooth over short duration bursts and making it too big will leave clients hanging - which might cause them to try to reconnect, thinking things got lost in transit, adding to your impossible backlog.

		I personally tend to set this to be two per worker thread unless I have actual real world measurements saying to do something else. It is a bit arbitrary and not based on legitimate reasoning, it just seems to work for me (perhaps just because it has never really been put to the test).

		Throws:
			`SystemApiException` if the OS `listen` call fails.
	+/
	void listen(int backlog) {
		if(.listen(handle, backlog) == -1)
			throw new SystemApiException("listen", lastError);
	}

	/++
		Forwards `how` to the OS `shutdown` call.

		Throws:
			`SystemApiException` if the OS `shutdown` call fails.
	+/
	void shutdown(int how) {
		if(.shutdown(handle, how) == -1)
			throw new SystemApiException("shutdown", lastError);
	}

	/++
		Closes the socket with the platform's close call and resets the stored handle to -1. The result of the close call is not checked.
	+/
	override void close() {
		version(Windows) {
			closesocket(handle);
		} else {
			.close(handle);
		}
		handle_ = -1;
	}

	/++
		You can also construct your own request externally to control the memory more.
	+/
	AsyncConnectRequest connect(SocketAddress address, ubyte[] bufferToSend = null) {
		return new AsyncConnectRequest(this, address, bufferToSend);
	}

	/++
		You can also construct your own request externally to control the memory more.
	+/
	AsyncAcceptRequest accept() {
		return new AsyncAcceptRequest(this);
	}

	// note that send is just sendto w/ a null address
	// and receive is just receivefrom w/ a null address
	/++
		You can also construct your own request externally to control the memory more.
	+/
	AsyncSendRequest send(const(ubyte)[] buffer, int flags = 0) {
		return new AsyncSendRequest(this, buffer, null, flags);
	}

	/++
		You can also construct your own request externally to control the memory more.
	+/
	AsyncReceiveRequest receive(ubyte[] buffer, int flags = 0) {
		return new AsyncReceiveRequest(this, buffer, null, flags);
	}

	/++
		You can also construct your own request externally to control the memory more.
	+/
	AsyncSendRequest sendTo(const(ubyte)[] buffer, SocketAddress* address, int flags = 0) {
		return new AsyncSendRequest(this, buffer, address, flags);
	}
	/++
		You can also construct your own request externally to control the memory more.
	+/
	AsyncReceiveRequest receiveFrom(ubyte[] buffer, SocketAddress* address, int flags = 0) {
		return new AsyncReceiveRequest(this, buffer, address, flags);
	}

	/++
		Asks the OS (`getsockname`) for the address this socket is bound to. The result of the call is not checked.
	+/
	SocketAddress localAddress() {
		SocketAddress result;
		getsockname(handle, &result.address, &result.addrlen);
		return result;
	}
	/++
		Asks the OS (`getpeername`) for the address of the other end. The result of the call is not checked.
	+/
	SocketAddress peerAddress() {
		SocketAddress result;
		getpeername(handle, &result.address, &result.addrlen);
		return result;
	}

	// for unix sockets on unix only: send/receive fd, get peer creds

	/++
		The native socket handle this object wraps.
	+/
	final NativeSocketHandle handle() {
		return handle_;
	}

	private NativeSocketHandle handle_;
}
/++
	Initiates a connection request and optionally sends initial data as soon as possible.

	Calls `ConnectEx` on Windows and emulates it on other systems.

	The entire buffer is sent before the operation is considered complete.

	NOT IMPLEMENTED / NOT STABLE

	As written, every member is a stub: the constructor ignores its arguments, `start` and `cancel` do nothing, `isComplete` always reports `true`, and `waitForCompletion` hits `assert(0)`.
+/
class AsyncConnectRequest : AsyncOperationRequest {
	// FIXME: i should take a list of addresses and take the first one that succeeds, so a getaddrinfo can be sent straight in.
	/// Stub constructor; none of the arguments are stored.
	this(AsyncSocket socket, SocketAddress address, ubyte[] dataToWrite) {

	}

	override void start() {}
	override void cancel() {}
	override bool isComplete() { return true; }
	// not implemented: calling this fails the assert
	override AsyncConnectResponse waitForCompletion() { assert(0); }
}
/++
	Result of an [AsyncConnectRequest]; it only carries the system error code.
+/
class AsyncConnectResponse : AsyncOperationResponse {
	/// The error code the response was constructed with.
	const SystemErrorCode errorCode;

	///
	this(SystemErrorCode errorCode) {
		this.errorCode = errorCode;
	}

	/// Forwards to `errorCode.wasSuccessful`.
	override bool wasSuccessful() {
		return errorCode.wasSuccessful;
	}

}

// FIXME: TransmitFile/sendfile support

/++
	Calls `AcceptEx` on Windows and emulates it on other systems.

	NOT IMPLEMENTED / NOT STABLE

	NOTE(review): the class declares stub `start`/`cancel`/`isComplete`/`waitForCompletion` overrides and also mixes in `OverlappedIoRequest`; how the two interact depends on that mixin, which is defined elsewhere - confirm which implementation wins.
+/
class AsyncAcceptRequest : AsyncOperationRequest {
	// NOTE(review): never assigned in this class; the constructor stores its socket in `llo` instead
	AsyncSocket socket;

	override void start() {}
	override void cancel() {}
	override bool isComplete() { return true; }
	// NOTE(review): declared to return AsyncConnectResponse although the mixin below is instantiated with AsyncAcceptResponse - looks like a copy/paste leftover, confirm
	override AsyncConnectResponse waitForCompletion() { assert(0); }


	/++
		The operation handed to `OverlappedIoRequest`. As written it performs a receive (`WSARecv`/`WSARecvFrom` on Windows, `recv`/`recvfrom` elsewhere), not an accept call.
	+/
	struct LowLevelOperation {
		AsyncSocket file;
		ubyte[] buffer;
		SocketAddress* address;

		// takes the fields in declaration order: file, buffer, address
		this(typeof(this.tupleof) args) {
			this.tupleof = args;
		}

		version(Windows) {
			auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
				WSABUF buf;
				buf.len = cast(int) buffer.length;
				buf.buf = cast(typeof(buf.buf)) buffer.ptr;

				uint flags;

				// a null address means a plain receive; otherwise the sender's address is written into *address
				if(address is null)
					return WSARecv(file.handle, &buf, 1, null, &flags, overlapped, ocr);
				else {
					return WSARecvFrom(file.handle, &buf, 1, null, &flags, &(address.address), &(address.addrlen), overlapped, ocr);
				}
			}
		} else {
			auto opCall() {
				int flags;
				// a null address means a plain receive; otherwise the sender's address is written into *address
				if(address is null)
					return core.sys.posix.sys.socket.recv(file.handle, buffer.ptr, buffer.length, flags);
				else
					return core.sys.posix.sys.socket.recvfrom(file.handle, buffer.ptr, buffer.length, flags, &(address.address), &(address.addrlen));
			}
		}

		// label for this operation; presumably consumed by the mixin for error reporting - confirm
		string errorString() {
			return "Receive";
		}
	}
	mixin OverlappedIoRequest!(AsyncAcceptResponse, LowLevelOperation);

	/++
		Params:
			socket = the socket the low level operation runs on
			buffer = optional buffer passed to the receive call
			address = optional place to store the sender's address
	+/
	this(AsyncSocket socket, ubyte[] buffer = null, SocketAddress* address = null) {
		// `llo` and `response` are not declared in this class; presumably they come from the OverlappedIoRequest mixin - confirm
		llo = LowLevelOperation(socket, buffer, address);
		this.response = typeof(this.response).defaultConstructed;
	}

	// can also look up the local address
}
/++
	Result of an [AsyncAcceptRequest].
+/
class AsyncAcceptResponse : AsyncOperationResponse {
	/// The accepted socket; only set by the constructor overload that takes one.
	AsyncSocket newSocket;
	/// The error code the response was constructed with.
	const SystemErrorCode errorCode;

	/// Stores the error code; the `buffer` argument is currently unused.
	this(SystemErrorCode errorCode, ubyte[] buffer) {
		this.errorCode = errorCode;
	}

	///
	this(AsyncSocket newSocket, SystemErrorCode errorCode) {
		this.newSocket = newSocket;
		this.errorCode = errorCode;
	}

	/// Forwards to `errorCode.wasSuccessful`.
	override bool wasSuccessful() {
		return errorCode.wasSuccessful;
	}
}
/++
	A receive (or receive-from, when an address pointer is given) on an [AsyncSocket]. The request logic itself comes from the `OverlappedIoRequest` mixin; this class only supplies the low level call.
+/
class AsyncReceiveRequest : AsyncOperationRequest {
	/++
		The operation handed to `OverlappedIoRequest`: `WSARecv`/`WSARecvFrom` on Windows, `recv`/`recvfrom` elsewhere.
	+/
	struct LowLevelOperation {
		AsyncSocket file;
		ubyte[] buffer;
		int flags;
		SocketAddress* address;

		// takes the fields in declaration order: file, buffer, flags, address
		this(typeof(this.tupleof) args) {
			this.tupleof = args;
		}

		version(Windows) {
			auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
				WSABUF buf;
				buf.len = cast(int) buffer.length;
				buf.buf = cast(typeof(buf.buf)) buffer.ptr;

				// the WSA calls take the flags by pointer, so copy them into a local of the expected type
				uint flags = this.flags;

				// a null address means a plain receive; otherwise the sender's address is written into *address
				if(address is null)
					return WSARecv(file.handle, &buf, 1, null, &flags, overlapped, ocr);
				else {
					return WSARecvFrom(file.handle, &buf, 1, null, &flags, &(address.address), &(address.addrlen), overlapped, ocr);
				}
			}
		} else {
			auto opCall() {
				// a null address means a plain receive; otherwise the sender's address is written into *address
				if(address is null)
					return core.sys.posix.sys.socket.recv(file.handle, buffer.ptr, buffer.length, flags);
				else
					return core.sys.posix.sys.socket.recvfrom(file.handle, buffer.ptr, buffer.length, flags, &(address.address), &(address.addrlen));
			}
		}

		// label for this operation; presumably consumed by the mixin for error reporting - confirm
		string errorString() {
			return "Receive";
		}
	}
	mixin OverlappedIoRequest!(AsyncReceiveResponse, LowLevelOperation);

	/++
		Params:
			socket = the socket to receive on
			buffer = where the received bytes go
			address = if not `null`, receives the sender's address
			flags = passed through to the OS receive call
	+/
	this(AsyncSocket socket, ubyte[] buffer, SocketAddress* address, int flags) {
		// note the argument order: LowLevelOperation wants flags before address
		// `llo` and `response` are not declared in this class; presumably they come from the OverlappedIoRequest mixin - confirm
		llo = LowLevelOperation(socket, buffer, flags, address);
		this.response = typeof(this.response).defaultConstructed;
	}

}
/++
	Result of an [AsyncReceiveRequest].
+/
class AsyncReceiveResponse : AsyncOperationResponse {
	/// The buffer slice the response was constructed with.
	const ubyte[] bufferWritten;
	/// The error code the response was constructed with.
	const SystemErrorCode errorCode;

	///
	this(SystemErrorCode errorCode, const(ubyte)[] bufferWritten) {
		this.errorCode = errorCode;
		this.bufferWritten = bufferWritten;
	}

	/// Forwards to `errorCode.wasSuccessful`.
	override bool wasSuccessful() {
		return errorCode.wasSuccessful;
	}
}

/++
	A send (or send-to, when an address pointer is given) on an [AsyncSocket]. The request logic itself comes from the `OverlappedIoRequest` mixin; this class only supplies the low level call.
+/
class AsyncSendRequest : AsyncOperationRequest {
	/++
		The operation handed to `OverlappedIoRequest`: `WSASend`/`WSASendTo` on Windows, `send`/`sendto` elsewhere.
	+/
	struct LowLevelOperation {
		AsyncSocket file;
		const(ubyte)[] buffer;
		int flags;
		SocketAddress* address;

		// takes the fields in declaration order: file, buffer, flags, address
		this(typeof(this.tupleof) args) {
			this.tupleof = args;
		}

		version(Windows) {
			auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
				WSABUF buf;
				buf.len = cast(int) buffer.length;
				buf.buf = cast(typeof(buf.buf)) buffer.ptr;

				// a null address means a plain send; otherwise the data goes to *address
				if(address is null)
					return WSASend(file.handle, &buf, 1, null, flags, overlapped, ocr);
				else {
					return WSASendTo(file.handle, &buf, 1, null, flags, address.rawAddr, address.rawAddrLength, overlapped, ocr);
				}
			}
		} else {
			auto opCall() {
				// a null address means a plain send; otherwise the data goes to *address
				if(address is null)
					return core.sys.posix.sys.socket.send(file.handle, buffer.ptr, buffer.length, flags);
				else
					return core.sys.posix.sys.socket.sendto(file.handle, buffer.ptr, buffer.length, flags, address.rawAddr, address.rawAddrLength);
			}
		}

		// label for this operation; presumably consumed by the mixin for error reporting - confirm
		string errorString() {
			return "Send";
		}
	}
	mixin OverlappedIoRequest!(AsyncSendResponse, LowLevelOperation);

	/++
		Params:
			socket = the socket to send on
			buffer = the bytes to send
			address = if not `null`, the destination address
			flags = passed through to the OS send call
	+/
	this(AsyncSocket socket, const(ubyte)[] buffer, SocketAddress* address, int flags) {
		// note the argument order: LowLevelOperation wants flags before address
		// `llo` and `response` are not declared in this class; presumably they come from the OverlappedIoRequest mixin - confirm
		llo = LowLevelOperation(socket, buffer, flags, address);
		this.response = typeof(this.response).defaultConstructed;
	}
}

/++
	Result of an [AsyncSendRequest].
+/
class AsyncSendResponse : AsyncOperationResponse {
	/// The buffer slice the response was constructed with.
	const ubyte[] bufferWritten;
	/// The error code the response was constructed with.
	const SystemErrorCode errorCode;

	///
	this(SystemErrorCode errorCode, const(ubyte)[] bufferWritten) {
		this.errorCode = errorCode;
		this.bufferWritten = bufferWritten;
	}

	/// Forwards to `errorCode.wasSuccessful`.
	override bool wasSuccessful() {
		return errorCode.wasSuccessful;
	}

}
/++
	A set of sockets bound and ready to accept connections on worker threads.

	Depending on the specified address, it can be tcp, tcpv6, unix domain, or all of the above.

	NOT IMPLEMENTED / NOT STABLE
+/
class StreamServer {
	/// One listening socket per address given to the constructor, in the same order.
	AsyncSocket[] sockets;

	/++
		Creates, binds, and starts listening on one stream socket for each address.

		Params:
			listenTo = the addresses to listen on, for example the result of `SocketAddress.localhost`
			backlog = passed to each socket's `listen` call
	+/
	this(SocketAddress[] listenTo, int backlog = 8) {
		foreach(address; listenTo) {
			auto listener = new AsyncSocket(address, SOCK_STREAM);

			// FIXME: allInterfaces for ipv6 also covers ipv4 so the bind can fail...
			// so we have to permit it to fail w/ address in use if we know we already
			// are listening to ipv6

			// or there is a setsockopt ipv6 only thing i could set.

			listener.bind(address);
			listener.listen(backlog);
			sockets ~= listener;

			// writeln(socket.localAddress.port);
		}

		// i have to start accepting on each thread for each socket...
	}
	// when a new connection arrives, it calls your callback
	// can be on a specific thread or on any thread


	/// Creates an accept request on every socket and starts it.
	void start() {
		foreach(listener; sockets)
			listener.accept().start();
	}
}

/+
unittest {
	auto ss = new StreamServer(SocketAddress.localhost(0));
}
+/

/++
	A socket bound and ready to use receiveFrom

	Depending on the address, it can be udp or unix domain.

	NOT IMPLEMENTED / NOT STABLE
+/
class DatagramListener {
	// whenever a udp message arrives, it calls your callback
	// can be on a specific thread or on any thread

	// UDP is realistically just an async read on the bound socket
	// just it can get the "from" data out and might need the "more in packet" flag
}

/++
	Just in case I decide to change the implementation some day.
+/
alias AsyncAnonymousPipe = AsyncFile;


// AsyncAnonymousPipe connectNamedPipe(AsyncAnonymousPipe preallocated, string name)

// unix fifos are considered just non-seekable files and have no special support in the lib; open them as a regular file w/ the async flag.
// DIRECTORY LISTINGS
// not async, so if you want that, do it in a helper thread
// just a convenient function to have (tho phobos has a decent one too, importing it expensive af)

/++
	Calls your delegate once for each entry the OS reports in `directory`, passing the entry's name and whether it is a directory.

	Note that the order of items called for your delegate is undefined; if you want it sorted, you'll have to collect and sort yourself. But it *might* be sorted by the OS (on Windows, it almost always is), so consider that when choosing a sorting algorithm.

	Returns:
		[GetFilesResult.success] once the listing is exhausted, or [GetFilesResult.fileNotFound] if the OS reported nothing to list (`ERROR_FILE_NOT_FOUND` from `FindFirstFileW` on Windows, a null result from the first `readdir` on Posix).

	Throws:
		`WindowsApiException` for other `FindFirstFileW`/`FindNextFileW` failures, or `ErrnoApiException` if `opendir` fails.

	History:
		previously in minigui as a private function. Moved to arsd.core on April 3, 2023
+/
GetFilesResult getFiles(string directory, scope void delegate(string name, bool isDirectory) dg) {
	// FIXME: my buffers here aren't great lol

	// builds the extra context attached to any exception thrown below
	SavedArgument[1] argsForException() {
		return [
			SavedArgument("directory", LimitedVariant(directory)),
		];
	}

	version(Windows) {
		WIN32_FIND_DATA data;
		// FIXME: if directory ends with / or \\ ?
		WCharzBuffer search = WCharzBuffer(directory ~ "/*");
		auto handle = FindFirstFileW(search.ptr, &data);
		scope(exit) if(handle !is INVALID_HANDLE_VALUE) FindClose(handle);
		if(handle is INVALID_HANDLE_VALUE) {
			if(GetLastError() == ERROR_FILE_NOT_FOUND)
				return GetFilesResult.fileNotFound;
			throw new WindowsApiException("FindFirstFileW", GetLastError(), argsForException()[]);
		}

		// `data` already holds the first entry, so report before asking for the next one
		do {
			string name = makeUtf8StringFromWindowsString(data.cFileName[0 .. findIndexOfZero(data.cFileName[])]);

			dg(name, (data.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) ? true : false);
		} while(FindNextFileW(handle, &data) != 0);

		// FindNextFileW returned zero: either the listing is finished or something went wrong
		if(GetLastError() == ERROR_NO_MORE_FILES)
			return GetFilesResult.success;
		throw new WindowsApiException("FindNextFileW", GetLastError(), argsForException()[]);

	} else version(Posix) {
		import core.sys.posix.dirent;
		import core.stdc.errno;
		auto dir = opendir((directory ~ "\0").ptr);
		scope(exit)
			if(dir) closedir(dir);
		if(dir is null)
			throw new ErrnoApiException("opendir", errno, argsForException());

		auto entry = readdir(dir);
		if(entry is null)
			return GetFilesResult.fileNotFound;

		do {
			string name = entry.d_name[0 .. findIndexOfZero(entry.d_name[])].idup;

			dg(name, entry.d_type == DT_DIR);

			entry = readdir(dir);
		} while(entry !is null);

		return GetFilesResult.success;
	} else static assert(0);
}

/++
	What [getFiles] reports back once it stops listing.
+/
enum GetFilesResult {
	success,
	fileNotFound
}
/++
	This is currently a simplified glob where only the * wildcard in the first or last position gets special treatment or a single * in the middle.

	More things may be added later to be more like what Phobos supports.

	Rules, in the order they are checked:

	$(LIST
		* an empty pattern matches nothing
		* `*` matches everything
		* `*text*` matches names containing `text` (so `**` matches everything too)
		* `*text` matches names ending with `text`
		* `text*` matches names starting with `text`
		* `left*right` matches names that start with `left` and end with `right` without the two overlapping
		* anything else must equal the name exactly
	)

	Any `*` beyond the ones described above is compared literally.
+/
bool matchesFilePattern(scope const(char)[] name, scope const(char)[] pattern) {
	if(pattern.length == 0)
		return false;
	if(pattern == "*")
		return true;

	const startsWithStar = pattern[0] == '*';
	const endsWithStar = pattern[$-1] == '*';

	if(startsWithStar && endsWithStar) {
		// "*" alone was handled above, so there are at least two characters and the slice is valid.
		// For "**" the middle is empty, which indexOf finds at position 0 in any name.
		return name.indexOf(pattern[1 .. $-1]) != -1;
	}
	if(startsWithStar)
		return name.endsWith(pattern[1 .. $]);
	if(endsWithStar)
		return name.startsWith(pattern[0 .. $-1]);

	// only the first star in the middle is treated as a wildcard
	auto idx = pattern.indexOf("*");
	if(idx != -1) {
		auto lhs = pattern[0 .. idx];
		auto rhs = pattern[idx + 1 .. $];
		// the length check stops the prefix and suffix from sharing characters, e.g. "ab" vs "ab*ab"
		return name.length >= lhs.length + rhs.length && name.startsWith(lhs) && name.endsWith(rhs);
	}

	return name == pattern;
}

unittest {
	assert("test.html".matchesFilePattern("*"));
	assert("test.html".matchesFilePattern("**"));
	assert("test.html".matchesFilePattern("*.html"));
	assert("test.html".matchesFilePattern("*.*"));
	assert("test.html".matchesFilePattern("test.*"));
	assert(!"test.html".matchesFilePattern("pest.*"));
	assert(!"test.html".matchesFilePattern("*.dhtml"));

	assert("test.html".matchesFilePattern("t*.html"));
	assert(!"test.html".matchesFilePattern("e*.html"));
	assert(!"ab".matchesFilePattern("ab*ab"));
}

/++
	Finds the first occurrence of `needle` in `haystack`.

	Returns:
		the index of the first match, or -1 if there is none. An empty needle is found at index 0. The index is cast to `int`.
+/
package(arsd) int indexOf(scope const(char)[] haystack, scope const(char)[] needle) {
	if(haystack.length < needle.length)
		return -1;
	// the subtraction cannot wrap thanks to the check above; the +1 includes the last possible start position
	foreach(i; 0 .. haystack.length - needle.length + 1)
		if(haystack[i .. i + needle.length] == needle)
			return cast(int) i;
	return -1;
}

unittest {
	assert("foo".indexOf("f") == 0);
	assert("foo".indexOf("o") == 1);
	assert("foo".indexOf("foo") == 0);
	assert("foo".indexOf("oo") == 1);
	assert("foo".indexOf("fo") == 0);
	assert("foo".indexOf("boo") == -1);
	assert("foo".indexOf("food") == -1);
	assert("foo".indexOf("") == 0);
	assert("".indexOf("") == 0);
}

/++
	Returns: `true` if the last `needle.length` characters of `haystack` equal `needle`. An empty needle always matches.
+/
package(arsd) bool endsWith(scope const(char)[] haystack, scope const(char)[] needle) {
	if(needle.length > haystack.length)
		return false;
	return haystack[$ - needle.length .. $] == needle;
}

unittest {
	assert("foo".endsWith("o"));
	assert("foo".endsWith("oo"));
	assert("foo".endsWith("foo"));
	assert(!"foo".endsWith("food"));
	assert(!"foo".endsWith("d"));
}

/++
	Returns: `true` if the first `needle.length` characters of `haystack` equal `needle`. An empty needle always matches.
+/
package(arsd) bool startsWith(scope const(char)[] haystack, scope const(char)[] needle) {
	if(needle.length > haystack.length)
		return false;
	return haystack[0 .. needle.length] == needle;
}

unittest {
	assert("foo".startsWith("f"));
	assert("foo".startsWith("fo"));
	assert("foo".startsWith("foo"));
	assert(!"foo".startsWith("food"));
	assert(!"foo".startsWith("d"));
}
3789 3790 rr.changeHandler( 3791 FilePath(makeUtf8StringFromWindowsString(filename)), // FIXME: this is a relative path 3792 ChangeOperation.unknown // FIXME this is fni.Action 3793 ); 3794 } 3795 3796 rr.requestRead(); 3797 } 3798 3799 void requestRead() { 3800 DWORD ignored; 3801 if(!ReadDirectoryChangesW( 3802 hDirectory, 3803 buffer.ptr, 3804 cast(int) buffer.length, 3805 recursive, 3806 FILE_NOTIFY_CHANGE_LAST_WRITE | FILE_NOTIFY_CHANGE_CREATION | FILE_NOTIFY_CHANGE_FILE_NAME, 3807 &ignored, 3808 &overlapped, 3809 &overlappedCompletionRoutine 3810 )) { 3811 auto error = GetLastError(); 3812 /+ 3813 if(error == ERROR_IO_PENDING) { 3814 // not expected here, the docs say it returns true when queued 3815 } 3816 +/ 3817 3818 throw new SystemApiException("ReadDirectoryChangesW", error); 3819 } 3820 } 3821 } else version(Arsd_core_epoll) { 3822 static int inotifyfd = -1; // this is TLS since it is associated with the thread's event loop 3823 static ICoreEventLoop.UnregisterToken inotifyToken; 3824 static CallbackHelper inotifycb; 3825 static DirectoryWatcher[int] watchMappings; 3826 3827 static ~this() { 3828 if(inotifyfd != -1) { 3829 close(inotifyfd); 3830 inotifyfd = -1; 3831 } 3832 } 3833 3834 import core.sys.linux.sys.inotify; 3835 3836 int watchId = -1; 3837 3838 static void inotifyReady() { 3839 // read from it 3840 ubyte[256 /* NAME_MAX + 1 */ + inotify_event.sizeof] sbuffer; 3841 3842 auto ret = read(inotifyfd, sbuffer.ptr, sbuffer.length); 3843 if(ret == -1) { 3844 auto errno = errno; 3845 if(errno == EAGAIN || errno == EWOULDBLOCK) 3846 return; 3847 throw new SystemApiException("read inotify", errno); 3848 } else if(ret == 0) { 3849 assert(0, "I don't think this is ever supposed to happen"); 3850 } 3851 3852 auto buffer = sbuffer[0 .. ret]; 3853 3854 while(buffer.length > 0) { 3855 inotify_event* event = cast(inotify_event*) buffer.ptr; 3856 buffer = buffer[inotify_event.sizeof .. $]; 3857 char[] filename = cast(char[]) buffer[0 .. 
event.len]; 3858 buffer = buffer[event.len .. $]; 3859 3860 // note that filename is padded with zeroes, so it is actually a stringz 3861 3862 if(auto obj = event.wd in watchMappings) { 3863 (*obj).changeHandler( 3864 FilePath(stringz(filename.ptr).borrow.idup), // FIXME: this is a relative path 3865 ChangeOperation.unknown // FIXME 3866 ); 3867 } else { 3868 // it has probably already been removed 3869 } 3870 } 3871 } 3872 } else version(Arsd_core_kqueue) { 3873 int fd; 3874 CallbackHelper cb; 3875 } 3876 3877 FilePath path; 3878 string globPattern; 3879 bool recursive; 3880 void delegate(FilePath filename, ChangeOperation op) changeHandler; 3881 } 3882 3883 enum ChangeOperation { 3884 unknown, 3885 deleted, // NOTE_DELETE, IN_DELETE, FILE_NOTIFY_CHANGE_FILE_NAME 3886 written, // NOTE_WRITE / NOTE_EXTEND / NOTE_TRUNCATE, IN_MODIFY, FILE_NOTIFY_CHANGE_LAST_WRITE / FILE_NOTIFY_CHANGE_SIZE 3887 renamed, // NOTE_RENAME, the moved from/to in linux, FILE_NOTIFY_CHANGE_FILE_NAME 3888 metadataChanged // NOTE_ATTRIB, IN_ATTRIB, FILE_NOTIFY_CHANGE_ATTRIBUTES 3889 3890 // there is a NOTE_OPEN on freebsd 13, and the access change on Windows. and an open thing on linux. so maybe i can do note open/note_read too. 3891 } 3892 3893 /+ 3894 Windows and Linux work best when you watch directories. The operating system tells you the name of files as they change. 3895 3896 BSD doesn't support this. You can only get names and reports when a file is modified by watching specific files. AS such, when you watch a directory on those systems, your delegate will be called with a null path. Cross-platform applications should check for this and not assume the name is always usable. 3897 3898 inotify is kinda clearly the best of the bunch, with Windows in second place, and kqueue dead last. 3899 3900 3901 If path to watch is a directory, it signals when a file inside the directory (only one layer deep) is created or modified. This is the most efficient on Windows and Linux. 
3902 3903 If a path is a file, it only signals when that specific file is written. This is most efficient on BSD. 3904 3905 3906 The delegate is called when something happens. Note that the path modified may not be accurate on all systems when you are watching a directory. 3907 +/ 3908 3909 /++ 3910 Watches a directory and its contents. If the `globPattern` is `null`, it will not attempt to add child items but also will not filter it, meaning you will be left with platform-specific behavior. 3911 3912 On Windows, the globPattern is just used to filter events. 3913 3914 On Linux, the `recursive` flag, if set, will cause it to add additional OS-level watches for each subdirectory. 3915 3916 On BSD, anything other than a null pattern will cause a directory scan to add files to the watch list. 3917 3918 For best results, use the most limited thing you need, as watches can get quite involved on the bsd systems. 3919 3920 Newly added files and subdirectories may not be automatically added in all cases, meaning if it is added and then subsequently modified, you might miss a notification. 3921 3922 If the event queue is too busy, the OS may skip a notification. 3923 3924 You should always offer some way for the user to force a refresh and not rely on notifications being present; they are a convenience when they work, not an always reliable method. 
3925 +/ 3926 this(FilePath directoryToWatch, string globPattern, bool recursive, void delegate(FilePath pathModified, ChangeOperation op) dg) { 3927 this.path = directoryToWatch; 3928 this.globPattern = globPattern; 3929 this.recursive = recursive; 3930 this.changeHandler = dg; 3931 3932 version(Arsd_core_windows) { 3933 WCharzBuffer wname = directoryToWatch.path; 3934 buffer = new ubyte[](1024); 3935 hDirectory = CreateFileW( 3936 wname.ptr, 3937 GENERIC_READ, 3938 FILE_SHARE_READ, 3939 null, 3940 OPEN_EXISTING, 3941 FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED | FILE_FLAG_BACKUP_SEMANTICS, 3942 null 3943 ); 3944 if(hDirectory == INVALID_HANDLE_VALUE) 3945 throw new SystemApiException("CreateFileW", GetLastError()); 3946 3947 requestRead(); 3948 } else version(Arsd_core_epoll) { 3949 auto el = getThisThreadEventLoop(); 3950 3951 // no need for sync because it is thread-local 3952 if(inotifyfd == -1) { 3953 inotifyfd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC); 3954 if(inotifyfd == -1) 3955 throw new SystemApiException("inotify_init1", errno); 3956 3957 inotifycb = new CallbackHelper(&inotifyReady); 3958 inotifyToken = el.addCallbackOnFdReadable(inotifyfd, inotifycb); 3959 } 3960 3961 uint event_mask = IN_CREATE | IN_MODIFY | IN_DELETE; // FIXME 3962 CharzBuffer dtw = directoryToWatch.path; 3963 auto watchId = inotify_add_watch(inotifyfd, dtw.ptr, event_mask); 3964 if(watchId < -1) 3965 throw new SystemApiException("inotify_add_watch", errno, [SavedArgument("path", LimitedVariant(directoryToWatch.path))]); 3966 3967 watchMappings[watchId] = this; 3968 3969 // FIXME: recursive needs to add child things individually 3970 3971 } else version(Arsd_core_kqueue) { 3972 auto el = cast(CoreEventLoopImplementation) getThisThreadEventLoop(); 3973 3974 // FIXME: need to scan for globPattern 3975 // when a new file is added, i'll have to diff my list to detect it and open it too 3976 // and recursive might need to scan down too. 
3977 3978 kevent_t ev; 3979 3980 import core.sys.posix.fcntl; 3981 CharzBuffer buffer = CharzBuffer(directoryToWatch.path); 3982 fd = ErrnoEnforce!open(buffer.ptr, O_RDONLY); 3983 setCloExec(fd); 3984 3985 cb = new CallbackHelper(&triggered); 3986 3987 EV_SET(&ev, fd, EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_WRITE, 0, cast(void*) cb); 3988 ErrnoEnforce!kevent(el.kqueuefd, &ev, 1, null, 0, null); 3989 } else assert(0, "Not yet implemented for this platform"); 3990 } 3991 3992 private void triggered() { 3993 writeln("triggered"); 3994 } 3995 3996 void dispose() { 3997 version(Arsd_core_windows) { 3998 CloseHandle(hDirectory); 3999 } else version(Arsd_core_epoll) { 4000 watchMappings.remove(watchId); // I could also do this on the IN_IGNORE notification but idk 4001 inotify_rm_watch(inotifyfd, watchId); 4002 } else version(Arsd_core_kqueue) { 4003 ErrnoEnforce!close(fd); 4004 fd = -1; 4005 } 4006 } 4007 } 4008 4009 version(none) 4010 void main() { 4011 4012 // auto file = new AsyncFile(FilePath("test.txt"), AsyncFile.OpenMode.writeWithTruncation, AsyncFile.RequirePreexisting.yes); 4013 4014 /+ 4015 getFiles("c:/windows\\", (string filename, bool isDirectory) { 4016 writeln(filename, " ", isDirectory ? "[dir]": "[file]"); 4017 }); 4018 +/ 4019 4020 auto w = new DirectoryWatcher(FilePath("."), "*", false, (path, op) { 4021 writeln(path.path); 4022 }); 4023 getThisThreadEventLoop().run(() => false); 4024 } 4025 4026 /++ 4027 This starts up a local pipe. If it is already claimed, it just communicates with the existing one through the interface. 
+/
class SingleInstanceApplication {
	// FIXME
}

// disabled demo: write "hello" to test.txt through an AsyncWriteRequest
version(none)
void main() {

	auto file = new AsyncFile(FilePath("test.txt"), AsyncFile.OpenMode.writeWithTruncation, AsyncFile.RequirePreexisting.yes);

	auto buffer = cast(ubyte[]) "hello";
	auto wr = new AsyncWriteRequest(file, buffer, 0);
	wr.start();

	wr.waitForCompletion();

	file.close();
}

/++
	Implementation details of some requests. You shouldn't need to know any of this, the interface is all public.

	The mixin supplies `start`, `cancel`, `isComplete` and `waitForCompletion` for a
	request class. `LowLevelOperation` must provide `file`, `buffer`, `errorString()`
	and an `opCall` that performs the actual system call; `Response` must be
	constructible from `(SystemErrorCode, ubyte[])`.
+/
mixin template OverlappedIoRequest(Response, LowLevelOperation) {
	private {
		LowLevelOperation llo;

		OwnedClass!Response response;

		version(Windows) {
			OVERLAPPED overlapped;

			// Completion routine handed to the OS. Recovers the owning request object
			// from the address of its embedded OVERLAPPED member.
			extern(Windows)
			static void overlappedCompletionRoutine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransferred, LPOVERLAPPED lpOverlapped) {
				typeof(this) rr = cast(typeof(this)) (cast(void*) lpOverlapped - typeof(this).overlapped.offsetof);

				rr.response = typeof(rr.response)(SystemErrorCode(dwErrorCode), rr.llo.buffer[0 .. dwNumberOfBytesTransferred]);
				rr.state_ = State.complete;

				// FIXME: on complete?

				// this will queue our CallbackHelper and that should be run at the end of the event loop after it is woken up by the APC run
			}
		}

		version(Posix) {
			ICoreEventLoop.RearmToken eventRegistration;
			CallbackHelper cb;

			// lazily creates the callback object that the event loop will invoke
			final CallbackHelper getCb() {
				if(cb is null)
					cb = new CallbackHelper(&cbImpl);
				return cb;
			}

			final void cbImpl() {
				// it is ready to complete, time to do it
				auto ret = llo();
				markCompleted(ret, errno);
			}

			// Stores the response (error code plus the transferred prefix of the
			// buffer, or null on failure) and flips the state to complete.
			void markCompleted(long ret, int errno) {
				// maybe i should queue an apc to actually do it, to ensure the event loop has cycled... FIXME
				if(ret == -1)
					response = typeof(response)(SystemErrorCode(errno), null);
				else
					response = typeof(response)(SystemErrorCode(0), llo.buffer[0 .. cast(size_t) ret]);
				state_ = State.complete;
			}
		}
	}

	enum State {
		unused,
		started,
		inProgress,
		complete
	}
	private State state_;

	/++
		Begins the operation. Must only be called on an unused request.

		Throws:
			`SystemApiException` if the operation cannot even be started. The request
			is marked complete before throwing on both platforms, so a later
			`waitForCompletion` or `cancel` returns immediately instead of waiting on
			an operation that was never queued.
	+/
	override void start() {
		assert(state_ == State.unused);

		state_ = State.started;

		version(Windows) {
			if(llo(&overlapped, &overlappedCompletionRoutine)) {
				// all good, though GetLastError() might have some informative info
			} else {
				// operation failed, the operation is always ReadFileEx or WriteFileEx so it won't give the io pending thing here
				// should i issue error async? idk
				state_ = State.complete;
				throw new SystemApiException(llo.errorString(), GetLastError());
			}

			// ReadFileEx always queues, even if it completed synchronously. I *could* check the get overlapped result and sleepex here but i'm prolly better off just letting the event loop do its thing anyway.
		} else version(Posix) {

			// first try to just do it
			auto ret = llo();

			auto errno = errno;
			if(ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // unable to complete right now, register and try when it is ready
				// NOTE(review): this waits for *readability* regardless of which request
				// class mixes this in; a write request presumably wants writability - confirm.
				eventRegistration = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(this.llo.file.handle, this.getCb);
			} else {
				// i could set errors sync or async and since it couldn't even start, i think a sync exception is the right way
				if(ret == -1) {
					// match the Windows branch: nothing was queued, so nothing will ever complete it
					state_ = State.complete;
					throw new SystemApiException(llo.errorString(), errno);
				}
				markCompleted(ret, errno); // it completed synchronously (if it is an error nor not is handled by the completion handler)
			}
		}
	}


	/++
		Cancels a request that has not finished yet; a completed request is left alone.
		On Posix the request is immediately completed with `ECANCELED`.
	+/
	override void cancel() {
		if(state_ == State.complete)
			return; // it has already finished, just leave it alone, no point discarding what is already done
		version(Windows) {
			if(state_ != State.unused)
				Win32Enforce!CancelIoEx(llo.file.AbstractFile.handle, &overlapped);
			// Windows will notify us when the cancellation is complete, so we need to wait for that before updating the state
		} else version(Posix) {
			if(state_ != State.unused)
				eventRegistration.unregister();
			markCompleted(-1, ECANCELED);
		}
	}

	/// Returns: `true` once the response has been recorded.
	override bool isComplete() {
		// just always let the event loop do it instead
		return state_ == State.complete;

		/+
		version(Windows) {
			return HasOverlappedIoCompleted(&overlapped);
		} else version(Posix) {
			return state_ == State.complete;

		}
		+/
	}

	/++
		Starts the request if needed, then runs this thread's event loop with
		`isComplete` as the stop condition and returns the recorded response.
	+/
	override Response waitForCompletion() {
		if(state_ == State.unused)
			start();

		// FIXME: if we are inside a fiber, we can set a oncomplete callback and then yield instead...
		if(state_ != State.complete)
			getThisThreadEventLoop().run(&isComplete);

		/+
		version(Windows) {
			SleepEx(INFINITE, true);

			//DWORD numberTransferred;
			//Win32Enforce!GetOverlappedResult(file.handle, &overlapped, &numberTransferred, true);
		} else version(Posix) {
			getThisThreadEventLoop().run(&isComplete);
		}
		+/

		return response;
	}
}

/++
	You can write to a file asynchronously by creating one of these.
+/
final class AsyncWriteRequest : AsyncOperationRequest {
	struct LowLevelOperation {
		AsyncFile file;
		ubyte[] buffer;
		long offset;

		this(typeof(this.tupleof) args) {
			this.tupleof = args;
		}

		version(Windows) {
			// splits `offset` into the two 32 bit halves of the OVERLAPPED and calls WriteFileEx
			auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
				overlapped.Offset = (cast(ulong) offset) & 0xffff_ffff;
				overlapped.OffsetHigh = ((cast(ulong) offset) >> 32) & 0xffff_ffff;
				return WriteFileEx(file.handle, buffer.ptr, cast(int) buffer.length, overlapped, ocr);
			}
		} else {
			// NOTE(review): `offset` is not used on this path - plain write() is called
			auto opCall() {
				return core.sys.posix.unistd.write(file.handle, buffer.ptr, buffer.length);
			}
		}

		string errorString() {
			return "Write";
		}
	}
	mixin OverlappedIoRequest!(AsyncWriteResponse, LowLevelOperation);

	/++
		Prepares (but does not start) a write of `buffer` to `file`; `offset` is
		forwarded to the low-level operation.
	+/
	this(AsyncFile file, ubyte[] buffer, long offset) {
		this.llo = LowLevelOperation(file, buffer, offset);
		response = typeof(response).defaultConstructed;
	}
}

/++
	Result of an [AsyncWriteRequest]: the error code and the slice of the request's
	buffer that the completion handler recorded as written.
+/
class AsyncWriteResponse : AsyncOperationResponse {
	const ubyte[] bufferWritten;
	const SystemErrorCode errorCode;

	this(SystemErrorCode errorCode, const(ubyte)[] bufferWritten) {
		this.errorCode = errorCode;
		this.bufferWritten = bufferWritten;
	}

	override bool wasSuccessful() {
		return errorCode.wasSuccessful;
	}
}

/++
	Asynchronous read counterpart to [AsyncWriteRequest].
+/
final class AsyncReadRequest : AsyncOperationRequest {
	struct LowLevelOperation {
		AsyncFile file;
		ubyte[] buffer;
		long offset;

		this(typeof(this.tupleof) args) {
			this.tupleof = args;
		}

		version(Windows) {
			// splits `offset` into the two 32 bit halves of the OVERLAPPED and calls ReadFileEx
			auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
				overlapped.Offset = (cast(ulong) offset) & 0xffff_ffff;
				overlapped.OffsetHigh = ((cast(ulong) offset) >> 32) & 0xffff_ffff;
				return ReadFileEx(file.handle, buffer.ptr, cast(int) buffer.length, overlapped, ocr);
			}
		} else {
			// NOTE(review): `offset` is not used on this path - plain read() is called
			auto opCall() {
				return core.sys.posix.unistd.read(file.handle, buffer.ptr, buffer.length);
			}
		}

		string errorString() {
			return "Read";
		}
	}
	mixin OverlappedIoRequest!(AsyncReadResponse, LowLevelOperation);

	/++
		The file must have the overlapped flag enabled on Windows and the nonblock flag set on Posix.

		The buffer MUST NOT be touched by you - not used by another request, modified, read, or freed, including letting a static array going out of scope - until this request's `isComplete` returns `true`.

		The offset is where to start reading a disk file. For all other types of files, pass 0.
	+/
	this(AsyncFile file, ubyte[] buffer, long offset) {
		this.llo = LowLevelOperation(file, buffer, offset);
		response = typeof(response).defaultConstructed;
	}

	/++

	+/
	// abstract void repeat();
}

/++
	Result of an [AsyncReadRequest]: the error code and the slice of the request's
	buffer that the completion handler recorded as filled.
+/
class AsyncReadResponse : AsyncOperationResponse {
	const ubyte[] bufferRead;
	const SystemErrorCode errorCode;

	this(SystemErrorCode errorCode, const(ubyte)[] bufferRead) {
		this.errorCode = errorCode;
		this.bufferRead = bufferRead;
	}

	override bool wasSuccessful() {
		return errorCode.wasSuccessful;
	}
}

/+
	Tasks:
		startTask()
		startSubTask() - what if it just did this when it knows it is being run from inside a task?
4316 runHelperFunction() - whomever it reports to is the parent 4317 +/ 4318 4319 /+ 4320 class Task : Fiber { 4321 4322 } 4323 +/ 4324 4325 private class CoreWorkerThread : Thread { 4326 this(EventLoopType type) { 4327 this.type = type; 4328 4329 // task runners are supposed to have smallish stacks since they either just run a single callback or call into fibers 4330 // the helper runners might be a bit bigger tho 4331 super(&run); 4332 } 4333 void run() { 4334 eventLoop = getThisThreadEventLoop(this.type); 4335 atomicOp!"+="(startedCount, 1); 4336 atomicOp!"+="(runningCount, 1); 4337 scope(exit) { 4338 atomicOp!"-="(runningCount, 1); 4339 } 4340 4341 eventLoop.run(() => true); 4342 } 4343 4344 EventLoopType type; 4345 ICoreEventLoop eventLoop; 4346 4347 __gshared static { 4348 CoreWorkerThread[] taskRunners; 4349 CoreWorkerThread[] helperRunners; 4350 ICoreEventLoop mainThreadLoop; 4351 4352 // for the helper function thing on the bsds i could have my own little circular buffer of availability 4353 4354 shared(int) startedCount; 4355 shared(int) runningCount; 4356 4357 bool started; 4358 4359 void setup(int numberOfTaskRunners, int numberOfHelpers) { 4360 assert(!started); 4361 synchronized { 4362 mainThreadLoop = getThisThreadEventLoop(); 4363 4364 foreach(i; 0 .. numberOfTaskRunners) { 4365 auto nt = new CoreWorkerThread(EventLoopType.TaskRunner); 4366 taskRunners ~= nt; 4367 nt.start(); 4368 } 4369 foreach(i; 0 .. 
numberOfHelpers) { 4370 auto nt = new CoreWorkerThread(EventLoopType.HelperWorker); 4371 helperRunners ~= nt; 4372 nt.start(); 4373 } 4374 4375 const expectedCount = numberOfHelpers + numberOfTaskRunners; 4376 4377 while(startedCount < expectedCount) { 4378 Thread.yield(); 4379 } 4380 4381 started = true; 4382 } 4383 } 4384 } 4385 } 4386 4387 private int numberOfCpus() { 4388 return 4; // FIXME 4389 } 4390 4391 /++ 4392 To opt in to the full functionality of this module with customization opportunity, create one and only one of these objects that is valid for exactly the lifetime of the application. 4393 4394 Normally, this means writing a main like this: 4395 4396 --- 4397 import arsd.core; 4398 void main() { 4399 ArsdCoreApplication app = ArsdCoreApplication("Your app name"); 4400 4401 // do your setup here 4402 4403 // the rest of your code here 4404 } 4405 --- 4406 4407 Its destructor runs the event loop then waits to for the workers to finish to clean them up. 4408 +/ 4409 struct ArsdCoreApplication { 4410 private ICoreEventLoop impl; 4411 4412 /++ 4413 default number of threads is to split your cpus between blocking function runners and task runners 4414 +/ 4415 this(string applicationName) { 4416 auto num = numberOfCpus(); 4417 num /= 2; 4418 if(num <= 0) 4419 num = 1; 4420 this(applicationName, num, num); 4421 } 4422 4423 /++ 4424 4425 +/ 4426 this(string applicationName, int numberOfTaskRunners, int numberOfHelpers) { 4427 impl = getThisThreadEventLoop(EventLoopType.Explicit); 4428 CoreWorkerThread.setup(numberOfTaskRunners, numberOfHelpers); 4429 } 4430 4431 @disable this(); 4432 @disable this(this); 4433 /++ 4434 This must be deterministically destroyed. 
4435 +/ 4436 @disable new(); 4437 4438 ~this() { 4439 run(); 4440 exitApplication(); 4441 waitForWorkersToExit(3000); 4442 } 4443 4444 void exitApplication() { 4445 4446 } 4447 4448 void waitForWorkersToExit(int timeoutMilliseconds) { 4449 4450 } 4451 4452 void run() { 4453 impl.run(() => true); 4454 } 4455 } 4456 4457 4458 private class CoreEventLoopImplementation : ICoreEventLoop { 4459 4460 version(Arsd_core_kqueue) { 4461 // this thread apc dispatches go as a custom event to the queue 4462 // the other queues go through one byte at a time pipes (barf). freebsd 13 and newest nbsd have eventfd too tho so maybe i can use them but the other kqueue systems don't. 4463 4464 void runOnce() { 4465 kevent_t[16] ev; 4466 //timespec tout = timespec(1, 0); 4467 auto nev = kevent(kqueuefd, null, 0, ev.ptr, ev.length, null/*&tout*/); 4468 if(nev == -1) { 4469 // FIXME: EINTR 4470 throw new SystemApiException("kevent", errno); 4471 } else if(nev == 0) { 4472 // timeout 4473 } else { 4474 foreach(event; ev[0 .. nev]) { 4475 if(event.filter == EVFILT_SIGNAL) { 4476 // FIXME: I could prolly do this better tbh 4477 markSignalOccurred(cast(int) event.ident); 4478 signalChecker(); 4479 } else { 4480 // FIXME: event.filter more specific? 
4481 CallbackHelper cb = cast(CallbackHelper) event.udata; 4482 cb.call(); 4483 } 4484 } 4485 } 4486 } 4487 4488 // FIXME: idk how to make one event that multiple kqueues can listen to w/o being shared 4489 // maybe a shared kqueue could work that the thread kqueue listen to (which i rejected for 4490 // epoll cuz it caused thundering herd problems but maybe it'd work here) 4491 4492 UnregisterToken addCallbackOnFdReadable(int fd, CallbackHelper cb) { 4493 kevent_t ev; 4494 4495 EV_SET(&ev, fd, EVFILT_READ, EV_ADD | EV_ENABLE/* | EV_ONESHOT*/, 0, 0, cast(void*) cb); 4496 4497 ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null); 4498 4499 return UnregisterToken(this, fd, cb); 4500 } 4501 4502 RearmToken addCallbackOnFdReadableOneShot(int fd, CallbackHelper cb) { 4503 kevent_t ev; 4504 4505 EV_SET(&ev, fd, EVFILT_READ, EV_ADD | EV_ENABLE/* | EV_ONESHOT*/, 0, 0, cast(void*) cb); 4506 4507 ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null); 4508 4509 return RearmToken(true, this, fd, cb, 0); 4510 } 4511 4512 RearmToken addCallbackOnFdWritableOneShot(int fd, CallbackHelper cb) { 4513 kevent_t ev; 4514 4515 EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE/* | EV_ONESHOT*/, 0, 0, cast(void*) cb); 4516 4517 ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null); 4518 4519 return RearmToken(false, this, fd, cb, 0); 4520 } 4521 4522 private void rearmFd(RearmToken token) { 4523 if(token.readable) 4524 cast(void) addCallbackOnFdReadableOneShot(token.fd, token.cb); 4525 else 4526 cast(void) addCallbackOnFdWritableOneShot(token.fd, token.cb); 4527 } 4528 4529 private void triggerGlobalEvent() { 4530 ubyte a; 4531 import core.sys.posix.unistd; 4532 write(kqueueGlobalFd[1], &a, 1); 4533 } 4534 4535 private this() { 4536 kqueuefd = ErrnoEnforce!kqueue(); 4537 setCloExec(kqueuefd); // FIXME O_CLOEXEC 4538 4539 if(kqueueGlobalFd[0] == 0) { 4540 import core.sys.posix.unistd; 4541 pipe(kqueueGlobalFd); 4542 setCloExec(kqueueGlobalFd[0]); 4543 setCloExec(kqueueGlobalFd[1]); 4544 4545 
signal(SIGINT, SIG_IGN); // FIXME 4546 } 4547 4548 kevent_t ev; 4549 4550 EV_SET(&ev, SIGCHLD, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, null); 4551 ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null); 4552 EV_SET(&ev, SIGINT, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, null); 4553 ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null); 4554 4555 globalEventSent = new CallbackHelper(&readGlobalEvent); 4556 EV_SET(&ev, kqueueGlobalFd[0], EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, cast(void*) globalEventSent); 4557 ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null); 4558 } 4559 4560 private int kqueuefd = -1; 4561 4562 private CallbackHelper globalEventSent; 4563 void readGlobalEvent() { 4564 kevent_t event; 4565 4566 import core.sys.posix.unistd; 4567 ubyte a; 4568 read(kqueueGlobalFd[0], &a, 1); 4569 4570 // FIXME: the thread is woken up, now we need to check the circualr buffer queue 4571 } 4572 4573 private __gshared int[2] kqueueGlobalFd; 4574 } 4575 4576 /+ 4577 // this setup needs no extra allocation 4578 auto op = read(file, buffer); 4579 op.oncomplete = &thisfiber.call; 4580 op.start(); 4581 thisfiber.yield(); 4582 auto result = op.waitForCompletion(); // guaranteed to return instantly thanks to previous setup 4583 4584 can generically abstract that into: 4585 4586 auto result = thisTask.await(read(file, buffer)); 4587 4588 4589 You MUST NOT use buffer in any way - not read, modify, deallocate, reuse, anything - until the PendingOperation is complete. 4590 4591 Note that PendingOperation may just be a wrapper around an internally allocated object reference... but then if you do a waitForFirstToComplete what happens? 
4592 4593 those could of course just take the value type things 4594 +/ 4595 4596 4597 version(Arsd_core_windows) { 4598 // all event loops share the one iocp, Windows 4599 // manages how to do it 4600 __gshared HANDLE iocpTaskRunners; 4601 __gshared HANDLE iocpWorkers; 4602 4603 HANDLE[] handles; 4604 4605 // i think to terminate i just have to post the message at least once for every thread i know about, maybe a few more times for threads i don't know about. 4606 4607 bool isWorker; // if it is a worker we wait on the iocp, if not we wait on msg 4608 4609 void runOnce() { 4610 if(isWorker) { 4611 // this function is only supported on Windows Vista and up, so using this 4612 // means dropping support for XP. 4613 //GetQueuedCompletionStatusEx(); 4614 assert(0); // FIXME 4615 } else { 4616 auto wto = 0; 4617 4618 auto waitResult = MsgWaitForMultipleObjectsEx( 4619 cast(int) handles.length, handles.ptr, 4620 (wto == 0 ? INFINITE : wto), /* timeout */ 4621 0x04FF, /* QS_ALLINPUT */ 4622 0x0002 /* MWMO_ALERTABLE */ | 0x0004 /* MWMO_INPUTAVAILABLE */); 4623 4624 enum WAIT_OBJECT_0 = 0; 4625 if(waitResult >= WAIT_OBJECT_0 && waitResult < handles.length + WAIT_OBJECT_0) { 4626 auto h = handles[waitResult - WAIT_OBJECT_0]; 4627 // FIXME: run the handle ready callback 4628 } else if(waitResult == handles.length + WAIT_OBJECT_0) { 4629 // message ready 4630 int count; 4631 MSG message; 4632 while(PeekMessage(&message, null, 0, 0, PM_NOREMOVE)) { // need to peek since sometimes MsgWaitForMultipleObjectsEx returns even though GetMessage can block. 
tbh i don't fully understand it but the docs say it is foreground activation 4633 auto ret = GetMessage(&message, null, 0, 0); 4634 if(ret == -1) 4635 throw new WindowsApiException("GetMessage", GetLastError()); 4636 TranslateMessage(&message); 4637 DispatchMessage(&message); 4638 4639 count++; 4640 if(count > 10) 4641 break; // take the opportunity to catch up on other events 4642 4643 if(ret == 0) { // WM_QUIT 4644 // EventLoop.quitApplication(); 4645 assert(0); // FIXME 4646 //break; 4647 } 4648 } 4649 } else if(waitResult == 0x000000C0L /* WAIT_IO_COMPLETION */) { 4650 SleepEx(0, true); // I call this to give it a chance to do stuff like async io 4651 } else if(waitResult == 258L /* WAIT_TIMEOUT */) { 4652 // timeout, should never happen since we aren't using it 4653 } else if(waitResult == 0xFFFFFFFF) { 4654 // failed 4655 throw new WindowsApiException("MsgWaitForMultipleObjectsEx", GetLastError()); 4656 } else { 4657 // idk.... 4658 } 4659 } 4660 } 4661 } 4662 4663 version(Posix) { 4664 private __gshared uint sigChildHappened = 0; 4665 private __gshared uint sigIntrHappened = 0; 4666 4667 static void signalChecker() { 4668 if(cas(&sigChildHappened, 1, 0)) { 4669 while(true) { // multiple children could have exited before we processed the notification 4670 4671 import core.sys.posix.sys.wait; 4672 4673 int status; 4674 auto pid = waitpid(-1, &status, WNOHANG); 4675 if(pid == -1) { 4676 import core.stdc.errno; 4677 auto errno = errno; 4678 if(errno == ECHILD) 4679 break; // also all done, there are no children left 4680 // no need to check EINTR since we set WNOHANG 4681 throw new ErrnoApiException("waitpid", errno); 4682 } 4683 if(pid == 0) 4684 break; // all done, all children are still running 4685 4686 // look up the pid for one of our objects 4687 // if it is found, inform it of its status 4688 // and then inform its controlling thread 4689 // to wake up so it can check its waitForCompletion, 4690 // trigger its callbacks, etc. 
4691 4692 ExternalProcess.recordChildTerminated(pid, status); 4693 } 4694 4695 } 4696 if(cas(&sigIntrHappened, 1, 0)) { 4697 // FIXME 4698 import core.stdc.stdlib; 4699 exit(0); 4700 } 4701 } 4702 4703 /++ 4704 Informs the arsd.core system that the given signal happened. You can call this from inside a signal handler. 4705 +/ 4706 public static void markSignalOccurred(int sigNumber) nothrow { 4707 import core.sys.posix.unistd; 4708 4709 if(sigNumber == SIGCHLD) 4710 volatileStore(&sigChildHappened, 1); 4711 if(sigNumber == SIGINT) 4712 volatileStore(&sigIntrHappened, 1); 4713 4714 version(Arsd_core_epoll) { 4715 ulong writeValue = 1; 4716 write(signalPipeFd, &writeValue, writeValue.sizeof); 4717 } 4718 } 4719 } 4720 4721 version(Arsd_core_epoll) { 4722 4723 import core.sys.linux.epoll; 4724 import core.sys.linux.sys.eventfd; 4725 4726 private this() { 4727 4728 if(!globalsInitialized) { 4729 synchronized { 4730 if(!globalsInitialized) { 4731 // blocking signals is problematic because it is inherited by child processes 4732 // and that can be problematic for general purpose stuff so i use a self pipe 4733 // here. though since it is linux, im using an eventfd instead just to notify 4734 signalPipeFd = ErrnoEnforce!eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); 4735 signalReaderCallback = new CallbackHelper(&signalReader); 4736 4737 runInTaskRunnerQueue = new CallbackQueue("task runners", true); 4738 runInHelperThreadQueue = new CallbackQueue("helper threads", true); 4739 4740 setSignalHandlers(); 4741 4742 globalsInitialized = true; 4743 } 4744 } 4745 } 4746 4747 epollfd = epoll_create1(EPOLL_CLOEXEC); 4748 4749 // FIXME: ensure UI events get top priority 4750 4751 // global listeners 4752 4753 // FIXME: i should prolly keep the tokens and release them when tearing down. 
4754 4755 cast(void) addCallbackOnFdReadable(signalPipeFd, signalReaderCallback); 4756 if(true) { // FIXME: if this is a task runner vs helper thread vs ui thread 4757 cast(void) addCallbackOnFdReadable(runInTaskRunnerQueue.fd, runInTaskRunnerQueue.callback); 4758 runInTaskRunnerQueue.callback.addref(); 4759 } else { 4760 cast(void) addCallbackOnFdReadable(runInHelperThreadQueue.fd, runInHelperThreadQueue.callback); 4761 runInHelperThreadQueue.callback.addref(); 4762 } 4763 4764 // local listener 4765 thisThreadQueue = new CallbackQueue("this thread", false); 4766 cast(void) addCallbackOnFdReadable(thisThreadQueue.fd, thisThreadQueue.callback); 4767 4768 // what are we going to do about timers? 4769 } 4770 4771 void teardown() { 4772 import core.sys.posix.fcntl; 4773 import core.sys.posix.unistd; 4774 4775 close(epollfd); 4776 epollfd = -1; 4777 4778 thisThreadQueue.teardown(); 4779 4780 // FIXME: should prolly free anything left in the callback queue, tho those could also be GC managed tbh. 4781 } 4782 4783 /+ // i call it explicitly at the thread exit instead, but worker threads aren't really supposed to exit generally speaking till process done anyway 4784 static ~this() { 4785 teardown(); 4786 } 4787 +/ 4788 4789 static void teardownGlobals() { 4790 import core.sys.posix.fcntl; 4791 import core.sys.posix.unistd; 4792 4793 synchronized { 4794 restoreSignalHandlers(); 4795 close(signalPipeFd); 4796 signalReaderCallback.release(); 4797 4798 runInTaskRunnerQueue.teardown(); 4799 runInHelperThreadQueue.teardown(); 4800 4801 globalsInitialized = false; 4802 } 4803 4804 } 4805 4806 4807 private static final class CallbackQueue { 4808 int fd = -1; 4809 string name; 4810 CallbackHelper callback; 4811 SynchronizedCircularBuffer!CallbackHelper queue; 4812 4813 this(string name, bool dequeueIsShared) { 4814 this.name = name; 4815 queue = typeof(queue)(this); 4816 4817 fd = ErrnoEnforce!eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | (dequeueIsShared ? 
EFD_SEMAPHORE : 0)); 4818 4819 callback = new CallbackHelper(dequeueIsShared ? &sharedDequeueCb : &threadLocalDequeueCb); 4820 } 4821 4822 bool resetEvent() { 4823 import core.sys.posix.unistd; 4824 ulong count; 4825 return read(fd, &count, count.sizeof) == count.sizeof; 4826 } 4827 4828 void sharedDequeueCb() { 4829 if(resetEvent()) { 4830 auto cb = queue.dequeue(); 4831 cb.call(); 4832 cb.release(); 4833 } 4834 } 4835 4836 void threadLocalDequeueCb() { 4837 CallbackHelper[16] buffer; 4838 foreach(cb; queue.dequeueSeveral(buffer[], () { resetEvent(); })) { 4839 cb.call(); 4840 cb.release(); 4841 } 4842 } 4843 4844 void enqueue(CallbackHelper cb) { 4845 if(queue.enqueue(cb)) { 4846 import core.sys.posix.unistd; 4847 ulong count = 1; 4848 ErrnoEnforce!write(fd, &count, count.sizeof); 4849 } else { 4850 throw new ArsdException!"queue is full"(name); 4851 } 4852 } 4853 4854 void teardown() { 4855 import core.sys.posix.fcntl; 4856 import core.sys.posix.unistd; 4857 4858 close(fd); 4859 fd = -1; 4860 4861 callback.release(); 4862 } 4863 } 4864 4865 // there's a global instance of this we refer back to 4866 private __gshared { 4867 bool globalsInitialized; 4868 4869 CallbackHelper signalReaderCallback; 4870 4871 CallbackQueue runInTaskRunnerQueue; 4872 CallbackQueue runInHelperThreadQueue; 4873 4874 int exitEventFd = -1; // FIXME: implement 4875 } 4876 4877 // and then the local loop 4878 private { 4879 int epollfd = -1; 4880 4881 CallbackQueue thisThreadQueue; 4882 } 4883 4884 // signal stuff { 4885 import core.sys.posix.signal; 4886 4887 private __gshared sigaction_t oldSigIntr; 4888 private __gshared sigaction_t oldSigChld; 4889 private __gshared sigaction_t oldSigPipe; 4890 4891 private __gshared int signalPipeFd = -1; 4892 // sigpipe not important, i handle errors on the writes 4893 4894 public static void setSignalHandlers() { 4895 static extern(C) void interruptHandler(int sigNumber) nothrow { 4896 markSignalOccurred(sigNumber); 4897 4898 /+ 4899 // calling the 
old handler is non-trivial since there can be ignore 4900 // or default or a plain handler or a sigaction 3 arg handler and i 4901 // i don't think it is worth teh complication 4902 sigaction_t* oldHandler; 4903 if(sigNumber == SIGCHLD) 4904 oldHandler = &oldSigChld; 4905 else if(sigNumber == SIGINT) 4906 oldHandler = &oldSigIntr; 4907 if(oldHandler && oldHandler.sa_handler) 4908 oldHandler 4909 +/ 4910 } 4911 4912 sigaction_t n; 4913 n.sa_handler = &interruptHandler; 4914 n.sa_mask = cast(sigset_t) 0; 4915 n.sa_flags = 0; 4916 sigaction(SIGINT, &n, &oldSigIntr); 4917 sigaction(SIGCHLD, &n, &oldSigChld); 4918 4919 n.sa_handler = SIG_IGN; 4920 sigaction(SIGPIPE, &n, &oldSigPipe); 4921 } 4922 4923 public static void restoreSignalHandlers() { 4924 sigaction(SIGINT, &oldSigIntr, null); 4925 sigaction(SIGCHLD, &oldSigChld, null); 4926 sigaction(SIGPIPE, &oldSigPipe, null); 4927 } 4928 4929 private static void signalReader() { 4930 import core.sys.posix.unistd; 4931 ulong number; 4932 read(signalPipeFd, &number, number.sizeof); 4933 4934 signalChecker(); 4935 } 4936 // signal stuff done } 4937 4938 // the any thread poll is just registered in the this thread poll w/ exclusive. nobody actaully epoll_waits 4939 // on the global one directly. 4940 4941 void runOnce() { 4942 epoll_event[16] events; 4943 auto ret = epoll_wait(epollfd, events.ptr, cast(int) events.length, -1); // FIXME: timeout 4944 if(ret == -1) { 4945 import core.stdc.errno; 4946 if(errno == EINTR) { 4947 return; 4948 } 4949 throw new ErrnoApiException("epoll_wait", errno); 4950 } else if(ret == 0) { 4951 // timeout 4952 } else { 4953 // loop events and call associated callbacks 4954 foreach(event; events[0 .. ret]) { 4955 auto flags = event.events; 4956 auto cbObject = cast(CallbackHelper) event.data.ptr; 4957 4958 // FIXME: or if it is an error... 4959 // EPOLLERR - write end of pipe when read end closed or other error. 
and EPOLLHUP - terminal hangup or read end when write end close (but it will give 0 reading after that soon anyway) 4960 4961 cbObject.call(); 4962 } 4963 } 4964 } 4965 4966 // building blocks for low-level integration with the loop 4967 4968 UnregisterToken addCallbackOnFdReadable(int fd, CallbackHelper cb) { 4969 epoll_event event; 4970 event.data.ptr = cast(void*) cb; 4971 event.events = EPOLLIN | EPOLLEXCLUSIVE; 4972 if(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) 4973 throw new ErrnoApiException("epoll_ctl", errno); 4974 4975 return UnregisterToken(this, fd, cb); 4976 } 4977 4978 /++ 4979 Adds a one-off callback that you can optionally rearm when it happens. 4980 +/ 4981 RearmToken addCallbackOnFdReadableOneShot(int fd, CallbackHelper cb) { 4982 epoll_event event; 4983 event.data.ptr = cast(void*) cb; 4984 event.events = EPOLLIN | EPOLLONESHOT; 4985 if(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) 4986 throw new ErrnoApiException("epoll_ctl", errno); 4987 4988 return RearmToken(true, this, fd, cb, EPOLLIN | EPOLLONESHOT); 4989 } 4990 4991 /++ 4992 Adds a one-off callback that you can optionally rearm when it happens. 
4993 +/ 4994 RearmToken addCallbackOnFdWritableOneShot(int fd, CallbackHelper cb) { 4995 epoll_event event; 4996 event.data.ptr = cast(void*) cb; 4997 event.events = EPOLLOUT | EPOLLONESHOT; 4998 if(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) 4999 throw new ErrnoApiException("epoll_ctl", errno); 5000 5001 return RearmToken(false, this, fd, cb, EPOLLOUT | EPOLLONESHOT); 5002 } 5003 5004 private void unregisterFd(int fd) { 5005 epoll_event event; 5006 if(epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &event) == -1) 5007 throw new ErrnoApiException("epoll_ctl", errno); 5008 } 5009 5010 private void rearmFd(RearmToken token) { 5011 epoll_event event; 5012 event.data.ptr = cast(void*) token.cb; 5013 event.events = token.flags; 5014 if(epoll_ctl(epollfd, EPOLL_CTL_MOD, token.fd, &event) == -1) 5015 throw new ErrnoApiException("epoll_ctl", errno); 5016 } 5017 5018 // Disk files will have to be sent as messages to a worker to do the read and report back a completion packet. 5019 } 5020 5021 version(Arsd_core_kqueue) { 5022 // FIXME 5023 } 5024 5025 // cross platform adapters 5026 void setTimeout() {} 5027 void addFileOrDirectoryChangeListener(FilePath name, uint flags, bool recursive = false) {} 5028 } 5029 5030 // deduplication???????// 5031 bool postMessage(ThreadToRunIn destination, void delegate() code) { 5032 return false; 5033 } 5034 bool postMessage(ThreadToRunIn destination, Object message) { 5035 return false; 5036 } 5037 5038 /+ 5039 void main() { 5040 // FIXME: the offset doesn't seem to be done right 5041 auto file = new AsyncFile(FilePath("test.txt"), AsyncFile.OpenMode.writeWithTruncation); 5042 file.write("hello", 10).waitForCompletion(); 5043 } 5044 +/ 5045 5046 // to test the mailboxes 5047 /+ 5048 void main() { 5049 /+ 5050 import std.stdio; 5051 Thread[4] pool; 5052 5053 bool shouldExit; 5054 5055 static int received; 5056 5057 static void tester() { 5058 received++; 5059 //writeln(cast(void*) Thread.getThis, " ", received); 5060 } 5061 5062 foreach(ref 
thread; pool) { 5063 thread = new Thread(() { 5064 getThisThreadEventLoop().run(() { 5065 return shouldExit; 5066 }); 5067 }); 5068 thread.start(); 5069 } 5070 5071 getThisThreadEventLoop(); // ensure it is all initialized before proceeding. FIXME: i should have an ensure initialized function i do on most the public apis. 5072 5073 int lol; 5074 5075 try 5076 foreach(i; 0 .. 6000) { 5077 CoreEventLoopImplementation.runInTaskRunnerQueue.enqueue(new CallbackHelper(&tester)); 5078 lol = cast(int) i; 5079 } 5080 catch(ArsdExceptionBase e) { 5081 Thread.sleep(50.msecs); 5082 writeln(e); 5083 writeln(lol); 5084 } 5085 5086 import core.stdc.stdlib; 5087 exit(0); 5088 5089 version(none) 5090 foreach(i; 0 .. 100) 5091 CoreEventLoopImplementation.runInTaskRunnerQueue.enqueue(new CallbackHelper(&tester)); 5092 5093 5094 foreach(ref thread; pool) { 5095 thread.join(); 5096 } 5097 +/ 5098 5099 5100 static int received; 5101 5102 static void tester() { 5103 received++; 5104 //writeln(cast(void*) Thread.getThis, " ", received); 5105 } 5106 5107 5108 5109 auto ev = cast(CoreEventLoopImplementation) getThisThreadEventLoop(); 5110 foreach(i; 0 .. 100) 5111 ev.thisThreadQueue.enqueue(new CallbackHelper(&tester)); 5112 foreach(i; 0 .. 100 / 16 + 1) 5113 ev.runOnce(); 5114 import std.conv; 5115 assert(received == 100, to!string(received)); 5116 5117 } 5118 +/ 5119 5120 /++ 5121 This is primarily a helper for the event queues. It is public in the hope it might be useful, 5122 but subject to change without notice; I will treat breaking it the same as if it is private. 5123 (That said, it is a simple little utility that does its job, so it is unlikely to change much. 5124 The biggest change would probably be letting it grow and changing from inline to dynamic array.) 5125 5126 It is a fixed-size ring buffer that synchronizes on a given object you give it in the constructor. 5127 5128 After enqueuing something, you should probably set an event to notify the other threads. 
This is left as an exercise to you (or another wrapper).
+/
struct SynchronizedCircularBuffer(T, size_t maxSize = 128) {
	static assert(maxSize > 0 && maxSize <= int.max, "maxSize must be non-zero and fit in the int ring indexes");

	private T[maxSize] ring;

	// Invariant: both indexes always stay in the range [0, maxSize). They are wrapped
	// every time they advance. Comparing a wrapped value against an index that only
	// ever grew made the "is it full" check stop working after the first lap around
	// the ring, letting enqueue overwrite items that had not been dequeued yet.
	//
	// One slot is always left unused to tell "full" from "empty", so the usable
	// capacity is maxSize - 1.
	private int front; // slot the next dequeue reads from
	private int back; // slot the next enqueue writes to

	private Object synchronizedOn;

	@disable this();

	/++
		The Object's monitor is used to synchronize the methods in here.
	+/
	this(Object synchronizedOn) {
		this.synchronizedOn = synchronizedOn;
	}

	// Returns the ring slot that follows `index`, wrapping back to slot 0 at the end.
	private static int nextIndex(int index) nothrow {
		return cast(int) ((index + 1) % maxSize);
	}

	/++
		Note the potential race condition between calling this and actually dequeuing something. You might
		want to acquire the lock on the object before calling this (nested synchronized things are allowed
		as long as the same thread is the one doing it).
	+/
	bool isEmpty() {
		synchronized(this.synchronizedOn) {
			return front == back;
		}
	}

	/++
		Note the potential race condition between calling this and actually queuing something.
	+/
	bool isFull() {
		synchronized(this.synchronizedOn) {
			return isFullUnsynchronized();
		}
	}

	// Full means advancing `back` would make it collide with `front`.
	// The caller must already hold the lock.
	private bool isFullUnsynchronized() nothrow const {
		return nextIndex(back) == front;
	}

	/++
		If this returns true, you should signal listening threads (with an event or a semaphore,
		depending on how you dequeue it). If it returns false, the queue was full and your thing
		was NOT added. You might wait and retry later (you could set up another event to signal it
		has been read and wait for that, or maybe try on a timer), or just fail and throw an exception
		or to abandon the message.
	+/
	bool enqueue(T what) {
		synchronized(this.synchronizedOn) {
			if(isFullUnsynchronized())
				return false;
			ring[back] = what;
			back = nextIndex(back);
			return true;
		}
	}

	// Removes and returns the oldest item. The caller must already hold the lock
	// and must have checked that the queue is not empty.
	private T dequeueUnsynchronized() nothrow {
		assert(front != back);
		auto slot = front;
		front = nextIndex(front);
		return ring[slot];
	}

	/++
		If you are using a semaphore to signal, you can call this once for each count of it
		and you can do that separately from this call (though they should be paired).

		If you are using an event, you should use [dequeueSeveral] instead to drain it.
	+/
	T dequeue() {
		synchronized(this.synchronizedOn) {
			return dequeueUnsynchronized();
		}
	}

	/++
		Note that if you use a semaphore to signal waiting threads, you should probably not call this.

		If you use a set/reset event, there's a potential race condition between the dequeue and event
		reset. This is why the `runInsideLockIfEmpty` delegate is there - when it is empty, before it
		unlocks, it will give you a chance to reset the event. Otherwise, it can remain set to indicate
		that there's still pending data in the queue.

		Params:
			buffer = storage the dequeued items are copied into; at most `buffer.length` items are taken
			runInsideLockIfEmpty = called, while the lock is still held, if the queue is empty after the dequeue

		Returns:
			The slice of `buffer` that was actually filled, oldest item first.
	+/
	T[] dequeueSeveral(return T[] buffer, scope void delegate() runInsideLockIfEmpty = null) {
		int pos;
		synchronized(this.synchronizedOn) {
			while(pos < buffer.length && front != back) {
				buffer[pos++] = dequeueUnsynchronized();
			}
			if(front == back && runInsideLockIfEmpty !is null)
				runInsideLockIfEmpty();
		}
		return buffer[0 .. pos];
	}
}

unittest {
	Object object = new Object();
	auto queue = SynchronizedCircularBuffer!CallbackHelper(object);
	assert(queue.isEmpty);
	foreach(i; 0 .. queue.ring.length - 1)
		queue.enqueue(cast(CallbackHelper) cast(void*) i);
	assert(queue.isFull);

	foreach(i; 0 .. queue.ring.length - 1)
		assert(queue.dequeue() is (cast(CallbackHelper) cast(void*) i));
	assert(queue.isEmpty);

	foreach(i; 0 .. queue.ring.length - 1)
		queue.enqueue(cast(CallbackHelper) cast(void*) i);
	assert(queue.isFull);

	CallbackHelper[] buffer = new CallbackHelper[](300);
	auto got = queue.dequeueSeveral(buffer);
	assert(got.length == queue.ring.length - 1);
	assert(queue.isEmpty);
	foreach(i, item; got)
		assert(item is (cast(CallbackHelper) cast(void*) i));

	foreach(i; 0 .. 8)
		queue.enqueue(cast(CallbackHelper) cast(void*) i);
	buffer = new CallbackHelper[](4);
	got = queue.dequeueSeveral(buffer);
	assert(got.length == 4);
	foreach(i, item; got)
		assert(item is (cast(CallbackHelper) cast(void*) i));
	got = queue.dequeueSeveral(buffer);
	assert(got.length == 4);
	foreach(i, item; got)
		assert(item is (cast(CallbackHelper) cast(void*) (i+4)));
	got = queue.dequeueSeveral(buffer);
	assert(got.length == 0);
	assert(queue.isEmpty);

	// regression: by now the indexes have gone around the ring more than once.
	// A full queue must still report full and refuse further items.
	foreach(i; 0 .. queue.ring.length - 1)
		assert(queue.enqueue(cast(CallbackHelper) cast(void*) i));
	assert(queue.isFull);
	assert(!queue.enqueue(cast(CallbackHelper) cast(void*) 1));

	buffer = new CallbackHelper[](300);
	got = queue.dequeueSeveral(buffer);
	assert(got.length == queue.ring.length - 1);
	foreach(i, item; got)
		assert(item is (cast(CallbackHelper) cast(void*) i));
	assert(queue.isEmpty);
}

/++
	Says how the bytes of a multi-byte value are ordered on a stream.

	`irrelevant` is the default argument of the stream `put` and `get` functions, which
	only accept it for single-byte values; anything bigger must name an actual order.
+/
enum ByteOrder {
	irrelevant, /// no order given; only valid when each value is a single byte
	littleEndian, /// least significant byte comes first
	bigEndian, /// most significant byte comes first
}

/++
	A class to help write a stream of binary data to some target.

NOT YET FUNCTIONAL
+/
class WritableStream {
	/++
		Creates a stream that buffers into a newly allocated array of `bufferSize` bytes.
	+/
	this(size_t bufferSize) {
		this(new ubyte[](bufferSize));
	}

	/++
		Creates a stream that buffers into the slice you provide. The slice is stored, not copied.
	+/
	this(ubyte[] buffer) {
		this.buffer = buffer;
	}

	/++
		Writes the bytes of `value` to the buffer in the requested byte order.

		Only 1, 2, 4, and 8 byte types are supported; the value's bits are written as-is.

		Params:
			value = the value to write
			byteOrder = order of the bytes on the stream. Must be specified for anything bigger than one byte.
			file = reported in the exception if the arguments are invalid
			line = ditto

		Throws:
			InvalidArgumentsException if `byteOrder` is `irrelevant` and `T` is bigger than one byte.
	+/
	final void put(T)(T value, ByteOrder byteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
		static if(T.sizeof == 8)
			alias Bits = ulong;
		else static if(T.sizeof == 4)
			alias Bits = uint;
		else static if(T.sizeof == 2)
			alias Bits = ushort;
		else static if(T.sizeof == 1)
			alias Bits = ubyte;
		else static assert(0, "unimplemented type, try using just the basic types");

		if(byteOrder == ByteOrder.irrelevant && T.sizeof > 1)
			throw new InvalidArgumentsException("byteOrder", "byte order must be specified for type " ~ T.stringof ~ " because it is bigger than one byte", "WritableStream.put", file, line);

		// reinterpret the bits of the value as an unsigned integer of the same size.
		// (this copy was missing before, so every put wrote zeros.)
		Bits b = *cast(Bits*) &value;

		final switch(byteOrder) {
			case ByteOrder.irrelevant:
				// only reachable for single byte types thanks to the check above
				writeOneByte(cast(ubyte) b);
			break;
			case ByteOrder.littleEndian:
				foreach(i; 0 .. T.sizeof) {
					writeOneByte(cast(ubyte) (b & 0xff));
					b >>= 8;
				}
			break;
			case ByteOrder.bigEndian:
				int amount = T.sizeof * 8 - 8;
				foreach(i; 0 .. T.sizeof) {
					writeOneByte(cast(ubyte) ((b >> amount) & 0xff));
					amount -= 8;
				}
			break;
		}
	}

	/// ditto
	final void put(T : E[], E)(T value, ByteOrder elementByteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
		foreach(item; value)
			put(item, elementByteOrder, file, line);
	}

	/++
		Performs a final flush() call, then marks the stream as closed, meaning no further data will be written to it.
	+/
	void close() {
		// the flush promised by the documentation was not actually being done
		if(!isClosed_)
			flush();
		isClosed_ = true;
	}

	/++
		Writes what is currently in the buffer to the target and waits for the target to accept it.
		Please note: if you are subclassing this to go to a different target, this is presumably
		the method to override (the original note here was left unfinished).

		The implementation in this class does nothing.
	+/
	void flush() {}

	/++
		Returns true if either you closed it or if the receiving end closed their side, indicating they
		don't want any more data.
	+/
	bool isClosed() {
		return isClosed_;
	}

	// hasRoomInBuffer
	// canFlush
	// waitUntilCanFlush

	// flushImpl
	// markFinished / close - tells the other end you're done

	// Appends one byte to the buffer, asking for a flush first if the buffer is full.
	//
	// NOTE(review): nothing visible here resets bufferPosition after a flush, and the
	// base flush() is a no-op, so writing past the end of the buffer still fails on
	// the index below. Confirm how flush is meant to make room once it is implemented.
	private final void writeOneByte(ubyte value) {
		if(bufferPosition == buffer.length)
			flush();

		buffer[bufferPosition++] = value;
	}


	private {
		ubyte[] buffer;
		int bufferPosition;
		bool isClosed_;
	}
}

unittest {
	auto stream = new WritableStream(8);
	stream.put(cast(ushort) 0x0102, ByteOrder.bigEndian);
	stream.put(cast(ushort) 0x0102, ByteOrder.littleEndian);
	stream.put(cast(ubyte) 0x7f);

	ubyte[] expected = [0x01, 0x02, 0x02, 0x01, 0x7f];
	assert(stream.buffer[0 .. 5] == expected);
}

/++
	A stream can be used by just one task at a time, but one task can consume multiple streams.

	Streams may be populated by async sources (in which case they must be called from a fiber task),
	from a function generating the data on demand (including an input range), from memory, or from a synchronous file.

	A stream of heterogeneous types is compatible with input ranges.

	It reads binary data.
+/
class ReadableStream {

	this() {

	}

	/++
		Gets data of the specified type `T` off the stream. The byte order of the T on the stream must be specified unless it is irrelevant (e.g. single byte entries).

		---
		// get an int out of a big endian stream
		int i = stream.get!int(ByteOrder.bigEndian);

		// get i bytes off the stream
		ubyte[] data = stream.get!(ubyte[])(i);
		---

		If not enough data is buffered yet, this waits (yielding the current fiber) until
		[feedData] has provided enough.

		Throws:
			InvalidArgumentsException if the byte order is `irrelevant` and the element type is bigger than one byte.
	+/
	final T get(T)(ByteOrder byteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
		if(byteOrder == ByteOrder.irrelevant && T.sizeof > 1)
			throw new InvalidArgumentsException("byteOrder", "byte order must be specified for type " ~ T.stringof ~ " because it is bigger than one byte", "ReadableStream.get", file, line);

		// FIXME: what if it is a struct?

		while(bufferedLength() < T.sizeof)
			waitForAdditionalData();

		static if(T.sizeof == 1) {
			ubyte ret = consumeOneByte();
			return *cast(T*) &ret;
		} else {
			static if(T.sizeof == 8)
				ulong ret;
			else static if(T.sizeof == 4)
				uint ret;
			else static if(T.sizeof == 2)
				ushort ret;
			else static assert(0, "unimplemented type, try using just the basic types");

			if(byteOrder == ByteOrder.littleEndian) {
				// each new byte enters at the top and earlier ones shift down, so
				// the first byte read ends up as the least significant one
				typeof(ret) buffer;
				foreach(b; 0 .. T.sizeof) {
					buffer = consumeOneByte();
					buffer <<= T.sizeof * 8 - 8;

					ret >>= 8;
					ret |= buffer;
				}
			} else {
				// the first byte read ends up as the most significant one
				foreach(b; 0 .. T.sizeof) {
					ret <<= 8;
					ret |= consumeOneByte();
				}
			}

			return *cast(T*) &ret;
		}
	}

	/// ditto
	final T get(T : E[], E)(size_t length, ByteOrder elementByteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
		if(elementByteOrder == ByteOrder.irrelevant && E.sizeof > 1)
			throw new InvalidArgumentsException("elementByteOrder", "byte order must be specified for type " ~ E.stringof ~ " because it is bigger than one byte", "ReadableStream.get", file, line);

		// if the stream is closed before getting the length or the terminator, should we send partial stuff
		// or just throw?

		while(bufferedLength() < length * E.sizeof)
			waitForAdditionalData();

		T ret;

		ret.length = length;

		// FIXME: can prolly optimize the case where the byte order is irrelevant
		foreach(i; 0 .. length)
			ret[i] = get!E(elementByteOrder, file, line);

		return ret;

	}

	/// ditto
	final T get(T : E[], E)(scope bool delegate(E e) isTerminatingSentinel, ByteOrder elementByteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
		// this used to test a non-existent `byteOrder` variable, so the overload could never be instantiated
		if(elementByteOrder == ByteOrder.irrelevant && E.sizeof > 1)
			throw new InvalidArgumentsException("elementByteOrder", "byte order must be specified for type " ~ E.stringof ~ " because it is bigger than one byte", "ReadableStream.get", file, line);

		assert(0, "Not implemented");
	}

	/++
		Returns true once the stream has been fed empty data, which marks it as closed.
	+/
	bool isClosed() {
		return isClosed_;
	}

	// Control side of things

	private bool isClosed_;

	/++
		Feeds data into the stream, which can be consumed by `get`. If a task is waiting for more
		data to satisfy its get requests, this will trigger those tasks to resume.

		If you feed it empty data, it will mark the stream as closed.

		The data is only borrowed for the duration of the call; whatever has not been consumed
		by the time this returns is copied to an internal buffer for later `get` calls.
	+/
	void feedData(ubyte[] data) {
		if(data.length == 0)
			isClosed_ = true;

		currentBuffer = data;
		// this is a borrowed buffer, so we won't keep the reference long term...
		scope(exit) {
			// ...but the bytes nobody consumed during this call must not be lost. That
			// happened when no task was waiting yet, or when the task stopped reading
			// partway through the data.
			if(currentBuffer.length)
				leftoverBuffer ~= currentBuffer;
			currentBuffer = null;
		}

		if(waitingTask !is null) {
			if(waitingTask.state == Fiber.State.TERM)
				waitingTask = null; // it ran to completion earlier; a finished fiber cannot be resumed
			else
				waitingTask.call();
		}
	}

	/++
		You basically have to use this thing from a task
	+/
	protected void waitForAdditionalData() {
		Fiber task = Fiber.getThis;

		assert(task !is null);

		// a previous reader that has already finished does not count as waiting
		if(waitingTask !is null && waitingTask !is task && waitingTask.state != Fiber.State.TERM)
			throw new ArsdException!"streams can only have one waiting task";

		// move any pending data in the borrowed buffer to the longer-term buffer. it is
		// cleared here so feedData does not store the same bytes a second time on its way out
		if(currentBuffer.length) {
			leftoverBuffer ~= currentBuffer;
			currentBuffer = null;
		}

		waitingTask = task;
		task.yield();
	}

	private Fiber waitingTask;
	private ubyte[] leftoverBuffer; // owned copy of data fed earlier but not consumed yet
	private ubyte[] currentBuffer; // borrowed from the caller of feedData; only valid during that call

	private size_t bufferedLength() {
		return leftoverBuffer.length + currentBuffer.length;
	}

	// Takes the next byte, draining the older leftover data before the current buffer.
	private ubyte consumeOneByte() {
		ubyte b;
		if(leftoverBuffer.length) {
			b = leftoverBuffer[0];
			leftoverBuffer = leftoverBuffer[1 .. $];
		} else if(currentBuffer.length) {
			b = currentBuffer[0];
			currentBuffer = currentBuffer[1 .. $];
		} else {
			assert(0, "consuming off an empty buffer is impossible");
		}

		return b;
	}
}

// FIXME: do a stringstream too

unittest {
	auto stream = new ReadableStream();

	int position;
	char[16] errorBuffer;

	auto fiber = new Fiber(() {
		position = 1;
		int a = stream.get!int(ByteOrder.littleEndian);
		assert(a == 10, intToString(a, errorBuffer[]));
		position = 2;
		ubyte b = stream.get!ubyte;
		assert(b == 33);
		position = 3;

		// ubyte[] c = stream.get!(ubyte[])(3);
		// int[] d = stream.get!(int[])(3);
	});

	fiber.call();
	assert(position == 1);
	stream.feedData([10, 0, 0, 0]);
	assert(position == 2);
	stream.feedData([33]);
	assert(position == 3);

	// stream.feedData([1,2,3]);
	// stream.feedData([1,2,3,4,1,2,3,4,1,2,3,4]);
}

unittest {
	// data fed while no task is waiting must be kept for the next reader
	auto stream = new ReadableStream();
	stream.feedData([0x01, 0x02, 0x03]);

	ushort first;
	ubyte second;
	auto fiber = new Fiber(() {
		first = stream.get!ushort(ByteOrder.bigEndian);
		second = stream.get!ubyte;
	});
	fiber.call();

	assert(first == 0x0102);
	assert(second == 3);
}

/++
	UNSTABLE, NOT FULLY IMPLEMENTED. DO NOT USE YET.

	You might use this like:

	---
	auto proc = new ExternalProcess();
	auto stdoutStream = new ReadableStream();

	// to use a stream you can make one and have a task consume it
	runTask({
		while(!stdoutStream.isClosed) {
			auto line = stdoutStream.get!string(e => e == '\n');
		}
	});

	// then make the process feed into the stream
	proc.onStdoutAvailable = (got) {
		stdoutStream.feedData(got); // send it to the stream for processing
		stdout.rawWrite(got); // forward it through to our own thing
		// could also append it to a buffer to return it on complete
	};
	proc.start();
	---

	Please note that this does not currently and I have no plans as of this writing to add support for any kind of direct file descriptor passing. It always pipes them back to the parent for processing. If you don't want this, call the lower level functions yourself; the reason this class is here is to aid integration in the arsd.core event loop.
Of course, I might change my mind on this.

	Bugs:
		Not implemented at all on Windows yet.
+/
class ExternalProcess /*: AsyncOperationRequest*/ {

	private static version(Posix) {
		// children that have been started and not yet reported as terminated, by pid.
		// guarded by the monitor of typeid(ExternalProcess).
		__gshared ExternalProcess[pid_t] activeChildren;

		// Remembers a freshly forked child so its termination can be matched up later.
		void recordChildCreated(pid_t pid, ExternalProcess proc) {
			synchronized(typeid(ExternalProcess)) {
				activeChildren[pid] = proc;
			}
		}

		// Marks the child with the given pid as completed with the given wait status
		// and forgets it. Unknown pids are ignored.
		void recordChildTerminated(pid_t pid, int status) {
			synchronized(typeid(ExternalProcess)) {
				if(auto found = pid in activeChildren) {
					auto ac = *found; // grab the object before the entry goes away
					ac.completed = true;
					ac.status = status;
					activeChildren.remove(pid);
				}
			}
		}
	}

	// FIXME: config to pass through a shell or not

	/++
		This is the native version for Windows.
	+/
	this(string program, string commandLine) {
		version(Posix) {
			assert(0, "not implemented command line to posix args yet");
		}
		else throw new NotYetImplementedException();
	}

	this(string commandLine) {
		version(Posix) {
			assert(0, "not implemented command line to posix args yet");
		}
		else throw new NotYetImplementedException();
	}

	this(string[] args) {
		version(Posix) {
			this.program = FilePath(args[0]);
			this.args = args;
		}
		else throw new NotYetImplementedException();
	}

	/++
		This is the native version for Posix.
	+/
	this(FilePath program, string[] args) {
		version(Posix) {
			this.program = program;
			this.args = args;
		}
		else throw new NotYetImplementedException();
	}

	// you can modify these before calling start
	int stdoutBufferSize = 32 * 1024;
	int stderrBufferSize = 8 * 1024;

	/++
		Creates the pipes, forks, and executes the program in the child. It blocks until the child
		has either successfully called exec or reported why it could not, then registers the
		stdout and stderr pipes with this thread's event loop.

		Throws:
			ErrnoApiException if a pipe cannot be created, the fork fails, or the child could
			not exec the program (in which case the error code is the one the child saw).
	+/
	void start() {
		version(Posix) {
			int ret;

			// if anything below throws, the guards that follow close every descriptor,
			// so do not leave the object pointing at them
			scope(failure) {
				stdinFd = -1;
				stdoutFd = -1;
				stderrFd = -1;
			}

			int[2] stdinPipes;
			ret = pipe(stdinPipes);
			if(ret == -1)
				throw new ErrnoApiException("stdin pipe", errno);

			scope(failure) {
				close(stdinPipes[0]);
				close(stdinPipes[1]);
			}

			stdinFd = stdinPipes[1];

			int[2] stdoutPipes;
			ret = pipe(stdoutPipes);
			if(ret == -1)
				throw new ErrnoApiException("stdout pipe", errno);

			scope(failure) {
				close(stdoutPipes[0]);
				close(stdoutPipes[1]);
			}

			stdoutFd = stdoutPipes[0];

			int[2] stderrPipes;
			ret = pipe(stderrPipes);
			if(ret == -1)
				throw new ErrnoApiException("stderr pipe", errno);

			scope(failure) {
				close(stderrPipes[0]);
				close(stderrPipes[1]);
			}

			stderrFd = stderrPipes[0];


			int[2] errorReportPipes;
			ret = pipe(errorReportPipes);
			if(ret == -1)
				throw new ErrnoApiException("error reporting pipe", errno);

			scope(failure) {
				close(errorReportPipes[0]);
				close(errorReportPipes[1]);
			}

			setCloExec(errorReportPipes[0]);
			setCloExec(errorReportPipes[1]);

			auto forkRet = fork();
			if(forkRet == -1)
				throw new ErrnoApiException("fork", errno);

			if(forkRet == 0) {
				// child side

				// FIXME pty too

				// Reports [step, errno] back to the parent through the error pipe, then
				// terminates the child. Steps: 1 = program name allocation, 2 = argv
				// allocation, 3 = argument allocation, 4 = exec, 5 = stdio redirection.
				void fail(int step) {
					import core.stdc.errno;
					auto code = errno;

					// report the info back to the parent then exit

					int[2] msg = [step, code];
					auto ret = write(errorReportPipes[1], msg.ptr, msg.sizeof);

					// but if this fails there's not much we can do...

					// _exit rather than exit: this is a forked copy of the parent, and exit
					// would run the parent's atexit handlers and flush its stdio buffers a
					// second time from in here.
					_exit(1);
				}

				// dup2 closes the fd it is replacing automatically
				if(dup2(stdinPipes[0], 0) == -1)
					fail(5);
				if(dup2(stdoutPipes[1], 1) == -1)
					fail(5);
				if(dup2(stderrPipes[1], 2) == -1)
					fail(5);

				// don't need either of the original pipe fds anymore
				close(stdinPipes[0]);
				close(stdinPipes[1]);
				close(stdoutPipes[0]);
				close(stdoutPipes[1]);
				close(stderrPipes[0]);
				close(stderrPipes[1]);

				// the error reporting pipe will be closed upon exec since we set cloexec before fork
				// and everything else should have cloexec set too hopefully.

				if(beforeExec)
					beforeExec();

				// i'm not sure that a fully-initialized druntime is still usable
				// after a fork(), so i'm gonna stick to the C lib in here.

				const(char)* file = mallocedStringz(program.path).ptr;
				if(file is null)
					fail(1);
				const(char)*[] argv = mallocSlice!(const(char)*)(args.length + 1);
				if(argv is null)
					fail(2);
				foreach(idx, arg; args) {
					argv[idx] = mallocedStringz(args[idx]).ptr;
					if(argv[idx] is null)
						fail(3);
				}
				argv[args.length] = null;

				auto rete = execvp/*e*/(file, argv.ptr/*, envp*/);
				if(rete == -1) {
					fail(4);
				} else {
					// unreachable code, exec never returns if it succeeds
					assert(0);
				}
			} else {
				pid = forkRet;

				recordChildCreated(pid, this);

				// close our copy of the write side of the error reporting pipe
				// so the read will immediately give eof when the fork closes it too
				ErrnoEnforce!close(errorReportPipes[1]);
				errorReportPipes[1] = -1; // so the failure guard does not close it a second time

				int[2] msg;
				// this will block to wait for it to actually either start up or fail to exec (which should be near instant)
				auto val = read(errorReportPipes[0], msg.ptr, msg.sizeof);
				// a child that fails quickly raises SIGCHLD while we are blocked in here;
				// being interrupted by a signal is not an error, so just try again
				while(val == -1 && errno == EINTR)
					val = read(errorReportPipes[0], msg.ptr, msg.sizeof);

				if(val == -1)
					throw new ErrnoApiException("read error report", errno);

				if(val == msg.sizeof) {
					// error happened
					// FIXME: keep the step part of the error report too
					throw new ErrnoApiException("exec", msg[1]);
				} else if(val == 0) {
					// pipe closed, meaning exec succeeded
				} else {
					assert(0); // never supposed to happen
				}

				// set the ones we keep to close upon future execs
				// FIXME should i set NOBLOCK at this time too? prolly should
				setCloExec(stdinPipes[1]);
				setCloExec(stdoutPipes[0]);
				setCloExec(stderrPipes[0]);

				// and close the others. each one is marked as gone right away so a later
				// failure does not close the same descriptor number twice
				ErrnoEnforce!close(stdinPipes[0]);
				stdinPipes[0] = -1;
				ErrnoEnforce!close(stdoutPipes[1]);
				stdoutPipes[1] = -1;
				ErrnoEnforce!close(stderrPipes[1]);
				stderrPipes[1] = -1;

				ErrnoEnforce!close(errorReportPipes[0]);
				errorReportPipes[0] = -1;

				// and now register the ones we need to read with the event loop so it can call the callbacks
				// also need to listen to SIGCHLD to queue up the terminated callback. FIXME

				stdoutUnregisterToken = getThisThreadEventLoop().addCallbackOnFdReadable(stdoutFd, new CallbackHelper(&stdoutReadable));
				stderrUnregisterToken = getThisThreadEventLoop().addCallbackOnFdReadable(stderrFd, new CallbackHelper(&stderrReadable));
			}
		}
	}

	private version(Posix) {
		import core.sys.posix.unistd;
		import core.sys.posix.fcntl;

		// our ends of the child's standard pipes; -1 when not open
		int stdinFd = -1;
		int stdoutFd = -1;
		int stderrFd = -1;

		ICoreEventLoop.UnregisterToken stdoutUnregisterToken;
		ICoreEventLoop.UnregisterToken stderrUnregisterToken;

		pid_t pid = -1;

		/// If set, called in the child process after the standard pipes are redirected and before exec.
		public void delegate() beforeExec;

		FilePath program;
		string[] args;

		// event loop callback: the child's stdout pipe has data (or has hit end of file)
		void stdoutReadable() {
			pumpPipe(stdoutFd, stdoutReadBuffer, stdoutBufferSize, onStdoutAvailable, stdoutUnregisterToken);
		}

		// event loop callback: the child's stderr pipe has data (or has hit end of file)
		void stderrReadable() {
			pumpPipe(stderrFd, stderrReadBuffer, stderrBufferSize, onStderrAvailable, stderrUnregisterToken);
		}

		// Shared implementation of the two callbacks above. Does one read from `fd` into
		// `buffer` (allocating it with `bufferSize` bytes on first use) and hands what it
		// got to `sink`, if there is one. A zero length read means end of file; the sink
		// still sees that empty slice, then the fd is unregistered, closed, and set to -1.
		void pumpPipe(ref int fd, ref ubyte[] buffer, int bufferSize, void delegate(ubyte[] got) sink, ref ICoreEventLoop.UnregisterToken token) {
			if(buffer is null)
				buffer = new ubyte[](bufferSize);

			auto ret = read(fd, buffer.ptr, buffer.length);
			// being interrupted by a signal (SIGCHLD is likely around here) is not a failure
			while(ret == -1 && errno == EINTR)
				ret = read(fd, buffer.ptr, buffer.length);
			if(ret == -1)
				throw new ErrnoApiException("read", errno);

			if(sink) {
				sink(buffer[0 .. ret]);
			}

			if(ret == 0) {
				token.unregister();

				close(fd);
				fd = -1;
			}
		}
	}

	private ubyte[] stdoutReadBuffer;
	private ubyte[] stderrReadBuffer;

	/++
		Runs this thread's event loop until the process is marked as completed.
	+/
	void waitForCompletion() {
		getThisThreadEventLoop().run(&this.isComplete);
	}

	/++
		Returns true once the process has been recorded as terminated.
	+/
	bool isComplete() {
		return completed;
	}

	bool completed;
	int status = int.min; // int.min until the process has been recorded as terminated

	/++
		If blocking, it will block the current task until the write succeeds.

		Write `null` as data to close the pipe. Once the pipe is closed, you must not try to write to it again.

		Throws:
			ErrnoApiException if the write fails.
	+/
	void writeToStdin(in void[] data) {
		version(Posix) {
			if(data is null) {
				// closing twice would close whatever unrelated fd got the number next
				if(stdinFd != -1) {
					close(stdinFd);
					stdinFd = -1;
				}
			} else {
				// FIXME: queue async writes
				// write may accept only part of the data or be interrupted by a signal,
				// so keep going until everything has been handed over
				auto remaining = cast(const(ubyte)[]) data;
				while(remaining.length) {
					auto ret = write(stdinFd, remaining.ptr, remaining.length);
					if(ret == -1) {
						if(errno == EINTR)
							continue;
						throw new ErrnoApiException("write", errno);
					}
					remaining = remaining[ret .. $];
				}
			}
		}

	}

	void delegate(ubyte[] got) onStdoutAvailable;
	void delegate(ubyte[] got) onStderrAvailable;
	void delegate(int code) onTermination;

	// pty?
}

// FIXME: comment this out
/+
unittest {
	auto proc = new ExternalProcess(FilePath("/bin/cat"), ["/bin/cat"]);

	getThisThreadEventLoop(); // initialize it

	int c = 0;
	proc.onStdoutAvailable = delegate(ubyte[] got) {
		if(c == 0)
			assert(cast(string) got == "hello!");
		else
			assert(got.length == 0);
		// import std.stdio; writeln(got);
		c++;
	};

	proc.start();

	assert(proc.pid != -1);


	import std.stdio;
	Thread[4] pool;

	bool shouldExit;

	static int received;

	proc.writeToStdin("hello!");
	proc.writeToStdin(null); // closes the pipe

	proc.waitForCompletion();

	assert(proc.status == 0);

	assert(c == 2);

	// writeln("here");
}
+/

// to test the thundering herd on signal handling
version(none)
unittest {
	Thread[4] pool;
	foreach(ref thread; pool) {
		thread = new class Thread {
			this() {
				super({
					int count;
					getThisThreadEventLoop().run(() {
						if(count > 4) return true;
						count++;
						return false;
					});
				});
			}
		};
		thread.start();
	}
	foreach(ref thread; pool) {
		thread.join();
	}
}

/+
	=================
	STDIO REPLACEMENT
	=================
+/

// Appends `what` to `buffer` at `pos`, growing the buffer if needed, and advances `pos` past it.
private void appendToBuffer(ref char[] buffer, ref int pos, scope const(char)[] what) {
	auto required = pos + what.length;
	if(buffer.length < required)
		buffer.length = required;
	buffer[pos .. pos + what.length] = what[];
	pos += what.length;
}

// Appends the text form of `what` to `buffer` at `pos`, growing the buffer if needed,
// and advances `pos` past it.
private void appendToBuffer(ref char[] buffer, ref int pos, long what) {
	// a long needs up to 20 characters in decimal (a sign plus 19 digits); the 16 that
	// used to be reserved here was not enough for large values.
	// NOTE(review): assumes intToString writes at most that many characters with its
	// default arguments - confirm against its definition.
	enum roomForLong = 24;
	if(buffer.length < pos + roomForLong)
		buffer.length = pos + roomForLong;
	auto sliced = intToString(what, buffer[pos .. $]);
	pos += sliced.length;
}

/++
	A `writeln` that actually works, at least for some basic types.

It works correctly on Windows, using the correct functions to write unicode to the console, even allocating a console if needed. If the output has been redirected to a file or pipe, it writes UTF-8.

	This always does text. See also WritableStream and WritableTextStream when they are implemented.

	Each argument is appended in order, followed by a single newline, and the whole line is
	written at once. Strings, `stringz`, anything implicitly convertible to `long`, and anything
	with a `toString()` returning a string are written as text; any other type is written as
	its type name in angle brackets.
+/
void writeln(T...)(T t) {
	// starts out on the stack; appendToBuffer grows it if the line gets too long
	char[256] bufferBacking;
	char[] buffer = bufferBacking[];
	int pos;

	foreach(arg; t) {
		static if(is(typeof(arg) : const char[])) {
			appendToBuffer(buffer, pos, arg);
		} else static if(is(typeof(arg) : stringz)) {
			appendToBuffer(buffer, pos, arg.borrow);
		} else static if(is(typeof(arg) : long)) {
			appendToBuffer(buffer, pos, arg);
		} else static if(is(typeof(arg.toString()) : const char[])) {
			appendToBuffer(buffer, pos, arg.toString());
		} else {
			appendToBuffer(buffer, pos, "<" ~ typeof(arg).stringof ~ ">");
		}
	}

	appendToBuffer(buffer, pos, "\n");

	actuallyWriteToStdout(buffer[0 .. pos]);
}

// Writes the finished line to the standard output. On Windows it goes to the console as
// UTF-16 when stdout is a character device and as the raw UTF-8 bytes otherwise; elsewhere
// the bytes go to file descriptor 1. Errors are not reported, since there is nowhere to
// report them to.
private void actuallyWriteToStdout(scope char[] buffer) @trusted {
	version(Windows) {
		import core.sys.windows.wincon;

		auto hStdOut = GetStdHandle(STD_OUTPUT_HANDLE);
		if(hStdOut == null || hStdOut == INVALID_HANDLE_VALUE) {
			AllocConsole();
			hStdOut = GetStdHandle(STD_OUTPUT_HANDLE);
		}

		if(GetFileType(hStdOut) == FILE_TYPE_CHAR) {
			// NOTE(review): this conversion buffer is a fixed 256 wchars while the line
			// being written can be longer - confirm what makeWindowsString does then.
			wchar[256] wbuffer;
			auto toWrite = makeWindowsString(buffer, wbuffer, WindowsStringConversionFlags.convertNewLines);

			DWORD written;
			WriteConsoleW(hStdOut, toWrite.ptr, cast(DWORD) toWrite.length, &written, null);
		} else {
			DWORD written;
			WriteFile(hStdOut, buffer.ptr, cast(DWORD) buffer.length, &written, null);
		}
	} else {
		import unix = core.sys.posix.unistd;
		import core.stdc.errno : errno, EINTR;

		// write may take only part of the data (pipes, terminals) or get interrupted by
		// a signal before writing anything, so keep going until the whole line is out
		// instead of silently truncating it
		auto remaining = buffer;
		while(remaining.length) {
			auto written = unix.write(1, remaining.ptr, remaining.length);
			if(written == -1) {
				if(errno == EINTR)
					continue;
				break; // a real error; best effort only
			}
			if(written == 0)
				break; // no progress is possible, do not spin
			remaining = remaining[written .. $];
		}
	}
}

/+

STDIO

	/++
		Please note using this will create a compile-time dependency on [arsd.terminal]



so my writeln replacement:

1) if the std output handle is null, alloc one
2) if it is a character device, write out the proper Unicode text.
3) otherwise write out UTF-8.... maybe with a BOM but maybe not. it is tricky to know what the other end of a pipe expects...
[8:15 AM]
im actually tempted to make the write binary to stdout functions throw an exception if it is a character console / interactive terminal instead of letting you spam it right out
[8:16 AM]
of course you can still cheat by casting binary data to string and using the write string function (and this might be appropriate sometimes) but there kinda is a legit difference between a text output and a binary output device

Stdout can represent either

	+/
	void writeln(){} {

	}

	stderr?
6120 6121 /++ 6122 Please note using this will create a compile-time dependency on [arsd.terminal] 6123 6124 It can be called from a task. 6125 6126 It works correctly on Windows and is user friendly on Linux (using arsd.terminal.getline) 6127 while also working if stdin has been redirected (where arsd.terminal itself would throw) 6128 6129 6130 so say you run a program on an interactive terminal. the program tries to open the stdin binary stream 6131 6132 instead of throwing, the prompt could change to indicate the binary data is expected and you can feed it in either by typing it up,,,, or running some command like maybe <file.bin to have the library do what the shell would have done and feed that to the rest of the program 6133 6134 +/ 6135 string readln()() { 6136 6137 } 6138 6139 6140 // if using stdio as a binary output thing you can pretend it is a file w/ stream capability 6141 struct File { 6142 WritableStream ostream; 6143 ReadableStream istream; 6144 6145 ulong tell; 6146 void seek(ulong to) {} 6147 6148 void sync(); 6149 void close(); 6150 } 6151 6152 // these are a bit special because if it actually is an interactive character device, it might be different than other files and even different than other pipes. 6153 WritableStream stdoutStream() { return null; } 6154 WritableStream stderrStream() { return null; } 6155 ReadableStream stdinStream() { return null; } 6156 6157 +/ 6158 6159 6160 /+ 6161 6162 6163 /+ 6164 Druntime appears to have stuff for darwin, freebsd. I might have to add some for openbsd here and maybe netbsd if i care to test it. 
6165 +/ 6166 6167 /+ 6168 6169 arsd_core_init(number_of_worker_threads) 6170 6171 Building-block things wanted for the event loop integration: 6172 * ui 6173 * windows 6174 * terminal / console 6175 * generic 6176 * adopt fd 6177 * adopt windows handle 6178 * shared lib 6179 * load 6180 * timers (relative and real time) 6181 * create 6182 * update 6183 * cancel 6184 * file/directory watches 6185 * file created 6186 * file deleted 6187 * file modified 6188 * file ops 6189 * open 6190 * close 6191 * read 6192 * write 6193 * seek 6194 * sendfile on linux, TransmitFile on Windows 6195 * let completion handlers run in the io worker thread instead of signaling back 6196 * pipe ops (anonymous or named) 6197 * create 6198 * read 6199 * write 6200 * get info about other side of the pipe 6201 * network ops (stream + datagram, ip, ipv6, unix) 6202 * address look up 6203 * connect 6204 * start tls 6205 * listen 6206 * send 6207 * receive 6208 * get peer info 6209 * process ops 6210 * spawn 6211 * notifications when it is terminated or fork or execs 6212 * send signal 6213 * i/o pipes 6214 * thread ops (isDaemon?) 6215 * spawn 6216 * talk to its event loop 6217 * termination notification 6218 * signals 6219 * ctrl+c is the only one i really care about but the others might be made available too. sigchld needs to be done as an impl detail of process ops. 6220 * custom messages 6221 * should be able to send messages from finalizers... 6222 6223 * want to make sure i can stream stuff on top of it all too. 6224 6225 ======== 6226 6227 These things all refer back to a task-local thing that queues the tasks. If it is a fiber, it uses that 6228 and if it is a thread it uses that... 6229 6230 tls IArsdCoreEventLoop currentTaskInterface; // this yields on the wait for calls. the fiber swapper will swap this too.
6231 tls IArsdCoreEventLoop currentThreadInterface; // this blocks on the event loop 6232 6233 shared IArsdCoreEventLoop currentProcessInterface; // this dispatches to any available thread 6234 +/ 6235 6236 6237 /+ 6238 You might have configurable tasks that do not auto-start, e.g. httprequest. maybe @mustUse on those 6239 6240 then some that do auto-start, e.g. setTimeout 6241 6242 6243 timeouts: duration, MonoTime, or SysTime? duration is just a timer monotime auto-adjusts the when, systime sets a real time timerfd 6244 6245 tasks can be set to: 6246 thread affinity - this, any, specific reference 6247 reports to - defaults to this, can also pass down a parent reference. if reports to dies, all its subordinates are cancelled. 6248 6249 6250 you can send a message to a task... maybe maybe just to a task runner (which is itself a task?) 6251 6252 auto file = readFile(x); 6253 auto timeout = setTimeout(y); 6254 auto completed = waitForFirstToCompleteThenCancelOthers(file, timeout); 6255 if(completed == 0) { 6256 file.... 6257 } else { 6258 timeout.... 6259 } 6260 6261 /+ 6262 A task will run on a thread (with possible migration), and report to a task. 6263 +/ 6264 6265 // a compute task is run on a helper thread 6266 auto task = computeTask((shared(bool)* cancellationRequested) { 6267 // or pass in a yield thing... prolly a TaskController which has cancellationRequested and yield controls as well as send message to parent (sync or async) 6268 6269 // you'd periodically send messages back to the parent 6270 }, RunOn.AnyAvailable, Affinity.CanMigrate); 6271 6272 auto task = Task((TaskController controller) { 6273 foreach(x, 0 .. 
1000) { 6274 if(x % 10 == 0) 6275 controller.yield(); // periodically yield control, which also checks for cancellation for us 6276 // do some work 6277 6278 controller.sendMessage(...); 6279 controller.sendProgress(x); // yields it for a foreach stream kind of thing 6280 } 6281 6282 return something; // automatically sends the something as the result in a TaskFinished message 6283 }); 6284 6285 foreach(item; task) // waitsForProgress, sendProgress sends an item and the final return sends an item 6286 {} 6287 6288 6289 see ~/test/task.d 6290 6291 // an io task is run locally via the event loops 6292 auto task2 = ioTask(() { 6293 6294 }); 6295 6296 6297 6298 waitForEvent 6299 +/ 6300 6301 /+ 6302 Most functions should prolly take a thread arg too, which defaults 6303 to this thread, but you can also pass it a reference, or a "any available" thing. 6304 6305 This can be a ufcs overload 6306 +/ 6307 6308 interface SemiSynchronousTask { 6309 6310 } 6311 6312 struct TimeoutCompletionResult { 6313 bool completed; 6314 6315 bool opCast(T : bool)() { 6316 return completed; 6317 } 6318 } 6319 6320 struct Timeout { 6321 void reschedule(Duration when) { 6322 6323 } 6324 6325 void cancel() { 6326 6327 } 6328 6329 TimeoutCompletionResult waitForCompletion() { 6330 return TimeoutCompletionResult(false); 6331 } 6332 } 6333 6334 Timeout setTimeout(void delegate() dg, int msecs, int permittedJitter = 20) { 6335 return Timeout.init; 6336 } 6337 6338 void clearTimeout(Timeout timeout) { 6339 timeout.cancel(); 6340 } 6341 6342 void createInterval() {} 6343 void clearInterval() {} 6344 6345 /++ 6346 Schedules a task at the given wall clock time. 
6347 +/ 6348 void scheduleTask() {} 6349 6350 struct IoOperationCompletionResult { 6351 enum Status { 6352 cancelled, 6353 completed 6354 } 6355 6356 Status status; 6357 6358 int error; 6359 int bytesWritten; 6360 6361 bool opCast(T : bool)() { 6362 return status == Status.completed; 6363 } 6364 } 6365 6366 struct IoOperation { 6367 void cancel() {} 6368 6369 IoOperationCompletionResult waitForCompletion() { 6370 return IoOperationCompletionResult.init; 6371 } 6372 6373 // could contain a scoped class in here too so it is stack allocated 6374 } 6375 6376 // Should return both the object and the index in the array! 6377 Result waitForFirstToComplete(Operation[]...) {} 6378 6379 IoOperation read(IoHandle handle, ubyte[] buffer); 6380 6381 /+ 6382 class IoOperation {} 6383 6384 // an io operation and its buffer must not be modified or freed 6385 // in between a call to enqueue and a call to waitForCompletion 6386 // if you used the whenComplete callback, make sure it is NOT gc'd or scope thing goes out of scope in the mean time 6387 // if its dtor runs, it'd be forced to be cancelled... 6388 6389 scope IoOperation op = new IoOperation(buffer_size); 6390 op.start(); 6391 op.waitForCompletion(); 6392 +/ 6393 6394 /+ 6395 will want: 6396 read, write 6397 send, recv 6398 6399 cancel 6400 6401 open file, open (named or anonymous) pipe, open process 6402 connect, accept 6403 SSL 6404 close 6405 6406 postEvent 6407 postAPC? like run in gui thread / async 6408 waitForEvent ? needs to handle a timeout and a cancellation. would only work in the fiber task api.
waitForSuccess

interrupt handler

onPosixReadReadiness
onPosixWriteReadiness

onWindowsHandleReadiness
	- but they're one-offs so you gotta reregister for each event
+/



/+
	arsd.core.uda

	you define a model struct with the types you want to extract

	you get it with like Model extract(Model, UDAs...)(Model default)

	defaultModel!alias > defaultModel!Type(defaultModel("identifier"))










so while i laid there sleep deprived i did think a lil more on some uda stuff. it isn't especially novel but a combination of a few other techniques

you might be like

struct MyUdas {
    DbName name;
    DbIgnore ignore;
}

elsewhere

foreach(alias; allMembers) {
     auto udas = getUdas!(MyUdas, __traits(getAttributes, alias))(MyUdas(DbName(__traits(identifier, alias))));
}


so you pass the expected type and the attributes as the template params, then the runtime params are the default values for the given types

so what the thing does essentially is just sets the values of the given thing to the udas based on type then returns the modified instance

so the end result is you keep the last ones. it wouldn't report errors if multiple things added but it p simple to understand, simple to document (even though the default values are not in the struct itself, you can put ddocs in them), and uses the tricks to minimize generated code size
+/

+/

/+
	Hand-written Windows API declarations used by this module: a cancel call
	for outstanding I/O on a handle plus the Winsock send/receive entry
	points that take an overlapped structure. Everything in here is private
	to the module and only exists on Windows builds.
+/
private version(Windows) extern(Windows) {
	// Length + pointer pair handed to the WSA* functions declared below.
	struct WSABUF {
		ULONG len;
		ubyte* buf;
	}
	alias LPWSABUF = WSABUF*;

	BOOL CancelIoEx(HANDLE hFile, LPOVERLAPPED lpOverlapped);

	// The WSA* prototypes below are declared with the plain OVERLAPPED pointer
	// and completion routine types instead of dedicated WSAOVERLAPPED ones.
	// That is a deliberate white lie, justified by
	// https://learn.microsoft.com/en-us/windows/win32/api/winsock2/ns-winsock2-wsaoverlapped
	// "The WSAOVERLAPPED structure is compatible with the Windows OVERLAPPED structure."

	// sending
	int WSASend(
		SOCKET s,
		LPWSABUF lpBuffers,
		DWORD dwBufferCount,
		LPDWORD lpNumberOfBytesSent,
		DWORD dwFlags,
		LPOVERLAPPED lpOverlapped,
		LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
	);
	int WSASendTo(
		SOCKET s,
		LPWSABUF lpBuffers,
		DWORD dwBufferCount,
		LPDWORD lpNumberOfBytesSent,
		DWORD dwFlags,
		const sockaddr* lpTo,
		int iTolen,
		LPOVERLAPPED lpOverlapped,
		LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
	);

	// receiving; note that the flags are passed by pointer here
	int WSARecv(
		SOCKET s,
		LPWSABUF lpBuffers,
		DWORD dwBufferCount,
		LPDWORD lpNumberOfBytesRecvd,
		LPDWORD lpFlags,
		LPOVERLAPPED lpOverlapped,
		LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
	);
	int WSARecvFrom(
		SOCKET s,
		LPWSABUF lpBuffers,
		DWORD dwBufferCount,
		LPDWORD lpNumberOfBytesRecvd,
		LPDWORD lpFlags,
		sockaddr* lpFrom,
		LPINT lpFromlen,
		LPOVERLAPPED lpOverlapped,
		LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
	);
}