High-perf SelectInParallel in 120 lines of C#
Monday, March 23, 2009 at 3:13PM
A few months ago at Lokad, we started working on 8-core machines. Multi-core machines need an adequate algorithmic design to leverage their processing power, and such a design can be more or less complicated depending on the algorithm that you are trying to parallelize.
In our case, there were many situations where the parallelization was quite straightforward: large loops, with all iterations being independent. At that time, PLinq, the parallelization library from Microsoft, was not yet available as a final product (it will be shipped with Visual Studio 2010). Thus, since we were in quite a hurry, we decided to code our own SelectInParallel method (code provided below). Basically, it's just Select, but with each item being processed in parallel.
Although it is surprisingly simple, we found that, at least for Lokad, SelectInParallel alone covered virtually 99% of our multi-core parallelization needs.
Yet, when we started trying to speed up algorithms with our first SelectInParallel implementation, we ended up stuck with a poor speed-up ratio of 3x or even 2x, where I was expecting a speed-up close to 8x.
At first, I thought it was an illustration of Amdahl's law. But a more detailed performance investigation showed that I was just plain wrong. The harsh reality was: threads, when not (very) carefully managed, involve a (very) significant overhead.
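For reference, here is Amdahl's law in its usual textbook form, where p is the fraction of the run time that can be parallelized and N the number of cores (the symbols are mine, not taken from our code):

```latex
S(N) = \frac{1}{(1 - p) + \frac{p}{N}}
\qquad\text{so}\qquad
S(8) = 3 \;\Longrightarrow\; \frac{1}{3} = 1 - \frac{7p}{8} \;\Longrightarrow\; p = \frac{16}{21} \approx 0.76
```

Taken at face value, a 3x speed-up on 8 cores would mean that roughly a quarter of the run time was serial. That is hard to reconcile with loops whose iterations are all independent, and it is consistent with the loss coming from thread management rather than from the algorithm.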
Our latest SelectInParallel implementation is now 120 lines long with a quasi-negligible overhead, i.e. it brings a near-linear speed-up with the number of CPU cores on your machine. Yet, this performance wasn't easy to achieve. Let's review two key aspects of the implementation.
Keep your main thread working: In the first implementation, we followed the naive pattern: start N threads (N being the number of CPUs), wait for them to finish, collect the results and proceed. Bad idea: if the amount of work happens to be small, then simply waiting for your threads to start is going to be a performance killer. Instead, you should start N-1 threads, and get your calling thread working right away.
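The pattern can be sketched in isolation as follows. This is a minimal illustration rather than the listing at the end of this post: the names CallerParticipates and Run are made up for the example, it relies on CountdownEvent (available from .NET 4), and it leaves out error reporting from the pool threads.

```csharp
using System;
using System.Threading;

public static class CallerParticipates
{
    // Hypothetical helper: invokes body(0) .. body(workerCount - 1),
    // slices 1..workerCount-1 on the thread pool and slice 0 on the caller.
    public static void Run(int workerCount, Action<int> body)
    {
        if (workerCount < 1) throw new ArgumentOutOfRangeException("workerCount");

        // Counts the pool workers that have not finished yet.
        using (var done = new CountdownEvent(workerCount - 1))
        {
            for (int i = 1; i < workerCount; i++)
            {
                int slice = i; // copy: the lambda must not capture the loop variable
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try { body(slice); }
                    finally { done.Signal(); }
                });
            }

            try
            {
                body(0); // the calling thread starts working right away
            }
            finally
            {
                done.Wait(); // wait for the others only once our own slice is done
            }
        }
    }
}
```

For instance, CallerParticipates.Run(4, i => hits[i] = i + 1) fills a four-element array hits with three pool threads plus the caller. When the work is small, the caller has often finished its own slice before the pool threads have even started.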
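Avoid synchronization altogether: At first, we were using a Producer-Consumer threading pattern. Bad idea again: it produces a lot of lock contention, the work queue becoming the main bottleneck of the process. Instead, an arithmetic trick can be used to let the workers tackle disjoint work sets right from the beginning and without any synchronization: worker k processes items k, k+N, k+2N, and so on. Only the last few items are handed out through a light index-based synchronization, in order to even out the varying execution times of the threads.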
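Here is a small sketch of that arithmetic trick on its own. The names StridedMap and Squares are invented for the example, and it uses dedicated threads for readability where the full listing uses the thread pool plus the calling thread. The threshold formula is the one from the listing.

```csharp
using System;
using System.Threading;

public static class StridedMap
{
    // Hypothetical example: squares every item using workerCount threads.
    public static int[] Squares(int[] input, int workerCount)
    {
        if (workerCount < 1) throw new ArgumentOutOfRangeException("workerCount");

        var results = new int[input.Length];

        // Items below the threshold are split statically, the tail dynamically.
        int threshold = Math.Max(0,
            input.Length - (int)Math.Sqrt(input.Length) - 2 * workerCount);
        int next = threshold - 1; // Interlocked.Increment returns the new value

        var threads = new Thread[workerCount];
        for (int w = 0; w < workerCount; w++)
        {
            int start = w; // copy: the lambda must not capture the loop variable
            threads[w] = new Thread(() =>
            {
                // Disjoint phase: worker 'start' owns start, start+N, start+2N, ...
                // No two workers ever touch the same index, so no lock is needed.
                for (int i = start; i < threshold; i += workerCount)
                {
                    results[i] = input[i] * input[i];
                }

                // Joint phase: the remaining items are claimed one at a time.
                int j;
                while ((j = Interlocked.Increment(ref next)) < input.Length)
                {
                    results[j] = input[j] * input[j];
                }
            });
            threads[w].Start();
        }

        foreach (var t in threads) t.Join();
        return results;
    }
}
```

With 100 items and 4 workers, the threshold is 100 - 10 - 8 = 82: indices 0 to 81 are processed without any shared state, and only the last 18 go through the atomic counter.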
So far, we have been quite satisfied with our 120-line ersatz of PLinq. I hope this piece of code can help a few other people get the most out of their many-core machines. If you have ideas to further improve the performance of this SelectInParallel implementation, just let me know.
using System;
using System.Threading;

namespace Lokad.Threading
{
    ///<summary>
    /// Quick alternative to PLinq.
    ///</summary>
    public static class ParallelExtensions
    {
        static int _threadCount = Environment.ProcessorCount;

        /// <summary>Gets or sets the number of threads to be used in
        /// the parallel extensions.</summary>
        public static int ThreadCount
        {
            get { return _threadCount; }
            set
            {
                // a zero stride would make the disjoint loop below spin forever
                if (value < 1) throw new ArgumentOutOfRangeException("value");
                _threadCount = value;
            }
        }

        /// <summary>Fast parallelization of a function over an array.</summary>
        /// <param name="input">Input array to be processed in parallel.</param>
        /// <param name="func">The function to apply to each item (parameters and all the members should be immutable!).</param>
        /// <remarks>Threads are recycled. Synchronization overhead is minimal.</remarks>
        public static TResult[] SelectInParallel<TItem, TResult>(this TItem[] input, Func<TItem, TResult> func)
        {
            var results = new TResult[input.Length];

            // sequential path: single thread, or nothing worth splitting
            if (_threadCount == 1 || input.Length <= 1)
            {
                for (int i = 0; i < input.Length; i++)
                {
                    results[i] = func(input[i]);
                }
                return results;
            }

            // perf: no more threads than items in the collection
            int threadCount = Math.Min(_threadCount, input.Length);

            // perf: start with a syncless process, then finish with a light index-based sync
            // to adjust for the varying execution times of the various threads.
            int threshold = Math.Max(0, input.Length - (int) Math.Sqrt(input.Length) - 2 * threadCount);
            int workingIndex = threshold - 1;

            var sync = new object();
            Exception exception = null;
            int completedCount = 0;

            WaitCallback worker = index =>
            {
                try
                {
                    // no need for lock - disjoint processing:
                    // worker k handles items k, k + threadCount, k + 2 * threadCount, ...
                    for (var i = (int) index; i < threshold; i += threadCount)
                    {
                        results[i] = func(input[i]);
                    }

                    // joint processing: remaining items are claimed one by one
                    int j;
                    while ((j = Interlocked.Increment(ref workingIndex)) < input.Length)
                    {
                        results[j] = func(input[j]);
                    }

                    var r = Interlocked.Increment(ref completedCount);

                    // perf: only the terminating thread actually acquires a lock
                    // (and never the calling thread, which is not waiting at that point).
                    if (r == threadCount && (int) index != 0)
                    {
                        lock (sync) Monitor.Pulse(sync);
                    }
                }
                catch (Exception ex)
                {
                    exception = ex;
                    lock (sync) Monitor.Pulse(sync);
                }
            };

            for (int i = 1; i < threadCount; i++)
            {
                ThreadPool.QueueUserWorkItem(worker, i);
            }
            worker((object) 0); // perf: recycle current thread

            // waiting until completion or failure
            while (completedCount < threadCount && exception == null)
            {
                // CAUTION: a limit on the wait time is needed because if the threads
                // have terminated
                // - AFTER the test of the 'while' loop, and
                // - BEFORE the inner 'lock'
                // then there is no one left to call 'Pulse'.
                lock (sync) Monitor.Wait(sync, TimeSpan.FromMilliseconds(10));
            }

            if (exception != null)
            {
                // note: rethrowing this way resets the stack trace of the original exception
                throw exception;
            }
            return results;
        }
    }
}