Merge pull request #1154 from 9rnsr/fix_indent

Fix code format issues in std.parallelism
This commit is contained in:
Jonathan M Davis 2013-02-23 22:23:23 -08:00
commit 82b4f1c901

View file

@ -39,12 +39,12 @@ Synopsis:
import std.algorithm, std.parallelism, std.range;
void main() {
// Parallel reduce can be combined with
// std.algorithm.map to interesting effect.
// The following example (thanks to Russel Winder)
// calculates pi by quadrature using
// Parallel reduce can be combined with
// std.algorithm.map to interesting effect.
// The following example (thanks to Russel Winder)
// calculates pi by quadrature using
// std.algorithm.map and TaskPool.reduce.
// getTerm is evaluated in parallel as needed by
// getTerm is evaluated in parallel as needed by
// TaskPool.reduce.
//
// Timings on an Athlon 64 X2 dual core machine:
@ -209,18 +209,18 @@ private template MapType(R, functions...)
{
static if(functions.length == 0)
{
alias typeof(unaryFun!(functions[0])(ElementType!(R).init)) MapType;
alias typeof(unaryFun!(functions[0])(ElementType!R.init)) MapType;
}
else
{
alias typeof(adjoin!(staticMap!(unaryFun, functions))
(ElementType!(R).init)) MapType;
(ElementType!R.init)) MapType;
}
}
private template ReduceType(alias fun, R, E)
{
alias typeof(binaryFun!(fun)(E.init, ElementType!(R).init)) ReduceType;
alias typeof(binaryFun!fun(E.init, ElementType!R.init)) ReduceType;
}
private template noUnsharedAliasing(T)
@ -234,11 +234,10 @@ private template noUnsharedAliasing(T)
private template isSafeTask(F)
{
enum bool isSafeTask =
((functionAttributes!(F) & FunctionAttribute.safe) ||
(functionAttributes!(F) & FunctionAttribute.trusted)) &&
!(functionAttributes!F & FunctionAttribute.ref_) &&
(isFunctionPointer!F || !hasUnsharedAliasing!F) &&
allSatisfy!(noUnsharedAliasing, ParameterTypeTuple!F);
(functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
(functionAttributes!F & FunctionAttribute.ref_) == 0 &&
(isFunctionPointer!F || !hasUnsharedAliasing!F) &&
allSatisfy!(noUnsharedAliasing, ParameterTypeTuple!F);
}
unittest
@ -248,13 +247,13 @@ unittest
alias void function(uint, string) @trusted F3;
alias void function(uint, char[]) F4;
static assert(isSafeTask!(F1));
static assert(!isSafeTask!(F2));
static assert(isSafeTask!(F3));
static assert(!isSafeTask!(F4));
static assert( isSafeTask!F1);
static assert(!isSafeTask!F2);
static assert( isSafeTask!F3);
static assert(!isSafeTask!F4);
alias uint[] function(uint, string) pure @trusted F5;
static assert(isSafeTask!(F5));
static assert( isSafeTask!F5);
}
// This function decides whether Tasks that meet all of the other requirements
@ -371,11 +370,8 @@ private template isRoundRobin(T)
unittest
{
static assert(isRoundRobin!(
RoundRobinBuffer!(void delegate(char[]), bool delegate())
));
static assert(!isRoundRobin!uint);
static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
static assert(!isRoundRobin!(uint));
}
// This is the base "class" for all of the other tasks. Using C-style
@ -392,24 +388,24 @@ private struct AbstractTask
ubyte taskStatus = TaskStatus.notStarted;
bool done() @property
{
if(atomicReadUbyte(taskStatus) == TaskStatus.done)
{
if(exception)
if(atomicReadUbyte(taskStatus) == TaskStatus.done)
{
throw exception;
if(exception)
{
throw exception;
}
return true;
}
return true;
return false;
}
return false;
}
void job()
{
runTask(&this);
}
void job()
{
runTask(&this);
}
}
/**
@ -442,8 +438,7 @@ Bugs: Changes to $(D ref) and $(D out) arguments are not propagated to the
*/
struct Task(alias fun, Args...)
{
AbstractTask base = {runTask :
&impl};
AbstractTask base = {runTask : &impl};
alias base this;
private @property AbstractTask* basePtr()
@ -773,7 +768,7 @@ AbstractTask base = {runTask :
// Calls $(D fpOrDelegate) with $(D args). This is an
// adapter that makes $(D Task) work with delegates, function pointers and
// functors instead of just aliases.
ReturnType!(F) run(F, Args...)(F fpOrDelegate, ref Args args)
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
return fpOrDelegate(args);
}
@ -793,7 +788,7 @@ import std.file;
void main()
{
// Create and execute a Task for reading
// Create and execute a Task for reading
// foo.txt.
auto file1Task = task!read("foo.txt");
file1Task.executeInNewThread();
@ -808,10 +803,10 @@ void main()
---
// Sorts an array using a parallel quick sort algorithm.
// The first partition is done serially. Both recursion
// The first partition is done serially. Both recursion
// branches are then executed in parallel.
//
// Timings for sorting an array of 1,000,000 doubles on
// Timings for sorting an array of 1,000,000 doubles on
// an Athlon 64 X2 dual core machine:
//
// This implementation: 176 milliseconds.
@ -837,7 +832,7 @@ void parallelSort(T)(T[] data)
greaterEqual = data[$ - greaterEqual.length..$];
// Execute both recursion branches in parallel.
auto recurseTask = task!(parallelSort)(greaterEqual);
auto recurseTask = task!parallelSort(greaterEqual);
taskPool.put(recurseTask);
parallelSort(less);
recurseTask.yieldForce;
@ -855,14 +850,14 @@ class/struct with overloaded opCall.
Examples:
---
// Read two files in at the same time again,
// but this time use a function pointer instead
// Read two files in at the same time again,
// but this time use a function pointer instead
// of an alias to represent std.file.read.
import std.file;
void main()
{
// Create and execute a Task for reading
// Create and execute a Task for reading
// foo.txt.
auto file1Task = task(&read, "foo.txt");
file1Task.executeInNewThread();
@ -1100,7 +1095,7 @@ private:
// This function performs initialization for each thread that affects
// thread local storage and therefore must be done from within the
// worker thread. It then calls executeWorkLoop().
// worker thread. It then calls executeWorkLoop().
void startWorkLoop()
{
// Initialize thread index.
@ -1110,7 +1105,7 @@ private:
threadIndex = nextThreadIndex;
nextThreadIndex++;
}
executeWorkLoop();
}
@ -1118,7 +1113,7 @@ private:
// until they terminate. It's also entered by non-worker threads when
// finish() is called with the blocking variable set to true.
void executeWorkLoop()
{
{
while(atomicReadUbyte(status) != PoolState.stopNow)
{
AbstractTask* task = pop();
@ -1188,7 +1183,7 @@ private:
void abstractPut(AbstractTask* task)
{
queueLock();
scope(exit) queueUnlock();
scope(exit) queueUnlock();
abstractPutNoSync(task);
}
@ -1217,7 +1212,7 @@ private:
"finish() or stop()."
);
}
task.next = null;
if (head is null) //Queue is empty.
{
@ -1243,7 +1238,7 @@ private:
"finish() or stop()."
);
}
if(head is null)
{
head = h;
@ -1414,8 +1409,7 @@ public:
}
immutable size_t eightSize = 4 * (this.size + 1);
auto ret = (rangeLen / eightSize) +
((rangeLen % eightSize == 0) ? 0 : 1);
auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
return max(ret, 1);
}
@ -1673,8 +1667,7 @@ public:
}
else
{
auto buf = uninitializedArray!(MapType!(Args[0], functions)[])
(len);
auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
alias args args2;
alias Args Args2;
}
@ -1790,9 +1783,9 @@ public:
Examples:
---
// Pipeline reading a file, converting each line
// to a number, taking the logarithms of the numbers,
// and performing the additions necessary to find
// Pipeline reading a file, converting each line
// to a number, taking the logarithms of the numbers,
// and performing the additions necessary to find
// the sum of the logarithms.
auto lineRange = File("numberList.txt").byLine();
@ -1853,11 +1846,11 @@ public:
size_t bufPos;
bool lastTaskWaited;
static if(isRandomAccessRange!S)
{
alias S FromType;
static if(isRandomAccessRange!S)
{
alias S FromType;
void popSource()
void popSource()
{
static if(__traits(compiles, source[0..source.length]))
{
@ -1870,17 +1863,15 @@ public:
else
{
static assert(0, "S must have slicing for Map."
~ " " ~ R.stringof ~ " doesn't.");
~ " " ~ S.stringof ~ " doesn't.");
}
}
}
else static if(bufferTrick)
{
// Make sure we don't have the buffer recycling overload of
// asyncBuf.
static if(
{
// Make sure we don't have the buffer recycling overload of
// asyncBuf.
static if(
is(typeof(source.source)) &&
isRoundRobin!(typeof(source.source))
)
@ -1914,11 +1905,10 @@ public:
return from;
}
}
else
{
alias ElementType!(S)[] FromType;
alias ElementType!S[] FromType;
// The temporary array that data is copied to before being
// mapped.
@ -1940,155 +1930,155 @@ public:
}
static if(hasLength!S)
{
size_t _length;
{
size_t _length;
public @property size_t length() const pure nothrow @safe
public @property size_t length() const pure nothrow @safe
{
return _length;
}
}
this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
{
static if(bufferTrick)
this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
{
bufSize = source.buf1.length;
}
buf1.length = bufSize;
buf2.length = bufSize;
static if(!isRandomAccessRange!S)
{
from.length = bufSize;
}
this.workUnitSize = (workUnitSize == size_t.max) ?
pool.defaultWorkUnitSize(bufSize) : workUnitSize;
this.source = source;
this.pool = pool;
static if(hasLength!S)
{
_length = source.length;
}
buf1 = fillBuf(buf1);
submitBuf2();
}
// The from parameter is a dummy and ignored in the random access
// case.
E[] fillBuf(E[] buf)
{
static if(isRandomAccessRange!S)
{
auto toMap = take(source, buf.length);
scope(success) popSource();
}
else
{
auto toMap = dumpToFrom();
}
buf = buf[0..min(buf.length, toMap.length)];
// Handle as a special case:
if(pool.size == 0)
{
size_t index = 0;
foreach(elem; toMap)
static if(bufferTrick)
{
buf[index++] = fun(elem);
bufSize = source.buf1.length;
}
return buf;
}
pool.amap!(functions)(toMap, workUnitSize, buf);
return buf;
}
void submitBuf2()
in
{
assert(nextBufTask.prev is null);
assert(nextBufTask.next is null);
} body
{
// Hack to reuse the task object.
nextBufTask = typeof(nextBufTask).init;
nextBufTask._args[0] = &fillBuf;
nextBufTask._args[1] = buf2;
pool.put(nextBufTask);
}
void doBufSwap()
{
if(lastTaskWaited)
{
// Then the source is empty. Signal it here.
buf1 = null;
buf2 = null;
buf1.length = bufSize;
buf2.length = bufSize;
static if(!isRandomAccessRange!S)
{
from = null;
from.length = bufSize;
}
return;
this.workUnitSize = (workUnitSize == size_t.max) ?
pool.defaultWorkUnitSize(bufSize) : workUnitSize;
this.source = source;
this.pool = pool;
static if(hasLength!S)
{
_length = source.length;
}
buf1 = fillBuf(buf1);
submitBuf2();
}
buf2 = buf1;
buf1 = nextBufTask.yieldForce;
bufPos = 0;
if(source.empty)
// The from parameter is a dummy and ignored in the random access
// case.
E[] fillBuf(E[] buf)
{
lastTaskWaited = true;
static if(isRandomAccessRange!S)
{
auto toMap = take(source, buf.length);
scope(success) popSource();
}
else
{
auto toMap = dumpToFrom();
}
buf = buf[0..min(buf.length, toMap.length)];
// Handle as a special case:
if(pool.size == 0)
{
size_t index = 0;
foreach(elem; toMap)
{
buf[index++] = fun(elem);
}
return buf;
}
pool.amap!functions(toMap, workUnitSize, buf);
return buf;
}
void submitBuf2()
in
{
assert(nextBufTask.prev is null);
assert(nextBufTask.next is null);
} body
{
// Hack to reuse the task object.
nextBufTask = typeof(nextBufTask).init;
nextBufTask._args[0] = &fillBuf;
nextBufTask._args[1] = buf2;
pool.put(nextBufTask);
}
void doBufSwap()
{
if(lastTaskWaited)
{
// Then the source is empty. Signal it here.
buf1 = null;
buf2 = null;
static if(!isRandomAccessRange!S)
{
from = null;
}
return;
}
buf2 = buf1;
buf1 = nextBufTask.yieldForce;
bufPos = 0;
if(source.empty)
{
lastTaskWaited = true;
}
else
{
submitBuf2();
}
}
public:
@property auto front()
{
return buf1[bufPos];
}
void popFront()
{
static if(hasLength!S)
{
_length--;
}
bufPos++;
if(bufPos >= buf1.length)
{
doBufSwap();
}
}
static if(std.range.isInfinite!S)
{
enum bool empty = false;
}
else
{
submitBuf2();
bool empty() @property
{
// popFront() sets this when source is empty
return buf1.length == 0;
}
}
}
public:
@property auto front()
{
return buf1[bufPos];
}
void popFront()
{
static if(hasLength!S)
{
_length--;
}
bufPos++;
if(bufPos >= buf1.length)
{
doBufSwap();
}
}
static if(std.range.isInfinite!S)
{
enum bool empty = false;
}
else
{
bool empty() @property
{
// popFront() sets this when source is empty
return buf1.length == 0;
}
}
}
return new Map(source, bufSize, workUnitSize, this);
}
}
@ -2099,7 +2089,7 @@ public:
$(D source) into a buffer of $(D bufSize) elements in a worker thread,
while making prevously buffered elements from a second buffer, also of size
$(D bufSize), available via the range interface of the returned
object. The returned range has a length iff $(D hasLength!(S)).
object. The returned range has a length iff $(D hasLength!S).
$(D asyncBuf) is useful, for example, when performing expensive operations
on the elements of ranges that represent data on a disk or network.
@ -2156,10 +2146,10 @@ public:
static if(hasLength!S)
{
size_t _length;
size_t _length;
// Available if hasLength!(S).
public @property size_t length() const pure nothrow @safe
// Available if hasLength!S.
public @property size_t length() const pure nothrow @safe
{
return _length;
}
@ -2301,8 +2291,8 @@ public:
Examples:
---
// Fetch lines of a file in a background
// thread while processing prevously fetched
// Fetch lines of a file in a background
// thread while processing prevously fetched
// lines, without duplicating any lines.
auto file = File("foo.txt");
@ -2311,8 +2301,8 @@ public:
file.readln(buf);
}
// Fetch more lines in the background while we
// process the lines already read into memory
// Fetch more lines in the background while we
// process the lines already read into memory
// into a matrix of doubles.
double[][] matrix;
auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
@ -2336,15 +2326,13 @@ public:
processes them is in queue. This is checked for at compile time
and will result in a static assertion failure.
*/
auto asyncBuf(C1, C2)
(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
if(is(typeof(C2.init()) : bool) &&
ParameterTypeTuple!(C1).length == 1 &&
ParameterTypeTuple!(C2).length == 0 &&
isArray!(ParameterTypeTuple!(C1)[0])
ParameterTypeTuple!C1.length == 1 &&
ParameterTypeTuple!C2.length == 0 &&
isArray!(ParameterTypeTuple!C1[0])
) {
auto roundRobin = RoundRobinBuffer!(C1, C2)
(next, empty, initialBufSize, nBuffers);
auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers);
return asyncBuf(roundRobin, nBuffers / 2);
}
@ -2380,7 +2368,7 @@ public:
those generated by $(XREF algorithm, _reduce) or depending on how many work
units are used. The next argument must be the range to be reduced.
---
// Find the sum of squares of a range in parallel, using
// Find the sum of squares of a range in parallel, using
// an explicit seed.
//
// Timings on an Athlon 64 X2 dual core machine:
@ -2397,7 +2385,7 @@ public:
is used as a seed. For the final reduction, the result from the first
work unit is used as the seed.
---
// Find the sum of a range in parallel, using the first
// Find the sum of a range in parallel, using the first
// element of each work unit as the seed.
auto sum = taskPool.reduce!"a + b"(nums);
---
@ -2436,8 +2424,8 @@ public:
///
auto reduce(Args...)(Args args)
{
alias reduceAdjoin!(functions) fun;
alias reduceFinish!(functions) finishFun;
alias reduceAdjoin!functions fun;
alias reduceFinish!functions finishFun;
static if(isIntegral!(Args[$ - 1]))
{
@ -2459,8 +2447,7 @@ public:
}
else
{
typeof(adjoin!(staticMap!(binaryFun, functions))(e, e))
seed = void;
typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void;
foreach (i, T; seed.Types)
{
auto p = (cast(void*) &seed.expand[i])
@ -2505,8 +2492,7 @@ public:
alias typeof(seed) E;
alias typeof(range) R;
E reduceOnRange
(R range, size_t lowerBound, size_t upperBound)
E reduceOnRange(R range, size_t lowerBound, size_t upperBound)
{
// This is for exploiting instruction level parallelism by
// using multiple accumulator variables within each thread,
@ -2603,8 +2589,7 @@ public:
workUnitSize = len;
}
immutable size_t nWorkUnits = (len / workUnitSize) +
((len % workUnitSize == 0) ? 0 : 1);
immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
assert(nWorkUnits * workUnitSize >= len);
alias Task!(run, typeof(&reduceOnRange), R, size_t, size_t) RTask;
@ -2740,10 +2725,10 @@ public:
Examples:
---
// Execute a loop that computes the greatest common
// divisor of every number from 0 through 999 with
// Execute a loop that computes the greatest common
// divisor of every number from 0 through 999 with
// 42 in parallel. Write the results out to
// a set of files, one for each thread. This allows
// a set of files, one for each thread. This allows
// results to be written out without any synchronization.
import std.conv, std.range, std.numeric, std.stdio;
@ -2773,9 +2758,8 @@ public:
size_t workerIndex() @property @safe const nothrow
{
immutable rawInd = threadIndex;
return (rawInd >= instanceStartIndex &&
rawInd < instanceStartIndex + size) ?
(rawInd - instanceStartIndex + 1) : 0;
return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
(rawInd - instanceStartIndex + 1) : 0;
}
/**
@ -2965,7 +2949,7 @@ public:
atomicSetUbyte(barrierDummy, 1);
}
return WorkerLocalStorageRange!(T)(this);
return WorkerLocalStorageRange!T(this);
}
}
@ -2989,7 +2973,7 @@ public:
size_t _length;
size_t beginOffset;
this(WorkerLocalStorage!(T) wl)
this(WorkerLocalStorage!T wl)
{
this.workerLocalStorage = wl;
_length = wl.size;
@ -3066,9 +3050,9 @@ public:
create one instance of a class for each worker. For usage example,
see the $(D WorkerLocalStorage) struct.
*/
WorkerLocalStorage!(T) workerLocalStorage(T)(lazy T initialVal = T.init)
WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
{
WorkerLocalStorage!(T) ret;
WorkerLocalStorage!T ret;
ret.initialize(this);
foreach(i; 0..size + 1)
{
@ -3109,10 +3093,10 @@ public:
before returning. This option might be used in applications where
task results are never consumed-- e.g. when $(D TaskPool) is employed as a
rudimentary scheduler for tasks which communicate by means other than
return values.
return values.
Warning: Calling this function with $(D blocking = true) from a worker
thread that is a member of the same $(D TaskPool) that
thread that is a member of the same $(D TaskPool) that
$(D finish) is being called on will result in a deadlock.
*/
void finish(bool blocking = false) @trusted
@ -3123,23 +3107,23 @@ public:
atomicCasUbyte(status, PoolState.running, PoolState.finishing);
notifyAll();
}
if (blocking)
if (blocking)
{
// Use this thread as a worker until everything is finished.
executeWorkLoop();
foreach(t; pool)
foreach(t; pool)
{
// Maybe there should be something here to prevent a thread
// from calling join() on itself if this function is called
// from a worker thread in the same pool, but:
//
// 1. Using an if statement to skip join() would result in
// 1. Using an if statement to skip join() would result in
// finish() returning without all tasks being finished.
//
// 2. If an exception were thrown, it would bubble up to the
// Task from which finish() was called and likely be
// swallowed.
// swallowed.
t.join();
}
}
@ -3333,8 +3317,8 @@ readable.
Example:
---
// Find the logarithm of every number from
// 1 to 1_000_000 in parallel, using the
// Find the logarithm of every number from
// 1 to 1_000_000 in parallel, using the
// default TaskPool instance.
auto logs = new double[1_000_000];
@ -3492,11 +3476,11 @@ int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)
// The explicit ElementType!R in the foreach loops is necessary for
// correct behavior when iterating over strings.
static if(hasLvalueElements!(R))
static if(hasLvalueElements!R)
{
foreach(ref ElementType!R elem; range)
{
static if(ParameterTypeTuple!(dg).length == 2)
static if(ParameterTypeTuple!dg.length == 2)
{
res = dg(index, elem);
}
@ -3512,7 +3496,7 @@ int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)
{
foreach(ElementType!R elem; range)
{
static if(ParameterTypeTuple!(dg).length == 2)
static if(ParameterTypeTuple!dg.length == 2)
{
res = dg(index, elem);
}
@ -3621,7 +3605,8 @@ enum string parallelApplyMixinInputRange = q{
enum bool bufferTrick = true;
}
else {
else
{
enum bool bufferTrick = false;
}
@ -3635,7 +3620,7 @@ enum string parallelApplyMixinInputRange = q{
static if(hasLvalueElements!R)
{
alias ElementType!(R)*[] Temp;
alias ElementType!R*[] Temp;
Temp temp;
// Returns: The previous value of nPopped.
@ -3643,7 +3628,7 @@ enum string parallelApplyMixinInputRange = q{
{
if(temp is null)
{
temp = uninitializedArray!(Temp)(workUnitSize);
temp = uninitializedArray!Temp(workUnitSize);
}
rangeMutex.lock();
@ -3665,7 +3650,7 @@ enum string parallelApplyMixinInputRange = q{
else
{
alias ElementType!(R)[] Temp;
alias ElementType!R[] Temp;
Temp temp;
// Returns: The previous value of nPopped.
@ -3673,7 +3658,7 @@ enum string parallelApplyMixinInputRange = q{
{
if(temp is null)
{
temp = uninitializedArray!(Temp)(workUnitSize);
temp = uninitializedArray!Temp(workUnitSize);
}
rangeMutex.lock();
@ -3786,7 +3771,8 @@ private void addToChain(
lastException.next = e;
lastException = findLastException(e);
}
else {
else
{
firstException = e;
lastException = findLastException(e);
}
@ -4117,7 +4103,7 @@ unittest
pool1.put(tSlow);
pool1.finish();
tSlow.yieldForce();
// Can't assert that pool1.status == PoolState.stopNow because status
// Can't assert that pool1.status == PoolState.stopNow because status
// doesn't change until after the "done" flag is set and the waiting
// thread is woken up.
@ -4126,14 +4112,14 @@ unittest
pool2.put(tSlow2);
pool2.finish(true); // blocking
assert(tSlow2.done);
// Test fix for Bug 8582 by making pool size zero.
auto pool3 = new TaskPool(0);
auto tSlow3 = task!slowFun();
pool3.put(tSlow3);
pool3.finish(true); // blocking
assert(tSlow3.done);
// This is correct because no thread will terminate unless pool2.status
// and pool3.status have already been set to stopNow.
assert(pool2.status == TaskPool.PoolState.stopNow);
@ -4300,7 +4286,8 @@ unittest
assertThrown!Exception(mapThrow());
struct ThrowingRange {
struct ThrowingRange
{
@property int front()
{
return 1;
@ -4390,8 +4377,7 @@ version(parallelismStressTest)
{
foreach(j, number; poolInstance.parallel(nestedInner, 1))
{
synchronized writeln
(i, ": ", letter, " ", j, ": ", number);
synchronized writeln(i, ": ", letter, " ", j, ": ", number);
}
}
@ -4417,7 +4403,7 @@ version(parallelismStressTest)
assert(poolInstance.workerIndex() == 0);
// Test worker-local storage.
auto workerLocalStorage = poolInstance.workerLocalStorage!(uint)(1);
auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
foreach(i; poolInstance.parallel(iota(0U, 1_000_000)))
{
workerLocalStorage.get++;