niedziela, 22 lutego 2009

Producers-Consumers (ordered, bounded buffer) in C# - reloaded

Well, before using new possibilities it is better to recognize how they really work. Parallel Extensions introduces the Task class and, as the name states, it is meant for the "bag of tasks" pattern - where it should not matter how many of them are running at any given moment. Unfortunately, the solution in the last code example followed not only the "bag of tasks" pattern but also the "pipeline" pattern. In that case it does matter how many of the tasks are running at the same time: in each of the pipeline stages at least one thread must be working. This might not be true when Task objects are used for all of them. On a single-core machine only one or two of the tasks might start to work (e.g. the reader and one compressor, but no writer) and everything hangs.

Another place that has changed is the handling of the queue with data items (the full queue). The solution has been divided into two classes. The first, UnorderedItemsPool, behaves like in the previous post (multiple consumers, FIFO order) - the order in which data is obtained by the consumers does not matter. The id of the data is not stored when data is posted to the queue; the id (ticket) is generated during data retrieval by the consumer.
The second class, OrderedItemsPool, allows a consumer to obtain data in the proper order (which might be different from the producers' posting order). Each element posted to the queue has an id (ticket - a unique and sequential number) assigned by the producer. Internally the SortedList class is used to imitate a priority queue.


namespace ThreadingColaboration
{
/// <summary>
/// Thrown by the item pools to signal that the producer has marked end of data
/// and no more items will arrive. Consumers catch it to terminate cleanly.
/// </summary>
public class EndOfDataException : Exception
{
    public EndOfDataException()
    {
    }

    // Standard constructor set for custom exceptions (CA1032) so callers can
    // attach a message or wrap an inner exception.
    public EndOfDataException(string message)
        : base(message)
    {
    }

    public EndOfDataException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}

/// <summary>
/// Base class for a pool of reusable data buffers shared between producers and
/// consumers. Manages the "empty" side (recycled buffers, created lazily up to
/// an optional maximum); derived classes implement the "full" side (filled
/// data items) with either FIFO or ticket-ordered delivery.
/// HTML extraction had stripped the generic brackets - restored here.
/// </summary>
public abstract class ItemsPool<T> where T : new()
{
    // Number of buffer instances created so far (buffers are created lazily).
    private int itemsCreated_ = 0;
    // Upper bound on the number of buffer instances that may ever exist.
    private readonly int itemsMaxNumber_;
    // Stack (LIFO) of recycled buffers - reusing the most recently returned
    // buffer may give better cache locality than a FIFO queue would.
    private readonly Stack<T> emptyStack_ = new Stack<T>();

    // Ticket of the most recently dequeued data item; -1 before any dequeue.
    protected int lastItemIdDequeued_ = -1;
    // Guards the "full" (data) collection in derived classes and wakeups.
    protected readonly object fullQueueLock_ = new object();
    // Set once the producer signals that no more data will be enqueued.
    // volatile: derived classes may read it without holding fullQueueLock_.
    protected volatile bool endOfData_ = false;

    // Number of buffers not constrained.
    public ItemsPool()
    {
        itemsMaxNumber_ = int.MaxValue;
    }

    public ItemsPool(int itemsMaxNumber)
    {
        itemsMaxNumber_ = itemsMaxNumber;
    }

    /// <summary>Returns a drained buffer to the pool for reuse.</summary>
    public void EnqueueEmpty(T buf)
    {
        lock (emptyStack_)
        {
            emptyStack_.Push(buf);
            // Wake one waiter in DequeueEmpty - only one buffer was added.
            Monitor.Pulse(emptyStack_);
        }
    }

    /// <summary>
    /// Obtains an empty buffer: recycles one if available, creates a new one
    /// while under the creation limit, otherwise blocks until one is returned.
    /// </summary>
    public T DequeueEmpty()
    {
        T res;

        lock (emptyStack_)
        {
            if (emptyStack_.Count == 0 && itemsCreated_ < itemsMaxNumber_)
            {
                // Lazy creation: grow the pool up to itemsMaxNumber_.
                res = new T();
                ++itemsCreated_;
            }
            else
            {
                // Pool is at capacity - wait for a buffer to be recycled.
                while (emptyStack_.Count == 0)
                    Monitor.Wait(emptyStack_);
                res = emptyStack_.Pop();
            }
        }

        return res;
    }

    /// <summary>
    /// Marks the stream of data as finished and wakes all blocked consumers so
    /// they can observe endOfData_ and throw EndOfDataException.
    /// </summary>
    public void EnqueueEndOfData()
    {
        lock (fullQueueLock_)
        {
            endOfData_ = true;
            Monitor.PulseAll(fullQueueLock_);
        }
    }

    /// <summary>Posts a filled buffer; itemId is the ordering ticket (ignored
    /// by unordered pools).</summary>
    public abstract void EnqueueData(int itemId, T buf);

    /// <summary>Retrieves a filled buffer; returns its ticket. Throws
    /// EndOfDataException when the pool is drained and end of data was
    /// signalled.</summary>
    public abstract int DequeueData(out T buf);
}

/// <summary>
/// Pool variant where consumers receive data items in FIFO order and the
/// producer-supplied itemId is ignored: the ticket is generated at dequeue
/// time (lastItemIdDequeued_ + 1). Suitable when retrieval order is irrelevant.
/// </summary>
public class UnorderedItemsPool<T> : ItemsPool<T> where T : new()
{
    // FIFO queue of filled data items awaiting consumers.
    private readonly Queue<T> fullQueue_ = new Queue<T>();

    public UnorderedItemsPool()
    {
    }

    public UnorderedItemsPool(int itemsMaxNumber)
        : base(itemsMaxNumber)
    {
    }

    public override void EnqueueData(int itemId, T buf)
    {
        lock (fullQueueLock_)
        {
            // Object cannot be used for data enqueuing after endOfData_ was
            // marked. Checked under the lock (was an unsynchronized read).
            if (endOfData_)
                throw new EndOfDataException();

            fullQueue_.Enqueue(buf);
            // One item added - waking a single waiting consumer is enough.
            Monitor.Pulse(fullQueueLock_);
        }
    }

    public override int DequeueData(out T buf)
    {
        int res;
        lock (fullQueueLock_)
        {
            while (fullQueue_.Count == 0)
            {
                // Queue drained and the producer is done - no more data.
                if (endOfData_)
                    throw new EndOfDataException();
                Monitor.Wait(fullQueueLock_);
            }
            buf = fullQueue_.Dequeue();
            // Ticket is assigned here, in consumption order.
            res = ++lastItemIdDequeued_;
        }
        return res;
    }
}

/// <summary>
/// Pool variant where consumers receive data items strictly in ticket order
/// (0, 1, 2, ...), even though producers may post them out of order. Each
/// posted element carries a producer-assigned ticket; a SortedList keyed by
/// ticket imitates a priority queue.
/// </summary>
public class OrderedItemsPool<T> : ItemsPool<T> where T : new()
{
    // Priority-queue substitute: items kept sorted by their ticket id, so the
    // smallest outstanding ticket is always at index 0.
    private readonly SortedList<int, T> fullPriorQueue_ = new SortedList<int, T>();

    public OrderedItemsPool()
    {
    }

    public OrderedItemsPool(int itemsMaxNumber)
        : base(itemsMaxNumber)
    {
    }

    public override void EnqueueData(int itemId, T buf)
    {
        lock (fullQueueLock_)
        {
            // Object cannot be used for data enqueuing after endOfData_ was
            // marked. Checked under the lock (was an unsynchronized read).
            if (endOfData_)
                throw new EndOfDataException();

            fullPriorQueue_.Add(itemId, buf);
            // PulseAll: only the consumer waiting for THIS ticket can make
            // progress, and we do not know which waiter that is.
            Monitor.PulseAll(fullQueueLock_);
        }
    }

    public override int DequeueData(out T buf)
    {
        int res;
        lock (fullQueueLock_)
        {
            // Block until the item with the next expected ticket is at the
            // head of the sorted list. Keys[0]/Values[0] is direct indexed
            // access (no LINQ enumerator needed for First()).
            while (fullPriorQueue_.Count == 0
                   || fullPriorQueue_.Keys[0] != lastItemIdDequeued_ + 1)
            {
                // Drained and the producer is done - no more data.
                if (fullPriorQueue_.Count == 0 && endOfData_)
                    throw new EndOfDataException();

                Monitor.Wait(fullQueueLock_);
            }
            buf = fullPriorQueue_.Values[0];
            Debug.Assert(fullPriorQueue_.Keys[0] == lastItemIdDequeued_ + 1);
            fullPriorQueue_.RemoveAt(0);
            res = ++lastItemIdDequeued_;
            // The head has advanced - another waiter may now hold the next
            // ticket, so wake them all to re-check.
            Monitor.PulseAll(fullQueueLock_);
        }
        return res;
    }
}
}


Below we can see that the Thread class was used in place of Task for the reader and the writer.
Thanks to the new buffer pools, the code of the Compress tasks has been simplified.


// Three-stage compression pipeline: Reader (thread) -> Compress (tasks) ->
// Writer (thread). Reader and Writer get dedicated threads because every
// pipeline stage needs at least one guaranteed-live worker; only the "bag of
// tasks" middle stage uses the task scheduler.
class Program
{
    // Size of a single I/O buffer; also the unit of work for one Compress task.
    static int bufferSize = 64 * 1024;

    // Wrapper around a byte array so buffers satisfy the pools' new()
    // constraint and can be resized via the public field.
    public class BuffType
    {
        public byte[] buf;

        public BuffType()
        {
            buf = new byte[bufferSize];
        }
    }

    // Producer stage: reads the input stream chunk by chunk and posts filled
    // buffers to the (unordered) input pool, then signals end of data.
    static public void Reader(Stream strmIn, ThreadingColaboration.ItemsPool<BuffType> inputBuffers)
    {
        int bytesRead;
        do
        {
            BuffType res = inputBuffers.DequeueEmpty();

            bytesRead = strmIn.Read(res.buf, 0, res.buf.Length);

            // Last buffer might be smaller (and zero-length at clean EOF).
            if (bytesRead != res.buf.Length)
                Array.Resize(ref res.buf, bytesRead);
            // itemId does not matter for the unordered input pool - the
            // ticket is assigned at dequeue time.
            inputBuffers.EnqueueData(0, res);
        } while (bytesRead != 0);

        inputBuffers.EnqueueEndOfData();
    }

    // Middle stage: compresses exactly one input buffer into a recycled
    // MemoryStream and posts it to the ordered output pool under the input
    // ticket so the writer can restore the original chunk order.
    static public void Compress(
        ThreadingColaboration.ItemsPool<BuffType> inputBuffers,
        ThreadingColaboration.ItemsPool<MemoryStream> outputBuffers)
    {
        BuffType inputBuffer;
        int actual;

        try
        {
            actual = inputBuffers.DequeueData(out inputBuffer);

            // Zero-length buffer marks EOF - nothing to compress.
            if (inputBuffer.buf.Length != 0)
            {
                MemoryStream ms = outputBuffers.DequeueEmpty();

                // New DeflateStream instance because we need a clean object to
                // compress data without garbage from the previous iteration;
                // leaveOpen: true keeps the recycled MemoryStream usable.
                using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
                {
                    ds.Write(inputBuffer.buf, 0, inputBuffer.buf.Length);
                }
                inputBuffers.EnqueueEmpty(inputBuffer);
                // Put the buffer into the queue without waiting.
                outputBuffers.EnqueueData(actual, ms);
            }
        }
        catch (ThreadingColaboration.EndOfDataException)
        {
            // No more input - this one-shot task simply finishes.
        }
    }

    // Final stage: writes [compressed length, compressed bytes] records in
    // ticket order; terminates via EndOfDataException once drained.
    static public void Writer(BinaryWriter writer, ThreadingColaboration.ItemsPool<MemoryStream> outputBuffers)
    {
        try
        {
            while (true)
            {
                MemoryStream ms;
                int bufNo = outputBuffers.DequeueData(out ms);
                // Compressed data length - the reason we need a BinaryWriter
                // (not only a Stream).
                writer.Write((int)ms.Position);
                // Position, not Length, because the recycled stream's buffer
                // can be larger than the data stored inside.
                writer.Write(ms.GetBuffer(), 0, (int)ms.Position);
                ms.Position = 0;
                outputBuffers.EnqueueEmpty(ms);
            }
        }
        catch (ThreadingColaboration.EndOfDataException)
        {
            // All compressed chunks have been written.
        }
    }

    static void Main(string[] args)
    {
        int bufferNo = 8;
        string inputFilename = "uncompressed.in";
        string outputFilename = "compressed.out";

        // Input pool is bounded (back-pressure on the reader); output pool is
        // unbounded so Compress tasks never block while posting results.
        ThreadingColaboration.ItemsPool<BuffType> inputBuffers =
            new ThreadingColaboration.UnorderedItemsPool<BuffType>(bufferNo);
        ThreadingColaboration.ItemsPool<MemoryStream> outputBuffers =
            new ThreadingColaboration.OrderedItemsPool<MemoryStream>();

        using (Stream strmIn = File.OpenRead(inputFilename))
        {
            // Dedicated thread: the producer stage must always be live.
            new Thread(o => { Reader(strmIn, inputBuffers); }).Start();

            using (Stream strmOut = File.Create(outputFilename))
            {
                using (BinaryWriter writer = new BinaryWriter(strmOut))
                {
                    Thread tw = new Thread(o => { Writer(writer, outputBuffers); });
                    tw.Start();

                    List<Task> ts = new List<Task>();
                    FileInfo fi = new FileInfo(inputFilename);
                    // One task per chunk; each Compress call handles exactly
                    // one buffer, so tn tasks cover the whole file.
                    int tn = (int)(fi.Length / bufferSize) + 1;
                    for (int i = 0; i < tn; ++i)
                        // Task.Factory.StartNew replaces the CTP-era
                        // Task.Create, which was removed before .NET 4 shipped.
                        ts.Add(Task.Factory.StartNew(() => { Compress(inputBuffers, outputBuffers); }));
                    // We have to wait until all tasks finish...
                    Task.WaitAll(ts.ToArray());
                    // ...and only then mark EndOfData for the writer thread.
                    outputBuffers.EnqueueEndOfData();

                    // Wait until the writer thread finishes.
                    tw.Join();
                }
            }
        }
    }
}

Brak komentarzy:

Prześlij komentarz