/**
`std.parallelism` implements high-level primitives for SMP parallelism.
These include parallel foreach, parallel reduce, parallel eager map, pipelining
and future/promise parallelism. `std.parallelism` is recommended when the
same operation is to be executed in parallel on different data, or when a
function is to be executed in a background thread and its result returned to a
well-defined main thread. For communication between arbitrary threads, see
`std.concurrency`.

`std.parallelism` is based on the concept of a `Task`. A `Task` is an
object that represents the fundamental unit of work in this library and may be
executed in parallel with any other `Task`. Using `Task`
directly allows programming with a future/promise paradigm. All other
supported parallelism paradigms (parallel foreach, map, reduce, pipelining)
represent an additional level of abstraction over `Task`. They
automatically create one or more `Task` objects, or closely related types
that are conceptually identical but not part of the public API.

After creation, a `Task` may be executed in a new thread, or submitted
to a `TaskPool` for execution. A `TaskPool` encapsulates a task queue
and its worker threads. Its purpose is to efficiently map a large
number of `Task`s onto a smaller number of threads. A task queue is a
FIFO queue of `Task` objects that have been submitted to the
`TaskPool` and are awaiting execution. A worker thread is a thread that
is associated with exactly one task queue. It executes the `Task` at the
front of its queue when the queue has work available, or sleeps when
no work is available. Each task queue is associated with zero or
more worker threads. If the result of a `Task` is needed before execution
by a worker thread has begun, the `Task` can be removed from the task queue
and executed immediately in the thread where the result is needed.

Warning: Unless marked as `@trusted` or `@safe`, artifacts in
this module allow implicit data sharing between threads and cannot
guarantee that client code is free from low level data races.

Source: $(PHOBOSSRC std/parallelism.d)
Author: David Simcha
Copyright: Copyright (c) 2009-2011, David Simcha.
License: $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
*/
module std.parallelism;

// All Apple platforms share the same CPU-count query (see useSysctlbyname).
version (OSX)
    version = Darwin;
else version (iOS)
    version = Darwin;
else version (TVOS)
    version = Darwin;
else version (WatchOS)
    version = Darwin;

///
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math : approxEqual;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Parallel reduce can be combined with
    // std.algorithm.iteration.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature using
    // std.algorithm.iteration.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce:       1.067 s
    // std.algorithm.reduce:  4.011 s

    enum n = 1_000_000;
    enum delta = 1.0 / n;

    alias getTerm = (int i)
    {
        immutable x = ( i - 0.5 ) * delta;
        return delta / ( 1.0 + x * x ) ;
    };

    immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

    assert(pi.approxEqual(3.1415926));
}

import core.atomic;
import core.memory;
import core.sync.condition;
import core.thread;

import std.functional;
import std.meta;
import std.range.primitives;
import std.traits;

// Platforms on which the CPU count is obtained through sysctlbyname().
version (Darwin)
{
    version = useSysctlbyname;
}
else version (FreeBSD)
{
    version = useSysctlbyname;
}
else version (DragonFlyBSD)
{
    version = useSysctlbyname;
}
else version (NetBSD)
{
    version = useSysctlbyname;
}

/*
(For now public undocumented with reserved name.)

A lazily initialized global constant. The underlying value is a shared global
statically initialized to `outOfBandValue` which must not be a legit value of
the constant. Upon the first call the situation is detected and the global is
initialized by calling `initializer`. The initializer is assumed to be pure
(even if not marked as such), i.e. return the same value upon repeated calls.
For that reason, no special precautions are taken so `initializer` may be called
more than one time leading to benign races on the cached value.

Each thread additionally keeps its own thread-local copy. Once a thread has
seen the value, later calls from that thread cost a single thread-local read
with no atomic operation; only a thread's first call performs an atomic load
of the shared global (and, if still uninitialized, calls `initializer` and
atomically stores the result).

If `initializer` itself returns `outOfBandValue`, nothing is cached, and
`initializer` runs again on every call.

Params:
    T = The type of the pseudo-constant (may be qualified)
    outOfBandValue = A value that cannot be valid, it is used for initialization
    initializer = The function performing initialization; must be `nothrow`

Returns:
    The lazily initialized value
*/
@property pure
T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
if (is(Unqual!T : T)
    && is(typeof(initializer()) : T)
    && is(typeof(outOfBandValue) : T))
{
    static T impl() nothrow
    {
        // Thread-local cache
        static Unqual!T tls = outOfBandValue;
        auto local = tls;
        // Shortest path, no atomic operations
        if (local != outOfBandValue) return local;
        // Process-level cache
        static shared Unqual!T result = outOfBandValue;
        // Initialize both process-level cache and tls
        local = atomicLoad(result);
        if (local == outOfBandValue)
        {
            local = initializer();
            atomicStore(result, local);
        }
        tls = local;
        return local;
    }

    // impl() touches mutable static state and so is not pure. The
    // initializer is assumed to be pure, so impl's function pointer is
    // reinterpreted as pure (in @trusted code) to let this property be pure.
    import std.traits : SetFunctionAttributes;
    alias Fun = SetFunctionAttributes!(typeof(&impl), "D",
        functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
    auto purified = (() @trusted => cast(Fun) &impl)();
    return purified();
}

// Returns the size of a cache line: the largest data-cache line size reported
// by core.cpuid.datacache (0 if none qualifies; see cacheLineSizeImpl).
// Computed once and cached per thread and per process.
alias cacheLineSize =
    __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);

// Scans every data-cache level reported by core.cpuid and returns the largest
// lineSize. Values that are not below uint.max are skipped (presumably an
// "unknown" marker -- confirm against core.cpuid). Returns 0 when no level
// qualifies.
private size_t cacheLineSizeImpl() @nogc nothrow @trusted
{
    size_t result = 0;
    import core.cpuid : datacache;
    foreach (ref const cachelevel; datacache)
    {
        if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max)
        {
            result = cachelevel.lineSize;
        }
    }
    return result;
}

@nogc @safe nothrow unittest
{
    assert(cacheLineSize == cacheLineSizeImpl);
}

/* Atomics code.  These forward to core.atomic, but are written like this
   for two reasons:

   1.  They used to actually contain ASM code and I don't want to have to change
       to directly calling core.atomic in a zillion different places.

   2.  core.atomic has some misc. issues that make my use cases difficult
       without wrapping it.  If I didn't wrap it, casts would be required
       basically everywhere.
*/

// Atomically stores `newVal` into `stuff`. T is an integral type (enums
// included) that implicitly converts to ubyte.
private void atomicSetUbyte(T)(ref T stuff, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    atomicStore(*(cast(shared) &stuff), newVal);
}

// Atomically loads `val` and returns it as a ubyte.
private ubyte atomicReadUbyte(T)(ref T val)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return atomicLoad(*(cast(shared) &val));
}

// This gets rid of the need for a lot of annoying casts in other parts of the
// code, when enums are involved.
// Atomically replaces `stuff` with `newVal` if it currently equals `testVal`;
// returns whether the swap happened.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return core.atomic.cas(cast(shared) &stuff, testVal, newVal);
}

/*--------------------- Generic helper functions, etc.------------------------*/

// The type produced by applying all of `functions` (adjoined into a single
// callable via std.functional.adjoin) to one element of range R.
private template MapType(R, functions...)
{
    static assert(functions.length);

    ElementType!R e = void;
    alias MapType =
        typeof(adjoin!(staticMap!(unaryFun, functions))(e));
}

// The type produced by combining a seed/accumulator of type E with one
// element of range R using the binary function `fun`.
private template ReduceType(alias fun, R, E)
{
    alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
}

// Predicate form of !hasUnsharedAliasing, for use with allSatisfy.
private template noUnsharedAliasing(T)
{
    enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
}

// This template tests whether a function may be executed in parallel from
// @safe code via Task.executeInNewThread().  There is an additional
// requirement for executing it via a TaskPool.  (See isSafeReturn).
private template isSafeTask(F)
{
    // All four conditions must hold:
    //  - F is @safe or @trusted;
    //  - F does not return by ref;
    //  - F is a plain function pointer, or otherwise (delegate / callable
    //    object) F itself has no unshared aliasing;
    //  - no parameter type of F has unshared aliasing.
    // The return type is deliberately not checked here; see isSafeReturn.
    enum bool isSafeTask =
        (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
        (functionAttributes!F & FunctionAttribute.ref_) == 0 &&
        (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
        allSatisfy!(noUnsharedAliasing, Parameters!F);
}

@safe unittest
{
    alias F1 = void function() @safe;
    alias F2 = void function();                     // neither @safe nor @trusted
    alias F3 = void function(uint, string) @trusted;
    alias F4 = void function(uint, char[]);         // char[] is unshared mutable data

    static assert( isSafeTask!F1);
    static assert(!isSafeTask!F2);
    static assert( isSafeTask!F3);
    static assert(!isSafeTask!F4);

    // The uint[] return type does not matter to isSafeTask.
    alias F5 = uint[] function(uint, string) pure @trusted;
    static assert( isSafeTask!F5);
}

// This function decides whether Tasks that meet all of the other requirements
// for being executed from @safe code can be executed on a TaskPool.
// When executing via TaskPool, it's theoretically possible
// to return a value that is also pointed to by a worker thread's thread local
// storage.  When executing from executeInNewThread(), the thread that executed
// the Task is terminated by the time the return value is visible in the calling
// thread, so this is a non-issue.  It's also a non-issue for pure functions
// since they can't read global state.
// T is a Task instantiation; its ReturnType and isPure members are read.
private template isSafeReturn(T)
{
    static if (!hasUnsharedAliasing!(T.ReturnType))
    {
        enum isSafeReturn = true;
    }
    else static if (T.isPure)
    {
        enum isSafeReturn = true;
    }
    else
    {
        enum isSafeReturn = false;
    }
}

// True for random-access ranges whose elements can be assigned to.
private template randAssignable(R)
{
    enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R;
}

// Lifecycle of a task. The enum's base type is ubyte, and
// AbstractTask.taskStatus is a ubyte so it can go through the
// atomic*Ubyte helpers.
private enum TaskStatus : ubyte
{
    notStarted,
    inProgress,
    done
}

// NOTE(review): `typeof` applied to a function literal yields the literal's
// function/delegate type, not the return type of `fun(args)`. Despite the
// name, this alias is therefore a callable type -- confirm this is what the
// (not visible here) users of AliasReturn expect.
private template AliasReturn(alias fun, T...)
{
    alias AliasReturn = typeof({ T args; return fun(args); });
}

// Should be private, but std.algorithm.reduce is used in the zero-thread case
// and won't work w/ private.
// With one function this is just binaryFun of it. With several, the
// accumulator `lhs` is a tuple. Its i-th field is updated with the i-th
// function applied to (that field, rhs), so every function sees the same
// element `rhs`.
template reduceAdjoin(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
    else
    {
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs);
            }

            return lhs;
        }
    }
}

// Combines two partial results. With one function this is binaryFun of it.
// With several, both arguments are tuples of the same type, and field i is
// combined with the i-th function.
private template reduceFinish(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
    else
    {
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]);
            }

            return lhs;
        }
    }
}

// True exactly for instantiations of RoundRobinBuffer (specialization below
// takes precedence over the generic fallback).
private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
{
    enum isRoundRobin = true;
}

private template isRoundRobin(T)
{
    enum isRoundRobin = false;
}

@safe unittest
{
    static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
    static assert(!isRoundRobin!(uint));
}

// This is the base "class" for all of the other tasks.  Using C-style
// polymorphism to allow more direct control over memory allocation, etc.
private struct AbstractTask
{
    // Links for the pool's doubly linked task queue.
    AbstractTask* prev;
    AbstractTask* next;

    // Pointer to a function that executes this task.
    void function(void*) runTask;

    // Set when the task body threw; rethrown to whoever checks completion.
    Throwable exception;
    ubyte taskStatus = TaskStatus.notStarted;

    // True once the status (read atomically) is `done`; rethrows the stored
    // exception, if any, instead of returning true.
    bool done() @property
    {
        if (atomicReadUbyte(taskStatus) == TaskStatus.done)
        {
            if (exception)
            {
                throw exception;
            }

            return true;
        }

        return false;
    }

    // Runs the task body through the stored function pointer, passing this
    // header's own address.
    void job()
    {
        runTask(&this);
    }
}

/**
`Task` represents the fundamental unit of work.  A `Task` may be
executed in parallel with any other `Task`.  Using this struct directly
allows future/promise parallelism.  In this paradigm, a function (or delegate
or other callable) is executed in a thread other than the one it was called
from.  The calling thread does not block while the function is being executed.
A call to `workForce`, `yieldForce`, or `spinForce` is used to
ensure that the `Task` has finished executing and to obtain the return
value, if any.  These functions and `done` also act as full memory barriers,
meaning that any memory writes made in the thread that executed the `Task`
are guaranteed to be visible in the calling thread after one of these functions
returns.

The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
be used to create an instance of this struct.  See `task` for usage examples.

Function results are returned from `yieldForce`, `spinForce` and
`workForce` by ref.  If `fun` returns by ref, the reference will point
to the returned reference of `fun`.  Otherwise it will point to a
field in this struct.

Copying of this struct is disabled, since it would provide no useful semantics.
If you want to pass this struct around, you should do so by reference or
pointer.

Bugs:  Changes to `ref` and `out` arguments are not propagated to the
       call site, only to `args` in this struct.
*/
struct Task(alias fun, Args...)
{
    // Must remain the first field: impl() receives the AbstractTask* that
    // AbstractTask.job passes (i.e. &base) and casts it straight to Task*.
    AbstractTask base = {runTask : &impl};
    alias base this;

    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    // Type-erased entry point stored in base.runTask. It calls `fun` with the
    // stored arguments and records the result. For a ref-returning `fun` it
    // stores the address of the returned lvalue; otherwise it stores the value.
    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            fun(myCastedTask._args);
        }
        else static if (is(typeof(&(fun(myCastedTask._args)))))
        {
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    // Pool the task was submitted to (or the dummy single-task pool created by
    // executeInNewThread); null until then -- see enforcePool.
    private TaskPool pool;
    private bool isScoped;  // True if created with scopedTask.

    // Stored call arguments. When `fun` is `run`, _args[0] is the callable.
    Args _args;

    /**
    The arguments the function was called with.  Changes to `out` and
    `ref` arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }


    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code.  See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            private enum bool isPure =
            (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
        }
        else
        {
            // BUG:  Should check this for delegates too, but std.traits
            //       apparently doesn't allow this.  isPure is irrelevant
            //       for delegates, at least for now since shared delegates
            //       don't work.
            private enum bool isPure = false;
        }

    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }


    /**
    The return type of the function called by this `Task`.  This can be
    `void`.
    */
    alias ReturnType = typeof(fun(_args));

    // Result storage. fixRef turns it into the `ref ReturnType` that the
    // *Force properties return: it dereferences the stored pointer for
    // ref-returning functions and returns the stored value otherwise.
    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }

        }
        else
        {
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    // Throws (via std.exception.enforce) if the task was never submitted to a
    // pool or started with executeInNewThread.
    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
    // allow immutable elements.
    static if (allSatisfy!(isAssignable, Args))
    {
        // Field-by-field copy of every member of rhs into this.
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        @disable typeof(this) opAssign(typeof(this) rhs);
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    busy spin until it's done, then return the return value.  If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    `Task` to be available on a timescale shorter than that of an OS
    context switch.
     */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        // Per the documentation above, this runs the task here if it has
        // not been started yet (implemented in TaskPool).
        this.pool.tryDeleteExecute(basePtr);

        // Busy-wait on the atomically read status; no locking or sleeping.
        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    wait on a condition variable.  If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
     */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        // Fast path: already finished (done also rethrows a stored exception).
        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        // Slow path: hold the pool's waiter lock and re-check the status after
        // every wake-up, since a wake-up may concern a different task.
        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception; // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this `Task` was not started yet, execute it in the current
    thread.  If it is finished, return its result.  If it is in progress,
    execute any other `Task` from the `TaskPool` instance that
    this `Task` was submitted to until this one
    is finished.  If it threw an exception, rethrow that exception.
    If no other tasks are available or this `Task` was executed using
    `executeInNewThread`, wait on a condition variable.
     */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        // Each iteration either returns, runs one other queued job, or hands
        // off to yieldForce when the queue is empty.
        while (true)
        {
            if (done)    // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }


            if (job !is null)
            {

                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                pool.doJob(job);

                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                return yieldForce;
            }
        }
    }

    /**
    Returns `true` if the `Task` is finished executing.

    Throws:  Rethrows any exception thrown during the execution of the
             `Task`.
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

    /**
    Create a new thread for executing this `Task`, execute it in the
    newly created thread, then terminate the thread.  This can be used for
    future/promise parallelism.  An explicit priority may be given
    to the `Task`.  If one is provided, its value is forwarded to
    `core.thread.Thread.priority`. See $(REF task, std,parallelism) for
    usage example.
    */
    void executeInNewThread() @trusted
    {
        // A dedicated TaskPool is constructed around this task alone.
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    // A scopedTask that was submitted and has not finished is forced to
    // completion here (via yieldForce) before its storage goes away.
    // NOTE(review): taskStatus is read directly here rather than through
    // atomicReadUbyte as elsewhere -- confirm this is intentional.
    @safe ~this()
    {
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}

// Calls `fpOrDelegate` with `args`.  This is an
// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    return fpOrDelegate(args);
}

/**
Creates a `Task` on the GC heap that calls an alias.  This may be executed
via `Task.executeInNewThread` or by submitting to a
$(REF TaskPool, std,parallelism).  A globally accessible instance of
`TaskPool` is provided by $(REF taskPool, std,parallelism).

Returns:  A pointer to the `Task`.

Example:
---
// Read two files into memory at the same time.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

---
// Sorts an array using a parallel quick sort algorithm.
// The first partition is done serially.  Both recursion
// branches are then executed in parallel.
//
// Timings for sorting an array of 1,000,000 doubles on
// an Athlon 64 X2 dual core machine:
//
// This implementation:               176 milliseconds.
// Equivalent serial implementation:  280 milliseconds
void parallelSort(T)(T[] data)
{
    // Sort small subarrays serially.
    if (data.length < 100)
    {
         std.algorithm.sort(data);
         return;
    }

    // Partition the array.
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0..$ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length..$];

    // Execute both recursion branches in parallel.
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}
---
*/
auto task(alias fun, Args...)(Args args)
{
    return new Task!(fun, Args)(args);
}

/**
Creates a `Task` on the GC heap that calls a function pointer, delegate, or
class/struct with overloaded opCall.

Example:
---
// Read two files in at the same time again,
// but this time use a function pointer instead
// of an alias to represent std.file.read.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task(&read!string, "foo.txt", size_t.max);
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

Notes: This function takes a non-scope delegate, meaning it can be
       used with closures.  If you can't allocate a closure due to objects
       on the stack that have scoped destruction, see `scopedTask`, which
       takes a scope delegate.
 */
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    // The callable is stored as the first argument and invoked through `run`.
    return new Task!(run, F, Args)(delegateOrFp, args);
}

/**
Version of `task` usable from `@safe` code.  Usage mechanics are
identical to the non-@safe case, but safety introduces some restrictions:

1.  `fun` must be @safe or @trusted.

2.  `F` must not have any unshared aliasing as defined by
    $(REF hasUnsharedAliasing, std,traits).  This means it
    may not be an unshared delegate or a non-shared class or struct
    with overloaded `opCall`.  This also precludes accepting template
    alias parameters.

3.  `Args` must not have unshared aliasing.

4.  `fun` must not return by reference.

5.  The return type must not have unshared aliasing unless `fun` is
    `pure` or the `Task` is executed via `executeInNewThread` instead
    of using a `TaskPool`.

*/
@trusted auto task(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    return new Task!(run, F, Args)(fun, args);
}

/**
These functions allow the creation of `Task` objects on the stack rather
than the GC heap.  The lifetime of a `Task` created by `scopedTask`
cannot exceed the lifetime of the scope it was created in.

`scopedTask` might be preferred over `task`:

1.  When a `Task` that calls a delegate is being created and a closure
    cannot be allocated due to objects on the stack that have scoped
    destruction.  The delegate overload of `scopedTask` takes a `scope`
    delegate.

2.  As a micro-optimization, to avoid the heap allocation associated with
    `task` or with the creation of a closure.

Usage is otherwise identical to `task`.

Notes:  `Task` objects created using `scopedTask` will automatically
call `Task.yieldForce` in their destructor if necessary to ensure
the `Task` is complete before the stack frame they reside on is destroyed.
*/
auto scopedTask(alias fun, Args...)(Args args)
{
    auto ret = Task!(fun, Args)(args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(delegateOrFp, args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(fun, args);
    ret.isScoped = true;
    return ret;
}

version (useSysctlbyname)
    private extern(C) int sysctlbyname(
        const char *, void *, size_t *, void *, size_t
    ) @nogc nothrow;

/**
The total number of CPU cores available on the current machine, as reported by
the operating system. On Linux this is the number of CPUs in the calling
thread's affinity mask when that can be queried. The value is always at
least 1.
*/
alias totalCPUs =
    __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);

// Queries the OS for the CPU count. Every platform branch reports at least one
// CPU. A failed query therefore never yields 0, or uint.max (which is also the
// "not yet computed" sentinel used by totalCPUs).
uint totalCPUsImpl() @nogc nothrow @trusted
{
    version (Windows)
    {
        // BUGS:  Only works on Windows 2000 and above.
        import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
        import std.algorithm.comparison : max;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        return max(1, cast(uint) si.dwNumberOfProcessors);
    }
    else version (linux)
    {
        import core.sys.linux.sched : CPU_COUNT, cpu_set_t, sched_getaffinity;
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        // Prefer the CPUs this thread may actually run on.
        cpu_set_t set = void;
        if (sched_getaffinity(0, cpu_set_t.sizeof, &set) == 0)
        {
            int count = CPU_COUNT(&set);
            if (count > 0)
                return cast(uint) count;
        }

        // sysconf reports failure as -1, which would become uint.max if cast
        // directly; fall back to a single CPU instead.
        immutable online = sysconf(_SC_NPROCESSORS_ONLN);
        return online > 0 ? cast(uint) online : 1;
    }
    else version (Solaris)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        // Same -1 failure handling as the Linux fallback above.
        immutable online = sysconf(_SC_NPROCESSORS_ONLN);
        return online > 0 ? cast(uint) online : 1;
    }
    else version (useSysctlbyname)
    {
        version (Darwin)
        {
            enum nameStr = "hw.physicalcpu";
        }
        else version (FreeBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }
        else version (DragonFlyBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }
        else version (NetBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }

        uint result;
        size_t len = result.sizeof;
        // A non-zero return means the query failed and `result` cannot be
        // trusted; a zero count is equally unusable. Report one CPU in both
        // cases, like the Windows branch does.
        if (sysctlbyname(nameStr, &result, &len, null, 0) != 0 || result == 0)
            return 1;
        return result;
    }
    else
    {
        static assert(0, "Don't know how to get N CPUs on this OS.");
    }
}

/*
This class serves two purposes:

1.  It distinguishes std.parallelism threads from other threads so that
    the std.parallelism daemon threads can be terminated.

2.  It adds a reference to the pool that the thread is a member of,
    which is also necessary to allow the daemon threads to be properly
    terminated.
*/
private final class ParallelismThread : Thread
{
    this(void delegate() dg)
    {
        super(dg);
    }

    TaskPool pool;
}

// Kill daemon threads.
1048 shared static ~this() 1049 { 1050 foreach (ref thread; Thread) 1051 { 1052 auto pthread = cast(ParallelismThread) thread; 1053 if (pthread is null) continue; 1054 auto pool = pthread.pool; 1055 if (!pool.isDaemon) continue; 1056 pool.stop(); 1057 pthread.join(); 1058 } 1059 } 1060 1061 /** 1062 This class encapsulates a task queue and a set of worker threads. Its purpose 1063 is to efficiently map a large number of `Task`s onto a smaller number of 1064 threads. A task queue is a FIFO queue of `Task` objects that have been 1065 submitted to the `TaskPool` and are awaiting execution. A worker thread is a 1066 thread that executes the `Task` at the front of the queue when one is 1067 available and sleeps when the queue is empty. 1068 1069 This class should usually be used via the global instantiation 1070 available via the $(REF taskPool, std,parallelism) property. 1071 Occasionally it is useful to explicitly instantiate a `TaskPool`: 1072 1073 1. When you want `TaskPool` instances with multiple priorities, for example 1074 a low priority pool and a high priority pool. 1075 1076 2. When the threads in the global task pool are waiting on a synchronization 1077 primitive (for example a mutex), and you want to parallelize the code that 1078 needs to run before these threads can be resumed. 1079 1080 Note: The worker threads in this pool will not stop until 1081 `stop` or `finish` is called, even if the main thread 1082 has finished already. This may lead to programs that 1083 never end. If you do not want this behaviour, you can set `isDaemon` 1084 to true. 1085 */ 1086 final class TaskPool 1087 { 1088 private: 1089 1090 // A pool can either be a regular pool or a single-task pool. A 1091 // single-task pool is a dummy pool that's fired up for 1092 // Task.executeInNewThread(). 
bool isSingleTask;

// Worker threads of a regular pool.  The single-task constructor does not
// allocate any.
ParallelismThread[] pool;
// The only thread of a single-task pool; it runs doSingleTask().
Thread singleTaskThread;

// Intrusive doubly linked FIFO queue of pending tasks, linked through
// AbstractTask.next/prev.  The locking helpers (pop, abstractPut,
// deleteItem) take queueMutex; the *NoSync variants expect the caller to
// already hold it.
AbstractTask* head;
AbstractTask* tail;
// Lifecycle state of the pool.  Worker threads read and update it with
// atomicReadUbyte/atomicSetUbyte.
PoolState status = PoolState.running;
// Idle workers wait on this in pop(); the public constructor binds it to
// queueMutex.
Condition workerCondition;
// Notified after every job of a regular pool (see doJob) and waited on in
// waitUntilCompletion().
Condition waiterCondition;
Mutex queueMutex;
Mutex waiterMutex; // For waiterCondition

// The instanceStartIndex of the next instance that will be created.
// __gshared: a single copy for all threads.  The public constructor
// updates it inside synchronized(typeid(TaskPool)).
__gshared size_t nextInstanceIndex = 1;

// The index of the current thread.  A plain static field is thread-local
// in D, so every worker keeps its own value (assigned in startWorkLoop()).
static size_t threadIndex;

// The index of the first thread in this instance.
immutable size_t instanceStartIndex;

// The index that the next thread to be initialized in this pool will have.
size_t nextThreadIndex;

// Pool lifecycle.  New tasks are only accepted while `running`; the put
// functions throw an Error in any other state.
enum PoolState : ubyte
{
    running,    // Accepting and executing tasks.
    finishing,  // Draining: a worker that finds the queue empty switches to stopNow.
    stopNow     // Workers leave executeWorkLoop() at their next status check.
}

// Runs `job` in the calling thread.  The asserts require the task to be
// already unlinked from the queue and marked inProgress.  Anything the job
// throws is stored in job.exception instead of propagating, and the status
// is then set to done.  The scope(exit) handler runs after that status
// update; for regular pools it wakes every thread waiting on
// waiterCondition while holding waiterMutex.
void doJob(AbstractTask* job)
{
    assert(job.taskStatus == TaskStatus.inProgress);
    assert(job.next is null);
    assert(job.prev is null);

    scope(exit)
    {
        if (!isSingleTask)
        {
            waiterLock();
            scope(exit) waiterUnlock();
            notifyWaiters();
        }
    }

    try
    {
        job.job();
    }
    catch (Throwable e)
    {
        // Throwable, not just Exception: even Errors are recorded on the
        // task (presumably rethrown to whoever collects the result).
        job.exception = e;
    }

    atomicSetUbyte(job.taskStatus, TaskStatus.done);
}

// This function is used for dummy pools created by Task.executeInNewThread().
void doSingleTask()
{
    // No synchronization. Pool is guaranteed to only have one thread,
    // and the queue is submitted to before this thread is created.
    assert(head);
    auto t = head;
    t.next = t.prev = head = null;
    doJob(t);
}

// This function performs initialization for each thread that affects
// thread local storage and therefore must be done from within the
// worker thread. It then calls executeWorkLoop().
void startWorkLoop()
{
    // Initialize thread index.
    // nextThreadIndex is shared by all workers of this pool, hence the lock.
    {
        queueLock();
        scope(exit) queueUnlock();
        threadIndex = nextThreadIndex;
        nextThreadIndex++;
    }

    executeWorkLoop();
}

// This is the main work loop that worker threads spend their time in
// until they terminate. It's also entered by non-worker threads when
// finish() is called with the blocking variable set to true.
void executeWorkLoop()
{
    while (atomicReadUbyte(status) != PoolState.stopNow)
    {
        AbstractTask* task = pop();
        if (task is null)
        {
            // pop() only returns null once the pool is no longer running
            // and the queue is empty.  While finishing, that means all
            // work is done: switch to stopNow so the other threads' loops
            // end as well.
            if (atomicReadUbyte(status) == PoolState.finishing)
            {
                atomicSetUbyte(status, PoolState.stopNow);
                return;
            }
        }
        else
        {
            doJob(task);
        }
    }
}

// Pop a task off the queue.
// Blocks on workerCondition while the queue is empty and the pool is still
// running.  Returns null only if the queue is empty after the pool left the
// running state.
AbstractTask* pop()
{
    queueLock();
    scope(exit) queueUnlock();
    auto ret = popNoSync();
    while (ret is null && status == PoolState.running)
    {
        wait();
        ret = popNoSync();
    }
    return ret;
}

// Unlinks and returns the head task, marking it inProgress, or returns null
// if the queue is empty or this is a single-task pool.  The caller must hold
// queueMutex.  Note that `tail` is not cleared when the queue becomes empty;
// abstractPutNoSync() resets both ends whenever `head` is null.
AbstractTask* popNoSync()
out(returned)
{
    /* If task.prev and task.next aren't null, then another thread
     * can try to delete this task from the pool after it's
     * already been deleted/popped.
     */
    if (returned !is null)
    {
        assert(returned.next is null);
        assert(returned.prev is null);
    }
}
do
{
    if (isSingleTask) return null;

    AbstractTask* returned = head;
    if (head !is null)
    {
        head = head.next;
        returned.prev = null;
        returned.next = null;
        returned.taskStatus = TaskStatus.inProgress;
    }
    if (head !is null)
    {
        head.prev = null;
    }

    return returned;
}

// Push a task onto the queue.
// Locking wrapper: takes queueMutex and delegates to abstractPutNoSync().
void abstractPut(AbstractTask* task)
{
    queueLock();
    scope(exit) queueUnlock();
    abstractPutNoSync(task);
}

// Appends `task` at the tail of the queue and wakes one idle worker.
// The caller must hold queueMutex.
// Throws: Error if the pool is no longer running (after finish() or stop()).
void abstractPutNoSync(AbstractTask* task)
in
{
    assert(task);
}
out
{
    import std.conv : text;

    assert(tail.prev !is tail);
    assert(tail.next is null, text(tail.prev, '\t', tail.next));
    if (tail.prev !is null)
    {
        assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
    }
}
do
{
    // Not using enforce() to save on function call overhead since this
    // is a performance critical function.
    if (status != PoolState.running)
    {
        throw new Error(
            "Cannot submit a new task to a pool after calling " ~
            "finish() or stop()."
        );
    }

    task.next = null;
    if (head is null) //Queue is empty.
    {
        head = task;
        tail = task;
        tail.prev = null;
    }
    else
    {
        assert(tail);
        task.prev = tail;
        tail.next = task;
        tail = task;
    }
    notify();
}

// Appends an already linked chain of tasks h .. t in one step and wakes all
// idle workers.  The caller must hold queueMutex.
// NOTE(review): unlike abstractPutNoSync() this clears neither h.prev (when
// the queue is empty) nor t.next; presumably callers pass a chain whose ends
// are already null -- confirm.
void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
{
    if (status != PoolState.running)
    {
        throw new Error(
            "Cannot submit a new task to a pool after calling " ~
            "finish() or stop()."
        );
    }

    if (head is null)
    {
        head = h;
        tail = t;
    }
    else
    {
        h.prev = tail;
        tail.next = h;
        tail = t;
    }

    notifyAll();
}

// If `toExecute` is still queued and not started, unlinks it and runs it in
// the calling thread.  Otherwise (already started or done, or single-task
// pool) it does nothing.
// NOTE(review): only Exception is caught here, while doJob() catches
// Throwable.  An Error therefore propagates to the caller and leaves
// taskStatus at inProgress.  Waiters are also not notified, unlike in doJob().
void tryDeleteExecute(AbstractTask* toExecute)
{
    if (isSingleTask) return;

    if ( !deleteItem(toExecute) )
    {
        return;
    }

    try
    {
        toExecute.job();
    }
    catch (Exception e)
    {
        toExecute.exception = e;
    }

    atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
}

// Locking wrapper around deleteItemNoSync().
bool deleteItem(AbstractTask* item)
{
    queueLock();
    scope(exit) queueUnlock();
    return deleteItemNoSync(item);
}

// Claims `item` for the calling thread: if it has not started yet, marks it
// inProgress, unlinks it from the queue and returns true.  Returns false if
// it was already started (or finished) elsewhere.  The caller must hold
// queueMutex.
bool deleteItemNoSync(AbstractTask* item)
{
    if (item.taskStatus != TaskStatus.notStarted)
    {
        return false;
    }
    item.taskStatus = TaskStatus.inProgress;

    if (item is head)
    {
        // Make sure head gets set properly.
        popNoSync();
        return true;
    }
    if (item is tail)
    {
        tail = tail.prev;
        if (tail !is null)
        {
            tail.next = null;
        }
        item.next = null;
        item.prev = null;
        return true;
    }
    // Middle of the queue: splice the neighbours together.
    if (item.next !is null)
    {
        assert(item.next.prev is item); // Check queue consistency.
        item.next.prev = item.prev;
    }
    if (item.prev !is null)
    {
        assert(item.prev.next is item); // Check queue consistency.
        item.prev.next = item.next;
    }
    item.next = null;
    item.prev = null;
    return true;
}

// The lock/condition helpers below are no-ops for single-task pools, which
// only ever have one thread.
// NOTE(review): queueLock/queueUnlock assert queueMutex even for single-task
// pools, whose constructor never creates it.  They must therefore not be
// reached for such pools in builds with asserts enabled.
void queueLock()
{
    assert(queueMutex);
    if (!isSingleTask) queueMutex.lock();
}

void queueUnlock()
{
    assert(queueMutex);
    if (!isSingleTask) queueMutex.unlock();
}

void waiterLock()
{
    if (!isSingleTask) waiterMutex.lock();
}

void waiterUnlock()
{
    if (!isSingleTask) waiterMutex.unlock();
}

// Sleeps on workerCondition (caller holds queueMutex, see pop()).
void wait()
{
    if (!isSingleTask) workerCondition.wait();
}

// Wakes one idle worker.
void notify()
{
    if (!isSingleTask) workerCondition.notify();
}

// Wakes all idle workers.
void notifyAll()
{
    if (!isSingleTask) workerCondition.notifyAll();
}

// Single-task pools join their only thread.  Regular pools wait on
// waiterCondition (the caller presumably holds waiterMutex -- confirm).
void waitUntilCompletion()
{
    if (isSingleTask)
    {
        singleTaskThread.join();
    }
    else
    {
        waiterCondition.wait();
    }
}

void notifyWaiters()
{
    if (!isSingleTask) waiterCondition.notifyAll();
}

// Private constructor for creating dummy pools that only have one thread,
// only execute one Task, and then terminate. This is used for
// Task.executeInNewThread().
// `priority` is currently unused; see the commented-out block below.
// No mutexes or conditions are created for such a pool.
this(AbstractTask* task, int priority = int.max)
{
    assert(task);

    // Dummy value, not used.
    instanceStartIndex = 0;

    this.isSingleTask = true;
    task.taskStatus = TaskStatus.inProgress;
    this.head = task;
    singleTaskThread = new Thread(&doSingleTask);
    singleTaskThread.start();

    // Disabled until writing code to support
    // running thread with specified priority
    // See https://issues.dlang.org/show_bug.cgi?id=8960

    /*if (priority != int.max)
    {
        singleTaskThread.priority = priority;
    }*/
}

public:
// This is used in parallel_algorithm but is too unstable to document
// as public API.
size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
{
    // Work unit size used when the caller does not supply one.  The range
    // is split into roughly 4 * (size + 1) work units, rounding up.  The
    // result is always at least 1, because callers such as parallel()
    // reject a work unit size of 0.
    import std.algorithm.comparison : max;

    if (this.size == 0)
    {
        // No worker threads: everything runs serially, so a single work
        // unit spanning the whole range suffices.  Clamp to 1 so that an
        // empty range does not produce the invalid work unit size 0.
        return max(rangeLen, 1);
    }

    immutable size_t nUnits = 4 * (this.size + 1);
    auto ret = (rangeLen / nUnits) + ((rangeLen % nUnits == 0) ? 0 : 1);
    return max(ret, 1);
}

/**
Default constructor that initializes a `TaskPool` with
`totalCPUs` - 1 worker threads.  The minus 1 is included because the
main thread will also be available to do work.

Note:  On single-core machines, the primitives provided by `TaskPool`
       operate transparently in single-threaded mode.
 */
this() @trusted
{
    this(totalCPUs - 1);
}

/**
Allows for custom number of worker threads.
*/
this(size_t nWorkers) @trusted
{
    // Reserve a contiguous range of global thread indices for this pool.
    synchronized(typeid(TaskPool))
    {
        instanceStartIndex = nextInstanceIndex;

        // The first worker thread to be initialized will have this index,
        // and will increment it.  The second worker to be initialized will
        // have this index plus 1.
        nextThreadIndex = instanceStartIndex;
        nextInstanceIndex += nWorkers;
    }

    queueMutex = new Mutex(this);
    waiterMutex = new Mutex();
    // Workers sleep on workerCondition under queueMutex; threads waiting
    // for results sleep on waiterCondition under waiterMutex.
    workerCondition = new Condition(queueMutex);
    waiterCondition = new Condition(waiterMutex);

    pool = new ParallelismThread[nWorkers];
    foreach (ref poolThread; pool)
    {
        poolThread = new ParallelismThread(&startWorkLoop);
        poolThread.pool = this;
        poolThread.start();
    }
}

/**
Implements a parallel foreach loop over a range.  This works by implicitly
creating and submitting one `Task` to the `TaskPool` for each worker
thread.  A work unit is a set of consecutive elements of `range` to
be processed by a worker thread between communication with any other
thread.
The number of elements processed per work unit is controlled by the
`workUnitSize` parameter.  Smaller work units provide better load
balancing, but larger work units avoid the overhead of communicating
with other threads frequently to fetch the next work unit.  Large work
units also avoid false sharing in cases where the range is being modified.
The less time a single iteration of the loop takes, the larger
`workUnitSize` should be.  For very expensive loop bodies,
`workUnitSize` should be 1.  An overload that chooses a default work
unit size is also available.

Example:
---
// Find the logarithm of every number from 1 to
// 10_000_000 in parallel.
auto logs = new double[10_000_000];

// Parallel foreach works with or without an index
// variable.  It can iterate by ref if range.front
// returns by ref.

// Iterate over logs using work units of size 100.
foreach (i, ref elem; taskPool.parallel(logs, 100))
{
    elem = log(i + 1.0);
}

// Same thing, but use the default work unit size.
//
// Timings on an Athlon 64 X2 dual core machine:
//
// Parallel foreach:  388 milliseconds
// Regular foreach:   619 milliseconds
foreach (i, ref elem; taskPool.parallel(logs))
{
    elem = log(i + 1.0);
}
---

Notes:

The memory usage of this implementation is guaranteed to be constant
in `range.length`.

Breaking from a parallel foreach loop via a break, labeled break,
labeled continue, return or goto statement throws a
`ParallelForeachError`.

In the case of non-random access ranges, parallel foreach buffers lazily
to an array of size `workUnitSize` before executing the parallel portion
of the loop.
The exception is that, if a parallel foreach is executed
over a range returned by `asyncBuf` or `map`, the copying is elided
and the buffers are simply swapped.  In this case `workUnitSize` is
ignored and the work unit size is set to the buffer size of `range`.

A memory barrier is guaranteed to be executed on exit from the loop,
so that results produced by all threads are visible in the calling thread.

$(B Exception Handling):

When at least one exception is thrown from inside a parallel foreach loop,
the submission of additional `Task` objects is terminated as soon as
possible, in a non-deterministic manner.  All executing or
enqueued work units are allowed to complete.  Then, all exceptions that
were thrown by any work unit are chained using `Throwable.next` and
rethrown.  The order of the exception chaining is non-deterministic.
*/
ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
{
    import std.exception : enforce;

    // Every work unit must contain at least one element.
    enforce(workUnitSize > 0, "workUnitSize must be > 0.");
    return ParallelForeach!R(this, range, workUnitSize);
}


/// Ditto
ParallelForeach!R parallel(R)(R range)
{
    static if (!hasLength!R)
    {
        // Without a length there is nothing to base the choice on, so fall
        // back to a fixed guess of 512 elements per work unit.
        return parallel(range, 512);
    }
    else
    {
        // Aim for about four times as many work units as the pool has
        // slots; see defaultWorkUnitSize().
        return parallel(range, defaultWorkUnitSize(range.length));
    }
}

///
template amap(functions...)
{
    /**
    Eager parallel map.  The eagerness of this function means it has less
    overhead than the lazily evaluated `TaskPool.map` and should be
    preferred where the memory requirements of eagerness are acceptable.
1641 `functions` are the functions to be evaluated, passed as template 1642 alias parameters in a style similar to 1643 $(REF map, std,algorithm,iteration). 1644 The first argument must be a random access range. For performance 1645 reasons, amap will assume the range elements have not yet been 1646 initialized. Elements will be overwritten without calling a destructor 1647 nor doing an assignment. As such, the range must not contain meaningful 1648 data$(DDOC_COMMENT not a section): either un-initialized objects, or 1649 objects in their `.init` state. 1650 1651 --- 1652 auto numbers = iota(100_000_000.0); 1653 1654 // Find the square roots of numbers. 1655 // 1656 // Timings on an Athlon 64 X2 dual core machine: 1657 // 1658 // Parallel eager map: 0.802 s 1659 // Equivalent serial implementation: 1.768 s 1660 auto squareRoots = taskPool.amap!sqrt(numbers); 1661 --- 1662 1663 Immediately after the range argument, an optional work unit size argument 1664 may be provided. Work units as used by `amap` are identical to those 1665 defined for parallel foreach. If no work unit size is provided, the 1666 default work unit size is used. 1667 1668 --- 1669 // Same thing, but make work unit size 100. 1670 auto squareRoots = taskPool.amap!sqrt(numbers, 100); 1671 --- 1672 1673 An output range for returning the results may be provided as the last 1674 argument. If one is not provided, an array of the proper type will be 1675 allocated on the garbage collected heap. If one is provided, it must be a 1676 random access range with assignable elements, must have reference 1677 semantics with respect to assignment to its elements, and must have the 1678 same length as the input range. Writing to adjacent elements from 1679 different threads must be safe. 1680 1681 --- 1682 // Same thing, but explicitly allocate an array 1683 // to return the results in. 
The element type 1684 // of the array may be either the exact type 1685 // returned by functions or an implicit conversion 1686 // target. 1687 auto squareRoots = new float[numbers.length]; 1688 taskPool.amap!sqrt(numbers, squareRoots); 1689 1690 // Multiple functions, explicit output range, and 1691 // explicit work unit size. 1692 auto results = new Tuple!(float, real)[numbers.length]; 1693 taskPool.amap!(sqrt, log)(numbers, 100, results); 1694 --- 1695 1696 Note: 1697 1698 A memory barrier is guaranteed to be executed after all results are written 1699 but before returning so that results produced by all threads are visible 1700 in the calling thread. 1701 1702 Tips: 1703 1704 To perform the mapping operation in place, provide the same range for the 1705 input and output range. 1706 1707 To parallelize the copying of a range with expensive to evaluate elements 1708 to an array, pass an identity function (a function that just returns 1709 whatever argument is provided to it) to `amap`. 1710 1711 $(B Exception Handling): 1712 1713 When at least one exception is thrown from inside the map functions, 1714 the submission of additional `Task` objects is terminated as soon as 1715 possible, in a non-deterministic manner. All currently executing or 1716 enqueued work units are allowed to complete. Then, all exceptions that 1717 were thrown from any work unit are chained using `Throwable.next` and 1718 rethrown. The order of the exception chaining is non-deterministic. 
*/
    auto amap(Args...)(Args args)
    if (isRandomAccessRange!(Args[0]))
    {
        import std.conv : emplaceRef;

        alias fun = adjoin!(staticMap!(unaryFun, functions));

        alias range = args[0];
        immutable len = range.length;

        // Argument layout: (range [, workUnitSize] [, buf]).  A trailing
        // random-access, assignable argument whose element type accepts the
        // map result is used as the output buffer.
        static if (
            Args.length > 1 &&
            randAssignable!(Args[$ - 1]) &&
            is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
        )
        {
            import std.conv : text;
            import std.exception : enforce;

            alias buf = args[$ - 1];
            alias args2 = args[0..$ - 1];
            alias Args2 = Args[0..$ - 1];
            enforce(buf.length == len,
                text("Can't use a user supplied buffer that's the wrong ",
                    "size. (Expected: ", len, " Got: ", buf.length, ")"));
        }
        else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
        {
            static assert(0, "Wrong buffer type.");
        }
        else
        {
            import std.array : uninitializedArray;

            // No buffer supplied: allocate one.  Its elements are left
            // uninitialized and are filled with emplaceRef below.
            auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
            alias args2 = args;
            alias Args2 = Args;
        }

        if (!len) return buf;

        static if (isIntegral!(Args2[$ - 1]))
        {
            static assert(args2.length == 2);
            auto workUnitSize = cast(size_t) args2[1];

            {
                // A work unit size of 0 never advances `start` in doIt(), so
                // every participating thread would spin forever.  Reject it,
                // matching the check in parallel().
                import std.exception : enforce;
                enforce(workUnitSize > 0, "workUnitSize must be > 0.");
            }
        }
        else
        {
            static assert(args2.length == 1, Args);
            auto workUnitSize = defaultWorkUnitSize(range.length);
        }

        alias R = typeof(range);

        if (workUnitSize > len)
        {
            workUnitSize = len;
        }

        // Handle as a special case:
        if (size == 0)
        {
            size_t index = 0;
            foreach (elem; range)
            {
                emplaceRef(buf[index++], fun(elem));
            }
            return buf;
        }

        // Effectively -1:  chunkIndex + 1 == 0:
        shared size_t workUnitIndex = size_t.max;
        shared bool shouldContinue = true;

        // Run by each participating thread: repeatedly claims the next work
        // unit with an atomic increment of workUnitIndex and maps it into buf.
        void doIt()
        {
            import std.algorithm.comparison : min;

            scope(failure)
            {
                // If an exception is thrown, all threads should bail.
                atomicStore(shouldContinue, false);
            }

            while (atomicLoad(shouldContinue))
            {
                immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
                immutable start = workUnitSize * myUnitIndex;
                if (start >= len)
                {
                    atomicStore(shouldContinue, false);
                    break;
                }

                immutable end = min(len, start + workUnitSize);

                static if (hasSlicing!R)
                {
                    auto subrange = range[start .. end];
                    foreach (i; start .. end)
                    {
                        emplaceRef(buf[i], fun(subrange.front));
                        subrange.popFront();
                    }
                }
                else
                {
                    foreach (i; start .. end)
                    {
                        emplaceRef(buf[i], fun(range[i]));
                    }
                }
            }
        }

        // NOTE(review): submitAndExecute (defined elsewhere) is assumed to run
        // doIt on the workers and the calling thread and to rethrow any
        // chained exceptions -- confirm.
        submitAndExecute(this, &doIt);
        return buf;
    }
}

///
template map(functions...)
{
    /**
    A semi-lazy parallel map that can be used for pipelining.  The map
    functions are evaluated for the first `bufSize` elements and stored in a
    buffer and made available to `popFront`.  Meanwhile, in the
    background a second buffer of the same size is filled.  When the first
    buffer is exhausted, it is swapped with the second buffer and filled while
    the values from what was originally the second buffer are read.  This
    implementation allows for elements to be written to the buffer without
    the need for atomic operations or synchronization for each write, and
    enables the mapping function to be evaluated efficiently in parallel.

    `map` has more overhead than the simpler procedure used by `amap`
    but avoids the need to keep all results in memory simultaneously and works
    with non-random access ranges.

    Params:

    source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
    to be mapped.  If `source` is not random
    access it will be lazily buffered to an array of size `bufSize` before
    the map function is evaluated.  (For an exception to this rule, see Notes.)
1864 1865 bufSize = The size of the buffer to store the evaluated elements. 1866 1867 workUnitSize = The number of elements to evaluate in a single 1868 `Task`. Must be less than or equal to `bufSize`, and 1869 should be a fraction of `bufSize` such that all worker threads can be 1870 used. If the default of size_t.max is used, workUnitSize will be set to 1871 the pool-wide default. 1872 1873 Returns: An input range representing the results of the map. This range 1874 has a length iff `source` has a length. 1875 1876 Notes: 1877 1878 If a range returned by `map` or `asyncBuf` is used as an input to 1879 `map`, then as an optimization the copying from the output buffer 1880 of the first range to the input buffer of the second range is elided, even 1881 though the ranges returned by `map` and `asyncBuf` are non-random 1882 access ranges. This means that the `bufSize` parameter passed to the 1883 current call to `map` will be ignored and the size of the buffer 1884 will be the buffer size of `source`. 1885 1886 Example: 1887 --- 1888 // Pipeline reading a file, converting each line 1889 // to a number, taking the logarithms of the numbers, 1890 // and performing the additions necessary to find 1891 // the sum of the logarithms. 1892 1893 auto lineRange = File("numberList.txt").byLine(); 1894 auto dupedLines = std.algorithm.map!"a.idup"(lineRange); 1895 auto nums = taskPool.map!(to!double)(dupedLines); 1896 auto logs = taskPool.map!log10(nums); 1897 1898 double sum = 0; 1899 foreach (elem; logs) 1900 { 1901 sum += elem; 1902 } 1903 --- 1904 1905 $(B Exception Handling): 1906 1907 Any exceptions thrown while iterating over `source` 1908 or computing the map function are re-thrown on a call to `popFront` or, 1909 if thrown during construction, are simply allowed to propagate to the 1910 caller. In the case of exceptions thrown while computing the map function, 1911 the exceptions are chained as in `TaskPool.amap`. 
*/
    auto
    map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
    if (isInputRange!S)
    {
        import std.exception : enforce;

        // NOTE(review): the message says "smaller than", but a work unit
        // size equal to bufSize is accepted as well.
        enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
            "Work unit size must be smaller than buffer size.");
        alias fun = adjoin!(staticMap!(unaryFun, functions));

        static final class Map
        {
            // This is a class because the task needs to be located on the
            // heap and in the non-random access case source needs to be on
            // the heap, too.

        private:
            // True when `source` is itself a double-buffered range (a range
            // returned by map or asyncBuf).  Its output buffer is then taken
            // over by swapping instead of being copied element by element.
            enum bufferTrick = is(typeof(source.buf1)) &&
            is(typeof(source.bufPos)) &&
            is(typeof(source.doBufSwap()));

            alias E = MapType!(S, functions);
            // buf1 is exposed through the range interface; buf2 is refilled
            // by nextBufTask in the background.
            E[] buf1, buf2;
            S source;
            TaskPool pool;
            Task!(run, E[] delegate(E[]), E[]) nextBufTask;
            size_t workUnitSize;
            // Index of the current front element within buf1.
            size_t bufPos;
            // Set once the final background fill has been collected.
            bool lastTaskWaited;

            static if (isRandomAccessRange!S)
            {
                alias FromType = S;

                // Drops the elements that were just mapped (at most
                // buf1.length) from the front of `source`.
                void popSource()
                {
                    import std.algorithm.comparison : min;

                    static if (__traits(compiles, source[0 .. source.length]))
                    {
                        source = source[min(buf1.length, source.length)..source.length];
                    }
                    else static if (__traits(compiles, source[0..$]))
                    {
                        source = source[min(buf1.length, source.length)..$];
                    }
                    else
                    {
                        static assert(0, "S must have slicing for Map."
                            ~ " " ~ S.stringof ~ " doesn't.");
                    }
                }
            }
            else static if (bufferTrick)
            {
                // Make sure we don't have the buffer recycling overload of
                // asyncBuf.
                static if (
                    is(typeof(source.source)) &&
                    isRoundRobin!(typeof(source.source))
                )
                {
                    static assert(0, "Cannot execute a parallel map on " ~
                        "the buffer recycling overload of asyncBuf."
                    );
                }

                alias FromType = typeof(source.buf1);
                FromType from;

                // Just swap our input buffer with source's output buffer.
                // No need to copy element by element.
                FromType dumpToFrom()
                {
                    import std.algorithm.mutation : swap;

                    assert(source.buf1.length <= from.length);
                    from.length = source.buf1.length;
                    swap(source.buf1, from);

                    // Just in case this source has been popped before
                    // being sent to map:
                    from = from[source.bufPos..$];

                    // NOTE(review): `from` has already been sliced past
                    // source.bufPos above, so subtracting bufPos again looks
                    // like it may undercount what was taken from source --
                    // verify against the source's _length bookkeeping.
                    static if (is(typeof(source._length)))
                    {
                        source._length -= (from.length - source.bufPos);
                    }

                    source.doBufSwap();

                    return from;
                }
            }
            else
            {
                alias FromType = ElementType!S[];

                // The temporary array that data is copied to before being
                // mapped.
                FromType from;

                // Copies up to from.length elements out of `source` (popping
                // them) and returns the filled prefix of `from`.
                FromType dumpToFrom()
                {
                    assert(from !is null);

                    size_t i;
                    for (; !source.empty && i < from.length; source.popFront())
                    {
                        from[i++] = source.front;
                    }

                    from = from[0 .. i];
                    return from;
                }
            }

            static if (hasLength!S)
            {
                size_t _length;

                public @property size_t length() const @safe pure nothrow
                {
                    return _length;
                }
            }

            // Allocates both buffers, fills buf1 synchronously in the
            // calling thread and submits the fill of buf2 to `pool`.
            this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
            {
                static if (bufferTrick)
                {
                    bufSize = source.buf1.length;
                }

                buf1.length = bufSize;
                buf2.length = bufSize;

                static if (!isRandomAccessRange!S)
                {
                    from.length = bufSize;
                }

                this.workUnitSize = (workUnitSize == size_t.max) ?
                    pool.defaultWorkUnitSize(bufSize) : workUnitSize;
                this.source = source;
                this.pool = pool;

                static if (hasLength!S)
                {
                    _length = source.length;
                }

                buf1 = fillBuf(buf1);
                submitBuf2();
            }

            // Maps the next chunk of `source` into `buf` and returns the
            // filled prefix of `buf`.  In the random access case the chunk
            // is taken directly from `source`; otherwise it is first obtained
            // via dumpToFrom().
E[] fillBuf(E[] buf)
{
    import std.algorithm.comparison : min;

    static if (isRandomAccessRange!S)
    {
        import std.range : take;
        auto toMap = take(source, buf.length);
        // Advance `source` past the mapped chunk only if mapping succeeded.
        scope(success) popSource();
    }
    else
    {
        auto toMap = dumpToFrom();
    }

    buf = buf[0 .. min(buf.length, toMap.length)];

    // Handle as a special case:
    // with no worker threads, map serially in the calling thread.
    if (pool.size == 0)
    {
        size_t index = 0;
        foreach (elem; toMap)
        {
            buf[index++] = fun(elem);
        }
        return buf;
    }

    pool.amap!functions(toMap, workUnitSize, buf);

    return buf;
}

// Queues a background task that runs fillBuf(buf2).
void submitBuf2()
in
{
    assert(nextBufTask.prev is null);
    assert(nextBufTask.next is null);
}
do
{
    // Hack to reuse the task object.
    // Reset it to its initial state and point it at fillBuf(buf2).

    nextBufTask = typeof(nextBufTask).init;
    nextBufTask._args[0] = &fillBuf;
    nextBufTask._args[1] = buf2;
    pool.put(nextBufTask);
}

// Called when buf1 is exhausted: waits for the background fill, promotes
// it to buf1 and recycles the old buf1 as the next fill target.  Once the
// last fill has been consumed, nulls the buffers so that empty becomes true.
void doBufSwap()
{
    if (lastTaskWaited)
    {
        // Then the source is empty. Signal it here.
        buf1 = null;
        buf2 = null;

        static if (!isRandomAccessRange!S)
        {
            from = null;
        }

        return;
    }

    buf2 = buf1;
    buf1 = nextBufTask.yieldForce;
    bufPos = 0;

    if (source.empty)
    {
        lastTaskWaited = true;
    }
    else
    {
        submitBuf2();
    }
}

public:
@property auto front()
{
    return buf1[bufPos];
}

void popFront()
{
    static if (hasLength!S)
    {
        _length--;
    }

    bufPos++;
    if (bufPos >= buf1.length)
    {
        doBufSwap();
    }
}

static if (isInfinite!S)
{
    enum bool empty = false;
}
else
{

    bool empty() const @property
    {
        // popFront() sets this when source is empty
        return buf1.length == 0;
    }
}
}
return new Map(source, bufSize, workUnitSize, this);
}
}

/**
Given a `source` range that is expensive to iterate over, returns an
$(REF_ALTTEXT input range, isInputRange, std,range,primitives) that
asynchronously buffers the contents of `source` into a buffer of `bufSize` elements in a worker thread,
while making previously buffered elements from a second buffer, also of size
`bufSize`, available via the range interface of the returned
object.  The returned range has a length iff `hasLength!S`.
`asyncBuf` is useful, for example, when performing expensive operations
on the elements of ranges that represent data on a disk or network.

Example:
---
import std.conv, std.stdio;

void main()
{
    // Fetch lines of a file in a background thread
    // while processing previously fetched lines,
    // dealing with byLine's buffer recycling by
    // eagerly duplicating every line.
auto lines = File("foo.txt").byLine();
auto duped = std.algorithm.map!"a.idup"(lines);

// Fetch more lines in the background while we
// process the lines already read into memory
// into a matrix of doubles.
double[][] matrix;
auto asyncReader = taskPool.asyncBuf(duped);

foreach (line; asyncReader)
{
    auto ls = line.split("\t");
    matrix ~= to!(double[])(ls);
}
}
---

$(B Exception Handling):

Any exceptions thrown while iterating over `source` are re-thrown on a
call to `popFront` or, if thrown during construction, simply
allowed to propagate to the caller.
*/
auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S)
{
    static final class AsyncBuf
    {
        // Lives on the GC heap so that both the background task and the
        // wrapped source outlive the call to asyncBuf.

        // Element type of the wrapped range.
        alias E = ElementType!S; // Declared here to work around forward reference bugs.

    private:
        // buf1 backs the range interface; buf2 is refilled in the background.
        E[] buf1, buf2;
        S source;
        TaskPool pool;
        Task!(run, E[] delegate(E[]), E[]) nextBufTask;
        // Position of the current front element inside buf1.
        size_t bufPos;
        // Set once the final background fill has been collected.
        bool lastTaskWaited;

        static if (hasLength!S)
        {
            size_t _length;

            // Available if hasLength!S.
            public @property size_t length() const @safe pure nothrow
            {
                return _length;
            }
        }

        // Fills buf1 synchronously, then starts filling buf2 in the pool.
        this(S source, size_t bufSize, TaskPool pool)
        {
            buf1.length = bufSize;
            buf2.length = bufSize;

            this.source = source;
            this.pool = pool;

            static if (hasLength!S)
            {
                _length = source.length;
            }

            buf1 = fillBuf(buf1);
            submitBuf2();
        }

        // Moves up to buf.length elements out of `source` into `buf` and
        // returns the filled prefix.
        E[] fillBuf(E[] buf)
        {
            assert(buf !is null);

            size_t filled = 0;
            while (!source.empty && filled < buf.length)
            {
                buf[filled] = source.front;
                ++filled;
                source.popFront();
            }

            return buf[0 .. filled];
        }

        // Re-arms the task object so that it runs fillBuf(buf2) in the pool.
        void submitBuf2()
        in
        {
            assert(nextBufTask.prev is null);
            assert(nextBufTask.next is null);
        }
        do
        {
            nextBufTask = typeof(nextBufTask).init;
            nextBufTask._args[1] = buf2;
            nextBufTask._args[0] = &fillBuf;
            pool.put(nextBufTask);
        }

        // Swaps in the background-filled buffer once buf1 is used up.  After
        // the final fill has been consumed, both buffers are nulled so that
        // `empty` reports true.
        void doBufSwap()
        {
            if (!lastTaskWaited)
            {
                buf2 = buf1;
                buf1 = nextBufTask.yieldForce;
                bufPos = 0;

                if (!source.empty)
                {
                    submitBuf2();
                }
                else
                {
                    lastTaskWaited = true;
                }
                return;
            }

            // The source was exhausted by the previous swap.
            buf1 = null;
            buf2 = null;
        }

    public:
        E front() @property
        {
            return buf1[bufPos];
        }

        void popFront()
        {
            static if (hasLength!S)
            {
                --_length;
            }

            if (++bufPos >= buf1.length)
            {
                doBufSwap();
            }
        }

        static if (isInfinite!S)
        {
            enum bool empty = false;
        }

        else
        {
            ///
            bool empty() @property
            {
                // doBufSwap() nulls buf1 once the source is drained.
                return buf1.length == 0;
            }
        }
    }

    return new AsyncBuf(source, bufSize, this);
}

/**
Given a callable object `next` that writes to a user-provided buffer and
a second callable object `empty` that determines whether more data is
available to write via `next`, returns an input range that
asynchronously calls `next` with a set of `nBuffers` buffers
and makes the results available in the order they were obtained via the
input range interface of the returned object.  Similarly to the
input range overload of `asyncBuf`, the first half of the buffers
are made available via the range interface while the second half are
filled and vice-versa.
2379 2380 Params: 2381 2382 next = A callable object that takes a single argument that must be an array 2383 with mutable elements. When called, `next` writes data to 2384 the array provided by the caller. 2385 2386 empty = A callable object that takes no arguments and returns a type 2387 implicitly convertible to `bool`. This is used to signify 2388 that no more data is available to be obtained by calling `next`. 2389 2390 initialBufSize = The initial size of each buffer. If `next` takes its 2391 array by reference, it may resize the buffers. 2392 2393 nBuffers = The number of buffers to cycle through when calling `next`. 2394 2395 Example: 2396 --- 2397 // Fetch lines of a file in a background 2398 // thread while processing previously fetched 2399 // lines, without duplicating any lines. 2400 auto file = File("foo.txt"); 2401 2402 void next(ref char[] buf) 2403 { 2404 file.readln(buf); 2405 } 2406 2407 // Fetch more lines in the background while we 2408 // process the lines already read into memory 2409 // into a matrix of doubles. 2410 double[][] matrix; 2411 auto asyncReader = taskPool.asyncBuf(&next, &file.eof); 2412 2413 foreach (line; asyncReader) 2414 { 2415 auto ls = line.split("\t"); 2416 matrix ~= to!(double[])(ls); 2417 } 2418 --- 2419 2420 $(B Exception Handling): 2421 2422 Any exceptions thrown while iterating over `range` are re-thrown on a 2423 call to `popFront`. 2424 2425 Warning: 2426 2427 Using the range returned by this function in a parallel foreach loop 2428 will not work because buffers may be overwritten while the task that 2429 processes them is in queue. This is checked for at compile time 2430 and will result in a static assertion failure. 
2431 */ 2432 auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100) 2433 if (is(typeof(C2.init()) : bool) && 2434 Parameters!C1.length == 1 && 2435 Parameters!C2.length == 0 && 2436 isArray!(Parameters!C1[0]) 2437 ) { 2438 auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers); 2439 return asyncBuf(roundRobin, nBuffers / 2); 2440 } 2441 2442 /// 2443 template reduce(functions...) 2444 { 2445 /** 2446 Parallel reduce on a random access range. Except as otherwise noted, 2447 usage is similar to $(REF _reduce, std,algorithm,iteration). There is 2448 also $(LREF fold) which does the same thing with a different parameter 2449 order. 2450 2451 This function works by splitting the range to be reduced into work 2452 units, which are slices to be reduced in parallel. Once the results 2453 from all work units are computed, a final serial reduction is performed 2454 on these results to compute the final answer. Therefore, care must be 2455 taken to choose the seed value appropriately. 2456 2457 Because the reduction is being performed in parallel, `functions` 2458 must be associative. For notational simplicity, let # be an 2459 infix operator representing `functions`. Then, (a # b) # c must equal 2460 a # (b # c). Floating point addition is not associative 2461 even though addition in exact arithmetic is. Summing floating 2462 point numbers using this function may give different results than summing 2463 serially. However, for many practical purposes floating point addition 2464 can be treated as associative. 2465 2466 Note that, since `functions` are assumed to be associative, 2467 additional optimizations are made to the serial portion of the reduction 2468 algorithm. These take advantage of the instruction level parallelism of 2469 modern CPUs, in addition to the thread-level parallelism that the rest 2470 of this module exploits. 
This can lead to better than linear speedups 2471 relative to $(REF _reduce, std,algorithm,iteration), especially for 2472 fine-grained benchmarks like dot products. 2473 2474 An explicit seed may be provided as the first argument. If 2475 provided, it is used as the seed for all work units and for the final 2476 reduction of results from all work units. Therefore, if it is not the 2477 identity value for the operation being performed, results may differ 2478 from those generated by $(REF _reduce, std,algorithm,iteration) or 2479 depending on how many work units are used. The next argument must be 2480 the range to be reduced. 2481 --- 2482 // Find the sum of squares of a range in parallel, using 2483 // an explicit seed. 2484 // 2485 // Timings on an Athlon 64 X2 dual core machine: 2486 // 2487 // Parallel reduce: 72 milliseconds 2488 // Using std.algorithm.reduce instead: 181 milliseconds 2489 auto nums = iota(10_000_000.0f); 2490 auto sumSquares = taskPool.reduce!"a + b"( 2491 0.0, std.algorithm.map!"a * a"(nums) 2492 ); 2493 --- 2494 2495 If no explicit seed is provided, the first element of each work unit 2496 is used as a seed. For the final reduction, the result from the first 2497 work unit is used as the seed. 2498 --- 2499 // Find the sum of a range in parallel, using the first 2500 // element of each work unit as the seed. 2501 auto sum = taskPool.reduce!"a + b"(nums); 2502 --- 2503 2504 An explicit work unit size may be specified as the last argument. 2505 Specifying too small a work unit size will effectively serialize the 2506 reduction, as the final reduction of the result of each work unit will 2507 dominate computation time. If `TaskPool.size` for this instance 2508 is zero, this parameter is ignored and one work unit is used. 2509 --- 2510 // Use a work unit size of 100. 2511 auto sum2 = taskPool.reduce!"a + b"(nums, 100); 2512 2513 // Work unit size of 100 and explicit seed. 
2514 auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100); 2515 --- 2516 2517 Parallel reduce supports multiple functions, like 2518 `std.algorithm.reduce`. 2519 --- 2520 // Find both the min and max of nums. 2521 auto minMax = taskPool.reduce!(min, max)(nums); 2522 assert(minMax[0] == reduce!min(nums)); 2523 assert(minMax[1] == reduce!max(nums)); 2524 --- 2525 2526 $(B Exception Handling): 2527 2528 After this function is finished executing, any exceptions thrown 2529 are chained together via `Throwable.next` and rethrown. The chaining 2530 order is non-deterministic. 2531 2532 See_Also: 2533 2534 $(LREF fold) is functionally equivalent to $(LREF _reduce) except the 2535 range parameter comes first and there is no need to use 2536 $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds. 2537 */ 2538 auto reduce(Args...)(Args args) 2539 { 2540 import core.exception : OutOfMemoryError; 2541 import std.conv : emplaceRef; 2542 import std.exception : enforce; 2543 2544 alias fun = reduceAdjoin!functions; 2545 alias finishFun = reduceFinish!functions; 2546 2547 static if (isIntegral!(Args[$ - 1])) 2548 { 2549 size_t workUnitSize = cast(size_t) args[$ - 1]; 2550 alias args2 = args[0..$ - 1]; 2551 alias Args2 = Args[0..$ - 1]; 2552 } 2553 else 2554 { 2555 alias args2 = args; 2556 alias Args2 = Args; 2557 } 2558 2559 auto makeStartValue(Type)(Type e) 2560 { 2561 static if (functions.length == 1) 2562 { 2563 return e; 2564 } 2565 else 2566 { 2567 typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void; 2568 foreach (i, T; seed.Types) 2569 { 2570 emplaceRef(seed.expand[i], e); 2571 } 2572 2573 return seed; 2574 } 2575 } 2576 2577 static if (args2.length == 2) 2578 { 2579 static assert(isInputRange!(Args2[1])); 2580 alias range = args2[1]; 2581 alias seed = args2[0]; 2582 enum explicitSeed = true; 2583 2584 static if (!is(typeof(workUnitSize))) 2585 { 2586 size_t workUnitSize = defaultWorkUnitSize(range.length); 2587 } 2588 } 2589 else 2590 { 2591 static 
assert(args2.length == 1); 2592 alias range = args2[0]; 2593 2594 static if (!is(typeof(workUnitSize))) 2595 { 2596 size_t workUnitSize = defaultWorkUnitSize(range.length); 2597 } 2598 2599 enforce(!range.empty, 2600 "Cannot reduce an empty range with first element as start value."); 2601 2602 auto seed = makeStartValue(range.front); 2603 enum explicitSeed = false; 2604 range.popFront(); 2605 } 2606 2607 alias E = typeof(seed); 2608 alias R = typeof(range); 2609 2610 E reduceOnRange(R range, size_t lowerBound, size_t upperBound) 2611 { 2612 // This is for exploiting instruction level parallelism by 2613 // using multiple accumulator variables within each thread, 2614 // since we're assuming functions are associative anyhow. 2615 2616 // This is so that loops can be unrolled automatically. 2617 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5); 2618 enum nILP = ilpTuple.length; 2619 immutable subSize = (upperBound - lowerBound) / nILP; 2620 2621 if (subSize <= 1) 2622 { 2623 // Handle as a special case. 2624 static if (explicitSeed) 2625 { 2626 E result = seed; 2627 } 2628 else 2629 { 2630 E result = makeStartValue(range[lowerBound]); 2631 lowerBound++; 2632 } 2633 2634 foreach (i; lowerBound .. upperBound) 2635 { 2636 result = fun(result, range[i]); 2637 } 2638 2639 return result; 2640 } 2641 2642 assert(subSize > 1); 2643 E[nILP] results; 2644 size_t[nILP] offsets; 2645 2646 foreach (i; ilpTuple) 2647 { 2648 offsets[i] = lowerBound + subSize * i; 2649 2650 static if (explicitSeed) 2651 { 2652 results[i] = seed; 2653 } 2654 else 2655 { 2656 results[i] = makeStartValue(range[offsets[i]]); 2657 offsets[i]++; 2658 } 2659 } 2660 2661 immutable nLoop = subSize - (!explicitSeed); 2662 foreach (i; 0 .. nLoop) 2663 { 2664 foreach (j; ilpTuple) 2665 { 2666 results[j] = fun(results[j], range[offsets[j]]); 2667 offsets[j]++; 2668 } 2669 } 2670 2671 // Finish the remainder. 2672 foreach (i; nILP * subSize + lowerBound .. 
upperBound) 2673 { 2674 results[$ - 1] = fun(results[$ - 1], range[i]); 2675 } 2676 2677 foreach (i; ilpTuple[1..$]) 2678 { 2679 results[0] = finishFun(results[0], results[i]); 2680 } 2681 2682 return results[0]; 2683 } 2684 2685 immutable len = range.length; 2686 if (len == 0) 2687 { 2688 return seed; 2689 } 2690 2691 if (this.size == 0) 2692 { 2693 return finishFun(seed, reduceOnRange(range, 0, len)); 2694 } 2695 2696 // Unlike the rest of the functions here, I can't use the Task object 2697 // recycling trick here because this has to work on non-commutative 2698 // operations. After all the tasks are done executing, fun() has to 2699 // be applied on the results of these to get a final result, but 2700 // it can't be evaluated out of order. 2701 2702 if (workUnitSize > len) 2703 { 2704 workUnitSize = len; 2705 } 2706 2707 immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1); 2708 assert(nWorkUnits * workUnitSize >= len); 2709 2710 alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t); 2711 RTask[] tasks; 2712 2713 // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753 2714 // Use a fixed buffer backed by malloc(). 2715 enum maxStack = 2_048; 2716 byte[maxStack] buf = void; 2717 immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof; 2718 2719 import core.stdc.stdlib : malloc, free; 2720 if (nBytesNeeded < maxStack) 2721 { 2722 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits]; 2723 } 2724 else 2725 { 2726 auto ptr = cast(RTask*) malloc(nBytesNeeded); 2727 if (!ptr) 2728 { 2729 throw new OutOfMemoryError( 2730 "Out of memory in std.parallelism." 2731 ); 2732 } 2733 2734 tasks = ptr[0 .. nWorkUnits]; 2735 } 2736 2737 scope(exit) 2738 { 2739 if (nBytesNeeded > maxStack) 2740 { 2741 free(tasks.ptr); 2742 } 2743 } 2744 2745 foreach (ref t; tasks[]) 2746 emplaceRef(t, RTask()); 2747 2748 // Hack to take the address of a nested function w/o 2749 // making a closure. 
2750 static auto scopedAddress(D)(scope D del) @system 2751 { 2752 auto tmp = del; 2753 return tmp; 2754 } 2755 2756 size_t curPos = 0; 2757 void useTask(ref RTask task) 2758 { 2759 import std.algorithm.comparison : min; 2760 2761 task.pool = this; 2762 task._args[0] = scopedAddress(&reduceOnRange); 2763 task._args[3] = min(len, curPos + workUnitSize); // upper bound. 2764 task._args[1] = range; // range 2765 task._args[2] = curPos; // lower bound. 2766 2767 curPos += workUnitSize; 2768 } 2769 2770 foreach (ref task; tasks) 2771 { 2772 useTask(task); 2773 } 2774 2775 foreach (i; 1 .. tasks.length - 1) 2776 { 2777 tasks[i].next = tasks[i + 1].basePtr; 2778 tasks[i + 1].prev = tasks[i].basePtr; 2779 } 2780 2781 if (tasks.length > 1) 2782 { 2783 queueLock(); 2784 scope(exit) queueUnlock(); 2785 2786 abstractPutGroupNoSync( 2787 tasks[1].basePtr, 2788 tasks[$ - 1].basePtr 2789 ); 2790 } 2791 2792 if (tasks.length > 0) 2793 { 2794 try 2795 { 2796 tasks[0].job(); 2797 } 2798 catch (Throwable e) 2799 { 2800 tasks[0].exception = e; 2801 } 2802 tasks[0].taskStatus = TaskStatus.done; 2803 2804 // Try to execute each of these in the current thread 2805 foreach (ref task; tasks[1..$]) 2806 { 2807 tryDeleteExecute(task.basePtr); 2808 } 2809 } 2810 2811 // Now that we've tried to execute every task, they're all either 2812 // done or in progress. Force all of them. 2813 E result = seed; 2814 2815 Throwable firstException; 2816 2817 foreach (ref task; tasks) 2818 { 2819 try 2820 { 2821 task.yieldForce; 2822 } 2823 catch (Throwable e) 2824 { 2825 /* Chain e to front because order doesn't matter and because 2826 * e is not likely to be a chain itself (so fewer traversals) 2827 */ 2828 firstException = Throwable.chainTogether(e, firstException); 2829 continue; 2830 } 2831 2832 if (!firstException) result = finishFun(result, task.returnVal); 2833 } 2834 2835 if (firstException) throw firstException; 2836 2837 return result; 2838 } 2839 } 2840 2841 /// 2842 template fold(functions...) 
2843 { 2844 /** Implements the homonym function (also known as `accumulate`, `compress`, 2845 `inject`, or `foldl`) present in various programming languages of 2846 functional flavor. 2847 2848 `fold` is functionally equivalent to $(LREF reduce) except the range 2849 parameter comes first and there is no need to use $(REF_ALTTEXT 2850 `tuple`,tuple,std,typecons) for multiple seeds. 2851 2852 There may be one or more callable entities (`functions` argument) to 2853 apply. 2854 2855 Params: 2856 args = Just the range to _fold over; or the range and one seed 2857 per function; or the range, one seed per function, and 2858 the work unit size 2859 2860 Returns: 2861 The accumulated result as a single value for single function and 2862 as a tuple of values for multiple functions 2863 2864 See_Also: 2865 Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce). 2866 2867 Example: 2868 --- 2869 static int adder(int a, int b) 2870 { 2871 return a + b; 2872 } 2873 static int multiplier(int a, int b) 2874 { 2875 return a * b; 2876 } 2877 2878 // Just the range 2879 auto x = taskPool.fold!adder([1, 2, 3, 4]); 2880 assert(x == 10); 2881 2882 // The range and the seeds (0 and 1 below; also note multiple 2883 // functions in this example) 2884 auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1); 2885 assert(y[0] == 10); 2886 assert(y[1] == 24); 2887 2888 // The range, the seed (0), and the work unit size (20) 2889 auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20); 2890 assert(z == 10); 2891 --- 2892 */ 2893 auto fold(Args...)(Args args) 2894 { 2895 static assert(isInputRange!(Args[0]), "First argument must be an InputRange"); 2896 2897 alias range = args[0]; 2898 2899 static if (Args.length == 1) 2900 { 2901 // Just the range 2902 return reduce!functions(range); 2903 } 2904 else static if (Args.length == 1 + functions.length || 2905 Args.length == 1 + functions.length + 1) 2906 { 2907 static if (functions.length == 1) 2908 { 2909 
alias seeds = args[1]; 2910 } 2911 else 2912 { 2913 auto seeds() 2914 { 2915 import std.typecons : tuple; 2916 return tuple(args[1 .. functions.length+1]); 2917 } 2918 } 2919 2920 static if (Args.length == 1 + functions.length) 2921 { 2922 // The range and the seeds 2923 return reduce!functions(seeds, range); 2924 } 2925 else static if (Args.length == 1 + functions.length + 1) 2926 { 2927 // The range, the seeds, and the work unit size 2928 static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type"); 2929 return reduce!functions(seeds, range, args[$-1]); 2930 } 2931 } 2932 else 2933 { 2934 import std.conv : text; 2935 static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, " 2936 ~ functions.length.text ~ " optional seed(s), and an optional work unit size."); 2937 } 2938 } 2939 } 2940 2941 // This test is not included in the documentation because even though these 2942 // examples are for the inner fold() template, with their current location, 2943 // they would appear under the outer one. (We can't move this inside the 2944 // outer fold() template because then dmd runs out of memory possibly due to 2945 // recursive template instantiation, which is surprisingly not caught.) 2946 @system unittest 2947 { 2948 // Just the range 2949 auto x = taskPool.fold!"a + b"([1, 2, 3, 4]); 2950 assert(x == 10); 2951 2952 // The range and the seeds (0 and 1 below; also note multiple 2953 // functions in this example) 2954 auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1); 2955 assert(y[0] == 10); 2956 assert(y[1] == 24); 2957 2958 // The range, the seed (0), and the work unit size (20) 2959 auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20); 2960 assert(z == 10); 2961 } 2962 2963 /** 2964 Gets the index of the current thread relative to this `TaskPool`. Any 2965 thread not in this pool will receive an index of 0. The worker threads in 2966 this pool receive unique indices of 1 through `this.size`. 
2967 2968 This function is useful for maintaining worker-local resources. 2969 2970 Example: 2971 --- 2972 // Execute a loop that computes the greatest common 2973 // divisor of every number from 0 through 999 with 2974 // 42 in parallel. Write the results out to 2975 // a set of files, one for each thread. This allows 2976 // results to be written out without any synchronization. 2977 2978 import std.conv, std.range, std.numeric, std.stdio; 2979 2980 void main() 2981 { 2982 auto filesHandles = new File[taskPool.size + 1]; 2983 scope(exit) { 2984 foreach (ref handle; fileHandles) 2985 { 2986 handle.close(); 2987 } 2988 } 2989 2990 foreach (i, ref handle; fileHandles) 2991 { 2992 handle = File("workerResults" ~ to!string(i) ~ ".txt"); 2993 } 2994 2995 foreach (num; parallel(iota(1_000))) 2996 { 2997 auto outHandle = fileHandles[taskPool.workerIndex]; 2998 outHandle.writeln(num, '\t', gcd(num, 42)); 2999 } 3000 } 3001 --- 3002 */ 3003 size_t workerIndex() @property @safe const nothrow 3004 { 3005 immutable rawInd = threadIndex; 3006 return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ? 3007 (rawInd - instanceStartIndex + 1) : 0; 3008 } 3009 3010 /** 3011 Struct for creating worker-local storage. Worker-local storage is 3012 thread-local storage that exists only for worker threads in a given 3013 `TaskPool` plus a single thread outside the pool. It is allocated on the 3014 garbage collected heap in a way that avoids _false sharing, and doesn't 3015 necessarily have global scope within any thread. It can be accessed from 3016 any worker thread in the `TaskPool` that created it, and one thread 3017 outside this `TaskPool`. All threads outside the pool that created a 3018 given instance of worker-local storage share a single slot. 3019 3020 Since the underlying data for this struct is heap-allocated, this struct 3021 has reference semantics when passed between functions. 
3022 3023 The main uses cases for `WorkerLocalStorageStorage` are: 3024 3025 1. Performing parallel reductions with an imperative, as opposed to 3026 functional, programming style. In this case, it's useful to treat 3027 `WorkerLocalStorageStorage` as local to each thread for only the parallel 3028 portion of an algorithm. 3029 3030 2. Recycling temporary buffers across iterations of a parallel foreach loop. 3031 3032 Example: 3033 --- 3034 // Calculate pi as in our synopsis example, but 3035 // use an imperative instead of a functional style. 3036 immutable n = 1_000_000_000; 3037 immutable delta = 1.0L / n; 3038 3039 auto sums = taskPool.workerLocalStorage(0.0L); 3040 foreach (i; parallel(iota(n))) 3041 { 3042 immutable x = ( i - 0.5L ) * delta; 3043 immutable toAdd = delta / ( 1.0 + x * x ); 3044 sums.get += toAdd; 3045 } 3046 3047 // Add up the results from each worker thread. 3048 real pi = 0; 3049 foreach (threadResult; sums.toRange) 3050 { 3051 pi += 4.0L * threadResult; 3052 } 3053 --- 3054 */ 3055 static struct WorkerLocalStorage(T) 3056 { 3057 private: 3058 TaskPool pool; 3059 size_t size; 3060 3061 size_t elemSize; 3062 bool* stillThreadLocal; 3063 3064 static size_t roundToLine(size_t num) pure nothrow 3065 { 3066 if (num % cacheLineSize == 0) 3067 { 3068 return num; 3069 } 3070 else 3071 { 3072 return ((num / cacheLineSize) + 1) * cacheLineSize; 3073 } 3074 } 3075 3076 void* data; 3077 3078 void initialize(TaskPool pool) 3079 { 3080 this.pool = pool; 3081 size = pool.size + 1; 3082 stillThreadLocal = new bool; 3083 *stillThreadLocal = true; 3084 3085 // Determines whether the GC should scan the array. 3086 auto blkInfo = (typeid(T).flags & 1) ? 
3087 cast(GC.BlkAttr) 0 : 3088 GC.BlkAttr.NO_SCAN; 3089 3090 immutable nElem = pool.size + 1; 3091 elemSize = roundToLine(T.sizeof); 3092 3093 // The + 3 is to pad one full cache line worth of space on either side 3094 // of the data structure to make sure false sharing with completely 3095 // unrelated heap data is prevented, and to provide enough padding to 3096 // make sure that data is cache line-aligned. 3097 data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize; 3098 3099 // Cache line align data ptr. 3100 data = cast(void*) roundToLine(cast(size_t) data); 3101 3102 foreach (i; 0 .. nElem) 3103 { 3104 this.opIndex(i) = T.init; 3105 } 3106 } 3107 3108 ref opIndex(this Qualified)(size_t index) 3109 { 3110 import std.conv : text; 3111 assert(index < size, text(index, '\t', uint.max)); 3112 return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index)); 3113 } 3114 3115 void opIndexAssign(T val, size_t index) 3116 { 3117 assert(index < size); 3118 *(cast(T*) (data + elemSize * index)) = val; 3119 } 3120 3121 public: 3122 /** 3123 Get the current thread's instance. Returns by ref. 3124 Note that calling `get` from any thread 3125 outside the `TaskPool` that created this instance will return the 3126 same reference, so an instance of worker-local storage should only be 3127 accessed from one thread outside the pool that created it. If this 3128 rule is violated, undefined behavior will result. 3129 3130 If assertions are enabled and `toRange` has been called, then this 3131 WorkerLocalStorage instance is no longer worker-local and an assertion 3132 failure will result when calling this method. This is not checked 3133 when assertions are disabled for performance reasons. 3134 */ 3135 ref get(this Qualified)() @property 3136 { 3137 assert(*stillThreadLocal, 3138 "Cannot call get() on this instance of WorkerLocalStorage " ~ 3139 "because it is no longer worker-local." 
3140 ); 3141 return opIndex(pool.workerIndex); 3142 } 3143 3144 /** 3145 Assign a value to the current thread's instance. This function has 3146 the same caveats as its overload. 3147 */ 3148 void get(T val) @property 3149 { 3150 assert(*stillThreadLocal, 3151 "Cannot call get() on this instance of WorkerLocalStorage " ~ 3152 "because it is no longer worker-local." 3153 ); 3154 3155 opIndexAssign(val, pool.workerIndex); 3156 } 3157 3158 /** 3159 Returns a range view of the values for all threads, which can be used 3160 to further process the results of each thread after running the parallel 3161 part of your algorithm. Do not use this method in the parallel portion 3162 of your algorithm. 3163 3164 Calling this function sets a flag indicating that this struct is no 3165 longer worker-local, and attempting to use the `get` method again 3166 will result in an assertion failure if assertions are enabled. 3167 */ 3168 WorkerLocalStorageRange!T toRange() @property 3169 { 3170 if (*stillThreadLocal) 3171 { 3172 *stillThreadLocal = false; 3173 3174 // Make absolutely sure results are visible to all threads. 3175 // This is probably not necessary since some other 3176 // synchronization primitive will be used to signal that the 3177 // parallel part of the algorithm is done, but the 3178 // performance impact should be negligible, so it's better 3179 // to be safe. 3180 ubyte barrierDummy; 3181 atomicSetUbyte(barrierDummy, 1); 3182 } 3183 3184 return WorkerLocalStorageRange!T(this); 3185 } 3186 } 3187 3188 /** 3189 Range primitives for worker-local storage. The purpose of this is to 3190 access results produced by each worker thread from a single thread once you 3191 are no longer using the worker-local storage from multiple threads. 3192 Do not use this struct in the parallel portion of your algorithm. 3193 3194 The proper way to instantiate this object is to call 3195 `WorkerLocalStorage.toRange`. 
Once instantiated, this object behaves 3196 as a finite random-access range with assignable, lvalue elements and 3197 a length equal to the number of worker threads in the `TaskPool` that 3198 created it plus 1. 3199 */ 3200 static struct WorkerLocalStorageRange(T) 3201 { 3202 private: 3203 WorkerLocalStorage!T workerLocalStorage; 3204 3205 size_t _length; 3206 size_t beginOffset; 3207 3208 this(WorkerLocalStorage!T wl) 3209 { 3210 this.workerLocalStorage = wl; 3211 _length = wl.size; 3212 } 3213 3214 public: 3215 ref front(this Qualified)() @property 3216 { 3217 return this[0]; 3218 } 3219 3220 ref back(this Qualified)() @property 3221 { 3222 return this[_length - 1]; 3223 } 3224 3225 void popFront() 3226 { 3227 if (_length > 0) 3228 { 3229 beginOffset++; 3230 _length--; 3231 } 3232 } 3233 3234 void popBack() 3235 { 3236 if (_length > 0) 3237 { 3238 _length--; 3239 } 3240 } 3241 3242 typeof(this) save() @property 3243 { 3244 return this; 3245 } 3246 3247 ref opIndex(this Qualified)(size_t index) 3248 { 3249 assert(index < _length); 3250 return workerLocalStorage[index + beginOffset]; 3251 } 3252 3253 void opIndexAssign(T val, size_t index) 3254 { 3255 assert(index < _length); 3256 workerLocalStorage[index] = val; 3257 } 3258 3259 typeof(this) opSlice(size_t lower, size_t upper) 3260 { 3261 assert(upper <= _length); 3262 auto newWl = this.workerLocalStorage; 3263 newWl.data += lower * newWl.elemSize; 3264 newWl.size = upper - lower; 3265 return typeof(this)(newWl); 3266 } 3267 3268 bool empty() const @property 3269 { 3270 return length == 0; 3271 } 3272 3273 size_t length() const @property 3274 { 3275 return _length; 3276 } 3277 } 3278 3279 /** 3280 Creates an instance of worker-local storage, initialized with a given 3281 value. The value is `lazy` so that you can, for example, easily 3282 create one instance of a class for each worker. For usage example, 3283 see the `WorkerLocalStorage` struct. 
3284 */ 3285 WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init) 3286 { 3287 WorkerLocalStorage!T ret; 3288 ret.initialize(this); 3289 foreach (i; 0 .. size + 1) 3290 { 3291 ret[i] = initialVal; 3292 } 3293 3294 // Memory barrier to make absolutely sure that what we wrote is 3295 // visible to worker threads. 3296 ubyte barrierDummy; 3297 atomicSetUbyte(barrierDummy, 0); 3298 3299 return ret; 3300 } 3301 3302 /** 3303 Signals to all worker threads to terminate as soon as they are finished 3304 with their current `Task`, or immediately if they are not executing a 3305 `Task`. `Task`s that were in queue will not be executed unless 3306 a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce` 3307 causes them to be executed. 3308 3309 Use only if you have waited on every `Task` and therefore know the 3310 queue is empty, or if you speculatively executed some tasks and no longer 3311 need the results. 3312 */ 3313 void stop() @trusted 3314 { 3315 queueLock(); 3316 scope(exit) queueUnlock(); 3317 atomicSetUbyte(status, PoolState.stopNow); 3318 notifyAll(); 3319 } 3320 3321 /** 3322 Signals worker threads to terminate when the queue becomes empty. 3323 3324 If blocking argument is true, wait for all worker threads to terminate 3325 before returning. This option might be used in applications where 3326 task results are never consumed-- e.g. when `TaskPool` is employed as a 3327 rudimentary scheduler for tasks which communicate by means other than 3328 return values. 3329 3330 Warning: Calling this function with $(D blocking = true) from a worker 3331 thread that is a member of the same `TaskPool` that 3332 `finish` is being called on will result in a deadlock. 
*/
    void finish(bool blocking = false) @trusted
    {
        {
            queueLock();
            scope(exit) queueUnlock();
            // Compare-and-swap: only a pool whose status is still `running`
            // is moved to `finishing`.
            atomicCasUbyte(status, PoolState.running, PoolState.finishing);
            notifyAll();
        }
        if (blocking)
        {
            // Use this thread as a worker until everything is finished.
            executeWorkLoop();

            foreach (t; pool)
            {
                // Maybe there should be something here to prevent a thread
                // from calling join() on itself if this function is called
                // from a worker thread in the same pool, but:
                //
                // 1.  Using an if statement to skip join() would result in
                //     finish() returning without all tasks being finished.
                //
                // 2.  If an exception were thrown, it would bubble up to the
                //     Task from which finish() was called and likely be
                //     swallowed.
                t.join();
            }
        }
    }

    /// Returns the number of worker threads in the pool.
    @property size_t size() @safe const pure nothrow
    {
        return pool.length;
    }

    /**
    Put a `Task` object on the back of the task queue.  The `Task`
    object may be passed by pointer or reference.

    Example:
    ---
    import std.file;

    // Create a task.
    auto t = task!read("foo.txt");

    // Add it to the queue to be executed.
    taskPool.put(t);
    ---

    Notes:

    @trusted overloads of this function are called for `Task`s if
    $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s
    return type or the function the `Task` executes is `pure`.
    `Task` objects that meet all other requirements specified in the
    `@trusted` overloads of `task` and `scopedTask` may be created
    and executed from `@safe` code via `Task.executeInNewThread` but
    not via `TaskPool`.

    While this function takes the address of variables that may
    be on the stack, some overloads are marked as @trusted.
    `Task` includes a destructor that waits for the task to complete
    before destroying the stack frame it is allocated on.  Therefore,
    it is impossible for the stack frame to be destroyed before the task is
    complete and no longer referenced by a `TaskPool`.
    */
    void put(alias fun, Args...)(ref Task!(fun, Args) task)
    if (!isSafeReturn!(typeof(task)))
    {
        task.pool = this;
        abstractPut(task.basePtr);
    }

    /// Ditto
    void put(alias fun, Args...)(Task!(fun, Args)* task)
    if (!isSafeReturn!(typeof(*task)))
    {
        import std.exception : enforce;
        enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
        put(*task);
    }

    // @trusted variants of the two overloads above, selected when
    // isSafeReturn holds for the Task type. The bodies are identical;
    // only the attribute differs.
    @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task)
    if (isSafeReturn!(typeof(task)))
    {
        task.pool = this;
        abstractPut(task.basePtr);
    }

    @trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
    if (isSafeReturn!(typeof(*task)))
    {
        import std.exception : enforce;
        enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
        put(*task);
    }

    /**
    These properties control whether the worker threads are daemon threads.
    A daemon thread is automatically terminated when all non-daemon threads
    have terminated.  A non-daemon thread will prevent a program from
    terminating as long as it has not terminated.

    If any `TaskPool` with non-daemon threads is active, either `stop`
    or `finish` must be called on it before the program can terminate.

    The worker threads in the `TaskPool` instance returned by the
    `taskPool` property are daemon by default.  The worker threads of
    manually instantiated task pools are non-daemon by default.

    Note:  For a size zero pool, the getter arbitrarily returns true and the
           setter has no effect.
    */
    bool isDaemon() @property @trusted
    {
        queueLock();
        scope(exit) queueUnlock();
        return (size == 0) ? true : pool[0].isDaemon;
    }

    /// Ditto
    void isDaemon(bool newVal) @property @trusted
    {
        queueLock();
        scope(exit) queueUnlock();
        foreach (thread; pool)
        {
            thread.isDaemon = newVal;
        }
    }

    /**
    These functions allow getting and setting the OS scheduling priority of
    the worker threads in this `TaskPool`.  They forward to
    `core.thread.Thread.priority`, so a given priority value here means the
    same thing as an identical priority value in `core.thread`.

    Note:  For a size zero pool, the getter arbitrarily returns
           `core.thread.Thread.PRIORITY_MIN` and the setter has no effect.
    */
    int priority() @property @trusted
    {
        return (size == 0) ? core.thread.Thread.PRIORITY_MIN :
            pool[0].priority;
    }

    /// Ditto
    void priority(int newPriority) @property @trusted
    {
        // For a size zero pool this loop does nothing, which is exactly the
        // documented "setter has no effect" behavior; no explicit guard needed.
        foreach (t; pool)
        {
            t.priority = newPriority;
        }
    }
}

@system unittest
{
    import std.algorithm.iteration : sum;
    import std.range : iota;
    import std.typecons : tuple;

    enum N = 100;
    auto r = iota(1, N + 1);
    const expected = r.sum();

    // Just the range
    assert(taskPool.fold!"a + b"(r) == expected);

    // Range and seeds
    assert(taskPool.fold!"a + b"(r, 0) == expected);
    assert(taskPool.fold!("a + b", "a + b")(r, 0, 0) == tuple(expected, expected));

    // Range, seeds, and work unit size
    assert(taskPool.fold!"a + b"(r, 0, 42) == expected);
    assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected));
}

/**
Returns a lazily initialized global instantiation of `TaskPool`.
This function can safely be called concurrently from multiple non-worker
threads.
The worker threads in this pool are daemon threads, meaning that it
is not necessary to call `TaskPool.stop` or `TaskPool.finish` before
terminating the main thread.
*/
@property TaskPool taskPool() @trusted
{
    import std.concurrency : initOnce;
    __gshared TaskPool pool;
    // initOnce runs the initializer expression only on the first call, even
    // when several threads race on that first call.
    return initOnce!pool({
        auto p = new TaskPool(defaultPoolThreads);
        p.isDaemon = true;
        return p;
    }());
}

// uint.max is a sentinel meaning "never set"; the getter below then falls
// back to totalCPUs - 1.
private shared uint _defaultPoolThreads = uint.max;

/**
These properties get and set the number of worker threads in the `TaskPool`
instance returned by `taskPool`.  The default value is `totalCPUs` - 1.
Calling the setter after the first call to `taskPool` does not change the
number of worker threads in the instance returned by `taskPool`.
*/
@property uint defaultPoolThreads() @trusted
{
    const local = atomicLoad(_defaultPoolThreads);
    return local < uint.max ? local : totalCPUs - 1;
}

/// Ditto
@property void defaultPoolThreads(uint newVal) @trusted
{
    atomicStore(_defaultPoolThreads, newVal);
}

/**
Convenience functions that forward to `taskPool.parallel`.  The
purpose of these is to make parallel foreach less verbose and more
readable.

Example:
---
// Find the logarithm of every number from
// 1 to 1_000_000 in parallel, using the
// default TaskPool instance.
auto logs = new double[1_000_000];

foreach (i, ref elem; parallel(logs))
{
    elem = log(i + 1.0);
}
---

*/
ParallelForeach!R parallel(R)(R range)
{
    return taskPool.parallel(range);
}

/// Ditto
ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
{
    return taskPool.parallel(range, workUnitSize);
}

//  `each` should be usable with parallel
// https://issues.dlang.org/show_bug.cgi?id=17019
@system unittest
{
    import std.algorithm.iteration : each, sum;
    import std.range : iota;

    // check behavior with parallel
    auto arr = new int[10];
    parallel(arr).each!((ref e) => e += 1);
    assert(arr.sum == 10);

    auto arrIndex = new int[10];
    parallel(arrIndex).each!((i, ref e) => e += i);
    assert(arrIndex.sum == 10.iota.sum);
}

// Thrown when a parallel foreach loop is broken from.
class ParallelForeachError : Error
{
    this()
    {
        super("Cannot break from a parallel foreach loop using break, return, "
              ~ "labeled break/continue or goto statements.");
    }
}

/*------Structs that implement opApply for parallel foreach.------------------*/
private template randLen(R)
{
    enum randLen = isRandomAccessRange!R && hasLength!R;
}

// Runs `doIt` as pool.size + 1 scoped tasks: tasks[0] in the calling thread,
// the rest queued on `pool` as one group (and opportunistically stolen back
// by the caller). Waits for all of them and rethrows any exceptions,
// chained together.
private void submitAndExecute(
    TaskPool pool,
    scope void delegate() doIt
)
{
    import core.exception : OutOfMemoryError;
    import core.stdc.stdlib : malloc, free;
    import core.stdc.string : memcpy;

    immutable nThreads = pool.size + 1;

    alias PTask = typeof(scopedTask(doIt));

    // The logical thing to do would be to just use alloca() here, but that
    // causes problems on Windows for reasons that I don't understand
    // (tentatively a compiler bug) and definitely doesn't work on Posix due
    // to https://issues.dlang.org/show_bug.cgi?id=3753.
    // Therefore, allocate a fixed buffer and fall back to `malloc()` if
    // someone's using a ridiculous number of threads.
    // Also, using a byte array instead of a PTask array as the fixed buffer
    // prevents d'tors from being called on uninitialized excess PTask
    // instances.
    enum nBuf = 64;
    byte[nBuf * PTask.sizeof] buf = void;
    PTask[] tasks;
    if (nThreads <= nBuf)
    {
        tasks = (cast(PTask*) buf.ptr)[0 .. nThreads];
    }
    else
    {
        auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof);
        if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism.");
        tasks = ptr[0 .. nThreads];
    }

    scope(exit)
    {
        if (nThreads > nBuf)
        {
            free(tasks.ptr);
        }
    }

    foreach (ref t; tasks)
    {
        // This silly looking code is necessary to prevent d'tors from being
        // called on uninitialized objects.
        auto temp = scopedTask(doIt);
        memcpy(&t, &temp, PTask.sizeof);

        // This has to be done to t after copying, not temp before copying.
        // Otherwise, temp's destructor will sit here and wait for the
        // task to finish.
        t.pool = pool;
    }

    // Link tasks[1 .. $] into a doubly linked chain so they can be queued
    // as a single group below. tasks[0] is executed directly by this thread.
    foreach (i; 1 .. tasks.length - 1)
    {
        tasks[i].next = tasks[i + 1].basePtr;
        tasks[i + 1].prev = tasks[i].basePtr;
    }

    if (tasks.length > 1)
    {
        pool.queueLock();
        scope(exit) pool.queueUnlock();

        pool.abstractPutGroupNoSync(
            tasks[1].basePtr,
            tasks[$ - 1].basePtr
        );
    }

    if (tasks.length > 0)
    {
        try
        {
            tasks[0].job();
        }
        catch (Throwable e)
        {
            tasks[0].exception = e; // nocoverage
        }
        tasks[0].taskStatus = TaskStatus.done;

        // Try to execute each of these in the current thread
        foreach (ref task; tasks[1..$])
        {
            pool.tryDeleteExecute(task.basePtr);
        }
    }

    Throwable firstException;

    foreach (ref task; tasks)
    {
        try
        {
            task.yieldForce;
        }
        catch (Throwable e)
        {
            /* Chain e to front because order doesn't matter and because
             * e is not likely to be a chain itself (so fewer traversals)
             */
            firstException = Throwable.chainTogether(e, firstException);
        }
    }

    if (firstException) throw firstException;
}

// Raises ParallelForeachError; called when a parallel foreach body returns
// nonzero (break/return/goto out of the loop).
void foreachErr()
{
    throw new ParallelForeachError();
}

// Serial fallback used by the parallel-apply mixins when the pool has no
// worker threads: iterates `range` in the calling thread.
int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)
{
    with(p)
    {
        int res = 0;
        size_t index = 0;

        // The explicit ElementType!R in the foreach loops is necessary for
        // correct behavior when iterating over strings.
        static if (hasLvalueElements!R)
        {
            foreach (ref ElementType!R elem; range)
            {
                static if (Parameters!dg.length == 2)
                {
                    res = dg(index, elem);
                }
                else
                {
                    res = dg(elem);
                }
                if (res) break;
                index++;
            }
        }
        else
        {
            foreach (ElementType!R elem; range)
            {
                static if (Parameters!dg.length == 2)
                {
                    res = dg(index, elem);
                }
                else
                {
                    res = dg(elem);
                }
                if (res) break;
                index++;
            }
        }
        if (res) foreachErr;
        return res;
    }
}

private enum string parallelApplyMixinRandomAccess = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    shared size_t workUnitIndex = size_t.max;  // Effectively -1:  workUnitIndex + 1 == 0
    immutable len = range.length;
    if (!len) return 0;

    shared bool shouldContinue = true;

    void doIt()
    {
        import std.algorithm.comparison : min;

        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        while (atomicLoad(shouldContinue))
        {
            // Each thread claims the next work unit by atomically bumping
            // the shared index.
            immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
            immutable start = workUnitSize * myUnitIndex;
            if (start >= len)
            {
                atomicStore(shouldContinue, false);
                break;
            }

            immutable end = min(len, start + workUnitSize);

            foreach (i; start .. end)
            {
                static if (withIndex)
                {
                    if (dg(i, range[i])) foreachErr();
                }
                else
                {
                    if (dg(range[i])) foreachErr();
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
};

enum string parallelApplyMixinInputRange = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    // This protects the range while copying it.
    auto rangeMutex = new Mutex();

    shared bool shouldContinue = true;

    // The total number of elements that have been popped off range.
    // This is updated only while protected by rangeMutex.
    size_t nPopped = 0;

    static if (
        is(typeof(range.buf1)) &&
        is(typeof(range.bufPos)) &&
        is(typeof(range.doBufSwap()))
    )
    {
        // Make sure we don't have the buffer recycling overload of
        // asyncBuf.
        static if (
            is(typeof(range.source)) &&
            isRoundRobin!(typeof(range.source))
        )
        {
            static assert(0, "Cannot execute a parallel foreach loop on " ~
                          "the buffer recycling overload of asyncBuf.");
        }

        enum bool bufferTrick = true;
    }
    else
    {
        enum bool bufferTrick = false;
    }

    void doIt()
    {
        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        static if (hasLvalueElements!R)
        {
            alias Temp = ElementType!R*[];
            Temp temp;

            // Returns:  The previous value of nPopped.
            size_t makeTemp()
            {
                import std.algorithm.internal : addressOf;
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = addressOf(range.front);
                }

                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

        }
        else
        {

            alias Temp = ElementType!R[];
            Temp temp;

            // Returns:  The previous value of nPopped.
            static if (!bufferTrick) size_t makeTemp()
            {
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = range.front;
                }

                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

            static if (bufferTrick) size_t makeTemp()
            {
                import std.algorithm.mutation : swap;
                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                // Elide copying by just swapping buffers.
                temp.length = range.buf1.length;
                swap(range.buf1, temp);

                // This is necessary in case popFront() has been called on
                // range before entering the parallel foreach loop.
                temp = temp[range.bufPos..$];

                static if (is(typeof(range._length)))
                {
                    // temp has already been sliced past range.bufPos, so it
                    // holds exactly the elements consumed here. The bufPos
                    // elements popped earlier were presumably already
                    // subtracted by popFront() (the unittest relies on
                    // `length` dropping after popFront), so subtracting
                    // bufPos again would leave _length too large.
                    range._length -= temp.length;
                }

                range.doBufSwap();
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }
        }

        while (atomicLoad(shouldContinue))
        {
            auto overallIndex = makeTemp();
            if (temp.empty)
            {
                atomicStore(shouldContinue, false);
                break;
            }

            foreach (i; 0 .. temp.length)
            {
                scope(success) overallIndex++;

                static if (hasLvalueElements!R)
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, *temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(*temp[i])) foreachErr();
                    }
                }
                else
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(temp[i])) foreachErr();
                    }
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
};


private struct ParallelForeach(R)
{
    TaskPool pool;
    R range;
    size_t workUnitSize;
    alias E = ElementType!R;

    static if (hasLvalueElements!R)
    {
        alias NoIndexDg = int delegate(ref E);
        alias IndexDg = int delegate(size_t, ref E);
    }
    else
    {
        alias NoIndexDg = int delegate(E);
        alias IndexDg = int delegate(size_t, E);
    }

    int opApply(scope NoIndexDg dg)
    {
        static if (randLen!R)
        {
            mixin(parallelApplyMixinRandomAccess);
        }
        else
        {
            mixin(parallelApplyMixinInputRange);
        }
    }

    int opApply(scope IndexDg dg)
    {
        static if (randLen!R)
        {
            mixin(parallelApplyMixinRandomAccess);
        }
        else
        {
            mixin(parallelApplyMixinInputRange);
        }
    }
}

/*
This struct buffers the output of a callable that outputs data into a
user-supplied buffer into a set of buffers of some fixed size.  It allows these
buffers to be accessed with an input range interface.  This is used internally
in the buffer-recycling overload of TaskPool.asyncBuf, which creates an
instance and forwards it to the input range overload of asyncBuf.
*/
private struct RoundRobinBuffer(C1, C2)
{
    // No need for constraints because they're already checked for in asyncBuf.

    alias Array = Parameters!(C1.init)[0];
    alias T = typeof(Array.init[0]);

    T[][] bufs;
    size_t index;
    C1 nextDel;
    C2 emptyDel;
    bool _empty;
    bool primed;

    this(
        C1 nextDel,
        C2 emptyDel,
        size_t initialBufSize,
        size_t nBuffers
    ) {
        this.nextDel = nextDel;
        this.emptyDel = emptyDel;
        bufs.length = nBuffers;

        foreach (ref buf; bufs)
        {
            buf.length = initialBufSize;
        }
    }

    void prime()
    in
    {
        assert(!empty);
    }
    do
    {
        scope(success) primed = true;
        nextDel(bufs[index]);
    }


    T[] front() @property
    in
    {
        assert(!empty);
    }
    do
    {
        if (!primed) prime();
        return bufs[index];
    }

    void popFront()
    {
        if (empty || emptyDel())
        {
            _empty = true;
            return;
        }

        index = (index + 1) % bufs.length;
        primed = false;
    }

    bool empty() @property const @safe pure nothrow
    {
        return _empty;
    }
}

version (StdUnittest)
{
    // This was the only way I could get nested maps to work.
    private __gshared TaskPool poolInstance;
}

// These test basic functionality but don't stress test for threading bugs.
// These are the tests that should be run every time Phobos is compiled.
@system unittest
{
    import std.algorithm.comparison : equal, min, max;
    import std.algorithm.iteration : filter, map, reduce;
    import std.array : split;
    import std.conv : text;
    import std.exception : assertThrown;
    import std.math : approxEqual, sqrt, log;
    import std.range : indexed, iota, join;
    import std.typecons : Tuple, tuple;
    import std.stdio;

    poolInstance = new TaskPool(2);
    scope(exit) poolInstance.stop();

    // The only way this can be verified is manually.
    debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs);

    auto oldPriority = poolInstance.priority;
    poolInstance.priority = Thread.PRIORITY_MAX;
    assert(poolInstance.priority == Thread.PRIORITY_MAX);

    poolInstance.priority = Thread.PRIORITY_MIN;
    assert(poolInstance.priority == Thread.PRIORITY_MIN);

    poolInstance.priority = oldPriority;
    assert(poolInstance.priority == oldPriority);

    static void refFun(ref uint num)
    {
        num++;
    }

    uint x;

    // Test task().
    auto t = task!refFun(x);
    poolInstance.put(t);
    t.yieldForce;
    assert(t.args[0] == 1);

    auto t2 = task(&refFun, x);
    poolInstance.put(t2);
    t2.yieldForce;
    assert(t2.args[0] == 1);

    // Test scopedTask().
    auto st = scopedTask!refFun(x);
    poolInstance.put(st);
    st.yieldForce;
    assert(st.args[0] == 1);

    auto st2 = scopedTask(&refFun, x);
    poolInstance.put(st2);
    st2.yieldForce;
    assert(st2.args[0] == 1);

    // Test executeInNewThread().
    auto ct = scopedTask!refFun(x);
    ct.executeInNewThread(Thread.PRIORITY_MAX);
    ct.yieldForce;
    assert(ct.args[0] == 1);

    // Test ref return.
    uint toInc = 0;
    static ref T makeRef(T)(ref T num)
    {
        return num;
    }

    auto t3 = task!makeRef(toInc);
    taskPool.put(t3);
    assert(t3.args[0] == 0);
    t3.spinForce++;
    assert(t3.args[0] == 1);

    static void testSafe() @safe {
        static int bump(int num)
        {
            return num + 1;
        }

        auto safePool = new TaskPool(0);
        auto t = task(&bump, 1);
        taskPool.put(t);
        assert(t.yieldForce == 2);

        auto st = scopedTask(&bump, 1);
        taskPool.put(st);
        assert(st.yieldForce == 2);
        safePool.stop();
    }

    auto arr = [1,2,3,4,5];
    auto nums = new uint[5];
    auto nums2 = new uint[5];

    foreach (i, ref elem; poolInstance.parallel(arr))
    {
        elem++;
        nums[i] = cast(uint) i + 2;
        nums2[i] = elem;
    }

    assert(nums == [2,3,4,5,6], text(nums));
    assert(nums2 == nums, text(nums2));
    assert(arr == nums, text(arr));

    // Test const/immutable arguments.
    static int add(int lhs, int rhs)
    {
        return lhs + rhs;
    }
    immutable addLhs = 1;
    immutable addRhs = 2;
    auto addTask = task(&add, addLhs, addRhs);
    auto addScopedTask = scopedTask(&add, addLhs, addRhs);
    poolInstance.put(addTask);
    poolInstance.put(addScopedTask);
    assert(addTask.yieldForce == 3);
    assert(addScopedTask.yieldForce == 3);

    // Test parallel foreach with non-random access range.
    auto range = filter!"a != 666"([0, 1, 2, 3, 4]);

    foreach (i, elem; poolInstance.parallel(range))
    {
        nums[i] = cast(uint) i;
    }

    assert(nums == [0,1,2,3,4]);

    auto logs = new double[1_000_000];
    foreach (i, ref elem; poolInstance.parallel(logs))
    {
        elem = log(i + 1.0);
    }

    foreach (i, elem; logs)
    {
        assert(approxEqual(elem, cast(double) log(i + 1)));
    }

    assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
        [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    auto tupleBuf = new Tuple!(int, int)[3];
    poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
    poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    // Test amap with a non-array buffer.
    auto toIndex = new int[5];
    auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
    poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
    assert(equal(ind, [2, 4, 6, 8, 10]));
    assert(equal(toIndex, [8, 4, 10, 2, 6]));
    poolInstance.amap!"a / 2"(ind, ind);
    assert(equal(ind, [1, 2, 3, 4, 5]));
    assert(equal(toIndex, [4, 2, 5, 1, 3]));

    auto buf = new int[5];
    poolInstance.amap!"a * a"([1,2,3,4,5], buf);
    assert(buf == [1,4,9,16,25]);
    poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
    assert(buf == [1,4,9,16,25]);

    assert(poolInstance.reduce!"a + b"([1]) == 1);
    assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
    assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
    assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
        tuple(10, 24));

    immutable serialAns = reduce!"a + b"(iota(1000));
    assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
    assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);

    // Test worker-local storage.
    auto wl = poolInstance.workerLocalStorage(0);
    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        wl.get = wl.get + i;
    }

    auto wlRange = wl.toRange;
    auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
    assert(parallelSum == 499500);
    assert(wlRange[0 .. 1][0] == wlRange[0]);
    assert(wlRange[1 .. 2][0] == wlRange[1]);

    // Test finish()
    {
        static void slowFun() { Thread.sleep(dur!"msecs"(1)); }

        auto pool1 = new TaskPool();
        auto tSlow = task!slowFun();
        pool1.put(tSlow);
        pool1.finish();
        tSlow.yieldForce;
        // Can't assert that pool1.status == PoolState.stopNow because status
        // doesn't change until after the "done" flag is set and the waiting
        // thread is woken up.

        auto pool2 = new TaskPool();
        auto tSlow2 = task!slowFun();
        pool2.put(tSlow2);
        pool2.finish(true); // blocking
        assert(tSlow2.done);

        // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by making pool size zero.
        auto pool3 = new TaskPool(0);
        auto tSlow3 = task!slowFun();
        pool3.put(tSlow3);
        pool3.finish(true); // blocking
        assert(tSlow3.done);

        // This is correct because no thread will terminate unless pool2.status
        // and pool3.status have already been set to stopNow.
        assert(pool2.status == TaskPool.PoolState.stopNow);
        assert(pool3.status == TaskPool.PoolState.stopNow);
    }

    // Test default pool stuff.
    assert(taskPool.size == totalCPUs - 1);

    nums = new uint[1000];
    foreach (i; parallel(iota(1000)))
    {
        nums[i] = cast(uint) i;
    }
    assert(equal(nums, iota(1000)));

    assert(equal(
        poolInstance.map!"a * a"(iota(3_000_001), 10_000),
        map!"a * a"(iota(3_000_001))
    ));

    // The filter is to kill random access and test the non-random access
    // branch.
    assert(equal(
        poolInstance.map!"a * a"(
            filter!"a == a"(iota(3_000_001)
        ), 10_000, 1000),
        map!"a * a"(iota(3_000_001))
    ));

    assert(
        reduce!"a + b"(0UL,
            poolInstance.map!"a * a"(iota(300_001), 10_000)
        ) ==
        reduce!"a + b"(0UL,
            map!"a * a"(iota(300_001))
        )
    );

    assert(equal(
        iota(1_000_002),
        poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
    ));

    {
        import std.conv : to;
        import std.file : deleteme;

        string temp_file = deleteme ~ "-tempDelMe.txt";
        auto file = File(temp_file, "wb");
        scope(exit)
        {
            file.close();
            import std.file;
            remove(temp_file);
        }

        auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
        foreach (row; written)
        {
            file.writeln(join(to!(string[])(row), "\t"));
        }

        file = File(temp_file);

        void next(ref char[] buf)
        {
            file.readln(buf);
            import std.string : chomp;
            buf = chomp(buf);
        }

        double[][] read;
        auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

        foreach (line; asyncReader)
        {
            if (line.length == 0) continue;
            auto ls = line.split("\t");
            read ~= to!(double[])(ls);
        }

        assert(read == written);
        file.close();
    }

    // Test Map/AsyncBuf chaining.

    auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
    auto temp = poolInstance.map!sqrt(
        abuf, 100, 5
    );
    auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
    lmchain.popFront();

    int ii;
    foreach (elem; lmchain)
    {
        if (!approxEqual(elem, ii))
        {
            stderr.writeln(ii, '\t', elem);
        }
        ii++;
    }

    // Test buffer trick in parallel foreach.
    abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
    abuf.popFront();
    auto bufTrickTest = new size_t[abuf.length];
    foreach (i, elem; parallel(abuf))
    {
        bufTrickTest[i] = i;
    }

    assert(equal(iota(1_000_000), bufTrickTest));

    auto myTask = task!(std.math.abs)(-1);
    taskPool.put(myTask);
    assert(myTask.spinForce == 1);

    // Test that worker local storage from one pool receives an index of 0
    // when the index is queried w.r.t. another pool.  The only way to do this
    // is non-deterministically.
    foreach (i; parallel(iota(1000), 1))
    {
        assert(poolInstance.workerIndex == 0);
    }

    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        assert(taskPool.workerIndex == 0);
    }

    // Test exception handling.
    static void parallelForeachThrow()
    {
        foreach (elem; parallel(iota(10)))
        {
            throw new Exception("");
        }
    }

    assertThrown!Exception(parallelForeachThrow());

    static int reduceException(int a, int b)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.reduce!reduceException(iota(3))
    );

    static int mapException(int a)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.amap!mapException(iota(3))
    );

    static void mapThrow()
    {
        auto m = poolInstance.map!mapException(iota(3));
        m.popFront();
    }

    assertThrown!Exception(mapThrow());

    struct ThrowingRange
    {
        @property int front()
        {
            return 1;
        }
        void popFront()
        {
            throw new Exception("");
        }
        enum bool empty = false;
    }

    assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
}

//version = parallelismStressTest;

// These are more like stress tests than real unit tests.
// They print out tons of stuff and should not be run every time make
// unittest is run.
version (parallelismStressTest)
{
    // NOTE(review): these tests are compiled only when the
    // `parallelismStressTest` version identifier is set. Their bodies contain
    // no local imports for names such as filter, iota, reduce, writeln,
    // stderr, readln, sqrt, approxEqual, isNaN or text, so they depend on
    // whatever is in scope at module level. Confirm they still compile
    // before enabling the version.

    // Repeats, 10 times for pool sizes 0 and 4: parallel foreach over
    // indices and a filtered range, amap, a Task-based reduce vs.
    // TaskPool.reduce, and nested parallel foreach. Progress is printed to
    // stderr/stdout, and the test waits for a line on stdin at the end.
    @safe unittest
    {
        size_t attempt;
        for (; attempt < 10; attempt++)
            foreach (poolSize; [0, 4])
            {

                poolInstance = new TaskPool(poolSize);

                uint[] numbers = new uint[1_000];

                foreach (i; poolInstance.parallel( iota(0, numbers.length)) )
                {
                    numbers[i] = cast(uint) i;
                }

                // Make sure it works.
                foreach (i; 0 .. numbers.length)
                {
                    assert(numbers[i] == i);
                }

                stderr.writeln("Done creating nums.");


                auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
                foreach (num; poolInstance.parallel(myNumbers))
                {
                    assert(num % 7 > 0 && num < 1000);
                }
                stderr.writeln("Done modulus test.");

                uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
                assert(squares.length == numbers.length);
                foreach (i, number; numbers)
                {
                    assert(squares[i] == number * number);
                }
                stderr.writeln("Done squares.");

                auto sumFuture = task!( reduce!"a + b" )(numbers);
                poolInstance.put(sumFuture);

                // NOTE(review): sumSquares is computed but never asserted on.
                ulong sumSquares = 0;
                foreach (elem; numbers)
                {
                    sumSquares += elem * elem;
                }

                uint mySum = sumFuture.spinForce();
                assert(mySum == 999 * 1000 / 2);

                auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
                assert(mySum == mySumParallel);
                stderr.writeln("Done sums.");

                auto myTask = task(
                {
                    synchronized writeln("Our lives are parallel...Our lives are parallel.");
                });
                poolInstance.put(myTask);

                auto nestedOuter = "abcd";
                auto nestedInner = iota(0, 10, 2);

                foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
                {
                    foreach (j, number; poolInstance.parallel(nestedInner, 1))
                    {
                        synchronized writeln(i, ": ", letter, " ", j, ": ", number);
                    }
                }

                poolInstance.stop();
            }

        assert(attempt == 10);
        writeln("Press enter to go to next round of unittests.");
        readln();
    }

    // These unittests are intended more for actual testing and not so much
    // as examples.
    @safe unittest
    {
        foreach (attempt; 0 .. 10)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            // Test indexing.
            stderr.writeln("Creator Raw Index: ", poolInstance.threadIndex);
            assert(poolInstance.workerIndex() == 0);

            // Test worker-local storage.
            // Each worker-local slot starts at 1, so the total is the number
            // of iterations plus one per slot (poolInstance.size + 1 slots).
            auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
            foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
            {
                workerLocalStorage.get++;
            }
            assert(reduce!"a + b"(workerLocalStorage.toRange) ==
                1_000_000 + poolInstance.size + 1);

            // Make sure work is reasonably balanced among threads.  This test is
            // non-deterministic and is more of a sanity check than something that
            // has an absolute pass/fail.
            shared(uint)[void*] nJobsByThread;
            foreach (thread; poolInstance.pool)
            {
                nJobsByThread[cast(void*) thread] = 0;
            }
            nJobsByThread[ cast(void*) Thread.getThis()] = 0;

            foreach (i; poolInstance.parallel( iota(0, 1_000_000), 100 ))
            {
                atomicOp!"+="( nJobsByThread[ cast(void*) Thread.getThis() ], 1);
            }

            stderr.writeln("\nCurrent thread is: ",
                cast(void*) Thread.getThis());
            stderr.writeln("Workload distribution: ");
            foreach (k, v; nJobsByThread)
            {
                stderr.writeln(k, '\t', v);
            }

            // Test whether amap can be nested.
            real[][] matrix = new real[][](1000, 1000);
            foreach (i; poolInstance.parallel( iota(0, matrix.length) ))
            {
                foreach (j; poolInstance.parallel( iota(0, matrix[0].length) ))
                {
                    matrix[i][j] = i * j;
                }
            }

            // Get around weird bugs having to do w/ sqrt being an intrinsic:
            static real mySqrt(real num)
            {
                return sqrt(num);
            }

            static real[] parallelSqrt(real[] nums)
            {
                return poolInstance.amap!mySqrt(nums);
            }

            real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);

            foreach (i, row; sqrtMatrix)
            {
                foreach (j, elem; row)
                {
                    real shouldBe = sqrt( cast(real) i * j);
                    assert(approxEqual(shouldBe, elem));
                    sqrtMatrix[i][j] = shouldBe;
                }
            }

            auto saySuccess = task(
            {
                stderr.writeln(
                    "Success doing matrix stuff that involves nested pool use.");
            });
            poolInstance.put(saySuccess);
            saySuccess.workForce();

            // A more thorough test of amap, reduce:  Find the sum of the square roots of
            // matrix.

            static real parallelSum(real[] input)
            {
                return poolInstance.reduce!"a + b"(input);
            }

            auto sumSqrt = poolInstance.reduce!"a + b"(
                poolInstance.amap!parallelSum(
                    sqrtMatrix
                )
            );

            assert(approxEqual(sumSqrt, 4.437e8));
            stderr.writeln("Done sum of square roots.");

            // Test whether tasks work with function pointers.
            // NOTE(review): if isNaN is a template here, `&isNaN` may not
            // resolve to a function pointer; verify when enabling this test.
            auto nanTask = task(&isNaN, 1.0L);
            poolInstance.put(nanTask);
            assert(nanTask.spinForce == false);

            if (poolInstance.size > 0)
            {
                // Test work waiting.
                static void uselessFun()
                {
                    foreach (i; 0 .. 1_000_000) {}
                }

                auto uselessTasks = new typeof(task(&uselessFun))[1000];
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask = task(&uselessFun);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    poolInstance.put(uselessTask);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask.workForce();
                }
            }

            // Test the case of non-random access + ref returns.
            int[] nums = [1,2,3,4,5];
            static struct RemoveRandom
            {
                int[] arr;

                ref int front()
                {
                    return arr.front;
                }
                void popFront()
                {
                    arr.popFront();
                }
                bool empty()
                {
                    return arr.empty;
                }
            }

            auto refRange = RemoveRandom(nums);
            foreach (ref elem; poolInstance.parallel(refRange))
            {
                elem++;
            }
            assert(nums == [2,3,4,5,6], text(nums));
            stderr.writeln("Nums: ", nums);

            poolInstance.stop();
        }
    }
}

// Presumably a regression test for bugzilla 12733 (judging by the names):
// amap over a struct that has an invariant and a user-defined opAssign.
// The test only checks that this runs without tripping the invariant.
@system unittest
{
    static struct __S_12733
    {
        invariant() { assert(checksum == 1_234_567_890); }
        this(ulong u){n = u;}
        void opAssign(__S_12733 s){this.n = s.n;}
        ulong n;
        ulong checksum = 1_234_567_890;
    }

    static auto __genPair_12733(ulong n) { return __S_12733(n); }
    immutable ulong[] data = [ 2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL ];

    auto result = taskPool.amap!__genPair_12733(data);
}

// Compile-only check: UFCS `iota(...).parallel` must be usable in foreach.
@safe unittest
{
    import std.range : iota;

    // this test was in std.range, but caused cycles.
    assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} }));
}

// Compile-only check: `each` works on the result of `parallel`.
@safe unittest
{
    import std.algorithm.iteration : each;

    long[] arr;
    static assert(is(typeof({
        arr.parallel.each!"a++";
    })));
}

// https://issues.dlang.org/show_bug.cgi?id=17539
// Breaking out of a parallel foreach raises ParallelForeachError; the catch
// absorbs it, and the point of the test is that this compiles.
@system unittest
{
    import std.random : rndGen;
    // ensure compilation
    try foreach (rnd; rndGen.parallel) break;
    catch (ParallelForeachError e) {}
}