1 /++ 2 Shared core functionality including exception helpers, library loader, event loop, and possibly more. Maybe command line processor and uda helper and some basic shared annotation types. 3 4 I'll probably move the url, websocket, and ssl stuff in here too as they are often shared. 5 6 If you use this directly outside the arsd library, you might consider using `static import` since names in here are likely to clash with Phobos if you use them together. `static import` will let you easily disambiguate and avoid name conflict errors if I add more here. Some names even clash deliberately to remind me to avoid some antipatterns inside the arsd modules! 7 8 History: 9 Added March 2023 (dub v11.0). Several functions were migrated in here at that time, noted individually. Members without a note were added with the module. 10 +/ 11 module arsd.core; 12 13 // see for useful info: https://devblogs.microsoft.com/dotnet/how-async-await-really-works/ 14 15 import core.thread; 16 import core..volatile; 17 import core.atomic; 18 import core.time; 19 20 import core.stdc.errno; 21 22 import core.attribute; 23 static if(!__traits(hasMember, core.attribute, "mustuse")) 24 enum mustuse; 25 26 // FIXME: add an arena allocator? can do task local destruction maybe. 27 28 // the three implementations are windows, epoll, and kqueue 29 version(Windows) { 30 version=Arsd_core_windows; 31 32 import core.sys.windows.windows; 33 import core.sys.windows.windef; 34 } else version(linux) { 35 version=Arsd_core_epoll; 36 37 version=Arsd_core_has_cloexec; 38 } else version(FreeBSD) { 39 version=Arsd_core_kqueue; 40 41 import core.sys.freebsd.sys.event; 42 } else version(OSX) { 43 version=Arsd_core_kqueue; 44 45 import core.sys.darwin.sys.event; 46 } 47 48 version(Posix) { 49 import core.sys.posix.signal; 50 } 51 52 // FIXME: the exceptions should actually give some explanatory text too (at least sometimes) 53 54 /+ 55 ========================= 56 GENERAL UTILITY FUNCTIONS 57 ========================= 58 +/ 59 60 // enum stringz : const(char)* { init = null } 61 62 /++ 63 A wrapper around a `const(char)*` to indicate that it is a zero-terminated C string. 64 +/ 65 struct stringz { 66 private const(char)* raw; 67 68 /++ 69 Wraps the given pointer in the struct. Note that it retains a copy of the pointer. 70 +/ 71 this(const(char)* raw) { 72 this.raw = raw; 73 } 74 75 /++ 76 Returns the original raw pointer back out. 77 +/ 78 const(char)* ptr() const { 79 return raw; 80 } 81 82 /++ 83 Borrows a slice of the pointer up to the zero terminator. 84 +/ 85 const(char)[] borrow() const { 86 if(raw is null) 87 return null; 88 89 const(char)* p = raw; 90 int length; 91 while(*p++) length++; 92 93 return raw[0 .. length]; 94 } 95 } 96 97 /++ 98 This is a dummy type to indicate the end of normal arguments and the beginning of the file/line inferred args. It is meant to ensure you don't accidentally send a string that is interpreted as a filename when it was meant to be a normal argument to the function and trigger the wrong overload. 99 +/ 100 struct ArgSentinel {} 101 102 /++ 103 A trivial wrapper around C's malloc that creates a D slice. It multiples n by T.sizeof and returns the slice of the pointer from 0 to n. 104 105 Please note that the ptr might be null - it is your responsibility to check that, same as normal malloc. Check `ret is null` specifically, since `ret.length` will always be `n`, even if the `malloc` failed. 106 107 Remember to `free` the returned pointer with `core.stdc.stdlib.free(ret.ptr);` 108 109 $(TIP 110 I strongly recommend you simply use the normal garbage collector unless you have a very specific reason not to. 111 ) 112 113 See_Also: 114 [mallocedStringz] 115 +/ 116 T[] mallocSlice(T)(size_t n) { 117 import c = core.stdc.stdlib; 118 119 return (cast(T*) c.malloc(n * T.sizeof))[0 .. n]; 120 } 121 122 /++ 123 Uses C's malloc to allocate a copy of `original` with an attached zero terminator. It may return a slice with a `null` pointer (but non-zero length!) if `malloc` fails and you are responsible for freeing the returned pointer with `core.stdc.stdlib.free(ret.ptr)`. 124 125 $(TIP 126 I strongly recommend you use [CharzBuffer] or Phobos' [std.string.toStringz] instead unless there's a special reason not to. 127 ) 128 129 See_Also: 130 [CharzBuffer] for a generally better alternative. You should only use `mallocedStringz` where `CharzBuffer` cannot be used (e.g. when druntime is not usable or you have no stack space for the temporary buffer). 131 132 [mallocSlice] is the function this function calls, so the notes in its documentation applies here too. 133 +/ 134 char[] mallocedStringz(in char[] original) { 135 auto slice = mallocSlice!char(original.length + 1); 136 if(slice is null) 137 return null; 138 slice[0 .. original.length] = original[]; 139 slice[original.length] = 0; 140 return slice; 141 } 142 143 /++ 144 Basically a `scope class` you can return from a function or embed in another aggregate. 145 +/ 146 struct OwnedClass(Class) { 147 ubyte[__traits(classInstanceSize, Class)] rawData; 148 149 static OwnedClass!Class defaultConstructed() { 150 OwnedClass!Class i = OwnedClass!Class.init; 151 i.initializeRawData(); 152 return i; 153 } 154 155 private void initializeRawData() @trusted { 156 if(!this) 157 rawData[] = cast(ubyte[]) typeid(Class).initializer[]; 158 } 159 160 this(T...)(T t) { 161 initializeRawData(); 162 rawInstance.__ctor(t); 163 } 164 165 bool opCast(T : bool)() @trusted { 166 return !(*(cast(void**) rawData.ptr) is null); 167 } 168 169 @disable this(); 170 @disable this(this); 171 172 Class rawInstance() return @trusted { 173 if(!this) 174 throw new Exception("null"); 175 return cast(Class) rawData.ptr; 176 } 177 178 alias rawInstance this; 179 180 ~this() @trusted { 181 if(this) 182 .destroy(rawInstance()); 183 } 184 } 185 186 187 188 version(Posix) 189 package(arsd) void makeNonBlocking(int fd) { 190 import core.sys.posix.fcntl; 191 auto flags = fcntl(fd, F_GETFL, 0); 192 if(flags == -1) 193 throw new ErrnoApiException("fcntl get", errno); 194 flags |= O_NONBLOCK; 195 auto s = fcntl(fd, F_SETFL, flags); 196 if(s == -1) 197 throw new ErrnoApiException("fcntl set", errno); 198 } 199 200 version(Posix) 201 package(arsd) void setCloExec(int fd) { 202 import core.sys.posix.fcntl; 203 auto flags = fcntl(fd, F_GETFD, 0); 204 if(flags == -1) 205 throw new ErrnoApiException("fcntl get", errno); 206 flags |= FD_CLOEXEC; 207 auto s = fcntl(fd, F_SETFD, flags); 208 if(s == -1) 209 throw new ErrnoApiException("fcntl set", errno); 210 } 211 212 213 /++ 214 A helper object for temporarily constructing a string appropriate for the Windows API from a D UTF-8 string. 215 216 217 It will use a small internal static buffer is possible, and allocate a new buffer if the string is too big. 218 219 History: 220 Moved from simpledisplay.d to core.d in March 2023 (dub v11.0). 221 +/ 222 version(Windows) 223 struct WCharzBuffer { 224 private wchar[] buffer; 225 private wchar[128] staticBuffer = void; 226 227 /// Length of the string, excluding the zero terminator. 228 size_t length() { 229 return buffer.length; 230 } 231 232 // Returns the pointer to the internal buffer. You must assume its lifetime is less than that of the WCharzBuffer. It is zero-terminated. 233 wchar* ptr() { 234 return buffer.ptr; 235 } 236 237 /// Returns the slice of the internal buffer, excluding the zero terminator (though there is one present right off the end of the slice). You must assume its lifetime is less than that of the WCharzBuffer. 238 wchar[] slice() { 239 return buffer; 240 } 241 242 /// Copies it into a static array of wchars 243 void copyInto(R)(ref R r) { 244 static if(is(R == wchar[N], size_t N)) { 245 r[0 .. this.length] = slice[]; 246 r[this.length] = 0; 247 } else static assert(0, "can only copy into wchar[n], not " ~ R.stringof); 248 } 249 250 /++ 251 conversionFlags = [WindowsStringConversionFlags] 252 +/ 253 this(in char[] data, int conversionFlags = 0) { 254 conversionFlags |= WindowsStringConversionFlags.zeroTerminate; // this ALWAYS zero terminates cuz of its name 255 auto sz = sizeOfConvertedWstring(data, conversionFlags); 256 if(sz > staticBuffer.length) 257 buffer = new wchar[](sz); 258 else 259 buffer = staticBuffer[]; 260 261 buffer = makeWindowsString(data, buffer, conversionFlags); 262 } 263 } 264 265 /++ 266 Alternative for toStringz 267 268 History: 269 Added March 18, 2023 (dub v11.0) 270 +/ 271 struct CharzBuffer { 272 private char[] buffer; 273 private char[128] staticBuffer = void; 274 275 /// Length of the string, excluding the zero terminator. 276 size_t length() { 277 assert(buffer.length > 0); 278 return buffer.length - 1; 279 } 280 281 // Returns the pointer to the internal buffer. You must assume its lifetime is less than that of the CharzBuffer. It is zero-terminated. 282 char* ptr() { 283 return buffer.ptr; 284 } 285 286 /// Returns the slice of the internal buffer, excluding the zero terminator (though there is one present right off the end of the slice). You must assume its lifetime is less than that of the CharzBuffer. 287 char[] slice() { 288 assert(buffer.length > 0); 289 return buffer[0 .. $-1]; 290 } 291 292 /// Copies it into a static array of chars 293 void copyInto(R)(ref R r) { 294 static if(is(R == char[N], size_t N)) { 295 r[0 .. this.length] = slice[]; 296 r[this.length] = 0; 297 } else static assert(0, "can only copy into char[n], not " ~ R.stringof); 298 } 299 300 @disable this(); 301 @disable this(this); 302 303 /++ 304 Copies `data` into the CharzBuffer, allocating a new one if needed, and zero-terminates it. 305 +/ 306 this(in char[] data) { 307 if(data.length + 1 > staticBuffer.length) 308 buffer = new char[](data.length + 1); 309 else 310 buffer = staticBuffer[]; 311 312 buffer[0 .. data.length] = data[]; 313 buffer[data.length] = 0; 314 } 315 } 316 317 /++ 318 Given the string `str`, converts it to a string compatible with the Windows API and puts the result in `buffer`, returning the slice of `buffer` actually used. `buffer` must be at least [sizeOfConvertedWstring] elements long. 319 320 History: 321 Moved from simpledisplay.d to core.d in March 2023 (dub v11.0). 322 +/ 323 version(Windows) 324 wchar[] makeWindowsString(in char[] str, wchar[] buffer, int conversionFlags = WindowsStringConversionFlags.zeroTerminate) { 325 if(str.length == 0) 326 return null; 327 328 int pos = 0; 329 dchar last; 330 foreach(dchar c; str) { 331 if(c <= 0xFFFF) { 332 if((conversionFlags & WindowsStringConversionFlags.convertNewLines) && c == 10 && last != 13) 333 buffer[pos++] = 13; 334 buffer[pos++] = cast(wchar) c; 335 } else if(c <= 0x10FFFF) { 336 buffer[pos++] = cast(wchar)((((c - 0x10000) >> 10) & 0x3FF) + 0xD800); 337 buffer[pos++] = cast(wchar)(((c - 0x10000) & 0x3FF) + 0xDC00); 338 } 339 340 last = c; 341 } 342 343 if(conversionFlags & WindowsStringConversionFlags.zeroTerminate) { 344 buffer[pos] = 0; 345 } 346 347 return buffer[0 .. pos]; 348 } 349 350 /++ 351 Converts the Windows API string `str` to a D UTF-8 string, storing it in `buffer`. Returns the slice of `buffer` actually used. 352 353 History: 354 Moved from simpledisplay.d to core.d in March 2023 (dub v11.0). 355 +/ 356 version(Windows) 357 char[] makeUtf8StringFromWindowsString(in wchar[] str, char[] buffer) { 358 if(str.length == 0) 359 return null; 360 361 auto got = WideCharToMultiByte(CP_UTF8, 0, str.ptr, cast(int) str.length, buffer.ptr, cast(int) buffer.length, null, null); 362 if(got == 0) { 363 if(GetLastError() == ERROR_INSUFFICIENT_BUFFER) 364 throw new object.Exception("not enough buffer"); 365 else 366 throw new object.Exception("conversion"); // FIXME: GetLastError 367 } 368 return buffer[0 .. got]; 369 } 370 371 /++ 372 Converts the Windows API string `str` to a newly-allocated D UTF-8 string. 373 374 History: 375 Moved from simpledisplay.d to core.d in March 2023 (dub v11.0). 376 +/ 377 version(Windows) 378 string makeUtf8StringFromWindowsString(in wchar[] str) { 379 char[] buffer; 380 auto got = WideCharToMultiByte(CP_UTF8, 0, str.ptr, cast(int) str.length, null, 0, null, null); 381 buffer.length = got; 382 383 // it is unique because we just allocated it above! 384 return cast(string) makeUtf8StringFromWindowsString(str, buffer); 385 } 386 387 /// ditto 388 version(Windows) 389 string makeUtf8StringFromWindowsString(wchar* str) { 390 char[] buffer; 391 auto got = WideCharToMultiByte(CP_UTF8, 0, str, -1, null, 0, null, null); 392 buffer.length = got; 393 394 got = WideCharToMultiByte(CP_UTF8, 0, str, -1, buffer.ptr, cast(int) buffer.length, null, null); 395 if(got == 0) { 396 if(GetLastError() == ERROR_INSUFFICIENT_BUFFER) 397 throw new object.Exception("not enough buffer"); 398 else 399 throw new object.Exception("conversion"); // FIXME: GetLastError 400 } 401 return cast(string) buffer[0 .. got]; 402 } 403 404 // only used from minigui rn 405 package int findIndexOfZero(in wchar[] str) { 406 foreach(idx, wchar ch; str) 407 if(ch == 0) 408 return cast(int) idx; 409 return cast(int) str.length; 410 } 411 package int findIndexOfZero(in char[] str) { 412 foreach(idx, char ch; str) 413 if(ch == 0) 414 return cast(int) idx; 415 return cast(int) str.length; 416 } 417 418 /++ 419 Returns a minimum buffer length to hold the string `s` with the given conversions. It might be slightly larger than necessary, but is guaranteed to be big enough to hold it. 420 421 History: 422 Moved from simpledisplay.d to core.d in March 2023 (dub v11.0). 423 +/ 424 version(Windows) 425 int sizeOfConvertedWstring(in char[] s, int conversionFlags) { 426 int size = 0; 427 428 if(conversionFlags & WindowsStringConversionFlags.convertNewLines) { 429 // need to convert line endings, which means the length will get bigger. 430 431 // BTW I betcha this could be faster with some simd stuff. 432 char last; 433 foreach(char ch; s) { 434 if(ch == 10 && last != 13) 435 size++; // will add a 13 before it... 436 size++; 437 last = ch; 438 } 439 } else { 440 // no conversion necessary, just estimate based on length 441 /* 442 I don't think there's any string with a longer length 443 in code units when encoded in UTF-16 than it has in UTF-8. 444 This will probably over allocate, but that's OK. 445 */ 446 size = cast(int) s.length; 447 } 448 449 if(conversionFlags & WindowsStringConversionFlags.zeroTerminate) 450 size++; 451 452 return size; 453 } 454 455 /++ 456 Used by [makeWindowsString] and [WCharzBuffer] 457 458 History: 459 Moved from simpledisplay.d to core.d in March 2023 (dub v11.0). 460 +/ 461 version(Windows) 462 enum WindowsStringConversionFlags : int { 463 /++ 464 Append a zero terminator to the string. 465 +/ 466 zeroTerminate = 1, 467 /++ 468 Converts newlines from \n to \r\n. 469 +/ 470 convertNewLines = 2, 471 } 472 473 /++ 474 An int printing function that doesn't need to import Phobos. Can do some of the things std.conv.to and std.format.format do. 475 476 The buffer must be sized to hold the converted number. 32 chars is enough for most anything. 477 478 Returns: the slice of `buffer` containing the converted number. 479 +/ 480 char[] intToString(long value, char[] buffer, IntToStringArgs args = IntToStringArgs.init) { 481 const int radix = args.radix ? args.radix : 10; 482 const int digitsPad = args.padTo; 483 const int groupSize = args.groupSize; 484 485 int pos; 486 487 if(value < 0) { 488 buffer[pos++] = '-'; 489 value = -value; 490 } 491 492 int start = pos; 493 int digitCount; 494 495 do { 496 auto remainder = value % radix; 497 value = value / radix; 498 499 buffer[pos++] = cast(char) (remainder < 10 ? (remainder + '0') : (remainder - 10 + args.ten)); 500 digitCount++; 501 } while(value); 502 503 if(digitsPad > 0) { 504 while(digitCount < digitsPad) { 505 buffer[pos++] = args.padWith; 506 digitCount++; 507 } 508 } 509 510 assert(pos >= 1); 511 assert(pos - start > 0); 512 513 auto reverseSlice = buffer[start .. pos]; 514 for(int i = 0; i < reverseSlice.length / 2; i++) { 515 auto paired = cast(int) reverseSlice.length - i - 1; 516 char tmp = reverseSlice[i]; 517 reverseSlice[i] = reverseSlice[paired]; 518 reverseSlice[paired] = tmp; 519 } 520 521 return buffer[0 .. pos]; 522 } 523 524 /// ditto 525 struct IntToStringArgs { 526 private { 527 ubyte padTo; 528 char padWith; 529 ubyte radix; 530 char ten; 531 ubyte groupSize; 532 char separator; 533 } 534 535 IntToStringArgs withPadding(int padTo, char padWith = '0') { 536 IntToStringArgs args = this; 537 args.padTo = cast(ubyte) padTo; 538 args.padWith = padWith; 539 return args; 540 } 541 542 IntToStringArgs withRadix(int radix, char ten = 'a') { 543 IntToStringArgs args = this; 544 args.radix = cast(ubyte) radix; 545 args.ten = ten; 546 return args; 547 } 548 549 IntToStringArgs withGroupSeparator(int groupSize, char separator = '_') { 550 IntToStringArgs args = this; 551 args.groupSize = cast(ubyte) groupSize; 552 args.separator = separator; 553 return args; 554 } 555 } 556 557 unittest { 558 char[32] buffer; 559 assert(intToString(0, buffer[]) == "0"); 560 assert(intToString(-1, buffer[]) == "-1"); 561 assert(intToString(-132, buffer[]) == "-132"); 562 assert(intToString(-1932, buffer[]) == "-1932"); 563 assert(intToString(1, buffer[]) == "1"); 564 assert(intToString(132, buffer[]) == "132"); 565 assert(intToString(1932, buffer[]) == "1932"); 566 567 assert(intToString(0x1, buffer[], IntToStringArgs().withRadix(16)) == "1"); 568 assert(intToString(0x1b, buffer[], IntToStringArgs().withRadix(16)) == "1b"); 569 assert(intToString(0xef1, buffer[], IntToStringArgs().withRadix(16)) == "ef1"); 570 571 assert(intToString(0xef1, buffer[], IntToStringArgs().withRadix(16).withPadding(8)) == "00000ef1"); 572 assert(intToString(-0xef1, buffer[], IntToStringArgs().withRadix(16).withPadding(8)) == "-00000ef1"); 573 assert(intToString(-0xef1, buffer[], IntToStringArgs().withRadix(16, 'A').withPadding(8, ' ')) == "- EF1"); 574 } 575 576 /++ 577 History: 578 Moved from color.d to core.d in March 2023 (dub v11.0). 579 +/ 580 nothrow @safe @nogc pure 581 inout(char)[] stripInternal(return inout(char)[] s) { 582 foreach(i, char c; s) 583 if(c != ' ' && c != '\t' && c != '\n' && c != '\r') { 584 s = s[i .. $]; 585 break; 586 } 587 for(int a = cast(int)(s.length - 1); a > 0; a--) { 588 char c = s[a]; 589 if(c != ' ' && c != '\t' && c != '\n' && c != '\r') { 590 s = s[0 .. a + 1]; 591 break; 592 } 593 } 594 595 return s; 596 } 597 598 /++ 599 Shortcut for converting some types to string without invoking Phobos (but it will as a last resort). 600 601 History: 602 Moved from color.d to core.d in March 2023 (dub v11.0). 603 +/ 604 string toStringInternal(T)(T t) { 605 char[32] buffer; 606 static if(is(T : string)) 607 return t; 608 else static if(is(T : long)) 609 return intToString(t, buffer[]).idup; 610 else { 611 import std.conv; 612 return to!string(t); 613 } 614 } 615 616 /++ 617 This populates a struct from a list of values (or other expressions, but it only looks at the values) based on types of the members, with one exception: `bool` members.. maybe. 618 619 It is intended for collecting a record of relevant UDAs off a symbol in a single call like this: 620 621 --- 622 struct Name { 623 string n; 624 } 625 626 struct Validator { 627 string regex; 628 } 629 630 struct FormInfo { 631 Name name; 632 Validator validator; 633 } 634 635 @Name("foo") @Validator(".*") 636 void foo() {} 637 638 auto info = populateFromUdas!(FormInfo, __traits(getAttributes, foo)); 639 assert(info.name == Name("foo")); 640 assert(info.validator == Validator(".*")); 641 --- 642 643 Note that instead of UDAs, you can also pass a variadic argument list and get the same result, but the function is `populateFromArgs` and you pass them as the runtime list to bypass "args cannot be evaluated at compile time" errors: 644 645 --- 646 void foo(T...)(T t) { 647 auto info = populateFromArgs!(FormInfo)(t); 648 // assuming the call below 649 assert(info.name == Name("foo")); 650 assert(info.validator == Validator(".*")); 651 } 652 653 foo(Name("foo"), Validator(".*")); 654 --- 655 656 The benefit of this over constructing the struct directly is that the arguments can be reordered or missing. Its value is diminished with named arguments in the language. 657 +/ 658 template populateFromUdas(Struct, UDAs...) { 659 enum Struct populateFromUdas = () { 660 Struct ret; 661 foreach(memberName; __traits(allMembers, Struct)) { 662 alias memberType = typeof(__traits(getMember, Struct, memberName)); 663 foreach(uda; UDAs) { 664 static if(is(memberType == PresenceOf!a, a)) { 665 static if(__traits(isSame, a, uda)) 666 __traits(getMember, ret, memberName) = true; 667 } 668 else 669 static if(is(typeof(uda) : memberType)) { 670 __traits(getMember, ret, memberName) = uda; 671 } 672 } 673 } 674 675 return ret; 676 }(); 677 } 678 679 /// ditto 680 Struct populateFromArgs(Struct, Args...)(Args args) { 681 Struct ret; 682 foreach(memberName; __traits(allMembers, Struct)) { 683 alias memberType = typeof(__traits(getMember, Struct, memberName)); 684 foreach(arg; args) { 685 static if(is(typeof(arg == memberType))) { 686 __traits(getMember, ret, memberName) = arg; 687 } 688 } 689 } 690 691 return ret; 692 } 693 694 /// ditto 695 struct PresenceOf(alias a) { 696 bool there; 697 alias there this; 698 } 699 700 /// 701 unittest { 702 enum a; 703 enum b; 704 struct Name { string name; } 705 struct Info { 706 Name n; 707 PresenceOf!a athere; 708 PresenceOf!b bthere; 709 int c; 710 } 711 712 void test() @a @Name("test") {} 713 714 auto info = populateFromUdas!(Info, __traits(getAttributes, test)); 715 assert(info.n == Name("test")); // but present ones are in there 716 assert(info.athere == true); // non-values can be tested with PresenceOf!it, which works like a bool 717 assert(info.bthere == false); 718 assert(info.c == 0); // absent thing will keep the default value 719 } 720 721 /++ 722 Declares a delegate property with several setters to allow for handlers that don't care about the arguments. 723 724 Throughout the arsd library, you will often see types of these to indicate that you can set listeners with or without arguments. If you care about the details of the callback event, you can set a delegate that declares them. And if you don't, you can set one that doesn't even declare them and it will be ignored. 725 +/ 726 struct FlexibleDelegate(DelegateType) { 727 // please note that Parameters and ReturnType are public now! 728 static if(is(DelegateType FunctionType == delegate)) 729 static if(is(FunctionType Parameters == __parameters)) 730 static if(is(DelegateType ReturnType == return)) { 731 732 /++ 733 Calls the currently set delegate. 734 735 Diagnostics: 736 If the callback delegate has not been set, this may cause a null pointer dereference. 737 +/ 738 ReturnType opCall(Parameters args) { 739 return dg(args); 740 } 741 742 /++ 743 Use `if(thing)` to check if the delegate is null or not. 744 +/ 745 bool opCast(T : bool)() { 746 return dg !is null; 747 } 748 749 /++ 750 These opAssign overloads are what puts the flexibility in the flexible delegate. 751 752 Bugs: 753 The other overloads do not keep attributes like `nothrow` on the `dg` parameter, making them unusable if `DelegateType` requires them. I consider the attributes more trouble than they're worth anyway, and the language's poor support for composing them doesn't help any. I have no need for them and thus no plans to add them in the overloads at this time. 754 +/ 755 void opAssign(DelegateType dg) { 756 this.dg = dg; 757 } 758 759 /// ditto 760 void opAssign(ReturnType delegate() dg) { 761 this.dg = (Parameters ignored) => dg(); 762 } 763 764 /// ditto 765 void opAssign(ReturnType function(Parameters params) dg) { 766 this.dg = (Parameters params) => dg(params); 767 } 768 769 /// ditto 770 void opAssign(ReturnType function() dg) { 771 this.dg = (Parameters ignored) => dg(); 772 } 773 774 /// ditto 775 void opAssign(typeof(null) explicitNull) { 776 this.dg = null; 777 } 778 779 private DelegateType dg; 780 } 781 else static assert(0, DelegateType.stringof ~ " failed return value check"); 782 else static assert(0, DelegateType.stringof ~ " failed parameters check"); 783 else static assert(0, DelegateType.stringof ~ " failed delegate check"); 784 } 785 786 /++ 787 788 +/ 789 unittest { 790 // you don't have to put the arguments in a struct, but i recommend 791 // you do as it is more future proof - you can add more info to the 792 // struct without breaking user code that consumes it. 793 struct MyEventArguments { 794 795 } 796 797 // then you declare it just adding FlexibleDelegate!() around the 798 // plain delegate type you'd normally use 799 FlexibleDelegate!(void delegate(MyEventArguments args)) callback; 800 801 // until you set it, it will be null and thus be false in any boolean check 802 assert(!callback); 803 804 // can set it to the properly typed thing 805 callback = delegate(MyEventArguments args) {}; 806 807 // and now it is no longer null 808 assert(callback); 809 810 // or if you don't care about the args, you can leave them off 811 callback = () {}; 812 813 // and it works if the compiler types you as a function instead of delegate too 814 // (which happens automatically if you don't access any local state or if you 815 // explicitly define it as a function) 816 817 callback = function(MyEventArguments args) { }; 818 819 // can set it back to null explicitly if you ever wanted 820 callback = null; 821 822 // the reflection info used internally also happens to be exposed publicly 823 // which can actually sometimes be nice so if the language changes, i'll change 824 // the code to keep this working. 825 static assert(is(callback.ReturnType == void)); 826 827 // which can be convenient if the params is an annoying type since you can 828 // consistently use something like this too 829 callback = (callback.Parameters params) {}; 830 831 // check for null and call it pretty normally 832 if(callback) 833 callback(MyEventArguments()); 834 } 835 836 /+ 837 ====================== 838 ERROR HANDLING HELPERS 839 ====================== 840 +/ 841 842 /+ + 843 arsd code shouldn't be using Exception. Really, I don't think any code should be - instead, construct an appropriate object with structured information. 844 845 If you want to catch someone else's Exception, use `catch(object.Exception e)`. 846 +/ 847 //package deprecated struct Exception {} 848 849 850 /++ 851 Base class representing my exceptions. You should almost never work with this directly, but you might catch it as a generic thing. Catch it before generic `object.Exception` or `object.Throwable` in any catch chains. 852 853 854 $(H3 General guidelines for exceptions) 855 856 The purpose of an exception is to cancel a task that has proven to be impossible and give the programmer enough information to use at a higher level to decide what to do about it. 857 858 Cancelling a task is accomplished with the `throw` keyword. The transmission of information to a higher level is done by the language runtime. The decision point is marked by the `catch` keyword. The part missing - the job of the `Exception` class you construct and throw - is to gather the information that will be useful at a later decision point. 859 860 It is thus important that you gather as much useful information as possible and keep it in a way that the code catching the exception can still interpret it when constructing an exception. Other concerns are secondary to this to this primary goal. 861 862 With this in mind, here's some guidelines for exception handling in arsd code. 863 864 $(H4 Allocations and lifetimes) 865 866 Don't get clever with exception allocations. You don't know what the catcher is going to do with an exception and you don't want the error handling scheme to introduce its own tricky bugs. Remember, an exception object's first job is to deliver useful information up the call chain in a way this code can use it. You don't know what this code is or what it is going to do. 867 868 Keep your memory management schemes simple and let the garbage collector do its job. 869 870 $(LIST 871 * All thrown exceptions should be allocated with the `new` keyword. 872 873 * Members inside the exception should be value types or have infinite lifetime (that is, be GC managed). 874 875 * While this document is concerned with throwing, you might want to add additional information to an in-flight exception, and this is done by catching, so you need to know how that works too, and there is a global compiler switch that can change things, so even inside arsd we can't completely avoid its implications. 876 877 DIP1008's presence complicates things a bit on the catch side - if you catch an exception and return it from a function, remember to `ex.refcount = ex.refcount + 1;` so you don't introduce more use-after-free woes for those unfortunate souls. 878 ) 879 880 $(H4 Error strings) 881 882 Strings can deliver useful information to people reading the message, but are often suboptimal for delivering useful information to other chunks of code. Remember, an exception's first job is to be caught by another block of code. Printing to users is a last resort; even if you want a user-readable error message, an exception is not the ideal way to deliver one since it is constructed in the guts of a failed task, without the higher level context of what the user was actually trying to do. User error messages ought to be made from information in the exception, combined with higher level knowledge. This is best done in a `catch` block, not a `throw` statement. 883 884 As such, I recommend that you: 885 886 $(LIST 887 * Don't concatenate error strings at the throw site. Instead, pass the data you would have used to build the string as actual data to the constructor. This lets catchers see the original data without having to try to extract it from a string. For unique data, you will likely need a unique exception type. More on this in the next section. 888 889 * Don't construct error strings in a constructor either, for the same reason. Pass the useful data up the call chain, as exception members, to the maximum extent possible. Exception: if you are passed some data with a temporary lifetime that is important enough to pass up the chain. You may `.idup` or `to!string` to preserve as much data as you can before it is lost, but still store it in a separate member of the Exception subclass object. 890 891 * $(I Do) construct strings out of public members in [getAdditionalPrintableInformation]. When this is called, the user has requested as much relevant information as reasonable in string format. Still, avoid concatenation - it lets you pass as many key/value pairs as you like to the caller. They can concatenate as needed. However, note the words "public members" - everything you do in `getAdditionalPrintableInformation` ought to also be possible for code that caught your exception via your public methods and properties. 892 ) 893 894 $(H4 Subclasses) 895 896 Any exception with unique data types should be a unique class. Whenever practical, this should be one you write and document at the top-level of a module. But I know we get lazy - me too - and this is why in standard D we'd often fall back to `throw new Exception("some string " ~ some info)`. To help resist these urges, I offer some helper functions to use instead that better achieve the key goal of exceptions - passing structured data up a call chain - while still being convenient to write. 897 898 See: [ArsdException], [Win32Enforce] 899 900 +/ 901 class ArsdExceptionBase : object.Exception { 902 /++ 903 Don't call this except from other exceptions; this is essentially an abstract class. 904 905 Params: 906 operation = the specific operation that failed, throwing the exception 907 +/ 908 package this(string operation, string file = __FILE__, size_t line = __LINE__, Throwable next = null) { 909 super(operation, file, line, next); 910 } 911 912 /++ 913 The toString method will print out several components: 914 915 $(LIST 916 * The file, line, static message, and object class name from the constructor. You can access these independently with the members `file`, `line`, `msg`, and [printableExceptionName]. 917 * The generic category codes stored with this exception 918 * Additional members stored with the exception child classes (e.g. platform error codes, associated function arguments) 919 * The stack trace associated with the exception. You can access these lines independently with `foreach` over the `info` member. 920 ) 921 922 This is meant to be read by the developer, not end users. You should wrap your user-relevant tasks in a try/catch block and construct more appropriate error messages from context available there, using the individual properties of the exception to add richness. 923 +/ 924 final override void toString(scope void delegate(in char[]) sink) const { 925 // class name and info from constructor 926 sink(printableExceptionName); 927 sink("@"); 928 sink(file); 929 sink("("); 930 char[16] buffer; 931 sink(intToString(line, buffer[])); 932 sink("): "); 933 sink(message); 934 935 getAdditionalPrintableInformation((string name, in char[] value) { 936 sink("\n"); 937 sink(name); 938 sink(": "); 939 sink(value); 940 }); 941 942 // full stack trace 943 sink("\n----------------\n"); 944 foreach(str; info) { 945 sink(str); 946 sink("\n"); 947 } 948 } 949 /// ditto 950 final override string toString() { 951 string s; 952 toString((in char[] chunk) { s ~= chunk; }); 953 return s; 954 } 955 956 /++ 957 Users might like to see additional information with the exception. API consumers should pull this out of properties on your child class, but the parent class might not be able to deal with the arbitrary types at runtime the children can introduce, so bringing them all down to strings simplifies that. 958 959 Overrides should always call `super.getAdditionalPrintableInformation(sink);` before adding additional information by calling the sink with other arguments afterward. 960 961 You should spare no expense in preparing this information - translate error codes, build rich strings, whatever it takes - to make the information here useful to the reader. 962 +/ 963 void getAdditionalPrintableInformation(scope void delegate(string name, in char[] value) sink) const { 964 965 } 966 967 /++ 968 This is the name of the exception class, suitable for printing. This should be static data (e.g. a string literal). Override it in subclasses. 969 +/ 970 string printableExceptionName() const { 971 return typeid(this).name; 972 } 973 974 /// deliberately hiding `Throwable.msg`. Use [message] and [toString] instead. 975 @disable final void msg() {} 976 977 override const(char)[] message() const { 978 return super.msg; 979 } 980 } 981 982 /++ 983 You can catch an ArsdException to get its passed arguments out. 984 985 You can pass either a base class or a string as `Type`. 986 987 See the examples for how to use it. 988 +/ 989 template ArsdException(alias Type, DataTuple...) { 990 static if(DataTuple.length) 991 alias Parent = ArsdException!(Type, DataTuple[0 .. $-1]); 992 else 993 alias Parent = ArsdExceptionBase; 994 995 class ArsdException : Parent { 996 DataTuple data; 997 998 this(DataTuple data, string file = __FILE__, size_t line = __LINE__) { 999 this.data = data; 1000 static if(is(Parent == ArsdExceptionBase)) 1001 super(null, file, line); 1002 else 1003 super(data[0 .. $-1], file, line); 1004 } 1005 1006 static opCall(R...)(R r, string file = __FILE__, size_t line = __LINE__) { 1007 return new ArsdException!(Type, DataTuple, R)(r, file, line); 1008 } 1009 1010 override string printableExceptionName() const { 1011 static if(DataTuple.length) 1012 enum str = "ArsdException!(" ~ Type.stringof ~ ", " ~ DataTuple.stringof[1 .. $-1] ~ ")"; 1013 else 1014 enum str = "ArsdException!" ~ Type.stringof; 1015 return str; 1016 } 1017 1018 override void getAdditionalPrintableInformation(scope void delegate(string name, in char[] value) sink) const { 1019 ArsdExceptionBase.getAdditionalPrintableInformation(sink); 1020 1021 foreach(idx, datum; data) { 1022 enum int lol = cast(int) idx; 1023 enum key = "[" ~ lol.stringof ~ "] " ~ DataTuple[idx].stringof; 1024 sink(key, toStringInternal(datum)); 1025 } 1026 } 1027 } 1028 } 1029 1030 /// This example shows how you can throw and catch the ad-hoc exception types. 1031 unittest { 1032 // you can throw and catch by matching the string and argument types 1033 try { 1034 // throw it with parenthesis after the template args (it uses opCall to construct) 1035 throw ArsdException!"Test"(); 1036 // you could also `throw new ArsdException!"test";`, but that gets harder with args 1037 // as we'll see in the following example 1038 assert(0); // remove from docs 1039 } catch(ArsdException!"Test" e) { // catch it without them 1040 // this has no useful information except for the type 1041 // but you can catch it like this and it is still more than generic Exception 1042 } 1043 1044 // an exception's job is to deliver useful information up the chain 1045 // and you can do that easily by passing arguments: 1046 1047 try { 1048 throw ArsdException!"Test"(4, "four"); 1049 // you could also `throw new ArsdException!("Test", int, string)(4, "four")` 1050 // but now you start to see how the opCall convenience constructor simplifies things 1051 assert(0); // remove from docs 1052 } catch(ArsdException!("Test", int, string) e) { // catch it and use info by specifying types 1053 assert(e.data[0] == 4); // and extract arguments like this 1054 assert(e.data[1] == "four"); 1055 } 1056 1057 // a throw site can add additional information without breaking code that catches just some 1058 // generally speaking, each additional argument creates a new subclass on top of the previous args 1059 // so you can cast 1060 1061 try { 1062 throw ArsdException!"Test"(4, "four", 9); 1063 assert(0); // remove from docs 1064 } catch(ArsdException!("Test", int, string) e) { // this catch still works 1065 assert(e.data[0] == 4); 1066 assert(e.data[1] == "four"); 1067 // but if you were to print it, all the members would be there 1068 // import std.stdio; writeln(e); // would show something like: 1069 /+ 1070 ArsdException!("Test", int, string, int)@file.d(line): 1071 [0] int: 4 1072 [1] string: four 1073 [2] int: 9 1074 +/ 1075 // indicating that there's additional information available if you wanted to process it 1076 1077 // and meanwhile: 1078 ArsdException!("Test", int) e2 = e; // this implicit cast works thanks to the parent-child relationship 1079 ArsdException!"Test" e3 = e; // this works too, the base type/string still matches 1080 1081 // so catching those types would work too 1082 } 1083 } 1084 1085 /++ 1086 A tagged union that holds an error code from system apis, meaning one from Windows GetLastError() or C's errno. 1087 1088 You construct it with `SystemErrorCode(thing)` and the overloaded constructor tags and stores it. 1089 +/ 1090 struct SystemErrorCode { 1091 /// 1092 enum Type { 1093 errno, /// 1094 win32 /// 1095 } 1096 1097 const Type type; /// 1098 const int code; /// You should technically cast it back to DWORD if it is a win32 code 1099 1100 /++ 1101 C/unix error are typed as signed ints... 1102 Windows' errors are typed DWORD, aka unsigned... 1103 1104 so just passing them straight up will pick the right overload here to set the tag. 1105 +/ 1106 this(int errno) { 1107 this.type = Type.errno; 1108 this.code = errno; 1109 } 1110 1111 /// ditto 1112 this(uint win32) { 1113 this.type = Type.win32; 1114 this.code = win32; 1115 } 1116 1117 /++ 1118 Constructs a string containing both the code and the explanation string. 1119 +/ 1120 string toString() const { 1121 return codeAsString ~ " " ~ errorString; 1122 } 1123 1124 /++ 1125 The numeric code itself as a string. 1126 1127 See [errorString] for a text explanation of the code. 1128 +/ 1129 string codeAsString() const { 1130 char[16] buffer; 1131 final switch(type) { 1132 case Type.errno: 1133 return intToString(code, buffer[]).idup; 1134 case Type.win32: 1135 buffer[0 .. 2] = "0x"; 1136 return buffer[0 .. 2 + intToString(code, buffer[2 .. $], IntToStringArgs().withRadix(16).withPadding(8)).length].idup; 1137 } 1138 } 1139 1140 /++ 1141 A text explanation of the code. See [codeAsString] for a string representation of the numeric representation. 1142 +/ 1143 string errorString() const { 1144 final switch(type) { 1145 case Type.errno: 1146 import core.stdc.string; 1147 auto strptr = strerror(code); 1148 auto orig = strptr; 1149 int len; 1150 while(*strptr++) { 1151 len++; 1152 } 1153 1154 return orig[0 .. len].idup; 1155 case Type.win32: 1156 version(Windows) { 1157 wchar[256] buffer; 1158 auto size = FormatMessageW( 1159 FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, 1160 null, 1161 code, 1162 MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), 1163 buffer.ptr, 1164 buffer.length, 1165 null 1166 ); 1167 1168 return makeUtf8StringFromWindowsString(buffer[0 .. size]).stripInternal; 1169 } else { 1170 return null; 1171 } 1172 } 1173 } 1174 } 1175 1176 /++ 1177 1178 +/ 1179 class SystemApiException : ArsdExceptionBase { 1180 this(string msg, int originalErrorNo, string file = __FILE__, size_t line = __LINE__, Throwable next = null) { 1181 this(msg, SystemErrorCode(originalErrorNo), file, line, next); 1182 } 1183 1184 version(Windows) 1185 this(string msg, DWORD windowsError, string file = __FILE__, size_t line = __LINE__, Throwable next = null) { 1186 this(msg, SystemErrorCode(windowsError), file, line, next); 1187 } 1188 1189 this(string msg, SystemErrorCode code, string file = __FILE__, size_t line = __LINE__, Throwable next = null) { 1190 this.errorCode = code; 1191 1192 super(msg, file, line, next); 1193 } 1194 1195 /++ 1196 1197 +/ 1198 const SystemErrorCode errorCode; 1199 1200 override void getAdditionalPrintableInformation(scope void delegate(string name, in char[] value) sink) const { 1201 super.getAdditionalPrintableInformation(sink); 1202 sink("Error code", errorCode.toString()); 1203 } 1204 1205 } 1206 1207 /++ 1208 The low level use of this would look like `throw new WindowsApiException("MsgWaitForMultipleObjectsEx", GetLastError())` but it is meant to be used from higher level things like [Win32Enforce]. 1209 1210 History: 1211 Moved from simpledisplay.d to core.d in March 2023 (dub v11.0). 1212 +/ 1213 alias WindowsApiException = SystemApiException; 1214 1215 /++ 1216 History: 1217 Moved from simpledisplay.d to core.d in March 2023 (dub v11.0). 1218 +/ 1219 alias ErrnoApiException = SystemApiException; 1220 1221 /++ 1222 Calls the C API function `fn`. If it returns an error value, it throws an [ErrnoApiException] (or subclass) after getting `errno`. 1223 +/ 1224 template ErrnoEnforce(alias fn, alias errorValue = void) { 1225 static if(is(typeof(fn) Return == return)) 1226 static if(is(typeof(fn) Params == __parameters)) { 1227 static if(is(errorValue == void)) { 1228 static if(is(typeof(null) : Return)) 1229 enum errorValueToUse = null; 1230 else static if(is(Return : long)) 1231 enum errorValueToUse = -1; 1232 else 1233 static assert(0, "Please pass the error value"); 1234 } else { 1235 enum errorValueToUse = errorValue; 1236 } 1237 1238 Return ErrnoEnforce(Params params, ArgSentinel sentinel = ArgSentinel.init, string file = __FILE__, size_t line = __LINE__) { 1239 import core.stdc.errno; 1240 1241 Return value = fn(params); 1242 1243 if(value == errorValueToUse) { 1244 throw new ErrnoApiException(__traits(identifier, fn), errno, file, line); 1245 } 1246 1247 return value; 1248 } 1249 } 1250 } 1251 1252 version(Windows) { 1253 /++ 1254 Calls the Windows API function `fn`. If it returns an error value, it throws a [WindowsApiException] (or subclass) after calling `GetLastError()`. 1255 +/ 1256 template Win32Enforce(alias fn, alias errorValue = void) { 1257 static if(is(typeof(fn) Return == return)) 1258 static if(is(typeof(fn) Params == __parameters)) { 1259 static if(is(errorValue == void)) { 1260 static if(is(Return == BOOL)) 1261 enum errorValueToUse = false; 1262 else static if(is(Return : HANDLE)) 1263 enum errorValueToUse = NULL; 1264 else static if(is(Return == DWORD)) 1265 enum errorValueToUse = cast(DWORD) 0xffffffff; 1266 else 1267 static assert(0, "Please pass the error value"); 1268 } else { 1269 enum errorValueToUse = errorValue; 1270 } 1271 1272 Return Win32Enforce(Params params, ArgSentinel sentinel = ArgSentinel.init, string file = __FILE__, size_t line = __LINE__) { 1273 import core.sys.windows.winbase; 1274 1275 Return value = fn(params); 1276 1277 if(value == errorValueToUse) { 1278 auto error = GetLastError(); 1279 throw new WindowsApiException(__traits(identifier, fn), error, file, line); 1280 } 1281 1282 return value; 1283 } 1284 } 1285 } 1286 1287 } 1288 1289 /+ 1290 =============== 1291 EVENT LOOP CORE 1292 =============== 1293 +/ 1294 1295 /+ 1296 UI threads 1297 need to get window messages in addition to all the other jobs 1298 I/O Worker threads 1299 need to get commands for read/writes, run them, and send the reply back. not necessary on Windows 1300 if interrupted, check cancel flags. 1301 CPU Worker threads 1302 gets functions, runs them, send reply back. should send a cancel flag to periodically check 1303 Task worker threads 1304 runs fibers and multiplexes them 1305 1306 1307 General procedure: 1308 issue the read/write command 1309 if it would block on linux, epoll associate it. otherwise do the callback immediately 1310 1311 callbacks have default affinity to the current thread, meaning their callbacks always run here 1312 accepts can usually be dispatched to any available thread tho 1313 1314 // In other words, a single thread can be associated with, at most, one I/O completion port. 1315 1316 Realistically, IOCP only used if there is no thread affinity. If there is, just do overlapped w/ sleepex. 1317 1318 1319 case study: http server 1320 1321 1) main thread starts the server. it does an accept loop with no thread affinity. the main thread does NOT check the global queue (the iocp/global epoll) 1322 2) connections come in and are assigned to first available thread via the iocp/global epoll 1323 3) these run local event loops until the connection task is finished 1324 1325 EVENT LOOP TYPES: 1326 1) main ui thread - MsgWaitForMultipleObjectsEx / epoll on the local ui. it does NOT check the any worker thread thing! 1327 The main ui thread should never terminate until the program is ready to close. 1328 You can have additional ui threads in theory but im not really gonna support that in full; most things will assume there is just the one. simpledisplay's gui thread is the primary if it exists. (and sdpy will prolly continue to be threaded the way it is now) 1329 1330 The biggest complication is the TerminalDirectToEmulator, where the primary ui thread is NOT the thread that runs `main` 1331 2) worker thread GetQueuedCompletionStatusEx / epoll on the local thread fd and the global epoll fd 1332 3) local event loop - check local things only. SleepEx / epoll on local thread fd. This more of a compatibility hack for `waitForCompletion` outside a fiber. 1333 1334 i'll use: 1335 * QueueUserAPC to send interruptions to a worker thread 1336 * PostQueuedCompletionStatus is to send interruptions to any available thread. 1337 * PostMessage to a window 1338 * ??? to a fiber task 1339 1340 I also need a way to de-duplicate events in the queue so if you try to push the same thing it won't trigger multiple times.... I might want to keep a duplicate of the thing... really, what I'd do is post the "event wake up" message and keep the queue in my own thing. (WM_PAINT auto-coalesces) 1341 1342 Destructors need to be able to post messages back to a specific task to queue thread-affinity cleanup. This must be GC safe. 1343 1344 A task might want to wait on certain events. If the task is a fiber, it yields and gets called upon the event. If the task is a thread, it really has to call the event loop... which can be a loop of loops we want to avoid. `waitForCompletion` is more often gonna be used just to run the loop at top level tho... it might not even check for the global info availability so it'd run the local thing only. 1345 1346 APCs should not themselves enter an alterable wait cuz it can stack overflow. So generally speaking, they should avoid calling fibers or other event loops. 1347 +/ 1348 1349 /++ 1350 You can also pass a handle to a specific thread, if you have one. 1351 +/ 1352 enum ThreadToRunIn { 1353 /++ 1354 The callback should be only run by the same thread that set it. 1355 +/ 1356 CurrentThread, 1357 /++ 1358 The UI thread is a special one - it is the supervisor of the workers and the controller of gui and console handles. It is the first thread to call [arsd_core_init] actively running an event loop unless there is a thread that has actively asserted the ui supervisor role. FIXME is this true after i implemen it? 1359 1360 A ui thread should be always quickly responsive to new events. 1361 1362 There should only be one main ui thread, in which simpledisplay and minigui can be used. 1363 1364 Other threads can run like ui threads, but are considered temporary and only concerned with their own needs (it is the default style of loop 1365 for an undeclared thread but will not receive messages from other threads unless there is no other option) 1366 1367 1368 Ad-Hoc thread - something running an event loop that isn't another thing 1369 Controller thread - running an explicit event loop instance set as not a task runner or blocking worker 1370 UI thread - simpledisplay's event loop, which it will require remain live for the duration of the program (running two .eventLoops without a parent EventLoop instance will become illegal, throwing at runtime if it happens telling people to change their code 1371 1372 Windows HANDLES will always be listened on the thread itself that is requesting, UNLESS it is a worker/helper thread, in which case it goes to a coordinator thread. since it prolly can't rely on the parent per se this will have to be one created by arsd core init, UNLESS the parent is inside an explicit EventLoop structure. 1373 1374 All use the MsgWaitForMultipleObjectsEx pattern 1375 1376 1377 +/ 1378 UiThread, 1379 /++ 1380 The callback can be called from any available worker thread. It will be added to a global queue and the first thread to see it will run it. 1381 1382 These will not run on the UI thread unless there is no other option on the platform (and all platforms this lib supports have other options). 1383 1384 These are expected to run cooperatively multitasked things; functions that frequently yield as they wait on other tasks. Think a fiber. 1385 1386 A task runner should be generally responsive to new events. 1387 +/ 1388 AnyAvailableTaskRunnerThread, 1389 /++ 1390 These are expected to run longer blocking, but independent operations. Think an individual function with no context. 1391 1392 A blocking worker can wait hundreds of milliseconds between checking for new events. 1393 +/ 1394 AnyAvailableBlockingWorkerThread, 1395 /++ 1396 The callback will be duplicated across all threads known to the arsd.core event loop. 1397 1398 It adds it to an immutable queue that each thread will go through... might just replace with an exit() function. 1399 1400 1401 so to cancel all associated tasks for like a web server, it could just have the tasks atomicAdd to a counter and subtract when they are finished. Then you have a single semaphore you signal the number of times you have an active thing and wait for them to acknowledge it. 1402 1403 threads should report when they start running the loop and they really should report when they terminate but that isn't reliable 1404 1405 1406 hmmm what if: all user-created threads (the public api) count as ui threads. only ones created in here are task runners or helpers. ui threads can wait on a global event to exit. 1407 1408 there's still prolly be one "the" ui thread, which does the handle listening on windows and is the one sdpy wants. 1409 +/ 1410 BroadcastToAllThreads, 1411 } 1412 1413 /++ 1414 Initializes the arsd core event loop and creates its worker threads. You don't actually have to call this, since the first use of an arsd.core function that requires it will call it implicitly, but calling it yourself gives you a chance to control the configuration more explicitly if you want to. 1415 +/ 1416 void arsd_core_init(int numberOfWorkers = 0) { 1417 1418 } 1419 1420 version(Windows) 1421 class WindowsHandleReader_ex { 1422 // Windows handles are always dispatched to the main ui thread, which can then send a command back to a worker thread to run the callback if needed 1423 this(HANDLE handle) {} 1424 } 1425 1426 version(Posix) 1427 class PosixFdReader_ex { 1428 // posix readers can just register with whatever instance we want to handle the callback 1429 } 1430 1431 /++ 1432 1433 +/ 1434 interface ICoreEventLoop { 1435 /++ 1436 Runs the event loop for this thread until the `until` delegate returns `true`. 1437 +/ 1438 final void run(scope bool delegate() until) { 1439 while(!until()) { 1440 runOnce(); 1441 } 1442 } 1443 1444 /++ 1445 Runs a single iteration of the event loop for this thread. It will return when the first thing happens, but that thing might be totally uninteresting to anyone, or it might trigger significant work you'll wait on. 1446 +/ 1447 void runOnce(); 1448 1449 // to send messages between threads, i'll queue up a function that just call dispatchMessage. can embed the arg inside the callback helper prolly. 1450 // tho i might prefer to actually do messages w/ run payloads so it is easier to deduplicate i can still dedupe by insepcting the call args so idk 1451 1452 version(Arsd_core_epoll) { 1453 @mustuse 1454 static struct UnregisterToken { 1455 private CoreEventLoopImplementation impl; 1456 private int fd; 1457 private CallbackHelper cb; 1458 1459 /++ 1460 Unregisters the file descriptor from the event loop and releases the reference to the callback held by the event loop (which will probably free it). 1461 1462 You must call this when you're done. Normally, this will be right before you close the fd (Which is often after the other side closes it, meaning you got a 0 length read). 1463 +/ 1464 void unregister() { 1465 assert(impl !is null, "Cannot reuse unregister token"); 1466 impl.unregisterFd(fd); 1467 cb.release(); 1468 this = typeof(this).init; 1469 } 1470 } 1471 1472 @mustuse 1473 static struct RearmToken { 1474 private CoreEventLoopImplementation impl; 1475 private int fd; 1476 private CallbackHelper cb; 1477 private uint flags; 1478 1479 /++ 1480 Calls [UnregisterToken.unregister] 1481 +/ 1482 void unregister() { 1483 assert(impl !is null, "cannot reuse rearm token after unregistering it"); 1484 impl.unregisterFd(fd); 1485 cb.release(); 1486 this = typeof(this).init; 1487 } 1488 1489 /++ 1490 Rearms the event so you will get another callback next time it is ready. 1491 +/ 1492 void rearm() { 1493 assert(impl !is null, "cannot reuse rearm token after unregistering it"); 1494 impl.rearmFd(this); 1495 } 1496 } 1497 1498 UnregisterToken addCallbackOnFdReadable(int fd, CallbackHelper cb); 1499 RearmToken addCallbackOnFdReadableOneShot(int fd, CallbackHelper cb); 1500 } 1501 1502 version(Arsd_core_kqueue) { 1503 // can be a do-nothing here since the event is one off 1504 @mustuse 1505 static struct UnregisterToken { 1506 void unregister() {} 1507 } 1508 1509 UnregisterToken addCallbackOnFdReadable(int fd, CallbackHelper cb); 1510 } 1511 } 1512 1513 /++ 1514 Get the event loop associated with this thread 1515 +/ 1516 ICoreEventLoop getThisThreadEventLoop(EventLoopType type = EventLoopType.AdHoc) { 1517 static ICoreEventLoop loop; 1518 if(loop is null) 1519 loop = new CoreEventLoopImplementation(); 1520 return loop; 1521 } 1522 1523 /++ 1524 The internal types that will be exposed through other api things. 1525 +/ 1526 package(arsd) enum EventLoopType { 1527 /++ 1528 The event loop is being run temporarily and the thread doesn't promise to keep running it. 1529 +/ 1530 AdHoc, 1531 /++ 1532 The event loop struct has been instantiated at top level. Its destructor will run when the 1533 function exits, which is only at the end of the entire block of work it is responsible for. 1534 1535 It must be in scope for the whole time the arsd event loop functions are expected to be used 1536 (meaning it should generally be top-level in `main`) 1537 +/ 1538 Explicit, 1539 /++ 1540 A specialization of `Explicit`, so all the same rules apply there, but this is specifically the event loop coming from simpledisplay or minigui. It will run for the duration of the UI's existence. 1541 +/ 1542 Ui, 1543 /++ 1544 A special event loop specifically for threads that listen to the task runner queue and handle I/O events from running tasks. Typically, a task runner runs cooperatively multitasked coroutines (so they prefer not to block the whole thread). 1545 +/ 1546 TaskRunner, 1547 /++ 1548 A special event loop specifically for threads that listen to the helper function request queue. Helper functions are expected to run independently for a somewhat long time (them blocking the thread for some time is normal) and send a reply message back to the requester. 1549 +/ 1550 HelperWorker 1551 } 1552 1553 /+ 1554 Tasks are given an object to talk to their parent... can be a dialog where it is like 1555 1556 sendBuffer 1557 waitForWordToProceed 1558 1559 in a loop 1560 1561 1562 Tasks are assigned to a worker thread and may share it with other tasks. 1563 +/ 1564 1565 1566 // the GC may not be able to see this! remember, it can be hidden inside kernel buffers 1567 private class CallbackHelper { 1568 import core.memory; 1569 1570 void call() { 1571 if(callback) 1572 callback(); 1573 } 1574 1575 void delegate() callback; 1576 void*[3] argsStore; 1577 1578 void addref() { 1579 atomicOp!"+="(refcount, 1); 1580 } 1581 1582 void release() { 1583 if(atomicOp!"-="(refcount, 1) <= 0) { 1584 if(flags & 1) 1585 GC.removeRoot(cast(void*) this); 1586 } 1587 } 1588 1589 private shared(int) refcount; 1590 private uint flags; 1591 1592 this(void function() callback) { 1593 this( () { callback(); } ); 1594 } 1595 1596 this(void delegate() callback, bool addRoot = true) { 1597 if(addRoot) { 1598 GC.addRoot(cast(void*) this); 1599 this.flags |= 1; 1600 } 1601 1602 this.addref(); 1603 this.callback = callback; 1604 } 1605 } 1606 1607 /++ 1608 This represents a file. Technically, file paths aren't actually strings (for example, on Linux, they need not be valid utf-8, while a D string is supposed to be), even though we almost always use them like that. 1609 1610 This type is meant to represent a filename / path. I might not keep it around. 1611 +/ 1612 struct FilePath { 1613 string path; 1614 } 1615 1616 /++ 1617 Represents a generic async, waitable request. 1618 +/ 1619 class AsyncOperationRequest { 1620 /++ 1621 Actually issues the request, starting the operation. 1622 +/ 1623 abstract void start(); 1624 /++ 1625 Cancels the request. This will cause `isComplete` to return true once the cancellation has been processed, but [AsyncOperationResponse.wasSuccessful] will return `false` (unless it completed before the cancellation was processed, in which case it is still allowed to finish successfully). 1626 1627 After cancelling a request, you should still wait for it to complete to ensure that the task has actually released its resources before doing anything else on it. 1628 1629 Once a cancellation request has been sent, it cannot be undone. 1630 +/ 1631 abstract void cancel(); 1632 1633 /++ 1634 Returns `true` if the operation has been completed. It may be completed successfully, cancelled, or have errored out - to check this, call [waitForCompletion] and check the members on the response object. 1635 +/ 1636 abstract bool isComplete(); 1637 /++ 1638 Waits until the request has completed - successfully or otherwise - and returns the response object. It will run an ad-hoc event loop that may call other callbacks while waiting. 1639 1640 The response object may be embedded in the request object - do not reuse the request until you are finished with the response and do not keep the response around longer than you keep the request. 1641 1642 1643 Note to implementers: all subclasses should override this and return their specific response object. You can use the top-level `waitForFirstToCompleteByIndex` function with a single-element static array to help with the implementation. 1644 +/ 1645 abstract AsyncOperationResponse waitForCompletion(); 1646 1647 /++ 1648 1649 +/ 1650 // abstract void repeat(); 1651 } 1652 1653 /++ 1654 1655 +/ 1656 abstract class AsyncOperationResponse { 1657 /++ 1658 Returns true if the request completed successfully, finishing what it was supposed to. 1659 1660 Should be set to `false` if the request was cancelled before completing or encountered an error. 1661 +/ 1662 abstract bool wasSuccessful(); 1663 } 1664 1665 /++ 1666 It returns the $(I request) so you can identify it more easily. `request.waitForCompletion()` is guaranteed to return the response without any actual wait, since it is already complete when this function returns. 1667 1668 Please note that "completion" is not necessary successful completion; a request being cancelled or encountering an error also counts as it being completed. 1669 1670 The `waitForFirstToCompleteByIndex` version instead returns the index of the array entry that completed first. 1671 1672 It is your responsibility to remove the completed request from the array before calling the function again, since any request already completed will always be immediately returned. 1673 1674 You might prefer using [asTheyComplete], which will give each request as it completes and loop over until all of them are complete. 1675 1676 Returns: 1677 `null` or `requests.length` if none completed before returning. 1678 +/ 1679 AsyncOperationRequest waitForFirstToComplete(AsyncOperationRequest[] requests...) { 1680 auto idx = waitForFirstToCompleteByIndex(requests); 1681 if(idx == requests.length) 1682 return null; 1683 return requests[idx]; 1684 } 1685 /// ditto 1686 size_t waitForFirstToCompleteByIndex(AsyncOperationRequest[] requests...) { 1687 size_t helper() { 1688 foreach(idx, request; requests) 1689 if(request.isComplete()) 1690 return idx; 1691 return requests.length; 1692 } 1693 1694 auto idx = helper(); 1695 // if one is already done, return it 1696 if(idx != requests.length) 1697 return idx; 1698 1699 // otherwise, run the ad-hoc event loop until one is 1700 // FIXME: what if we are inside a fiber? 1701 auto el = getThisThreadEventLoop(); 1702 el.run(() => (idx = helper()) != requests.length); 1703 1704 return idx; 1705 } 1706 1707 /++ 1708 Waits for all the `requests` to complete, giving each one through the range interface as it completes. 1709 1710 This meant to be used in a foreach loop. 1711 1712 The `requests` array and its contents must remain valid for the lifetime of the returned range. Its contents may be shuffled as the requests complete (the implementation works through an unstable sort+remove). 1713 +/ 1714 AsTheyCompleteRange asTheyComplete(AsyncOperationRequest[] requests...) { 1715 return AsTheyCompleteRange(requests); 1716 } 1717 /// ditto 1718 struct AsTheyCompleteRange { 1719 AsyncOperationRequest[] requests; 1720 1721 this(AsyncOperationRequest[] requests) { 1722 this.requests = requests; 1723 1724 if(requests.length == 0) 1725 return; 1726 1727 // wait for first one to complete, then move it to the front of the array 1728 moveFirstCompleteToFront(); 1729 } 1730 1731 private void moveFirstCompleteToFront() { 1732 auto idx = waitForFirstToCompleteByIndex(requests); 1733 1734 auto tmp = requests[0]; 1735 requests[0] = requests[idx]; 1736 requests[idx] = tmp; 1737 } 1738 1739 bool empty() { 1740 return requests.length == 0; 1741 } 1742 1743 void popFront() { 1744 assert(!empty); 1745 /+ 1746 this needs to 1747 1) remove the front of the array as being already processed (unless it is the initial priming call) 1748 2) wait for one of them to complete 1749 3) move the complete one to the front of the array 1750 +/ 1751 1752 requests[0] = requests[$-1]; 1753 requests = requests[0 .. $-1]; 1754 1755 if(requests.length) 1756 moveFirstCompleteToFront(); 1757 } 1758 1759 AsyncOperationRequest front() { 1760 return requests[0]; 1761 } 1762 } 1763 1764 version(Windows) { 1765 alias NativeFileHandle = HANDLE; /// 1766 alias NativeSocketHandle = SOCKET; /// 1767 alias NativePipeHandle = HANDLE; /// 1768 } else version(Posix) { 1769 alias NativeFileHandle = int; /// 1770 alias NativeSocketHandle = int; /// 1771 alias NativePipeHandle = int; /// 1772 } 1773 1774 /++ 1775 1776 +/ 1777 final class SyncFile { 1778 private { 1779 NativeFileHandle handle; 1780 } 1781 1782 /++ 1783 +/ 1784 enum OpenMode { 1785 readOnly, /// C's "r", the file is read 1786 writeWithTruncation, /// C's "w", the file is blanked upon opening so it only holds what you write 1787 appendOnly, /// C's "a", writes will always be appended to the file 1788 readAndWrite /// C's "r+", writes will overwrite existing parts of the file based on where you seek (default is at the beginning) 1789 } 1790 1791 /++ 1792 +/ 1793 enum RequirePreexisting { 1794 no, 1795 yes 1796 } 1797 1798 /++ 1799 1800 +/ 1801 ubyte[] read(scope ubyte[] buffer) { 1802 return null; 1803 } 1804 1805 /++ 1806 1807 +/ 1808 void write(in void[] buffer) { 1809 } 1810 1811 enum Seek { 1812 current, 1813 fromBeginning, 1814 fromEnd 1815 } 1816 1817 // Seeking/telling/sizing is not permitted when appending and some files don't support it 1818 void seek(long where, Seek fromWhence) {} 1819 1820 long tell() { return 0; } 1821 1822 long size() { return 0; } 1823 1824 // note that there is no fsync thing, instead use the special flag. 1825 1826 /+ 1827 enum SpecialFlags { 1828 randomAccessExpected, /// FILE_FLAG_SEQUENTIAL_SCAN is turned off 1829 skipCache, /// O_DSYNC, FILE_FLAG_NO_BUFFERING and maybe WRITE_THROUGH. note that metadata still goes through the cache, FlushFileBuffers and fsync can still do those 1830 temporary, /// FILE_ATTRIBUTE_TEMPORARY on Windows, idk how to specify on linux 1831 deleteWhenClosed, /// Windows has a flag for this but idk if it is of any real use 1832 } 1833 +/ 1834 1835 /++ 1836 1837 +/ 1838 this(FilePath filename, OpenMode mode = OpenMode.readOnly, RequirePreexisting require = RequirePreexisting.no) { 1839 version(Windows) { 1840 DWORD access; 1841 DWORD creation; 1842 1843 final switch(mode) { 1844 case OpenMode.readOnly: 1845 access = GENERIC_READ; 1846 creation = OPEN_EXISTING; 1847 break; 1848 case OpenMode.writeWithTruncation: 1849 access = GENERIC_WRITE; 1850 1851 final switch(require) { 1852 case RequirePreexisting.no: 1853 creation = CREATE_ALWAYS; 1854 break; 1855 case RequirePreexisting.yes: 1856 creation = TRUNCATE_EXISTING; 1857 break; 1858 } 1859 break; 1860 case OpenMode.appendOnly: 1861 access = FILE_APPEND_DATA; 1862 1863 final switch(require) { 1864 case RequirePreexisting.no: 1865 creation = CREATE_ALWAYS; 1866 break; 1867 case RequirePreexisting.yes: 1868 creation = OPEN_EXISTING; 1869 break; 1870 } 1871 break; 1872 case OpenMode.readAndWrite: 1873 access = GENERIC_READ | GENERIC_WRITE; 1874 1875 final switch(require) { 1876 case RequirePreexisting.no: 1877 creation = CREATE_NEW; 1878 break; 1879 case RequirePreexisting.yes: 1880 creation = OPEN_EXISTING; 1881 break; 1882 } 1883 break; 1884 } 1885 1886 WCharzBuffer wname = WCharzBuffer(filename.path); 1887 1888 auto handle = CreateFileW( 1889 wname.ptr, 1890 access, 1891 FILE_SHARE_READ, 1892 null, 1893 creation, 1894 FILE_ATTRIBUTE_NORMAL,/* | FILE_FLAG_OVERLAPPED,*/ 1895 null 1896 ); 1897 1898 if(handle == INVALID_HANDLE_VALUE) { 1899 // FIXME: throw the filename and other params here too 1900 throw new WindowsApiException("CreateFileW", GetLastError()); 1901 } 1902 1903 this.handle = handle; 1904 } else version(Posix) { 1905 import core.sys.posix.unistd; 1906 import core.sys.posix.fcntl; 1907 1908 CharzBuffer namez = CharzBuffer(filename.path); 1909 int flags; 1910 1911 // FIXME does mac not have cloexec for real or is this just a druntime problem????? 1912 version(Arsd_core_has_cloexec) { 1913 flags = O_CLOEXEC; 1914 } else { 1915 scope(success) 1916 setCloExec(this.handle); 1917 } 1918 1919 final switch(mode) { 1920 case OpenMode.readOnly: 1921 flags |= O_RDONLY; 1922 break; 1923 case OpenMode.writeWithTruncation: 1924 flags |= O_WRONLY | O_TRUNC; 1925 1926 final switch(require) { 1927 case RequirePreexisting.no: 1928 flags |= O_CREAT; 1929 break; 1930 case RequirePreexisting.yes: 1931 break; 1932 } 1933 break; 1934 case OpenMode.appendOnly: 1935 flags |= O_APPEND; 1936 1937 final switch(require) { 1938 case RequirePreexisting.no: 1939 flags |= O_CREAT; 1940 break; 1941 case RequirePreexisting.yes: 1942 break; 1943 } 1944 break; 1945 case OpenMode.readAndWrite: 1946 flags |= O_RDWR; 1947 1948 final switch(require) { 1949 case RequirePreexisting.no: 1950 flags |= O_CREAT; 1951 break; 1952 case RequirePreexisting.yes: 1953 break; 1954 } 1955 break; 1956 } 1957 1958 int fd = open(namez.ptr, flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); 1959 if(fd == -1) 1960 throw new ErrnoApiException("open", errno); 1961 1962 this.handle = fd; 1963 } 1964 } 1965 1966 /++ 1967 1968 +/ 1969 this(NativeFileHandle handleToWrap) { 1970 this.handle = handleToWrap; 1971 } 1972 1973 /++ 1974 1975 +/ 1976 void close() { 1977 version(Windows) { 1978 Win32Enforce!CloseHandle(handle); 1979 handle = null; 1980 } else version(Posix) { 1981 import unix = core.sys.posix.unistd; 1982 import core.sys.posix.fcntl; 1983 1984 ErrnoEnforce!(unix.close)(handle); 1985 handle = -1; 1986 } 1987 } 1988 } 1989 1990 version(none) 1991 void main() { 1992 auto file = new AsyncFile(FilePath("test.txt"), AsyncFile.OpenMode.write, AsyncFile.PreserveContents.truncateIfWriting, AsyncFile.RequirePreexisting.yes); 1993 1994 auto buffer = cast(ubyte[]) "hello"; 1995 auto wr = new AsyncWriteRequest(file, buffer); 1996 wr.start(); 1997 1998 wr.waitForCompletion(); 1999 2000 file.close(); 2001 } 2002 2003 mixin template OverlappedIoRequest(Response) { 2004 private { 2005 SyncFile file; 2006 ubyte[] buffer; 2007 2008 OwnedClass!Response response; 2009 2010 version(Windows) { 2011 OVERLAPPED overlapped; 2012 2013 extern(Windows) 2014 static void overlappedCompletionRoutine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransferred, LPOVERLAPPED lpOverlapped) { 2015 typeof(this) rr = cast(typeof(this)) (cast(void*) lpOverlapped - typeof(this).overlapped.offsetof); 2016 2017 // this will queue our CallbackHelper and that should be run at the end of the event loop after it is woken up by the APC run 2018 } 2019 } 2020 2021 bool started; 2022 } 2023 2024 override void cancel() { 2025 version(Windows) { 2026 Win32Enforce!CancelIoEx(file.handle, &overlapped); 2027 } else version(Posix) { 2028 // FIXME 2029 } 2030 } 2031 2032 override bool isComplete() { 2033 version(Windows) { 2034 return HasOverlappedIoCompleted(&overlapped); 2035 } else version(Posix) { 2036 return true; 2037 2038 } 2039 } 2040 2041 override Response waitForCompletion() { 2042 if(!started) 2043 start(); 2044 2045 version(Windows) { 2046 SleepEx(INFINITE, true); 2047 2048 //DWORD numberTransferred; 2049 //Win32Enforce!GetOverlappedResult(file.handle, &overlapped, &numberTransferred, true); 2050 } else version(Posix) { 2051 2052 } 2053 2054 return response; 2055 } 2056 } 2057 2058 /++ 2059 You can write to a file asynchronously by creating one of these. 2060 +/ 2061 final class AsyncWriteRequest : AsyncOperationRequest { 2062 mixin OverlappedIoRequest!AsyncWriteResponse; 2063 2064 this() { 2065 response = typeof(response).defaultConstructed; 2066 } 2067 2068 override void start() { 2069 } 2070 2071 /++ 2072 2073 +/ 2074 void repeat() { 2075 // FIXME 2076 } 2077 } 2078 2079 class AsyncWriteResponse : AsyncOperationResponse { 2080 override bool wasSuccessful() { 2081 return false; 2082 } 2083 } 2084 2085 2086 final class AsyncReadRequest : AsyncOperationRequest { 2087 mixin OverlappedIoRequest!AsyncReadResponse; 2088 2089 /++ 2090 The file must have the overlapped flag enabled on Windows and the nonblock flag set on Posix. 2091 2092 The buffer MUST NOT be touched by you - not used by another request, modified, read, or freed, including letting a static array going out of scope - until this request's `isComplete` returns `true`. 2093 2094 The offset pointer is shared between this and other requests. 2095 +/ 2096 this(NativeFileHandle file, ubyte[] buffer, shared(long)* offset) { 2097 response = typeof(response).defaultConstructed; 2098 } 2099 2100 override void start() { 2101 version(Windows) { 2102 auto ret = ReadFileEx(file.handle, buffer.ptr, cast(DWORD) buffer.length, &overlapped, &overlappedCompletionRoutine); 2103 // need to check GetLastError 2104 2105 // ReadFileEx always queues, even if it completed synchronously. I *could* check the get overlapped result and sleepex here but i'm prolly better off just letting the event loop do its thing anyway. 2106 } else version(Posix) { 2107 import core.sys.posix.unistd; 2108 2109 // first try to just do it 2110 auto ret = read(file.handle, buffer.ptr, buffer.length); 2111 2112 // then if it doesn't complete synchronously, need to event loop register 2113 2114 // if we are inside a fiber task, it can simply yield and call the fiber in the callback 2115 // when we return here, it tries to read again 2116 2117 // if not inside, we need to ensure the buffer remains valid and set a callback... and return. 2118 // the callback must retry the read 2119 2120 // generally, the callback must satisfy the read somehow they set the callback to trigger the result object's completion handler 2121 } 2122 } 2123 /++ 2124 Cancels the request. This will cause `isComplete` to return true once the cancellation has been processed, but [AsyncOperationResponse.wasSuccessful] will return `false` (unless it completed before the cancellation was processed, in which case it is still allowed to finish successfully). 2125 2126 After cancelling a request, you should still wait for it to complete to ensure that the task has actually released its resources before doing anything else on it. 2127 2128 Once a cancellation request has been sent, it cannot be undone. 2129 +/ 2130 override void cancel() { 2131 2132 } 2133 2134 /++ 2135 Returns `true` if the operation has been completed. It may be completed successfully, cancelled, or have errored out - to check this, call [waitForCompletion] and check the members on the response object. 2136 +/ 2137 override bool isComplete() { 2138 return false; 2139 } 2140 /++ 2141 Waits until the request has completed - successfully or otherwise - and returns the response object. 2142 2143 The response object may be embedded in the request object - do not reuse the request until you are finished with the response and do not keep the response around longer than you keep the request. 2144 2145 2146 Note to implementers: all subclasses should override this and return their specific response object. You can use the top-level `waitForFirstToCompleteByIndex` function with a single-element static array to help with the implementation. 2147 +/ 2148 override AsyncOperationResponse waitForCompletion() { 2149 return response; 2150 } 2151 2152 /++ 2153 2154 +/ 2155 // abstract void repeat(); 2156 } 2157 2158 class AsyncReadResponse : AsyncOperationResponse { 2159 override bool wasSuccessful() { 2160 return false; 2161 } 2162 } 2163 2164 /+ 2165 Tasks: 2166 startTask() 2167 startSubTask() - what if it just did this when it knows it is being run from inside a task? 2168 runHelperFunction() - whomever it reports to is the parent 2169 +/ 2170 2171 private class CoreEventLoopImplementation : ICoreEventLoop { 2172 2173 version(Arsd_core_kqueue) { 2174 void runOnce() { 2175 kevent_t[16] ev; 2176 //timespec tout = timespec(1, 0); 2177 auto nev = kevent(kqueuefd, null, 0, ev.ptr, ev.length, null/*&tout*/); 2178 if(nev == -1) { 2179 // FIXME: EINTR 2180 throw new SystemApiException("kevent", errno); 2181 } else if(nev == 0) { 2182 // timeout 2183 } else { 2184 foreach(event; ev[0 .. nev]) { 2185 if(event.filter == EVFILT_SIGNAL) { 2186 // FIXME: I could prolly do this better tbh 2187 markSignalOccurred(cast(int) event.ident); 2188 signalChecker(); 2189 } else { 2190 // FIXME: event.filter more specific? 2191 CallbackHelper cb = cast(CallbackHelper) event.udata; 2192 cb.call(); 2193 } 2194 } 2195 } 2196 } 2197 2198 // FIXME: idk how to make one event that multiple kqueues can listen to w/o being shared 2199 // maybe a shared kqueue could work that the thread kqueue listen to (which i rejected for 2200 // epoll cuz it caused thundering herd problems but maybe it'd work here) 2201 2202 UnregisterToken addCallbackOnFdReadable(int fd, CallbackHelper cb) { 2203 kevent_t ev; 2204 2205 EV_SET(&ev, fd, EVFILT_READ, EV_ADD | EV_ENABLE/* | EV_ONESHOT*/, 0, 0, cast(void*) cb); 2206 2207 ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null); 2208 2209 return UnregisterToken(); 2210 } 2211 2212 private void triggerGlobalEvent() { 2213 ubyte a; 2214 import core.sys.posix.unistd; 2215 write(kqueueGlobalFd[1], &a, 1); 2216 } 2217 2218 private this() { 2219 kqueuefd = ErrnoEnforce!kqueue(); 2220 setCloExec(kqueuefd); // FIXME O_CLOEXEC 2221 2222 if(kqueueGlobalFd[0] == 0) { 2223 import core.sys.posix.unistd; 2224 pipe(kqueueGlobalFd); 2225 2226 signal(SIGINT, SIG_IGN); // FIXME 2227 } 2228 2229 kevent_t ev; 2230 2231 EV_SET(&ev, SIGCHLD, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, null); 2232 ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null); 2233 EV_SET(&ev, SIGINT, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, null); 2234 ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null); 2235 2236 globalEventSent = new CallbackHelper(&readGlobalEvent); 2237 EV_SET(&ev, kqueueGlobalFd[0], EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, cast(void*) globalEventSent); 2238 ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null); 2239 } 2240 2241 private int kqueuefd = -1; 2242 2243 private CallbackHelper globalEventSent; 2244 void readGlobalEvent() { 2245 kevent_t event; 2246 2247 import core.sys.posix.unistd; 2248 ubyte a; 2249 read(kqueueGlobalFd[0], &a, 1); 2250 2251 // FIXME: the thread is woken up, now we need to check the circualr buffer queue 2252 } 2253 2254 private __gshared int[2] kqueueGlobalFd; 2255 } 2256 2257 /+ 2258 // this setup needs no extra allocation 2259 auto op = read(file, buffer); 2260 op.oncomplete = &thisfiber.call; 2261 op.start(); 2262 thisfiber.yield(); 2263 auto result = op.waitForCompletion(); // guaranteed to return instantly thanks to previous setup 2264 2265 can generically abstract that into: 2266 2267 auto result = thisTask.await(read(file, buffer)); 2268 2269 2270 You MUST NOT use buffer in any way - not read, modify, deallocate, reuse, anything - until the PendingOperation is complete. 2271 2272 Note that PendingOperation may just be a wrapper around an internally allocated object reference... but then if you do a waitForFirstToComplete what happens? 2273 2274 those could of course just take the value type things 2275 +/ 2276 2277 2278 version(Arsd_core_windows) { 2279 // all event loops share the one iocp, Windows 2280 // manages how to do it 2281 __gshared HANDLE iocpTaskRunners; 2282 __gshared HANDLE iocpWorkers; 2283 // i think to terminate i just have to post the message at least once for every thread i know about, maybe a few more times for threads i don't know about. 2284 2285 bool isWorker; // if it is a worker we wait on the iocp, if not we wait on msg 2286 2287 void runOnce() { 2288 if(isWorker) { 2289 // this function is only supported on Windows Vista and up, so using this 2290 // means dropping support for XP. 2291 //GetQueuedCompletionStatusEx(); 2292 } else { 2293 //MsgWaitForMultipleObjectsEx(); 2294 if(true) { 2295 // handle: timeout 2296 // HANDLE ready, forward message 2297 // window messages 2298 // also sleepex if needed 2299 } 2300 } 2301 } 2302 } 2303 2304 version(Posix) { 2305 private __gshared uint sigChildHappened = 0; 2306 private __gshared uint sigIntrHappened = 0; 2307 2308 static void signalChecker() { 2309 if(cas(&sigChildHappened, 1, 0)) { 2310 while(true) { // multiple children could have exited before we processed the notification 2311 2312 import core.sys.posix.sys.wait; 2313 2314 int status; 2315 auto pid = waitpid(-1, &status, WNOHANG); 2316 if(pid == -1) { 2317 import core.stdc.errno; 2318 auto errno = errno; 2319 if(errno == ECHILD) 2320 break; // also all done, there are no children left 2321 // no need to check EINTR since we set WNOHANG 2322 throw new ErrnoApiException("waitpid", errno); 2323 } 2324 if(pid == 0) 2325 break; // all done, all children are still running 2326 2327 // look up the pid for one of our objects 2328 // if it is found, inform it of its status 2329 // and then inform its controlling thread 2330 // to wake up so it can check its waitForCompletion, 2331 // trigger its callbacks, etc. 2332 2333 ExternalProcess.recordChildTerminated(pid, status); 2334 } 2335 2336 } 2337 if(cas(&sigIntrHappened, 1, 0)) { 2338 // FIXME 2339 import core.stdc.stdlib; 2340 exit(0); 2341 } 2342 } 2343 2344 /++ 2345 Informs the arsd.core system that the given signal happened. You can call this from inside a signal handler. 2346 +/ 2347 public static void markSignalOccurred(int sigNumber) nothrow { 2348 import core.sys.posix.unistd; 2349 2350 if(sigNumber == SIGCHLD) 2351 volatileStore(&sigChildHappened, 1); 2352 if(sigNumber == SIGINT) 2353 volatileStore(&sigIntrHappened, 1); 2354 2355 version(Arsd_core_epoll) { 2356 ulong writeValue = 1; 2357 write(signalPipeFd, &writeValue, writeValue.sizeof); 2358 } 2359 } 2360 } 2361 2362 version(Arsd_core_epoll) { 2363 2364 import core.sys.linux.epoll; 2365 import core.sys.linux.sys.eventfd; 2366 2367 private this() { 2368 2369 if(!globalsInitialized) { 2370 synchronized { 2371 if(!globalsInitialized) { 2372 // blocking signals is problematic because it is inherited by child processes 2373 // and that can be problematic for general purpose stuff so i use a self pipe 2374 // here. though since it is linux, im using an eventfd instead just to notify 2375 signalPipeFd = ErrnoEnforce!eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); 2376 signalReaderCallback = new CallbackHelper(&signalReader); 2377 2378 runInTaskRunnerQueue = new CallbackQueue("task runners", true); 2379 runInHelperThreadQueue = new CallbackQueue("helper threads", true); 2380 2381 setSignalHandlers(); 2382 2383 globalsInitialized = true; 2384 } 2385 } 2386 } 2387 2388 epollfd = epoll_create1(EPOLL_CLOEXEC); 2389 2390 // FIXME: ensure UI events get top priority 2391 2392 // global listeners 2393 2394 // FIXME: i should prolly keep the tokens and release them when tearing down. 2395 2396 cast(void) addCallbackOnFdReadable(signalPipeFd, signalReaderCallback); 2397 if(true) { // FIXME: if this is a task runner vs helper thread vs ui thread 2398 cast(void) addCallbackOnFdReadable(runInTaskRunnerQueue.fd, runInTaskRunnerQueue.callback); 2399 runInTaskRunnerQueue.callback.addref(); 2400 } else { 2401 cast(void) addCallbackOnFdReadable(runInHelperThreadQueue.fd, runInHelperThreadQueue.callback); 2402 runInHelperThreadQueue.callback.addref(); 2403 } 2404 2405 // local listener 2406 thisThreadQueue = new CallbackQueue("this thread", false); 2407 cast(void) addCallbackOnFdReadable(thisThreadQueue.fd, thisThreadQueue.callback); 2408 2409 // what are we going to do about timers? 2410 } 2411 2412 void teardown() { 2413 import core.sys.posix.fcntl; 2414 import core.sys.posix.unistd; 2415 2416 close(epollfd); 2417 epollfd = -1; 2418 2419 thisThreadQueue.teardown(); 2420 2421 // FIXME: should prolly free anything left in the callback queue, tho those could also be GC managed tbh. 2422 } 2423 2424 static void teardownGlobals() { 2425 import core.sys.posix.fcntl; 2426 import core.sys.posix.unistd; 2427 2428 synchronized { 2429 restoreSignalHandlers(); 2430 close(signalPipeFd); 2431 signalReaderCallback.release(); 2432 2433 runInTaskRunnerQueue.teardown(); 2434 runInHelperThreadQueue.teardown(); 2435 2436 globalsInitialized = false; 2437 } 2438 2439 } 2440 2441 2442 private static final class CallbackQueue { 2443 int fd = -1; 2444 string name; 2445 CallbackHelper callback; 2446 SynchronizedCircularBuffer!CallbackHelper queue; 2447 2448 this(string name, bool dequeueIsShared) { 2449 this.name = name; 2450 queue = typeof(queue)(this); 2451 2452 fd = ErrnoEnforce!eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | (dequeueIsShared ? EFD_SEMAPHORE : 0)); 2453 2454 callback = new CallbackHelper(dequeueIsShared ? &sharedDequeueCb : &threadLocalDequeueCb); 2455 } 2456 2457 bool resetEvent() { 2458 import core.sys.posix.unistd; 2459 ulong count; 2460 return read(fd, &count, count.sizeof) == count.sizeof; 2461 } 2462 2463 void sharedDequeueCb() { 2464 if(resetEvent()) { 2465 auto cb = queue.dequeue(); 2466 cb.call(); 2467 cb.release(); 2468 } 2469 } 2470 2471 void threadLocalDequeueCb() { 2472 CallbackHelper[16] buffer; 2473 foreach(cb; queue.dequeueSeveral(buffer[], () { resetEvent(); })) { 2474 cb.call(); 2475 cb.release(); 2476 } 2477 } 2478 2479 void enqueue(CallbackHelper cb) { 2480 if(queue.enqueue(cb)) { 2481 import core.sys.posix.unistd; 2482 ulong count = 1; 2483 ErrnoEnforce!write(fd, &count, count.sizeof); 2484 } else { 2485 throw new ArsdException!"queue is full"(name); 2486 } 2487 } 2488 2489 void teardown() { 2490 import core.sys.posix.fcntl; 2491 import core.sys.posix.unistd; 2492 2493 close(fd); 2494 fd = -1; 2495 2496 callback.release(); 2497 } 2498 2499 alias queue this; 2500 } 2501 2502 // there's a global instance of this we refer back to 2503 private __gshared { 2504 bool globalsInitialized; 2505 2506 CallbackHelper signalReaderCallback; 2507 2508 CallbackQueue runInTaskRunnerQueue; 2509 CallbackQueue runInHelperThreadQueue; 2510 2511 int exitEventFd = -1; // FIXME: implement 2512 } 2513 2514 // and then the local loop 2515 private { 2516 int epollfd = -1; 2517 2518 CallbackQueue thisThreadQueue; 2519 } 2520 2521 // signal stuff { 2522 import core.sys.posix.signal; 2523 2524 private __gshared sigaction_t oldSigIntr; 2525 private __gshared sigaction_t oldSigChld; 2526 private __gshared sigaction_t oldSigPipe; 2527 2528 private __gshared int signalPipeFd = -1; 2529 // sigpipe not important, i handle errors on the writes 2530 2531 public static void setSignalHandlers() { 2532 static extern(C) void interruptHandler(int sigNumber) nothrow { 2533 markSignalOccurred(sigNumber); 2534 2535 /+ 2536 // calling the old handler is non-trivial since there can be ignore 2537 // or default or a plain handler or a sigaction 3 arg handler and i 2538 // i don't think it is worth teh complication 2539 sigaction_t* oldHandler; 2540 if(sigNumber == SIGCHLD) 2541 oldHandler = &oldSigChld; 2542 else if(sigNumber == SIGINT) 2543 oldHandler = &oldSigIntr; 2544 if(oldHandler && oldHandler.sa_handler) 2545 oldHandler 2546 +/ 2547 } 2548 2549 sigaction_t n; 2550 n.sa_handler = &interruptHandler; 2551 n.sa_mask = cast(sigset_t) 0; 2552 n.sa_flags = 0; 2553 sigaction(SIGINT, &n, &oldSigIntr); 2554 sigaction(SIGCHLD, &n, &oldSigChld); 2555 2556 n.sa_handler = SIG_IGN; 2557 sigaction(SIGPIPE, &n, &oldSigPipe); 2558 } 2559 2560 public static void restoreSignalHandlers() { 2561 sigaction(SIGINT, &oldSigIntr, null); 2562 sigaction(SIGCHLD, &oldSigChld, null); 2563 sigaction(SIGPIPE, &oldSigPipe, null); 2564 } 2565 2566 private static void signalReader() { 2567 import core.sys.posix.unistd; 2568 ulong number; 2569 read(signalPipeFd, &number, number.sizeof); 2570 2571 signalChecker(); 2572 } 2573 // signal stuff done } 2574 2575 // the any thread poll is just registered in the this thread poll w/ exclusive. nobody actaully epoll_waits 2576 // on the global one directly. 2577 2578 void runOnce() { 2579 epoll_event[16] events; 2580 auto ret = epoll_wait(epollfd, events.ptr, cast(int) events.length, -1); // FIXME: timeout 2581 if(ret == -1) { 2582 import core.stdc.errno; 2583 if(errno == EINTR) { 2584 return; 2585 } 2586 throw new ErrnoApiException("epoll_wait", errno); 2587 } else if(ret == 0) { 2588 // timeout 2589 } else { 2590 // loop events and call associated callbacks 2591 foreach(event; events[0 .. ret]) { 2592 auto flags = event.events; 2593 auto cbObject = cast(CallbackHelper) event.data.ptr; 2594 2595 // FIXME: or if it is an error... 2596 // EPOLLERR - write end of pipe when read end closed or other error. and EPOLLHUP - terminal hangup or read end when write end close (but it will give 0 reading after that soon anyway) 2597 2598 cbObject.call(); 2599 } 2600 } 2601 } 2602 2603 // building blocks for low-level integration with the loop 2604 2605 UnregisterToken addCallbackOnFdReadable(int fd, CallbackHelper cb) { 2606 epoll_event event; 2607 event.data.ptr = cast(void*) cb; 2608 event.events = EPOLLIN | EPOLLEXCLUSIVE; 2609 if(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) 2610 throw new ErrnoApiException("epoll_ctl", errno); 2611 2612 return UnregisterToken(this, fd, cb); 2613 } 2614 2615 /++ 2616 Adds a one-off callback that you can optionally rearm when it happens. 2617 +/ 2618 RearmToken addCallbackOnFdReadableOneShot(int fd, CallbackHelper cb) { 2619 epoll_event event; 2620 event.data.ptr = cast(void*) cb; 2621 event.events = EPOLLIN | EPOLLONESHOT; 2622 if(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) 2623 throw new ErrnoApiException("epoll_ctl", errno); 2624 2625 return RearmToken(this, fd, cb, EPOLLIN | EPOLLONESHOT); 2626 } 2627 2628 private void unregisterFd(int fd) { 2629 epoll_event event; 2630 if(epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &event) == -1) 2631 throw new ErrnoApiException("epoll_ctl", errno); 2632 } 2633 2634 private void rearmFd(RearmToken token) { 2635 epoll_event event; 2636 event.data.ptr = cast(void*) token.cb; 2637 event.events = token.flags; 2638 if(epoll_ctl(epollfd, EPOLL_CTL_MOD, token.fd, &event) == -1) 2639 throw new ErrnoApiException("epoll_ctl", errno); 2640 } 2641 2642 // Disk files will have to be sent as messages to a worker to do the read and report back a completion packet. 2643 } 2644 2645 version(Arsd_core_kqueue) { 2646 // FIXME 2647 } 2648 2649 // cross platform adapters 2650 void setTimeout() {} 2651 void addFileOrDirectoryChangeListener(FilePath name, uint flags, bool recursive = false) {} 2652 } 2653 2654 // deduplication???????// 2655 bool postMessage(ThreadToRunIn destination, void delegate() code) { 2656 return false; 2657 } 2658 bool postMessage(ThreadToRunIn destination, Object message) { 2659 return false; 2660 } 2661 2662 // to test the mailboxes 2663 /+ 2664 void main() { 2665 /+ 2666 import std.stdio; 2667 Thread[4] pool; 2668 2669 bool shouldExit; 2670 2671 static int received; 2672 2673 static void tester() { 2674 received++; 2675 //writeln(cast(void*) Thread.getThis, " ", received); 2676 } 2677 2678 foreach(ref thread; pool) { 2679 thread = new Thread(() { 2680 getThisThreadEventLoop().run(() { 2681 return shouldExit; 2682 }); 2683 }); 2684 thread.start(); 2685 } 2686 2687 getThisThreadEventLoop(); // ensure it is all initialized before proceeding. FIXME: i should have an ensure initialized function i do on most the public apis. 2688 2689 int lol; 2690 2691 try 2692 foreach(i; 0 .. 6000) { 2693 CoreEventLoopImplementation.runInTaskRunnerQueue.enqueue(new CallbackHelper(&tester)); 2694 lol = cast(int) i; 2695 } 2696 catch(ArsdExceptionBase e) { 2697 Thread.sleep(50.msecs); 2698 writeln(e); 2699 writeln(lol); 2700 } 2701 2702 import core.stdc.stdlib; 2703 exit(0); 2704 2705 version(none) 2706 foreach(i; 0 .. 100) 2707 CoreEventLoopImplementation.runInTaskRunnerQueue.enqueue(new CallbackHelper(&tester)); 2708 2709 2710 foreach(ref thread; pool) { 2711 thread.join(); 2712 } 2713 +/ 2714 2715 2716 static int received; 2717 2718 static void tester() { 2719 received++; 2720 //writeln(cast(void*) Thread.getThis, " ", received); 2721 } 2722 2723 2724 2725 auto ev = cast(CoreEventLoopImplementation) getThisThreadEventLoop(); 2726 foreach(i; 0 .. 100) 2727 ev.thisThreadQueue.enqueue(new CallbackHelper(&tester)); 2728 foreach(i; 0 .. 100 / 16 + 1) 2729 ev.runOnce(); 2730 import std.conv; 2731 assert(received == 100, to!string(received)); 2732 2733 } 2734 +/ 2735 2736 /++ 2737 This is primarily a helper for the event queues. It is public in the hope it might be useful, 2738 but subject to change without notice; I will treat breaking it the same as if it is private. 2739 (That said, it is a simple little utility that does its job, so it is unlikely to change much. 2740 The biggest change would probably be letting it grow and changing from inline to dynamic array.) 2741 2742 It is a fixed-size ring buffer that synchronizes on a given object you give it in the constructor. 2743 2744 After enqueuing something, you should probably set an event to notify the other threads. This is left 2745 as an exercise to you (or another wrapper). 2746 +/ 2747 struct SynchronizedCircularBuffer(T, size_t maxSize = 128) { 2748 private T[maxSize] ring; 2749 private int front; 2750 private int back; 2751 2752 private Object synchronizedOn; 2753 2754 @disable this(); 2755 2756 /++ 2757 The Object's monitor is used to synchronize the methods in here. 2758 +/ 2759 this(Object synchronizedOn) { 2760 this.synchronizedOn = synchronizedOn; 2761 } 2762 2763 /++ 2764 Note the potential race condition between calling this and actually dequeuing something. You might 2765 want to acquire the lock on the object before calling this (nested synchronized things are allowed 2766 as long as the same thread is the one doing it). 2767 +/ 2768 bool isEmpty() { 2769 synchronized(this.synchronizedOn) { 2770 return front == back; 2771 } 2772 } 2773 2774 /++ 2775 Note the potential race condition between calling this and actually queuing something. 2776 +/ 2777 bool isFull() { 2778 synchronized(this.synchronizedOn) { 2779 return isFullUnsynchronized(); 2780 } 2781 } 2782 2783 private bool isFullUnsynchronized() nothrow const { 2784 return ((back + 1) % ring.length) == front; 2785 2786 } 2787 2788 /++ 2789 If this returns true, you should signal listening threads (with an event or a semaphore, 2790 depending on how you dequeue it). If it returns false, the queue was full and your thing 2791 was NOT added. You might wait and retry later (you could set up another event to signal it 2792 has been read and wait for that, or maybe try on a timer), or just fail and throw an exception 2793 or to abandon the message. 2794 +/ 2795 bool enqueue(T what) { 2796 synchronized(this.synchronizedOn) { 2797 if(isFullUnsynchronized()) 2798 return false; 2799 ring[(back++) % ring.length] = what; 2800 return true; 2801 } 2802 } 2803 2804 private T dequeueUnsynchronized() nothrow { 2805 assert(front != back); 2806 return ring[(front++) % ring.length]; 2807 } 2808 2809 /++ 2810 If you are using a semaphore to signal, you can call this once for each count of it 2811 and you can do that separately from this call (though they should be paired). 2812 2813 If you are using an event, you should use [dequeueSeveral] instead to drain it. 2814 +/ 2815 T dequeue() { 2816 synchronized(this.synchronizedOn) { 2817 return dequeueUnsynchronized(); 2818 } 2819 } 2820 2821 /++ 2822 Note that if you use a semaphore to signal waiting threads, you should probably not call this. 2823 2824 If you use a set/reset event, there's a potential race condition between the dequeue and event 2825 reset. This is why the `runInsideLockIfEmpty` delegate is there - when it is empty, before it 2826 unlocks, it will give you a chance to reset the event. Otherwise, it can remain set to indicate 2827 that there's still pending data in the queue. 2828 +/ 2829 T[] dequeueSeveral(return T[] buffer, scope void delegate() runInsideLockIfEmpty = null) { 2830 int pos; 2831 synchronized(this.synchronizedOn) { 2832 while(pos < buffer.length && front != back) { 2833 buffer[pos++] = dequeueUnsynchronized(); 2834 } 2835 if(front == back && runInsideLockIfEmpty !is null) 2836 runInsideLockIfEmpty(); 2837 } 2838 return buffer[0 .. pos]; 2839 } 2840 } 2841 2842 unittest { 2843 Object object = new Object(); 2844 auto queue = SynchronizedCircularBuffer!CallbackHelper(object); 2845 assert(queue.isEmpty); 2846 foreach(i; 0 .. queue.ring.length - 1) 2847 queue.enqueue(cast(CallbackHelper) cast(void*) i); 2848 assert(queue.isFull); 2849 2850 foreach(i; 0 .. queue.ring.length - 1) 2851 assert(queue.dequeue() is (cast(CallbackHelper) cast(void*) i)); 2852 assert(queue.isEmpty); 2853 2854 foreach(i; 0 .. queue.ring.length - 1) 2855 queue.enqueue(cast(CallbackHelper) cast(void*) i); 2856 assert(queue.isFull); 2857 2858 CallbackHelper[] buffer = new CallbackHelper[](300); 2859 auto got = queue.dequeueSeveral(buffer); 2860 assert(got.length == queue.ring.length - 1); 2861 assert(queue.isEmpty); 2862 foreach(i, item; got) 2863 assert(item is (cast(CallbackHelper) cast(void*) i)); 2864 2865 foreach(i; 0 .. 8) 2866 queue.enqueue(cast(CallbackHelper) cast(void*) i); 2867 buffer = new CallbackHelper[](4); 2868 got = queue.dequeueSeveral(buffer); 2869 assert(got.length == 4); 2870 foreach(i, item; got) 2871 assert(item is (cast(CallbackHelper) cast(void*) i)); 2872 got = queue.dequeueSeveral(buffer); 2873 assert(got.length == 4); 2874 foreach(i, item; got) 2875 assert(item is (cast(CallbackHelper) cast(void*) (i+4))); 2876 got = queue.dequeueSeveral(buffer); 2877 assert(got.length == 0); 2878 assert(queue.isEmpty); 2879 } 2880 2881 /++ 2882 2883 +/ 2884 enum ByteOrder { 2885 irrelevant, 2886 littleEndian, 2887 bigEndian, 2888 } 2889 2890 class WritableStream { 2891 this(size_t bufferSize) { 2892 } 2893 2894 void put(T)() {} 2895 2896 void flush() {} 2897 2898 bool isClosed() { return true; } 2899 2900 // hasRoomInBuffer 2901 // canFlush 2902 // waitUntilCanFlush 2903 2904 // flushImpl 2905 // markFinished / close - tells the other end you're done 2906 } 2907 2908 /++ 2909 A stream can be used by just one task at a time, but one task can consume multiple streams. 2910 2911 Streams may be populated by async sources (in which case they must be called from a fiber task), 2912 from a function generating the data on demand (including an input range), from memory, or from a synchronous file. 2913 2914 A stream of heterogeneous types is compatible with input ranges. 2915 +/ 2916 class ReadableStream { 2917 2918 this() { 2919 2920 } 2921 2922 T get(T)(ByteOrder byteOrder = ByteOrder.irrelevant) { 2923 if(byteOrder == ByteOrder.irrelevant && T.sizeof > 1) 2924 throw new ArsdException!"byte order must be specified for a type that is bigger than one byte"; 2925 2926 while(bufferedLength() < T.sizeof) 2927 waitForAdditionalData(); 2928 2929 static if(T.sizeof == 1) { 2930 ubyte ret = consumeOneByte(); 2931 return *cast(T*) &ret; 2932 } else { 2933 static if(T.sizeof == 8) 2934 ulong ret; 2935 else static if(T.sizeof == 4) 2936 uint ret; 2937 else static if(T.sizeof == 2) 2938 ushort ret; 2939 else static assert(0, "unimplemented type, try using just the basic types"); 2940 2941 if(byteOrder == ByteOrder.littleEndian) { 2942 typeof(ret) buffer; 2943 foreach(b; 0 .. T.sizeof) { 2944 buffer = consumeOneByte(); 2945 buffer <<= T.sizeof * 8 - 8; 2946 2947 ret >>= 8; 2948 ret |= buffer; 2949 } 2950 } else { 2951 foreach(b; 0 .. T.sizeof) { 2952 ret <<= 8; 2953 ret |= consumeOneByte(); 2954 } 2955 } 2956 2957 return *cast(T*) &ret; 2958 } 2959 } 2960 2961 // if the stream is closed before getting the length or the terminator, should we send partial stuff 2962 // or just throw? 2963 T get(T : E[], E)(size_t length, ByteOrder elementByteOrder = ByteOrder.irrelevant) { 2964 if(byteOrder == ByteOrder.irrelevant && E.sizeof > 1) 2965 throw new ArsdException!"byte order must be specified for a type that is bigger than one byte"; 2966 2967 while(bufferedLength() < length * E.sizeof) 2968 waitForAdditionalData(); 2969 2970 T ret; 2971 2972 // FIXME 2973 2974 return ret; 2975 2976 } 2977 2978 T get(T : E[], E)(scope bool delegate(E e) isTerminatingSentinel, ByteOrder elementByteOrder = ByteOrder.irrelevant) { 2979 if(byteOrder == ByteOrder.irrelevant && E.sizeof > 1) 2980 throw new ArsdException!"byte order must be specified for a type that is bigger than one byte"; 2981 2982 assert(0, "Not implemented"); 2983 } 2984 2985 /++ 2986 2987 +/ 2988 bool isClosed() { 2989 return isClosed_; 2990 } 2991 2992 // Control side of things 2993 2994 private bool isClosed_; 2995 2996 /++ 2997 Feeds data into the stream, which can be consumed by `get`. If a task is waiting for more 2998 data to satisfy its get requests, this will trigger those tasks to resume. 2999 3000 If you feed it empty data, it will mark the stream as closed. 3001 +/ 3002 void feedData(ubyte[] data) { 3003 if(data.length == 0) 3004 isClosed_ = true; 3005 3006 currentBuffer = data; 3007 // this is a borrowed buffer, so we won't keep the reference long term 3008 scope(exit) 3009 currentBuffer = null; 3010 3011 if(waitingTask !is null) { 3012 waitingTask.call(); 3013 } 3014 } 3015 3016 /++ 3017 You basically have to use this thing from a task 3018 +/ 3019 protected void waitForAdditionalData() { 3020 Fiber task = Fiber.getThis; 3021 3022 assert(task !is null); 3023 3024 if(waitingTask !is null && waitingTask !is task) 3025 throw new ArsdException!"streams can only have one waiting task"; 3026 3027 // copy any pending data in our buffer to the longer-term buffer 3028 if(currentBuffer.length) 3029 leftoverBuffer ~= currentBuffer; 3030 3031 waitingTask = task; 3032 task.yield(); 3033 } 3034 3035 private Fiber waitingTask; 3036 private ubyte[] leftoverBuffer; 3037 private ubyte[] currentBuffer; 3038 3039 private size_t bufferedLength() { 3040 return leftoverBuffer.length + currentBuffer.length; 3041 } 3042 3043 private ubyte consumeOneByte() { 3044 ubyte b; 3045 if(leftoverBuffer.length) { 3046 b = leftoverBuffer[0]; 3047 leftoverBuffer = leftoverBuffer[1 .. $]; 3048 } else if(currentBuffer.length) { 3049 b = currentBuffer[0]; 3050 currentBuffer = currentBuffer[1 .. $]; 3051 } else { 3052 assert(0, "consuming off an empty buffer is impossible"); 3053 } 3054 3055 return b; 3056 } 3057 } 3058 3059 unittest { 3060 auto stream = new ReadableStream(); 3061 3062 int position; 3063 char[16] errorBuffer; 3064 3065 auto fiber = new Fiber(() { 3066 position = 1; 3067 int a = stream.get!int(ByteOrder.littleEndian); 3068 assert(a == 10, intToString(a, errorBuffer[])); 3069 position = 2; 3070 ubyte b = stream.get!ubyte; 3071 assert(b == 33); 3072 position = 3; 3073 }); 3074 3075 fiber.call(); 3076 assert(position == 1); 3077 stream.feedData([10, 0, 0, 0]); 3078 assert(position == 2); 3079 stream.feedData([33]); 3080 assert(position == 3); 3081 } 3082 3083 /++ 3084 You might use this like: 3085 3086 --- 3087 auto proc = new ExternalProcess(); 3088 auto stdoutStream = new ReadableStream(); 3089 3090 // to use a stream you can make one and have a task consume it 3091 runTask({ 3092 while(!stdoutStream.isClosed) { 3093 auto line = stdoutStream.get!string(e => e == '\n'); 3094 } 3095 }); 3096 3097 // then make the process feed into the stream 3098 proc.onStdoutAvailable = (got) { 3099 stdoutStream.feedData(got); // send it to the stream for processing 3100 stdout.rawWrite(got); // forward it through to our own thing 3101 // could also append it to a buffer to return it on complete 3102 }; 3103 proc.start(); 3104 --- 3105 3106 Please note that this does not currently and I have no plans as of this writing to add support for any kind of direct file descriptor passing. It always pipes them back to the parent for processing. If you don't want this, call the lower level functions yourself; the reason this class is here is to aid integration in the arsd.core event loop. 3107 3108 Of course, I might change my mind on this. 3109 +/ 3110 class ExternalProcess { 3111 3112 private static version(Posix) { 3113 __gshared ExternalProcess[pid_t] activeChildren; 3114 3115 void recordChildCreated(pid_t pid, ExternalProcess proc) { 3116 synchronized(typeid(ExternalProcess)) { 3117 activeChildren[pid] = proc; 3118 } 3119 } 3120 3121 void recordChildTerminated(pid_t pid, int status) { 3122 synchronized(typeid(ExternalProcess)) { 3123 if(pid in activeChildren) { 3124 auto ac = activeChildren[pid]; 3125 ac.completed = true; 3126 ac.status = status; 3127 activeChildren.remove(pid); 3128 } 3129 } 3130 } 3131 } 3132 3133 // FIXME: config to pass through a shell or not 3134 3135 /++ 3136 This is the native version for Windows. 3137 +/ 3138 this(string program, string commandLine) { 3139 } 3140 3141 this(string commandLine) { 3142 version(Posix) { 3143 assert(0, "not implemented command line to posix args yet"); 3144 } 3145 3146 } 3147 3148 this(string[] args) { 3149 version(Posix) { 3150 this.program = FilePath(args[0]); 3151 this.args = args; 3152 } 3153 3154 } 3155 3156 /++ 3157 This is the native version for Posix. 3158 +/ 3159 this(FilePath program, string[] args) { 3160 version(Posix) { 3161 this.program = program; 3162 this.args = args; 3163 } 3164 } 3165 3166 // you can modify these before calling start 3167 int stdoutBufferSize = 32 * 1024; 3168 int stderrBufferSize = 8 * 1024; 3169 3170 void start() { 3171 version(Posix) { 3172 int ret; 3173 3174 int[2] stdinPipes; 3175 ret = pipe(stdinPipes); 3176 if(ret == -1) 3177 throw new ErrnoApiException("stdin pipe", errno); 3178 3179 scope(failure) { 3180 close(stdinPipes[0]); 3181 close(stdinPipes[1]); 3182 } 3183 3184 stdinFd = stdinPipes[1]; 3185 3186 int[2] stdoutPipes; 3187 ret = pipe(stdoutPipes); 3188 if(ret == -1) 3189 throw new ErrnoApiException("stdout pipe", errno); 3190 3191 scope(failure) { 3192 close(stdoutPipes[0]); 3193 close(stdoutPipes[1]); 3194 } 3195 3196 stdoutFd = stdoutPipes[0]; 3197 3198 int[2] stderrPipes; 3199 ret = pipe(stderrPipes); 3200 if(ret == -1) 3201 throw new ErrnoApiException("stderr pipe", errno); 3202 3203 scope(failure) { 3204 close(stderrPipes[0]); 3205 close(stderrPipes[1]); 3206 } 3207 3208 stderrFd = stderrPipes[0]; 3209 3210 3211 int[2] errorReportPipes; 3212 ret = pipe(errorReportPipes); 3213 if(ret == -1) 3214 throw new ErrnoApiException("error reporting pipe", errno); 3215 3216 scope(failure) { 3217 close(errorReportPipes[0]); 3218 close(errorReportPipes[1]); 3219 } 3220 3221 setCloExec(errorReportPipes[0]); 3222 setCloExec(errorReportPipes[1]); 3223 3224 auto forkRet = fork(); 3225 if(forkRet == -1) 3226 throw new ErrnoApiException("fork", errno); 3227 3228 if(forkRet == 0) { 3229 // child side 3230 3231 // FIXME can we do more error checking that is actually useful here? 3232 // these operations are virtually guaranteed to succeed given the setup anyway. 3233 3234 // FIXME pty too 3235 3236 void fail(int step) { 3237 import core.stdc.errno; 3238 auto code = errno; 3239 3240 // report the info back to the parent then exit 3241 3242 int[2] msg = [step, code]; 3243 auto ret = write(errorReportPipes[1], msg.ptr, msg.sizeof); 3244 3245 // but if this fails there's not much we can do... 3246 3247 import core.stdc.stdlib; 3248 exit(1); 3249 } 3250 3251 // dup2 closes the fd it is replacing automatically 3252 dup2(stdinPipes[0], 0); 3253 dup2(stdoutPipes[1], 1); 3254 dup2(stderrPipes[1], 2); 3255 3256 // don't need either of the original pipe fds anymore 3257 close(stdinPipes[0]); 3258 close(stdinPipes[1]); 3259 close(stdoutPipes[0]); 3260 close(stdoutPipes[1]); 3261 close(stderrPipes[0]); 3262 close(stderrPipes[1]); 3263 3264 // the error reporting pipe will be closed upon exec since we set cloexec before fork 3265 // and everything else should have cloexec set too hopefully. 3266 3267 if(beforeExec) 3268 beforeExec(); 3269 3270 // i'm not sure that a fully-initialized druntime is still usable 3271 // after a fork(), so i'm gonna stick to the C lib in here. 3272 3273 const(char)* file = mallocedStringz(program.path).ptr; 3274 if(file is null) 3275 fail(1); 3276 const(char)*[] argv = mallocSlice!(const(char)*)(args.length + 1); 3277 if(argv is null) 3278 fail(2); 3279 foreach(idx, arg; args) { 3280 argv[idx] = mallocedStringz(args[idx]).ptr; 3281 if(argv[idx] is null) 3282 fail(3); 3283 } 3284 argv[args.length] = null; 3285 3286 auto rete = execvp/*e*/(file, argv.ptr/*, envp*/); 3287 if(rete == -1) { 3288 fail(4); 3289 } else { 3290 // unreachable code, exec never returns if it succeeds 3291 assert(0); 3292 } 3293 } else { 3294 pid = forkRet; 3295 3296 recordChildCreated(pid, this); 3297 3298 // close our copy of the write side of the error reporting pipe 3299 // so the read will immediately give eof when the fork closes it too 3300 ErrnoEnforce!close(errorReportPipes[1]); 3301 3302 int[2] msg; 3303 // this will block to wait for it to actually either start up or fail to exec (which should be near instant) 3304 auto val = read(errorReportPipes[0], msg.ptr, msg.sizeof); 3305 3306 if(val == -1) 3307 throw new ErrnoApiException("read error report", errno); 3308 3309 if(val == msg.sizeof) { 3310 // error happened 3311 // FIXME: keep the step part of the error report too 3312 throw new ErrnoApiException("exec", msg[1]); 3313 } else if(val == 0) { 3314 // pipe closed, meaning exec succeeded 3315 } else { 3316 assert(0); // never supposed to happen 3317 } 3318 3319 // set the ones we keep to close upon future execs 3320 // FIXME should i set NOBLOCK at this time too? prolly should 3321 setCloExec(stdinPipes[1]); 3322 setCloExec(stdoutPipes[0]); 3323 setCloExec(stderrPipes[0]); 3324 3325 // and close the others 3326 ErrnoEnforce!close(stdinPipes[0]); 3327 ErrnoEnforce!close(stdoutPipes[1]); 3328 ErrnoEnforce!close(stderrPipes[1]); 3329 3330 ErrnoEnforce!close(errorReportPipes[0]); 3331 3332 // and now register the ones we need to read with the event loop so it can call the callbacks 3333 // also need to listen to SIGCHLD to queue up the terminated callback. FIXME 3334 3335 stdoutUnregisterToken = getThisThreadEventLoop().addCallbackOnFdReadable(stdoutFd, new CallbackHelper(&stdoutReadable)); 3336 } 3337 } 3338 } 3339 3340 private version(Posix) { 3341 import core.sys.posix.unistd; 3342 import core.sys.posix.fcntl; 3343 3344 int stdinFd = -1; 3345 int stdoutFd = -1; 3346 int stderrFd = -1; 3347 3348 ICoreEventLoop.UnregisterToken stdoutUnregisterToken; 3349 3350 pid_t pid = -1; 3351 3352 public void delegate() beforeExec; 3353 3354 FilePath program; 3355 string[] args; 3356 3357 void stdoutReadable() { 3358 ubyte[1024] buffer; 3359 auto ret = read(stdoutFd, buffer.ptr, buffer.length); 3360 if(ret == -1) 3361 throw new ErrnoApiException("read", errno); 3362 if(onStdoutAvailable) { 3363 onStdoutAvailable(buffer[0 .. ret]); 3364 } 3365 3366 if(ret == 0) { 3367 stdoutUnregisterToken.unregister(); 3368 3369 close(stdoutFd); 3370 stdoutFd = -1; 3371 } 3372 } 3373 } 3374 3375 void waitForCompletion() { 3376 getThisThreadEventLoop().run(&this.isComplete); 3377 } 3378 3379 bool isComplete() { 3380 return completed; 3381 } 3382 3383 bool completed; 3384 int status = int.min; 3385 3386 /++ 3387 If blocking, it will block the current task until the write succeeds. 3388 3389 Write `null` as data to close the pipe. Once the pipe is closed, you must not try to write to it again. 3390 +/ 3391 void writeToStdin(in void[] data) { 3392 version(Posix) { 3393 if(data is null) { 3394 close(stdinFd); 3395 stdinFd = -1; 3396 } else { 3397 // FIXME: check the return value again and queue async writes 3398 auto ret = write(stdinFd, data.ptr, data.length); 3399 if(ret == -1) 3400 throw new ErrnoApiException("write", errno); 3401 } 3402 } 3403 3404 } 3405 3406 void delegate(ubyte[] got) onStdoutAvailable; 3407 void delegate(ubyte[] got) onStderrAvailable; 3408 void delegate(int code) onTermination; 3409 3410 // pty? 3411 } 3412 3413 // FIXME: comment this out 3414 ///+ 3415 unittest { 3416 auto proc = new ExternalProcess(FilePath("/bin/cat"), ["/bin/cat"]); 3417 3418 getThisThreadEventLoop(); // initialize it 3419 3420 int c = 0; 3421 proc.onStdoutAvailable = delegate(ubyte[] got) { 3422 if(c == 0) 3423 assert(cast(string) got == "hello!"); 3424 else 3425 assert(got.length == 0); 3426 // import std.stdio; writeln(got); 3427 c++; 3428 }; 3429 3430 proc.start(); 3431 3432 assert(proc.pid != -1); 3433 3434 3435 import std.stdio; 3436 Thread[4] pool; 3437 3438 bool shouldExit; 3439 3440 static int received; 3441 3442 static void tester() { 3443 received++; 3444 //writeln(cast(void*) Thread.getThis, " ", received); 3445 } 3446 3447 foreach(ref thread; pool) { 3448 thread = new Thread(() { 3449 getThisThreadEventLoop().run(() { 3450 return shouldExit; 3451 }); 3452 }); 3453 thread.start(); 3454 } 3455 3456 3457 3458 proc.writeToStdin("hello!"); 3459 proc.writeToStdin(null); // closes the pipe 3460 3461 proc.waitForCompletion(); 3462 3463 assert(proc.status == 0); 3464 3465 assert(c == 2); 3466 } 3467 //+/ 3468 3469 // to test the thundering herd on signal handling 3470 version(none) 3471 unittest { 3472 Thread[4] pool; 3473 foreach(ref thread; pool) { 3474 thread = new class Thread { 3475 this() { 3476 super({ 3477 int count; 3478 getThisThreadEventLoop().run(() { 3479 if(count > 4) return true; 3480 count++; 3481 return false; 3482 }); 3483 }); 3484 } 3485 }; 3486 thread.start(); 3487 } 3488 foreach(ref thread; pool) { 3489 thread.join(); 3490 } 3491 } 3492 3493 /+ 3494 3495 STDIO 3496 3497 /++ 3498 Please note using this will create a compile-time dependency on [arsd.terminal] 3499 3500 It works correctly on Windows, using the correct functions to write unicode to the console. 3501 even allocating a console if needed. If the output has been redirected to a file or pipe, it 3502 writes UTF-8. 3503 3504 3505 so my writeln replacement: 3506 3507 1) if the std output handle is null, alloc one 3508 2) if it is a character device, write out the proper Unicode text. 3509 3) otherwise write out UTF-8.... maybe with a BOM but maybe not. it is tricky to know what the other end of a pipe expects... 3510 [8:15 AM] 3511 im actually tempted to make the write binary to stdout functions throw an exception if it is a character console / interactive terminal instead of letting you spam it right out 3512 [8:16 AM] 3513 of course you can still cheat by casting binary data to string and using the write string function (and this might be appropriate sometimes) but there kinda is a legit difference between a text output and a binary output device 3514 3515 Stdout can represent either 3516 3517 +/ 3518 void writeln(){} { 3519 3520 } 3521 3522 stderr? 3523 3524 /++ 3525 Please note using this will create a compile-time dependency on [arsd.terminal] 3526 3527 It can be called from a task. 3528 3529 It works correctly on Windows and is user friendly on Linux (using arsd.terminal.getline) 3530 while also working if stdin has been redirected (where arsd.terminal itself would throw) 3531 3532 3533 so say you run a program on an interactive terminal. the program tries to open the stdin binary stream 3534 3535 instead of throwing, the prompt could change to indicate the binary data is expected and you can feed it in either by typing it up,,,, or running some command like maybe <file.bin to have the library do what the shell would have done and feed that to the rest of the program 3536 3537 +/ 3538 string readln()() { 3539 3540 } 3541 3542 3543 // if using stdio as a binary output thing you can pretend it is a file w/ stream capability 3544 struct File { 3545 WritableStream ostream; 3546 ReadableStream istream; 3547 3548 ulong tell; 3549 void seek(ulong to) {} 3550 3551 void sync(); 3552 void close(); 3553 } 3554 3555 // these are a bit special because if it actually is an interactive character device, it might be different than other files and even different than other pipes. 3556 WritableStream stdoutStream() { return null; } 3557 WritableStream stderrStream() { return null; } 3558 ReadableStream stdinStream() { return null; } 3559 3560 +/ 3561 3562 3563 /+ 3564 3565 3566 /+ 3567 Druntime appears to have stuff for darwin, freebsd. I might have to add some for openbsd here and maybe netbsd if i care to test it. 3568 +/ 3569 3570 /+ 3571 3572 arsd_core_init(number_of_worker_threads) 3573 3574 Building-block things wanted for the event loop integration: 3575 * ui 3576 * windows 3577 * terminal / console 3578 * generic 3579 * adopt fd 3580 * adopt windows handle 3581 * shared lib 3582 * load 3583 * timers (relative and real time) 3584 * create 3585 * update 3586 * cancel 3587 * file/directory watches 3588 * file created 3589 * file deleted 3590 * file modified 3591 * file ops 3592 * open 3593 * close 3594 * read 3595 * write 3596 * seek 3597 * sendfile on linux 3598 * let completion handlers run in the io worker thread instead of signaling back 3599 * pipe ops (anonymous or named) 3600 * create 3601 * read 3602 * write 3603 * get info about other side of the pipe 3604 * network ops (stream + datagram, ip, ipv6, unix) 3605 * address look up 3606 * connect 3607 * start tls 3608 * listen 3609 * send 3610 * receive 3611 * get peer info 3612 * process ops 3613 * spawn 3614 * notifications when it is terminated or fork or execs 3615 * send signal 3616 * i/o pipes 3617 * thread ops (isDaemon?) 3618 * spawn 3619 * talk to its event loop 3620 * termination notification 3621 * signals 3622 * ctrl+c is the only one i really care about but the others might be made available too. sigchld needs to be done as an impl detail of process ops. 3623 * custom messages 3624 * should be able to send messages from finalizers... 3625 3626 * want to make sure i can stream stuff on top of it all too. 3627 3628 ======== 3629 3630 These things all refer back to a task-local thing that queues the tasks. If it is a fiber, it uses that 3631 and if it is a thread it uses that... 3632 3633 tls IArsdCoreEventLoop curentTaskInterface; // this yields on the wait for calls. the fiber swapper will swap this too. 3634 tls IArsdCoreEventLoop currentThreadInterface; // this blocks on the event loop 3635 3636 shared IArsdCoreEventLoop currentProcessInterface; // this dispatches to any available thread 3637 +/ 3638 3639 3640 /+ 3641 You might have configurable tasks that do not auto-start, e.g. httprequest. maybe @mustUse on those 3642 3643 then some that do auto-start, e.g. setTimeout 3644 3645 3646 timeouts: duration, MonoTime, or SysTime? duration is just a timer monotime auto-adjusts the when, systime sets a real time timerfd 3647 3648 tasks can be set to: 3649 thread affinity - this, any, specific reference 3650 reports to - defaults to this, can also pass down a parent reference. if reports to dies, all its subordinates are cancelled. 3651 3652 3653 you can send a message to a task... maybe maybe just to a task runner (which is itself a task?) 3654 3655 auto file = readFile(x); 3656 auto timeout = setTimeout(y); 3657 auto completed = waitForFirstToCompleteThenCancelOthers(file, timeout); 3658 if(completed == 0) { 3659 file.... 3660 } else { 3661 timeout.... 3662 } 3663 3664 /+ 3665 A task will run on a thread (with possible migration), and report to a task. 3666 +/ 3667 3668 // a compute task is run on a helper thread 3669 auto task = computeTask((shared(bool)* cancellationRequested) { 3670 // or pass in a yield thing... prolly a TaskController which has cancellationRequested and yield controls as well as send message to parent (sync or async) 3671 3672 // you'd periodically send messages back to the parent 3673 }, RunOn.AnyAvailable, Affinity.CanMigrate); 3674 3675 auto task = Task((TaskController controller) { 3676 foreach(x, 0 .. 1000) { 3677 if(x % 10 == 0) 3678 controller.yield(); // periodically yield control, which also checks for cancellation for us 3679 // do some work 3680 3681 controller.sendMessage(...); 3682 controller.sendProgress(x); // yields it for a foreach stream kind of thing 3683 } 3684 3685 return something; // automatically sends the something as the result in a TaskFinished message 3686 }); 3687 3688 foreach(item; task) // waitsForProgress, sendProgress sends an item and the final return sends an item 3689 {} 3690 3691 3692 see ~/test/task.d 3693 3694 // an io task is run locally via the event loops 3695 auto task2 = ioTask(() { 3696 3697 }); 3698 3699 3700 3701 waitForEvent 3702 +/ 3703 3704 /+ 3705 Most functions should prolly take a thread arg too, which defaults 3706 to this thread, but you can also pass it a reference, or a "any available" thing. 3707 3708 This can be a ufcs overload 3709 +/ 3710 3711 interface SemiSynchronousTask { 3712 3713 } 3714 3715 struct TimeoutCompletionResult { 3716 bool completed; 3717 3718 bool opCast(T : bool)() { 3719 return completed; 3720 } 3721 } 3722 3723 struct Timeout { 3724 void reschedule(Duration when) { 3725 3726 } 3727 3728 void cancel() { 3729 3730 } 3731 3732 TimeoutCompletionResult waitForCompletion() { 3733 return TimeoutCompletionResult(false); 3734 } 3735 } 3736 3737 Timeout setTimeout(void delegate() dg, int msecs, int permittedJitter = 20) { 3738 return Timeout.init; 3739 } 3740 3741 void clearTimeout(Timeout timeout) { 3742 timeout.cancel(); 3743 } 3744 3745 void createInterval() {} 3746 void clearInterval() {} 3747 3748 /++ 3749 Schedules a task at the given wall clock time. 3750 +/ 3751 void scheduleTask() {} 3752 3753 struct IoOperationCompletionResult { 3754 enum Status { 3755 cancelled, 3756 completed 3757 } 3758 3759 Status status; 3760 3761 int error; 3762 int bytesWritten; 3763 3764 bool opCast(T : bool)() { 3765 return status == Status.completed; 3766 } 3767 } 3768 3769 struct IoOperation { 3770 void cancel() {} 3771 3772 IoOperationCompletionResult waitForCompletion() { 3773 return IoOperationCompletionResult.init; 3774 } 3775 3776 // could contain a scoped class in here too so it stack allocated 3777 } 3778 3779 // Should return both the object and the index in the array! 3780 Result waitForFirstToComplete(Operation[]...) {} 3781 3782 IoOperation read(IoHandle handle, ubyte[] buffer 3783 3784 /+ 3785 class IoOperation {} 3786 3787 // an io operation and its buffer must not be modified or freed 3788 // in between a call to enqueue and a call to waitForCompletion 3789 // if you used the whenComplete callback, make sure it is NOT gc'd or scope thing goes out of scope in the mean time 3790 // if its dtor runs, it'd be forced to be cancelled... 3791 3792 scope IoOperation op = new IoOperation(buffer_size); 3793 op.start(); 3794 op.waitForCompletion(); 3795 +/ 3796 3797 /+ 3798 will want: 3799 read, write 3800 send, recv 3801 3802 cancel 3803 3804 open file, open (named or anonymous) pipe, open process 3805 connect, accept 3806 SSL 3807 close 3808 3809 postEvent 3810 postAPC? like run in gui thread / async 3811 waitForEvent ? needs to handle a timeout and a cancellation. would only work in the fiber task api. 3812 3813 waitForSuccess 3814 3815 interrupt handler 3816 3817 onPosixReadReadiness 3818 onPosixWriteReadiness 3819 3820 onWindowsHandleReadiness 3821 - but they're one-offs so you gotta reregister for each event 3822 +/ 3823 3824 3825 3826 /+ 3827 arsd.core.uda 3828 3829 you define a model struct with the types you want to extract 3830 3831 you get it with like Model extract(Model, UDAs...)(Model default) 3832 3833 defaultModel!alias > defaultModel!Type(defaultModel("identifier")) 3834 3835 3836 3837 3838 3839 3840 3841 3842 3843 3844 so while i laid there sleep deprived i did think a lil more on some uda stuff. it isn't especially novel but a combination of a few other techniques 3845 3846 you might be like 3847 3848 struct MyUdas { 3849 DbName name; 3850 DbIgnore ignore; 3851 } 3852 3853 elsewhere 3854 3855 foreach(alias; allMembers) { 3856 auto udas = getUdas!(MyUdas, __traits(getAttributes, alias))(MyUdas(DbName(__traits(identifier, alias)))); 3857 } 3858 3859 3860 so you pass the expected type and the attributes as the template params, then the runtime params are the default values for the given types 3861 3862 so what the thing does essentially is just sets the values of the given thing to the udas based on type then returns the modified instance 3863 3864 so the end result is you keep the last ones. it wouldn't report errors if multiple things added but it p simple to understand, simple to document (even though the default values are not in the struct itself, you can put ddocs in them), and uses the tricks to minimize generated code size 3865 +/ 3866 3867 +/ 3868 3869 private version(Windows) extern(Windows) { 3870 BOOL CancelIoEx(HANDLE, LPOVERLAPPED); 3871 }