1 /**
2  * This is a low-level messaging API upon which more structured or restrictive
3  * APIs may be built.  The general idea is that every messageable entity is
4  * represented by a common handle type called a Tid, which allows messages to
5  * be sent to logical threads that are executing in both the current process
6  * and in external processes using the same interface.  This is an important
7  * aspect of scalability because it allows the components of a program to be
8  * spread across available resources with few to no changes to the actual
9  * implementation.
10  *
11  * A logical thread is an execution context that has its own stack and which
12  * runs asynchronously to other logical threads.  These may be preemptively
13  * scheduled kernel threads, fibers (cooperative user-space threads), or some
14  * other concept with similar behavior.
15  *
16  * The type of concurrency used when logical threads are created is determined
17  * by the Scheduler selected at initialization time.  The default behavior is
18  * currently to create a new kernel thread per call to spawn, but other
19  * schedulers are available that multiplex fibers across the main thread or
20  * use some combination of the two approaches.
21  *
22  * Copyright: Copyright Sean Kelly 2009 - 2014.
23  * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
24  * Authors:   Sean Kelly, Alex Rønne Petersen, Martin Nowak
25  * Source:    $(PHOBOSSRC std/concurrency.d)
26  */
27 /*          Copyright Sean Kelly 2009 - 2014.
28  * Distributed under the Boost Software License, Version 1.0.
29  *    (See accompanying file LICENSE_1_0.txt or copy at
30  *          http://www.boost.org/LICENSE_1_0.txt)
31  */
32 module std.concurrency;
33 
34 public import std.variant;
35 
36 import core.atomic;
37 import core.sync.condition;
38 import core.sync.mutex;
39 import core.thread;
40 import std.range.primitives;
41 import std.range.interfaces : InputRange;
42 import std.traits;
43 
///
@system unittest
{
    __gshared string received;

    // Worker body: waits for a single int, records it, then acknowledges.
    static void spawnedFunc(Tid ownerTid)
    {
        import std.conv : text;

        receive((int number) {
            received = text("Received the number ", number);

            // Tell the owner thread that the message was handled.
            ownerTid.send(true);
        });
    }

    // Run the worker in a new thread, handing it our own Tid.
    auto childTid = spawn(&spawnedFunc, thisTid);

    // Hand the number 42 to the worker.
    childTid.send(42);

    // Block until the worker's acknowledgement arrives.
    immutable wasSuccessful = receiveOnly!bool;
    assert(wasSuccessful);
    assert(received == "Received the number 42");
}
72 
73 private
74 {
75     bool hasLocalAliasing(Types...)()
76     {
77         import std.typecons : Rebindable;
78 
79         // Works around "statement is not reachable"
80         bool doesIt = false;
81         static foreach (T; Types)
82         {
83             static if (is(T == Tid))
84             { /* Allowed */ }
85             else static if (is(T : Rebindable!R, R))
86                 doesIt |= hasLocalAliasing!R;
87             else static if (is(T == struct))
88                 doesIt |= hasLocalAliasing!(typeof(T.tupleof));
89             else
90                 doesIt |= std.traits.hasUnsharedAliasing!(T);
91         }
92         return doesIt;
93     }
94 
95     @safe unittest
96     {
97         static struct Container { Tid t; }
98         static assert(!hasLocalAliasing!(Tid, Container, int));
99     }
100 
101     // https://issues.dlang.org/show_bug.cgi?id=20097
102     @safe unittest
103     {
104         import std.datetime.systime : SysTime;
105         static struct Container { SysTime time; }
106         static assert(!hasLocalAliasing!(SysTime, Container));
107     }
108 
109     enum MsgType
110     {
111         standard,
112         priority,
113         linkDead,
114     }
115 
116     struct Message
117     {
118         MsgType type;
119         Variant data;
120 
121         this(T...)(MsgType t, T vals) if (T.length > 0)
122         {
123             static if (T.length == 1)
124             {
125                 type = t;
126                 data = vals[0];
127             }
128             else
129             {
130                 import std.typecons : Tuple;
131 
132                 type = t;
133                 data = Tuple!(T)(vals);
134             }
135         }
136 
137         @property auto convertsTo(T...)()
138         {
139             static if (T.length == 1)
140             {
141                 return is(T[0] == Variant) || data.convertsTo!(T);
142             }
143             else
144             {
145                 import std.typecons : Tuple;
146                 return data.convertsTo!(Tuple!(T));
147             }
148         }
149 
150         @property auto get(T...)()
151         {
152             static if (T.length == 1)
153             {
154                 static if (is(T[0] == Variant))
155                     return data;
156                 else
157                     return data.get!(T);
158             }
159             else
160             {
161                 import std.typecons : Tuple;
162                 return data.get!(Tuple!(T));
163             }
164         }
165 
166         auto map(Op)(Op op)
167         {
168             alias Args = Parameters!(Op);
169 
170             static if (Args.length == 1)
171             {
172                 static if (is(Args[0] == Variant))
173                     return op(data);
174                 else
175                     return op(data.get!(Args));
176             }
177             else
178             {
179                 import std.typecons : Tuple;
180                 return op(data.get!(Tuple!(Args)).expand);
181             }
182         }
183     }
184 
185     void checkops(T...)(T ops)
186     {
187         import std.format : format;
188 
189         foreach (i, t1; T)
190         {
191             static assert(isFunctionPointer!t1 || isDelegate!t1,
192                     format!"T %d is not a function pointer or delegates"(i));
193             alias a1 = Parameters!(t1);
194             alias r1 = ReturnType!(t1);
195 
196             static if (i < T.length - 1 && is(r1 == void))
197             {
198                 static assert(a1.length != 1 || !is(a1[0] == Variant),
199                               "function with arguments " ~ a1.stringof ~
200                               " occludes successive function");
201 
202                 foreach (t2; T[i + 1 .. $])
203                 {
204                     alias a2 = Parameters!(t2);
205 
206                     static assert(!is(a1 == a2),
207                         "function with arguments " ~ a1.stringof ~ " occludes successive function");
208                 }
209             }
210         }
211     }
212 
213     @property ref ThreadInfo thisInfo() nothrow
214     {
215         if (scheduler is null)
216             return ThreadInfo.thisInfo;
217         return scheduler.thisInfo;
218     }
219 }
220 
// Thread-local module destructor: runs cleanup() on the current ThreadInfo.
static ~this()
{
    thisInfo.cleanup();
}
225 
226 // Exceptions
227 
/**
 * Thrown by `receiveOnly` when the message that arrives is not of the
 * type the receiving thread asked for.
 */
class MessageMismatch : Exception
{
    /// Params: msg = Description of the mismatch.
    this(string msg = "Unexpected message type") @safe pure nothrow @nogc
    {
        super(msg);
    }
}
240 
/**
 * Thrown on calls to `receive` if the thread that spawned the receiving
 * thread has terminated and no more messages exist.
 */
class OwnerTerminated : Exception
{
    /// The `Tid` that was handed to the constructor.
    Tid tid;

    ///
    this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
    {
        super(msg);
        this.tid = t;
    }
}
256 
/**
 * Thrown if a linked thread has terminated.
 */
class LinkTerminated : Exception
{
    /// The `Tid` that was handed to the constructor.
    Tid tid;

    ///
    this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
    {
        super(msg);
        this.tid = t;
    }
}
271 
/**
 * Thrown if a message was sent to a thread via
 * $(REF prioritySend, std,concurrency) and the receiver does not have a
 * handler for a message of this type.
 */
class PriorityMessageException : Exception
{
    /**
     * The message that was sent.
     */
    Variant message;

    /// Params: vals = The undelivered message, stored in `message`.
    this(Variant vals)
    {
        super("Priority message");
        this.message = vals;
    }
}
291 
/**
 * Thrown on mailbox crowding if the mailbox is configured with
 * `OnCrowding.throwException`.
 */
class MailboxFull : Exception
{
    /// The `Tid` that was handed to the constructor.
    Tid tid;

    ///
    this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
    {
        super(msg);
        this.tid = t;
    }
}
307 
/**
 * Thrown when a Tid is missing, e.g. when `ownerTid` doesn't
 * find an owner thread.
 */
class TidMissingException : Exception
{
    import std.exception : basicExceptionCtors;

    /// Standard exception constructors supplied by `basicExceptionCtors`.
    mixin basicExceptionCtors;
}
318 
319 
320 // Thread ID
321 
322 
/**
 * An opaque type used to represent a logical thread.
 */
struct Tid
{
private:
    // Wraps an existing mailbox; only this module creates Tids.
    this(MessageBox m) @safe pure nothrow @nogc
    {
        mbox = m;
    }

    // Mailbox backing this Tid; null for a default-initialized Tid.
    MessageBox mbox;

public:

    /**
     * Generate a convenient string for identifying this Tid.  This is only
     * useful to see if Tid's that are currently executing are the same or
     * different, e.g. for logging and debugging.  It is potentially possible
     * that a Tid executed in the future will have the same toString() output
     * as another Tid that has already terminated.
     *
     * The method is `const`, so `const` and `immutable` Tids can be
     * formatted as well.
     *
     * Params:
     *  sink = Receives the text `Tid(<mailbox address in hex>)`.
     */
    void toString(scope void delegate(const(char)[]) sink) const
    {
        import std.format : formattedWrite;

        // Only the address is printed; the mailbox itself is never touched.
        formattedWrite(sink, "Tid(%x)", cast(void*) mbox);
    }

}
352 
@system unittest
{
    // text!Tid is @system
    import std.conv : text;

    // A default-initialized Tid has no mailbox and prints a null address.
    Tid blank;
    assert(text(blank) == "Tid(0)");

    // thisTid is backed by a real mailbox.
    auto mine = thisTid;
    assert(text(mine) != "Tid(0)");

    // Copies of a Tid print identically.
    auto duplicate = mine;
    assert(text(mine) == text(duplicate));
}
364 
/**
 * Returns: The $(LREF Tid) of the caller's thread.  A mailbox is created
 * for the caller on first use.
 */
@property Tid thisTid() @safe
{
    // TODO: remove when concurrency is safe
    static auto lazyIdent() @trusted
    {
        // Create the mailbox only if this context does not have one yet.
        if (thisInfo.ident == Tid.init)
            thisInfo.ident = Tid(new MessageBox);
        return thisInfo.ident;
    }

    return lazyIdent();
}
381 
/**
 * Return the Tid of the thread which spawned the caller's thread.
 *
 * Throws: A `TidMissingException` exception if
 * there is no owner thread.
 */
@property Tid ownerTid()
{
    import std.exception : enforce;

    auto owner = thisInfo.owner;
    // An owner without a mailbox means nobody spawned this thread.
    enforce!TidMissingException(owner.mbox !is null, "Error: Thread has no owner thread.");
    return owner;
}
395 
@system unittest
{
    import std.exception : assertThrown;

    // Child: expects one greeting, then answers its owner.
    static void responder()
    {
        immutable greeting = receiveOnly!string();
        assert(greeting == "Main calling");
        ownerTid.send("Child responding");
    }

    // The thread running this test was not spawned, so it has no owner.
    assertThrown!TidMissingException(ownerTid);

    auto child = spawn(&responder);
    child.send("Main calling");

    immutable reply = receiveOnly!string();
    assert(reply == "Child responding");
}
413 
414 // Thread Creation
415 
/*
 * True if F can be started by spawn with arguments of types T: F must be
 * callable, return void, accept T (argument count must match and each
 * argument must convert implicitly), and be either a function pointer or
 * free of unshared aliasing.
 */
private template isSpawnable(F, T...)
{
    // Walks both parameter lists in lock-step starting at index `idx` and
    // checks that every parameter of Caller converts to that of Callee.
    template paramsConvertible(Callee, Caller, int idx = 0)
    {
        alias calleeParams = Parameters!Callee;
        alias callerParams = Parameters!Caller;

        static if (calleeParams.length != callerParams.length)
            enum paramsConvertible = false;
        else static if (calleeParams.length == idx)
            enum paramsConvertible = true; // walked off the end: all matched
        else static if (isImplicitlyConvertible!(callerParams[idx], calleeParams[idx]))
            enum paramsConvertible = paramsConvertible!(Callee, Caller, idx + 1);
        else
            enum paramsConvertible = false;
    }

    enum isSpawnable = isCallable!F
            && is(ReturnType!F == void)
            && paramsConvertible!(F, void function(T))
            && (isFunctionPointer!F || !hasUnsharedAliasing!F);
}
437 
/**
 * Starts `fn(args)` in a new logical thread.
 *
 * Executes the supplied function in a new logical thread represented by
 * `Tid`.  The calling thread is designated as the owner of the new thread.
 * When the owner thread terminates an `OwnerTerminated` message will be
 * sent to the new thread, causing an `OwnerTerminated` exception to be
 * thrown on `receive()`.
 *
 * Params:
 *  fn   = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A Tid representing the new logical thread.
 *
 * Notes:
 *  `args` must not have unshared aliasing.  In other words, all arguments
 *  to `fn` must either be `shared` or `immutable` or have no
 *  pointer indirection.  This is necessary for enforcing isolation among
 *  threads.
 */
Tid spawn(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    enum aliasesLocalData = hasLocalAliasing!(T);
    static assert(!aliasesLocalData, "Aliases to mutable thread-local data not allowed.");

    // `false`: the new thread is owned by the caller but not linked to it.
    return _spawn(false, fn, args);
}
466 
///
@system unittest
{
    // The spawned function receives its argument by value.
    static void greet(string msg)
    {
        assert(msg == "Hello World");
    }

    auto tid = spawn(&greet, "Hello World");
}
477 
/// Fails: char[] has mutable aliasing.
@system unittest
{
    string greeting = "Hello, World!";

    // An immutable string may cross threads; a mutable copy may not.
    static void takesString(string msg) {}
    static assert( __traits(compiles, spawn(&takesString, greeting.idup)));
    static assert(!__traits(compiles, spawn(&takesString, greeting.dup)));

    // A function that wants char[] can never be spawned with either.
    static void takesMutable(char[] msg) {}
    static assert(!__traits(compiles, spawn(&takesMutable, greeting.idup)));
    static assert(!__traits(compiles, spawn(&takesMutable, greeting.dup)));
}
491 
/// New thread with anonymous function
@system unittest
{
    spawn({ ownerTid.send("This is so great!"); });

    immutable reply = receiveOnly!string;
    assert(reply == "This is so great!");
}
500 
@system unittest
{
    import core.thread : thread_joinAll;

    __gshared string receivedMessage;

    // Stores its argument where the test can inspect it afterwards.
    static void record(string msg)
    {
        receivedMessage = msg;
    }

    auto worker = spawn(&record, "Hello World");

    // Wait for the spawned thread to finish before checking the result.
    thread_joinAll();
    assert(receivedMessage == "Hello World");
}
515 
/**
 * Starts `fn(args)` in a logical thread and will receive a `LinkTerminated`
 * message when the operation terminates.
 *
 * Executes the supplied function in a new logical thread represented by
 * `Tid`.  This new thread is linked to the calling thread so that if either
 * it or the calling thread terminates a `LinkTerminated` message will be sent
 * to the other, causing a `LinkTerminated` exception to be thrown on
 * `receive()`.  The owner relationship from `spawn()` is preserved as well,
 * so if the link between threads is broken, owner termination will still
 * result in an `OwnerTerminated` exception to be thrown on `receive()`.
 *
 * Params:
 *  fn   = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A Tid representing the new thread.
 */
Tid spawnLinked(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    enum aliasesLocalData = hasLocalAliasing!(T);
    static assert(!aliasesLocalData, "Aliases to mutable thread-local data not allowed.");

    // `true`: record the new thread as linked to the caller.
    return _spawn(true, fn, args);
}
541 
/*
 * Shared implementation of spawn and spawnLinked.
 *
 * Creates a mailbox for the new thread, starts `fn(args)` through the
 * installed scheduler (or on a fresh Thread when none is installed), and
 * records the new Tid in the caller's link table with the `linked` flag.
 */
private Tid _spawn(F, T...)(bool linked, F fn, T args)
if (isSpawnable!(F, T))
{
    // TODO: MessageList and &exec should be shared.
    auto childTid = Tid(new MessageBox);
    auto parentTid = thisTid;

    // Entry point of the new logical thread: publish its identity and
    // owner before handing control to the user function.
    void threadMain()
    {
        thisInfo.ident = childTid;
        thisInfo.owner = parentTid;
        fn(args);
    }

    // TODO: MessageList and &exec should be shared.
    if (scheduler is null)
    {
        auto worker = new Thread(&threadMain);
        worker.start();
    }
    else
    {
        scheduler.spawn(&threadMain);
    }

    thisInfo.links[childTid] = linked;
    return childTid;
}
570 
@system unittest
{
    // Function pointers: the argument list must match the parameters.
    void function() noArgs;
    void function(int) oneInt;
    static assert( __traits(compiles, spawn(noArgs)));
    static assert( __traits(compiles, spawn(oneInt, 2)));
    static assert(!__traits(compiles, spawn(noArgs, 1)));
    static assert(!__traits(compiles, spawn(oneInt)));

    // Delegates: only shared or immutable ones may be spawned.
    void delegate(int) shared sharedCtx;
    shared(void delegate(int)) sharedDg;
    shared(void delegate(long) shared) sharedLong;
    shared(void delegate(real, int, long) shared) sharedTriple;
    void delegate(int) immutable immutableCtx;
    void delegate(int) localDg;
    static assert( __traits(compiles, spawn(sharedCtx, 1)));
    static assert( __traits(compiles, spawn(sharedDg, 2)));
    static assert( __traits(compiles, spawn(sharedLong, 3)));
    static assert( __traits(compiles, spawn(sharedTriple, 4, 4, 4)));
    static assert( __traits(compiles, spawn(immutableCtx, 5)));
    static assert(!__traits(compiles, spawn(localDg, 6)));

    // Callable objects: the object's qualifier must be able to call opCall.
    auto obj1  = new class { void opCall(int) shared {} };
    auto obj2  = cast(shared) new class { void opCall(int) shared {} };
    auto obj3  = new class { void opCall(int) immutable {} };
    auto obj4  = cast(immutable) new class { void opCall(int) immutable {} };
    auto obj5  = new class { void opCall(int) {} };
    auto obj6  = cast(shared) new class { void opCall(int) immutable {} };
    auto obj7  = cast(immutable) new class { void opCall(int) shared {} };
    auto obj8  = cast(shared) new class { void opCall(int) const shared {} };
    auto obj9  = cast(const shared) new class { void opCall(int) shared {} };
    auto obj10 = cast(const shared) new class { void opCall(int) const shared {} };
    auto obj11 = cast(immutable) new class { void opCall(int) const shared {} };
    static assert(!__traits(compiles, spawn(obj1,  1)));
    static assert( __traits(compiles, spawn(obj2,  2)));
    static assert(!__traits(compiles, spawn(obj3,  3)));
    static assert( __traits(compiles, spawn(obj4,  4)));
    static assert(!__traits(compiles, spawn(obj5,  5)));
    static assert(!__traits(compiles, spawn(obj6,  6)));
    static assert(!__traits(compiles, spawn(obj7,  7)));
    static assert( __traits(compiles, spawn(obj8,  8)));
    static assert(!__traits(compiles, spawn(obj9,  9)));
    static assert( __traits(compiles, spawn(obj10, 10)));
    static assert( __traits(compiles, spawn(obj11, 11)));
}
616 
/**
 * Places the values as a message at the back of tid's message queue.
 *
 * Sends the supplied value to the thread represented by tid.  As with
 * $(REF spawn, std,concurrency), `T` must not have unshared aliasing.
 *
 * Params:
 *  tid  = Recipient; must refer to a mailbox (not `Tid.init`).
 *  vals = One or more values making up the message.
 */
void send(T...)(Tid tid, T vals)
in
{
    assert(tid.mbox !is null);
}
do
{
    enum aliasesLocalData = hasLocalAliasing!(T);
    static assert(!aliasesLocalData, "Aliases to mutable thread-local data not allowed.");

    _send(tid, vals);
}
629 
/**
 * Places the values as a message on the front of tid's message queue.
 *
 * Send a message to `tid` but place it at the front of `tid`'s message
 * queue instead of at the back.  This function is typically used for
 * out-of-band communication, to signal exceptional conditions, etc.
 *
 * Params:
 *  tid  = Recipient; must refer to a mailbox (not `Tid.init`).
 *  vals = One or more values making up the message.
 */
void prioritySend(T...)(Tid tid, T vals)
in
{
    assert(tid.mbox !is null);
}
do
{
    enum aliasesLocalData = hasLocalAliasing!(T);
    static assert(!aliasesLocalData, "Aliases to mutable thread-local data not allowed.");

    _send(MsgType.priority, tid, vals);
}
643 
/*
 * Unchecked send of a standard (non-priority) message: forwards to the
 * typed _send with MsgType.standard and performs no aliasing check.
 */
private void _send(T...)(Tid tid, T vals)
in
{
    assert(tid.mbox !is null);
}
do
{
    _send(MsgType.standard, tid, vals);
}
652 
/*
 * Implementation of send.  This allows parameter checking to be different for
 * both Tid.send() and .send().
 *
 * Wraps `vals` in a Message tagged with `type` and puts it into tid's mailbox.
 */
private void _send(T...)(MsgType type, Tid tid, T vals)
in
{
    assert(tid.mbox !is null);
}
do
{
    auto envelope = Message(type, vals);
    tid.mbox.put(envelope);
}
663 
/**
 * Receives a message from another thread.
 *
 * Receive a message from another thread, or block if no messages of the
 * specified types are available.  This function works by pattern matching
 * a message against a set of delegates and executing the first match found.
 *
 * If a delegate that accepts a $(REF Variant, std,variant) is included as
 * the last argument to `receive`, it will match any message that was not
 * matched by an earlier delegate.  If more than one argument is sent,
 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
 * sent.
 *
 * Params:
 *     ops = Variadic list of function pointers and delegates. Entries
 *           in this list must not occlude later entries.
 *
 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
 */
void receive(T...)(T ops)
in (thisInfo.ident.mbox !is null,
    "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.")
{
    // Reject ill-formed or occluding handler lists at compile time.
    checkops(ops);

    thisInfo.ident.mbox.get(ops);
}
696 
///
@system unittest
{
    import std.variant : Variant;

    // Replies with 1 for an int, 2 for a double and 3 for anything else.
    auto classify = ()
    {
        receive(
            (int i) { ownerTid.send(1); },
            (double f) { ownerTid.send(2); },
            (Variant v) { ownerTid.send(3); }
        );
    };

    {
        auto worker = spawn(classify);
        worker.send(42);
        assert(receiveOnly!int == 1);
    }

    {
        auto worker = spawn(classify);
        worker.send(3.14);
        assert(receiveOnly!int == 2);
    }

    {
        auto worker = spawn(classify);
        worker.send("something else");
        assert(receiveOnly!int == 3);
    }
}
729 
@safe unittest
{
    // A Variant catch-all is fine on its own or in last position.
    static assert(__traits(compiles, {
        receive((Variant any) {});
        receive((int n) {}, (Variant any) {});
    }));

    // A Variant handler ahead of another handler occludes it.
    static assert(!__traits(compiles, {
        receive((Variant any) {}, (int n) {});
    }));

    // Two handlers with the same parameter list: the second is unreachable.
    static assert(!__traits(compiles, {
        receive((int n) {}, (int m) {});
    }));
}
748 
// Make sure receive() works with free functions as well.
version (StdUnittest)
{
    // Module-level handler used only by the unittest below.
    private void receiveFunction(int x) {}
}

@safe unittest
{
    static assert(__traits(compiles, {
        receive(&receiveFunction);
        receive(&receiveFunction, (Variant any) {});
    }));
}
762 
763 
// Return type of receiveOnly!T: the type itself for a single entry,
// otherwise a std.typecons.Tuple of all entries.
private template receiveOnlyRet(T...)
{
    static if (T.length != 1)
    {
        import std.typecons : Tuple;

        alias receiveOnlyRet = Tuple!(T);
    }
    else
    {
        alias receiveOnlyRet = T[0];
    }
}
776 
/**
 * Receives only messages with arguments of the specified types.
 *
 * Params:
 *     T = Variadic list of types to be received.
 *
 * Returns: The received message.  If `T` has more than one entry,
 *          the message will be packed into a $(REF Tuple, std,typecons).
 *
 * Throws: $(LREF MessageMismatch) if a message of types other than `T`
 *         is received,
 *         $(LREF OwnerTerminated) when the sending thread was terminated.
 */
receiveOnlyRet!(T) receiveOnly(T...)()
in (thisInfo.ident.mbox !is null,
    "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.")
{
    import std.format : format;
    import std.typecons : Tuple;

    Tuple!(T) result;

    thisInfo.ident.mbox.get(
        // The one message shape we accept: copy it into the result.
        (T val) {
            static if (T.length)
                result.expand = val;
        },
        // Termination notices are rethrown to the caller unchanged.
        (LinkTerminated e) { throw e; },
        (OwnerTerminated e) { throw e; },
        // Anything else is a protocol violation.
        (Variant val) {
            static if (T.length > 1)
                enum expected = T.stringof;
            else
                enum expected = T[0].stringof;

            throw new MessageMismatch(
                format("Unexpected message type: expected '%s', got '%s'", expected, val.type.toString()));
        });

    static if (T.length == 1)
        return result[0];
    else
        return result;
}
823 
///
@system unittest
{
    // The child blocks until the int arrives.
    auto child = spawn({ assert(receiveOnly!int == 42); });
    child.send(42);
}
833 
///
@system unittest
{
    // Works the same way for strings.
    auto child = spawn({ assert(receiveOnly!string == "text"); });
    child.send("text");
}
843 
///
@system unittest
{
    struct Record { string name; int age; }

    // Asking for several types yields a Tuple of the received values.
    auto child = spawn(
    {
        auto received = receiveOnly!(double, Record);
        assert(received[0] == 0.5);
        assert(received[1].name == "Alice");
        assert(received[1].age == 31);
    });

    child.send(0.5, Record("Alice", 31));
}
859 
@system unittest
{
    // Waits for a string; reports "" on success or the error text otherwise.
    static void expectString(Tid mainTid)
    {
        try
        {
            receiveOnly!string();
            mainTid.send("");
        }
        catch (Throwable failure)
        {
            mainTid.send(failure.msg);
        }
    }

    auto child = spawn(&expectString, thisTid);

    // Send an int where a string is expected to provoke MessageMismatch.
    child.send(1);

    immutable outcome = receiveOnly!string();
    assert(outcome == "Unexpected message type: expected 'string', got 'int'");
}
880 
/**
 * Receives a message from another thread and gives up if no match
 * arrives within a specified duration.
 *
 * Receive a message from another thread, or block until `duration` has
 * elapsed, if no messages of the specified types are available. This
 * function works by pattern matching a message against a set of delegates
 * and executing the first match found.
 *
 * If a delegate that accepts a $(REF Variant, std,variant) is included as
 * the last argument, it will match any message that was not
 * matched by an earlier delegate.  If more than one argument is sent,
 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
 * sent.
 *
 * Params:
 *     duration = Duration, how long to wait. If `duration` is negative,
 *                won't wait at all.
 *     ops = Variadic list of function pointers and delegates. Entries
 *           in this list must not occlude later entries.
 *
 * Returns: `true` if it received a message and `false` if it timed out waiting
 *          for one.
 *
 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
 */
bool receiveTimeout(T...)(Duration duration, T ops)
in (thisInfo.ident.mbox !is null,
    "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.")
{
    // Reject ill-formed or occluding handler lists at compile time.
    checkops(ops);

    immutable gotMessage = thisInfo.ident.mbox.get(duration, ops);
    return gotMessage;
}
919 
@safe unittest
{
    // A Variant catch-all is fine on its own or in last position.
    static assert(__traits(compiles, {
        receiveTimeout(0.msecs, (Variant any) {});
        receiveTimeout(0.msecs, (int n) {}, (Variant any) {});
    }));

    // A Variant handler ahead of another handler occludes it.
    static assert(!__traits(compiles, {
        receiveTimeout(0.msecs, (Variant any) {}, (int n) {});
    }));

    // Duplicate parameter lists are rejected.
    static assert(!__traits(compiles, {
        receiveTimeout(0.msecs, (int n) {}, (int m) {});
    }));

    // A non-zero timeout compiles as well.
    static assert(__traits(compiles, {
        receiveTimeout(10.msecs, (int n) {}, (Variant any) {});
    }));
}
939 
940 // MessageBox Limits
941 
/**
 * These behaviors may be specified when a mailbox is full.
 */
enum OnCrowding
{
    /// Wait until room is available.
    block,
    /// Throw a MailboxFull exception.
    throwException,
    /// Abort the send and return.
    ignore,
}
951 
// Crowding callbacks installed by setMaxMailboxSize(Tid, size_t, OnCrowding),
// one per OnCrowding member.
private
{
    // OnCrowding.block: always answers true.
    bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
    {
        return true;
    }

    // OnCrowding.throwException: never returns, throws MailboxFull for tid.
    bool onCrowdingThrow(Tid tid) @safe pure
    {
        throw new MailboxFull(tid);
    }

    // OnCrowding.ignore: always answers false.
    bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
    {
        return false;
    }
}
969 
/**
 * Sets a maximum mailbox size.
 *
 * Sets a limit on the maximum number of user messages allowed in the mailbox.
 * If this limit is reached, the caller attempting to add a new message will
 * execute the behavior specified by doThis.  If messages is zero, the mailbox
 * is unbounded.
 *
 * Params:
 *  tid      = The Tid of the thread for which this limit should be set.
 *  messages = The maximum number of messages or zero if no limit.
 *  doThis   = The behavior executed when a message is sent to a full
 *             mailbox.
 */
void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
in
{
    assert(tid.mbox !is null);
}
do
{
    // Map each policy onto its predefined crowding callback.
    final switch (doThis)
    {
    case OnCrowding.block:
        tid.mbox.setMaxMsgs(messages, &onCrowdingBlock);
        break;
    case OnCrowding.throwException:
        tid.mbox.setMaxMsgs(messages, &onCrowdingThrow);
        break;
    case OnCrowding.ignore:
        tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore);
        break;
    }
}
997 
/**
 * Sets a maximum mailbox size.
 *
 * Sets a limit on the maximum number of user messages allowed in the mailbox.
 * If this limit is reached, the caller attempting to add a new message will
 * execute onCrowdingDoThis.  If messages is zero, the mailbox is unbounded.
 *
 * Params:
 *  tid      = The Tid of the thread for which this limit should be set.
 *  messages = The maximum number of messages or zero if no limit.
 *  onCrowdingDoThis = The routine called when a message is sent to a full
 *                     mailbox.
 */
void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
in
{
    assert(tid.mbox !is null);
}
do
{
    // The user-supplied callback is installed as-is.
    tid.mbox.setMaxMsgs(messages, onCrowdingDoThis);
}
1016 
// Name registry shared by all threads (__gshared).  The register,
// unregister and unregisterMe functions access both tables inside
// synchronized (registryLock).
private __gshared Tid[string] tidByName;     // name -> owning Tid
private __gshared string[][Tid] namesByTid;  // Tid  -> names registered for it
1022 
// Returns the mutex guarding the name registry, creating it through
// initOnce on first use.
private @property Mutex registryLock()
{
    __gshared Mutex registryMutex;
    return initOnce!registryMutex(new Mutex);
}
1029 
// Removes every name registered for `me.ident` from both registry tables.
// Does nothing when `me` has no Tid or no registered names.
private void unregisterMe(ref ThreadInfo me)
{
    if (me.ident == Tid.init)
        return;

    synchronized (registryLock)
    {
        auto registered = me.ident in namesByTid;
        if (registered is null)
            return;

        foreach (name; *registered)
            tidByName.remove(name);
        namesByTid.remove(me.ident);
    }
}
1045 
/**
 * Associates name with tid.
 *
 * Records name for tid in a process-local map.  When the thread represented
 * by tid terminates, any names associated with it will be automatically
 * unregistered.
 *
 * Params:
 *  name = The name to associate with tid.
 *  tid  = The tid to register under name.
 *
 * Returns:
 *  true if the name is available and tid is not known to represent a
 *  defunct thread.
 */
bool register(string name, Tid tid)
in (tid.mbox !is null)
{
    synchronized (registryLock)
    {
        // Refuse a name that is already taken, and refuse a thread whose
        // mailbox has been closed.
        if ((name in tidByName) !is null || tid.mbox.isClosed)
            return false;

        namesByTid[tid] ~= name;
        tidByName[name] = tid;
        return true;
    }
}
1075 
/**
 * Removes the registered name associated with a tid.
 *
 * The name is removed both from the name-to-Tid map and from the list of
 * names recorded for the owning Tid.
 *
 * Params:
 *  name = The name to unregister.
 *
 * Returns:
 *  true if the name is registered, false if not.
 */
bool unregister(string name)
{
    import std.algorithm.mutation : remove, SwapStrategy;
    import std.algorithm.searching : countUntil;

    synchronized (registryLock)
    {
        if (auto tid = name in tidByName)
        {
            if (auto allNames = *tid in namesByTid)
            {
                auto pos = countUntil(*allNames, name);
                if (pos >= 0)
                {
                    // remove() only returns the shortened slice and leaves
                    // the length of its argument untouched, so the result
                    // must be stored back.  Otherwise the stale name stays
                    // listed for this Tid and unregisterMe would later drop
                    // a registration that another thread made under the
                    // same name.
                    *allNames = remove!(SwapStrategy.unstable)(*allNames, pos);
                }

                // Do not keep empty name lists around.  allNames must not be
                // used after this point.
                if ((*allNames).length == 0)
                    namesByTid.remove(*tid);
            }
            tidByName.remove(name);
            return true;
        }
        return false;
    }
}
1103 
/**
 * Gets the Tid associated with name.
 *
 * Params:
 *  name = The name to look up in the registry.
 *
 * Returns:
 *  The associated Tid or Tid.init if name is not registered.
 */
Tid locate(string name)
{
    synchronized (registryLock)
    {
        auto found = name in tidByName;
        return found is null ? Tid.init : *found;
    }
}
1122 
/**
 * Encapsulates all implementation-level data needed for scheduling.
 *
 * When defining a Scheduler, an instance of this struct must be associated
 * with each logical thread.  It contains all implementation-level information
 * needed by the internal API.
 */
struct ThreadInfo
{
    /// Tid of the logical thread this instance describes.
    Tid ident;
    /// Tids that are sent a linkDead message by cleanup.
    bool[Tid] links;
    /// Owner of this thread; it is sent a linkDead message by cleanup.
    Tid owner;

    /**
     * Gets a thread-local instance of ThreadInfo.
     *
     * The returned instance should be used as the default when info is
     * requested for a thread that was not created by the Scheduler.
     */
    static @property ref thisInfo() nothrow
    {
        static ThreadInfo perThread;
        return perThread;
    }

    /**
     * Cleans up this ThreadInfo.
     *
     * This must be called when a scheduled thread terminates.  It tears down
     * the messaging system for the thread and notifies interested parties of
     * the thread's termination.
     */
    void cleanup()
    {
        // Close our own mailbox first.
        auto box = ident.mbox;
        if (box !is null)
        {
            box.close();
        }

        // Tell every linked thread, and then the owner, that we are gone.
        foreach (linked; links.keys)
        {
            _send(MsgType.linkDead, linked, ident);
        }
        if (!(owner == Tid.init))
        {
            _send(MsgType.linkDead, owner, ident);
        }

        // Finally drop any names still registered for this thread.
        unregisterMe(this);
    }

    // https://issues.dlang.org/show_bug.cgi?id=20160
    // Cleaning up a ThreadInfo that has no identity must leave the
    // registrations of other threads alone.
    @system unittest
    {
        register("main_thread", thisTid());

        ThreadInfo blank;
        blank.cleanup();

        assert(locate("main_thread") == thisTid());
    }
}
1178 
/**
 * A Scheduler controls how threading is performed by spawn.
 *
 * Implementing a Scheduler allows the concurrency mechanism used by this
 * module to be customized according to different needs.  By default, a call
 * to spawn will create a new kernel thread that executes the supplied routine
 * and terminates when finished.  But it is possible to create Schedulers that
 * reuse threads, that multiplex Fibers (coroutines) across a single thread,
 * or any number of other approaches.  By making the choice of Scheduler a
 * user-level option, std.concurrency may be used for far more types of
 * application than if this behavior were predefined.
 *
 * Example:
 * ---
 * import std.concurrency;
 * import std.stdio;
 *
 * void main()
 * {
 *     scheduler = new FiberScheduler;
 *     scheduler.start(
 *     {
 *         writeln("the rest of main goes here");
 *     });
 * }
 * ---
 *
 * Some schedulers have a dispatching loop that must run if they are to work
 * properly, so for the sake of consistency, when using a scheduler, start()
 * must be called within main().  This yields control to the scheduler and
 * will ensure that any spawned threads are executed in an expected manner.
 */
interface Scheduler
{
    /**
     * Spawns the supplied op and starts the Scheduler.
     *
     * This is intended to be called at the start of the program to yield all
     * scheduling to the active Scheduler instance.  This is necessary for
     * schedulers that explicitly dispatch threads rather than simply relying
     * on the operating system to do so, and so start should always be called
     * within main() to begin normal program execution.
     *
     * Params:
     *  op = A wrapper for whatever the main thread would have done in the
     *       absence of a custom scheduler.  It will be automatically executed
     *       via a call to spawn by the Scheduler.
     */
    void start(void delegate() op);

    /**
     * Assigns a logical thread to execute the supplied op.
     *
     * This routine is called by spawn.  It is expected to instantiate a new
     * logical thread and run the supplied operation.  This thread must call
     * thisInfo.cleanup() when the thread terminates if the scheduled thread
     * is not a kernel thread--all kernel threads will have their ThreadInfo
     * cleaned up automatically by a thread-local destructor.
     *
     * Params:
     *  op = The function to execute.  This may be the actual function passed
     *       by the user to spawn itself, or may be a wrapper function.
     */
    void spawn(void delegate() op);

    /**
     * Yields execution to another logical thread.
     *
     * This routine is called at various points within concurrency-aware APIs
     * to provide a scheduler a chance to yield execution when using some sort
     * of cooperative multithreading model.  If this is not appropriate, such
     * as when each logical thread is backed by a dedicated kernel thread,
     * this routine may be a no-op.
     */
    void yield() nothrow;

    /**
     * Returns an appropriate ThreadInfo instance.
     *
     * Returns an instance of ThreadInfo specific to the logical thread that
     * is calling this routine or, if the calling thread was not created by
     * this scheduler, returns ThreadInfo.thisInfo instead.
     */
    @property ref ThreadInfo thisInfo() nothrow;

    /**
     * Creates a Condition variable analog for signaling.
     *
     * Creates a new Condition variable analog which is used to check for and
     * to signal the addition of messages to a thread's message queue.  Like
     * yield, some schedulers may need to define custom behavior so that calls
     * to Condition.wait() yield to another thread when no new messages are
     * available instead of blocking.
     *
     * Params:
     *  m = The Mutex that will be associated with this condition.  It will be
     *      locked prior to any operation on the condition, and so in some
     *      cases a Scheduler may need to hold this reference and unlock the
     *      mutex before yielding execution to another logical thread.
     */
    Condition newCondition(Mutex m) nothrow;
}
1281 
/**
 * An example Scheduler using kernel threads.
 *
 * This Scheduler mirrors the default scheduling behavior of creating one
 * kernel thread per call to spawn.  It is fully functional and may be
 * instantiated and used, but is not a necessary part of the default
 * functioning of this module.
 */
class ThreadScheduler : Scheduler
{
    /**
     * Runs op directly on the calling thread, since no real scheduling is
     * needed by this approach.
     */
    void start(void delegate() op)
    {
        op();
    }

    /**
     * Creates a new kernel thread and starts it running the supplied op.
     */
    void spawn(void delegate() op)
    {
        (new Thread(op)).start();
    }

    /**
     * A no-op: this scheduler does no explicit multiplexing.
     */
    void yield() nothrow
    {
    }

    /**
     * Returns ThreadInfo.thisInfo, since it is a thread-local instance of
     * ThreadInfo, which is the correct behavior for this scheduler.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        return ThreadInfo.thisInfo;
    }

    /**
     * Creates a plain Condition bound to m.  No custom behavior is needed
     * here.
     */
    Condition newCondition(Mutex m) nothrow
    {
        auto condition = new Condition(m);
        return condition;
    }
}
1335 
/**
 * An example Scheduler using Fibers.
 *
 * This is an example scheduler that creates a new Fiber per call to spawn
 * and multiplexes the execution of all fibers within the main thread.
 */
class FiberScheduler : Scheduler
{
    /**
     * This creates a new Fiber for the supplied op and then starts the
     * dispatcher.
     */
    void start(void delegate() op)
    {
        create(op);
        dispatch();
    }

    /**
     * This creates a new Fiber for the supplied op and adds it to the
     * dispatch list.
     */
    void spawn(void delegate() op) nothrow
    {
        create(op);
        yield();
    }

    /**
     * If the caller is a scheduled Fiber, this yields execution to another
     * scheduled Fiber.
     */
    void yield() nothrow
    {
        // NOTE: It's possible that we should test whether the calling Fiber
        //       is an InfoFiber before yielding, but I think it's reasonable
        //       that any (non-Generator) fiber should yield here.
        if (Fiber.getThis())
            Fiber.yield();
    }

    /**
     * Returns an appropriate ThreadInfo instance.
     *
     * Returns a ThreadInfo instance specific to the calling Fiber if the
     * Fiber was created by this dispatcher, otherwise it returns
     * ThreadInfo.thisInfo.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        auto f = cast(InfoFiber) Fiber.getThis();

        if (f !is null)
            return f.info;
        return ThreadInfo.thisInfo;
    }

    /**
     * Returns a Condition analog that yields when wait or notify is called.
     *
     * Bug:
     * For the default implementation, `notifyAll` will behave like `notify`.
     *
     * Params:
     *   m = A `Mutex` to use for locking if the condition needs to be waited on
     *       or notified from multiple `Thread`s.
     *       If `null`, no `Mutex` will be used and it is assumed that the
     *       `Condition` is only waited on/notified from one `Thread`.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new FiberCondition(m);
    }

protected:
    /**
     * Creates a new Fiber which calls the given delegate.
     *
     * Params:
     *   op = The delegate the fiber should call
     */
    void create(void delegate() op) nothrow
    {
        void wrap()
        {
            // Run the ThreadInfo cleanup however op leaves, including by
            // throwing.
            scope (exit)
            {
                thisInfo.cleanup();
            }
            op();
        }

        m_fibers ~= new InfoFiber(&wrap);
    }

    /**
     * Fiber which embeds a ThreadInfo
     */
    static class InfoFiber : Fiber
    {
        ThreadInfo info;

        this(void delegate() op) nothrow
        {
            super(op);
        }

        this(void delegate() op, size_t sz) nothrow
        {
            super(op, sz);
        }
    }

private:
    // Condition analog that yields to the scheduler instead of blocking.
    class FiberCondition : Condition
    {
        this(Mutex m) nothrow
        {
            super(m);
            notified = false;
        }

        // Yields until a notification has been recorded, then consumes it.
        override void wait() nothrow
        {
            scope (exit) notified = false;

            while (!notified)
                switchContext();
        }

        // Yields until a notification has been recorded or period has
        // elapsed.  Returns true if a notification arrived; the flag is
        // consumed either way.
        override bool wait(Duration period) nothrow
        {
            import core.time : MonoTime;

            scope (exit) notified = false;

            for (auto limit = MonoTime.currTime + period;
                 !notified && !period.isNegative;
                 period = limit - MonoTime.currTime)
            {
                // Release the associated mutex while yielding, exactly as
                // the untimed wait() does.  Yielding with the mutex still
                // held would keep other kernel threads from locking it for
                // the whole duration of the timed wait.
                switchContext();
            }
            return notified;
        }

        override void notify() nothrow
        {
            notified = true;
            switchContext();
        }

        // Behaves like notify: there is a single notification flag.
        override void notifyAll() nothrow
        {
            notified = true;
            switchContext();
        }

    private:
        // Unlocks the associated mutex (if any), yields to the scheduler and
        // re-locks the mutex once control comes back.
        void switchContext() nothrow
        {
            if (mutex_nothrow) mutex_nothrow.unlock_nothrow();
            scope (exit)
                if (mutex_nothrow)
                    mutex_nothrow.lock_nothrow();
            this.outer.yield();
        }

        private bool notified;
    }

private:
    // Calls the fibers in m_fibers in rotation until none are left.
    void dispatch()
    {
        import std.algorithm.mutation : remove;

        while (m_fibers.length > 0)
        {
            // An OwnerTerminated that escapes a fiber is dropped here; any
            // other Throwable is rethrown to the caller of start().
            auto t = m_fibers[m_pos].call(Fiber.Rethrow.no);
            if (t !is null && !(cast(OwnerTerminated) t))
            {
                throw t;
            }
            if (m_fibers[m_pos].state == Fiber.State.TERM)
            {
                // Drop the finished fiber; wrap around if it was the last
                // element of the list.
                if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length)
                    m_pos = 0;
            }
            else if (m_pos++ >= m_fibers.length - 1)
            {
                m_pos = 0;
            }
        }
    }

private:
    Fiber[] m_fibers; // fibers still to be dispatched
    size_t m_pos;     // index of the fiber that dispatch() calls next
}
1534 
// Exercises the Condition returned by FiberScheduler.newCondition with two
// hand-driven fibers: one that waits on it and one that notifies it.
@system unittest
{
    // Waits on cond forever, counting each wake-up in received.
    static void receive(Condition cond, ref size_t received)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                cond.wait();
                ++received;
            }
        }
    }

    // Notifies cond forever, counting each notification in sent.
    static void send(Condition cond, ref size_t sent)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                ++sent;
                cond.notify();
            }
        }
    }

    auto fs = new FiberScheduler;
    auto mtx = new Mutex;
    auto cond = fs.newCondition(mtx);

    size_t received, sent;
    auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); });
    // Nothing has been notified yet, so the waiter yields inside wait().
    waiter.call();
    assert(received == 0);
    // The notifier records one notification and yields inside notify().
    notifier.call();
    assert(sent == 1);
    assert(received == 0);
    // The waiter now sees the notification, counts it and waits again.
    waiter.call();
    assert(received == 1);
    // Without a new notification the waiter only yields again.
    waiter.call();
    assert(received == 1);
}
1577 
/**
 * Sets the Scheduler behavior within the program.
 *
 * This variable sets the Scheduler behavior within this program.  While it is
 * null, code in this module that consults it (for example yield() and the
 * MessageBox constructor) takes its non-scheduler path.  Typically, when
 * setting a Scheduler, scheduler.start() should be called in main.  This
 * routine will not return until program execution is complete.
 */
__gshared Scheduler scheduler;
1586 
1587 // Generator
1588 
/**
 * If the caller is a Fiber and is not a Generator, this function will call
 * scheduler.yield() or Fiber.yield(), as appropriate.
 */
void yield() nothrow
{
    auto current = Fiber.getThis();

    // Nothing to do when the calling fiber is a Generator.
    if (cast(IsGenerator) current)
        return;

    if (scheduler !is null)
        scheduler.yield();
    else if (current !is null)
        Fiber.yield();
}

/// Used to determine whether a Generator is running.
private interface IsGenerator {}
1610 
1611 
/**
 * A Generator is a Fiber that periodically returns values of type T to the
 * caller via yield.  This is represented as an InputRange.
 */
class Generator(T) :
    Fiber, IsGenerator, InputRange!T
{
    /**
     * Creates a generator that runs a static D function.  The function is
     * called once right away to prepare the range for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn)
    {
        super(fn);
        call();
    }

    /**
     * Creates a generator that runs a static D function on a stack of the
     * given size.  The function is called once right away to prepare the
     * range for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz)
    {
        super(fn, sz);
        call();
    }

    /**
     * Creates a generator that runs a static D function on a stack of the
     * given size with the given guard page size.  The function is called
     * once right away to prepare the range for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *  guardPageSize = size of the guard page to trap fiber's stack
     *                  overflows. Refer to $(REF Fiber, core,thread)'s
     *                  documentation for more details.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz, size_t guardPageSize)
    {
        super(fn, sz, guardPageSize);
        call();
    }

    /**
     * Creates a generator that runs a delegate.  The delegate is called once
     * right away to prepare the range for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg)
    {
        super(dg);
        call();
    }

    /**
     * Creates a generator that runs a delegate on a stack of the given size.
     * The delegate is called once right away to prepare the range for
     * iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz)
    {
        super(dg, sz);
        call();
    }

    /**
     * Creates a generator that runs a delegate on a stack of the given size
     * with the given guard page size.  The delegate is called once right
     * away to prepare the range for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *  guardPageSize = size of the guard page to trap fiber's stack
     *                  overflows. Refer to $(REF Fiber, core,thread)'s
     *                  documentation for more details.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz, size_t guardPageSize)
    {
        super(dg, sz, guardPageSize);
        call();
    }

    /**
     * Returns true if the generator is empty, i.e. the fiber has terminated
     * or no value has been yielded so far.
     */
    final bool empty() @property
    {
        return state == State.TERM || m_value is null;
    }

    /**
     * Obtains the next value from the underlying function by resuming it.
     */
    final void popFront()
    {
        call();
    }

    /**
     * Returns the most recently generated value by shallow copy.
     */
    final T front() @property
    {
        return *m_value;
    }

    /**
     * Returns the most recently generated value without executing a
     * copy constructor. Will not compile for element types defining a
     * postblit, because Generator does not return by reference.
     */
    final T moveFront()
    {
        static if (hasElaborateCopyConstructor!T)
        {
            static assert(0,
                    "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
        }
        else
        {
            return front;
        }
    }

    /// foreach support: passes each generated value to loopBody.
    final int opApply(scope int delegate(T) loopBody)
    {
        int result;
        while (!empty)
        {
            result = loopBody(front);
            // Stop without advancing when the loop body asks to break.
            if (result)
                break;
            popFront();
        }
        return result;
    }

    /// foreach support: passes a running index and each value to loopBody.
    final int opApply(scope int delegate(size_t, T) loopBody)
    {
        int result;
        size_t index;
        while (!empty)
        {
            result = loopBody(index, front);
            // Stop without advancing when the loop body asks to break.
            if (result)
                break;
            ++index;
            popFront();
        }
        return result;
    }
private:
    // Points at the value most recently handed over by yield(T); null until
    // the first value has been yielded.
    T* m_value;
}
1797 
///
@system unittest
{
    // The spawned thread receives ints until it sees one that is not below
    // 9, then reports twice that value to its owner.
    auto tid = spawn({
        int i;
        while (i < 9)
            i = receiveOnly!int;

        ownerTid.send(i * 2);
    });

    // A generator producing the values 1 through 9.
    auto r = new Generator!int({
        foreach (i; 1 .. 10)
            yield(i);
    });

    // Forward every generated value to the spawned thread.
    foreach (e; r)
        tid.send(e);

    assert(receiveOnly!int == 18);
}
1819 
/**
 * Yields a value of type T to the caller of the currently executing
 * generator.
 *
 * Params:
 *  value = The value to yield.
 *
 * Throws:
 *  Exception if the calling fiber is not an executing Generator!T.
 */
void yield(T)(ref T value)
{
    auto active = cast(Generator!T) Fiber.getThis();

    if (active is null || active.state != Fiber.State.EXEC)
        throw new Exception("yield(T) called with no active generator for the supplied type");

    // Publish the address of the value, then hand control back to the
    // generator's caller.
    active.m_value = &value;
    Fiber.yield();
}

/// ditto
void yield(T)(T value)
{
    // value is an lvalue here, so this call goes to the ref overload above.
    yield(value);
}
1843 
// Runs the same Generator/spawn scenario under both example schedulers.
@system unittest
{
    import core.exception;
    import std.exception;

    static void testScheduler(Scheduler s)
    {
        scheduler = s;
        scheduler.start({
            // Receiver: expects the ints 1, 2, 3, ... in order.
            auto tid = spawn({
                int i;

                try
                {
                    for (i = 1; i < 10; i++)
                    {
                        assertNotThrown!AssertError(assert(receiveOnly!int() == i));
                    }
                }
                catch (OwnerTerminated e)
                {
                    // Expected: the loop asks for more values than are sent.
                }

                // i will advance 1 past the last value expected
                assert(i == 4);
            });

            auto r = new Generator!int({
                // Yielding a double inside a Generator!int must throw.
                assertThrown!Exception(yield(2.0));
                yield(); // ensure this is a no-op
                yield(1);
                yield(); // also once something has been yielded
                yield(2);
                yield(3);
            });

            foreach (e; r)
            {
                tid.send(e);
            }
        });
        // Restore the default so that later code is unaffected.
        scheduler = null;
    }

    testScheduler(new ThreadScheduler);
    testScheduler(new FiberScheduler);
}
///
@system unittest
{
    import std.range;

    // Reference behavior: an ordinary range wrapped as an InputRange object.
    InputRange!int myIota = iota(10).inputRangeObject;

    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    //can be assigned to std.range.interfaces.InputRange directly
    myIota = new Generator!int(
    {
        foreach (i; 0 .. 10) yield(i);
    });

    // The generator must behave exactly like the range above.
    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    // Indexed foreach over the 7 remaining values (3 .. 9): counter[0]
    // counts the iterations, counter[1] sums the indices 0 .. 6.
    size_t[2] counter = [0, 0];
    foreach (i, unused; myIota) counter[] += [1, i];

    assert(myIota.empty);
    assert(counter == [7, 21]);
}
1925 
1926 private
1927 {
1928     /*
1929      * A MessageBox is a message queue for one thread.  Other threads may send
1930      * messages to this owner by calling put(), and the owner receives them by
1931      * calling get().  The put() call is therefore effectively shared and the
1932      * get() call is effectively local.  setMaxMsgs may be used by any thread
1933      * to limit the size of the message queue.
1934      */
1935     class MessageBox
1936     {
1937         this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
1938         {
1939             m_lock = new Mutex;
1940             m_closed = false;
1941 
1942             if (scheduler is null)
1943             {
1944                 m_putMsg = new Condition(m_lock);
1945                 m_notFull = new Condition(m_lock);
1946             }
1947             else
1948             {
1949                 m_putMsg = scheduler.newCondition(m_lock);
1950                 m_notFull = scheduler.newCondition(m_lock);
1951             }
1952         }
1953 
        /// Returns the value of the closed flag, read while holding the
        /// mailbox lock.
        final @property bool isClosed() @safe @nogc pure
        {
            synchronized (m_lock)
            {
                return m_closed;
            }
        }
1962 
        /*
         * Sets a limit on the maximum number of user messages allowed in the
         * mailbox.  If this limit is reached, the caller attempting to add
         * a new message will execute call.  If num is zero, there is no limit
         * on the message queue.
         *
         * Params:
         *  num  = The maximum size of the queue or zero if the queue is
         *         unbounded.
         *  call = The routine to call when the queue is full.
         */
        final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
        {
            // Both settings are changed together under the mailbox lock.
            synchronized (m_lock)
            {
                m_maxMsgs = num;
                m_onMaxMsgs = call;
            }
        }
1982 
        /*
         * If maxMsgs is not set, the message is added to the queue and the
         * owner is notified.  If the queue is full, the message will still be
         * accepted if it is a control message, otherwise the routine
         * installed through setMaxMsgs (m_onMaxMsgs) is called.  If the
         * routine returns true, this call will block until the owner has
         * made space available in the queue.  If it returns false, this call
         * will abort.  A message put into a closed mailbox is silently
         * dropped.
         *
         * Params:
         *  msg = The message to put in the queue.
         *
         * Throws:
         *  An exception if the queue is full and the installed routine
         *  throws.
         */
        final void put(ref Message msg)
        {
            synchronized (m_lock)
            {
                // TODO: Generate an error here if m_closed is true, or maybe
                //       put a message in the caller's queue?
                if (!m_closed)
                {
                    // Retried after every wake-up from m_notFull below.
                    while (true)
                    {
                        // Priority messages go to their own queue and are
                        // never subject to the size limit.
                        if (isPriorityMsg(msg))
                        {
                            m_sharedPty.put(msg);
                            m_putMsg.notify();
                            return;
                        }
                        // Control messages are accepted even when the
                        // mailbox is full.
                        if (!mboxFull() || isControlMsg(msg))
                        {
                            m_sharedBox.put(msg);
                            m_putMsg.notify();
                            return;
                        }
                        // The mailbox is full: give up if the crowding
                        // routine returns false.
                        if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
                        {
                            return;
                        }
                        // Otherwise wait on m_notFull, keeping count of the
                        // waiting senders in m_putQueue.
                        m_putQueue++;
                        m_notFull.wait();
                        m_putQueue--;
                    }
                }
            }
        }
2030 
2031         /*
2032          * Matches ops against each message in turn until a match is found.
2033          *
2034          * Params:
2035          *  ops = The operations to match.  Each may return a bool to indicate
2036          *        whether a message with a matching type is truly a match.
2037          *
2038          * Returns:
2039          *  true if a message was retrieved and false if not (such as if a
2040          *  timeout occurred).
2041          *
2042          * Throws:
2043          *  LinkTerminated if a linked thread terminated, or OwnerTerminated
2044          * if the owner thread terminates and no existing messages match the
2045          * supplied ops.
2046          */
2047         bool get(T...)(scope T vals)
2048         {
2049             import std.meta : AliasSeq;
2050 
2051             static assert(T.length, "T must not be empty");
2052 
2053             static if (isImplicitlyConvertible!(T[0], Duration))
2054             {
2055                 alias Ops = AliasSeq!(T[1 .. $]);
2056                 alias ops = vals[1 .. $];
2057                 enum timedWait = true;
2058                 Duration period = vals[0];
2059             }
2060             else
2061             {
2062                 alias Ops = AliasSeq!(T);
2063                 alias ops = vals[0 .. $];
2064                 enum timedWait = false;
2065             }
2066 
2067             bool onStandardMsg(ref Message msg)
2068             {
2069                 foreach (i, t; Ops)
2070                 {
2071                     alias Args = Parameters!(t);
2072                     auto op = ops[i];
2073 
2074                     if (msg.convertsTo!(Args))
2075                     {
2076                         static if (is(ReturnType!(t) == bool))
2077                         {
2078                             return msg.map(op);
2079                         }
2080                         else
2081                         {
2082                             msg.map(op);
2083                             return true;
2084                         }
2085                     }
2086                 }
2087                 return false;
2088             }
2089 
2090             bool onLinkDeadMsg(ref Message msg)
2091             {
2092                 assert(msg.convertsTo!(Tid),
2093                         "Message could be converted to Tid");
2094                 auto tid = msg.get!(Tid);
2095 
2096                 if (bool* pDepends = tid in thisInfo.links)
2097                 {
2098                     auto depends = *pDepends;
2099                     thisInfo.links.remove(tid);
2100                     // Give the owner relationship precedence.
2101                     if (depends && tid != thisInfo.owner)
2102                     {
2103                         auto e = new LinkTerminated(tid);
2104                         auto m = Message(MsgType.standard, e);
2105                         if (onStandardMsg(m))
2106                             return true;
2107                         throw e;
2108                     }
2109                 }
2110                 if (tid == thisInfo.owner)
2111                 {
2112                     thisInfo.owner = Tid.init;
2113                     auto e = new OwnerTerminated(tid);
2114                     auto m = Message(MsgType.standard, e);
2115                     if (onStandardMsg(m))
2116                         return true;
2117                     throw e;
2118                 }
2119                 return false;
2120             }
2121 
2122             bool onControlMsg(ref Message msg)
2123             {
2124                 switch (msg.type)
2125                 {
2126                 case MsgType.linkDead:
2127                     return onLinkDeadMsg(msg);
2128                 default:
2129                     return false;
2130                 }
2131             }
2132 
2133             bool scan(ref ListT list)
2134             {
2135                 for (auto range = list[]; !range.empty;)
2136                 {
2137                     // Only the message handler will throw, so if this occurs
2138                     // we can be certain that the message was handled.
2139                     scope (failure)
2140                         list.removeAt(range);
2141 
2142                     if (isControlMsg(range.front))
2143                     {
2144                         if (onControlMsg(range.front))
2145                         {
2146                             // Although the linkDead message is a control message,
2147                             // it can be handled by the user.  Since the linkDead
2148                             // message throws if not handled, if we get here then
2149                             // it has been handled and we can return from receive.
2150                             // This is a weird special case that will have to be
2151                             // handled in a more general way if more are added.
2152                             if (!isLinkDeadMsg(range.front))
2153                             {
2154                                 list.removeAt(range);
2155                                 continue;
2156                             }
2157                             list.removeAt(range);
2158                             return true;
2159                         }
2160                         range.popFront();
2161                         continue;
2162                     }
2163                     else
2164                     {
2165                         if (onStandardMsg(range.front))
2166                         {
2167                             list.removeAt(range);
2168                             return true;
2169                         }
2170                         range.popFront();
2171                         continue;
2172                     }
2173                 }
2174                 return false;
2175             }
2176 
2177             bool pty(ref ListT list)
2178             {
2179                 if (!list.empty)
2180                 {
2181                     auto range = list[];
2182 
2183                     if (onStandardMsg(range.front))
2184                     {
2185                         list.removeAt(range);
2186                         return true;
2187                     }
2188                     if (range.front.convertsTo!(Throwable))
2189                         throw range.front.get!(Throwable);
2190                     else if (range.front.convertsTo!(shared(Throwable)))
2191                         throw range.front.get!(shared(Throwable));
2192                     else
2193                         throw new PriorityMessageException(range.front.data);
2194                 }
2195                 return false;
2196             }
2197 
2198             static if (timedWait)
2199             {
2200                 import core.time : MonoTime;
2201                 auto limit = MonoTime.currTime + period;
2202             }
2203 
2204             while (true)
2205             {
2206                 ListT arrived;
2207 
2208                 if (pty(m_localPty) || scan(m_localBox))
2209                 {
2210                     return true;
2211                 }
2212                 yield();
2213                 synchronized (m_lock)
2214                 {
2215                     updateMsgCount();
2216                     while (m_sharedPty.empty && m_sharedBox.empty)
2217                     {
2218                         // NOTE: We're notifying all waiters here instead of just
2219                         //       a few because the onCrowding behavior may have
2220                         //       changed and we don't want to block sender threads
2221                         //       unnecessarily if the new behavior is not to block.
2222                         //       This will admittedly result in spurious wakeups
2223                         //       in other situations, but what can you do?
2224                         if (m_putQueue && !mboxFull())
2225                             m_notFull.notifyAll();
2226                         static if (timedWait)
2227                         {
2228                             if (period <= Duration.zero || !m_putMsg.wait(period))
2229                                 return false;
2230                         }
2231                         else
2232                         {
2233                             m_putMsg.wait();
2234                         }
2235                     }
2236                     m_localPty.put(m_sharedPty);
2237                     arrived.put(m_sharedBox);
2238                 }
2239                 if (m_localPty.empty)
2240                 {
2241                     scope (exit) m_localBox.put(arrived);
2242                     if (scan(arrived))
2243                     {
2244                         return true;
2245                     }
2246                     else
2247                     {
2248                         static if (timedWait)
2249                         {
2250                             period = limit - MonoTime.currTime;
2251                         }
2252                         continue;
2253                     }
2254                 }
2255                 m_localBox.put(arrived);
2256                 pty(m_localPty);
2257                 return true;
2258             }
2259         }
2260 
2261         /*
2262          * Called on thread termination.  This routine processes any remaining
2263          * control messages, clears out message queues, and sets a flag to
2264          * reject any future messages.
2265          */
2266         final void close()
2267         {
2268             static void onLinkDeadMsg(ref Message msg)
2269             {
2270                 assert(msg.convertsTo!(Tid),
2271                         "Message could be converted to Tid");
2272                 auto tid = msg.get!(Tid);
2273 
2274                 thisInfo.links.remove(tid);
2275                 if (tid == thisInfo.owner)
2276                     thisInfo.owner = Tid.init;
2277             }
2278 
2279             static void sweep(ref ListT list)
2280             {
2281                 for (auto range = list[]; !range.empty; range.popFront())
2282                 {
2283                     if (range.front.type == MsgType.linkDead)
2284                         onLinkDeadMsg(range.front);
2285                 }
2286             }
2287 
2288             ListT arrived;
2289 
2290             sweep(m_localBox);
2291             synchronized (m_lock)
2292             {
2293                 arrived.put(m_sharedBox);
2294                 m_closed = true;
2295             }
2296             m_localBox.clear();
2297             sweep(arrived);
2298         }
2299 
2300     private:
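        // Helper routines.  None of them takes a lock itself.  Note that
        // mboxFull also reads m_sharedBox; its callers (put and get) invoke
        // it while holding m_lock.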
2302 
2303         bool mboxFull() @safe @nogc pure nothrow
2304         {
2305             return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
2306         }
2307 
2308         void updateMsgCount() @safe @nogc pure nothrow
2309         {
2310             m_localMsgs = m_localBox.length;
2311         }
2312 
2313         bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
2314         {
2315             return msg.type != MsgType.standard && msg.type != MsgType.priority;
2316         }
2317 
2318         bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
2319         {
2320             return msg.type == MsgType.priority;
2321         }
2322 
2323         bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
2324         {
2325             return msg.type == MsgType.linkDead;
2326         }
2327 
        // Callback consulted by put() when the mailbox is full.  When it
        // returns false put() drops the message; otherwise put() waits.
        alias OnMaxFn = bool function(Tid);
        // Queue type used for every message list in the mailbox.
        alias ListT = List!(Message);

        // Receiver-side queues.  get() examines these without taking m_lock.
        ListT m_localBox;
        ListT m_localPty;

        // Guards the shared queues below; put(), get() and close() take it
        // before touching them.
        Mutex m_lock;
        // Notified by put() after a message is queued; get() waits on it
        // while both shared queues are empty.
        Condition m_putMsg;
        // put() waits on this while the mailbox is full; get() notifies it
        // when senders are waiting and the mailbox is no longer full.
        Condition m_notFull;
        // Number of put() calls currently waiting on m_notFull.
        size_t m_putQueue;
        // Sender-side queues: ordinary/control messages and priority
        // messages, filled by put() and drained by get().
        ListT m_sharedBox;
        ListT m_sharedPty;
        // Crowding callback; may be null, in which case put() simply waits.
        OnMaxFn m_onMaxMsgs;
        // Length of m_localBox as last recorded by updateMsgCount().
        size_t m_localMsgs;
        // Message limit checked by mboxFull(); zero disables the limit.
        size_t m_maxMsgs;
        // Set by close(); once true, put() ignores every message.
        bool m_closed;
2344     }
2345 
2346     /*
2347      *
2348      */
2349     struct List(T)
2350     {
2351         struct Range
2352         {
2353             import std.exception : enforce;
2354 
2355             @property bool empty() const
2356             {
2357                 return !m_prev.next;
2358             }
2359 
2360             @property ref T front()
2361             {
2362                 enforce(m_prev.next, "invalid list node");
2363                 return m_prev.next.val;
2364             }
2365 
2366             @property void front(T val)
2367             {
2368                 enforce(m_prev.next, "invalid list node");
2369                 m_prev.next.val = val;
2370             }
2371 
2372             void popFront()
2373             {
2374                 enforce(m_prev.next, "invalid list node");
2375                 m_prev = m_prev.next;
2376             }
2377 
2378             private this(Node* p)
2379             {
2380                 m_prev = p;
2381             }
2382 
2383             private Node* m_prev;
2384         }
2385 
2386         void put(T val)
2387         {
2388             put(newNode(val));
2389         }
2390 
2391         void put(ref List!(T) rhs)
2392         {
2393             if (!rhs.empty)
2394             {
2395                 put(rhs.m_first);
2396                 while (m_last.next !is null)
2397                 {
2398                     m_last = m_last.next;
2399                     m_count++;
2400                 }
2401                 rhs.m_first = null;
2402                 rhs.m_last = null;
2403                 rhs.m_count = 0;
2404             }
2405         }
2406 
2407         Range opSlice()
2408         {
2409             return Range(cast(Node*)&m_first);
2410         }
2411 
2412         void removeAt(Range r)
2413         {
2414             import std.exception : enforce;
2415 
2416             assert(m_count, "Can not remove from empty Range");
2417             Node* n = r.m_prev;
2418             enforce(n && n.next, "attempting to remove invalid list node");
2419 
2420             if (m_last is m_first)
2421                 m_last = null;
2422             else if (m_last is n.next)
2423                 m_last = n; // nocoverage
2424             Node* to_free = n.next;
2425             n.next = n.next.next;
2426             freeNode(to_free);
2427             m_count--;
2428         }
2429 
2430         @property size_t length()
2431         {
2432             return m_count;
2433         }
2434 
2435         void clear()
2436         {
2437             m_first = m_last = null;
2438             m_count = 0;
2439         }
2440 
2441         @property bool empty()
2442         {
2443             return m_first is null;
2444         }
2445 
2446     private:
2447         struct Node
2448         {
2449             Node* next;
2450             T val;
2451 
2452             this(T v)
2453             {
2454                 val = v;
2455             }
2456         }
2457 
2458         static shared struct SpinLock
2459         {
2460             void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
2461             void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
2462             bool locked;
2463         }
2464 
2465         static shared SpinLock sm_lock;
2466         static shared Node* sm_head;
2467 
2468         Node* newNode(T v)
2469         {
2470             Node* n;
2471             {
2472                 sm_lock.lock();
2473                 scope (exit) sm_lock.unlock();
2474 
2475                 if (sm_head)
2476                 {
2477                     n = cast(Node*) sm_head;
2478                     sm_head = sm_head.next;
2479                 }
2480             }
2481             if (n)
2482             {
2483                 import std.conv : emplace;
2484                 emplace!Node(n, v);
2485             }
2486             else
2487             {
2488                 n = new Node(v);
2489             }
2490             return n;
2491         }
2492 
2493         void freeNode(Node* n)
2494         {
2495             // destroy val to free any owned GC memory
2496             destroy(n.val);
2497 
2498             sm_lock.lock();
2499             scope (exit) sm_lock.unlock();
2500 
2501             auto sn = cast(shared(Node)*) n;
2502             sn.next = sm_head;
2503             sm_head = sn;
2504         }
2505 
2506         void put(Node* n)
2507         {
2508             m_count++;
2509             if (!empty)
2510             {
2511                 m_last.next = n;
2512                 m_last = n;
2513                 return;
2514             }
2515             m_first = n;
2516             m_last = n;
2517         }
2518 
2519         Node* m_first;
2520         Node* m_last;
2521         size_t m_count;
2522     }
2523 }
2524 
@system unittest
{
    import std.typecons : tuple, Tuple;

    // Receiving side: consumes the four messages sent by runTest, then
    // reports completion to `tid` with a priority message.
    static void testfn(Tid tid)
    {
        receive(
            (float val) { assert(0); },
            (int val, int val2) { assert(val == 42 && val2 == 86); });
        receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); });
        receive((Variant val) {  });
        // The first string handler accepts the message, so the second one
        // must never run.
        receive(
            (string val) { return "the quick brown fox" == val; },
            (string val) { assert(false); });
        prioritySend(tid, "done");
    }

    // Sending side: feeds the receiver and waits for its "done" reply.
    static void runTest(Tid tid)
    {
        send(tid, 42, 86);
        send(tid, tuple(42, 86));
        send(tid, "hello", "there");
        send(tid, "the quick brown fox");
        receive((string val) { assert(val == "done"); });
    }

    static void simpleTest()
    {
        runTest(spawn(&testfn, thisTid));

        // Run the test again with a limited mailbox size.
        auto limited = spawn(&testfn, thisTid);
        setMaxMailboxSize(limited, 2, OnCrowding.block);
        runTest(limited);
    }

    // Once with the default scheduler setting ...
    simpleTest();

    // ... and once with an explicit ThreadScheduler installed.
    scheduler = new ThreadScheduler;
    simpleTest();
    scheduler = null;
}
2570 
// Returns the mutex used by the initOnce overload that takes no explicit
// mutex.  The mutex is created on first use; if several threads race to
// create it, the first successful compare-and-swap decides which instance is
// kept and every caller receives that instance.
private @property shared(Mutex) initOnceLock()
{
    static shared Mutex lock;

    auto current = atomicLoad!(MemoryOrder.acq)(lock);
    if (current is null)
    {
        auto candidate = new shared Mutex;
        // Publish our candidate unless another thread already published one.
        current = cas(&lock, cast(shared) null, candidate)
            ? candidate
            : atomicLoad!(MemoryOrder.acq)(lock);
    }
    return current;
}
2581 
2582 /**
2583  * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
2584  * thread-safe manner.
2585  *
2586  * The implementation guarantees that all threads simultaneously calling
2587  * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
2588  * fully initialized. All side-effects of $(D_PARAM init) are globally visible
2589  * afterwards.
2590  *
2591  * Params:
2592  *   var = The variable to initialize
2593  *   init = The lazy initializer value
2594  *
2595  * Returns:
2596  *   A reference to the initialized variable
2597  */
auto ref initOnce(alias var)(lazy typeof(var) init)
{
    // Forward to the overload taking an explicit mutex, using the single
    // module-wide lock returned by initOnceLock.  `init` is passed on still
    // lazy, so it is not evaluated here.
    return initOnce!var(init, initOnceLock);
}
2602 
2603 /// A typical use-case is to perform lazy but thread-safe initialization.
@system unittest
{
    static class MySingleton
    {
        // Returns the one shared instance, creating it on first use.
        static MySingleton instance()
        {
            // __gshared gives the global (non-TLS) storage initOnce requires.
            __gshared MySingleton inst;
            return initOnce!inst(new MySingleton);
        }
    }

    assert(MySingleton.instance !is null);
}
2617 
// Ten threads race to obtain the singleton; the constructor must run once.
@system unittest
{
    static class MySingleton
    {
        static MySingleton instance()
        {
            __gshared MySingleton inst;
            return initOnce!inst(new MySingleton);
        }

    private:
        // Counts constructor runs across all threads.
        __gshared size_t cnt;
        // Value of the counter at the time this instance was constructed.
        size_t val;

        this()
        {
            val = ++cnt;
        }
    }

    enum workers = 10;

    foreach (_; 0 .. workers)
    {
        spawn({ ownerTid.send(MySingleton.instance.val); });
    }
    // Every worker must report the value held by the one shared instance.
    foreach (_; 0 .. workers)
    {
        assert(receiveOnly!size_t == MySingleton.instance.val);
    }
    assert(MySingleton.cnt == 1);
}
2640 
2641 /**
2642  * Same as above, but takes a separate mutex instead of sharing one among
2643  * all initOnce instances.
2644  *
2645  * This should be used to avoid dead-locks when the $(D_PARAM init)
2646  * expression waits for the result of another thread that might also
2647  * call initOnce. Use with care.
2648  *
2649  * Params:
2650  *   var = The variable to initialize
2651  *   init = The lazy initializer value
2652  *   mutex = A mutex to prevent race conditions
2653  *
2654  * Returns:
2655  *   A reference to the initialized variable
2656  */
auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex)
{
    import core.atomic : atomicLoad, atomicStore, MemoryOrder;

    // check that var is global, can't take address of a TLS variable
    static assert(is(typeof({ __gshared p = &var; })),
        "var must be 'static shared' or '__gshared'.");

    // One flag per instantiation, i.e. per `var`.
    static shared bool flag;

    // Fast path: initialization has already been published.
    if (atomicLoad!(MemoryOrder.acq)(flag))
        return var;

    synchronized (mutex)
    {
        // Check again under the mutex: another thread may have finished the
        // initialization while this one was waiting for it.
        if (!atomicLoad!(MemoryOrder.raw)(flag))
        {
            var = init;
            atomicStore!(MemoryOrder.rel)(flag, true);
        }
    }
    return var;
}
2678 
2679 /// ditto
auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
{
    // Convenience overload for a Mutex reference that is not typed as
    // shared: cast it and forward to the shared(Mutex) overload.
    return initOnce!var(init, cast(shared) mutex);
}
2684 
2685 /// Use a separate mutex when init blocks on another thread that might also call initOnce.
@system unittest
{
    import core.sync.mutex : Mutex;

    static shared bool varA, varB;
    static shared Mutex m;
    m = new shared Mutex;

    spawn({
        // use a different mutex for varB to avoid a dead-lock
        initOnce!varB(true, m);
        ownerTid.send(true);
    });
    // init depends on the result of the spawned thread
    // (varA uses the default lock, and its init expression blocks in
    // receiveOnly until the spawned thread has sent its message)
    initOnce!varA(receiveOnly!bool);
    assert(varA == true);
    assert(varB == true);
}
2704 
// initOnce must accept global storage and reject thread-local or stack
// variables at compile time.
@system unittest
{
    static shared bool sharedFlag;
    __gshared bool gsharedFlag;
    static bool tlsFlag;
    bool stackFlag;

    initOnce!sharedFlag(true);
    initOnce!gsharedFlag(true);

    static assert(!__traits(compiles, initOnce!tlsFlag(true))); // TLS
    static assert(!__traits(compiles, initOnce!stackFlag(true))); // local variable
}
2716 
// A shared array can be sent to another thread, and a write made through
// the received slice is visible to the sender.
@system unittest
{
    static shared int[] buffer = new shared(int)[1];

    auto worker = spawn({
        auto received = receiveOnly!(shared(int)[]);
        received[0] = 5;
        // Tell the owner that the write has been made.
        ownerTid.send(true);
    });

    worker.send(buffer);
    receiveOnly!(bool);
    assert(buffer[0] == 5);
}
// Suggestion Box / Bug Report