I tried to remove problem with buffer overflow – just not to attach buffer to stream – let it grow.
I have also removed busy waiting and use monitor to synchronization.
Order in which blocks are read and written is set up using critical section with r object.
Degree of parallelism (number of threads) is contained in variable tn.
In some spare time I will try to add some syntactic sugar from .NET 3.5 and new threading functionalities from Microsoft Parallel Extensions.
Here is simple solution:
class Program
{
static private int readPos = 0;
static private int writePos = 0;
static private object r = new object();
static private object o = new object();
static private void Compress(BinaryWriter writer, Stream strmIn)
{
using (MemoryStream ms = new MemoryStream())
{
byte[] inputBuffer = new byte[bufferSize];
int bytesRead;
do
{
int actual;
lock (r)
{
bytesRead = strmIn.Read(inputBuffer, 0, inputBuffer.Length);
actual = readPos++;
}
if (bytesRead != 0)
{
// new instance because we need clean object to compress data
// without garbage from previous data compression (iteration)
using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
{
ds.Write(inputBuffer, 0, bytesRead);
}
// enter monitor
lock (o)
{
// check order
while (actual != writePos)
Monitor.Wait(o);
// compressed data length - reason why we need BinaryWriter (not only Stream)
writer.Write((int)ms.Position);
// Position not Size because buffer can be larger than data stored inside
writer.Write(ms.GetBuffer(), 0, (int)ms.Position);
++writePos;
//wake up waiting
Monitor.PulseAll(o);
}
// not to expand buffer in next iteration
ms.Position = 0;
}
} while (bytesRead != 0);
}
}
const string inputFilename = "file.in";
const string outputFilename = "file_compressed.out";
const int bufferSize = 64 * 1024;
static int tn = 2;
static void Main(string[] args)
{
if (args.Length >= 1)
tn = int.Parse(args[0]);
Stopwatch sw = new Stopwatch();
sw.Start();
using (Stream strmIn = File.OpenRead(inputFilename))
{
using (Stream strmOut = File.Create(outputFilename))
{
using (BinaryWriter writer = new BinaryWriter(strmOut))
{
List ts = new List();
for (int i = 0; i < tn; ++i)
ts.Add(new Thread(delegate() { Compress(writer, strmIn); }));
foreach (Thread t in ts)
t.Start();
foreach (Thread t in ts)
t.Join();
// to mark data end explicitly
writer.Write((int)0);
}
}
}
Console.WriteLine(sw.ElapsedMilliseconds);
}
}
Brak komentarzy:
Prześlij komentarz