1 /**
2 `std.parallelism` implements high-level primitives for SMP parallelism.
3 These include parallel foreach, parallel reduce, parallel eager map, pipelining
4 and future/promise parallelism.  `std.parallelism` is recommended when the
5 same operation is to be executed in parallel on different data, or when a
6 function is to be executed in a background thread and its result returned to a
7 well-defined main thread.  For communication between arbitrary threads, see
8 `std.concurrency`.
9 
10 `std.parallelism` is based on the concept of a `Task`.  A `Task` is an
11 object that represents the fundamental unit of work in this library and may be
12 executed in parallel with any other `Task`.  Using `Task`
13 directly allows programming with a future/promise paradigm.  All other
14 supported parallelism paradigms (parallel foreach, map, reduce, pipelining)
15 represent an additional level of abstraction over `Task`.  They
16 automatically create one or more `Task` objects, or closely related types
17 that are conceptually identical but not part of the public API.
18 
19 After creation, a `Task` may be executed in a new thread, or submitted
20 to a `TaskPool` for execution.  A `TaskPool` encapsulates a task queue
21 and its worker threads.  Its purpose is to efficiently map a large
22 number of `Task`s onto a smaller number of threads.  A task queue is a
23 FIFO queue of `Task` objects that have been submitted to the
24 `TaskPool` and are awaiting execution.  A worker thread is a thread that
25 is associated with exactly one task queue.  It executes the `Task` at the
26 front of its queue when the queue has work available, or sleeps when
27 no work is available.  Each task queue is associated with zero or
28 more worker threads.  If the result of a `Task` is needed before execution
29 by a worker thread has begun, the `Task` can be removed from the task queue
30 and executed immediately in the thread where the result is needed.
31 
32 Warning:  Unless marked as `@trusted` or `@safe`, artifacts in
33           this module allow implicit data sharing between threads and cannot
34           guarantee that client code is free from low level data races.
35 
36 Source:    $(PHOBOSSRC std/parallelism.d)
37 Author:  David Simcha
38 Copyright:  Copyright (c) 2009-2011, David Simcha.
39 License:    $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
40 */
41 module std.parallelism;
42 
// Fold every Apple platform identifier into a single `Darwin` version so
// that the rest of the module only has to test one identifier.
version (OSX)
    version = Darwin;
else version (iOS)
    version = Darwin;
else version (TVOS)
    version = Darwin;
else version (WatchOS)
    version = Darwin;
51 
///
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math : approxEqual;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Parallel reduce can be combined with
    // std.algorithm.iteration.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature using
    // std.algorithm.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce:       1.067 s
    // std.algorithm.reduce:  4.011 s

    enum n = 1_000_000;
    enum delta = 1.0 / n;

    // Midpoint rule for the integral of 1 / (1 + x^2) over [0, 1].
    // `n.iota` yields i = 0 .. n - 1, so the midpoint of the i-th
    // sub-interval [i * delta, (i + 1) * delta] is (i + 0.5) * delta.
    alias getTerm = (int i)
    {
        immutable x = ( i + 0.5 ) * delta;
        return delta / ( 1.0 + x * x ) ;
    };

    // The integral equals pi / 4, hence the factor of four.
    immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

    assert(pi.approxEqual(3.1415926));
}
87 
88 import core.atomic;
89 import core.memory;
90 import core.sync.condition;
91 import core.thread;
92 
93 import std.functional;
94 import std.meta;
95 import std.range.primitives;
96 import std.traits;
97 
// Platforms on which `totalCPUsImpl` obtains the CPU count through the C
// function `sysctlbyname` get the `useSysctlbyname` version identifier.
version (Darwin)
{
    version = useSysctlbyname;
}
else version (FreeBSD)
{
    version = useSysctlbyname;
}
else version (DragonFlyBSD)
{
    version = useSysctlbyname;
}
else version (NetBSD)
{
    version = useSysctlbyname;
}
114 
/*
(For now public undocumented with reserved name.)

A lazily initialized global constant. Two caches back the value: a
thread-local one and a shared, process-level one. Both are statically
initialized to `outOfBandValue`, which must not be a legitimate value of the
constant. A call that finds `outOfBandValue` in both caches computes the value
by calling `initializer` and stores it in both. The initializer is assumed to
be pure (even if not marked as such), i.e. to return the same value upon
repeated calls. For that reason no locking is done, so `initializer` may be
called more than once, leading to benign races on the cached value.

Once the calling thread has seen the value, later calls return it from the
thread-local cache without any atomic operation. The first call in each
further thread costs one atomic load from the shared cache.

Params:
    T = The type of the pseudo-constant (may be qualified)
    outOfBandValue = A value that cannot be valid, it is used for initialization
    initializer = The function performing initialization; must be `nothrow`

Returns:
    The lazily initialized value
*/
@property pure
T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
if (is(Unqual!T : T)
    && is(typeof(initializer()) : T)
    && is(typeof(outOfBandValue) : T))
{
    static T impl() nothrow
    {
        // Per-thread copy, checked first so the common path needs no atomics.
        static Unqual!T threadCache = outOfBandValue;
        // Process-wide copy, shared by all threads.
        static shared Unqual!T sharedCache = outOfBandValue;

        if (threadCache != outOfBandValue)
            return threadCache;

        // This thread has not seen the value yet: consult the shared copy
        // and compute the value only if nobody has published it so far.
        Unqual!T value = atomicLoad(sharedCache);
        if (value == outOfBandValue)
        {
            value = initializer();
            atomicStore(sharedCache, value);
        }
        threadCache = value;
        return value;
    }

    // `impl` touches static state and therefore is not inferred as pure.
    // Re-type its address as a pure function so this wrapper can be `pure`.
    import std.traits : SetFunctionAttributes;
    alias PureImpl = SetFunctionAttributes!(typeof(&impl), "D",
        functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
    auto pureImpl = (() @trusted => cast(PureImpl) &impl)();
    return pureImpl();
}
168 
// Returns the size of a cache line.
// The value is computed by `cacheLineSizeImpl` and cached through
// `__lazilyInitializedConstant`; `size_t.max` serves as the
// "not computed yet" marker.
alias cacheLineSize =
    __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);
172 
// Scans the data-cache levels reported by `core.cpuid.datacache` and returns
// the largest line size found.  Entries whose line size is `uint.max` (or
// larger) are ignored; the result is 0 when no entry qualifies.
private size_t cacheLineSizeImpl() @nogc nothrow @trusted
{
    import core.cpuid : datacache;

    size_t widest = 0;
    foreach (ref const level; datacache)
    {
        immutable size_t lineSize = level.lineSize;
        if (lineSize < uint.max && lineSize > widest)
        {
            widest = lineSize;
        }
    }
    return widest;
}
186 
@nogc @safe nothrow unittest
{
    // The cached property must agree with a direct computation.
    immutable expected = cacheLineSizeImpl();
    assert(cacheLineSize == expected);
}
191 
192 /* Atomics code.  These forward to core.atomic, but are written like this
193    for two reasons:
194 
   1.  They used to actually contain ASM code and I don't want to have to change
       to directly calling core.atomic in a zillion different places.
197 
198    2.  core.atomic has some misc. issues that make my use cases difficult
199        without wrapping it.  If I didn't wrap it, casts would be required
200        basically everywhere.
201 */
// Atomically stores `newVal` into `stuff`.  The variable is viewed as
// `shared` only for the duration of the store so that callers need no casts.
private void atomicSetUbyte(T)(ref T stuff, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    auto target = cast(shared(T)*) &stuff;
    atomicStore(*target, newVal);
}
208 
// Atomically loads the current value of `val`.  The variable is viewed as
// `shared` only for the duration of the load.
private ubyte atomicReadUbyte(T)(ref T val)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    auto source = cast(shared(T)*) &val;
    return atomicLoad(*source);
}
214 
// Compare-and-swap on a ubyte-compatible variable: writes `newVal` into
// `stuff` only if it currently holds `testVal`, and reports whether the
// write happened.  Wrapping the call here spares the rest of the code a lot
// of annoying casts when enums are involved.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    auto target = cast(shared(T)*) &stuff;
    return core.atomic.cas(target, testVal, newVal);
}
222 
/*--------------------- Generic helper functions, etc.------------------------*/
// Yields the type obtained by applying all of `functions` (each wrapped with
// `unaryFun` and combined through `adjoin`) to one element of the range `R`.
private template MapType(R, functions...)
{
    // At least one mapping function is required.
    static assert(functions.length);

    // Only referenced inside typeof() below, hence left uninitialized.
    ElementType!R e = void;
    alias MapType =
        typeof(adjoin!(staticMap!(unaryFun, functions))(e));
}
232 
// Yields the type returned by `binaryFun!fun` when called with a value of
// type `E` as the first argument and an element of range `R` as the second.
private template ReduceType(alias fun, R, E)
{
    alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
}
237 
// Negation of std.traits.hasUnsharedAliasing, in a form usable as a
// predicate for allSatisfy.
private enum bool noUnsharedAliasing(T) = !hasUnsharedAliasing!T;
242 
// Decides whether a callable of type `F` may be executed in parallel from
// @safe code via Task.executeInNewThread().  Running it on a TaskPool has
// one more requirement (see isSafeReturn).  `F` qualifies when it
//   - is @safe or @trusted,
//   - does not return by ref,
//   - is a function pointer or has no unshared aliasing, and
//   - takes no parameter with unshared aliasing.
private template isSafeTask(F)
{
    enum bool isSafeTask =
        0 != (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) &&
        0 == (functionAttributes!F & FunctionAttribute.ref_) &&
        (!hasUnsharedAliasing!F || isFunctionPointer!F) &&
        !anySatisfy!(hasUnsharedAliasing, Parameters!F);
}
254 
@safe unittest
{
    // Accepted: @safe or @trusted, and no parameter with unshared aliasing.
    static assert( isSafeTask!(void function() @safe));
    static assert( isSafeTask!(void function(uint, string) @trusted));

    // Rejected: neither @safe nor @trusted.
    static assert(!isSafeTask!(void function()));
    static assert(!isSafeTask!(void function(uint, char[])));

    // The return type plays no role in this check.
    static assert( isSafeTask!(uint[] function(uint, string) pure @trusted));
}
270 
// Decides whether a Task that meets every other requirement for @safe
// execution may also be run on a TaskPool.  A value returned from a pool
// worker could, in theory, also be reachable through that worker's
// thread-local storage.  With executeInNewThread() the executing thread has
// terminated by the time the caller sees the result, so the question does
// not arise there.  It does not arise for pure functions either, since they
// cannot read global state.
//
// The result is false only when the return type has unshared aliasing and
// the task is not pure.
private template isSafeReturn(T)
{
    static if (hasUnsharedAliasing!(T.ReturnType) && !T.isPure)
    {
        enum isSafeReturn = false;
    }
    else
    {
        enum isSafeReturn = true;
    }
}
294 
// True for random-access ranges whose elements can be assigned to.
private enum randAssignable(R) = isRandomAccessRange!R && hasAssignableElements!R;
299 
// Execution state stored in `AbstractTask.taskStatus`.  Backed by ubyte so
// that it can be used with the atomic*Ubyte helpers.
private enum TaskStatus : ubyte
{
    notStarted, // Initial value of AbstractTask.taskStatus.
    inProgress,
    done        // Checked by done() and the *Force functions.
}
306 
// Yields the type that `fun` returns when called with arguments of types
// `T`.  The delegate literal exists only for typeof(); it is never run.
private template AliasReturn(alias fun, T...)
{
    alias AliasReturn = typeof({ T args; return fun(args); });
}
311 
// Should be private, but std.algorithm.reduce is used in the zero-thread case
// and won't work w/ private.
//
// Folds one more element into an accumulator.  With a single function this is
// just binaryFun of that function.  Otherwise the accumulator must expose its
// fields through `.expand`, and the i-th field is combined with the element
// using the i-th function.
template reduceAdjoin(functions...)
{
    static if (functions.length != 1)
    {
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias combiners = staticMap!(binaryFun, functions);

            foreach (idx, ref slot; lhs.expand)
            {
                slot = combiners[idx](slot, rhs);
            }

            return lhs;
        }
    }
    else
    {
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
}
335 
// Merges two accumulators.  With a single function this is just binaryFun
// of that function.  Otherwise both accumulators must expose their fields
// through `.expand`, and the i-th fields are combined with the i-th function.
private template reduceFinish(functions...)
{
    static if (functions.length != 1)
    {
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias combiners = staticMap!(binaryFun, functions);

            foreach (idx, ref slot; lhs.expand)
            {
                slot = combiners[idx](slot, rhs.expand[idx]);
            }

            return lhs;
        }
    }
    else
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
}
357 
// True when the argument is an instantiation of RoundRobinBuffer.
// This specialization matches exactly those types.
private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
{
    enum isRoundRobin = true;
}

// Fallback for every other type.
private template isRoundRobin(T)
{
    enum isRoundRobin = false;
}
367 
@safe unittest
{
    // An instantiation of RoundRobinBuffer is recognized; anything else is not.
    alias Buffer = RoundRobinBuffer!(void delegate(char[]), bool delegate());
    static assert( isRoundRobin!Buffer);
    static assert(!isRoundRobin!uint);
}
373 
// Base "class" of every task type.  C-style polymorphism (a struct holding a
// function pointer) is used instead of real classes to allow more direct
// control over memory allocation, etc.
private struct AbstractTask
{
    // Links to neighbouring tasks.  NOTE(review): presumably used by the
    // pool's task queue; that code is outside this view.
    AbstractTask* prev;
    AbstractTask* next;

    // Type-erased entry point; job() calls it with the address of this header.
    void function(void*) runTask;

    // Rethrown by done() once the task has finished; null if nothing is stored.
    Throwable exception;
    ubyte taskStatus = TaskStatus.notStarted;

    // Returns whether the task has finished.  Rethrows the stored exception,
    // if any, when it has.
    bool done() @property
    {
        immutable finished = atomicReadUbyte(taskStatus) == TaskStatus.done;
        if (!finished)
        {
            return false;
        }

        if (exception !is null)
        {
            throw exception;
        }

        return true;
    }

    // Executes the task through its type-erased entry point.
    void job()
    {
        runTask(&this);
    }
}
407 
408 /**
409 `Task` represents the fundamental unit of work.  A `Task` may be
410 executed in parallel with any other `Task`.  Using this struct directly
411 allows future/promise parallelism.  In this paradigm, a function (or delegate
412 or other callable) is executed in a thread other than the one it was called
413 from.  The calling thread does not block while the function is being executed.
414 A call to `workForce`, `yieldForce`, or `spinForce` is used to
415 ensure that the `Task` has finished executing and to obtain the return
416 value, if any.  These functions and `done` also act as full memory barriers,
417 meaning that any memory writes made in the thread that executed the `Task`
418 are guaranteed to be visible in the calling thread after one of these functions
419 returns.
420 
421 The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
422 be used to create an instance of this struct.  See `task` for usage examples.
423 
424 Function results are returned from `yieldForce`, `spinForce` and
425 `workForce` by ref.  If `fun` returns by ref, the reference will point
426 to the returned reference of `fun`.  Otherwise it will point to a
427 field in this struct.
428 
429 Copying of this struct is disabled, since it would provide no useful semantics.
430 If you want to pass this struct around, you should do so by reference or
431 pointer.
432 
433 Bugs:  Changes to `ref` and `out` arguments are not propagated to the
434        call site, only to `args` in this struct.
435 */
struct Task(alias fun, Args...)
{
    // Header shared with the pool machinery.  It must stay the first field:
    // `AbstractTask.job` hands the address of this header to `impl`, which
    // casts that address back to a pointer to the whole `Task`.
    AbstractTask base = {runTask : &impl};
    alias base this;

    // Address of the embedded header, in the form the pool functions take.
    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    // Type-erased entry point stored in `base.runTask`.  Calls `fun` with
    // the stored arguments and records the result, if any, in `returnVal`.
    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            // Nothing to record.
            fun(myCastedTask._args);
        }
        else static if (is(typeof(&(fun(myCastedTask._args)))))
        {
            // `fun` returns by ref: remember where the result lives.
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            // `fun` returns by value: keep the result inside this struct.
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    // Pool responsible for this task; null until one is assigned.  Set by
    // executeInNewThread() below.  NOTE(review): presumably also set when
    // the task is submitted to a TaskPool; that code is outside this view.
    private TaskPool pool;
    private bool isScoped;  // True if created with scopedTask.

    // Storage for everything passed to `fun`.  When `fun` is `run`, the
    // first entry is the callable itself.
    Args _args;

    /**
    The arguments the function was called with.  Changes to `out` and
    `ref` arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        // Skip the callable stored in front of the real arguments.
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }


    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code.  See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            private enum bool isPure =
            (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
        }
        else
        {
            // BUG:  Should check this for delegates too, but std.traits
            //       apparently doesn't allow this.  isPure is irrelevant
            //       for delegates, at least for now since shared delegates
            //       don't work.
            private enum bool isPure = false;
        }

    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }


    /**
    The return type of the function called by this `Task`.  This can be
    `void`.
    */
    alias ReturnType = typeof(fun(_args));

    // Result storage.  `fixRef` turns whatever is stored into a reference to
    // the result, so the *Force functions can share one return statement.
    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }

        }
        else
        {
            // Value return: the result is a field of this struct.
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    // Throws (via std.exception.enforce) if no pool has been assigned yet.
    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    // Stores the call arguments.  Declared only when there are any.
    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
    // allow immutable elements.
    static if (allSatisfy!(isAssignable, Args))
    {
        // Field-by-field assignment.
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        // At least one argument type cannot be assigned to.
        @disable typeof(this) opAssign(typeof(this) rhs);
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    busy spin until it's done, then return the return value.  If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    `Task` to be available on a timescale shorter than that of an OS
    context switch.
     */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        this.pool.tryDeleteExecute(basePtr);

        // Busy-wait until the status becomes `done`.
        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    wait on a condition variable.  If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
     */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        // Fast path: already finished.  done() rethrows a stored exception.
        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        // Re-check the status after every wake-up.
        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception; // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this `Task` was not started yet, execute it in the current
    thread.  If it is finished, return its result.  If it is in progress,
    execute any other `Task` from the `TaskPool` instance that
    this `Task` was submitted to until this one
    is finished.  If it threw an exception, rethrow that exception.
    If no other tasks are available or this `Task` was executed using
    `executeInNewThread`, wait on a condition variable.
     */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        while (true)
        {
            if (done)    // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }


            if (job !is null)
            {

                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                // Run the other task here instead of sitting idle.
                pool.doJob(job);

                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                // Nothing else to run: fall back to waiting.
                return yieldForce;
            }
        }
    }

    /**
    Returns `true` if the `Task` is finished executing.

    Throws:  Rethrows any exception thrown during the execution of the
             `Task`.
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

    /**
    Create a new thread for executing this `Task`, execute it in the
    newly created thread, then terminate the thread.  This can be used for
    future/promise parallelism.  An explicit priority may be given
    to the `Task`.  If one is provided, its value is forwarded to
    `core.thread.Thread.priority`. See $(REF task, std,parallelism) for
    usage example.
    */
    void executeInNewThread() @trusted
    {
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    // A scoped task that has been handed to a pool but is not finished yet
    // is waited for here, before the struct goes away.
    @safe ~this()
    {
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}
765 
// Calls `fpOrDelegate` with `args`.  This is an
// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases.
//
// Params:
//     fpOrDelegate = the callable to invoke
//     args = the arguments, taken by ref and forwarded unchanged
//
// Returns: whatever `fpOrDelegate(args)` returns.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    return fpOrDelegate(args);
}
773 
774 /**
775 Creates a `Task` on the GC heap that calls an alias.  This may be executed
776 via `Task.executeInNewThread` or by submitting to a
777 $(REF TaskPool, std,parallelism).  A globally accessible instance of
778 `TaskPool` is provided by $(REF taskPool, std,parallelism).
779 
780 Returns:  A pointer to the `Task`.
781 
782 Example:
783 ---
784 // Read two files into memory at the same time.
785 import std.file;
786 
787 void main()
788 {
789     // Create and execute a Task for reading
790     // foo.txt.
791     auto file1Task = task!read("foo.txt");
792     file1Task.executeInNewThread();
793 
794     // Read bar.txt in parallel.
795     auto file2Data = read("bar.txt");
796 
797     // Get the results of reading foo.txt.
798     auto file1Data = file1Task.yieldForce;
799 }
800 ---
801 
802 ---
803 // Sorts an array using a parallel quick sort algorithm.
804 // The first partition is done serially.  Both recursion
805 // branches are then executed in parallel.
806 //
807 // Timings for sorting an array of 1,000,000 doubles on
808 // an Athlon 64 X2 dual core machine:
809 //
810 // This implementation:               176 milliseconds.
811 // Equivalent serial implementation:  280 milliseconds
812 void parallelSort(T)(T[] data)
813 {
814     // Sort small subarrays serially.
815     if (data.length < 100)
816     {
817          std.algorithm.sort(data);
818          return;
819     }
820 
821     // Partition the array.
822     swap(data[$ / 2], data[$ - 1]);
823     auto pivot = data[$ - 1];
824     bool lessThanPivot(T elem) { return elem < pivot; }
825 
826     auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
827     swap(data[$ - greaterEqual.length - 1], data[$ - 1]);
828 
829     auto less = data[0..$ - greaterEqual.length - 1];
830     greaterEqual = data[$ - greaterEqual.length..$];
831 
832     // Execute both recursion branches in parallel.
833     auto recurseTask = task!parallelSort(greaterEqual);
834     taskPool.put(recurseTask);
835     parallelSort(less);
836     recurseTask.yieldForce;
837 }
838 ---
839 */
auto task(alias fun, Args...)(Args args)
{
    // The Task stores a copy of `args` and calls `fun` with them when run.
    alias TaskType = Task!(fun, Args);
    return new TaskType(args);
}
844 
845 /**
846 Creates a `Task` on the GC heap that calls a function pointer, delegate, or
847 class/struct with overloaded opCall.
848 
849 Example:
850 ---
851 // Read two files in at the same time again,
852 // but this time use a function pointer instead
853 // of an alias to represent std.file.read.
854 import std.file;
855 
856 void main()
857 {
858     // Create and execute a Task for reading
859     // foo.txt.
860     auto file1Task = task(&read!string, "foo.txt", size_t.max);
861     file1Task.executeInNewThread();
862 
863     // Read bar.txt in parallel.
864     auto file2Data = read("bar.txt");
865 
866     // Get the results of reading foo.txt.
867     auto file1Data = file1Task.yieldForce;
868 }
869 ---
870 
871 Notes: This function takes a non-scope delegate, meaning it can be
872        used with closures.  If you can't allocate a closure due to objects
873        on the stack that have scoped destruction, see `scopedTask`, which
874        takes a scope delegate.
875  */
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    // `run` is the adapter that invokes the stored callable; the callable
    // itself travels as the first stored argument.
    alias TaskType = Task!(run, F, Args);
    return new TaskType(delegateOrFp, args);
}
881 
882 /**
883 Version of `task` usable from `@safe` code.  Usage mechanics are
884 identical to the non-@safe case, but safety introduces some restrictions:
885 
886 1.  `fun` must be @safe or @trusted.
887 
888 2.  `F` must not have any unshared aliasing as defined by
889     $(REF hasUnsharedAliasing, std,traits).  This means it
890     may not be an unshared delegate or a non-shared class or struct
891     with overloaded `opCall`.  This also precludes accepting template
892     alias parameters.
893 
894 3.  `Args` must not have unshared aliasing.
895 
896 4.  `fun` must not return by reference.
897 
898 5.  The return type must not have unshared aliasing unless `fun` is
899     `pure` or the `Task` is executed via `executeInNewThread` instead
900     of using a `TaskPool`.
901 
902 */
@trusted auto task(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    // Same construction as the non-@safe overload; the constraint above
    // selects this overload when `F` passes isSafeTask.
    alias TaskType = Task!(run, F, Args);
    return new TaskType(fun, args);
}
908 
909 /**
910 These functions allow the creation of `Task` objects on the stack rather
911 than the GC heap.  The lifetime of a `Task` created by `scopedTask`
912 cannot exceed the lifetime of the scope it was created in.
913 
914 `scopedTask` might be preferred over `task`:
915 
916 1.  When a `Task` that calls a delegate is being created and a closure
917     cannot be allocated due to objects on the stack that have scoped
918     destruction.  The delegate overload of `scopedTask` takes a `scope`
919     delegate.
920 
921 2.  As a micro-optimization, to avoid the heap allocation associated with
922     `task` or with the creation of a closure.
923 
924 Usage is otherwise identical to `task`.
925 
926 Notes:  `Task` objects created using `scopedTask` will automatically
927 call `Task.yieldForce` in their destructor if necessary to ensure
928 the `Task` is complete before the stack frame they reside on is destroyed.
929 */
auto scopedTask(alias fun, Args...)(Args args)
{
    // Built as a plain struct value and returned by value (no `new`).
    auto ret = Task!(fun, Args)(args);
    // Tells the destructor to wait for a task that is still unfinished.
    ret.isScoped = true;
    return ret;
}
936 
937 /// Ditto
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    // `run` is the adapter that invokes the stored callable; the callable
    // itself travels as the first stored argument.
    auto ret = Task!(run, F, Args)(delegateOrFp, args);
    // Tells the destructor to wait for a task that is still unfinished.
    ret.isScoped = true;
    return ret;
}
945 
946 /// Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    // Overload selected when `F` passes isSafeTask; construction is the same
    // as in the non-@safe overload.
    auto ret = Task!(run, F, Args)(fun, args);
    // Tells the destructor to wait for a task that is still unfinished.
    ret.isScoped = true;
    return ret;
}
954 
// Declaration of the C function `sysctlbyname`, used by `totalCPUsImpl` on
// the platforms that define `useSysctlbyname`.
version (useSysctlbyname)
    private extern(C) int sysctlbyname(
        const char *, void *, size_t *, void *, size_t
    ) @nogc nothrow;
959 
960 /**
961 The total number of CPU cores available on the current machine, as reported by
962 the operating system.
963 */
// Computed by `totalCPUsImpl` and cached through `__lazilyInitializedConstant`;
// `uint.max` serves as the "not computed yet" marker.
alias totalCPUs =
    __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);
966 
/*
Asks the operating system for the number of CPUs.  This is the uncached
worker behind `totalCPUs`.

Every branch returns at least 1.  A failed query must not leak out as 0 or as
`uint.max`: `uint.max` is the marker `totalCPUs` uses for "not computed yet",
and neither value is a usable CPU count.

Returns: the CPU count reported by the platform, or 1 if the query failed.
*/
uint totalCPUsImpl() @nogc nothrow @trusted
{
    version (Windows)
    {
        // BUGS:  Only works on Windows 2000 and above.
        import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
        import std.algorithm.comparison : max;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        return max(1, cast(uint) si.dwNumberOfProcessors);
    }
    else version (linux)
    {
        import core.sys.linux.sched : CPU_COUNT, cpu_set_t, sched_getaffinity;
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        // Prefer the CPUs this process is actually allowed to run on.
        cpu_set_t set = void;
        if (sched_getaffinity(0, cpu_set_t.sizeof, &set) == 0)
        {
            int count = CPU_COUNT(&set);
            if (count > 0)
                return cast(uint) count;
        }

        // sysconf reports failure as -1; never let that become uint.max.
        immutable online = sysconf(_SC_NPROCESSORS_ONLN);
        return online > 0 ? cast(uint) online : 1;
    }
    else version (Solaris)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        // sysconf reports failure as -1; never let that become uint.max.
        immutable online = sysconf(_SC_NPROCESSORS_ONLN);
        return online > 0 ? cast(uint) online : 1;
    }
    else version (useSysctlbyname)
    {
        version (Darwin)
        {
            enum nameStr = "hw.physicalcpu";
        }
        else version (FreeBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }
        else version (DragonFlyBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }
        else version (NetBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }

        uint result;
        size_t len = result.sizeof;
        // A non-zero return value means the query failed and `result` was
        // not filled in.
        if (sysctlbyname(nameStr, &result, &len, null, 0) != 0 || result == 0)
            return 1;
        return result;
    }
    else
    {
        static assert(0, "Don't know how to get N CPUs on this OS.");
    }
}
1026 
1027 /*
1028 This class serves two purposes:
1029 
1030 1.  It distinguishes std.parallelism threads from other threads so that
1031     the std.parallelism daemon threads can be terminated.
1032 
1033 2.  It adds a reference to the pool that the thread is a member of,
1034     which is also necessary to allow the daemon threads to be properly
1035     terminated.
1036 */
private final class ParallelismThread : Thread
{
    // The pool this worker belongs to.  The module destructor reads it to
    // decide whether the thread belongs to a daemon pool.
    TaskPool pool;

    // Forwards the thread body straight to core.thread.Thread.
    this(void delegate() dg)
    {
        super(dg);
    }
}
1046 
// At program shutdown, stop every daemon pool and wait for its workers.
shared static ~this()
{
    foreach (ref thread; Thread)
    {
        // Only threads created by this module are of interest.
        if (auto worker = cast(ParallelismThread) thread)
        {
            auto owner = worker.pool;
            if (owner.isDaemon)
            {
                owner.stop();
                worker.join();
            }
        }
    }
}
1060 
1061 /**
1062 This class encapsulates a task queue and a set of worker threads.  Its purpose
1063 is to efficiently map a large number of `Task`s onto a smaller number of
1064 threads.  A task queue is a FIFO queue of `Task` objects that have been
1065 submitted to the `TaskPool` and are awaiting execution.  A worker thread is a
1066 thread that executes the `Task` at the front of the queue when one is
1067 available and sleeps when the queue is empty.
1068 
1069 This class should usually be used via the global instantiation
1070 available via the $(REF taskPool, std,parallelism) property.
1071 Occasionally it is useful to explicitly instantiate a `TaskPool`:
1072 
1073 1.  When you want `TaskPool` instances with multiple priorities, for example
1074     a low priority pool and a high priority pool.
1075 
1076 2.  When the threads in the global task pool are waiting on a synchronization
1077     primitive (for example a mutex), and you want to parallelize the code that
1078     needs to run before these threads can be resumed.
1079 
1080 Note: The worker threads in this pool will not stop until
1081       `stop` or `finish` is called, even if the main thread
1082       has finished already. This may lead to programs that
1083       never end. If you do not want this behaviour, you can set `isDaemon`
1084       to true.
1085  */
1086 final class TaskPool
1087 {
1088 private:
1089 
    // A pool can either be a regular pool or a single-task pool.  A
    // single-task pool is a dummy pool that's fired up for
    // Task.executeInNewThread().
    bool isSingleTask;

    // Worker threads of a regular pool (allocated in the public constructor).
    ParallelismThread[] pool;
    // The one thread of a single-task pool (created in the private
    // constructor, joined in waitUntilCompletion()).
    Thread singleTaskThread;

    // Front and back of the doubly linked task queue.  `tail` is only
    // meaningful while `head` is non-null: popNoSync() does not reset it when
    // the queue becomes empty, and abstractPutNoSync() tests `head` only.
    AbstractTask* head;
    AbstractTask* tail;
    // Life-cycle state of the pool; see PoolState.
    PoolState status = PoolState.running;
    // Built on queueMutex; used by wait(), notify() and notifyAll().
    Condition workerCondition;
    // Built on waiterMutex; used by waitUntilCompletion() and notifyWaiters().
    Condition waiterCondition;
    // Taken by queueLock() / released by queueUnlock().
    Mutex queueMutex;
    Mutex waiterMutex;  // For waiterCondition

    // The instanceStartIndex of the next instance that will be created.
    // Shared by all pools; the public constructor updates it inside
    // synchronized(typeid(TaskPool)).
    __gshared size_t nextInstanceIndex = 1;

    // The index of the current thread.
    // Assigned by each worker in startWorkLoop().
    static size_t threadIndex;

    // The index of the first thread in this instance.
    immutable size_t instanceStartIndex;

    // The index that the next thread to be initialized in this pool will have.
    size_t nextThreadIndex;
1117 
    // Life-cycle states of a pool, stored in `status`.
    enum PoolState : ubyte
    {
        // Normal operation; the only state in which the put functions accept
        // new tasks.
        running,
        // executeWorkLoop() keeps running queued tasks; once pop() returns
        // null in this state the loop switches the pool to stopNow.
        finishing,
        // executeWorkLoop() exits as soon as it observes this state.
        stopNow
    }
1124 
    // Runs `job` in the calling thread and marks it done.
    //
    // The job must already be in the inProgress state and unlinked from the
    // queue (asserted below).  Anything the job throws, including Errors, is
    // stored in job.exception instead of propagating.
    void doJob(AbstractTask* job)
    {
        assert(job.taskStatus == TaskStatus.inProgress);
        assert(job.next is null);
        assert(job.prev is null);

        // Registered before the job runs so that it executes last, i.e. after
        // the status has been set to done: threads blocked on waiterCondition
        // are woken once the job is finished.
        scope(exit)
        {
            if (!isSingleTask)
            {
                waiterLock();
                scope(exit) waiterUnlock();
                notifyWaiters();
            }
        }

        try
        {
            job.job();
        }
        catch (Throwable e)
        {
            // Keep the Throwable with the task rather than unwinding the
            // thread that happened to run it.
            job.exception = e;
        }

        atomicSetUbyte(job.taskStatus, TaskStatus.done);
    }
1152 
1153     // This function is used for dummy pools created by Task.executeInNewThread().
1154     void doSingleTask()
1155     {
1156         // No synchronization.  Pool is guaranteed to only have one thread,
1157         // and the queue is submitted to before this thread is created.
1158         assert(head);
1159         auto t = head;
1160         t.next = t.prev = head = null;
1161         doJob(t);
1162     }
1163 
1164     // This function performs initialization for each thread that affects
1165     // thread local storage and therefore must be done from within the
1166     // worker thread.  It then calls executeWorkLoop().
1167     void startWorkLoop()
1168     {
1169         // Initialize thread index.
1170         {
1171             queueLock();
1172             scope(exit) queueUnlock();
1173             threadIndex = nextThreadIndex;
1174             nextThreadIndex++;
1175         }
1176 
1177         executeWorkLoop();
1178     }
1179 
1180     // This is the main work loop that worker threads spend their time in
1181     // until they terminate.  It's also entered by non-worker threads when
1182     // finish() is called with the blocking variable set to true.
1183     void executeWorkLoop()
1184     {
1185         while (atomicReadUbyte(status) != PoolState.stopNow)
1186         {
1187             AbstractTask* task = pop();
1188             if (task is null)
1189             {
1190                 if (atomicReadUbyte(status) == PoolState.finishing)
1191                 {
1192                     atomicSetUbyte(status, PoolState.stopNow);
1193                     return;
1194                 }
1195             }
1196             else
1197             {
1198                 doJob(task);
1199             }
1200         }
1201     }
1202 
1203     // Pop a task off the queue.
1204     AbstractTask* pop()
1205     {
1206         queueLock();
1207         scope(exit) queueUnlock();
1208         auto ret = popNoSync();
1209         while (ret is null && status == PoolState.running)
1210         {
1211             wait();
1212             ret = popNoSync();
1213         }
1214         return ret;
1215     }
1216 
1217     AbstractTask* popNoSync()
1218     out(returned)
1219     {
1220         /* If task.prev and task.next aren't null, then another thread
1221          * can try to delete this task from the pool after it's
1222          * alreadly been deleted/popped.
1223          */
1224         if (returned !is null)
1225         {
1226             assert(returned.next is null);
1227             assert(returned.prev is null);
1228         }
1229     }
1230     do
1231     {
1232         if (isSingleTask) return null;
1233 
1234         AbstractTask* returned = head;
1235         if (head !is null)
1236         {
1237             head = head.next;
1238             returned.prev = null;
1239             returned.next = null;
1240             returned.taskStatus = TaskStatus.inProgress;
1241         }
1242         if (head !is null)
1243         {
1244             head.prev = null;
1245         }
1246 
1247         return returned;
1248     }
1249 
1250     // Push a task onto the queue.
1251     void abstractPut(AbstractTask* task)
1252     {
1253         queueLock();
1254         scope(exit) queueUnlock();
1255         abstractPutNoSync(task);
1256     }
1257 
1258     void abstractPutNoSync(AbstractTask* task)
1259     in
1260     {
1261         assert(task);
1262     }
1263     out
1264     {
1265         import std.conv : text;
1266 
1267         assert(tail.prev !is tail);
1268         assert(tail.next is null, text(tail.prev, '\t', tail.next));
1269         if (tail.prev !is null)
1270         {
1271             assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
1272         }
1273     }
1274     do
1275     {
1276         // Not using enforce() to save on function call overhead since this
1277         // is a performance critical function.
1278         if (status != PoolState.running)
1279         {
1280             throw new Error(
1281                 "Cannot submit a new task to a pool after calling " ~
1282                 "finish() or stop()."
1283             );
1284         }
1285 
1286         task.next = null;
1287         if (head is null)   //Queue is empty.
1288         {
1289             head = task;
1290             tail = task;
1291             tail.prev = null;
1292         }
1293         else
1294         {
1295             assert(tail);
1296             task.prev = tail;
1297             tail.next = task;
1298             tail = task;
1299         }
1300         notify();
1301     }
1302 
1303     void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
1304     {
1305         if (status != PoolState.running)
1306         {
1307             throw new Error(
1308                 "Cannot submit a new task to a pool after calling " ~
1309                 "finish() or stop()."
1310             );
1311         }
1312 
1313         if (head is null)
1314         {
1315             head = h;
1316             tail = t;
1317         }
1318         else
1319         {
1320             h.prev = tail;
1321             tail.next = h;
1322             tail = t;
1323         }
1324 
1325         notifyAll();
1326     }
1327 
1328     void tryDeleteExecute(AbstractTask* toExecute)
1329     {
1330         if (isSingleTask) return;
1331 
1332         if ( !deleteItem(toExecute) )
1333         {
1334             return;
1335         }
1336 
1337         try
1338         {
1339             toExecute.job();
1340         }
1341         catch (Exception e)
1342         {
1343             toExecute.exception = e;
1344         }
1345 
1346         atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
1347     }
1348 
1349     bool deleteItem(AbstractTask* item)
1350     {
1351         queueLock();
1352         scope(exit) queueUnlock();
1353         return deleteItemNoSync(item);
1354     }
1355 
1356     bool deleteItemNoSync(AbstractTask* item)
1357     {
1358         if (item.taskStatus != TaskStatus.notStarted)
1359         {
1360             return false;
1361         }
1362         item.taskStatus = TaskStatus.inProgress;
1363 
1364         if (item is head)
1365         {
1366             // Make sure head gets set properly.
1367             popNoSync();
1368             return true;
1369         }
1370         if (item is tail)
1371         {
1372             tail = tail.prev;
1373             if (tail !is null)
1374             {
1375                 tail.next = null;
1376             }
1377             item.next = null;
1378             item.prev = null;
1379             return true;
1380         }
1381         if (item.next !is null)
1382         {
1383             assert(item.next.prev is item);  // Check queue consistency.
1384             item.next.prev = item.prev;
1385         }
1386         if (item.prev !is null)
1387         {
1388             assert(item.prev.next is item);  // Check queue consistency.
1389             item.prev.next = item.next;
1390         }
1391         item.next = null;
1392         item.prev = null;
1393         return true;
1394     }
1395 
1396     void queueLock()
1397     {
1398         assert(queueMutex);
1399         if (!isSingleTask) queueMutex.lock();
1400     }
1401 
1402     void queueUnlock()
1403     {
1404         assert(queueMutex);
1405         if (!isSingleTask) queueMutex.unlock();
1406     }
1407 
1408     void waiterLock()
1409     {
1410         if (!isSingleTask) waiterMutex.lock();
1411     }
1412 
1413     void waiterUnlock()
1414     {
1415         if (!isSingleTask) waiterMutex.unlock();
1416     }
1417 
1418     void wait()
1419     {
1420         if (!isSingleTask) workerCondition.wait();
1421     }
1422 
1423     void notify()
1424     {
1425         if (!isSingleTask) workerCondition.notify();
1426     }
1427 
1428     void notifyAll()
1429     {
1430         if (!isSingleTask) workerCondition.notifyAll();
1431     }
1432 
1433     void waitUntilCompletion()
1434     {
1435         if (isSingleTask)
1436         {
1437             singleTaskThread.join();
1438         }
1439         else
1440         {
1441             waiterCondition.wait();
1442         }
1443     }
1444 
1445     void notifyWaiters()
1446     {
1447         if (!isSingleTask) waiterCondition.notifyAll();
1448     }
1449 
1450     // Private constructor for creating dummy pools that only have one thread,
1451     // only execute one Task, and then terminate.  This is used for
1452     // Task.executeInNewThread().
1453     this(AbstractTask* task, int priority = int.max)
1454     {
1455         assert(task);
1456 
1457         // Dummy value, not used.
1458         instanceStartIndex = 0;
1459 
1460         this.isSingleTask = true;
1461         task.taskStatus = TaskStatus.inProgress;
1462         this.head = task;
1463         singleTaskThread = new Thread(&doSingleTask);
1464         singleTaskThread.start();
1465 
1466         // Disabled until writing code to support
1467         // running thread with specified priority
1468         // See https://issues.dlang.org/show_bug.cgi?id=8960
1469 
1470         /*if (priority != int.max)
1471         {
1472             singleTaskThread.priority = priority;
1473         }*/
1474     }
1475 
1476 public:
1477     // This is used in parallel_algorithm but is too unstable to document
1478     // as public API.
1479     size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
1480     {
1481         import std.algorithm.comparison : max;
1482 
1483         if (this.size == 0)
1484         {
1485             return rangeLen;
1486         }
1487 
1488         immutable size_t eightSize = 4 * (this.size + 1);
1489         auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
1490         return max(ret, 1);
1491     }
1492 
1493     /**
1494     Default constructor that initializes a `TaskPool` with
1495     `totalCPUs` - 1 worker threads.  The minus 1 is included because the
1496     main thread will also be available to do work.
1497 
1498     Note:  On single-core machines, the primitives provided by `TaskPool`
1499            operate transparently in single-threaded mode.
1500      */
1501     this() @trusted
1502     {
1503         this(totalCPUs - 1);
1504     }
1505 
1506     /**
1507     Allows for custom number of worker threads.
1508     */
1509     this(size_t nWorkers) @trusted
1510     {
1511         synchronized(typeid(TaskPool))
1512         {
1513             instanceStartIndex = nextInstanceIndex;
1514 
1515             // The first worker thread to be initialized will have this index,
1516             // and will increment it.  The second worker to be initialized will
1517             // have this index plus 1.
1518             nextThreadIndex = instanceStartIndex;
1519             nextInstanceIndex += nWorkers;
1520         }
1521 
1522         queueMutex = new Mutex(this);
1523         waiterMutex = new Mutex();
1524         workerCondition = new Condition(queueMutex);
1525         waiterCondition = new Condition(waiterMutex);
1526 
1527         pool = new ParallelismThread[nWorkers];
1528         foreach (ref poolThread; pool)
1529         {
1530             poolThread = new ParallelismThread(&startWorkLoop);
1531             poolThread.pool = this;
1532             poolThread.start();
1533         }
1534     }
1535 
1536     /**
1537     Implements a parallel foreach loop over a range.  This works by implicitly
1538     creating and submitting one `Task` to the `TaskPool` for each worker
1539     thread.  A work unit is a set of consecutive elements of `range` to
1540     be processed by a worker thread between communication with any other
1541     thread.  The number of elements processed per work unit is controlled by the
1542     `workUnitSize` parameter.  Smaller work units provide better load
1543     balancing, but larger work units avoid the overhead of communicating
1544     with other threads frequently to fetch the next work unit.  Large work
1545     units also avoid false sharing in cases where the range is being modified.
1546     The less time a single iteration of the loop takes, the larger
1547     `workUnitSize` should be.  For very expensive loop bodies,
1548     `workUnitSize` should  be 1.  An overload that chooses a default work
1549     unit size is also available.
1550 
1551     Example:
1552     ---
1553     // Find the logarithm of every number from 1 to
1554     // 10_000_000 in parallel.
1555     auto logs = new double[10_000_000];
1556 
    // Parallel foreach works with or without an index
    // variable.  It can iterate by ref if range.front
    // returns by ref.
1560 
1561     // Iterate over logs using work units of size 100.
1562     foreach (i, ref elem; taskPool.parallel(logs, 100))
1563     {
1564         elem = log(i + 1.0);
1565     }
1566 
1567     // Same thing, but use the default work unit size.
1568     //
1569     // Timings on an Athlon 64 X2 dual core machine:
1570     //
1571     // Parallel foreach:  388 milliseconds
1572     // Regular foreach:   619 milliseconds
1573     foreach (i, ref elem; taskPool.parallel(logs))
1574     {
1575         elem = log(i + 1.0);
1576     }
1577     ---
1578 
1579     Notes:
1580 
1581     The memory usage of this implementation is guaranteed to be constant
1582     in `range.length`.
1583 
1584     Breaking from a parallel foreach loop via a break, labeled break,
1585     labeled continue, return or goto statement throws a
1586     `ParallelForeachError`.
1587 
1588     In the case of non-random access ranges, parallel foreach buffers lazily
1589     to an array of size `workUnitSize` before executing the parallel portion
1590     of the loop.  The exception is that, if a parallel foreach is executed
1591     over a range returned by `asyncBuf` or `map`, the copying is elided
1592     and the buffers are simply swapped.  In this case `workUnitSize` is
1593     ignored and the work unit size is set to the  buffer size of `range`.
1594 
1595     A memory barrier is guaranteed to be executed on exit from the loop,
1596     so that results produced by all threads are visible in the calling thread.
1597 
1598     $(B Exception Handling):
1599 
1600     When at least one exception is thrown from inside a parallel foreach loop,
1601     the submission of additional `Task` objects is terminated as soon as
1602     possible, in a non-deterministic manner.  All executing or
1603     enqueued work units are allowed to complete.  Then, all exceptions that
1604     were thrown by any work unit are chained using `Throwable.next` and
1605     rethrown.  The order of the exception chaining is non-deterministic.
1606     */
1607     ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
1608     {
1609         import std.exception : enforce;
1610         enforce(workUnitSize > 0, "workUnitSize must be > 0.");
1611         alias RetType = ParallelForeach!R;
1612         return RetType(this, range, workUnitSize);
1613     }
1614 
1615 
1616     /// Ditto
1617     ParallelForeach!R parallel(R)(R range)
1618     {
1619         static if (hasLength!R)
1620         {
1621             // Default work unit size is such that we would use 4x as many
1622             // slots as are in this thread pool.
1623             size_t workUnitSize = defaultWorkUnitSize(range.length);
1624             return parallel(range, workUnitSize);
1625         }
1626         else
1627         {
1628             // Just use a really, really dumb guess if the user is too lazy to
1629             // specify.
1630             return parallel(range, 512);
1631         }
1632     }
1633 
1634     ///
1635     template amap(functions...)
1636     {
1637         /**
1638         Eager parallel map.  The eagerness of this function means it has less
1639         overhead than the lazily evaluated `TaskPool.map` and should be
1640         preferred where the memory requirements of eagerness are acceptable.
1641         `functions` are the functions to be evaluated, passed as template
1642         alias parameters in a style similar to
1643         $(REF map, std,algorithm,iteration).
1644         The first argument must be a random access range. For performance
1645         reasons, amap will assume the range elements have not yet been
1646         initialized. Elements will be overwritten without calling a destructor
1647         nor doing an assignment. As such, the range must not contain meaningful
1648         data$(DDOC_COMMENT not a section): either un-initialized objects, or
1649         objects in their `.init` state.
1650 
1651         ---
1652         auto numbers = iota(100_000_000.0);
1653 
1654         // Find the square roots of numbers.
1655         //
1656         // Timings on an Athlon 64 X2 dual core machine:
1657         //
1658         // Parallel eager map:                   0.802 s
1659         // Equivalent serial implementation:     1.768 s
1660         auto squareRoots = taskPool.amap!sqrt(numbers);
1661         ---
1662 
1663         Immediately after the range argument, an optional work unit size argument
1664         may be provided.  Work units as used by `amap` are identical to those
1665         defined for parallel foreach.  If no work unit size is provided, the
1666         default work unit size is used.
1667 
1668         ---
1669         // Same thing, but make work unit size 100.
1670         auto squareRoots = taskPool.amap!sqrt(numbers, 100);
1671         ---
1672 
1673         An output range for returning the results may be provided as the last
1674         argument.  If one is not provided, an array of the proper type will be
1675         allocated on the garbage collected heap.  If one is provided, it must be a
1676         random access range with assignable elements, must have reference
1677         semantics with respect to assignment to its elements, and must have the
1678         same length as the input range.  Writing to adjacent elements from
1679         different threads must be safe.
1680 
1681         ---
1682         // Same thing, but explicitly allocate an array
1683         // to return the results in.  The element type
1684         // of the array may be either the exact type
1685         // returned by functions or an implicit conversion
1686         // target.
1687         auto squareRoots = new float[numbers.length];
1688         taskPool.amap!sqrt(numbers, squareRoots);
1689 
1690         // Multiple functions, explicit output range, and
1691         // explicit work unit size.
1692         auto results = new Tuple!(float, real)[numbers.length];
1693         taskPool.amap!(sqrt, log)(numbers, 100, results);
1694         ---
1695 
1696         Note:
1697 
1698         A memory barrier is guaranteed to be executed after all results are written
1699         but before returning so that results produced by all threads are visible
1700         in the calling thread.
1701 
1702         Tips:
1703 
1704         To perform the mapping operation in place, provide the same range for the
1705         input and output range.
1706 
1707         To parallelize the copying of a range with expensive to evaluate elements
1708         to an array, pass an identity function (a function that just returns
1709         whatever argument is provided to it) to `amap`.
1710 
1711         $(B Exception Handling):
1712 
1713         When at least one exception is thrown from inside the map functions,
1714         the submission of additional `Task` objects is terminated as soon as
1715         possible, in a non-deterministic manner.  All currently executing or
1716         enqueued work units are allowed to complete.  Then, all exceptions that
1717         were thrown from any work unit are chained using `Throwable.next` and
1718         rethrown.  The order of the exception chaining is non-deterministic.
1719         */
1720         auto amap(Args...)(Args args)
1721         if (isRandomAccessRange!(Args[0]))
1722         {
1723             import std.conv : emplaceRef;
1724 
1725             alias fun = adjoin!(staticMap!(unaryFun, functions));
1726 
1727             alias range = args[0];
1728             immutable len = range.length;
1729 
1730             static if (
1731                 Args.length > 1 &&
1732                 randAssignable!(Args[$ - 1]) &&
1733                 is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
1734                 )
1735             {
1736                 import std.conv : text;
1737                 import std.exception : enforce;
1738 
1739                 alias buf = args[$ - 1];
1740                 alias args2 = args[0..$ - 1];
1741                 alias Args2 = Args[0..$ - 1];
1742                 enforce(buf.length == len,
1743                         text("Can't use a user supplied buffer that's the wrong ",
1744                              "size.  (Expected  :", len, " Got:  ", buf.length));
1745             }
1746             else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
1747             {
1748                 static assert(0, "Wrong buffer type.");
1749             }
1750             else
1751             {
1752                 import std.array : uninitializedArray;
1753 
1754                 auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
1755                 alias args2 = args;
1756                 alias Args2 = Args;
1757             }
1758 
1759             if (!len) return buf;
1760 
1761             static if (isIntegral!(Args2[$ - 1]))
1762             {
1763                 static assert(args2.length == 2);
1764                 auto workUnitSize = cast(size_t) args2[1];
1765             }
1766             else
1767             {
1768                 static assert(args2.length == 1, Args);
1769                 auto workUnitSize = defaultWorkUnitSize(range.length);
1770             }
1771 
1772             alias R = typeof(range);
1773 
1774             if (workUnitSize > len)
1775             {
1776                 workUnitSize = len;
1777             }
1778 
1779             // Handle as a special case:
1780             if (size == 0)
1781             {
1782                 size_t index = 0;
1783                 foreach (elem; range)
1784                 {
1785                     emplaceRef(buf[index++], fun(elem));
1786                 }
1787                 return buf;
1788             }
1789 
1790             // Effectively -1:  chunkIndex + 1 == 0:
1791             shared size_t workUnitIndex = size_t.max;
1792             shared bool shouldContinue = true;
1793 
1794             void doIt()
1795             {
1796                 import std.algorithm.comparison : min;
1797 
1798                 scope(failure)
1799                 {
1800                     // If an exception is thrown, all threads should bail.
1801                     atomicStore(shouldContinue, false);
1802                 }
1803 
1804                 while (atomicLoad(shouldContinue))
1805                 {
1806                     immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
1807                     immutable start = workUnitSize * myUnitIndex;
1808                     if (start >= len)
1809                     {
1810                         atomicStore(shouldContinue, false);
1811                         break;
1812                     }
1813 
1814                     immutable end = min(len, start + workUnitSize);
1815 
1816                     static if (hasSlicing!R)
1817                     {
1818                         auto subrange = range[start .. end];
1819                         foreach (i; start .. end)
1820                         {
1821                             emplaceRef(buf[i], fun(subrange.front));
1822                             subrange.popFront();
1823                         }
1824                     }
1825                     else
1826                     {
1827                         foreach (i; start .. end)
1828                         {
1829                             emplaceRef(buf[i], fun(range[i]));
1830                         }
1831                     }
1832                 }
1833             }
1834 
1835             submitAndExecute(this, &doIt);
1836             return buf;
1837         }
1838     }
1839 
1840     ///
1841     template map(functions...)
1842     {
1843         /**
1844         A semi-lazy parallel map that can be used for pipelining.  The map
1845         functions are evaluated for the first `bufSize` elements and stored in a
1846         buffer and made available to `popFront`.  Meanwhile, in the
1847         background a second buffer of the same size is filled.  When the first
1848         buffer is exhausted, it is swapped with the second buffer and filled while
1849         the values from what was originally the second buffer are read.  This
1850         implementation allows for elements to be written to the buffer without
1851         the need for atomic operations or synchronization for each write, and
1852         enables the mapping function to be evaluated efficiently in parallel.
1853 
1854         `map` has more overhead than the simpler procedure used by `amap`
1855         but avoids the need to keep all results in memory simultaneously and works
1856         with non-random access ranges.
1857 
1858         Params:
1859 
1860         source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
1861         to be mapped.  If `source` is not random
1862         access it will be lazily buffered to an array of size `bufSize` before
1863         the map function is evaluated.  (For an exception to this rule, see Notes.)
1864 
1865         bufSize = The size of the buffer to store the evaluated elements.
1866 
1867         workUnitSize = The number of elements to evaluate in a single
1868         `Task`.  Must be less than or equal to `bufSize`, and
1869         should be a fraction of `bufSize` such that all worker threads can be
1870         used.  If the default of size_t.max is used, workUnitSize will be set to
1871         the pool-wide default.
1872 
1873         Returns:  An input range representing the results of the map.  This range
1874                   has a length iff `source` has a length.
1875 
1876         Notes:
1877 
1878         If a range returned by `map` or `asyncBuf` is used as an input to
1879         `map`, then as an optimization the copying from the output buffer
1880         of the first range to the input buffer of the second range is elided, even
1881         though the ranges returned by `map` and `asyncBuf` are non-random
1882         access ranges.  This means that the `bufSize` parameter passed to the
1883         current call to `map` will be ignored and the size of the buffer
1884         will be the buffer size of `source`.
1885 
1886         Example:
1887         ---
1888         // Pipeline reading a file, converting each line
1889         // to a number, taking the logarithms of the numbers,
1890         // and performing the additions necessary to find
1891         // the sum of the logarithms.
1892 
1893         auto lineRange = File("numberList.txt").byLine();
1894         auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
1895         auto nums = taskPool.map!(to!double)(dupedLines);
1896         auto logs = taskPool.map!log10(nums);
1897 
1898         double sum = 0;
1899         foreach (elem; logs)
1900         {
1901             sum += elem;
1902         }
1903         ---
1904 
1905         $(B Exception Handling):
1906 
1907         Any exceptions thrown while iterating over `source`
1908         or computing the map function are re-thrown on a call to `popFront` or,
1909         if thrown during construction, are simply allowed to propagate to the
1910         caller.  In the case of exceptions thrown while computing the map function,
1911         the exceptions are chained as in `TaskPool.amap`.
1912         */
1913         auto
1914         map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
1915         if (isInputRange!S)
1916         {
1917             import std.exception : enforce;
1918 
1919             enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
1920                     "Work unit size must be smaller than buffer size.");
1921             alias fun = adjoin!(staticMap!(unaryFun, functions));
1922 
1923             static final class Map
1924             {
1925                 // This is a class because the task needs to be located on the
1926                 // heap and in the non-random access case source needs to be on
1927                 // the heap, too.
1928 
1929             private:
1930                 enum bufferTrick = is(typeof(source.buf1)) &&
1931                 is(typeof(source.bufPos)) &&
1932                 is(typeof(source.doBufSwap()));
1933 
1934                 alias E = MapType!(S, functions);
1935                 E[] buf1, buf2;
1936                 S source;
1937                 TaskPool pool;
1938                 Task!(run, E[] delegate(E[]), E[]) nextBufTask;
1939                 size_t workUnitSize;
1940                 size_t bufPos;
1941                 bool lastTaskWaited;
1942 
1943             static if (isRandomAccessRange!S)
1944             {
1945                 alias FromType = S;
1946 
1947                 void popSource()
1948                 {
1949                     import std.algorithm.comparison : min;
1950 
1951                     static if (__traits(compiles, source[0 .. source.length]))
1952                     {
1953                         source = source[min(buf1.length, source.length)..source.length];
1954                     }
1955                     else static if (__traits(compiles, source[0..$]))
1956                     {
1957                         source = source[min(buf1.length, source.length)..$];
1958                     }
1959                     else
1960                     {
1961                         static assert(0, "S must have slicing for Map."
1962                                       ~ "  " ~ S.stringof ~ " doesn't.");
1963                     }
1964                 }
1965             }
1966             else static if (bufferTrick)
1967             {
1968                 // Make sure we don't have the buffer recycling overload of
1969                 // asyncBuf.
1970                 static if (
1971                     is(typeof(source.source)) &&
1972                     isRoundRobin!(typeof(source.source))
1973                 )
1974                 {
1975                     static assert(0, "Cannot execute a parallel map on " ~
1976                                   "the buffer recycling overload of asyncBuf."
1977                                  );
1978                 }
1979 
1980                 alias FromType = typeof(source.buf1);
1981                 FromType from;
1982 
1983                 // Just swap our input buffer with source's output buffer.
1984                 // No need to copy element by element.
1985                 FromType dumpToFrom()
1986                 {
1987                     import std.algorithm.mutation : swap;
1988 
1989                     assert(source.buf1.length <= from.length);
1990                     from.length = source.buf1.length;
1991                     swap(source.buf1, from);
1992 
1993                     // Just in case this source has been popped before
1994                     // being sent to map:
1995                     from = from[source.bufPos..$];
1996 
1997                     static if (is(typeof(source._length)))
1998                     {
1999                         source._length -= (from.length - source.bufPos);
2000                     }
2001 
2002                     source.doBufSwap();
2003 
2004                     return from;
2005                 }
2006             }
2007             else
2008             {
2009                 alias FromType = ElementType!S[];
2010 
2011                 // The temporary array that data is copied to before being
2012                 // mapped.
2013                 FromType from;
2014 
2015                 FromType dumpToFrom()
2016                 {
2017                     assert(from !is null);
2018 
2019                     size_t i;
2020                     for (; !source.empty && i < from.length; source.popFront())
2021                     {
2022                         from[i++] = source.front;
2023                     }
2024 
2025                     from = from[0 .. i];
2026                     return from;
2027                 }
2028             }
2029 
2030             static if (hasLength!S)
2031             {
2032                 size_t _length;
2033 
2034                 public @property size_t length() const @safe pure nothrow
2035                 {
2036                     return _length;
2037                 }
2038             }
2039 
2040                 this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
2041                 {
2042                     static if (bufferTrick)
2043                     {
2044                         bufSize = source.buf1.length;
2045                     }
2046 
2047                     buf1.length = bufSize;
2048                     buf2.length = bufSize;
2049 
2050                     static if (!isRandomAccessRange!S)
2051                     {
2052                         from.length = bufSize;
2053                     }
2054 
2055                     this.workUnitSize = (workUnitSize == size_t.max) ?
2056                             pool.defaultWorkUnitSize(bufSize) : workUnitSize;
2057                     this.source = source;
2058                     this.pool = pool;
2059 
2060                     static if (hasLength!S)
2061                     {
2062                         _length = source.length;
2063                     }
2064 
2065                     buf1 = fillBuf(buf1);
2066                     submitBuf2();
2067                 }
2068 
2069                 // The from parameter is a dummy and ignored in the random access
2070                 // case.
2071                 E[] fillBuf(E[] buf)
2072                 {
2073                     import std.algorithm.comparison : min;
2074 
2075                     static if (isRandomAccessRange!S)
2076                     {
2077                         import std.range : take;
2078                         auto toMap = take(source, buf.length);
2079                         scope(success) popSource();
2080                     }
2081                     else
2082                     {
2083                         auto toMap = dumpToFrom();
2084                     }
2085 
2086                     buf = buf[0 .. min(buf.length, toMap.length)];
2087 
2088                     // Handle as a special case:
2089                     if (pool.size == 0)
2090                     {
2091                         size_t index = 0;
2092                         foreach (elem; toMap)
2093                         {
2094                             buf[index++] = fun(elem);
2095                         }
2096                         return buf;
2097                     }
2098 
2099                     pool.amap!functions(toMap, workUnitSize, buf);
2100 
2101                     return buf;
2102                 }
2103 
2104                 void submitBuf2()
2105                 in
2106                 {
2107                     assert(nextBufTask.prev is null);
2108                     assert(nextBufTask.next is null);
2109                 }
2110                 do
2111                 {
2112                     // Hack to reuse the task object.
2113 
2114                     nextBufTask = typeof(nextBufTask).init;
2115                     nextBufTask._args[0] = &fillBuf;
2116                     nextBufTask._args[1] = buf2;
2117                     pool.put(nextBufTask);
2118                 }
2119 
2120                 void doBufSwap()
2121                 {
2122                     if (lastTaskWaited)
2123                     {
2124                         // Then the source is empty.  Signal it here.
2125                         buf1 = null;
2126                         buf2 = null;
2127 
2128                         static if (!isRandomAccessRange!S)
2129                         {
2130                             from = null;
2131                         }
2132 
2133                         return;
2134                     }
2135 
2136                     buf2 = buf1;
2137                     buf1 = nextBufTask.yieldForce;
2138                     bufPos = 0;
2139 
2140                     if (source.empty)
2141                     {
2142                         lastTaskWaited = true;
2143                     }
2144                     else
2145                     {
2146                         submitBuf2();
2147                     }
2148                 }
2149 
2150             public:
2151                 @property auto front()
2152                 {
2153                     return buf1[bufPos];
2154                 }
2155 
2156                 void popFront()
2157                 {
2158                     static if (hasLength!S)
2159                     {
2160                         _length--;
2161                     }
2162 
2163                     bufPos++;
2164                     if (bufPos >= buf1.length)
2165                     {
2166                         doBufSwap();
2167                     }
2168                 }
2169 
2170                 static if (isInfinite!S)
2171                 {
2172                     enum bool empty = false;
2173                 }
2174                 else
2175                 {
2176 
2177                     bool empty() const @property
2178                     {
2179                         // popFront() sets this when source is empty
2180                         return buf1.length == 0;
2181                     }
2182                 }
2183             }
2184             return new Map(source, bufSize, workUnitSize, this);
2185         }
2186     }
2187 
2188     /**
2189     Given a `source` range that is expensive to iterate over, returns an
2190     $(REF_ALTTEXT input range, isInputRange, std,range,primitives) that
2191     asynchronously buffers the contents of `source` into a buffer of `bufSize` elements in a worker thread,
2192     while making previously buffered elements from a second buffer, also of size
2193     `bufSize`, available via the range interface of the returned
2194     object.  The returned range has a length iff `hasLength!S`.
2195     `asyncBuf` is useful, for example, when performing expensive operations
2196     on the elements of ranges that represent data on a disk or network.
2197 
2198     Example:
2199     ---
2200     import std.conv, std.stdio;
2201 
2202     void main()
2203     {
2204         // Fetch lines of a file in a background thread
2205         // while processing previously fetched lines,
2206         // dealing with byLine's buffer recycling by
2207         // eagerly duplicating every line.
2208         auto lines = File("foo.txt").byLine();
2209         auto duped = std.algorithm.map!"a.idup"(lines);
2210 
2211         // Fetch more lines in the background while we
2212         // process the lines already read into memory
2213         // into a matrix of doubles.
2214         double[][] matrix;
2215         auto asyncReader = taskPool.asyncBuf(duped);
2216 
2217         foreach (line; asyncReader)
2218         {
2219             auto ls = line.split("\t");
2220             matrix ~= to!(double[])(ls);
2221         }
2222     }
2223     ---
2224 
2225     $(B Exception Handling):
2226 
2227     Any exceptions thrown while iterating over `source` are re-thrown on a
2228     call to `popFront` or, if thrown during construction, simply
2229     allowed to propagate to the caller.
2230     */
2231     auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S)
2232     {
2233         static final class AsyncBuf
2234         {
2235             // This is a class because the task and source both need to be on
2236             // the heap.
2237 
2238             // The element type of S.
2239             alias E = ElementType!S;  // Needs to be here b/c of forward ref bugs.
2240 
2241         private:
2242             E[] buf1, buf2;
2243             S source;
2244             TaskPool pool;
2245             Task!(run, E[] delegate(E[]), E[]) nextBufTask;
2246             size_t bufPos;
2247             bool lastTaskWaited;
2248 
2249             static if (hasLength!S)
2250             {
2251                 size_t _length;
2252 
2253                 // Available if hasLength!S.
2254                 public @property size_t length() const @safe pure nothrow
2255                 {
2256                     return _length;
2257                 }
2258             }
2259 
2260             this(S source, size_t bufSize, TaskPool pool)
2261             {
2262                 buf1.length = bufSize;
2263                 buf2.length = bufSize;
2264 
2265                 this.source = source;
2266                 this.pool = pool;
2267 
2268                 static if (hasLength!S)
2269                 {
2270                     _length = source.length;
2271                 }
2272 
2273                 buf1 = fillBuf(buf1);
2274                 submitBuf2();
2275             }
2276 
2277             E[] fillBuf(E[] buf)
2278             {
2279                 assert(buf !is null);
2280 
2281                 size_t i;
2282                 for (; !source.empty && i < buf.length; source.popFront())
2283                 {
2284                     buf[i++] = source.front;
2285                 }
2286 
2287                 buf = buf[0 .. i];
2288                 return buf;
2289             }
2290 
2291             void submitBuf2()
2292             in
2293             {
2294                 assert(nextBufTask.prev is null);
2295                 assert(nextBufTask.next is null);
2296             }
2297             do
2298             {
2299                 // Hack to reuse the task object.
2300 
2301                 nextBufTask = typeof(nextBufTask).init;
2302                 nextBufTask._args[0] = &fillBuf;
2303                 nextBufTask._args[1] = buf2;
2304                 pool.put(nextBufTask);
2305             }
2306 
2307             void doBufSwap()
2308             {
2309                 if (lastTaskWaited)
2310                 {
2311                     // Then source is empty.  Signal it here.
2312                     buf1 = null;
2313                     buf2 = null;
2314                     return;
2315                 }
2316 
2317                 buf2 = buf1;
2318                 buf1 = nextBufTask.yieldForce;
2319                 bufPos = 0;
2320 
2321                 if (source.empty)
2322                 {
2323                     lastTaskWaited = true;
2324                 }
2325                 else
2326                 {
2327                     submitBuf2();
2328                 }
2329             }
2330 
2331         public:
2332             E front() @property
2333             {
2334                 return buf1[bufPos];
2335             }
2336 
2337             void popFront()
2338             {
2339                 static if (hasLength!S)
2340                 {
2341                     _length--;
2342                 }
2343 
2344                 bufPos++;
2345                 if (bufPos >= buf1.length)
2346                 {
2347                     doBufSwap();
2348                 }
2349             }
2350 
2351             static if (isInfinite!S)
2352             {
2353                 enum bool empty = false;
2354             }
2355 
2356             else
2357             {
2358                 ///
2359                 bool empty() @property
2360                 {
2361                     // popFront() sets this when source is empty:
2362                     return buf1.length == 0;
2363                 }
2364             }
2365         }
2366         return new AsyncBuf(source, bufSize, this);
2367     }
2368 
2369     /**
2370     Given a callable object `next` that writes to a user-provided buffer and
2371     a second callable object `empty` that determines whether more data is
2372     available to write via `next`, returns an input range that
2373     asynchronously calls `next` with a set of size `nBuffers` of buffers
2374     and makes the results available in the order they were obtained via the
2375     input range interface of the returned object.  Similarly to the
2376     input range overload of `asyncBuf`, the first half of the buffers
2377     are made available via the range interface while the second half are
2378     filled and vice-versa.
2379 
2380     Params:
2381 
2382     next = A callable object that takes a single argument that must be an array
2383            with mutable elements.  When called, `next` writes data to
2384            the array provided by the caller.
2385 
2386     empty = A callable object that takes no arguments and returns a type
2387             implicitly convertible to `bool`.  This is used to signify
2388             that no more data is available to be obtained by calling `next`.
2389 
2390     initialBufSize = The initial size of each buffer.  If `next` takes its
2391                      array by reference, it may resize the buffers.
2392 
2393     nBuffers = The number of buffers to cycle through when calling `next`.
2394 
2395     Example:
2396     ---
2397     // Fetch lines of a file in a background
2398     // thread while processing previously fetched
2399     // lines, without duplicating any lines.
2400     auto file = File("foo.txt");
2401 
2402     void next(ref char[] buf)
2403     {
2404         file.readln(buf);
2405     }
2406 
2407     // Fetch more lines in the background while we
2408     // process the lines already read into memory
2409     // into a matrix of doubles.
2410     double[][] matrix;
2411     auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
2412 
2413     foreach (line; asyncReader)
2414     {
2415         auto ls = line.split("\t");
2416         matrix ~= to!(double[])(ls);
2417     }
2418     ---
2419 
2420     $(B Exception Handling):
2421 
2422     Any exceptions thrown while iterating over `range` are re-thrown on a
2423     call to `popFront`.
2424 
2425     Warning:
2426 
2427     Using the range returned by this function in a parallel foreach loop
2428     will not work because buffers may be overwritten while the task that
2429     processes them is in queue.  This is checked for at compile time
2430     and will result in a static assertion failure.
2431     */
2432     auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
2433     if (is(typeof(C2.init()) : bool) &&
2434         Parameters!C1.length == 1 &&
2435         Parameters!C2.length == 0 &&
2436         isArray!(Parameters!C1[0])
2437     ) {
2438         auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers);
2439         return asyncBuf(roundRobin, nBuffers / 2);
2440     }
2441 
2442     ///
2443     template reduce(functions...)
2444     {
2445         /**
2446         Parallel reduce on a random access range.  Except as otherwise noted,
2447         usage is similar to $(REF _reduce, std,algorithm,iteration).  There is
2448         also $(LREF fold) which does the same thing with a different parameter
2449         order.
2450 
2451         This function works by splitting the range to be reduced into work
2452         units, which are slices to be reduced in parallel.  Once the results
2453         from all work units are computed, a final serial reduction is performed
2454         on these results to compute the final answer. Therefore, care must be
2455         taken to choose the seed value appropriately.
2456 
2457         Because the reduction is being performed in parallel, `functions`
2458         must be associative.  For notational simplicity, let # be an
2459         infix operator representing `functions`.  Then, (a # b) # c must equal
2460         a # (b # c).  Floating point addition is not associative
2461         even though addition in exact arithmetic is.  Summing floating
2462         point numbers using this function may give different results than summing
2463         serially.  However, for many practical purposes floating point addition
2464         can be treated as associative.
2465 
2466         Note that, since `functions` are assumed to be associative,
2467         additional optimizations are made to the serial portion of the reduction
2468         algorithm. These take advantage of the instruction level parallelism of
2469         modern CPUs, in addition to the thread-level parallelism that the rest
2470         of this module exploits.  This can lead to better than linear speedups
2471         relative to $(REF _reduce, std,algorithm,iteration), especially for
2472         fine-grained benchmarks like dot products.
2473 
2474         An explicit seed may be provided as the first argument.  If
2475         provided, it is used as the seed for all work units and for the final
2476         reduction of results from all work units.  Therefore, if it is not the
2477         identity value for the operation being performed, results may differ
2478         from those generated by $(REF _reduce, std,algorithm,iteration) or
2479         depending on how many work units are used.  The next argument must be
2480         the range to be reduced.
2481         ---
2482         // Find the sum of squares of a range in parallel, using
2483         // an explicit seed.
2484         //
2485         // Timings on an Athlon 64 X2 dual core machine:
2486         //
2487         // Parallel reduce:                     72 milliseconds
2488         // Using std.algorithm.reduce instead:  181 milliseconds
2489         auto nums = iota(10_000_000.0f);
2490         auto sumSquares = taskPool.reduce!"a + b"(
2491             0.0, std.algorithm.map!"a * a"(nums)
2492         );
2493         ---
2494 
2495         If no explicit seed is provided, the first element of each work unit
2496         is used as a seed.  For the final reduction, the result from the first
2497         work unit is used as the seed.
2498         ---
2499         // Find the sum of a range in parallel, using the first
2500         // element of each work unit as the seed.
2501         auto sum = taskPool.reduce!"a + b"(nums);
2502         ---
2503 
2504         An explicit work unit size may be specified as the last argument.
2505         Specifying too small a work unit size will effectively serialize the
2506         reduction, as the final reduction of the result of each work unit will
2507         dominate computation time.  If `TaskPool.size` for this instance
2508         is zero, this parameter is ignored and one work unit is used.
2509         ---
2510         // Use a work unit size of 100.
2511         auto sum2 = taskPool.reduce!"a + b"(nums, 100);
2512 
2513         // Work unit size of 100 and explicit seed.
2514         auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
2515         ---
2516 
2517         Parallel reduce supports multiple functions, like
2518         `std.algorithm.reduce`.
2519         ---
2520         // Find both the min and max of nums.
2521         auto minMax = taskPool.reduce!(min, max)(nums);
2522         assert(minMax[0] == reduce!min(nums));
2523         assert(minMax[1] == reduce!max(nums));
2524         ---
2525 
2526         $(B Exception Handling):
2527 
2528         After this function is finished executing, any exceptions thrown
2529         are chained together via `Throwable.next` and rethrown.  The chaining
2530         order is non-deterministic.
2531 
2532         See_Also:
2533 
2534             $(LREF fold) is functionally equivalent to $(LREF _reduce) except the
2535             range parameter comes first and there is no need to use
2536             $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds.
2537          */
2538         auto reduce(Args...)(Args args)
2539         {
2540             import core.exception : OutOfMemoryError;
2541             import std.conv : emplaceRef;
2542             import std.exception : enforce;
2543 
2544             alias fun = reduceAdjoin!functions;
2545             alias finishFun = reduceFinish!functions;
2546 
2547             static if (isIntegral!(Args[$ - 1]))
2548             {
2549                 size_t workUnitSize = cast(size_t) args[$ - 1];
2550                 alias args2 = args[0..$ - 1];
2551                 alias Args2 = Args[0..$ - 1];
2552             }
2553             else
2554             {
2555                 alias args2 = args;
2556                 alias Args2 = Args;
2557             }
2558 
2559             auto makeStartValue(Type)(Type e)
2560             {
2561                 static if (functions.length == 1)
2562                 {
2563                     return e;
2564                 }
2565                 else
2566                 {
2567                     typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void;
2568                     foreach (i, T; seed.Types)
2569                     {
2570                         emplaceRef(seed.expand[i], e);
2571                     }
2572 
2573                     return seed;
2574                 }
2575             }
2576 
2577             static if (args2.length == 2)
2578             {
2579                 static assert(isInputRange!(Args2[1]));
2580                 alias range = args2[1];
2581                 alias seed = args2[0];
2582                 enum explicitSeed = true;
2583 
2584                 static if (!is(typeof(workUnitSize)))
2585                 {
2586                     size_t workUnitSize = defaultWorkUnitSize(range.length);
2587                 }
2588             }
2589             else
2590             {
2591                 static assert(args2.length == 1);
2592                 alias range = args2[0];
2593 
2594                 static if (!is(typeof(workUnitSize)))
2595                 {
2596                     size_t workUnitSize = defaultWorkUnitSize(range.length);
2597                 }
2598 
2599                 enforce(!range.empty,
2600                     "Cannot reduce an empty range with first element as start value.");
2601 
2602                 auto seed = makeStartValue(range.front);
2603                 enum explicitSeed = false;
2604                 range.popFront();
2605             }
2606 
2607             alias E = typeof(seed);
2608             alias R = typeof(range);
2609 
            // Serially reduces range[lowerBound .. upperBound] and returns
            // the result.  With an explicit seed, every accumulator starts
            // from `seed`; otherwise the first element of each sub-range is
            // used as that accumulator's start value.
            E reduceOnRange(R range, size_t lowerBound, size_t upperBound)
            {
                // This is for exploiting instruction level parallelism by
                // using multiple accumulator variables within each thread,
                // since we're assuming functions are associative anyhow.

                // This is so that loops can be unrolled automatically.
                enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5);
                enum nILP = ilpTuple.length;

                // Number of elements assigned to each accumulator.
                immutable subSize = (upperBound - lowerBound) / nILP;

                if (subSize <= 1)
                {
                    // Handle as a special case.
                    // Too few elements to split: reduce with one accumulator.
                    static if (explicitSeed)
                    {
                        E result = seed;
                    }
                    else
                    {
                        E result = makeStartValue(range[lowerBound]);
                        lowerBound++;
                    }

                    foreach (i; lowerBound .. upperBound)
                    {
                        result = fun(result, range[i]);
                    }

                    return result;
                }

                assert(subSize > 1);
                E[nILP] results;
                size_t[nILP] offsets;

                // Accumulator i covers the sub-range that starts at
                // lowerBound + subSize * i.
                foreach (i; ilpTuple)
                {
                    offsets[i] = lowerBound + subSize * i;

                    static if (explicitSeed)
                    {
                        results[i] = seed;
                    }
                    else
                    {
                        // The first element seeds the accumulator, so skip it.
                        results[i] = makeStartValue(range[offsets[i]]);
                        offsets[i]++;
                    }
                }

                // Without an explicit seed one element per sub-range has
                // already been consumed above.
                immutable nLoop = subSize - (!explicitSeed);
                foreach (i; 0 .. nLoop)
                {
                    foreach (j; ilpTuple)
                    {
                        results[j] = fun(results[j], range[offsets[j]]);
                        offsets[j]++;
                    }
                }

                // Finish the remainder.
                // The leftover elements follow the last sub-range, so they
                // are folded into the last accumulator.
                foreach (i; nILP * subSize + lowerBound .. upperBound)
                {
                    results[$ - 1] = fun(results[$ - 1], range[i]);
                }

                // Combine the accumulators in sub-range order into results[0].
                foreach (i; ilpTuple[1..$])
                {
                    results[0] = finishFun(results[0], results[i]);
                }

                return results[0];
            }
2684 
2685             immutable len = range.length;
2686             if (len == 0)
2687             {
2688                 return seed;
2689             }
2690 
2691             if (this.size == 0)
2692             {
2693                 return finishFun(seed, reduceOnRange(range, 0, len));
2694             }
2695 
2696             // Unlike the rest of the functions here, I can't use the Task object
2697             // recycling trick here because this has to work on non-commutative
2698             // operations.  After all the tasks are done executing, fun() has to
2699             // be applied on the results of these to get a final result, but
2700             // it can't be evaluated out of order.
2701 
2702             if (workUnitSize > len)
2703             {
2704                 workUnitSize = len;
2705             }
2706 
2707             immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
2708             assert(nWorkUnits * workUnitSize >= len);
2709 
2710             alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
2711             RTask[] tasks;
2712 
2713             // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753
2714             // Use a fixed buffer backed by malloc().
2715             enum maxStack = 2_048;
2716             byte[maxStack] buf = void;
2717             immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;
2718 
2719             import core.stdc.stdlib : malloc, free;
2720             if (nBytesNeeded < maxStack)
2721             {
2722                 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits];
2723             }
2724             else
2725             {
2726                 auto ptr = cast(RTask*) malloc(nBytesNeeded);
2727                 if (!ptr)
2728                 {
2729                     throw new OutOfMemoryError(
2730                         "Out of memory in std.parallelism."
2731                     );
2732                 }
2733 
2734                 tasks = ptr[0 .. nWorkUnits];
2735             }
2736 
2737             scope(exit)
2738             {
2739                 if (nBytesNeeded > maxStack)
2740                 {
2741                     free(tasks.ptr);
2742                 }
2743             }
2744 
2745             foreach (ref t; tasks[])
2746                 emplaceRef(t, RTask());
2747 
2748             // Hack to take the address of a nested function w/o
2749             // making a closure.
2750             static auto scopedAddress(D)(scope D del) @system
2751             {
2752                 auto tmp = del;
2753                 return tmp;
2754             }
2755 
2756             size_t curPos = 0;
2757             void useTask(ref RTask task)
2758             {
2759                 import std.algorithm.comparison : min;
2760 
2761                 task.pool = this;
2762                 task._args[0] = scopedAddress(&reduceOnRange);
2763                 task._args[3] = min(len, curPos + workUnitSize);  // upper bound.
2764                 task._args[1] = range;  // range
2765                 task._args[2] = curPos; // lower bound.
2766 
2767                 curPos += workUnitSize;
2768             }
2769 
2770             foreach (ref task; tasks)
2771             {
2772                 useTask(task);
2773             }
2774 
2775             foreach (i; 1 .. tasks.length - 1)
2776             {
2777                 tasks[i].next = tasks[i + 1].basePtr;
2778                 tasks[i + 1].prev = tasks[i].basePtr;
2779             }
2780 
2781             if (tasks.length > 1)
2782             {
2783                 queueLock();
2784                 scope(exit) queueUnlock();
2785 
2786                 abstractPutGroupNoSync(
2787                     tasks[1].basePtr,
2788                     tasks[$ - 1].basePtr
2789                 );
2790             }
2791 
2792             if (tasks.length > 0)
2793             {
2794                 try
2795                 {
2796                     tasks[0].job();
2797                 }
2798                 catch (Throwable e)
2799                 {
2800                     tasks[0].exception = e;
2801                 }
2802                 tasks[0].taskStatus = TaskStatus.done;
2803 
2804                 // Try to execute each of these in the current thread
2805                 foreach (ref task; tasks[1..$])
2806                 {
2807                     tryDeleteExecute(task.basePtr);
2808                 }
2809             }
2810 
2811             // Now that we've tried to execute every task, they're all either
2812             // done or in progress.  Force all of them.
2813             E result = seed;
2814 
2815             Throwable firstException;
2816 
2817             foreach (ref task; tasks)
2818             {
2819                 try
2820                 {
2821                     task.yieldForce;
2822                 }
2823                 catch (Throwable e)
2824                 {
2825                     /* Chain e to front because order doesn't matter and because
2826                      * e is not likely to be a chain itself (so fewer traversals)
2827                      */
2828                     firstException = Throwable.chainTogether(e, firstException);
2829                     continue;
2830                 }
2831 
2832                 if (!firstException) result = finishFun(result, task.returnVal);
2833             }
2834 
2835             if (firstException) throw firstException;
2836 
2837             return result;
2838         }
2839     }
2840 
2841     ///
2842     template fold(functions...)
2843     {
2844         /** Implements the homonym function (also known as `accumulate`, `compress`,
2845             `inject`, or `foldl`) present in various programming languages of
2846             functional flavor.
2847 
2848             `fold` is functionally equivalent to $(LREF reduce) except the range
2849             parameter comes first and there is no need to use $(REF_ALTTEXT
2850             `tuple`,tuple,std,typecons) for multiple seeds.
2851 
2852             There may be one or more callable entities (`functions` argument) to
2853             apply.
2854 
2855             Params:
2856                 args = Just the range to _fold over; or the range and one seed
2857                        per function; or the range, one seed per function, and
2858                        the work unit size
2859 
2860             Returns:
2861                 The accumulated result as a single value for single function and
2862                 as a tuple of values for multiple functions
2863 
2864             See_Also:
2865             Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce).
2866 
2867             Example:
2868             ---
2869             static int adder(int a, int b)
2870             {
2871                 return a + b;
2872             }
2873             static int multiplier(int a, int b)
2874             {
2875                 return a * b;
2876             }
2877 
2878             // Just the range
2879             auto x = taskPool.fold!adder([1, 2, 3, 4]);
2880             assert(x == 10);
2881 
2882             // The range and the seeds (0 and 1 below; also note multiple
2883             // functions in this example)
2884             auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1);
2885             assert(y[0] == 10);
2886             assert(y[1] == 24);
2887 
2888             // The range, the seed (0), and the work unit size (20)
2889             auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20);
2890             assert(z == 10);
2891             ---
2892         */
2893         auto fold(Args...)(Args args)
2894         {
2895             static assert(isInputRange!(Args[0]), "First argument must be an InputRange");
2896 
2897             alias range = args[0];
2898 
2899             static if (Args.length == 1)
2900             {
2901                 // Just the range
2902                 return reduce!functions(range);
2903             }
2904             else static if (Args.length == 1 + functions.length ||
2905                             Args.length == 1 + functions.length + 1)
2906             {
2907                 static if (functions.length == 1)
2908                 {
2909                     alias seeds = args[1];
2910                 }
2911                 else
2912                 {
2913                     auto seeds()
2914                     {
2915                         import std.typecons : tuple;
2916                         return tuple(args[1 .. functions.length+1]);
2917                     }
2918                 }
2919 
2920                 static if (Args.length == 1 + functions.length)
2921                 {
2922                     // The range and the seeds
2923                     return reduce!functions(seeds, range);
2924                 }
2925                 else static if (Args.length == 1 + functions.length + 1)
2926                 {
2927                     // The range, the seeds, and the work unit size
2928                     static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type");
2929                     return reduce!functions(seeds, range, args[$-1]);
2930                 }
2931             }
2932             else
2933             {
2934                 import std.conv : text;
2935                 static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, "
2936                               ~ functions.length.text ~ " optional seed(s), and an optional work unit size.");
2937             }
2938         }
2939     }
2940 
2941     // This test is not included in the documentation because even though these
2942     // examples are for the inner fold() template, with their current location,
2943     // they would appear under the outer one. (We can't move this inside the
2944     // outer fold() template because then dmd runs out of memory possibly due to
2945     // recursive template instantiation, which is surprisingly not caught.)
2946     @system unittest
2947     {
2948         // Just the range
2949         auto x = taskPool.fold!"a + b"([1, 2, 3, 4]);
2950         assert(x == 10);
2951 
2952         // The range and the seeds (0 and 1 below; also note multiple
2953         // functions in this example)
2954         auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1);
2955         assert(y[0] == 10);
2956         assert(y[1] == 24);
2957 
2958         // The range, the seed (0), and the work unit size (20)
2959         auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20);
2960         assert(z == 10);
2961     }
2962 
2963     /**
2964     Gets the index of the current thread relative to this `TaskPool`.  Any
2965     thread not in this pool will receive an index of 0.  The worker threads in
2966     this pool receive unique indices of 1 through `this.size`.
2967 
2968     This function is useful for maintaining worker-local resources.
2969 
2970     Example:
2971     ---
2972     // Execute a loop that computes the greatest common
2973     // divisor of every number from 0 through 999 with
2974     // 42 in parallel.  Write the results out to
2975     // a set of files, one for each thread.  This allows
2976     // results to be written out without any synchronization.
2977 
2978     import std.conv, std.range, std.numeric, std.stdio;
2979 
2980     void main()
2981     {
        auto fileHandles = new File[taskPool.size + 1];
2983         scope(exit) {
2984             foreach (ref handle; fileHandles)
2985             {
2986                 handle.close();
2987             }
2988         }
2989 
2990         foreach (i, ref handle; fileHandles)
2991         {
            handle = File("workerResults" ~ to!string(i) ~ ".txt", "w");
2993         }
2994 
2995         foreach (num; parallel(iota(1_000)))
2996         {
2997             auto outHandle = fileHandles[taskPool.workerIndex];
2998             outHandle.writeln(num, '\t', gcd(num, 42));
2999         }
3000     }
3001     ---
3002     */
3003     size_t workerIndex() @property @safe const nothrow
3004     {
3005         immutable rawInd = threadIndex;
3006         return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
3007                 (rawInd - instanceStartIndex + 1) : 0;
3008     }
3009 
3010     /**
3011     Struct for creating worker-local storage.  Worker-local storage is
3012     thread-local storage that exists only for worker threads in a given
3013     `TaskPool` plus a single thread outside the pool.  It is allocated on the
3014     garbage collected heap in a way that avoids _false sharing, and doesn't
3015     necessarily have global scope within any thread.  It can be accessed from
3016     any worker thread in the `TaskPool` that created it, and one thread
3017     outside this `TaskPool`.  All threads outside the pool that created a
3018     given instance of worker-local storage share a single slot.
3019 
3020     Since the underlying data for this struct is heap-allocated, this struct
3021     has reference semantics when passed between functions.
3022 
    The main use cases for `WorkerLocalStorage` are:
3024 
3025     1.  Performing parallel reductions with an imperative, as opposed to
3026         functional, programming style.  In this case, it's useful to treat
        `WorkerLocalStorage` as local to each thread for only the parallel
3028         portion of an algorithm.
3029 
3030     2.  Recycling temporary buffers across iterations of a parallel foreach loop.
3031 
3032     Example:
3033     ---
3034     // Calculate pi as in our synopsis example, but
3035     // use an imperative instead of a functional style.
3036     immutable n = 1_000_000_000;
3037     immutable delta = 1.0L / n;
3038 
3039     auto sums = taskPool.workerLocalStorage(0.0L);
3040     foreach (i; parallel(iota(n)))
3041     {
3042         immutable x = ( i - 0.5L ) * delta;
3043         immutable toAdd = delta / ( 1.0 + x * x );
3044         sums.get += toAdd;
3045     }
3046 
3047     // Add up the results from each worker thread.
3048     real pi = 0;
3049     foreach (threadResult; sums.toRange)
3050     {
3051         pi += 4.0L * threadResult;
3052     }
3053     ---
3054      */
3055     static struct WorkerLocalStorage(T)
3056     {
3057     private:
3058         TaskPool pool;
3059         size_t size;
3060 
3061         size_t elemSize;
3062         bool* stillThreadLocal;
3063 
3064         static size_t roundToLine(size_t num) pure nothrow
3065         {
3066             if (num % cacheLineSize == 0)
3067             {
3068                 return num;
3069             }
3070             else
3071             {
3072                 return ((num / cacheLineSize) + 1) * cacheLineSize;
3073             }
3074         }
3075 
3076         void* data;
3077 
3078         void initialize(TaskPool pool)
3079         {
3080             this.pool = pool;
3081             size = pool.size + 1;
3082             stillThreadLocal = new bool;
3083             *stillThreadLocal = true;
3084 
3085             // Determines whether the GC should scan the array.
3086             auto blkInfo = (typeid(T).flags & 1) ?
3087                            cast(GC.BlkAttr) 0 :
3088                            GC.BlkAttr.NO_SCAN;
3089 
3090             immutable nElem = pool.size + 1;
3091             elemSize = roundToLine(T.sizeof);
3092 
3093             // The + 3 is to pad one full cache line worth of space on either side
3094             // of the data structure to make sure false sharing with completely
3095             // unrelated heap data is prevented, and to provide enough padding to
3096             // make sure that data is cache line-aligned.
3097             data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;
3098 
3099             // Cache line align data ptr.
3100             data = cast(void*) roundToLine(cast(size_t) data);
3101 
3102             foreach (i; 0 .. nElem)
3103             {
3104                 this.opIndex(i) = T.init;
3105             }
3106         }
3107 
3108         ref opIndex(this Qualified)(size_t index)
3109         {
3110             import std.conv : text;
3111             assert(index < size, text(index, '\t', uint.max));
3112             return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index));
3113         }
3114 
3115         void opIndexAssign(T val, size_t index)
3116         {
3117             assert(index < size);
3118             *(cast(T*) (data + elemSize * index)) = val;
3119         }
3120 
3121     public:
3122         /**
3123         Get the current thread's instance.  Returns by ref.
3124         Note that calling `get` from any thread
3125         outside the `TaskPool` that created this instance will return the
3126         same reference, so an instance of worker-local storage should only be
3127         accessed from one thread outside the pool that created it.  If this
3128         rule is violated, undefined behavior will result.
3129 
3130         If assertions are enabled and `toRange` has been called, then this
3131         WorkerLocalStorage instance is no longer worker-local and an assertion
3132         failure will result when calling this method.  This is not checked
3133         when assertions are disabled for performance reasons.
3134          */
3135         ref get(this Qualified)() @property
3136         {
3137             assert(*stillThreadLocal,
3138                 "Cannot call get() on this instance of WorkerLocalStorage " ~
3139                 "because it is no longer worker-local."
3140             );
3141             return opIndex(pool.workerIndex);
3142         }
3143 
3144         /**
3145         Assign a value to the current thread's instance.  This function has
3146         the same caveats as its overload.
3147         */
3148         void get(T val) @property
3149         {
3150             assert(*stillThreadLocal,
3151                 "Cannot call get() on this instance of WorkerLocalStorage " ~
3152                 "because it is no longer worker-local."
3153             );
3154 
3155             opIndexAssign(val, pool.workerIndex);
3156         }
3157 
3158         /**
3159         Returns a range view of the values for all threads, which can be used
3160         to further process the results of each thread after running the parallel
3161         part of your algorithm.  Do not use this method in the parallel portion
3162         of your algorithm.
3163 
3164         Calling this function sets a flag indicating that this struct is no
3165         longer worker-local, and attempting to use the `get` method again
3166         will result in an assertion failure if assertions are enabled.
3167          */
3168         WorkerLocalStorageRange!T toRange() @property
3169         {
3170             if (*stillThreadLocal)
3171             {
3172                 *stillThreadLocal = false;
3173 
3174                 // Make absolutely sure results are visible to all threads.
3175                 // This is probably not necessary since some other
3176                 // synchronization primitive will be used to signal that the
3177                 // parallel part of the algorithm is done, but the
3178                 // performance impact should be negligible, so it's better
3179                 // to be safe.
3180                 ubyte barrierDummy;
3181                 atomicSetUbyte(barrierDummy, 1);
3182             }
3183 
3184             return WorkerLocalStorageRange!T(this);
3185         }
3186     }
3187 
3188     /**
3189     Range primitives for worker-local storage.  The purpose of this is to
3190     access results produced by each worker thread from a single thread once you
3191     are no longer using the worker-local storage from multiple threads.
3192     Do not use this struct in the parallel portion of your algorithm.
3193 
3194     The proper way to instantiate this object is to call
3195     `WorkerLocalStorage.toRange`.  Once instantiated, this object behaves
3196     as a finite random-access range with assignable, lvalue elements and
3197     a length equal to the number of worker threads in the `TaskPool` that
3198     created it plus 1.
3199      */
3200     static struct WorkerLocalStorageRange(T)
3201     {
3202     private:
3203         WorkerLocalStorage!T workerLocalStorage;
3204 
3205         size_t _length;
3206         size_t beginOffset;
3207 
3208         this(WorkerLocalStorage!T wl)
3209         {
3210             this.workerLocalStorage = wl;
3211             _length = wl.size;
3212         }
3213 
3214     public:
3215         ref front(this Qualified)() @property
3216         {
3217             return this[0];
3218         }
3219 
3220         ref back(this Qualified)() @property
3221         {
3222             return this[_length - 1];
3223         }
3224 
3225         void popFront()
3226         {
3227             if (_length > 0)
3228             {
3229                 beginOffset++;
3230                 _length--;
3231             }
3232         }
3233 
3234         void popBack()
3235         {
3236             if (_length > 0)
3237             {
3238                 _length--;
3239             }
3240         }
3241 
3242         typeof(this) save() @property
3243         {
3244             return this;
3245         }
3246 
3247         ref opIndex(this Qualified)(size_t index)
3248         {
3249             assert(index < _length);
3250             return workerLocalStorage[index + beginOffset];
3251         }
3252 
3253         void opIndexAssign(T val, size_t index)
3254         {
3255             assert(index < _length);
3256             workerLocalStorage[index] = val;
3257         }
3258 
3259         typeof(this) opSlice(size_t lower, size_t upper)
3260         {
3261             assert(upper <= _length);
3262             auto newWl = this.workerLocalStorage;
3263             newWl.data += lower * newWl.elemSize;
3264             newWl.size = upper - lower;
3265             return typeof(this)(newWl);
3266         }
3267 
3268         bool empty() const @property
3269         {
3270             return length == 0;
3271         }
3272 
3273         size_t length() const @property
3274         {
3275             return _length;
3276         }
3277     }
3278 
3279     /**
3280     Creates an instance of worker-local storage, initialized with a given
3281     value.  The value is `lazy` so that you can, for example, easily
3282     create one instance of a class for each worker.  For usage example,
3283     see the `WorkerLocalStorage` struct.
3284      */
3285     WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
3286     {
3287         WorkerLocalStorage!T ret;
3288         ret.initialize(this);
3289         foreach (i; 0 .. size + 1)
3290         {
3291             ret[i] = initialVal;
3292         }
3293 
3294         // Memory barrier to make absolutely sure that what we wrote is
3295         // visible to worker threads.
3296         ubyte barrierDummy;
3297         atomicSetUbyte(barrierDummy, 0);
3298 
3299         return ret;
3300     }
3301 
3302     /**
3303     Signals to all worker threads to terminate as soon as they are finished
3304     with their current `Task`, or immediately if they are not executing a
3305     `Task`.  `Task`s that were in queue will not be executed unless
3306     a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce`
3307     causes them to be executed.
3308 
3309     Use only if you have waited on every `Task` and therefore know the
3310     queue is empty, or if you speculatively executed some tasks and no longer
3311     need the results.
3312      */
3313     void stop() @trusted
3314     {
3315         queueLock();
3316         scope(exit) queueUnlock();
3317         atomicSetUbyte(status, PoolState.stopNow);
3318         notifyAll();
3319     }
3320 
3321     /**
3322     Signals worker threads to terminate when the queue becomes empty.
3323 
3324     If blocking argument is true, wait for all worker threads to terminate
3325     before returning.  This option might be used in applications where
3326     task results are never consumed-- e.g. when `TaskPool` is employed as a
3327     rudimentary scheduler for tasks which communicate by means other than
3328     return values.
3329 
3330     Warning:  Calling this function with $(D blocking = true) from a worker
3331               thread that is a member of the same `TaskPool` that
3332               `finish` is being called on will result in a deadlock.
3333      */
3334     void finish(bool blocking = false) @trusted
3335     {
3336         {
3337             queueLock();
3338             scope(exit) queueUnlock();
3339             atomicCasUbyte(status, PoolState.running, PoolState.finishing);
3340             notifyAll();
3341         }
3342         if (blocking)
3343         {
3344             // Use this thread as a worker until everything is finished.
3345             executeWorkLoop();
3346 
3347             foreach (t; pool)
3348             {
3349                 // Maybe there should be something here to prevent a thread
3350                 // from calling join() on itself if this function is called
3351                 // from a worker thread in the same pool, but:
3352                 //
3353                 // 1.  Using an if statement to skip join() would result in
3354                 //     finish() returning without all tasks being finished.
3355                 //
3356                 // 2.  If an exception were thrown, it would bubble up to the
3357                 //     Task from which finish() was called and likely be
3358                 //     swallowed.
3359                 t.join();
3360             }
3361         }
3362     }
3363 
3364     /// Returns the number of worker threads in the pool.
    @property size_t size() @safe const pure nothrow
    {
        // `pool` holds the pool's worker threads, so its length is the
        // worker count.
        return pool.length;
    }
3369 
3370     /**
3371     Put a `Task` object on the back of the task queue.  The `Task`
3372     object may be passed by pointer or reference.
3373 
3374     Example:
3375     ---
3376     import std.file;
3377 
3378     // Create a task.
3379     auto t = task!read("foo.txt");
3380 
3381     // Add it to the queue to be executed.
3382     taskPool.put(t);
3383     ---
3384 
3385     Notes:
3386 
3387     @trusted overloads of this function are called for `Task`s if
3388     $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s
3389     return type or the function the `Task` executes is `pure`.
3390     `Task` objects that meet all other requirements specified in the
3391     `@trusted` overloads of `task` and `scopedTask` may be created
3392     and executed from `@safe` code via `Task.executeInNewThread` but
3393     not via `TaskPool`.
3394 
3395     While this function takes the address of variables that may
3396     be on the stack, some overloads are marked as @trusted.
3397     `Task` includes a destructor that waits for the task to complete
3398     before destroying the stack frame it is allocated on.  Therefore,
3399     it is impossible for the stack frame to be destroyed before the task is
3400     complete and no longer referenced by a `TaskPool`.
3401     */
    void put(alias fun, Args...)(ref Task!(fun, Args) task)
    if (!isSafeReturn!(typeof(task)))
    {
        // Record this pool as the task's owner, then hand the task to the
        // queue.
        task.pool = this;
        abstractPut(task.basePtr);
    }
3408 
3409     /// Ditto
    void put(alias fun, Args...)(Task!(fun, Args)* task)
    if (!isSafeReturn!(typeof(*task)))
    {
        import std.exception : enforce;
        // Reject null up front, then forward to the by-reference overload.
        enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
        put(*task);
    }
3417 
    // @trusted by-reference overload, selected when `isSafeReturn` holds for
    // the task type.
    @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task)
    if (isSafeReturn!(typeof(task)))
    {
        // Record this pool as the task's owner, then hand the task to the
        // queue.
        task.pool = this;
        abstractPut(task.basePtr);
    }
3424 
    // @trusted by-pointer overload, selected when `isSafeReturn` holds for
    // the pointed-to task type.
    @trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
    if (isSafeReturn!(typeof(*task)))
    {
        import std.exception : enforce;
        // Reject null up front, then forward to the by-reference overload.
        enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
        put(*task);
    }
3432 
3433     /**
3434     These properties control whether the worker threads are daemon threads.
3435     A daemon thread is automatically terminated when all non-daemon threads
3436     have terminated.  A non-daemon thread will prevent a program from
3437     terminating as long as it has not terminated.
3438 
3439     If any `TaskPool` with non-daemon threads is active, either `stop`
3440     or `finish` must be called on it before the program can terminate.
3441 
    The worker threads in the `TaskPool` instance returned by the
3443     `taskPool` property are daemon by default.  The worker threads of
3444     manually instantiated task pools are non-daemon by default.
3445 
3446     Note:  For a size zero pool, the getter arbitrarily returns true and the
3447            setter has no effect.
3448     */
3449     bool isDaemon() @property @trusted
3450     {
3451         queueLock();
3452         scope(exit) queueUnlock();
3453         return (size == 0) ? true : pool[0].isDaemon;
3454     }
3455 
3456     /// Ditto
3457     void isDaemon(bool newVal) @property @trusted
3458     {
3459         queueLock();
3460         scope(exit) queueUnlock();
3461         foreach (thread; pool)
3462         {
3463             thread.isDaemon = newVal;
3464         }
3465     }
3466 
3467     /**
3468     These functions allow getting and setting the OS scheduling priority of
3469     the worker threads in this `TaskPool`.  They forward to
3470     `core.thread.Thread.priority`, so a given priority value here means the
3471     same thing as an identical priority value in `core.thread`.
3472 
3473     Note:  For a size zero pool, the getter arbitrarily returns
3474            `core.thread.Thread.PRIORITY_MIN` and the setter has no effect.
3475     */
3476     int priority() @property @trusted
3477     {
3478         return (size == 0) ? core.thread.Thread.PRIORITY_MIN :
3479         pool[0].priority;
3480     }
3481 
3482     /// Ditto
3483     void priority(int newPriority) @property @trusted
3484     {
3485         if (size > 0)
3486         {
3487             foreach (t; pool)
3488             {
3489                 t.priority = newPriority;
3490             }
3491         }
3492     }
3493 }
3494 
@system unittest
{
    import std.algorithm.iteration : sum;
    import std.range : iota;
    import std.typecons : tuple;

    enum last = 100;
    auto numbers = iota(1, last + 1);
    const total = numbers.sum();
    auto bothTotals = tuple(total, total);

    // Range on its own.
    assert(taskPool.fold!"a + b"(numbers) == total);

    // Range followed by the seed(s).
    assert(taskPool.fold!"a + b"(numbers, 0) == total);
    assert(taskPool.fold!("a + b", "a + b")(numbers, 0, 0) == bothTotals);

    // Range, seed(s), and a work unit size.
    assert(taskPool.fold!"a + b"(numbers, 0, 42) == total);
    assert(taskPool.fold!("a + b", "a + b")(numbers, 0, 0, 42) == bothTotals);
}
3516 
3517 /**
3518 Returns a lazily initialized global instantiation of `TaskPool`.
3519 This function can safely be called concurrently from multiple non-worker
3520 threads.  The worker threads in this pool are daemon threads, meaning that it
3521 is not necessary to call `TaskPool.stop` or `TaskPool.finish` before
3522 terminating the main thread.
3523 */
@property TaskPool taskPool() @trusted
{
    import std.concurrency : initOnce;
    __gshared TaskPool pool;

    // Builds the global pool with daemon worker threads.
    static TaskPool makeDaemonPool()
    {
        auto fresh = new TaskPool(defaultPoolThreads);
        fresh.isDaemon = true;
        return fresh;
    }

    // The initializer argument of `initOnce` is lazy, so the pool is only
    // built when `pool` has not been initialized yet.
    return initOnce!pool(makeDaemonPool());
}
3534 
// Worker count requested for the default pool.  `uint.max` means "not set":
// the `defaultPoolThreads` getter then falls back to `totalCPUs - 1`.
private shared uint _defaultPoolThreads = uint.max;
3536 
3537 /**
3538 These properties get and set the number of worker threads in the `TaskPool`
3539 instance returned by `taskPool`.  The default value is `totalCPUs` - 1.
Calling the setter after the first call to `taskPool` does not change the
number of worker threads in the instance returned by `taskPool`.
3542 */
@property uint defaultPoolThreads() @trusted
{
    const requested = atomicLoad(_defaultPoolThreads);

    // `uint.max` marks "no explicit value"; use the default in that case.
    if (requested == uint.max)
    {
        return totalCPUs - 1;
    }

    return requested;
}
3548 
3549 /// Ditto
3550 @property void defaultPoolThreads(uint newVal) @trusted
3551 {
3552     atomicStore(_defaultPoolThreads, newVal);
3553 }
3554 
/**
Convenience functions that forward to `taskPool.parallel`.  The
purpose of these is to make parallel foreach less verbose and more
readable.

Example:
---
// Find the logarithm of every number from
// 1 to 1_000_000 in parallel, using the
// default TaskPool instance.
auto logs = new double[1_000_000];

foreach (i, ref elem; parallel(logs))
{
    elem = log(i + 1.0);
}
---

*/
ParallelForeach!R parallel(R)(R range)
{
    auto defaultPool = taskPool;
    return defaultPool.parallel(range);
}

/// Ditto
ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
{
    auto defaultPool = taskPool;
    return defaultPool.parallel(range, workUnitSize);
}
3584 
// `each` must accept the object returned by `parallel`, both with and
// without an index parameter.
// https://issues.dlang.org/show_bug.cgi?id=17019
@system unittest
{
    import std.algorithm.iteration : each, sum;
    import std.range : iota;

    enum count = 10;

    // Element-only callback: every slot is bumped exactly once.
    auto counters = new int[count];
    parallel(counters).each!((ref value) => value += 1);
    assert(counters.sum == 10);

    // Index-and-element callback: every slot receives its own index.
    auto offsets = new int[count];
    parallel(offsets).each!((idx, ref value) => value += idx);
    assert(offsets.sum == 10.iota.sum);
}
3601 
// Error raised when the body of a parallel foreach loop tries to leave the
// loop early.
class ParallelForeachError : Error
{
    this()
    {
        enum string explanation =
            "Cannot break from a parallel foreach loop using break, return, "
            ~ "labeled break/continue or goto statements.";
        super(explanation);
    }
}
3611 
3612 /*------Structs that implement opApply for parallel foreach.------------------*/
// True when R is a random access range that also reports its length.
private enum bool randLen(R) = hasLength!R && isRandomAccessRange!R;
3617 
/*
Runs `doIt` as `pool.size + 1` scoped tasks and waits for all of them.

The first task is executed directly on the calling thread; the remaining ones
are linked together and handed to `pool` as a group.  Afterwards the calling
thread tries to execute each queued task itself via `tryDeleteExecute`, and
finally forces every task.

Params:
    pool = The pool whose queue receives the tasks.
    doIt = The work to perform; every task created here calls this delegate.

Throws:
    `OutOfMemoryError` if the heap fallback buffer cannot be allocated.
    Any `Throwable`s raised while forcing the tasks, chained together into a
    single exception chain.
*/
private void submitAndExecute(
    TaskPool pool,
    scope void delegate() doIt
)
{
    import core.exception : OutOfMemoryError;
    import core.stdc.stdlib : malloc, free;
    import core.stdc.string : memcpy;

    immutable nThreads = pool.size + 1;

    alias PTask = typeof(scopedTask(doIt));

    // The logical thing to do would be to just use alloca() here, but that
    // causes problems on Windows for reasons that I don't understand
    // (tentatively a compiler bug) and definitely doesn't work on Posix due
    // to https://issues.dlang.org/show_bug.cgi?id=3753.
    // Therefore, allocate a fixed buffer and fall back to `malloc()` if
    // someone's using a ridiculous amount of threads.
    // Also, a byte array is used instead of a PTask array as the fixed buffer
    // to prevent d'tors from being called on uninitialized excess PTask
    // instances.
    enum nBuf = 64;
    byte[nBuf * PTask.sizeof] buf = void;
    PTask[] tasks;
    if (nThreads <= nBuf)
    {
        tasks = (cast(PTask*) buf.ptr)[0 .. nThreads];
    }
    else
    {
        auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof);
        if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism.");
        tasks = ptr[0 .. nThreads];
    }

    // Only the malloc() fallback owns heap memory that must be released.
    scope(exit)
    {
        if (nThreads > nBuf)
        {
            free(tasks.ptr);
        }
    }

    foreach (ref t; tasks)
    {
        // This silly looking code is necessary to prevent d'tors from being
        // called on uninitialized objects.
        auto temp = scopedTask(doIt);
        memcpy(&t, &temp, PTask.sizeof);

        // This has to be done to t after copying, not temp before copying.
        // Otherwise, temp's destructor will sit here and wait for the
        // task to finish.
        t.pool = pool;
    }

    // Link tasks[1 .. $] into a doubly linked chain; tasks[0] stays out of
    // the chain because it is run directly below.
    foreach (i; 1 .. tasks.length - 1)
    {
        tasks[i].next = tasks[i + 1].basePtr;
        tasks[i + 1].prev = tasks[i].basePtr;
    }

    if (tasks.length > 1)
    {
        pool.queueLock();
        scope(exit) pool.queueUnlock();

        pool.abstractPutGroupNoSync(
            tasks[1].basePtr,
            tasks[$ - 1].basePtr
        );
    }

    if (tasks.length > 0)
    {
        // Run the first task right here; a thrown Throwable is stored on the
        // task so that it is reported when the task is forced below.
        try
        {
            tasks[0].job();
        }
        catch (Throwable e)
        {
            tasks[0].exception = e; // nocoverage
        }
        tasks[0].taskStatus = TaskStatus.done;

        // Try to execute each of these in the current thread
        foreach (ref task; tasks[1..$])
        {
            pool.tryDeleteExecute(task.basePtr);
        }
    }

    Throwable firstException;

    foreach (ref task; tasks)
    {
        try
        {
            task.yieldForce;
        }
        catch (Throwable e)
        {
            /* Chain e to front because order doesn't matter and because
             * e is not likely to be a chain itself (so fewer traversals)
             */
            firstException = Throwable.chainTogether(e, firstException);
        }
    }

    if (firstException) throw firstException;
}
3732 
// Raises the error that reports an attempt to leave a parallel foreach loop
// early.
void foreachErr()
{
    auto breakError = new ParallelForeachError();
    throw breakError;
}
3737 
// Serial implementation of a parallel foreach loop: walks `p.range` in the
// calling thread and hands each element (optionally preceded by its index)
// to `dg`.  A nonzero result from `dg` stops the iteration and is reported
// through `foreachErr`.
int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)
{
    // Whether the loop body takes an index in addition to the element.
    enum bool takesIndex = Parameters!dg.length == 2;

    int status = 0;
    size_t position = 0;

    // The explicit ElementType!R in the foreach loops is necessary for
    // correct behavior when iterating over strings.
    static if (hasLvalueElements!R)
    {
        foreach (ref ElementType!R item; p.range)
        {
            static if (takesIndex)
                status = dg(position, item);
            else
                status = dg(item);

            if (status != 0) break;
            ++position;
        }
    }
    else
    {
        foreach (ElementType!R item; p.range)
        {
            static if (takesIndex)
                status = dg(position, item);
            else
                status = dg(item);

            if (status != 0) break;
            ++position;
        }
    }

    if (status != 0) foreachErr();
    return status;
}
3783 
// Body of ParallelForeach.opApply for random access ranges with a length.
// The mixin site must provide `pool`, `range`, `workUnitSize` and `dg`.
private enum string parallelApplyMixinRandomAccess = q{
    // A pool without worker threads is handled by the serial fallback.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Does the loop body take (index, element) rather than just (element)?
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    immutable total = range.length;
    if (total == 0) return 0;

    // Starts at size_t.max so that the first atomic increment wraps to 0.
    shared size_t unitCounter = size_t.max;
    shared bool keepGoing = true;

    void runUnits()
    {
        import std.algorithm.comparison : min;

        // If an exception is thrown, all threads should bail.
        scope(failure) atomicStore(keepGoing, false);

        for (;;)
        {
            if (!atomicLoad(keepGoing)) return;

            // Claim the next work unit that nobody has taken yet.
            immutable unit = atomicOp!"+="(unitCounter, 1);
            immutable first = workUnitSize * unit;
            if (first >= total)
            {
                // Past the end of the range: signal that the work is done.
                atomicStore(keepGoing, false);
                return;
            }

            immutable last = min(total, first + workUnitSize);

            foreach (i; first .. last)
            {
                static if (withIndex)
                    const rc = dg(i, range[i]);
                else
                    const rc = dg(range[i]);

                // A nonzero result means the body tried to leave the loop.
                if (rc) foreachErr();
            }
        }
    }

    submitAndExecute(pool, &runUnits);

    return 0;
};
3840 
// Body of ParallelForeach.opApply for ranges that are not random access with
// a length.  The mixin site must provide `pool`, `range`, `workUnitSize`,
// `dg` and the range type `R`.  Each call of doIt() repeatedly grabs a batch
// of elements from the shared range while holding rangeMutex and then runs
// the loop body over that batch.
enum string parallelApplyMixinInputRange = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    // This protects the range while copying it.
    auto rangeMutex = new Mutex();

    shared bool shouldContinue = true;

    // The total number of elements that have been popped off range.
    // This is updated only while protected by rangeMutex;
    size_t nPopped = 0;

    // bufferTrick is enabled when the range exposes buf1, bufPos and
    // doBufSwap(), so that a whole buffer can be swapped out instead of
    // copying its elements one by one.
    static if (
        is(typeof(range.buf1)) &&
        is(typeof(range.bufPos)) &&
        is(typeof(range.doBufSwap()))
    )
    {
        // Make sure we don't have the buffer recycling overload of
        // asyncBuf.
        static if (
            is(typeof(range.source)) &&
            isRoundRobin!(typeof(range.source))
        )
        {
            static assert(0, "Cannot execute a parallel foreach loop on " ~
            "the buffer recycling overload of asyncBuf.");
        }

        enum bool bufferTrick = true;
    }
    else
    {
        enum bool bufferTrick = false;
    }

    void doIt()
    {
        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        static if (hasLvalueElements!R)
        {
            // Lvalue elements are collected by address, so the loop body
            // operates on the range's own elements.
            alias Temp = ElementType!R*[];
            Temp temp;

            // Fills temp with up to workUnitSize element addresses.
            // Returns:  The previous value of nPopped.
            size_t makeTemp()
            {
                import std.algorithm.internal : addressOf;
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = addressOf(range.front);
                }

                // Shrink to the number of elements actually taken.
                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

        }
        else
        {

            // Rvalue elements are copied into temp.
            alias Temp = ElementType!R[];
            Temp temp;

            // Fills temp with up to workUnitSize element copies.
            // Returns:  The previous value of nPopped.
            static if (!bufferTrick) size_t makeTemp()
            {
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = range.front;
                }

                // Shrink to the number of elements actually taken.
                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

            // Takes over the range's current buffer instead of copying it.
            // Returns:  The previous value of nPopped.
            static if (bufferTrick) size_t makeTemp()
            {
                import std.algorithm.mutation : swap;
                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                // Elide copying by just swapping buffers.
                temp.length = range.buf1.length;
                swap(range.buf1, temp);

                // This is necessary in case popFront() has been called on
                // range before entering the parallel foreach loop.
                temp = temp[range.bufPos..$];

                // NOTE(review): temp was already sliced by bufPos on the line
                // above, so this expression subtracts bufPos a second time —
                // confirm this is the intended adjustment of _length.
                static if (is(typeof(range._length)))
                {
                    range._length -= (temp.length - range.bufPos);
                }

                range.doBufSwap();
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }
        }

        while (atomicLoad(shouldContinue))
        {
            // overallIndex is the index, within the whole iteration, of the
            // first element of this batch.
            auto overallIndex = makeTemp();
            if (temp.empty)
            {
                // Nothing left in the range: tell all threads to stop.
                atomicStore(shouldContinue, false);
                break;
            }

            foreach (i; 0 .. temp.length)
            {
                // Advance the overall index only when the body completed
                // without throwing.
                scope(success) overallIndex++;

                static if (hasLvalueElements!R)
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, *temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(*temp[i])) foreachErr();
                    }
                }
                else
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(temp[i])) foreachErr();
                    }
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
};
4024 
4025 
// Object returned by `parallel`: iterating over it with foreach runs the
// loop body through one of the two opApply mixin bodies, chosen by whether R
// is a random access range with a length.
private struct ParallelForeach(R)
{
    TaskPool pool;
    R range;
    size_t workUnitSize;
    alias E = ElementType!R;

    // Loop bodies receive the element by reference only when the range can
    // hand out lvalues.
    static if (!hasLvalueElements!R)
    {
        alias NoIndexDg = int delegate(E);
        alias IndexDg = int delegate(size_t, E);
    }
    else
    {
        alias NoIndexDg = int delegate(ref E);
        alias IndexDg = int delegate(size_t, ref E);
    }

    // Source text shared by both opApply overloads.
    private enum string applyBody = randLen!R
        ? parallelApplyMixinRandomAccess
        : parallelApplyMixinInputRange;

    // foreach (elem; ...)
    int opApply(scope NoIndexDg dg)
    {
        mixin(applyBody);
    }

    // foreach (i, elem; ...)
    int opApply(scope IndexDg dg)
    {
        mixin(applyBody);
    }
}
4068 
/*
Input range over a fixed set of buffers that are filled, one after another in
round-robin order, by a callable that writes into a user-supplied buffer.
This is used internally in the buffer-recycling overload of
TaskPool.asyncBuf, which creates an instance and forwards it to the input
range overload of asyncBuf.
*/
private struct RoundRobinBuffer(C1, C2)
{
    // No need for constraints because they're already checked for in asyncBuf.

    alias Array = Parameters!(C1.init)[0];
    alias T = typeof(Array.init[0]);

    T[][] bufs;
    size_t index;
    C1 nextDel;
    C2 emptyDel;
    bool _empty;
    bool primed;

    // Stores the two callables and allocates nBuffers buffers of
    // initialBufSize elements each.
    this(
        C1 nextDel,
        C2 emptyDel,
        size_t initialBufSize,
        size_t nBuffers
    ) {
        this.nextDel = nextDel;
        this.emptyDel = emptyDel;

        bufs.length = nBuffers;
        foreach (slot; 0 .. nBuffers)
        {
            bufs[slot].length = initialBufSize;
        }
    }

    // Fills the current buffer through nextDel; the buffer counts as primed
    // only if nextDel returned without throwing.
    void prime()
    in
    {
        assert(!empty);
    }
    do
    {
        scope(success) primed = true;
        nextDel(bufs[index]);
    }


    // The current buffer, filled on first access.
    T[] front() @property
    in
    {
        assert(!empty);
    }
    do
    {
        if (!primed)
        {
            prime();
        }
        return bufs[index];
    }

    // Moves on to the next buffer, or marks the range empty once emptyDel
    // reports that there is nothing more to read.
    void popFront()
    {
        if (!empty && !emptyDel())
        {
            index = (index + 1) % bufs.length;
            primed = false;
        }
        else
        {
            _empty = true;
        }
    }

    bool empty() @property const @safe pure nothrow
    {
        return _empty;
    }
}
4146 
version (StdUnittest)
{
    // Pool shared by the unittests below.  It lives at module scope because
    // this was the only way I could get nested maps to work.
    private __gshared TaskPool poolInstance;
}

// These test basic functionality but don't stress test for threading bugs.
// These are the tests that should be run every time Phobos is compiled.
@system unittest
{
    import std.algorithm.comparison : equal, min, max;
    import std.algorithm.iteration : filter, map, reduce;
    import std.array : split;
    import std.conv : text;
    import std.exception : assertThrown;
    import std.math : approxEqual, sqrt, log;
    import std.range : indexed, iota, join;
    import std.typecons : Tuple, tuple;
    import std.stdio;

    poolInstance = new TaskPool(2);
    scope(exit) poolInstance.stop();

    // The only way this can be verified is manually.
    debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs);

    // Test the priority property; the original value is restored afterwards.
    auto oldPriority = poolInstance.priority;
    poolInstance.priority = Thread.PRIORITY_MAX;
    assert(poolInstance.priority == Thread.PRIORITY_MAX);

    poolInstance.priority = Thread.PRIORITY_MIN;
    assert(poolInstance.priority == Thread.PRIORITY_MIN);

    poolInstance.priority = oldPriority;
    assert(poolInstance.priority == oldPriority);

    static void refFun(ref uint num)
    {
        num++;
    }

    uint x;

    // Test task().
    auto t = task!refFun(x);
    poolInstance.put(t);
    t.yieldForce;
    assert(t.args[0] == 1);

    auto t2 = task(&refFun, x);
    poolInstance.put(t2);
    t2.yieldForce;
    assert(t2.args[0] == 1);

    // Test scopedTask().
    auto st = scopedTask!refFun(x);
    poolInstance.put(st);
    st.yieldForce;
    assert(st.args[0] == 1);

    auto st2 = scopedTask(&refFun, x);
    poolInstance.put(st2);
    st2.yieldForce;
    assert(st2.args[0] == 1);

    // Test executeInNewThread().
    auto ct = scopedTask!refFun(x);
    ct.executeInNewThread(Thread.PRIORITY_MAX);
    ct.yieldForce;
    assert(ct.args[0] == 1);

    // Test ref return.
    uint toInc = 0;
    static ref T makeRef(T)(ref T num)
    {
        return num;
    }

    auto t3 = task!makeRef(toInc);
    taskPool.put(t3);
    assert(t3.args[0] == 0);
    t3.spinForce++;
    assert(t3.args[0] == 1);

    // NOTE(review): testSafe is never invoked within this unittest, and it
    // puts its tasks on taskPool rather than on safePool — confirm intended.
    static void testSafe() @safe {
        static int bump(int num)
        {
            return num + 1;
        }

        auto safePool = new TaskPool(0);
        auto t = task(&bump, 1);
        taskPool.put(t);
        assert(t.yieldForce == 2);

        auto st = scopedTask(&bump, 1);
        taskPool.put(st);
        assert(st.yieldForce == 2);
        safePool.stop();
    }

    // Test parallel foreach with an index over an array.
    auto arr = [1,2,3,4,5];
    auto nums = new uint[5];
    auto nums2 = new uint[5];

    foreach (i, ref elem; poolInstance.parallel(arr))
    {
        elem++;
        nums[i] = cast(uint) i + 2;
        nums2[i] = elem;
    }

    assert(nums == [2,3,4,5,6], text(nums));
    assert(nums2 == nums, text(nums2));
    assert(arr == nums, text(arr));

    // Test const/immutable arguments.
    static int add(int lhs, int rhs)
    {
        return lhs + rhs;
    }
    immutable addLhs = 1;
    immutable addRhs = 2;
    auto addTask = task(&add, addLhs, addRhs);
    auto addScopedTask = scopedTask(&add, addLhs, addRhs);
    poolInstance.put(addTask);
    poolInstance.put(addScopedTask);
    assert(addTask.yieldForce == 3);
    assert(addScopedTask.yieldForce == 3);

    // Test parallel foreach with non-random access range.
    auto range = filter!"a != 666"([0, 1, 2, 3, 4]);

    foreach (i, elem; poolInstance.parallel(range))
    {
        nums[i] = cast(uint) i;
    }

    assert(nums == [0,1,2,3,4]);

    auto logs = new double[1_000_000];
    foreach (i, ref elem; poolInstance.parallel(logs))
    {
        elem = log(i + 1.0);
    }

    foreach (i, elem; logs)
    {
        assert(approxEqual(elem, cast(double) log(i + 1)));
    }

    // Test amap, with and without explicit buffers and work unit sizes.
    assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
           [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    auto tupleBuf = new Tuple!(int, int)[3];
    poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
    poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    // Test amap with a non-array buffer.
    auto toIndex = new int[5];
    auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
    poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
    assert(equal(ind, [2, 4, 6, 8, 10]));
    assert(equal(toIndex, [8, 4, 10, 2, 6]));
    poolInstance.amap!"a / 2"(ind, ind);
    assert(equal(ind, [1, 2, 3, 4, 5]));
    assert(equal(toIndex, [4, 2, 5, 1, 3]));

    auto buf = new int[5];
    poolInstance.amap!"a * a"([1,2,3,4,5], buf);
    assert(buf == [1,4,9,16,25]);
    poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
    assert(buf == [1,4,9,16,25]);

    // Test reduce, with and without seeds and work unit sizes.
    assert(poolInstance.reduce!"a + b"([1]) == 1);
    assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
    assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
    assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
           tuple(10, 24));

    immutable serialAns = reduce!"a + b"(iota(1000));
    assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
    assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);

    // Test worker-local storage.
    auto wl = poolInstance.workerLocalStorage(0);
    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        wl.get = wl.get + i;
    }

    auto wlRange = wl.toRange;
    auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
    assert(parallelSum == 499500);
    assert(wlRange[0 .. 1][0] == wlRange[0]);
    assert(wlRange[1 .. 2][0] == wlRange[1]);

    // Test finish()
    {
        static void slowFun() { Thread.sleep(dur!"msecs"(1)); }

        auto pool1 = new TaskPool();
        auto tSlow = task!slowFun();
        pool1.put(tSlow);
        pool1.finish();
        tSlow.yieldForce;
        // Can't assert that pool1.status == PoolState.stopNow because status
        // doesn't change until after the "done" flag is set and the waiting
        // thread is woken up.

        auto pool2 = new TaskPool();
        auto tSlow2 = task!slowFun();
        pool2.put(tSlow2);
        pool2.finish(true); // blocking
        assert(tSlow2.done);

        // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by making pool size zero.
        auto pool3 = new TaskPool(0);
        auto tSlow3 = task!slowFun();
        pool3.put(tSlow3);
        pool3.finish(true); // blocking
        assert(tSlow3.done);

        // This is correct because no thread will terminate unless pool2.status
        // and pool3.status have already been set to stopNow.
        assert(pool2.status == TaskPool.PoolState.stopNow);
        assert(pool3.status == TaskPool.PoolState.stopNow);
    }

    // Test default pool stuff.
    assert(taskPool.size == totalCPUs - 1);

    nums = new uint[1000];
    foreach (i; parallel(iota(1000)))
    {
        nums[i] = cast(uint) i;
    }
    assert(equal(nums, iota(1000)));

    // Test the lazy parallel map against the serial map.
    assert(equal(
               poolInstance.map!"a * a"(iota(3_000_001), 10_000),
               map!"a * a"(iota(3_000_001))
           ));

    // The filter is to kill random access and test the non-random access
    // branch.
    assert(equal(
               poolInstance.map!"a * a"(
                   filter!"a == a"(iota(3_000_001)
                                  ), 10_000, 1000),
               map!"a * a"(iota(3_000_001))
           ));

    assert(
        reduce!"a + b"(0UL,
                       poolInstance.map!"a * a"(iota(300_001), 10_000)
                      ) ==
        reduce!"a + b"(0UL,
                       map!"a * a"(iota(300_001))
                      )
    );

    assert(equal(
               iota(1_000_002),
               poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
           ));

    // Test the buffer recycling overload of asyncBuf by reading a file back
    // line by line.
    {
        import std.conv : to;
        import std.file : deleteme;

        string temp_file = deleteme ~ "-tempDelMe.txt";
        auto file = File(temp_file, "wb");
        scope(exit)
        {
            file.close();
            import std.file;
            remove(temp_file);
        }

        auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
        foreach (row; written)
        {
            file.writeln(join(to!(string[])(row), "\t"));
        }

        // Reopen the same file for reading.
        file = File(temp_file);

        void next(ref char[] buf)
        {
            file.readln(buf);
            import std.string : chomp;
            buf = chomp(buf);
        }

        double[][] read;
        auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

        foreach (line; asyncReader)
        {
            if (line.length == 0) continue;
            auto ls = line.split("\t");
            read ~= to!(double[])(ls);
        }

        assert(read == written);
        file.close();
    }

    // Test Map/AsyncBuf chaining.

    auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
    auto temp = poolInstance.map!sqrt(
                    abuf, 100, 5
                );
    auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
    lmchain.popFront();

    int ii;
    foreach ( elem; (lmchain))
    {
        if (!approxEqual(elem, ii))
        {
            stderr.writeln(ii, '\t', elem);
        }
        ii++;
    }

    // Test buffer trick in parallel foreach.
    abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
    abuf.popFront();
    auto bufTrickTest = new size_t[abuf.length];
    foreach (i, elem; parallel(abuf))
    {
        bufTrickTest[i] = i;
    }

    assert(equal(iota(1_000_000), bufTrickTest));

    auto myTask = task!(std.math.abs)(-1);
    taskPool.put(myTask);
    assert(myTask.spinForce == 1);

    // Test that worker local storage from one pool receives an index of 0
    // when the index is queried w.r.t. another pool.  The only way to do this
    // is non-deterministically.
    foreach (i; parallel(iota(1000), 1))
    {
        assert(poolInstance.workerIndex == 0);
    }

    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        assert(taskPool.workerIndex == 0);
    }

    // Test exception handling.
    static void parallelForeachThrow()
    {
        foreach (elem; parallel(iota(10)))
        {
            throw new Exception("");
        }
    }

    assertThrown!Exception(parallelForeachThrow());

    static int reduceException(int a, int b)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.reduce!reduceException(iota(3))
    );

    static int mapException(int a)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.amap!mapException(iota(3))
    );

    static void mapThrow()
    {
        auto m = poolInstance.map!mapException(iota(3));
        m.popFront();
    }

    assertThrown!Exception(mapThrow());

    // An infinite range whose popFront always throws.
    struct ThrowingRange
    {
        @property int front()
        {
            return 1;
        }
        void popFront()
        {
            throw new Exception("");
        }
        enum bool empty = false;
    }

    assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
}
4561 
4562 //version = parallelismStressTest;
4563 
4564 // These are more like stress tests than real unit tests.  They print out
4565 // tons of stuff and should not be run every time make unittest is run.
4566 version (parallelismStressTest)
4567 {
4568     @safe unittest
4569     {
4570         size_t attempt;
4571         for (; attempt < 10; attempt++)
4572             foreach (poolSize; [0, 4])
4573         {
4574 
4575             poolInstance = new TaskPool(poolSize);
4576 
4577             uint[] numbers = new uint[1_000];
4578 
4579             foreach (i; poolInstance.parallel( iota(0, numbers.length)) )
4580             {
4581                 numbers[i] = cast(uint) i;
4582             }
4583 
4584             // Make sure it works.
4585             foreach (i; 0 .. numbers.length)
4586             {
4587                 assert(numbers[i] == i);
4588             }
4589 
4590             stderr.writeln("Done creating nums.");
4591 
4592 
4593             auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
4594             foreach (num; poolInstance.parallel(myNumbers))
4595             {
4596                 assert(num % 7 > 0 && num < 1000);
4597             }
4598             stderr.writeln("Done modulus test.");
4599 
4600             uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
4601             assert(squares.length == numbers.length);
4602             foreach (i, number; numbers)
4603             {
4604                 assert(squares[i] == number * number);
4605             }
4606             stderr.writeln("Done squares.");
4607 
4608             auto sumFuture = task!( reduce!"a + b" )(numbers);
4609             poolInstance.put(sumFuture);
4610 
4611             ulong sumSquares = 0;
4612             foreach (elem; numbers)
4613             {
4614                 sumSquares += elem * elem;
4615             }
4616 
4617             uint mySum = sumFuture.spinForce();
4618             assert(mySum == 999 * 1000 / 2);
4619 
4620             auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
4621             assert(mySum == mySumParallel);
4622             stderr.writeln("Done sums.");
4623 
4624             auto myTask = task(
4625             {
4626                 synchronized writeln("Our lives are parallel...Our lives are parallel.");
4627             });
4628             poolInstance.put(myTask);
4629 
4630             auto nestedOuter = "abcd";
4631             auto nestedInner =  iota(0, 10, 2);
4632 
4633             foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
4634             {
4635                 foreach (j, number; poolInstance.parallel(nestedInner, 1))
4636                 {
4637                     synchronized writeln(i, ": ", letter, "  ", j, ": ", number);
4638                 }
4639             }
4640 
4641             poolInstance.stop();
4642         }
4643 
4644         assert(attempt == 10);
4645         writeln("Press enter to go to next round of unittests.");
4646         readln();
4647     }
4648 
4649     // These unittests are intended more for actual testing and not so much
4650     // as examples.
4651     @safe unittest
4652     {
4653         foreach (attempt; 0 .. 10)
4654         foreach (poolSize; [0, 4])
4655         {
4656             poolInstance = new TaskPool(poolSize);
4657 
4658             // Test indexing.
4659             stderr.writeln("Creator Raw Index:  ", poolInstance.threadIndex);
4660             assert(poolInstance.workerIndex() == 0);
4661 
4662             // Test worker-local storage.
4663             auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
4664             foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
4665             {
4666                 workerLocalStorage.get++;
4667             }
4668             assert(reduce!"a + b"(workerLocalStorage.toRange) ==
4669             1_000_000 + poolInstance.size + 1);
4670 
4671             // Make sure work is reasonably balanced among threads.  This test is
4672             // non-deterministic and is more of a sanity check than something that
4673             // has an absolute pass/fail.
4674             shared(uint)[void*] nJobsByThread;
4675             foreach (thread; poolInstance.pool)
4676             {
4677                 nJobsByThread[cast(void*) thread] = 0;
4678             }
4679             nJobsByThread[ cast(void*) Thread.getThis()] = 0;
4680 
4681             foreach (i; poolInstance.parallel( iota(0, 1_000_000), 100 ))
4682             {
4683                 atomicOp!"+="( nJobsByThread[ cast(void*) Thread.getThis() ], 1);
4684             }
4685 
4686             stderr.writeln("\nCurrent thread is:  ",
4687             cast(void*) Thread.getThis());
4688             stderr.writeln("Workload distribution:  ");
4689             foreach (k, v; nJobsByThread)
4690             {
4691                 stderr.writeln(k, '\t', v);
4692             }
4693 
4694             // Test whether amap can be nested.
4695             real[][] matrix = new real[][](1000, 1000);
4696             foreach (i; poolInstance.parallel( iota(0, matrix.length) ))
4697             {
4698                 foreach (j; poolInstance.parallel( iota(0, matrix[0].length) ))
4699                 {
4700                     matrix[i][j] = i * j;
4701                 }
4702             }
4703 
4704             // Get around weird bugs having to do w/ sqrt being an intrinsic:
4705             static real mySqrt(real num)
4706             {
4707                 return sqrt(num);
4708             }
4709 
4710             static real[] parallelSqrt(real[] nums)
4711             {
4712                 return poolInstance.amap!mySqrt(nums);
4713             }
4714 
4715             real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);
4716 
4717             foreach (i, row; sqrtMatrix)
4718             {
4719                 foreach (j, elem; row)
4720                 {
4721                     real shouldBe = sqrt( cast(real) i * j);
4722                     assert(approxEqual(shouldBe, elem));
4723                     sqrtMatrix[i][j] = shouldBe;
4724                 }
4725             }
4726 
4727             auto saySuccess = task(
4728             {
4729                 stderr.writeln(
4730                     "Success doing matrix stuff that involves nested pool use.");
4731             });
4732             poolInstance.put(saySuccess);
4733             saySuccess.workForce();
4734 
4735             // A more thorough test of amap, reduce:  Find the sum of the square roots of
4736             // matrix.
4737 
4738             static real parallelSum(real[] input)
4739             {
4740                 return poolInstance.reduce!"a + b"(input);
4741             }
4742 
4743             auto sumSqrt = poolInstance.reduce!"a + b"(
4744                                poolInstance.amap!parallelSum(
4745                                    sqrtMatrix
4746                                )
4747                            );
4748 
4749             assert(approxEqual(sumSqrt, 4.437e8));
4750             stderr.writeln("Done sum of square roots.");
4751 
4752             // Test whether tasks work with function pointers.
4753             auto nanTask = task(&isNaN, 1.0L);
4754             poolInstance.put(nanTask);
4755             assert(nanTask.spinForce == false);
4756 
4757             if (poolInstance.size > 0)
4758             {
4759                 // Test work waiting.
4760                 static void uselessFun()
4761                 {
4762                     foreach (i; 0 .. 1_000_000) {}
4763                 }
4764 
4765                 auto uselessTasks = new typeof(task(&uselessFun))[1000];
4766                 foreach (ref uselessTask; uselessTasks)
4767                 {
4768                     uselessTask = task(&uselessFun);
4769                 }
4770                 foreach (ref uselessTask; uselessTasks)
4771                 {
4772                     poolInstance.put(uselessTask);
4773                 }
4774                 foreach (ref uselessTask; uselessTasks)
4775                 {
4776                     uselessTask.workForce();
4777                 }
4778             }
4779 
4780             // Test the case of non-random access + ref returns.
4781             int[] nums = [1,2,3,4,5];
4782             static struct RemoveRandom
4783             {
4784                 int[] arr;
4785 
4786                 ref int front()
4787                 {
4788                     return arr.front;
4789                 }
4790                 void popFront()
4791                 {
4792                     arr.popFront();
4793                 }
4794                 bool empty()
4795                 {
4796                     return arr.empty;
4797                 }
4798             }
4799 
4800             auto refRange = RemoveRandom(nums);
4801             foreach (ref elem; poolInstance.parallel(refRange))
4802             {
4803                 elem++;
4804             }
4805             assert(nums == [2,3,4,5,6], text(nums));
4806             stderr.writeln("Nums:  ", nums);
4807 
4808             poolInstance.stop();
4809         }
4810     }
4811 }
4812 
// Runs taskPool.amap with a mapping function that returns a struct carrying
// an invariant and a user-defined opAssign.  The test only requires that the
// call completes, i.e. that the invariant is never violated.
// NOTE(review): the `_12733` suffix presumably refers to a bug-tracker issue
// number -- confirm.
@system unittest
{
    static struct __S_12733
    {
        ulong n;
        ulong checksum = 1_234_567_890;

        // Holds as long as `checksum` still has its default value.
        invariant()
        {
            assert(checksum == 1_234_567_890);
        }

        this(ulong u)
        {
            n = u;
        }

        // Copies only `n`; `checksum` of the target is left untouched.
        void opAssign(__S_12733 s)
        {
            this.n = s.n;
        }
    }

    static auto __genPair_12733(ulong n)
    {
        return __S_12733(n);
    }

    immutable ulong[] data = [ 2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL ];

    auto result = taskPool.amap!__genPair_12733(data);
}
4829 
// Checks that a parallel foreach over an iota of ulong compiles.  The loop is
// only passed to __traits(compiles); it is never executed.
@safe unittest
{
    import std.range : iota;

    // this test was in std.range, but caused cycles.
    assert(__traits(compiles,
    {
        foreach (idx; iota(0, 100UL).parallel)
        {
        }
    }));
}
4837 
// Compile-time check that `each` can be applied to the result of `parallel`.
// The expression is only type-checked inside typeof; it is never executed.
@safe unittest
{
    import std.algorithm.iteration : each;

    long[] values;
    static assert(is(typeof({ values.parallel.each!"a++"; })));
}
4847 
4848 // https://issues.dlang.org/show_bug.cgi?id=17539
// Parallel foreach over the default random number generator must compile.
// The loop body breaks immediately; a ParallelForeachError raised as a
// result is caught and ignored.
@system unittest
{
    import std.random : rndGen;

    // ensure compilation
    try
    {
        foreach (rnd; rndGen.parallel)
            break;
    }
    catch (ParallelForeachError e)
    {
    }
}
Suggestion Box / Bug Report