Saturday, February 14, 2009

Multi-core file compression in .NET

Recently I came across an article on InformIT called Compressing a Very Large File, and its parallel follow-up Making It Faster, both written by Jim Mischel. Some might say that compression with the deflate algorithm is nothing special, but the parallelism is what really interests me. I also noticed that in some situations the code does not work properly, namely when compressing already-compressed files. In that case a weak algorithm like deflate produces output larger than the original, and the case is quite common: almost every document format uses internal compression nowadays. The code presented in the articles hangs when it tries to compress an already-compressed file, because a buffer overflow occurs (as the author says, it is not production quality, and on plain-text input such as XML it works properly).
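To see the problem in isolation, here is a minimal sketch (mine, not from the articles) that feeds deflate incompressible data. Random bytes stand in for an already-compressed file; the "compressed" output comes out larger than the input, because deflate still has to emit block headers.

using System;
using System.IO;
using System.IO.Compression;

class DeflateExpansionDemo
{
    static void Main()
    {
        byte[] random = new byte[64 * 1024];
        new Random(42).NextBytes(random);   // random bytes are essentially incompressible

        using (MemoryStream ms = new MemoryStream())
        {
            using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
                ds.Write(random, 0, random.Length);
            // deflate falls back to stored blocks plus headers, so output exceeds input
            Console.WriteLine("{0} -> {1}", random.Length, ms.Length);
        }
    }
}

This is exactly why a fixed-size output buffer sized to the input block can overflow.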
I removed the buffer overflow problem simply by not attaching a fixed buffer to the stream, letting the MemoryStream grow on its own.
I also removed the busy waiting and use a monitor for synchronization.
The order in which blocks are read and written is maintained with a critical section guarded by the r object.
The degree of parallelism (the number of threads) is held in the variable tn.
In some spare time I will try to add some syntactic sugar from .NET 3.5 and the new threading functionality from the Microsoft Parallel Extensions.

Here is a simple solution:


using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Threading;

class Program
{
    const string inputFilename = "file.in";
    const string outputFilename = "file_compressed.out";
    const int bufferSize = 64 * 1024;
    static int tn = 2;                       // degree of parallelism (number of threads)

    static private int readPos = 0;
    static private int writePos = 0;
    static private object r = new object();  // guards reading and block numbering
    static private object o = new object();  // guards writing in block order

    static private void Compress(BinaryWriter writer, Stream strmIn)
    {
        using (MemoryStream ms = new MemoryStream())
        {
            byte[] inputBuffer = new byte[bufferSize];
            int bytesRead;
            do
            {
                int actual;
                lock (r)
                {
                    bytesRead = strmIn.Read(inputBuffer, 0, inputBuffer.Length);
                    actual = readPos++;
                }
                if (bytesRead != 0)
                {
                    // new instance because we need a clean object to compress data
                    // without garbage from the previous iteration
                    using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
                    {
                        ds.Write(inputBuffer, 0, bytesRead);
                    }
                    // enter monitor
                    lock (o)
                    {
                        // wait until it is this block's turn
                        while (actual != writePos)
                            Monitor.Wait(o);
                        // compressed data length - the reason we need a BinaryWriter (not just a Stream)
                        writer.Write((int)ms.Position);
                        // Position, not Length, because the buffer can be larger than the data stored inside
                        writer.Write(ms.GetBuffer(), 0, (int)ms.Position);
                        ++writePos;
                        // wake up the waiting threads
                        Monitor.PulseAll(o);
                    }
                    // rewind so the buffer is reused instead of growing in the next iteration
                    ms.Position = 0;
                }
            } while (bytesRead != 0);
        }
    }

    static void Main(string[] args)
    {
        if (args.Length >= 1)
            tn = int.Parse(args[0]);
        Stopwatch sw = new Stopwatch();
        sw.Start();
        using (Stream strmIn = File.OpenRead(inputFilename))
        using (Stream strmOut = File.Create(outputFilename))
        using (BinaryWriter writer = new BinaryWriter(strmOut))
        {
            List<Thread> ts = new List<Thread>();
            for (int i = 0; i < tn; ++i)
                ts.Add(new Thread(delegate() { Compress(writer, strmIn); }));
            foreach (Thread t in ts)
                t.Start();
            foreach (Thread t in ts)
                t.Join();

            // a zero-length header marks the end of the data explicitly
            writer.Write((int)0);
        }
        Console.WriteLine(sw.ElapsedMilliseconds);
    }
}
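For completeness, a matching decompressor just reads each length header, inflates that many bytes, and stops at the zero marker. A minimal sketch along those lines (the Decompressor class and output filename are mine, not from the articles):

using System.IO;
using System.IO.Compression;

class Decompressor
{
    // Reads the length-prefixed blocks written by Compress and inflates them in order.
    static void Decompress(BinaryReader reader, Stream strmOut)
    {
        byte[] outputBuffer = new byte[64 * 1024];
        int length;
        // a zero-length header marks the end of the data
        while ((length = reader.ReadInt32()) != 0)
        {
            byte[] block = reader.ReadBytes(length);
            using (MemoryStream ms = new MemoryStream(block))
            using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Decompress))
            {
                int n;
                while ((n = ds.Read(outputBuffer, 0, outputBuffer.Length)) != 0)
                    strmOut.Write(outputBuffer, 0, n);
            }
        }
    }

    static void Main()
    {
        using (BinaryReader reader = new BinaryReader(File.OpenRead("file_compressed.out")))
        using (Stream strmOut = File.Create("file.decompressed"))
            Decompress(reader, strmOut);
    }
}

Since decompression of each block is independent too, the same ordered-monitor trick could parallelize this side as well.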
