środa, 18 lutego 2009

Producers-Consumers (ordered, bounded buffer) in C#

Here is a new version of multi-threaded file compression. The OrderedItemsPool class presented below, in the ThreadingColaboration namespace, uses the Producers-Consumers pattern for the relationship between threads. Elements with data are returned in FIFO order, and each of them is assigned a ticket number to synchronize further processing. Empty elements are stored in LIFO order — this may improve cache locality.
The presented class supports either a bounded or an unbounded buffer (i.e. the number of items created is not constrained) — see the constructor with a parameter and the parameterless constructor.
Another problem concerns the end of processing. In the presented code, when the last element has been put into the queue, the producer shall invoke the EnqueueEndOfData method (when there are multiple producers, the method shall be invoked after all producers finish). After the end-of-data mark is set, no more data can be posted into the queue (otherwise an exception is raised). Additionally, to inform the consumer(s), an exception is thrown when a consumer tries to get data that will never appear.


namespace ThreadingColaboration
{
// Thrown to signal that the pool's data stream has ended: raised when a
// producer tries to enqueue after EnqueueEndOfData, and when a consumer
// asks for data that will never appear.
public class EndOfDataException : Exception
{
    public EndOfDataException()
    {
    }

    public EndOfDataException(string message)
        : base(message)
    {
    }

    public EndOfDataException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}

// Producers-Consumers pool of reusable items.
// Filled items travel producer -> consumer in FIFO order; every dequeued
// data item receives a monotonically increasing ticket number so downstream
// processing can be re-ordered. Recycled empty items are handed out in LIFO
// order, which may improve cache locality.
public class OrderedItemsPool<T> where T : new()
{
    private int itemsCreated_ = 0;    // number of T instances allocated so far
    private int lastItemId_ = -1;     // last ticket number handed out by DequeueData
    private int itemsMaxNumber_;      // allocation limit; int.MaxValue means unbounded
    private Queue<T> fullQueue_ = new Queue<T>();    // filled items, FIFO; also the sync object for the data side
    private Stack<T> emptyStack_ = new Stack<T>();   // recycled items, LIFO; sync object for the empty side
    private bool endOfData_ = false;  // set once by EnqueueEndOfData; guarded by fullQueue_ lock

    // Unbounded pool: the number of items created is not constrained.
    public OrderedItemsPool()
    {
        itemsMaxNumber_ = int.MaxValue;
    }

    // Bounded pool: at most itemsMaxNumber items are ever allocated;
    // DequeueEmpty blocks once the limit is reached and no item is recycled.
    public OrderedItemsPool(int itemsMaxNumber)
    {
        itemsMaxNumber_ = itemsMaxNumber;
    }

    // Producer side: publish a filled item and wake one waiting consumer.
    // Throws EndOfDataException if called after EnqueueEndOfData.
    public void EnqueueData(T buf)
    {
        lock (fullQueue_)
        {
            // checked under the lock so the flag cannot flip between the
            // test and the enqueue (the original tested it outside the lock)
            if (endOfData_)
                throw new EndOfDataException();

            fullQueue_.Enqueue(buf);
            Monitor.Pulse(fullQueue_);
        }
    }

    // Producer side: mark the end of the data stream. With multiple
    // producers, call this only after all of them have finished.
    public void EnqueueEndOfData()
    {
        lock (fullQueue_)
        {
            endOfData_ = true;
            // wake every consumer blocked in DequeueData so each one can
            // observe the flag and throw
            Monitor.PulseAll(fullQueue_);
        }
    }

    // Consumer side: take the oldest filled item, blocking while the queue
    // is empty. Returns the item's ticket number (0, 1, 2, ...).
    // Throws EndOfDataException once the stream ended and no data remains.
    public int DequeueData(out T buf)
    {
        int res;
        lock (fullQueue_)
        {
            while (fullQueue_.Count == 0)
            {
                // if no more data will ever arrive - exception
                if (endOfData_)
                    throw new EndOfDataException();
                Monitor.Wait(fullQueue_);
            }
            buf = fullQueue_.Dequeue();
            res = ++lastItemId_;
        }
        return res;
    }

    // Return a no-longer-needed item to the empty pool and wake one waiter.
    public void EnqueueEmpty(T buf)
    {
        lock (emptyStack_)
        {
            emptyStack_.Push(buf);
            Monitor.Pulse(emptyStack_);
        }
    }

    // Obtain an empty item: prefer the most recently recycled one; allocate
    // a new one while under the creation limit; otherwise block until an
    // item is returned via EnqueueEmpty.
    public T DequeueEmpty()
    {
        T res;

        // single lock - the original redundantly nested two locks on the
        // same object (Monitor is reentrant, so it worked, but the inner
        // lock was pure noise)
        lock (emptyStack_)
        {
            if (emptyStack_.Count == 0 && itemsCreated_ < itemsMaxNumber_)
            {
                res = new T();
                ++itemsCreated_;
            }
            else
            {
                while (emptyStack_.Count == 0)
                    Monitor.Wait(emptyStack_);
                res = emptyStack_.Pop();
            }
        }

        return res;
    }
}


Here is a sample program that uses the above class to compress a file using multiple threads.
Solution uses elements from .NET 3.5 and Parallel Extensions.
No thread is created explicitly — the Task class is used. Here we have one task that reads from the file, multiple tasks that obtain uncompressed data and produce compressed data, and a last task that saves the data to the output file. The number of tasks that compress data is equal to the number of data blocks to be read from the file. This is one of the properties of tasks — a task is a lightweight object compared to a thread (creating one thread costs about 1 MB of memory; more info in the video).


// Multi-threaded file compression driver:
//   Reader task  -> inputBuffers  -> N Compress tasks -> outputBuffers -> Writer task
// Output format: a sequence of [int32 length][deflate-compressed block] records.
class Program
{
    // Pool of raw-data buffers flowing Reader -> Compress (bounded, see Main).
    static private ThreadingColaboration.OrderedItemsPool<BuffType> inputBuffers;
    // Pool of compressed-data streams flowing Compress -> Writer (unbounded).
    static private ThreadingColaboration.OrderedItemsPool<MemoryStream> outputBuffers;

    // Ticket number of the next block the Writer is allowed to receive; guarded by lock (o).
    static private int writePos = 0;
    static private object o = new object();

    static int bufferSize = 64 * 1024;

    // Wrapper around a fixed-size byte array so the pool can create it via new().
    class BuffType
    {
        public byte[] buf;

        public BuffType()
        {
            buf = new byte[bufferSize];
        }
    }

    // Producer task: reads the input stream in bufferSize chunks and publishes
    // them to inputBuffers. The final zero-length chunk acts as the EOF marker,
    // after which the pool's end-of-data flag is raised.
    static public void Reader(Stream strmIn)
    {
        int bytesRead;
        do
        {
            BuffType res = inputBuffers.DequeueEmpty();

            // A recycled buffer may have been shrunk by a short final read
            // (Array.Resize below); restore full capacity before reusing it.
            if (res.buf.Length != bufferSize)
                Array.Resize(ref res.buf, bufferSize);

            bytesRead = strmIn.Read(res.buf, 0, res.buf.Length);

            // the last buffer might be smaller
            if (bytesRead != res.buf.Length)
                Array.Resize(ref res.buf, bytesRead);

            inputBuffers.EnqueueData(res);

        } while (bytesRead != 0);

        inputBuffers.EnqueueEndOfData();
    }

    // Worker task: compresses exactly one input block and hands the result to
    // the Writer, using the block's ticket number to preserve file order.
    static public void Compress()
    {
        BuffType inputBuffer;
        int actual;

        try
        {
            actual = inputBuffers.DequeueData(out inputBuffer);

            // a zero-length block is the EOF marker - nothing to compress
            if (inputBuffer.buf.Length != 0)
            {
                MemoryStream ms = outputBuffers.DequeueEmpty();

                // leaveOpen: true - only the DeflateStream wrapper is disposed
                // here; the MemoryStream itself is recycled through the pool
                using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
                {
                    ds.Write(inputBuffer.buf, 0, inputBuffer.buf.Length);
                }
                inputBuffers.EnqueueEmpty(inputBuffer);

                // enter monitor
                lock (o)
                {
                    // wait until it is this block's turn, so compressed blocks
                    // reach the Writer in their original file order
                    while (actual != writePos) Monitor.Wait(o);

                    outputBuffers.EnqueueData(ms);

                    ++writePos;

                    // wake up tasks waiting for their turn
                    Monitor.PulseAll(o);
                }
            }
        }
        catch (ThreadingColaboration.EndOfDataException)
        {
            // more worker tasks than data blocks - nothing left to do
        }
    }

    // Consumer task: writes length-prefixed compressed blocks to the output
    // file until outputBuffers signals end of data.
    static public void Writer(BinaryWriter writer)
    {
        try
        {
            while (true)
            {
                MemoryStream ms;
                // ticket order is already enforced in Compress, so the
                // returned ticket number is not needed here
                outputBuffers.DequeueData(out ms);

                // compressed data length prefix - the reason a BinaryWriter
                // (not a bare Stream) is needed
                writer.Write((int)ms.Position);
                // Position, not Length: the recycled buffer can be larger
                // than the data stored inside
                writer.Write(ms.GetBuffer(), 0, (int)ms.Position);

                // rewind and recycle the stream for the next compression
                ms.Position = 0;
                outputBuffers.EnqueueEmpty(ms);
            }
        }
        catch (ThreadingColaboration.EndOfDataException)
        {
            // all blocks have been written
        }
    }

    static void Main(string[] args)
    {
        int bufferNo = 8;
        string inputFilename = "uncompressed.in";
        string outputFilename = "compressed.out";

        inputBuffers = new ThreadingColaboration.OrderedItemsPool<BuffType>(bufferNo);
        outputBuffers = new ThreadingColaboration.OrderedItemsPool<MemoryStream>();

        using (Stream strmIn = File.OpenRead(inputFilename))
        {
            // Task.Create came from the pre-release Parallel Extensions CTP
            // and never shipped; Task.Factory.StartNew is the released API
            Task.Factory.StartNew(() => Reader(strmIn));

            using (Stream strmOut = File.Create(outputFilename))
            {
                using (BinaryWriter writer = new BinaryWriter(strmOut))
                {
                    Task tw = Task.Factory.StartNew(() => Writer(writer));

                    List<Task> ts = new List<Task>();
                    FileInfo fi = new FileInfo(inputFilename);
                    // one worker per full block plus one for the EOF marker
                    int tn = (int)(fi.Length / bufferSize) + 1;
                    for (int i = 0; i < tn; ++i)
                        ts.Add(Task.Factory.StartNew(() => Compress()));
                    // we have to wait till all compression tasks finish
                    Task.WaitAll(ts.ToArray());
                    // and then mark end of data for the writer task
                    outputBuffers.EnqueueEndOfData();

                    // wait till the writer task finishes
                    tw.Wait();
                }
            }
        }
    }
}

Brak komentarzy:

Prześlij komentarz