1 /** 2 * This is a low-level messaging API upon which more structured or restrictive 3 * APIs may be built. The general idea is that every messageable entity is 4 * represented by a common handle type called a Tid, which allows messages to 5 * be sent to logical threads that are executing in both the current process 6 * and in external processes using the same interface. This is an important 7 * aspect of scalability because it allows the components of a program to be 8 * spread across available resources with few to no changes to the actual 9 * implementation. 10 * 11 * A logical thread is an execution context that has its own stack and which 12 * runs asynchronously to other logical threads. These may be preemptively 13 * scheduled kernel threads, fibers (cooperative user-space threads), or some 14 * other concept with similar behavior. 15 * 16 * The type of concurrency used when logical threads are created is determined 17 * by the Scheduler selected at initialization time. The default behavior is 18 * currently to create a new kernel thread per call to spawn, but other 19 * schedulers are available that multiplex fibers across the main thread or 20 * use some combination of the two approaches. 21 * 22 * Copyright: Copyright Sean Kelly 2009 - 2014. 23 * License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>. 24 * Authors: Sean Kelly, Alex Rønne Petersen, Martin Nowak 25 * Source: $(PHOBOSSRC std/concurrency.d) 26 */ 27 /* Copyright Sean Kelly 2009 - 2014. 28 * Distributed under the Boost Software License, Version 1.0. 29 * (See accompanying file LICENSE_1_0.txt or copy at 30 * http://www.boost.org/LICENSE_1_0.txt) 31 */ 32 module std.concurrency; 33 34 public import std.variant; 35 36 import core.atomic; 37 import core.sync.condition; 38 import core.sync.mutex; 39 import core.thread; 40 import std.range.primitives; 41 import std.range.interfaces : InputRange; 42 import std.traits; 43 44 /// 45 @system unittest 46 { 47 __gshared string received; 48 static void spawnedFunc(Tid ownerTid) 49 { 50 import std.conv : text; 51 // Receive a message from the owner thread. 52 receive((int i){ 53 received = text("Received the number ", i); 54 55 // Send a message back to the owner thread 56 // indicating success. 57 send(ownerTid, true); 58 }); 59 } 60 61 // Start spawnedFunc in a new thread. 62 auto childTid = spawn(&spawnedFunc, thisTid); 63 64 // Send the number 42 to this new thread. 65 send(childTid, 42); 66 67 // Receive the result code. 68 auto wasSuccessful = receiveOnly!(bool); 69 assert(wasSuccessful); 70 assert(received == "Received the number 42"); 71 } 72 73 private 74 { 75 bool hasLocalAliasing(Types...)() 76 { 77 import std.typecons : Rebindable; 78 79 // Works around "statement is not reachable" 80 bool doesIt = false; 81 static foreach (T; Types) 82 { 83 static if (is(T == Tid)) 84 { /* Allowed */ } 85 else static if (is(T : Rebindable!R, R)) 86 doesIt |= hasLocalAliasing!R; 87 else static if (is(T == struct)) 88 doesIt |= hasLocalAliasing!(typeof(T.tupleof)); 89 else 90 doesIt |= std.traits.hasUnsharedAliasing!(T); 91 } 92 return doesIt; 93 } 94 95 @safe unittest 96 { 97 static struct Container { Tid t; } 98 static assert(!hasLocalAliasing!(Tid, Container, int)); 99 } 100 101 // https://issues.dlang.org/show_bug.cgi?id=20097 102 @safe unittest 103 { 104 import std.datetime.systime : SysTime; 105 static struct Container { SysTime time; } 106 static assert(!hasLocalAliasing!(SysTime, Container)); 107 } 108 109 enum MsgType 110 { 111 standard, 112 priority, 113 linkDead, 114 } 115 116 struct Message 117 { 118 MsgType type; 119 Variant data; 120 121 this(T...)(MsgType t, T vals) if (T.length > 0) 122 { 123 static if (T.length == 1) 124 { 125 type = t; 126 data = vals[0]; 127 } 128 else 129 { 130 import std.typecons : Tuple; 131 132 type = t; 133 data = Tuple!(T)(vals); 134 } 135 } 136 137 @property auto convertsTo(T...)() 138 { 139 static if (T.length == 1) 140 { 141 return is(T[0] == Variant) || data.convertsTo!(T); 142 } 143 else 144 { 145 import std.typecons : Tuple; 146 return data.convertsTo!(Tuple!(T)); 147 } 148 } 149 150 @property auto get(T...)() 151 { 152 static if (T.length == 1) 153 { 154 static if (is(T[0] == Variant)) 155 return data; 156 else 157 return data.get!(T); 158 } 159 else 160 { 161 import std.typecons : Tuple; 162 return data.get!(Tuple!(T)); 163 } 164 } 165 166 auto map(Op)(Op op) 167 { 168 alias Args = Parameters!(Op); 169 170 static if (Args.length == 1) 171 { 172 static if (is(Args[0] == Variant)) 173 return op(data); 174 else 175 return op(data.get!(Args)); 176 } 177 else 178 { 179 import std.typecons : Tuple; 180 return op(data.get!(Tuple!(Args)).expand); 181 } 182 } 183 } 184 185 void checkops(T...)(T ops) 186 { 187 import std.format : format; 188 189 foreach (i, t1; T) 190 { 191 static assert(isFunctionPointer!t1 || isDelegate!t1, 192 format!"T %d is not a function pointer or delegates"(i)); 193 alias a1 = Parameters!(t1); 194 alias r1 = ReturnType!(t1); 195 196 static if (i < T.length - 1 && is(r1 == void)) 197 { 198 static assert(a1.length != 1 || !is(a1[0] == Variant), 199 "function with arguments " ~ a1.stringof ~ 200 " occludes successive function"); 201 202 foreach (t2; T[i + 1 .. $]) 203 { 204 alias a2 = Parameters!(t2); 205 206 static assert(!is(a1 == a2), 207 "function with arguments " ~ a1.stringof ~ " occludes successive function"); 208 } 209 } 210 } 211 } 212 213 @property ref ThreadInfo thisInfo() nothrow 214 { 215 if (scheduler is null) 216 return ThreadInfo.thisInfo; 217 return scheduler.thisInfo; 218 } 219 } 220 221 static ~this() 222 { 223 thisInfo.cleanup(); 224 } 225 226 // Exceptions 227 228 /** 229 * Thrown on calls to `receiveOnly` if a message other than the type 230 * the receiving thread expected is sent. 231 */ 232 class MessageMismatch : Exception 233 { 234 /// 235 this(string msg = "Unexpected message type") @safe pure nothrow @nogc 236 { 237 super(msg); 238 } 239 } 240 241 /** 242 * Thrown on calls to `receive` if the thread that spawned the receiving 243 * thread has terminated and no more messages exist. 244 */ 245 class OwnerTerminated : Exception 246 { 247 /// 248 this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc 249 { 250 super(msg); 251 tid = t; 252 } 253 254 Tid tid; 255 } 256 257 /** 258 * Thrown if a linked thread has terminated. 259 */ 260 class LinkTerminated : Exception 261 { 262 /// 263 this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc 264 { 265 super(msg); 266 tid = t; 267 } 268 269 Tid tid; 270 } 271 272 /** 273 * Thrown if a message was sent to a thread via 274 * $(REF prioritySend, std,concurrency) and the receiver does not have a handler 275 * for a message of this type. 276 */ 277 class PriorityMessageException : Exception 278 { 279 /// 280 this(Variant vals) 281 { 282 super("Priority message"); 283 message = vals; 284 } 285 286 /** 287 * The message that was sent. 288 */ 289 Variant message; 290 } 291 292 /** 293 * Thrown on mailbox crowding if the mailbox is configured with 294 * `OnCrowding.throwException`. 295 */ 296 class MailboxFull : Exception 297 { 298 /// 299 this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc 300 { 301 super(msg); 302 tid = t; 303 } 304 305 Tid tid; 306 } 307 308 /** 309 * Thrown when a Tid is missing, e.g. when `ownerTid` doesn't 310 * find an owner thread. 311 */ 312 class TidMissingException : Exception 313 { 314 import std.exception : basicExceptionCtors; 315 /// 316 mixin basicExceptionCtors; 317 } 318 319 320 // Thread ID 321 322 323 /** 324 * An opaque type used to represent a logical thread. 325 */ 326 struct Tid 327 { 328 private: 329 this(MessageBox m) @safe pure nothrow @nogc 330 { 331 mbox = m; 332 } 333 334 MessageBox mbox; 335 336 public: 337 338 /** 339 * Generate a convenient string for identifying this Tid. This is only 340 * useful to see if Tid's that are currently executing are the same or 341 * different, e.g. for logging and debugging. It is potentially possible 342 * that a Tid executed in the future will have the same toString() output 343 * as another Tid that has already terminated. 344 */ 345 void toString(scope void delegate(const(char)[]) sink) 346 { 347 import std.format : formattedWrite; 348 formattedWrite(sink, "Tid(%x)", cast(void*) mbox); 349 } 350 351 } 352 353 @system unittest 354 { 355 // text!Tid is @system 356 import std.conv : text; 357 Tid tid; 358 assert(text(tid) == "Tid(0)"); 359 auto tid2 = thisTid; 360 assert(text(tid2) != "Tid(0)"); 361 auto tid3 = tid2; 362 assert(text(tid2) == text(tid3)); 363 } 364 365 /** 366 * Returns: The $(LREF Tid) of the caller's thread. 367 */ 368 @property Tid thisTid() @safe 369 { 370 // TODO: remove when concurrency is safe 371 static auto trus() @trusted 372 { 373 if (thisInfo.ident != Tid.init) 374 return thisInfo.ident; 375 thisInfo.ident = Tid(new MessageBox); 376 return thisInfo.ident; 377 } 378 379 return trus(); 380 } 381 382 /** 383 * Return the Tid of the thread which spawned the caller's thread. 384 * 385 * Throws: A `TidMissingException` exception if 386 * there is no owner thread. 387 */ 388 @property Tid ownerTid() 389 { 390 import std.exception : enforce; 391 392 enforce!TidMissingException(thisInfo.owner.mbox !is null, "Error: Thread has no owner thread."); 393 return thisInfo.owner; 394 } 395 396 @system unittest 397 { 398 import std.exception : assertThrown; 399 400 static void fun() 401 { 402 string res = receiveOnly!string(); 403 assert(res == "Main calling"); 404 ownerTid.send("Child responding"); 405 } 406 407 assertThrown!TidMissingException(ownerTid); 408 auto child = spawn(&fun); 409 child.send("Main calling"); 410 string res = receiveOnly!string(); 411 assert(res == "Child responding"); 412 } 413 414 // Thread Creation 415 416 private template isSpawnable(F, T...) 417 { 418 template isParamsImplicitlyConvertible(F1, F2, int i = 0) 419 { 420 alias param1 = Parameters!F1; 421 alias param2 = Parameters!F2; 422 static if (param1.length != param2.length) 423 enum isParamsImplicitlyConvertible = false; 424 else static if (param1.length == i) 425 enum isParamsImplicitlyConvertible = true; 426 else static if (isImplicitlyConvertible!(param2[i], param1[i])) 427 enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1, 428 F2, i + 1); 429 else 430 enum isParamsImplicitlyConvertible = false; 431 } 432 433 enum isSpawnable = isCallable!F && is(ReturnType!F == void) 434 && isParamsImplicitlyConvertible!(F, void function(T)) 435 && (isFunctionPointer!F || !hasUnsharedAliasing!F); 436 } 437 438 /** 439 * Starts fn(args) in a new logical thread. 440 * 441 * Executes the supplied function in a new logical thread represented by 442 * `Tid`. The calling thread is designated as the owner of the new thread. 443 * When the owner thread terminates an `OwnerTerminated` message will be 444 * sent to the new thread, causing an `OwnerTerminated` exception to be 445 * thrown on `receive()`. 446 * 447 * Params: 448 * fn = The function to execute. 449 * args = Arguments to the function. 450 * 451 * Returns: 452 * A Tid representing the new logical thread. 453 * 454 * Notes: 455 * `args` must not have unshared aliasing. In other words, all arguments 456 * to `fn` must either be `shared` or `immutable` or have no 457 * pointer indirection. This is necessary for enforcing isolation among 458 * threads. 459 */ 460 Tid spawn(F, T...)(F fn, T args) 461 if (isSpawnable!(F, T)) 462 { 463 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 464 return _spawn(false, fn, args); 465 } 466 467 /// 468 @system unittest 469 { 470 static void f(string msg) 471 { 472 assert(msg == "Hello World"); 473 } 474 475 auto tid = spawn(&f, "Hello World"); 476 } 477 478 /// Fails: char[] has mutable aliasing. 479 @system unittest 480 { 481 string msg = "Hello, World!"; 482 483 static void f1(string msg) {} 484 static assert(!__traits(compiles, spawn(&f1, msg.dup))); 485 static assert( __traits(compiles, spawn(&f1, msg.idup))); 486 487 static void f2(char[] msg) {} 488 static assert(!__traits(compiles, spawn(&f2, msg.dup))); 489 static assert(!__traits(compiles, spawn(&f2, msg.idup))); 490 } 491 492 /// New thread with anonymous function 493 @system unittest 494 { 495 spawn({ 496 ownerTid.send("This is so great!"); 497 }); 498 assert(receiveOnly!string == "This is so great!"); 499 } 500 501 @system unittest 502 { 503 import core.thread : thread_joinAll; 504 505 __gshared string receivedMessage; 506 static void f1(string msg) 507 { 508 receivedMessage = msg; 509 } 510 511 auto tid1 = spawn(&f1, "Hello World"); 512 thread_joinAll; 513 assert(receivedMessage == "Hello World"); 514 } 515 516 /** 517 * Starts fn(args) in a logical thread and will receive a LinkTerminated 518 * message when the operation terminates. 519 * 520 * Executes the supplied function in a new logical thread represented by 521 * Tid. This new thread is linked to the calling thread so that if either 522 * it or the calling thread terminates a LinkTerminated message will be sent 523 * to the other, causing a LinkTerminated exception to be thrown on receive(). 524 * The owner relationship from spawn() is preserved as well, so if the link 525 * between threads is broken, owner termination will still result in an 526 * OwnerTerminated exception to be thrown on receive(). 527 * 528 * Params: 529 * fn = The function to execute. 530 * args = Arguments to the function. 531 * 532 * Returns: 533 * A Tid representing the new thread. 534 */ 535 Tid spawnLinked(F, T...)(F fn, T args) 536 if (isSpawnable!(F, T)) 537 { 538 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 539 return _spawn(true, fn, args); 540 } 541 542 /* 543 * 544 */ 545 private Tid _spawn(F, T...)(bool linked, F fn, T args) 546 if (isSpawnable!(F, T)) 547 { 548 // TODO: MessageList and &exec should be shared. 549 auto spawnTid = Tid(new MessageBox); 550 auto ownerTid = thisTid; 551 552 void exec() 553 { 554 thisInfo.ident = spawnTid; 555 thisInfo.owner = ownerTid; 556 fn(args); 557 } 558 559 // TODO: MessageList and &exec should be shared. 560 if (scheduler !is null) 561 scheduler.spawn(&exec); 562 else 563 { 564 auto t = new Thread(&exec); 565 t.start(); 566 } 567 thisInfo.links[spawnTid] = linked; 568 return spawnTid; 569 } 570 571 @system unittest 572 { 573 void function() fn1; 574 void function(int) fn2; 575 static assert(__traits(compiles, spawn(fn1))); 576 static assert(__traits(compiles, spawn(fn2, 2))); 577 static assert(!__traits(compiles, spawn(fn1, 1))); 578 static assert(!__traits(compiles, spawn(fn2))); 579 580 void delegate(int) shared dg1; 581 shared(void delegate(int)) dg2; 582 shared(void delegate(long) shared) dg3; 583 shared(void delegate(real, int, long) shared) dg4; 584 void delegate(int) immutable dg5; 585 void delegate(int) dg6; 586 static assert(__traits(compiles, spawn(dg1, 1))); 587 static assert(__traits(compiles, spawn(dg2, 2))); 588 static assert(__traits(compiles, spawn(dg3, 3))); 589 static assert(__traits(compiles, spawn(dg4, 4, 4, 4))); 590 static assert(__traits(compiles, spawn(dg5, 5))); 591 static assert(!__traits(compiles, spawn(dg6, 6))); 592 593 auto callable1 = new class{ void opCall(int) shared {} }; 594 auto callable2 = cast(shared) new class{ void opCall(int) shared {} }; 595 auto callable3 = new class{ void opCall(int) immutable {} }; 596 auto callable4 = cast(immutable) new class{ void opCall(int) immutable {} }; 597 auto callable5 = new class{ void opCall(int) {} }; 598 auto callable6 = cast(shared) new class{ void opCall(int) immutable {} }; 599 auto callable7 = cast(immutable) new class{ void opCall(int) shared {} }; 600 auto callable8 = cast(shared) new class{ void opCall(int) const shared {} }; 601 auto callable9 = cast(const shared) new class{ void opCall(int) shared {} }; 602 auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} }; 603 auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} }; 604 static assert(!__traits(compiles, spawn(callable1, 1))); 605 static assert( __traits(compiles, spawn(callable2, 2))); 606 static assert(!__traits(compiles, spawn(callable3, 3))); 607 static assert( __traits(compiles, spawn(callable4, 4))); 608 static assert(!__traits(compiles, spawn(callable5, 5))); 609 static assert(!__traits(compiles, spawn(callable6, 6))); 610 static assert(!__traits(compiles, spawn(callable7, 7))); 611 static assert( __traits(compiles, spawn(callable8, 8))); 612 static assert(!__traits(compiles, spawn(callable9, 9))); 613 static assert( __traits(compiles, spawn(callable10, 10))); 614 static assert( __traits(compiles, spawn(callable11, 11))); 615 } 616 617 /** 618 * Places the values as a message at the back of tid's message queue. 619 * 620 * Sends the supplied value to the thread represented by tid. As with 621 * $(REF spawn, std,concurrency), `T` must not have unshared aliasing. 622 */ 623 void send(T...)(Tid tid, T vals) 624 in (tid.mbox !is null) 625 { 626 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 627 _send(tid, vals); 628 } 629 630 /** 631 * Places the values as a message on the front of tid's message queue. 632 * 633 * Send a message to `tid` but place it at the front of `tid`'s message 634 * queue instead of at the back. This function is typically used for 635 * out-of-band communication, to signal exceptional conditions, etc. 636 */ 637 void prioritySend(T...)(Tid tid, T vals) 638 in (tid.mbox !is null) 639 { 640 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 641 _send(MsgType.priority, tid, vals); 642 } 643 644 /* 645 * ditto 646 */ 647 private void _send(T...)(Tid tid, T vals) 648 in (tid.mbox !is null) 649 { 650 _send(MsgType.standard, tid, vals); 651 } 652 653 /* 654 * Implementation of send. This allows parameter checking to be different for 655 * both Tid.send() and .send(). 656 */ 657 private void _send(T...)(MsgType type, Tid tid, T vals) 658 in (tid.mbox !is null) 659 { 660 auto msg = Message(type, vals); 661 tid.mbox.put(msg); 662 } 663 664 /** 665 * Receives a message from another thread. 666 * 667 * Receive a message from another thread, or block if no messages of the 668 * specified types are available. This function works by pattern matching 669 * a message against a set of delegates and executing the first match found. 670 * 671 * If a delegate that accepts a $(REF Variant, std,variant) is included as 672 * the last argument to `receive`, it will match any message that was not 673 * matched by an earlier delegate. If more than one argument is sent, 674 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values 675 * sent. 676 * 677 * Params: 678 * ops = Variadic list of function pointers and delegates. Entries 679 * in this list must not occlude later entries. 680 * 681 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated. 682 */ 683 void receive(T...)( T ops ) 684 in 685 { 686 assert(thisInfo.ident.mbox !is null, 687 "Cannot receive a message until a thread was spawned " 688 ~ "or thisTid was passed to a running thread."); 689 } 690 do 691 { 692 checkops( ops ); 693 694 thisInfo.ident.mbox.get( ops ); 695 } 696 697 /// 698 @system unittest 699 { 700 import std.variant : Variant; 701 702 auto process = () 703 { 704 receive( 705 (int i) { ownerTid.send(1); }, 706 (double f) { ownerTid.send(2); }, 707 (Variant v) { ownerTid.send(3); } 708 ); 709 }; 710 711 { 712 auto tid = spawn(process); 713 send(tid, 42); 714 assert(receiveOnly!int == 1); 715 } 716 717 { 718 auto tid = spawn(process); 719 send(tid, 3.14); 720 assert(receiveOnly!int == 2); 721 } 722 723 { 724 auto tid = spawn(process); 725 send(tid, "something else"); 726 assert(receiveOnly!int == 3); 727 } 728 } 729 730 @safe unittest 731 { 732 static assert( __traits( compiles, 733 { 734 receive( (Variant x) {} ); 735 receive( (int x) {}, (Variant x) {} ); 736 } ) ); 737 738 static assert( !__traits( compiles, 739 { 740 receive( (Variant x) {}, (int x) {} ); 741 } ) ); 742 743 static assert( !__traits( compiles, 744 { 745 receive( (int x) {}, (int x) {} ); 746 } ) ); 747 } 748 749 // Make sure receive() works with free functions as well. 750 version (StdUnittest) 751 { 752 private void receiveFunction(int x) {} 753 } 754 @safe unittest 755 { 756 static assert( __traits( compiles, 757 { 758 receive( &receiveFunction ); 759 receive( &receiveFunction, (Variant x) {} ); 760 } ) ); 761 } 762 763 764 private template receiveOnlyRet(T...) 765 { 766 static if ( T.length == 1 ) 767 { 768 alias receiveOnlyRet = T[0]; 769 } 770 else 771 { 772 import std.typecons : Tuple; 773 alias receiveOnlyRet = Tuple!(T); 774 } 775 } 776 777 /** 778 * Receives only messages with arguments of the specified types. 779 * 780 * Params: 781 * T = Variadic list of types to be received. 782 * 783 * Returns: The received message. If `T` has more than one entry, 784 * the message will be packed into a $(REF Tuple, std,typecons). 785 * 786 * Throws: $(LREF MessageMismatch) if a message of types other than `T` 787 * is received, 788 * $(LREF OwnerTerminated) when the sending thread was terminated. 789 */ 790 receiveOnlyRet!(T) receiveOnly(T...)() 791 in 792 { 793 assert(thisInfo.ident.mbox !is null, 794 "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread."); 795 } 796 do 797 { 798 import std.format : format; 799 import std.typecons : Tuple; 800 801 Tuple!(T) ret; 802 803 thisInfo.ident.mbox.get((T val) { 804 static if (T.length) 805 ret.field = val; 806 }, 807 (LinkTerminated e) { throw e; }, 808 (OwnerTerminated e) { throw e; }, 809 (Variant val) { 810 static if (T.length > 1) 811 string exp = T.stringof; 812 else 813 string exp = T[0].stringof; 814 815 throw new MessageMismatch( 816 format("Unexpected message type: expected '%s', got '%s'", exp, val.type.toString())); 817 }); 818 static if (T.length == 1) 819 return ret[0]; 820 else 821 return ret; 822 } 823 824 /// 825 @system unittest 826 { 827 auto tid = spawn( 828 { 829 assert(receiveOnly!int == 42); 830 }); 831 send(tid, 42); 832 } 833 834 /// 835 @system unittest 836 { 837 auto tid = spawn( 838 { 839 assert(receiveOnly!string == "text"); 840 }); 841 send(tid, "text"); 842 } 843 844 /// 845 @system unittest 846 { 847 struct Record { string name; int age; } 848 849 auto tid = spawn( 850 { 851 auto msg = receiveOnly!(double, Record); 852 assert(msg[0] == 0.5); 853 assert(msg[1].name == "Alice"); 854 assert(msg[1].age == 31); 855 }); 856 857 send(tid, 0.5, Record("Alice", 31)); 858 } 859 860 @system unittest 861 { 862 static void t1(Tid mainTid) 863 { 864 try 865 { 866 receiveOnly!string(); 867 mainTid.send(""); 868 } 869 catch (Throwable th) 870 { 871 mainTid.send(th.msg); 872 } 873 } 874 875 auto tid = spawn(&t1, thisTid); 876 tid.send(1); 877 string result = receiveOnly!string(); 878 assert(result == "Unexpected message type: expected 'string', got 'int'"); 879 } 880 881 /** 882 * Receives a message from another thread and gives up if no match 883 * arrives within a specified duration. 884 * 885 * Receive a message from another thread, or block until `duration` exceeds, 886 * if no messages of the specified types are available. This function works 887 * by pattern matching a message against a set of delegates and executing 888 * the first match found. 889 * 890 * If a delegate that accepts a $(REF Variant, std,variant) is included as 891 * the last argument, it will match any message that was not 892 * matched by an earlier delegate. If more than one argument is sent, 893 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values 894 * sent. 895 * 896 * Params: 897 * duration = Duration, how long to wait. If `duration` is negative, 898 * won't wait at all. 899 * ops = Variadic list of function pointers and delegates. Entries 900 * in this list must not occlude later entries. 901 * 902 * Returns: `true` if it received a message and `false` if it timed out waiting 903 * for one. 904 * 905 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated. 906 */ 907 bool receiveTimeout(T...)(Duration duration, T ops) 908 in 909 { 910 assert(thisInfo.ident.mbox !is null, 911 "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread."); 912 } 913 do 914 { 915 checkops(ops); 916 917 return thisInfo.ident.mbox.get(duration, ops); 918 } 919 920 @safe unittest 921 { 922 static assert(__traits(compiles, { 923 receiveTimeout(msecs(0), (Variant x) {}); 924 receiveTimeout(msecs(0), (int x) {}, (Variant x) {}); 925 })); 926 927 static assert(!__traits(compiles, { 928 receiveTimeout(msecs(0), (Variant x) {}, (int x) {}); 929 })); 930 931 static assert(!__traits(compiles, { 932 receiveTimeout(msecs(0), (int x) {}, (int x) {}); 933 })); 934 935 static assert(__traits(compiles, { 936 receiveTimeout(msecs(10), (int x) {}, (Variant x) {}); 937 })); 938 } 939 940 // MessageBox Limits 941 942 /** 943 * These behaviors may be specified when a mailbox is full. 944 */ 945 enum OnCrowding 946 { 947 block, /// Wait until room is available. 948 throwException, /// Throw a MailboxFull exception. 949 ignore /// Abort the send and return. 950 } 951 952 private 953 { 954 bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc 955 { 956 return true; 957 } 958 959 bool onCrowdingThrow(Tid tid) @safe pure 960 { 961 throw new MailboxFull(tid); 962 } 963 964 bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc 965 { 966 return false; 967 } 968 } 969 970 /** 971 * Sets a maximum mailbox size. 972 * 973 * Sets a limit on the maximum number of user messages allowed in the mailbox. 974 * If this limit is reached, the caller attempting to add a new message will 975 * execute the behavior specified by doThis. If messages is zero, the mailbox 976 * is unbounded. 977 * 978 * Params: 979 * tid = The Tid of the thread for which this limit should be set. 980 * messages = The maximum number of messages or zero if no limit. 981 * doThis = The behavior executed when a message is sent to a full 982 * mailbox. 983 */ 984 void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure 985 in (tid.mbox !is null) 986 { 987 final switch (doThis) 988 { 989 case OnCrowding.block: 990 return tid.mbox.setMaxMsgs(messages, &onCrowdingBlock); 991 case OnCrowding.throwException: 992 return tid.mbox.setMaxMsgs(messages, &onCrowdingThrow); 993 case OnCrowding.ignore: 994 return tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore); 995 } 996 } 997 998 /** 999 * Sets a maximum mailbox size. 1000 * 1001 * Sets a limit on the maximum number of user messages allowed in the mailbox. 1002 * If this limit is reached, the caller attempting to add a new message will 1003 * execute onCrowdingDoThis. If messages is zero, the mailbox is unbounded. 1004 * 1005 * Params: 1006 * tid = The Tid of the thread for which this limit should be set. 1007 * messages = The maximum number of messages or zero if no limit. 1008 * onCrowdingDoThis = The routine called when a message is sent to a full 1009 * mailbox. 1010 */ 1011 void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis) 1012 in (tid.mbox !is null) 1013 { 1014 tid.mbox.setMaxMsgs(messages, onCrowdingDoThis); 1015 } 1016 1017 private 1018 { 1019 __gshared Tid[string] tidByName; 1020 __gshared string[][Tid] namesByTid; 1021 } 1022 1023 private @property Mutex registryLock() 1024 { 1025 __gshared Mutex impl; 1026 initOnce!impl(new Mutex); 1027 return impl; 1028 } 1029 1030 private void unregisterMe(ref ThreadInfo me) 1031 { 1032 if (me.ident != Tid.init) 1033 { 1034 synchronized (registryLock) 1035 { 1036 if (auto allNames = me.ident in namesByTid) 1037 { 1038 foreach (name; *allNames) 1039 tidByName.remove(name); 1040 namesByTid.remove(me.ident); 1041 } 1042 } 1043 } 1044 } 1045 1046 /** 1047 * Associates name with tid. 1048 * 1049 * Associates name with tid in a process-local map. When the thread 1050 * represented by tid terminates, any names associated with it will be 1051 * automatically unregistered. 1052 * 1053 * Params: 1054 * name = The name to associate with tid. 1055 * tid = The tid register by name. 1056 * 1057 * Returns: 1058 * true if the name is available and tid is not known to represent a 1059 * defunct thread. 1060 */ 1061 bool register(string name, Tid tid) 1062 in (tid.mbox !is null) 1063 { 1064 synchronized (registryLock) 1065 { 1066 if (name in tidByName) 1067 return false; 1068 if (tid.mbox.isClosed) 1069 return false; 1070 namesByTid[tid] ~= name; 1071 tidByName[name] = tid; 1072 return true; 1073 } 1074 } 1075 1076 /** 1077 * Removes the registered name associated with a tid. 1078 * 1079 * Params: 1080 * name = The name to unregister. 1081 * 1082 * Returns: 1083 * true if the name is registered, false if not. 1084 */ 1085 bool unregister(string name) 1086 { 1087 import std.algorithm.mutation : remove, SwapStrategy; 1088 import std.algorithm.searching : countUntil; 1089 1090 synchronized (registryLock) 1091 { 1092 if (auto tid = name in tidByName) 1093 { 1094 auto allNames = *tid in namesByTid; 1095 auto pos = countUntil(*allNames, name); 1096 remove!(SwapStrategy.unstable)(*allNames, pos); 1097 tidByName.remove(name); 1098 return true; 1099 } 1100 return false; 1101 } 1102 } 1103 1104 /** 1105 * Gets the Tid associated with name. 1106 * 1107 * Params: 1108 * name = The name to locate within the registry. 1109 * 1110 * Returns: 1111 * The associated Tid or Tid.init if name is not registered. 1112 */ 1113 Tid locate(string name) 1114 { 1115 synchronized (registryLock) 1116 { 1117 if (auto tid = name in tidByName) 1118 return *tid; 1119 return Tid.init; 1120 } 1121 } 1122 1123 /** 1124 * Encapsulates all implementation-level data needed for scheduling. 1125 * 1126 * When defining a Scheduler, an instance of this struct must be associated 1127 * with each logical thread. It contains all implementation-level information 1128 * needed by the internal API. 1129 */ 1130 struct ThreadInfo 1131 { 1132 Tid ident; 1133 bool[Tid] links; 1134 Tid owner; 1135 1136 /** 1137 * Gets a thread-local instance of ThreadInfo. 1138 * 1139 * Gets a thread-local instance of ThreadInfo, which should be used as the 1140 * default instance when info is requested for a thread not created by the 1141 * Scheduler. 1142 */ 1143 static @property ref thisInfo() nothrow 1144 { 1145 static ThreadInfo val; 1146 return val; 1147 } 1148 1149 /** 1150 * Cleans up this ThreadInfo. 1151 * 1152 * This must be called when a scheduled thread terminates. It tears down 1153 * the messaging system for the thread and notifies interested parties of 1154 * the thread's termination. 1155 */ 1156 void cleanup() 1157 { 1158 if (ident.mbox !is null) 1159 ident.mbox.close(); 1160 foreach (tid; links.keys) 1161 _send(MsgType.linkDead, tid, ident); 1162 if (owner != Tid.init) 1163 _send(MsgType.linkDead, owner, ident); 1164 unregisterMe(this); // clean up registry entries 1165 } 1166 1167 // https://issues.dlang.org/show_bug.cgi?id=20160 1168 @system unittest 1169 { 1170 register("main_thread", thisTid()); 1171 1172 ThreadInfo t; 1173 t.cleanup(); 1174 1175 assert(locate("main_thread") == thisTid()); 1176 } 1177 } 1178 1179 /** 1180 * A Scheduler controls how threading is performed by spawn. 1181 * 1182 * Implementing a Scheduler allows the concurrency mechanism used by this 1183 * module to be customized according to different needs. By default, a call 1184 * to spawn will create a new kernel thread that executes the supplied routine 1185 * and terminates when finished. But it is possible to create Schedulers that 1186 * reuse threads, that multiplex Fibers (coroutines) across a single thread, 1187 * or any number of other approaches. By making the choice of Scheduler a 1188 * user-level option, std.concurrency may be used for far more types of 1189 * application than if this behavior were predefined. 1190 * 1191 * Example: 1192 * --- 1193 * import std.concurrency; 1194 * import std.stdio; 1195 * 1196 * void main() 1197 * { 1198 * scheduler = new FiberScheduler; 1199 * scheduler.start( 1200 * { 1201 * writeln("the rest of main goes here"); 1202 * }); 1203 * } 1204 * --- 1205 * 1206 * Some schedulers have a dispatching loop that must run if they are to work 1207 * properly, so for the sake of consistency, when using a scheduler, start() 1208 * must be called within main(). This yields control to the scheduler and 1209 * will ensure that any spawned threads are executed in an expected manner. 1210 */ 1211 interface Scheduler 1212 { 1213 /** 1214 * Spawns the supplied op and starts the Scheduler. 1215 * 1216 * This is intended to be called at the start of the program to yield all 1217 * scheduling to the active Scheduler instance. This is necessary for 1218 * schedulers that explicitly dispatch threads rather than simply relying 1219 * on the operating system to do so, and so start should always be called 1220 * within main() to begin normal program execution. 1221 * 1222 * Params: 1223 * op = A wrapper for whatever the main thread would have done in the 1224 * absence of a custom scheduler. It will be automatically executed 1225 * via a call to spawn by the Scheduler. 1226 */ 1227 void start(void delegate() op); 1228 1229 /** 1230 * Assigns a logical thread to execute the supplied op. 1231 * 1232 * This routine is called by spawn. It is expected to instantiate a new 1233 * logical thread and run the supplied operation. This thread must call 1234 * thisInfo.cleanup() when the thread terminates if the scheduled thread 1235 * is not a kernel thread--all kernel threads will have their ThreadInfo 1236 * cleaned up automatically by a thread-local destructor. 1237 * 1238 * Params: 1239 * op = The function to execute. This may be the actual function passed 1240 * by the user to spawn itself, or may be a wrapper function. 1241 */ 1242 void spawn(void delegate() op); 1243 1244 /** 1245 * Yields execution to another logical thread. 1246 * 1247 * This routine is called at various points within concurrency-aware APIs 1248 * to provide a scheduler a chance to yield execution when using some sort 1249 * of cooperative multithreading model. If this is not appropriate, such 1250 * as when each logical thread is backed by a dedicated kernel thread, 1251 * this routine may be a no-op. 1252 */ 1253 void yield() nothrow; 1254 1255 /** 1256 * Returns an appropriate ThreadInfo instance. 1257 * 1258 * Returns an instance of ThreadInfo specific to the logical thread that 1259 * is calling this routine or, if the calling thread was not create by 1260 * this scheduler, returns ThreadInfo.thisInfo instead. 1261 */ 1262 @property ref ThreadInfo thisInfo() nothrow; 1263 1264 /** 1265 * Creates a Condition variable analog for signaling. 1266 * 1267 * Creates a new Condition variable analog which is used to check for and 1268 * to signal the addition of messages to a thread's message queue. Like 1269 * yield, some schedulers may need to define custom behavior so that calls 1270 * to Condition.wait() yield to another thread when no new messages are 1271 * available instead of blocking. 1272 * 1273 * Params: 1274 * m = The Mutex that will be associated with this condition. It will be 1275 * locked prior to any operation on the condition, and so in some 1276 * cases a Scheduler may need to hold this reference and unlock the 1277 * mutex before yielding execution to another logical thread. 1278 */ 1279 Condition newCondition(Mutex m) nothrow; 1280 } 1281 1282 /** 1283 * An example Scheduler using kernel threads. 1284 * 1285 * This is an example Scheduler that mirrors the default scheduling behavior 1286 * of creating one kernel thread per call to spawn. It is fully functional 1287 * and may be instantiated and used, but is not a necessary part of the 1288 * default functioning of this module. 1289 */ 1290 class ThreadScheduler : Scheduler 1291 { 1292 /** 1293 * This simply runs op directly, since no real scheduling is needed by 1294 * this approach. 1295 */ 1296 void start(void delegate() op) 1297 { 1298 op(); 1299 } 1300 1301 /** 1302 * Creates a new kernel thread and assigns it to run the supplied op. 1303 */ 1304 void spawn(void delegate() op) 1305 { 1306 auto t = new Thread(op); 1307 t.start(); 1308 } 1309 1310 /** 1311 * This scheduler does no explicit multiplexing, so this is a no-op. 1312 */ 1313 void yield() nothrow 1314 { 1315 // no explicit yield needed 1316 } 1317 1318 /** 1319 * Returns ThreadInfo.thisInfo, since it is a thread-local instance of 1320 * ThreadInfo, which is the correct behavior for this scheduler. 1321 */ 1322 @property ref ThreadInfo thisInfo() nothrow 1323 { 1324 return ThreadInfo.thisInfo; 1325 } 1326 1327 /** 1328 * Creates a new Condition variable. No custom behavior is needed here. 1329 */ 1330 Condition newCondition(Mutex m) nothrow 1331 { 1332 return new Condition(m); 1333 } 1334 } 1335 1336 /** 1337 * An example Scheduler using Fibers. 1338 * 1339 * This is an example scheduler that creates a new Fiber per call to spawn 1340 * and multiplexes the execution of all fibers within the main thread. 1341 */ 1342 class FiberScheduler : Scheduler 1343 { 1344 /** 1345 * This creates a new Fiber for the supplied op and then starts the 1346 * dispatcher. 1347 */ 1348 void start(void delegate() op) 1349 { 1350 create(op); 1351 dispatch(); 1352 } 1353 1354 /** 1355 * This created a new Fiber for the supplied op and adds it to the 1356 * dispatch list. 1357 */ 1358 void spawn(void delegate() op) nothrow 1359 { 1360 create(op); 1361 yield(); 1362 } 1363 1364 /** 1365 * If the caller is a scheduled Fiber, this yields execution to another 1366 * scheduled Fiber. 1367 */ 1368 void yield() nothrow 1369 { 1370 // NOTE: It's possible that we should test whether the calling Fiber 1371 // is an InfoFiber before yielding, but I think it's reasonable 1372 // that any (non-Generator) fiber should yield here. 1373 if (Fiber.getThis()) 1374 Fiber.yield(); 1375 } 1376 1377 /** 1378 * Returns an appropriate ThreadInfo instance. 1379 * 1380 * Returns a ThreadInfo instance specific to the calling Fiber if the 1381 * Fiber was created by this dispatcher, otherwise it returns 1382 * ThreadInfo.thisInfo. 1383 */ 1384 @property ref ThreadInfo thisInfo() nothrow 1385 { 1386 auto f = cast(InfoFiber) Fiber.getThis(); 1387 1388 if (f !is null) 1389 return f.info; 1390 return ThreadInfo.thisInfo; 1391 } 1392 1393 /** 1394 * Returns a Condition analog that yields when wait or notify is called. 1395 * 1396 * Bug: 1397 * For the default implementation, `notifyAll`will behave like `notify`. 1398 * 1399 * Params: 1400 * m = A `Mutex` to use for locking if the condition needs to be waited on 1401 * or notified from multiple `Thread`s. 1402 * If `null`, no `Mutex` will be used and it is assumed that the 1403 * `Condition` is only waited on/notified from one `Thread`. 1404 */ 1405 Condition newCondition(Mutex m) nothrow 1406 { 1407 return new FiberCondition(m); 1408 } 1409 1410 protected: 1411 /** 1412 * Creates a new Fiber which calls the given delegate. 1413 * 1414 * Params: 1415 * op = The delegate the fiber should call 1416 */ 1417 void create(void delegate() op) nothrow 1418 { 1419 void wrap() 1420 { 1421 scope (exit) 1422 { 1423 thisInfo.cleanup(); 1424 } 1425 op(); 1426 } 1427 1428 m_fibers ~= new InfoFiber(&wrap); 1429 } 1430 1431 /** 1432 * Fiber which embeds a ThreadInfo 1433 */ 1434 static class InfoFiber : Fiber 1435 { 1436 ThreadInfo info; 1437 1438 this(void delegate() op) nothrow 1439 { 1440 super(op); 1441 } 1442 1443 this(void delegate() op, size_t sz) nothrow 1444 { 1445 super(op, sz); 1446 } 1447 } 1448 1449 private: 1450 class FiberCondition : Condition 1451 { 1452 this(Mutex m) nothrow 1453 { 1454 super(m); 1455 notified = false; 1456 } 1457 1458 override void wait() nothrow 1459 { 1460 scope (exit) notified = false; 1461 1462 while (!notified) 1463 switchContext(); 1464 } 1465 1466 override bool wait(Duration period) nothrow 1467 { 1468 import core.time : MonoTime; 1469 1470 scope (exit) notified = false; 1471 1472 for (auto limit = MonoTime.currTime + period; 1473 !notified && !period.isNegative; 1474 period = limit - MonoTime.currTime) 1475 { 1476 this.outer.yield(); 1477 } 1478 return notified; 1479 } 1480 1481 override void notify() nothrow 1482 { 1483 notified = true; 1484 switchContext(); 1485 } 1486 1487 override void notifyAll() nothrow 1488 { 1489 notified = true; 1490 switchContext(); 1491 } 1492 1493 private: 1494 void switchContext() nothrow 1495 { 1496 if (mutex_nothrow) mutex_nothrow.unlock_nothrow(); 1497 scope (exit) 1498 if (mutex_nothrow) 1499 mutex_nothrow.lock_nothrow(); 1500 this.outer.yield(); 1501 } 1502 1503 private bool notified; 1504 } 1505 1506 private: 1507 void dispatch() 1508 { 1509 import std.algorithm.mutation : remove; 1510 1511 while (m_fibers.length > 0) 1512 { 1513 auto t = m_fibers[m_pos].call(Fiber.Rethrow.no); 1514 if (t !is null && !(cast(OwnerTerminated) t)) 1515 { 1516 throw t; 1517 } 1518 if (m_fibers[m_pos].state == Fiber.State.TERM) 1519 { 1520 if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length) 1521 m_pos = 0; 1522 } 1523 else if (m_pos++ >= m_fibers.length - 1) 1524 { 1525 m_pos = 0; 1526 } 1527 } 1528 } 1529 1530 private: 1531 Fiber[] m_fibers; 1532 size_t m_pos; 1533 } 1534 1535 @system unittest 1536 { 1537 static void receive(Condition cond, ref size_t received) 1538 { 1539 while (true) 1540 { 1541 synchronized (cond.mutex) 1542 { 1543 cond.wait(); 1544 ++received; 1545 } 1546 } 1547 } 1548 1549 static void send(Condition cond, ref size_t sent) 1550 { 1551 while (true) 1552 { 1553 synchronized (cond.mutex) 1554 { 1555 ++sent; 1556 cond.notify(); 1557 } 1558 } 1559 } 1560 1561 auto fs = new FiberScheduler; 1562 auto mtx = new Mutex; 1563 auto cond = fs.newCondition(mtx); 1564 1565 size_t received, sent; 1566 auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); }); 1567 waiter.call(); 1568 assert(received == 0); 1569 notifier.call(); 1570 assert(sent == 1); 1571 assert(received == 0); 1572 waiter.call(); 1573 assert(received == 1); 1574 waiter.call(); 1575 assert(received == 1); 1576 } 1577 1578 /** 1579 * Sets the Scheduler behavior within the program. 1580 * 1581 * This variable sets the Scheduler behavior within this program. Typically, 1582 * when setting a Scheduler, scheduler.start() should be called in main. This 1583 * routine will not return until program execution is complete. 1584 */ 1585 __gshared Scheduler scheduler; 1586 1587 // Generator 1588 1589 /** 1590 * If the caller is a Fiber and is not a Generator, this function will call 1591 * scheduler.yield() or Fiber.yield(), as appropriate. 1592 */ 1593 void yield() nothrow 1594 { 1595 auto fiber = Fiber.getThis(); 1596 if (!(cast(IsGenerator) fiber)) 1597 { 1598 if (scheduler is null) 1599 { 1600 if (fiber) 1601 return Fiber.yield(); 1602 } 1603 else 1604 scheduler.yield(); 1605 } 1606 } 1607 1608 /// Used to determine whether a Generator is running. 1609 private interface IsGenerator {} 1610 1611 1612 /** 1613 * A Generator is a Fiber that periodically returns values of type T to the 1614 * caller via yield. This is represented as an InputRange. 1615 */ 1616 class Generator(T) : 1617 Fiber, IsGenerator, InputRange!T 1618 { 1619 /** 1620 * Initializes a generator object which is associated with a static 1621 * D function. The function will be called once to prepare the range 1622 * for iteration. 1623 * 1624 * Params: 1625 * fn = The fiber function. 1626 * 1627 * In: 1628 * fn must not be null. 1629 */ 1630 this(void function() fn) 1631 { 1632 super(fn); 1633 call(); 1634 } 1635 1636 /** 1637 * Initializes a generator object which is associated with a static 1638 * D function. The function will be called once to prepare the range 1639 * for iteration. 1640 * 1641 * Params: 1642 * fn = The fiber function. 1643 * sz = The stack size for this fiber. 1644 * 1645 * In: 1646 * fn must not be null. 1647 */ 1648 this(void function() fn, size_t sz) 1649 { 1650 super(fn, sz); 1651 call(); 1652 } 1653 1654 /** 1655 * Initializes a generator object which is associated with a static 1656 * D function. The function will be called once to prepare the range 1657 * for iteration. 1658 * 1659 * Params: 1660 * fn = The fiber function. 1661 * sz = The stack size for this fiber. 1662 * guardPageSize = size of the guard page to trap fiber's stack 1663 * overflows. Refer to $(REF Fiber, core,thread)'s 1664 * documentation for more details. 1665 * 1666 * In: 1667 * fn must not be null. 1668 */ 1669 this(void function() fn, size_t sz, size_t guardPageSize) 1670 { 1671 super(fn, sz, guardPageSize); 1672 call(); 1673 } 1674 1675 /** 1676 * Initializes a generator object which is associated with a dynamic 1677 * D function. The function will be called once to prepare the range 1678 * for iteration. 1679 * 1680 * Params: 1681 * dg = The fiber function. 1682 * 1683 * In: 1684 * dg must not be null. 1685 */ 1686 this(void delegate() dg) 1687 { 1688 super(dg); 1689 call(); 1690 } 1691 1692 /** 1693 * Initializes a generator object which is associated with a dynamic 1694 * D function. The function will be called once to prepare the range 1695 * for iteration. 1696 * 1697 * Params: 1698 * dg = The fiber function. 1699 * sz = The stack size for this fiber. 1700 * 1701 * In: 1702 * dg must not be null. 1703 */ 1704 this(void delegate() dg, size_t sz) 1705 { 1706 super(dg, sz); 1707 call(); 1708 } 1709 1710 /** 1711 * Initializes a generator object which is associated with a dynamic 1712 * D function. The function will be called once to prepare the range 1713 * for iteration. 1714 * 1715 * Params: 1716 * dg = The fiber function. 1717 * sz = The stack size for this fiber. 1718 * guardPageSize = size of the guard page to trap fiber's stack 1719 * overflows. Refer to $(REF Fiber, core,thread)'s 1720 * documentation for more details. 1721 * 1722 * In: 1723 * dg must not be null. 1724 */ 1725 this(void delegate() dg, size_t sz, size_t guardPageSize) 1726 { 1727 super(dg, sz, guardPageSize); 1728 call(); 1729 } 1730 1731 /** 1732 * Returns true if the generator is empty. 1733 */ 1734 final bool empty() @property 1735 { 1736 return m_value is null || state == State.TERM; 1737 } 1738 1739 /** 1740 * Obtains the next value from the underlying function. 1741 */ 1742 final void popFront() 1743 { 1744 call(); 1745 } 1746 1747 /** 1748 * Returns the most recently generated value by shallow copy. 1749 */ 1750 final T front() @property 1751 { 1752 return *m_value; 1753 } 1754 1755 /** 1756 * Returns the most recently generated value without executing a 1757 * copy contructor. Will not compile for element types defining a 1758 * postblit, because Generator does not return by reference. 1759 */ 1760 final T moveFront() 1761 { 1762 static if (!hasElaborateCopyConstructor!T) 1763 { 1764 return front; 1765 } 1766 else 1767 { 1768 static assert(0, 1769 "Fiber front is always rvalue and thus cannot be moved since it defines a postblit."); 1770 } 1771 } 1772 1773 final int opApply(scope int delegate(T) loopBody) 1774 { 1775 int broken; 1776 for (; !empty; popFront()) 1777 { 1778 broken = loopBody(front); 1779 if (broken) break; 1780 } 1781 return broken; 1782 } 1783 1784 final int opApply(scope int delegate(size_t, T) loopBody) 1785 { 1786 int broken; 1787 for (size_t i; !empty; ++i, popFront()) 1788 { 1789 broken = loopBody(i, front); 1790 if (broken) break; 1791 } 1792 return broken; 1793 } 1794 private: 1795 T* m_value; 1796 } 1797 1798 /// 1799 @system unittest 1800 { 1801 auto tid = spawn({ 1802 int i; 1803 while (i < 9) 1804 i = receiveOnly!int; 1805 1806 ownerTid.send(i * 2); 1807 }); 1808 1809 auto r = new Generator!int({ 1810 foreach (i; 1 .. 10) 1811 yield(i); 1812 }); 1813 1814 foreach (e; r) 1815 tid.send(e); 1816 1817 assert(receiveOnly!int == 18); 1818 } 1819 1820 /** 1821 * Yields a value of type T to the caller of the currently executing 1822 * generator. 1823 * 1824 * Params: 1825 * value = The value to yield. 1826 */ 1827 void yield(T)(ref T value) 1828 { 1829 Generator!T cur = cast(Generator!T) Fiber.getThis(); 1830 if (cur !is null && cur.state == Fiber.State.EXEC) 1831 { 1832 cur.m_value = &value; 1833 return Fiber.yield(); 1834 } 1835 throw new Exception("yield(T) called with no active generator for the supplied type"); 1836 } 1837 1838 /// ditto 1839 void yield(T)(T value) 1840 { 1841 yield(value); 1842 } 1843 1844 @system unittest 1845 { 1846 import core.exception; 1847 import std.exception; 1848 1849 static void testScheduler(Scheduler s) 1850 { 1851 scheduler = s; 1852 scheduler.start({ 1853 auto tid = spawn({ 1854 int i; 1855 1856 try 1857 { 1858 for (i = 1; i < 10; i++) 1859 { 1860 assertNotThrown!AssertError(assert(receiveOnly!int() == i)); 1861 } 1862 } 1863 catch (OwnerTerminated e) 1864 { 1865 1866 } 1867 1868 // i will advance 1 past the last value expected 1869 assert(i == 4); 1870 }); 1871 1872 auto r = new Generator!int({ 1873 assertThrown!Exception(yield(2.0)); 1874 yield(); // ensure this is a no-op 1875 yield(1); 1876 yield(); // also once something has been yielded 1877 yield(2); 1878 yield(3); 1879 }); 1880 1881 foreach (e; r) 1882 { 1883 tid.send(e); 1884 } 1885 }); 1886 scheduler = null; 1887 } 1888 1889 testScheduler(new ThreadScheduler); 1890 testScheduler(new FiberScheduler); 1891 } 1892 /// 1893 @system unittest 1894 { 1895 import std.range; 1896 1897 InputRange!int myIota = iota(10).inputRangeObject; 1898 1899 myIota.popFront(); 1900 myIota.popFront(); 1901 assert(myIota.moveFront == 2); 1902 assert(myIota.front == 2); 1903 myIota.popFront(); 1904 assert(myIota.front == 3); 1905 1906 //can be assigned to std.range.interfaces.InputRange directly 1907 myIota = new Generator!int( 1908 { 1909 foreach (i; 0 .. 10) yield(i); 1910 }); 1911 1912 myIota.popFront(); 1913 myIota.popFront(); 1914 assert(myIota.moveFront == 2); 1915 assert(myIota.front == 2); 1916 myIota.popFront(); 1917 assert(myIota.front == 3); 1918 1919 size_t[2] counter = [0, 0]; 1920 foreach (i, unused; myIota) counter[] += [1, i]; 1921 1922 assert(myIota.empty); 1923 assert(counter == [7, 21]); 1924 } 1925 1926 private 1927 { 1928 /* 1929 * A MessageBox is a message queue for one thread. Other threads may send 1930 * messages to this owner by calling put(), and the owner receives them by 1931 * calling get(). The put() call is therefore effectively shared and the 1932 * get() call is effectively local. setMaxMsgs may be used by any thread 1933 * to limit the size of the message queue. 1934 */ 1935 class MessageBox 1936 { 1937 this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */ 1938 { 1939 m_lock = new Mutex; 1940 m_closed = false; 1941 1942 if (scheduler is null) 1943 { 1944 m_putMsg = new Condition(m_lock); 1945 m_notFull = new Condition(m_lock); 1946 } 1947 else 1948 { 1949 m_putMsg = scheduler.newCondition(m_lock); 1950 m_notFull = scheduler.newCondition(m_lock); 1951 } 1952 } 1953 1954 /// 1955 final @property bool isClosed() @safe @nogc pure 1956 { 1957 synchronized (m_lock) 1958 { 1959 return m_closed; 1960 } 1961 } 1962 1963 /* 1964 * Sets a limit on the maximum number of user messages allowed in the 1965 * mailbox. If this limit is reached, the caller attempting to add 1966 * a new message will execute call. If num is zero, there is no limit 1967 * on the message queue. 1968 * 1969 * Params: 1970 * num = The maximum size of the queue or zero if the queue is 1971 * unbounded. 1972 * call = The routine to call when the queue is full. 1973 */ 1974 final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure 1975 { 1976 synchronized (m_lock) 1977 { 1978 m_maxMsgs = num; 1979 m_onMaxMsgs = call; 1980 } 1981 } 1982 1983 /* 1984 * If maxMsgs is not set, the message is added to the queue and the 1985 * owner is notified. If the queue is full, the message will still be 1986 * accepted if it is a control message, otherwise onCrowdingDoThis is 1987 * called. If the routine returns true, this call will block until 1988 * the owner has made space available in the queue. If it returns 1989 * false, this call will abort. 1990 * 1991 * Params: 1992 * msg = The message to put in the queue. 1993 * 1994 * Throws: 1995 * An exception if the queue is full and onCrowdingDoThis throws. 1996 */ 1997 final void put(ref Message msg) 1998 { 1999 synchronized (m_lock) 2000 { 2001 // TODO: Generate an error here if m_closed is true, or maybe 2002 // put a message in the caller's queue? 2003 if (!m_closed) 2004 { 2005 while (true) 2006 { 2007 if (isPriorityMsg(msg)) 2008 { 2009 m_sharedPty.put(msg); 2010 m_putMsg.notify(); 2011 return; 2012 } 2013 if (!mboxFull() || isControlMsg(msg)) 2014 { 2015 m_sharedBox.put(msg); 2016 m_putMsg.notify(); 2017 return; 2018 } 2019 if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid)) 2020 { 2021 return; 2022 } 2023 m_putQueue++; 2024 m_notFull.wait(); 2025 m_putQueue--; 2026 } 2027 } 2028 } 2029 } 2030 2031 /* 2032 * Matches ops against each message in turn until a match is found. 2033 * 2034 * Params: 2035 * ops = The operations to match. Each may return a bool to indicate 2036 * whether a message with a matching type is truly a match. 2037 * 2038 * Returns: 2039 * true if a message was retrieved and false if not (such as if a 2040 * timeout occurred). 2041 * 2042 * Throws: 2043 * LinkTerminated if a linked thread terminated, or OwnerTerminated 2044 * if the owner thread terminates and no existing messages match the 2045 * supplied ops. 2046 */ 2047 bool get(T...)(scope T vals) 2048 { 2049 import std.meta : AliasSeq; 2050 2051 static assert(T.length, "T must not be empty"); 2052 2053 static if (isImplicitlyConvertible!(T[0], Duration)) 2054 { 2055 alias Ops = AliasSeq!(T[1 .. $]); 2056 alias ops = vals[1 .. $]; 2057 enum timedWait = true; 2058 Duration period = vals[0]; 2059 } 2060 else 2061 { 2062 alias Ops = AliasSeq!(T); 2063 alias ops = vals[0 .. $]; 2064 enum timedWait = false; 2065 } 2066 2067 bool onStandardMsg(ref Message msg) 2068 { 2069 foreach (i, t; Ops) 2070 { 2071 alias Args = Parameters!(t); 2072 auto op = ops[i]; 2073 2074 if (msg.convertsTo!(Args)) 2075 { 2076 static if (is(ReturnType!(t) == bool)) 2077 { 2078 return msg.map(op); 2079 } 2080 else 2081 { 2082 msg.map(op); 2083 return true; 2084 } 2085 } 2086 } 2087 return false; 2088 } 2089 2090 bool onLinkDeadMsg(ref Message msg) 2091 { 2092 assert(msg.convertsTo!(Tid), 2093 "Message could be converted to Tid"); 2094 auto tid = msg.get!(Tid); 2095 2096 if (bool* pDepends = tid in thisInfo.links) 2097 { 2098 auto depends = *pDepends; 2099 thisInfo.links.remove(tid); 2100 // Give the owner relationship precedence. 2101 if (depends && tid != thisInfo.owner) 2102 { 2103 auto e = new LinkTerminated(tid); 2104 auto m = Message(MsgType.standard, e); 2105 if (onStandardMsg(m)) 2106 return true; 2107 throw e; 2108 } 2109 } 2110 if (tid == thisInfo.owner) 2111 { 2112 thisInfo.owner = Tid.init; 2113 auto e = new OwnerTerminated(tid); 2114 auto m = Message(MsgType.standard, e); 2115 if (onStandardMsg(m)) 2116 return true; 2117 throw e; 2118 } 2119 return false; 2120 } 2121 2122 bool onControlMsg(ref Message msg) 2123 { 2124 switch (msg.type) 2125 { 2126 case MsgType.linkDead: 2127 return onLinkDeadMsg(msg); 2128 default: 2129 return false; 2130 } 2131 } 2132 2133 bool scan(ref ListT list) 2134 { 2135 for (auto range = list[]; !range.empty;) 2136 { 2137 // Only the message handler will throw, so if this occurs 2138 // we can be certain that the message was handled. 2139 scope (failure) 2140 list.removeAt(range); 2141 2142 if (isControlMsg(range.front)) 2143 { 2144 if (onControlMsg(range.front)) 2145 { 2146 // Although the linkDead message is a control message, 2147 // it can be handled by the user. Since the linkDead 2148 // message throws if not handled, if we get here then 2149 // it has been handled and we can return from receive. 2150 // This is a weird special case that will have to be 2151 // handled in a more general way if more are added. 2152 if (!isLinkDeadMsg(range.front)) 2153 { 2154 list.removeAt(range); 2155 continue; 2156 } 2157 list.removeAt(range); 2158 return true; 2159 } 2160 range.popFront(); 2161 continue; 2162 } 2163 else 2164 { 2165 if (onStandardMsg(range.front)) 2166 { 2167 list.removeAt(range); 2168 return true; 2169 } 2170 range.popFront(); 2171 continue; 2172 } 2173 } 2174 return false; 2175 } 2176 2177 bool pty(ref ListT list) 2178 { 2179 if (!list.empty) 2180 { 2181 auto range = list[]; 2182 2183 if (onStandardMsg(range.front)) 2184 { 2185 list.removeAt(range); 2186 return true; 2187 } 2188 if (range.front.convertsTo!(Throwable)) 2189 throw range.front.get!(Throwable); 2190 else if (range.front.convertsTo!(shared(Throwable))) 2191 throw range.front.get!(shared(Throwable)); 2192 else 2193 throw new PriorityMessageException(range.front.data); 2194 } 2195 return false; 2196 } 2197 2198 static if (timedWait) 2199 { 2200 import core.time : MonoTime; 2201 auto limit = MonoTime.currTime + period; 2202 } 2203 2204 while (true) 2205 { 2206 ListT arrived; 2207 2208 if (pty(m_localPty) || scan(m_localBox)) 2209 { 2210 return true; 2211 } 2212 yield(); 2213 synchronized (m_lock) 2214 { 2215 updateMsgCount(); 2216 while (m_sharedPty.empty && m_sharedBox.empty) 2217 { 2218 // NOTE: We're notifying all waiters here instead of just 2219 // a few because the onCrowding behavior may have 2220 // changed and we don't want to block sender threads 2221 // unnecessarily if the new behavior is not to block. 2222 // This will admittedly result in spurious wakeups 2223 // in other situations, but what can you do? 2224 if (m_putQueue && !mboxFull()) 2225 m_notFull.notifyAll(); 2226 static if (timedWait) 2227 { 2228 if (period <= Duration.zero || !m_putMsg.wait(period)) 2229 return false; 2230 } 2231 else 2232 { 2233 m_putMsg.wait(); 2234 } 2235 } 2236 m_localPty.put(m_sharedPty); 2237 arrived.put(m_sharedBox); 2238 } 2239 if (m_localPty.empty) 2240 { 2241 scope (exit) m_localBox.put(arrived); 2242 if (scan(arrived)) 2243 { 2244 return true; 2245 } 2246 else 2247 { 2248 static if (timedWait) 2249 { 2250 period = limit - MonoTime.currTime; 2251 } 2252 continue; 2253 } 2254 } 2255 m_localBox.put(arrived); 2256 pty(m_localPty); 2257 return true; 2258 } 2259 } 2260 2261 /* 2262 * Called on thread termination. This routine processes any remaining 2263 * control messages, clears out message queues, and sets a flag to 2264 * reject any future messages. 2265 */ 2266 final void close() 2267 { 2268 static void onLinkDeadMsg(ref Message msg) 2269 { 2270 assert(msg.convertsTo!(Tid), 2271 "Message could be converted to Tid"); 2272 auto tid = msg.get!(Tid); 2273 2274 thisInfo.links.remove(tid); 2275 if (tid == thisInfo.owner) 2276 thisInfo.owner = Tid.init; 2277 } 2278 2279 static void sweep(ref ListT list) 2280 { 2281 for (auto range = list[]; !range.empty; range.popFront()) 2282 { 2283 if (range.front.type == MsgType.linkDead) 2284 onLinkDeadMsg(range.front); 2285 } 2286 } 2287 2288 ListT arrived; 2289 2290 sweep(m_localBox); 2291 synchronized (m_lock) 2292 { 2293 arrived.put(m_sharedBox); 2294 m_closed = true; 2295 } 2296 m_localBox.clear(); 2297 sweep(arrived); 2298 } 2299 2300 private: 2301 // Routines involving local data only, no lock needed. 2302 2303 bool mboxFull() @safe @nogc pure nothrow 2304 { 2305 return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length; 2306 } 2307 2308 void updateMsgCount() @safe @nogc pure nothrow 2309 { 2310 m_localMsgs = m_localBox.length; 2311 } 2312 2313 bool isControlMsg(ref Message msg) @safe @nogc pure nothrow 2314 { 2315 return msg.type != MsgType.standard && msg.type != MsgType.priority; 2316 } 2317 2318 bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow 2319 { 2320 return msg.type == MsgType.priority; 2321 } 2322 2323 bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow 2324 { 2325 return msg.type == MsgType.linkDead; 2326 } 2327 2328 alias OnMaxFn = bool function(Tid); 2329 alias ListT = List!(Message); 2330 2331 ListT m_localBox; 2332 ListT m_localPty; 2333 2334 Mutex m_lock; 2335 Condition m_putMsg; 2336 Condition m_notFull; 2337 size_t m_putQueue; 2338 ListT m_sharedBox; 2339 ListT m_sharedPty; 2340 OnMaxFn m_onMaxMsgs; 2341 size_t m_localMsgs; 2342 size_t m_maxMsgs; 2343 bool m_closed; 2344 } 2345 2346 /* 2347 * 2348 */ 2349 struct List(T) 2350 { 2351 struct Range 2352 { 2353 import std.exception : enforce; 2354 2355 @property bool empty() const 2356 { 2357 return !m_prev.next; 2358 } 2359 2360 @property ref T front() 2361 { 2362 enforce(m_prev.next, "invalid list node"); 2363 return m_prev.next.val; 2364 } 2365 2366 @property void front(T val) 2367 { 2368 enforce(m_prev.next, "invalid list node"); 2369 m_prev.next.val = val; 2370 } 2371 2372 void popFront() 2373 { 2374 enforce(m_prev.next, "invalid list node"); 2375 m_prev = m_prev.next; 2376 } 2377 2378 private this(Node* p) 2379 { 2380 m_prev = p; 2381 } 2382 2383 private Node* m_prev; 2384 } 2385 2386 void put(T val) 2387 { 2388 put(newNode(val)); 2389 } 2390 2391 void put(ref List!(T) rhs) 2392 { 2393 if (!rhs.empty) 2394 { 2395 put(rhs.m_first); 2396 while (m_last.next !is null) 2397 { 2398 m_last = m_last.next; 2399 m_count++; 2400 } 2401 rhs.m_first = null; 2402 rhs.m_last = null; 2403 rhs.m_count = 0; 2404 } 2405 } 2406 2407 Range opSlice() 2408 { 2409 return Range(cast(Node*)&m_first); 2410 } 2411 2412 void removeAt(Range r) 2413 { 2414 import std.exception : enforce; 2415 2416 assert(m_count, "Can not remove from empty Range"); 2417 Node* n = r.m_prev; 2418 enforce(n && n.next, "attempting to remove invalid list node"); 2419 2420 if (m_last is m_first) 2421 m_last = null; 2422 else if (m_last is n.next) 2423 m_last = n; // nocoverage 2424 Node* to_free = n.next; 2425 n.next = n.next.next; 2426 freeNode(to_free); 2427 m_count--; 2428 } 2429 2430 @property size_t length() 2431 { 2432 return m_count; 2433 } 2434 2435 void clear() 2436 { 2437 m_first = m_last = null; 2438 m_count = 0; 2439 } 2440 2441 @property bool empty() 2442 { 2443 return m_first is null; 2444 } 2445 2446 private: 2447 struct Node 2448 { 2449 Node* next; 2450 T val; 2451 2452 this(T v) 2453 { 2454 val = v; 2455 } 2456 } 2457 2458 static shared struct SpinLock 2459 { 2460 void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } } 2461 void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); } 2462 bool locked; 2463 } 2464 2465 static shared SpinLock sm_lock; 2466 static shared Node* sm_head; 2467 2468 Node* newNode(T v) 2469 { 2470 Node* n; 2471 { 2472 sm_lock.lock(); 2473 scope (exit) sm_lock.unlock(); 2474 2475 if (sm_head) 2476 { 2477 n = cast(Node*) sm_head; 2478 sm_head = sm_head.next; 2479 } 2480 } 2481 if (n) 2482 { 2483 import std.conv : emplace; 2484 emplace!Node(n, v); 2485 } 2486 else 2487 { 2488 n = new Node(v); 2489 } 2490 return n; 2491 } 2492 2493 void freeNode(Node* n) 2494 { 2495 // destroy val to free any owned GC memory 2496 destroy(n.val); 2497 2498 sm_lock.lock(); 2499 scope (exit) sm_lock.unlock(); 2500 2501 auto sn = cast(shared(Node)*) n; 2502 sn.next = sm_head; 2503 sm_head = sn; 2504 } 2505 2506 void put(Node* n) 2507 { 2508 m_count++; 2509 if (!empty) 2510 { 2511 m_last.next = n; 2512 m_last = n; 2513 return; 2514 } 2515 m_first = n; 2516 m_last = n; 2517 } 2518 2519 Node* m_first; 2520 Node* m_last; 2521 size_t m_count; 2522 } 2523 } 2524 2525 @system unittest 2526 { 2527 import std.typecons : tuple, Tuple; 2528 2529 static void testfn(Tid tid) 2530 { 2531 receive((float val) { assert(0); }, (int val, int val2) { 2532 assert(val == 42 && val2 == 86); 2533 }); 2534 receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); }); 2535 receive((Variant val) { }); 2536 receive((string val) { 2537 if ("the quick brown fox" != val) 2538 return false; 2539 return true; 2540 }, (string val) { assert(false); }); 2541 prioritySend(tid, "done"); 2542 } 2543 2544 static void runTest(Tid tid) 2545 { 2546 send(tid, 42, 86); 2547 send(tid, tuple(42, 86)); 2548 send(tid, "hello", "there"); 2549 send(tid, "the quick brown fox"); 2550 receive((string val) { assert(val == "done"); }); 2551 } 2552 2553 static void simpleTest() 2554 { 2555 auto tid = spawn(&testfn, thisTid); 2556 runTest(tid); 2557 2558 // Run the test again with a limited mailbox size. 2559 tid = spawn(&testfn, thisTid); 2560 setMaxMailboxSize(tid, 2, OnCrowding.block); 2561 runTest(tid); 2562 } 2563 2564 simpleTest(); 2565 2566 scheduler = new ThreadScheduler; 2567 simpleTest(); 2568 scheduler = null; 2569 } 2570 2571 private @property shared(Mutex) initOnceLock() 2572 { 2573 static shared Mutex lock; 2574 if (auto mtx = atomicLoad!(MemoryOrder.acq)(lock)) 2575 return mtx; 2576 auto mtx = new shared Mutex; 2577 if (cas(&lock, cast(shared) null, mtx)) 2578 return mtx; 2579 return atomicLoad!(MemoryOrder.acq)(lock); 2580 } 2581 2582 /** 2583 * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a 2584 * thread-safe manner. 2585 * 2586 * The implementation guarantees that all threads simultaneously calling 2587 * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is 2588 * fully initialized. All side-effects of $(D_PARAM init) are globally visible 2589 * afterwards. 2590 * 2591 * Params: 2592 * var = The variable to initialize 2593 * init = The lazy initializer value 2594 * 2595 * Returns: 2596 * A reference to the initialized variable 2597 */ 2598 auto ref initOnce(alias var)(lazy typeof(var) init) 2599 { 2600 return initOnce!var(init, initOnceLock); 2601 } 2602 2603 /// A typical use-case is to perform lazy but thread-safe initialization. 2604 @system unittest 2605 { 2606 static class MySingleton 2607 { 2608 static MySingleton instance() 2609 { 2610 __gshared MySingleton inst; 2611 return initOnce!inst(new MySingleton); 2612 } 2613 } 2614 2615 assert(MySingleton.instance !is null); 2616 } 2617 2618 @system unittest 2619 { 2620 static class MySingleton 2621 { 2622 static MySingleton instance() 2623 { 2624 __gshared MySingleton inst; 2625 return initOnce!inst(new MySingleton); 2626 } 2627 2628 private: 2629 this() { val = ++cnt; } 2630 size_t val; 2631 __gshared size_t cnt; 2632 } 2633 2634 foreach (_; 0 .. 10) 2635 spawn({ ownerTid.send(MySingleton.instance.val); }); 2636 foreach (_; 0 .. 10) 2637 assert(receiveOnly!size_t == MySingleton.instance.val); 2638 assert(MySingleton.cnt == 1); 2639 } 2640 2641 /** 2642 * Same as above, but takes a separate mutex instead of sharing one among 2643 * all initOnce instances. 2644 * 2645 * This should be used to avoid dead-locks when the $(D_PARAM init) 2646 * expression waits for the result of another thread that might also 2647 * call initOnce. Use with care. 2648 * 2649 * Params: 2650 * var = The variable to initialize 2651 * init = The lazy initializer value 2652 * mutex = A mutex to prevent race conditions 2653 * 2654 * Returns: 2655 * A reference to the initialized variable 2656 */ 2657 auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex) 2658 { 2659 // check that var is global, can't take address of a TLS variable 2660 static assert(is(typeof({ __gshared p = &var; })), 2661 "var must be 'static shared' or '__gshared'."); 2662 import core.atomic : atomicLoad, MemoryOrder, atomicStore; 2663 2664 static shared bool flag; 2665 if (!atomicLoad!(MemoryOrder.acq)(flag)) 2666 { 2667 synchronized (mutex) 2668 { 2669 if (!atomicLoad!(MemoryOrder.raw)(flag)) 2670 { 2671 var = init; 2672 atomicStore!(MemoryOrder.rel)(flag, true); 2673 } 2674 } 2675 } 2676 return var; 2677 } 2678 2679 /// ditto 2680 auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex) 2681 { 2682 return initOnce!var(init, cast(shared) mutex); 2683 } 2684 2685 /// Use a separate mutex when init blocks on another thread that might also call initOnce. 2686 @system unittest 2687 { 2688 import core.sync.mutex : Mutex; 2689 2690 static shared bool varA, varB; 2691 static shared Mutex m; 2692 m = new shared Mutex; 2693 2694 spawn({ 2695 // use a different mutex for varB to avoid a dead-lock 2696 initOnce!varB(true, m); 2697 ownerTid.send(true); 2698 }); 2699 // init depends on the result of the spawned thread 2700 initOnce!varA(receiveOnly!bool); 2701 assert(varA == true); 2702 assert(varB == true); 2703 } 2704 2705 @system unittest 2706 { 2707 static shared bool a; 2708 __gshared bool b; 2709 static bool c; 2710 bool d; 2711 initOnce!a(true); 2712 initOnce!b(true); 2713 static assert(!__traits(compiles, initOnce!c(true))); // TLS 2714 static assert(!__traits(compiles, initOnce!d(true))); // local variable 2715 } 2716 2717 // test ability to send shared arrays 2718 @system unittest 2719 { 2720 static shared int[] x = new shared(int)[1]; 2721 auto tid = spawn({ 2722 auto arr = receiveOnly!(shared(int)[]); 2723 arr[0] = 5; 2724 ownerTid.send(true); 2725 }); 2726 tid.send(x); 2727 receiveOnly!(bool); 2728 assert(x[0] == 5); 2729 }