I tried to eliminate the buffer overflow problem by simply not attaching a fixed-size buffer to the memory stream and letting it grow as needed.
I have also removed the busy waiting and now use a monitor for synchronization.
The order in which blocks are read, and therefore must be written, is fixed inside a critical section guarded by the r object: each thread takes the sequence number of its block there.
The degree of parallelism (number of threads) is held in the variable tn.
When I have some spare time, I will try to add some syntactic sugar from .NET 3.5 and the new threading features from Microsoft Parallel Extensions.
Here is a simple solution:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Threading;

class Program
{
    const string inputFilename = "file.in";
    const string outputFilename = "file_compressed.out";
    const int bufferSize = 64 * 1024;
    static int tn = 2;

    static private int readPos = 0;          // sequence number of the next block to read
    static private int writePos = 0;         // sequence number of the next block to write
    static private object r = new object();  // guards reading from the input stream
    static private object o = new object();  // monitor that orders the writes

    static private void Compress(BinaryWriter writer, Stream strmIn)
    {
        using (MemoryStream ms = new MemoryStream())
        {
            byte[] inputBuffer = new byte[bufferSize];
            int bytesRead;
            int actual;
            do
            {
                // read one block and take its sequence number atomically
                lock (r)
                {
                    bytesRead = strmIn.Read(inputBuffer, 0, inputBuffer.Length);
                    actual = readPos++;
                }
                if (bytesRead != 0)
                {
                    // new instance because we need a clean object to compress data
                    // without garbage from the previous compression (iteration)
                    using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
                    {
                        ds.Write(inputBuffer, 0, bytesRead);
                    }
                    // enter monitor
                    lock (o)
                    {
                        // check order: wait until it is this block's turn
                        while (actual != writePos)
                            Monitor.Wait(o);
                        // compressed data length - the reason why we need BinaryWriter (not only Stream)
                        writer.Write((int)ms.Position);
                        // Position, not the buffer size, because the buffer can be larger than the data stored inside
                        writer.Write(ms.GetBuffer(), 0, (int)ms.Position);
                        ++writePos;
                        // wake up waiting threads
                        Monitor.PulseAll(o);
                    }
                    // rewind so the buffer is reused and not expanded in the next iteration
                    ms.Position = 0;
                }
            } while (bytesRead != 0);
        }
    }

    static void Main(string[] args)
    {
        if (args.Length >= 1)
            tn = int.Parse(args[0]);
        Stopwatch sw = new Stopwatch();
        sw.Start();
        using (Stream strmIn = File.OpenRead(inputFilename))
        using (Stream strmOut = File.Create(outputFilename))
        using (BinaryWriter writer = new BinaryWriter(strmOut))
        {
            List<Thread> ts = new List<Thread>();
            for (int i = 0; i < tn; ++i)
                ts.Add(new Thread(delegate() { Compress(writer, strmIn); }));
            foreach (Thread t in ts)
                t.Start();
            foreach (Thread t in ts)
                t.Join();
            // to mark data end explicitly
            writer.Write(0);
        }
        sw.Stop();
        // report the elapsed time (assumed use of the stopwatch)
        Console.WriteLine(sw.Elapsed);
    }
}
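To check the output, it helps to read the file back. The following is only a minimal sketch of a matching reader, assuming the format produced above is a sequence of blocks, each stored as a 32-bit length followed by that many bytes of independent Deflate data, and terminated by a length of 0. The class name BlockReader and the method name Decompress are hypothetical and do not come from the original program.

```csharp
using System;
using System.IO;
using System.IO.Compression;

// Hypothetical helper, not part of the original program.
static class BlockReader
{
    // Assumed layout: [int32 length][length bytes of Deflate data] ... [int32 0]
    public static void Decompress(Stream input, Stream output)
    {
        BinaryReader reader = new BinaryReader(input);
        byte[] buffer = new byte[64 * 1024];
        int length;
        // a length of 0 is the explicit end-of-data marker
        while ((length = reader.ReadInt32()) != 0)
        {
            byte[] block = reader.ReadBytes(length);
            if (block.Length != length)
                throw new EndOfStreamException("Truncated block.");
            // every block was compressed separately, so each one
            // gets its own DeflateStream
            using (MemoryStream ms = new MemoryStream(block))
            using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Decompress))
            {
                int n;
                while ((n = ds.Read(buffer, 0, buffer.Length)) > 0)
                    output.Write(buffer, 0, n);
            }
        }
    }
}
```

Calling BlockReader.Decompress(File.OpenRead("file_compressed.out"), File.Create("file.check")) and comparing the result with file.in would be one way to verify that the blocks were written in the right order; the name file.check is just an example.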