wtorek, 24 lutego 2009

lambdas + threading = problem

Lambdas introduced in C# 3.0 are great, but sometimes it might be tempting to use them to adapt the signature of a method invoked in a thread to the Thread class constructor argument, e.g.:

static void f(int i)
{
Console.WriteLine(i);
}

static void Main(string[] args)
{
for (int i = 0; i < 10; ++i)
{
// BUG (intentional demo): the lambda captures the loop variable 'i' itself,
// not its current value; by the time a thread actually runs, 'i' may already
// have advanced, so duplicate values (often 10) are printed.
new Thread(() => f(i)).Start();
}
...
}

Interesting results - aren't they? The lambda captures the loop variable itself, not its current value - no copy, no good!
For one argument it can be written as follows:

static void f(int i)
{
Console.WriteLine(i);
}

static void Main(string[] args)
{
for (int i = 0; i < 10; ++i)
{
// Passing 'i' through Start(object) copies its current value, so each
// thread receives the value of 'i' from its own iteration.
new Thread(a => f((int)a)).Start(i);
}
...
}

niedziela, 22 lutego 2009

Producers-Consumers (ordered, bounded buffer) in C# - reloaded

Well, before using new possibilities it is better to recognize how they really work. Parallel Extensions introduces the Task class and, as the name states, it shall be used in the "bag of tasks" pattern - it shall not be important how many of them work at the moment. Unfortunately, the last code example followed not only the "bag of tasks" pattern but also the "pipeline" pattern. In that case it is important how many of the tasks are working at the moment. In each of the pipeline stages at least one thread shall work. This might not be true when using Task objects for all of them. On a single-core machine only one or two of the tasks might start to work (e.g. the reader and one compressor, but no writer) and everything hangs.

Another place that has changed is the handling of the queue with data items (full queue). The solution has been divided into two classes - first UnorderedItemsPool that behaves like in the previous post (multiple consumers, FIFO order) - the order in which data is obtained by the consumers does not matter. The id of the data is not stored when data is posted to the queue. The id (ticket) is generated during data retrieval by the consumer.
Second class - OrderedItemsPool - allows consumer to obtain data in proper order (might be different from producers posting order). Each element posted to the queue has id (ticket - unique and sequential number) assigned by producer. Internally SortedList class was used to imitate priority queue.


namespace ThreadingColaboration
{
/// <summary>
/// Thrown when an operation is attempted after end of data has been marked:
/// posting to a closed pool, or consuming from a drained, closed pool.
/// </summary>
public class EndOfDataException : Exception
{
}

/// <summary>
/// Base class for a pool of reusable items of type <typeparamref name="T"/>
/// shared between producer and consumer threads. Empty (recycled) items are
/// kept on a stack; the "full" side (items carrying data) is implemented by
/// derived classes. The pool may be bounded (at most a fixed number of items
/// ever created) or unbounded.
/// </summary>
/// <remarks>
/// Restored generic type parameter that was lost in the published listing
/// (the "where T : new()" constraint shows the class was written as generic).
/// </remarks>
public abstract class ItemsPool<T> where T : new()
{
    // Items created so far; allocation is lazy, on demand in DequeueEmpty,
    // until the configured maximum is reached.
    private int itemsCreated_ = 0;
    private int itemsMaxNumber_;
    // stack for better cache locality? (most recently returned item is reused first)
    private Stack<T> emptyStack_ = new Stack<T>();

    // Ticket (sequential id) of the last item handed out by DequeueData.
    protected int lastItemIdDequeued_ = -1;
    // Guards the "full" collection in derived classes and the endOfData_ flag.
    protected object fullQueueLock_ = new object();
    // Set once by EnqueueEndOfData; no more data may be posted afterwards.
    protected bool endOfData_ = false;

    /// <summary>Creates an unbounded pool (number of buffers not constrained).</summary>
    public ItemsPool()
    {
        itemsMaxNumber_ = int.MaxValue;
    }

    /// <summary>Creates a bounded pool with at most <paramref name="itemsMaxNumber"/> items.</summary>
    public ItemsPool(int itemsMaxNumber)
    {
        itemsMaxNumber_ = itemsMaxNumber;
    }

    /// <summary>Returns a processed item to the empty pool and wakes one waiter.</summary>
    public void EnqueueEmpty(T buf)
    {
        lock (emptyStack_)
        {
            emptyStack_.Push(buf);
            Monitor.Pulse(emptyStack_);
        }
    }

    /// <summary>
    /// Obtains an empty item: reuses one from the stack, allocates a new one
    /// while under the creation limit, or blocks until an item is returned.
    /// </summary>
    public T DequeueEmpty()
    {
        T res;

        lock (emptyStack_)
        {
            if (emptyStack_.Count == 0 && itemsCreated_ < itemsMaxNumber_)
            {
                res = new T();
                ++itemsCreated_;
            }
            else
            {
                while (emptyStack_.Count == 0)
                    Monitor.Wait(emptyStack_);
                res = emptyStack_.Pop();
            }
        }

        return res;
    }

    /// <summary>Marks end of data and wakes all consumers blocked in DequeueData.</summary>
    public void EnqueueEndOfData()
    {
        lock (fullQueueLock_)
        {
            endOfData_ = true;
            Monitor.PulseAll(fullQueueLock_);
        }
    }

    /// <summary>Posts an item carrying data; itemId is the producer-assigned ticket.</summary>
    public abstract void EnqueueData(int itemId, T buf);

    /// <summary>Retrieves an item with data; returns the ticket assigned to it.</summary>
    public abstract int DequeueData(out T buf);
}

/// <summary>
/// Pool whose "full" side is a plain FIFO queue: consumers receive items in
/// the order producers posted them; tickets are generated at dequeue time.
/// </summary>
public class UnorderedItemsPool<T> : ItemsPool<T> where T : new()
{
    private Queue<T> fullQueue_ = new Queue<T>();

    /// <summary>Unbounded pool.</summary>
    public UnorderedItemsPool()
    {
    }

    /// <summary>Bounded pool with at most <paramref name="itemsMaxNumber"/> items.</summary>
    public UnorderedItemsPool(int itemsMaxNumber)
        : base(itemsMaxNumber)
    {
    }

    /// <summary>
    /// Posts a data item; <paramref name="itemId"/> is ignored because order
    /// does not matter here. Throws <see cref="EndOfDataException"/> if end of
    /// data was already marked.
    /// </summary>
    public override void EnqueueData(int itemId, T buf)
    {
        lock (fullQueueLock_)
        {
            // object cannot be used for data enqueuing after marking endOfData_;
            // flag is checked under the lock so the read is properly synchronized
            if (endOfData_)
                throw new EndOfDataException();

            fullQueue_.Enqueue(buf);
            Monitor.Pulse(fullQueueLock_);
        }
    }

    /// <summary>
    /// Waits for a data item and returns its freshly generated ticket.
    /// Throws <see cref="EndOfDataException"/> once the queue is drained and
    /// end of data has been marked.
    /// </summary>
    public override int DequeueData(out T buf)
    {
        int res;
        lock (fullQueueLock_)
        {
            while (fullQueue_.Count == 0)
            {
                // no more data exception
                if (endOfData_)
                    throw new EndOfDataException();
                Monitor.Wait(fullQueueLock_);
            }
            buf = fullQueue_.Dequeue();
            res = ++lastItemIdDequeued_;
        }
        return res;
    }
}

/// <summary>
/// Pool whose "full" side hands items to consumers in strict ticket order
/// (which may differ from the order producers posted them). Each posted item
/// carries a unique, sequential ticket assigned by the producer; a consumer
/// blocks until the next expected ticket is available.
/// </summary>
public class OrderedItemsPool<T> : ItemsPool<T> where T : new()
{
    // SortedList imitates a priority queue: index 0 always holds the entry
    // with the smallest ticket.
    private SortedList<int, T> fullPriorQueue_ = new SortedList<int, T>();

    /// <summary>Unbounded pool.</summary>
    public OrderedItemsPool()
    {
    }

    /// <summary>Bounded pool with at most <paramref name="itemsMaxNumber"/> items.</summary>
    public OrderedItemsPool(int itemsMaxNumber)
        : base(itemsMaxNumber)
    {
    }

    /// <summary>
    /// Posts a data item under its ticket. Throws <see cref="EndOfDataException"/>
    /// if end of data was already marked.
    /// </summary>
    public override void EnqueueData(int itemId, T buf)
    {
        lock (fullQueueLock_)
        {
            // object cannot be used for data enqueuing after marking endOfData_;
            // flag is checked under the lock so the read is properly synchronized
            if (endOfData_)
                throw new EndOfDataException();

            fullPriorQueue_.Add(itemId, buf);
            // PulseAll: only the consumer waiting for this particular ticket
            // may proceed, so every waiter must wake and recheck.
            Monitor.PulseAll(fullQueueLock_);
        }
    }

    /// <summary>
    /// Waits until the item with the next expected ticket is available and
    /// returns that ticket. Throws <see cref="EndOfDataException"/> once the
    /// queue is empty and end of data has been marked.
    /// </summary>
    public override int DequeueData(out T buf)
    {
        int res;
        lock (fullQueueLock_)
        {
            // Keys[0]/Values[0] index the smallest-key entry directly
            // (equivalent to LINQ First() on a SortedList, without LINQ).
            while ((fullPriorQueue_.Count == 0) || (fullPriorQueue_.Keys[0] != lastItemIdDequeued_ + 1))
            {
                // no more data exception
                if ((fullPriorQueue_.Count == 0) && endOfData_)
                    throw new EndOfDataException();

                Monitor.Wait(fullQueueLock_);
            }
            buf = fullPriorQueue_.Values[0];
            Debug.Assert(fullPriorQueue_.Keys[0] == lastItemIdDequeued_ + 1);
            fullPriorQueue_.RemoveAt(0);
            res = ++lastItemIdDequeued_;
            // Wake other consumers: one of them may now hold the next ticket.
            Monitor.PulseAll(fullQueueLock_);
        }
        return res;
    }
}
}


Below we can see that Thread class was used in place of Task for reader and writer.
Thanks to new pool buffers we have simplified code for Compress tasks.


/// <summary>
/// Multi-threaded file compression pipeline: a dedicated reader thread fills
/// pooled input buffers, a "bag" of Compress tasks deflates blocks, and a
/// dedicated writer thread stores them in original order.
/// </summary>
class Program
{
    // Size of one uncompressed data block read from the input file.
    static int bufferSize = 64 * 1024;

    // Reusable wrapper around a byte buffer so the pool can allocate it
    // via the new() constraint.
    public class BuffType
    {
        public byte[] buf;

        public BuffType()
        {
            buf = new byte[bufferSize];
        }
    }

    /// <summary>
    /// Pipeline stage 1: reads the input stream block by block into pooled
    /// buffers and posts them for compression; marks end of data when done.
    /// </summary>
    static public void Reader(Stream strmIn, ThreadingColaboration.ItemsPool<BuffType> inputBuffers)
    {
        int bytesRead;
        do
        {
            BuffType res = inputBuffers.DequeueEmpty();

            bytesRead = strmIn.Read(res.buf, 0, res.buf.Length);

            // last buffer might be smaller
            if (bytesRead != res.buf.Length)
                Array.Resize(ref res.buf, bytesRead);
            // itemId does not matter for unordered input buffer
            inputBuffers.EnqueueData(0, res);
        } while (bytesRead != 0);

        inputBuffers.EnqueueEndOfData();
    }

    /// <summary>
    /// Pipeline stage 2 ("bag of tasks"): compresses one input block and posts
    /// the result under the block's ticket so the writer can restore order.
    /// Swallows EndOfDataException: it simply means no block was left.
    /// </summary>
    static public void Compress(
        ThreadingColaboration.ItemsPool<BuffType> inputBuffers,
        ThreadingColaboration.ItemsPool<MemoryStream> outputBuffers)
    {
        BuffType inputBuffer;
        int actual;

        try
        {
            actual = inputBuffers.DequeueData(out inputBuffer);

            if (inputBuffer.buf.Length != 0)
            {
                MemoryStream ms = outputBuffers.DequeueEmpty();

                // new instance because we need clean object to compress data
                // without garbage from previous data compression (iteration)
                using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
                {
                    ds.Write(inputBuffer.buf, 0, inputBuffer.buf.Length);
                }
                inputBuffers.EnqueueEmpty(inputBuffer);
                // put buffer into the queue without waiting
                outputBuffers.EnqueueData(actual, ms);
            }
        }
        catch (ThreadingColaboration.EndOfDataException)
        {
        }
    }

    /// <summary>
    /// Pipeline stage 3: writes compressed blocks in ticket order, each one
    /// prefixed with its length, until end of data is signalled.
    /// </summary>
    static public void Writer(BinaryWriter writer, ThreadingColaboration.ItemsPool<MemoryStream> outputBuffers)
    {
        try
        {
            while (true)
            {
                MemoryStream ms;
                int bufNo = outputBuffers.DequeueData(out ms);
                // compressed data length - reason why we need BinaryWriter (not only Stream)
                writer.Write((int)ms.Position);
                // Position not Size because buffer can be larger than data stored inside
                writer.Write(ms.GetBuffer(), 0, (int)ms.Position);
                ms.Position = 0;
                outputBuffers.EnqueueEmpty(ms);
            }
        }
        catch (ThreadingColaboration.EndOfDataException)
        {
        }
    }

    static void Main(string[] args)
    {
        int bufferNo = 8;
        string inputFilename = "uncompressed.in";
        string outputFilename = "compressed.out";

        // bounded unordered pool feeds the compressors; unbounded ordered
        // pool restores block order for the writer
        ThreadingColaboration.ItemsPool<BuffType> inputBuffers =
            new ThreadingColaboration.UnorderedItemsPool<BuffType>(bufferNo);
        ThreadingColaboration.ItemsPool<MemoryStream> outputBuffers =
            new ThreadingColaboration.OrderedItemsPool<MemoryStream>();

        using (Stream strmIn = File.OpenRead(inputFilename))
        {
            // reader and writer are dedicated threads (not tasks): each
            // pipeline stage must be guaranteed to run concurrently
            new Thread(o => { Reader(strmIn, inputBuffers); }).Start();

            using (Stream strmOut = File.Create(outputFilename))
            {
                using (BinaryWriter writer = new BinaryWriter(strmOut))
                {
                    Thread tw = new Thread(o => { Writer(writer, outputBuffers); });
                    tw.Start();

                    // one Compress task per expected data block (+1 for the
                    // trailing zero-length read); Task.Create is the Parallel
                    // Extensions CTP API (Task.Factory.StartNew in released TPL)
                    List<Task> ts = new List<Task>();
                    FileInfo fi = new FileInfo(inputFilename);
                    int tn = (int)(fi.Length / bufferSize) + 1;
                    for (int i = 0; i < tn; ++i)
                        ts.Add(Task.Create(o => { Compress(inputBuffers, outputBuffers); }));
                    // we have to wait till all tasks finish
                    Task.WaitAll(ts.ToArray());
                    // and then mark EndOfData for writer thread
                    outputBuffers.EnqueueEndOfData();

                    // wait till writer thread finishes
                    tw.Join();
                }
            }
        }
    }
}

środa, 18 lutego 2009

Producers-Consumers (ordered, bounded buffer) in C#

Here is a new version of file compression in a multi-threaded manner. The presented OrderedItemsPool class within the ThreadingColaboration namespace uses the Producers-Consumers pattern in the relationship between threads. Elements with data are returned in FIFO order. Each of them is assigned a ticket with a number to synchronize further processing. Empty elements are stored in FILO manner - it might support cache locality.
The presented class supports a bounded or unbounded (i.e. the number of items created is not constrained) buffer pattern (constructor with parameter and parameterless constructor).
Another problem is connected to the end of processing. In the presented code, when the last element is put into the queue, the producer shall invoke the EnqueueEndOfData method (when there are multiple producers, the method shall be invoked after all producers finish). After the EndOfData mark is set, no more data can be posted into the queue (otherwise an exception will be raised). Additionally, to inform consumer(s), an exception is thrown when a consumer is trying to get data that will not appear.


namespace ThreadingColaboration
{
/// <summary>
/// Thrown when an operation is attempted after end of data has been marked:
/// posting to a closed pool, or consuming from a drained, closed pool.
/// </summary>
public class EndOfDataException : Exception
{
}

/// <summary>
/// Producers-Consumers pool of reusable items. Data items are consumed in
/// FIFO order and each dequeue returns a sequential ticket; empty (recycled)
/// items are kept on a stack (FILO) which might support cache locality.
/// The pool may be bounded or unbounded.
/// </summary>
public class OrderedItemsPool<T> where T : new()
{
    // Items created so far; allocation is lazy until the maximum is reached.
    private int itemsCreated_ = 0;
    // Ticket of the last data item handed out by DequeueData.
    private int lastItemId_ = -1;
    private int itemsMaxNumber_;
    private Queue<T> fullQueue_ = new Queue<T>();
    private Stack<T> emptyStack_ = new Stack<T>();
    // Set once by EnqueueEndOfData; no more data may be posted afterwards.
    private bool endOfData_ = false;

    /// <summary>Creates an unbounded pool (number of buffers not constrained).</summary>
    public OrderedItemsPool()
    {
        itemsMaxNumber_ = int.MaxValue;
    }

    /// <summary>Creates a bounded pool with at most <paramref name="itemsMaxNumber"/> items.</summary>
    public OrderedItemsPool(int itemsMaxNumber)
    {
        itemsMaxNumber_ = itemsMaxNumber;
    }

    /// <summary>
    /// Posts a data item and wakes one waiting consumer. Throws
    /// <see cref="EndOfDataException"/> if end of data was already marked.
    /// </summary>
    public void EnqueueData(T buf)
    {
        lock (fullQueue_)
        {
            // object cannot be used for data enqueuing after marking endOfData_;
            // flag is checked under the lock so the read is properly synchronized
            if (endOfData_)
                throw new EndOfDataException();

            fullQueue_.Enqueue(buf);
            Monitor.Pulse(fullQueue_);
        }
    }

    /// <summary>Marks end of data and wakes all consumers blocked in DequeueData.</summary>
    public void EnqueueEndOfData()
    {
        lock (fullQueue_)
        {
            endOfData_ = true;
            Monitor.PulseAll(fullQueue_);
        }
    }

    /// <summary>
    /// Waits for a data item and returns its freshly generated ticket.
    /// Throws <see cref="EndOfDataException"/> once the queue is drained and
    /// end of data has been marked.
    /// </summary>
    public int DequeueData(out T buf)
    {
        int res;
        lock (fullQueue_)
        {
            while (fullQueue_.Count == 0)
            {
                // if no more data - exception
                if (endOfData_)
                    throw new EndOfDataException();
                Monitor.Wait(fullQueue_);
            }
            buf = fullQueue_.Dequeue();
            res = ++lastItemId_;
        }
        return res;
    }

    /// <summary>Returns a processed item to the empty pool and wakes one waiter.</summary>
    public void EnqueueEmpty(T buf)
    {
        lock (emptyStack_)
        {
            emptyStack_.Push(buf);
            Monitor.Pulse(emptyStack_);
        }
    }

    /// <summary>
    /// Obtains an empty item: reuses one from the stack, allocates a new one
    /// while under the creation limit, or blocks until an item is returned.
    /// </summary>
    public T DequeueEmpty()
    {
        T res;

        // single lock: the original listing accidentally acquired the same
        // (reentrant) monitor twice, which also unbalanced the braces
        lock (emptyStack_)
        {
            if (emptyStack_.Count == 0 && itemsCreated_ < itemsMaxNumber_)
            {
                res = new T();
                ++itemsCreated_;
            }
            else
            {
                while (emptyStack_.Count == 0)
                    Monitor.Wait(emptyStack_);
                res = emptyStack_.Pop();
            }
        }

        return res;
    }
}
}


Here is a sample program that uses the above class to compress a file using multiple threads.
The solution uses elements from .NET 3.5 and Parallel Extensions.
No thread is created explicitly - the Task class is used. Here we have one task that reads from a file, multiple tasks that obtain uncompressed data and produce compressed data, and a last task that saves data to the output file. The number of tasks that compress data is equal to the number of data blocks that shall be read from the file. It is one of the properties of tasks - a task is a lightweight object (compared to a thread). Creation of one thread costs about 1 MB (more info in video).


/// <summary>
/// Task-based file compression: one reader task fills pooled input buffers,
/// many Compress tasks deflate blocks, and one writer task stores them; block
/// order is restored with an explicit monitor around the output enqueue.
/// </summary>
class Program
{
    static private ThreadingColaboration.OrderedItemsPool<BuffType> inputBuffers;
    static private ThreadingColaboration.OrderedItemsPool<MemoryStream> outputBuffers;

    // Next ticket allowed to enqueue its compressed output; guarded by 'o'.
    static private int writePos = 0;
    static private object o = new object();

    // Size of one uncompressed data block read from the input file.
    static int bufferSize = 64 * 1024;

    // Reusable wrapper around a byte buffer so the pool can allocate it
    // via the new() constraint.
    class BuffType
    {
        public byte[] buf;

        public BuffType()
        {
            buf = new byte[bufferSize];
        }
    }

    /// <summary>
    /// Reads the input stream block by block into pooled buffers and posts
    /// them for compression; marks end of data when the stream is exhausted.
    /// </summary>
    static public void Reader(Stream strmIn)
    {
        int bytesRead;
        do
        {
            BuffType res = inputBuffers.DequeueEmpty();

            bytesRead = strmIn.Read(res.buf, 0, res.buf.Length);

            // last buffer might be smaller
            if (bytesRead != res.buf.Length)
                Array.Resize(ref res.buf, bytesRead);

            inputBuffers.EnqueueData(res);

        } while (bytesRead != 0);

        inputBuffers.EnqueueEndOfData();
    }

    /// <summary>
    /// Compresses one input block, then waits for its turn (ticket order)
    /// before posting the result, so the writer receives blocks in original
    /// order. Swallows EndOfDataException: it means no block was left.
    /// </summary>
    static public void Compress()
    {
        BuffType inputBuffer;
        int actual;

        try
        {
            actual = inputBuffers.DequeueData(out inputBuffer);

            if (inputBuffer.buf.Length != 0)
            {
                MemoryStream ms = outputBuffers.DequeueEmpty();

                // new instance because we need clean object to compress data
                // without garbage from previous data compression (iteration)
                using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
                {
                    ds.Write(inputBuffer.buf, 0, inputBuffer.buf.Length);
                }
                inputBuffers.EnqueueEmpty(inputBuffer);
                // enter monitor
                lock (o)
                {
                    // check order
                    while (actual != writePos) Monitor.Wait(o);

                    outputBuffers.EnqueueData(ms);

                    ++writePos;

                    //wake up waiting
                    Monitor.PulseAll(o);
                }
            }
        }
        catch (ThreadingColaboration.EndOfDataException)
        {
        }
    }

    /// <summary>
    /// Writes compressed blocks (already ordered by Compress), each one
    /// prefixed with its length, until end of data is signalled.
    /// </summary>
    static public void Writer(BinaryWriter writer)
    {
        try
        {
            while (true)
            {
                MemoryStream ms;
                int bufNo = outputBuffers.DequeueData(out ms);

                // compressed data length - reason why we need BinaryWriter (not only Stream)
                writer.Write((int)ms.Position);
                // Position not Size because buffer can be larger than data stored inside
                writer.Write(ms.GetBuffer(), 0, (int)ms.Position);

                ms.Position = 0;
                outputBuffers.EnqueueEmpty(ms);
            }
        }
        catch (ThreadingColaboration.EndOfDataException)
        {
        }
    }

    static void Main(string[] args)
    {
        int bufferNo = 8;
        string inputFilename = "uncompressed.in";
        string outputFilename = "compressed.out";

        // bounded pool feeds the compressors; unbounded pool buffers output
        inputBuffers = new ThreadingColaboration.OrderedItemsPool<BuffType>(bufferNo);
        outputBuffers = new ThreadingColaboration.OrderedItemsPool<MemoryStream>();


        using (Stream strmIn = File.OpenRead(inputFilename))
        {
            // Task.Create is the Parallel Extensions CTP API
            // (Task.Factory.StartNew in the released TPL)
            Task.Create(o => { Reader(strmIn); });

            using (Stream strmOut = File.Create(outputFilename))
            {
                using (BinaryWriter writer = new BinaryWriter(strmOut))
                {
                    Task tw = Task.Create(o => { Writer(writer); });

                    // one Compress task per expected data block (+1 for the
                    // trailing zero-length read)
                    List<Task> ts = new List<Task>();
                    FileInfo fi = new FileInfo(inputFilename);
                    int tn = (int)(fi.Length / bufferSize) + 1;
                    for (int i = 0; i < tn; ++i)
                        ts.Add(Task.Create(o => { Compress(); }));
                    // we have to wait till all tasks finish
                    Task.WaitAll(ts.ToArray());
                    // and then mark endOfData_ for writer thread
                    outputBuffers.EnqueueEndOfData();

                    // wait till writer thread finishes
                    tw.Wait();
                }
            }
        }
    }
}

sobota, 14 lutego 2009

Multi-core file compression in .NET

Recently I came across an article on InformIT called Compressing a Very Large File and its parallel version Making It Faster, both written by Jim Mischel. Some might say that compression by the deflate algorithm is nothing special, but parallelism is what I am really interested in. I have also observed that in some situations the code does not work properly. What I mean is compressing already compressed files. In that case such a weak algorithm as deflate will produce files larger than the original. The case is quite common - almost every document format has internal compression nowadays. The code presented in the articles hangs when trying to compress an already compressed file - a buffer overflow occurs (as the author said, it is not production quality and when used on a plain text file, like xml, it works properly).
I tried to remove the problem with buffer overflow – just not attaching a buffer to the stream – letting it grow.
I have also removed busy waiting and used a monitor for synchronization.
Order in which blocks are read and written is set up using critical section with r object.
Degree of parallelism (number of threads) is contained in variable tn.
In some spare time I will try to add some syntactic sugar from .NET 3.5 and new threading functionalities from Microsoft Parallel Extensions.

Here is simple solution:


/// <summary>
/// Thread-based parallel file compression. Each worker thread reads a block
/// (ticket assigned under lock 'r'), compresses it, and writes its output in
/// ticket order (enforced with a monitor on 'o').
/// </summary>
class Program
{
    // Next block number to be read; guarded by 'r'.
    static private int readPos = 0;
    // Next block number allowed to write its output; guarded by 'o'.
    static private int writePos = 0;
    static private object r = new object();
    static private object o = new object();

    /// <summary>
    /// Worker loop: repeatedly reads one block (atomically paired with its
    /// sequence number), deflates it into a growing MemoryStream, then waits
    /// for its turn to append the length-prefixed result to the output.
    /// </summary>
    static private void Compress(BinaryWriter writer, Stream strmIn)
    {
        using (MemoryStream ms = new MemoryStream())
        {
            byte[] inputBuffer = new byte[bufferSize];
            int bytesRead;
            do
            {
                int actual;
                lock (r)
                {
                    bytesRead = strmIn.Read(inputBuffer, 0, inputBuffer.Length);
                    actual = readPos++;
                }
                if (bytesRead != 0)
                {
                    // new instance because we need clean object to compress data
                    // without garbage from previous data compression (iteration)
                    using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
                    {
                        ds.Write(inputBuffer, 0, bytesRead);
                    }
                    // enter monitor
                    lock (o)
                    {
                        // check order
                        while (actual != writePos)
                            Monitor.Wait(o);
                        // compressed data length - reason why we need BinaryWriter (not only Stream)
                        writer.Write((int)ms.Position);
                        // Position not Size because buffer can be larger than data stored inside
                        writer.Write(ms.GetBuffer(), 0, (int)ms.Position);
                        ++writePos;
                        //wake up waiting
                        Monitor.PulseAll(o);
                    }
                    // not to expand buffer in next iteration
                    ms.Position = 0;
                }
            } while (bytesRead != 0);
        }
    }

    const string inputFilename = "file.in";
    const string outputFilename = "file_compressed.out";
    const int bufferSize = 64 * 1024;
    // Degree of parallelism (number of worker threads); overridable via args[0].
    static int tn = 2;

    static void Main(string[] args)
    {
        if (args.Length >= 1)
            tn = int.Parse(args[0]);
        Stopwatch sw = new Stopwatch();
        sw.Start();
        using (Stream strmIn = File.OpenRead(inputFilename))
        {
            using (Stream strmOut = File.Create(outputFilename))
            {
                using (BinaryWriter writer = new BinaryWriter(strmOut))
                {
                    List<Thread> ts = new List<Thread>();
                    for (int i = 0; i < tn; ++i)
                        ts.Add(new Thread(delegate() { Compress(writer, strmIn); }));
                    foreach (Thread t in ts)
                        t.Start();
                    foreach (Thread t in ts)
                        t.Join();

                    // to mark data end explicitly
                    writer.Write((int)0);
                }
            }
        }
        Console.WriteLine(sw.ElapsedMilliseconds);
    }
}

piątek, 13 lutego 2009

Hello World!

Welcome to my new blog site.

I hope that the journey will be long lasting.