Shave a few hard-won microseconds off the break-even point for amap, reduce, and parallel foreach. Also extend the unittests so that the intermittently failing FreeBSD build hopefully fails more often, making the underlying problem easier to reproduce.

dsimcha 2011-05-21 10:48:14 -04:00
parent d4f9af8a2d
commit 4177f2f636
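
For context, and not part of the commit itself: the break-even point mentioned in the commit message is the smallest amount of work per call at which taskPool.amap, taskPool.reduce, or a parallel foreach beats a plain serial loop. A minimal sketch of the kind of call site this overhead reduction targets, assuming the std.parallelism API of this period (taskPool, amap, reduce, parallel):

    // Illustrative only; tiny inputs like these are where per-call
    // submission overhead decides whether parallelism pays off at all.
    import std.math, std.parallelism, std.stdio;

    void main() {
        auto nums = [1, 2, 3, 4, 5];

        // Eager parallel map into a newly allocated buffer.
        auto squares = taskPool.amap!"a * a"(nums);
        assert(squares == [1, 4, 9, 16, 25]);

        // Parallel reduction (fold) over the same tiny range.
        auto sum = taskPool.reduce!"a + b"(nums);
        assert(sum == 15);

        // Parallel foreach with an explicit work unit size.
        auto logs = new double[1000];
        foreach(i, ref elem; taskPool.parallel(logs, 100)) {
            elem = log(i + 1.0);
        }

        writeln("small-input checks passed");
    }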


@@ -279,7 +279,7 @@ private enum TaskStatus : ubyte {
 // This is conceptually the base class for all Task types. The only Task type
 // that is public is the one actually named Task. There is also a slightly
-// customized ParallelForeachTask and AMapTask.
+// customized ParallelForeachTask and AmapTask.
 private template BaseMixin(ubyte initTaskStatus) {
     AbstractTask* prev;
     AbstractTask* next;
@@ -545,8 +545,8 @@ struct Task(alias fun, Args...) {
             }
         }
-        pool.lock();
-        scope(exit) pool.unlock();
+        pool.waiterLock();
+        scope(exit) pool.waiterUnlock();
         while(atomicReadUbyte(this.taskStatus) != TaskStatus.done) {
             pool.waitUntilCompletion();
@@ -583,7 +583,7 @@ struct Task(alias fun, Args...) {
             }
         }
-        pool.lock();
+        pool.queueLock();
         AbstractTask* job;
         try {
             // Locking explicitly and calling popNoSync() because
@@ -591,7 +591,7 @@ struct Task(alias fun, Args...) {
             // in the queue.
             job = pool.popNoSync();
         } finally {
-            pool.unlock();
+            pool.queueUnlock();
         }
         if(job !is null) {
@@ -880,7 +880,8 @@ private:
     PoolState status = PoolState.running;
     Condition workerCondition;
     Condition waiterCondition;
-    Mutex mutex;
+    Mutex queueMutex;
+    Mutex waiterMutex;  // For waiterCondition
     // The instanceStartIndex of the next instance that will be created.
     __gshared static size_t nextInstanceIndex = 1;
@@ -907,9 +908,9 @@ private:
         scope(exit) {
             if(!isSingleTask) {
-                lock();
+                waiterLock();
                 notifyWaiters();
-                unlock();
+                waiterUnlock();
             }
         }
@@ -938,10 +939,10 @@ private:
     // does more than one task.
     void workLoop() {
         // Initialize thread index.
-        lock();
+        queueLock();
         threadIndex = nextThreadIndex;
         nextThreadIndex++;
-        unlock();
+        queueUnlock();
         while(atomicReadUbyte(status) != PoolState.stopNow) {
             AbstractTask* task = pop();
@@ -958,13 +959,13 @@ private:
     // Pop a task off the queue.
     AbstractTask* pop() {
-        lock();
+        queueLock();
         auto ret = popNoSync();
         while(ret is null && status == PoolState.running) {
             wait();
             ret = popNoSync();
         }
-        unlock();
+        queueUnlock();
         return ret;
     }
@@ -997,9 +998,9 @@ private:
     // Push a task onto the queue.
     void abstractPut(AbstractTask* task) {
-        lock();
+        queueLock();
         abstractPutNoSync(task);
-        unlock();
+        queueUnlock();
     }
     void abstractPutNoSync(AbstractTask* task)
@@ -1023,6 +1024,19 @@ private:
         notify();
     }
+    void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t) {
+        if(head is null) {
+            head = h;
+            tail = t;
+        } else {
+            h.prev = tail;
+            tail.next = h;
+            tail = t;
+        }
+        notifyAll();
+    }
     void tryDeleteExecute(AbstractTask* toExecute) {
         if(isSingleTask) return;
@@ -1044,17 +1058,13 @@ private:
     }
     bool deleteItem(AbstractTask* item) {
-        lock();
+        queueLock();
         auto ret = deleteItemNoSync(item);
-        unlock();
+        queueUnlock();
         return ret;
     }
-    bool deleteItemNoSync(AbstractTask* item)
-    out {
-        assert(item.next is null);
-        assert(item.prev is null);
-    } body {
+    bool deleteItemNoSync(AbstractTask* item) {
         if(item.taskStatus != TaskStatus.notStarted) {
             return false;
         }
@@ -1096,12 +1106,20 @@ private:
         return (rangeLen / fourSize) + ((rangeLen % fourSize == 0) ? 0 : 1);
     }
-    void lock() {
-        if(!isSingleTask) mutex.lock();
+    void queueLock() {
+        if(!isSingleTask) queueMutex.lock();
     }
-    void unlock() {
-        if(!isSingleTask) mutex.unlock();
+    void queueUnlock() {
+        if(!isSingleTask) queueMutex.unlock();
+    }
+    void waiterLock() {
+        if(!isSingleTask) waiterMutex.lock();
+    }
+    void waiterUnlock() {
+        if(!isSingleTask) waiterMutex.unlock();
     }
     void wait() {
@@ -1191,9 +1209,10 @@ public:
             nextInstanceIndex += nWorkers;
         }
-        mutex = new Mutex(this);
-        workerCondition = new Condition(mutex);
-        waiterCondition = new Condition(mutex);
+        queueMutex = new Mutex(this);
+        waiterMutex = new Mutex();
+        workerCondition = new Condition(queueMutex);
+        waiterCondition = new Condition(waiterMutex);
         pool = new Thread[nWorkers];
         foreach(ref poolThread; pool) {
@@ -1579,6 +1598,11 @@ public:
         }
     }
+    // Recycle an AmapImpl struct so we don't have to allocate
+    // one on every buffer swap. This declaration has to be down
+    // here b/c of weird forward referencing issues.
+    AmapImpl!(fun, FromType, E[]) amapImpl;
     static if(hasLength!R) {
         size_t _length;
@@ -1608,6 +1632,7 @@ public:
             _length = range.length;
         }
+        amapImpl.__ctor(pool, workUnitSize, FromType.init, buf1);
         fillBuf(buf1);
         submitBuf2();
     }
@@ -1623,11 +1648,18 @@ public:
         }
         buf = buf[0..min(buf.length, toMap.length)];
-        pool.amap!(functions)(
-            toMap,
-            workUnitSize,
-            buf
-        );
+        // Handle as a special case:
+        if(pool.size == 0) {
+            size_t index = 0;
+            foreach(elem; toMap) {
+                buf[index++] = fun(elem);
+            }
+            return buf;
+        }
+        amapImpl.reuse(toMap, buf);
+        amapImpl.submitAndExecute();
         return buf;
     }
@@ -2018,6 +2050,7 @@ public:
     size_t curPos = 0;
     void useTask(ref RTask task) {
+        task.pool = this;
         task.args[3] = min(len, curPos + workUnitSize); // upper bound.
         task.args[1] = range; // range
@@ -2030,15 +2063,36 @@ public:
         }
         curPos += workUnitSize;
-        put(task);
     }
     foreach(ref task; tasks) {
         useTask(task);
     }
+    foreach(i; 1..tasks.length - 1) {
+        tasks[i].next = cast(AbstractTask*) &tasks[i + 1];
+        tasks[i + 1].prev = cast(AbstractTask*) &tasks[i];
+    }
+    if(tasks.length > 1) {
+        queueLock();
+        scope(exit) queueUnlock();
+        abstractPutGroupNoSync(
+            cast(AbstractTask*) &tasks[1],
+            cast(AbstractTask*) &tasks[$ - 1]
+        );
+    }
+    try {
+        (cast(AbstractTask*) &tasks[0]).job();
+    } catch(Throwable e) {
+        tasks[0].exception = e;
+    }
+    tasks[0].taskStatus = TaskStatus.done;
     // Try to execute each of these in the current thread
-    foreach(ref task; tasks) {
+    foreach(ref task; tasks[1..$]) {
         tryDeleteExecute( cast(AbstractTask*) &task);
     }
@@ -2349,8 +2403,8 @@ public:
     need the results.
     */
     void stop() @trusted {
-        lock();
-        scope(exit) unlock();
+        queueLock();
+        scope(exit) queueUnlock();
         atomicSetUbyte(status, PoolState.stopNow);
         notifyAll();
     }
@@ -2397,8 +2451,8 @@ public:
     not block.
     */
     void finish() @trusted {
-        lock();
-        scope(exit) unlock();
+        queueLock();
+        scope(exit) queueUnlock();
         atomicCasUbyte(status, PoolState.running, PoolState.finishing);
         notifyAll();
     }
@@ -2482,15 +2536,15 @@ public:
     setter has no effect.
     */
     bool isDaemon() @property @trusted {
-        lock();
-        scope(exit) unlock();
+        queueLock();
+        scope(exit) queueUnlock();
         return (size == 0) ? true : pool[0].isDaemon();
     }
     /// Ditto
     void isDaemon(bool newVal) @property @trusted {
-        lock();
-        scope(exit) unlock();
+        queueLock();
+        scope(exit) queueUnlock();
         foreach(thread; pool) {
             thread.isDaemon = newVal;
         }
@@ -2649,8 +2703,8 @@ if(isRandomAccessRange!R && hasLength!R) {
         return;
     }
-    pool.lock();
-    scope(exit) pool.unlock();
+    pool.waiterLock();
+    scope(exit) pool.waiterUnlock();
     // No trying to execute here b/c the function that waits on this task
     // wants to recycle it as soon as it finishes.
@@ -2713,8 +2767,8 @@ if(!isRandomAccessRange!R || !hasLength!R) {
         return;
     }
-    pool.lock();
-    scope(exit) pool.unlock();
+    pool.waiterLock();
+    scope(exit) pool.waiterUnlock();
     // Don't try to execute in this thread b/c the function that waits on
     // this task wants to recycle it as soon as it finishes.
@@ -2729,13 +2783,13 @@ if(!isRandomAccessRange!R || !hasLength!R) {
         }
     }
-private struct AMapTask(alias fun, R, ReturnType)
+private struct AmapTask(alias fun, R, ReturnType)
 if(isRandomAccessRange!R && hasLength!R) {
     static void impl(void* myTask) {
-        auto myCastedTask = cast(AMapTask!(fun, R, ReturnType)*) myTask;
+        auto myCastedTask = cast(AmapTask!(fun, R, ReturnType)*) myTask;
         foreach(i; myCastedTask.lowerBound..myCastedTask.upperBound) {
-            myCastedTask.results[i] = uFun(myCastedTask.range[i]);
+            myCastedTask.results[i] = fun(myCastedTask.range[i]);
         }
         // Nullify stuff, make GC's life easier.
@@ -2748,7 +2802,6 @@ if(isRandomAccessRange!R && hasLength!R) {
     TaskPool pool;
     // More specific stuff.
-    alias unaryFun!fun uFun;
     R range;
     alias ElementType!R E;
     ReturnType results;
@@ -2761,8 +2814,8 @@ if(isRandomAccessRange!R && hasLength!R) {
         return;
     }
-    pool.lock();
-    scope(exit) pool.unlock();
+    pool.waiterLock();
+    scope(exit) pool.waiterUnlock();
     while(!done()) {
         pool.waitUntilCompletion();
@@ -2798,13 +2851,45 @@ private mixin template ResubmittingTasks() {
     void submitResubmittingTask(AbstractTask* toSubmit) {
         // Synchronizing on the pool to prevent some other thread from deleting
         // the job before it's submitted.
-        pool.lock();
-        atomicSetUbyte(toSubmit.taskStatus, TaskStatus.notStarted);
+        pool.queueLock();
+        toSubmit.taskStatus = TaskStatus.notStarted;
         pool.abstractPutNoSync(toSubmit);
-        pool.unlock();
+        pool.queueUnlock();
     }
     void submitJobs() {
+        AbstractTask* first; // Do in this thread.
+        AbstractTask* head; // For abstractPutGroupNoSync
+        AbstractTask* tail; // For abstractPutGroupNoSync
+        void doGroupSubmit() {
+            if(head is null) return;
+            pool.queueLock();
+            scope(exit) pool.queueUnlock();
+            auto t = head;
+            while(t !is null) {
+                assert(t.taskStatus == TaskStatus.done);
+                t.taskStatus = TaskStatus.notStarted;
+                t = t.next;
+            }
+            assert(head.prev is null);
+            assert(tail.next is null);
+            pool.abstractPutGroupNoSync(head, tail);
+        }
+        void doFirst() {
+            if(first is null) return;
+            try {
+                first.job();
+            } catch(Throwable e) {
+                first.exception = e;
+            }
+            atomicSetUbyte(first.taskStatus, TaskStatus.done);
+        }
         // Search for slots.
         foreach(ref task; tasks) {
             try {
@@ -2821,16 +2906,40 @@ private mixin template ResubmittingTasks() {
             }
             useTask(task);
+            assert(task.next is null);
+            assert(task.prev is null);
+            if(first is null) {
+                first = cast(AbstractTask*) &task;
+                first.taskStatus = TaskStatus.inProgress;
+            } else if(head is null) {
+                head = tail = cast(AbstractTask*) &task;
+            } else {
+                auto at = cast(AbstractTask*) &task;
+                tail.next = at;
+                at.prev = tail;
+                tail = at;
+            }
             if(emptyCheck()) {
+                doGroupSubmit();
                 atomicSetUbyte(doneSubmitting, 1);
+                doFirst();
                 return;
             }
         }
+        doGroupSubmit();
         submitResubmittingTask(cast(AbstractTask*) &submitNextBatch);
+        doFirst();
     }
     void submitAndExecute() {
+        if(tasks.length == 0) {
+            assert(range.empty);
+            return;
+        }
         // See documentation for BaseMixin.shouldSetDone.
         submitNextBatch.shouldSetDone = false;
         submitNextBatch.isScoped = false;
@@ -2875,7 +2984,7 @@ private struct AmapImpl(alias fun, Range, Buf) {
     Buf buf;
     size_t curPos;
     size_t len;
-    alias AMapTask!(fun, Range, typeof(buf)) MTask;
+    alias AmapTask!(fun, Range, typeof(buf)) MTask;
     MTask[] tasks;
     this(TaskPool pool, size_t workUnitSize, Range range, Buf buf) {
@@ -2884,11 +2993,35 @@ private struct AmapImpl(alias fun, Range, Buf) {
         this.range = range;
         this.buf = buf;
         submitNextBatch = scopedTask(&submitJobs);
-        // Two tasks for every worker thread, plus two for the submitting
-        // thread.
-        tasks.length = pool.size * 2 + 2;
         len = range.length; // In case evaluating length is expensive.
+        // Four tasks for every worker thread, plus four for the submitting
+        // thread.
+        tasks.length = min(
+            pool.size * 4 + 4,
+            len / workUnitSize + (len % workUnitSize > 0)
+        );
+    }
+    // This function resets the struct for reuse. It's called from Map.
+    // This has to be done carefully to avoid touching submitNextBatch,
+    // since there's a slight chance it could still be referenced from the
+    // previous use.
+    void reuse(Range range, Buf buf) {
+        this.range = range;
+        this.buf = buf;
+        len = range.length;
+        curPos = 0;
+        firstException = null;
+        lastException = null;
+        doneSubmitting = 0;
+        tasks.length = min(
+            pool.size * 4 + 4,
+            len / workUnitSize + (len % workUnitSize > 0)
+        );
+        tasks[] = MTask.init;
     }
     bool emptyCheck() {
@@ -2903,8 +3036,6 @@ private struct AmapImpl(alias fun, Range, Buf) {
         task.results = buf;
         task.pool = pool;
         curPos += workUnitSize;
-        submitResubmittingTask(cast(AbstractTask*) &task);
     }
 }
@@ -2925,11 +3056,16 @@ if(randLen!Range) {
     this.range = range;
     this.dg = dg;
     submitNextBatch = scopedTask(&submitJobs);
-    // Two tasks for every worker thread, plus two for the submitting
-    // thread.
-    tasks.length = pool.size * 2 + 2;
-    len = range.length; // In case evaluating length is expensive.
+    len = range.length; // In case evaluating length is expensive.
+    // Four tasks for every worker thread, plus four for the submitting
+    // thread.
+    tasks.length = min(
+        pool.size * 4 + 4,
+        len / workUnitSize + (len % workUnitSize > 0)
+    );
 }
 void useTask(ref PTask task) {
@@ -2940,8 +3076,6 @@ if(randLen!Range) {
     task.runMe = dg;
     task.pool = pool;
     curPos += workUnitSize;
-    submitResubmittingTask(cast(AbstractTask*) &task);
 }
 bool emptyCheck() {
@@ -2973,9 +3107,9 @@ if(!randLen!Range) {
     this.dg = dg;
     submitNextBatch = scopedTask(&submitJobs);
-    // Two tasks for every worker thread, plus two for the submitting
+    // Four tasks for every worker thread, plus four for the submitting
     // thread.
-    tasks.length = pool.size * 2 + 2;
+    tasks.length = pool.size * 4 + 4;
 }
 void useTask(ref PTask task) {
@@ -3021,7 +3155,6 @@ if(!randLen!Range) {
     task.startIndex = this.startIndex;
     this.startIndex += task.elements.length;
-    submitResubmittingTask(cast(AbstractTask*) &task);
 }
 bool emptyCheck() {
@@ -3210,26 +3343,37 @@ unittest {
     uint[] nums = appNums.data, nums2 = appNums2.data;
     sort!"a.at!0 < b.at!0"(zip(nums, nums2));
-    assert(nums == [2,3,4,5,6]);
-    assert(nums2 == nums);
-    assert(arr == nums);
+    assert(nums == [2,3,4,5,6], text(nums));
+    assert(nums2 == nums, text(nums2));
+    assert(arr == nums, text(arr));
     // Test parallel foreach with non-random access range.
-    nums = null;
-    nums2 = null;
+    appNums.clear();
+    appNums2.clear();
     auto range = filter!"a != 666"([0, 1, 2, 3, 4]);
     foreach(i, elem; poolInstance.parallel(range)) {
         synchronized {
-            nums ~= cast(uint) i;
-            nums2 ~= cast(uint) i;
+            appNums.put(cast(uint) i);
+            appNums2.put(cast(uint) i);
         }
     }
+    nums = appNums.data;
+    nums2 = appNums2.data;
     sort!"a.at!0 < b.at!0"(zip(nums, nums2));
     assert(nums == nums2);
     assert(nums == [0,1,2,3,4]);
+    auto logs = new double[1_000_000];
+    foreach(i, ref elem; poolInstance.parallel(logs)) {
+        elem = log(i + 1.0);
+    }
+    foreach(i, elem; logs) {
+        assert(approxEqual(elem, cast(double) log(i + 1)));
+    }
     assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
     assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
     assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
@@ -3269,12 +3413,13 @@ unittest {
     // Test default pool stuff.
     assert(taskPool.size == totalCPUs - 1);
-    nums = null;
+    appNums.clear();
     foreach(i; parallel(iota(1000))) {
         synchronized {
-            nums ~= i;
+            appNums.put(i);
         }
     }
+    nums = appNums.data;
     sort(nums);
     assert(equal(nums, iota(1000)));
@@ -3553,4 +3698,3 @@ version(parallelismStressTest) {
         }
     }
 }
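
A note for readers skimming the diff, not part of the commit: the core of the speedup is that submitJobs (and the eager parallel foreach over random-access ranges) now links a batch of work-unit tasks into an intrusive doubly linked chain, splices the whole chain onto the queue under a single queueLock()/queueUnlock() pair via abstractPutGroupNoSync, and runs the first work unit inline in the submitting thread. Below is a condensed, self-contained sketch of that flow; the Node type and the function names are simplified stand-ins, not the Phobos declarations.

    import std.stdio;

    // Simplified stand-in for AbstractTask: an intrusive list node plus a job.
    struct Node {
        Node* prev, next;
        void delegate() job;
    }

    // Splice a pre-linked chain [h .. t] onto the queue in one operation, so
    // the queue lock (elided here) is taken once per batch rather than once
    // per task, mirroring abstractPutGroupNoSync above.
    void putGroup(ref Node* head, ref Node* tail, Node* h, Node* t) {
        if(head is null) { head = h; tail = t; }
        else { h.prev = tail; tail.next = h; tail = t; }
    }

    void submitBatch(Node[] tasks, ref Node* qHead, ref Node* qTail) {
        if(tasks.length == 0) return;

        // Link everything except the first task, outside any lock.
        foreach(i; 1 .. tasks.length - 1) {
            tasks[i].next = &tasks[i + 1];
            tasks[i + 1].prev = &tasks[i];
        }

        if(tasks.length > 1) {
            putGroup(qHead, qTail, &tasks[1], &tasks[$ - 1]);
        }

        // The submitting thread executes the first work unit itself, so tiny
        // inputs pay for at most one enqueue/dequeue round trip.
        tasks[0].job();
    }

    void main() {
        int executed;
        auto tasks = new Node[4];
        foreach(ref t; tasks) t.job = () { executed++; };

        Node* qHead, qTail;
        submitBatch(tasks, qHead, qTail);

        // One work unit ran inline; the remaining three sit on the queue.
        writeln(executed, " ran inline, queue starts at task 1: ",
                qHead is &tasks[1]);
    }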