Another place that has changed is the handling of the queue with data items (the full queue). The solution has been divided into two classes. The first, UnorderedItemsPool, behaves as in the previous post (multiple consumers, FIFO order): the order in which consumers obtain the data does not matter. No id is stored when data is posted to the queue; the id (ticket) is generated when a consumer retrieves the data.
The second class, OrderedItemsPool, lets consumers obtain data in the proper order (which might differ from the order in which producers posted it). Each element posted to the queue has an id (ticket - a unique, sequential number) assigned by the producer. Internally, the SortedList class is used to imitate a priority queue.
namespace ThreadingColaboration
{
/// <summary>
/// Signals that a pool has been marked as finished.
/// In this file it is thrown by <c>EnqueueData</c> once end of data has been marked,
/// and by <c>DequeueData</c> when the queue is empty and end of data has been marked.
/// </summary>
public class EndOfDataException : Exception
{
    /// <summary>Creates the exception with the default message.</summary>
    public EndOfDataException()
    {
    }

    /// <summary>Creates the exception with a caller-supplied message.</summary>
    /// <param name="message">Text describing why the exception was raised.</param>
    public EndOfDataException(string message)
        : base(message)
    {
    }

    /// <summary>Creates the exception with a message and the exception that caused it.</summary>
    /// <param name="message">Text describing why the exception was raised.</param>
    /// <param name="innerException">The underlying cause.</param>
    public EndOfDataException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}
/// <summary>
/// Base class for a pool of reusable items of type <typeparamref name="T"/>.
/// It owns the "empty" side of the pool (a stack of free items, created on demand up to a limit)
/// and the shared state used by derived classes for the "full" side
/// (lock object, end-of-data flag and the last id handed out).
/// </summary>
/// <typeparam name="T">Item type; must have a parameterless constructor because the pool creates items itself.</typeparam>
public abstract class ItemsPool<T> where T : new()
{
    // number of items created by this pool so far; only touched while holding the emptyStack_ lock
    private int itemsCreated_ = 0;
    // upper bound on the number of items this pool will create
    private readonly int itemsMaxNumber_;
    // stack for better cache locality?
    private readonly Stack<T> emptyStack_ = new Stack<T>();
    // id returned by the most recent DequeueData call in a derived class; -1 means none yet
    protected int lastItemIdDequeued_ = -1;
    // lock and wait/pulse object for the full queue kept by derived classes
    protected object fullQueueLock_ = new object();
    // set by EnqueueEndOfData while holding fullQueueLock_
    protected bool endOfData_ = false;

    /// <summary>Creates a pool whose number of items is not constrained (limit is <c>int.MaxValue</c>).</summary>
    public ItemsPool()
        : this(int.MaxValue)
    {
    }

    /// <summary>Creates a pool that creates at most <paramref name="itemsMaxNumber"/> items.</summary>
    /// <param name="itemsMaxNumber">Maximum number of items the pool will construct itself.</param>
    public ItemsPool(int itemsMaxNumber)
    {
        itemsMaxNumber_ = itemsMaxNumber;
    }

    /// <summary>Returns an item to the free stack and wakes one thread waiting in <see cref="DequeueEmpty"/>.</summary>
    /// <param name="buf">The item to put back.</param>
    public void EnqueueEmpty(T buf)
    {
        lock (emptyStack_)
        {
            emptyStack_.Push(buf);
            Monitor.Pulse(emptyStack_);
        }
    }

    /// <summary>
    /// Obtains a free item. If the stack is empty and the creation limit has not been reached,
    /// a new item is constructed; otherwise the call blocks until an item is returned
    /// through <see cref="EnqueueEmpty"/>.
    /// </summary>
    /// <returns>A free item.</returns>
    public T DequeueEmpty()
    {
        lock (emptyStack_)
        {
            if (emptyStack_.Count == 0 && itemsCreated_ < itemsMaxNumber_)
            {
                ++itemsCreated_;
                return new T();
            }

            // Either a free item exists or the limit is reached: wait until the stack is non-empty.
            while (emptyStack_.Count == 0)
            {
                Monitor.Wait(emptyStack_);
            }
            return emptyStack_.Pop();
        }
    }

    /// <summary>
    /// Marks the pool as finished and wakes every thread waiting on the full-queue lock
    /// so that they can observe the flag.
    /// </summary>
    public void EnqueueEndOfData()
    {
        lock (fullQueueLock_)
        {
            endOfData_ = true;
            Monitor.PulseAll(fullQueueLock_);
        }
    }

    /// <summary>Posts a filled item to the full queue.</summary>
    /// <param name="itemId">Id (ticket) of the item; its use is defined by the derived class.</param>
    /// <param name="buf">The filled item.</param>
    public abstract void EnqueueData(int itemId, T buf);

    /// <summary>Retrieves a filled item from the full queue.</summary>
    /// <param name="buf">Receives the item.</param>
    /// <returns>The id (ticket) associated with the returned item.</returns>
    public abstract int DequeueData(out T buf);
}
/// <summary>
/// Pool whose full queue is a plain FIFO queue. The id passed by the producer is ignored;
/// a sequential id (ticket) is generated when a consumer retrieves an item.
/// </summary>
/// <typeparam name="T">Item type; must have a parameterless constructor.</typeparam>
public class UnorderedItemsPool<T> : ItemsPool<T> where T : new()
{
    // filled items in posting order; guarded by fullQueueLock_
    private readonly Queue<T> fullQueue_ = new Queue<T>();

    /// <summary>Creates a pool with an unconstrained number of items.</summary>
    public UnorderedItemsPool()
    {
    }

    /// <summary>Creates a pool that creates at most <paramref name="itemsMaxNumber"/> items.</summary>
    /// <param name="itemsMaxNumber">Maximum number of items the pool will construct itself.</param>
    public UnorderedItemsPool(int itemsMaxNumber)
        : base(itemsMaxNumber)
    {
    }

    /// <summary>Appends a filled item to the queue and wakes one waiting consumer.</summary>
    /// <param name="itemId">Ignored by this class.</param>
    /// <param name="buf">The filled item.</param>
    /// <exception cref="EndOfDataException">End of data has already been marked.</exception>
    public override void EnqueueData(int itemId, T buf)
    {
        lock (fullQueueLock_)
        {
            // object cannot be used for data enqueuing after marking endOfData_;
            // the flag is written under this lock, so it is read under it as well
            if (endOfData_)
            {
                throw new EndOfDataException();
            }
            fullQueue_.Enqueue(buf);
            Monitor.Pulse(fullQueueLock_);
        }
    }

    /// <summary>
    /// Removes the oldest filled item, blocking while the queue is empty.
    /// </summary>
    /// <param name="buf">Receives the item.</param>
    /// <returns>Sequential id generated for this retrieval, starting at 0.</returns>
    /// <exception cref="EndOfDataException">The queue is empty and end of data has been marked.</exception>
    public override int DequeueData(out T buf)
    {
        lock (fullQueueLock_)
        {
            while (fullQueue_.Count == 0)
            {
                // no more data exception
                if (endOfData_)
                {
                    throw new EndOfDataException();
                }
                Monitor.Wait(fullQueueLock_);
            }
            buf = fullQueue_.Dequeue();
            // ticket is assigned at retrieval time, under the lock, so ids follow FIFO order
            return ++lastItemIdDequeued_;
        }
    }
}
/// <summary>
/// Pool whose full queue hands items out strictly in id order (0, 1, 2, ...),
/// regardless of the order in which producers posted them. A sorted list keyed by id
/// is used to imitate a priority queue.
/// </summary>
/// <typeparam name="T">Item type; must have a parameterless constructor.</typeparam>
public class OrderedItemsPool<T> : ItemsPool<T> where T : new()
{
    // filled items keyed by producer-assigned id; guarded by fullQueueLock_
    private readonly SortedList<int, T> fullPriorQueue_ = new SortedList<int, T>();

    /// <summary>Creates a pool with an unconstrained number of items.</summary>
    public OrderedItemsPool()
    {
    }

    /// <summary>Creates a pool that creates at most <paramref name="itemsMaxNumber"/> items.</summary>
    /// <param name="itemsMaxNumber">Maximum number of items the pool will construct itself.</param>
    public OrderedItemsPool(int itemsMaxNumber)
        : base(itemsMaxNumber)
    {
    }

    /// <summary>Stores a filled item under its id and wakes all waiting consumers.</summary>
    /// <param name="itemId">Id (ticket) of the item; ids are used as sorted-list keys, so each must be unique.</param>
    /// <param name="buf">The filled item.</param>
    /// <exception cref="EndOfDataException">End of data has already been marked.</exception>
    public override void EnqueueData(int itemId, T buf)
    {
        lock (fullQueueLock_)
        {
            // object cannot be used for data enqueuing after marking endOfData_;
            // the flag is written under this lock, so it is read under it as well
            if (endOfData_)
            {
                throw new EndOfDataException();
            }
            fullPriorQueue_.Add(itemId, buf);
            // all waiters are woken because each one re-checks whether the head is the id it expects
            Monitor.PulseAll(fullQueueLock_);
        }
    }

    /// <summary>
    /// Removes the item whose id is exactly one greater than the last id returned,
    /// blocking until that item is at the head of the list.
    /// </summary>
    /// <param name="buf">Receives the item.</param>
    /// <returns>The id of the returned item.</returns>
    /// <exception cref="EndOfDataException">The list is empty and end of data has been marked.</exception>
    public override int DequeueData(out T buf)
    {
        lock (fullQueueLock_)
        {
            while ((fullPriorQueue_.Count == 0) || (fullPriorQueue_.Keys[0] != lastItemIdDequeued_ + 1))
            {
                // no more data exception
                // NOTE(review): if end of data is marked while the list is non-empty and its
                // smallest id is not the next expected one, this loop waits indefinitely.
                if ((fullPriorQueue_.Count == 0) && endOfData_)
                {
                    throw new EndOfDataException();
                }
                Monitor.Wait(fullQueueLock_);
            }
            Debug.Assert(fullPriorQueue_.Keys[0] == lastItemIdDequeued_ + 1);
            buf = fullPriorQueue_.Values[0];
            fullPriorQueue_.RemoveAt(0);
            int res = ++lastItemIdDequeued_;
            // the new head may now be the id another consumer is waiting for
            Monitor.PulseAll(fullQueueLock_);
            return res;
        }
    }
}
}
Below we can see that the Thread class is used instead of Task for the reader and the writer.
Thanks to the new buffer pools, the code for the Compress tasks has been simplified.
/// <summary>
/// Compresses "uncompressed.in" into "compressed.out" in chunks of <c>bufferSize</c> bytes.
/// One thread reads chunks, one task per chunk compresses it with Deflate, and one thread writes
/// the compressed chunks in their original order, each preceded by its length.
/// </summary>
class Program
{
    static int bufferSize = 64 * 1024;

    /// <summary>Reusable input buffer together with the number of valid bytes it holds.</summary>
    public class BuffType
    {
        public byte[] buf;
        // number of valid bytes in buf; kept separately so the pooled array is never resized
        public int length;

        public BuffType()
        {
            buf = new byte[bufferSize];
        }
    }

    /// <summary>
    /// Reads the stream chunk by chunk into pooled buffers and posts them to the input pool.
    /// A final buffer with <c>length == 0</c> is always posted before end of data is marked.
    /// </summary>
    /// <param name="strmIn">Stream to read from.</param>
    /// <param name="inputBuffers">Pool that supplies empty buffers and receives the filled ones.</param>
    public static void Reader(Stream strmIn, ThreadingColaboration.ItemsPool<BuffType> inputBuffers)
    {
        int bytesRead;
        do
        {
            BuffType res = inputBuffers.DequeueEmpty();
            // Stream.Read may return fewer bytes than requested before the end of the stream,
            // so keep reading until the buffer is full or the stream is exhausted. Only the
            // last chunk can therefore be shorter, which Main's task count relies on.
            bytesRead = 0;
            int n;
            while (bytesRead < res.buf.Length
                && (n = strmIn.Read(res.buf, bytesRead, res.buf.Length - bytesRead)) > 0)
            {
                bytesRead += n;
            }
            // last buffer might be smaller
            res.length = bytesRead;
            // itemId does not matter for unordered input buffer
            inputBuffers.EnqueueData(0, res);
        } while (bytesRead != 0);
        inputBuffers.EnqueueEndOfData();
    }

    /// <summary>
    /// Takes one chunk from the input pool, compresses it into a pooled memory stream and posts
    /// the result to the output pool under the ticket obtained with the chunk.
    /// A chunk with no data is only returned to the input pool.
    /// </summary>
    /// <param name="inputBuffers">Pool of chunks filled by the reader.</param>
    /// <param name="outputBuffers">Pool that supplies memory streams and receives compressed chunks.</param>
    public static void Compress(
        ThreadingColaboration.ItemsPool<BuffType> inputBuffers,
        ThreadingColaboration.ItemsPool<MemoryStream> outputBuffers)
    {
        try
        {
            BuffType inputBuffer;
            int actual = inputBuffers.DequeueData(out inputBuffer);
            if (inputBuffer.length == 0)
            {
                // empty chunk posted by the reader at the end of the stream: nothing to compress
                inputBuffers.EnqueueEmpty(inputBuffer);
                return;
            }

            MemoryStream ms = outputBuffers.DequeueEmpty();
            // new instance because we need clean object to compress data
            // without garbage from previous data compression (iteration)
            using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
            {
                ds.Write(inputBuffer.buf, 0, inputBuffer.length);
            }
            inputBuffers.EnqueueEmpty(inputBuffer);
            // put buffer into the queue without waiting
            outputBuffers.EnqueueData(actual, ms);
        }
        catch (ThreadingColaboration.EndOfDataException)
        {
            // the input pool was drained and closed before this task got a chunk: nothing to do
        }
    }

    /// <summary>
    /// Writes compressed chunks in ticket order until the output pool signals end of data.
    /// Each chunk is written as a 32-bit length followed by that many bytes.
    /// </summary>
    /// <param name="writer">Destination writer.</param>
    /// <param name="outputBuffers">Pool that delivers compressed chunks and takes the streams back.</param>
    public static void Writer(BinaryWriter writer, ThreadingColaboration.ItemsPool<MemoryStream> outputBuffers)
    {
        try
        {
            while (true)
            {
                MemoryStream ms;
                outputBuffers.DequeueData(out ms);
                // compressed data length - reason why we need BinaryWriter (not only Stream)
                writer.Write((int)ms.Position);
                // Position not Size because buffer can be larger than data stored inside
                writer.Write(ms.GetBuffer(), 0, (int)ms.Position);
                ms.Position = 0;
                outputBuffers.EnqueueEmpty(ms);
            }
        }
        catch (ThreadingColaboration.EndOfDataException)
        {
            // normal termination: all compressed chunks have been written
        }
    }

    static void Main(string[] args)
    {
        int bufferNo = 8;
        string inputFilename = "uncompressed.in";
        string outputFilename = "compressed.out";
        ThreadingColaboration.ItemsPool<BuffType> inputBuffers =
            new ThreadingColaboration.UnorderedItemsPool<BuffType>(bufferNo);
        ThreadingColaboration.ItemsPool<MemoryStream> outputBuffers =
            new ThreadingColaboration.OrderedItemsPool<MemoryStream>();
        using (Stream strmIn = File.OpenRead(inputFilename))
        {
            Thread tr = new Thread(() => Reader(strmIn, inputBuffers));
            tr.Start();
            using (Stream strmOut = File.Create(outputFilename))
            {
                using (BinaryWriter writer = new BinaryWriter(strmOut))
                {
                    Thread tw = new Thread(() => Writer(writer, outputBuffers));
                    tw.Start();
                    List<Task> ts = new List<Task>();
                    FileInfo fi = new FileInfo(inputFilename);
                    // one task per chunk; the +1 covers the last partial chunk
                    // (or the empty end chunk when the length is an exact multiple)
                    int tn = (int)(fi.Length / bufferSize) + 1;
                    for (int i = 0; i < tn; ++i)
                    {
                        ts.Add(Task.Factory.StartNew(() => Compress(inputBuffers, outputBuffers)));
                    }
                    // we have to wait till all task finishes
                    Task.WaitAll(ts.ToArray());
                    // and then mark EndOfData for writer thread
                    outputBuffers.EnqueueEndOfData();
                    // wait till writer thread finishes
                    tw.Join();
                    // the reader must be done before strmIn is disposed by the enclosing using
                    tr.Join();
                }
            }
        }
    }
}
Brak komentarzy:
Prześlij komentarz