Sunday, 9 August 2009

Priority inversion interview

Interviews are a great opportunity to test one's own memory and composure during a conversation. They are also good for refreshing some basic terms and problems.
Recently I had to describe the priority inversion problem. Basic stuff :) A thread with lower priority is executed in place of a higher-priority thread. But why? Wait, ..., well, ..., shit, I do not remember.
Why is Wikipedia not connected to my brain? Three threads: the low- and high-priority ones compete for a mutex, the low one holds it, and the medium-priority thread preempts the low one, so the high-priority thread ends up waiting for both (Mars Pathfinder problem, priority inheritance, priority ceiling, disabling interrupts).
OK, but what if I want to simulate such a problem in a Windows environment?
After a quick search I found Priority Inversion and Windows NT Scheduler. I realized that:
1. the real-time priority class shall be set for the process - to stop the kernel from altering thread priorities,
2. the example shall run on one core - in the simple case of 3 threads,
3. on a single-core machine the system will hang (real-time priority), therefore the example can be run only on a multi-core machine (but the threads will use only one of the cores).
Example code for priority inversion:


using System;
using System.Threading;

class PrioriyInversion
{
static private object o = new object();

static void tf(object p)
{
string n = (string)p;
Console.WriteLine(p+" critical section needed");
lock (o)
{
Console.WriteLine(p+" critical section entered");
Thread.Sleep(5000);
Console.WriteLine(p+" after sleep");
}
Console.WriteLine(p+" critical section left");
}

static void tf2(object p)
{
string n = (string)p;
Console.WriteLine(p + " start");
for (int i = 0; i < 1000000; ++i)
for (int j = 0; j < 1000000; ++j)
;
Console.WriteLine(p + " stop");
}

static void Main(string[] args)
{
Console.ReadLine();

Thread t1 = new Thread(tf);
t1.Priority = ThreadPriority.BelowNormal;
Thread t2 = new Thread(tf);
t2.Priority = ThreadPriority.AboveNormal;
Thread t3 = new Thread(tf2);
t3.Priority = ThreadPriority.Normal;

t1.Start("t1");
Thread.Sleep(10);
t2.Start("t2");
t3.Start("t3");
}
}


The program calls Console.ReadLine() at the beginning to let the user restrict the process affinity to a single core and set the priority class of the process to real-time. If these conditions are not met, priority inversion will not appear.
Windows Task Manager can be used to change the affinity and priority. To see the threads inside a process, use Process Explorer from Sysinternals (now on the Microsoft site).
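The two manual steps (one core, real-time class) can probably also be done from code. Below is a minimal sketch, assuming the standard System.Diagnostics.Process properties ProcessorAffinity and PriorityClass; the class SingleCoreRealTime and its methods are my own illustrative names, not part of the example above. As far as I know, requesting the real-time class may need elevated privileges, so treat the result as a request rather than a guarantee.

```csharp
using System;
using System.Diagnostics;

static class SingleCoreRealTime
{
    // Builds an affinity bit mask that selects exactly one logical core.
    // The top bit is excluded so that the value always fits into IntPtr.
    public static IntPtr MaskForCore(int coreIndex)
    {
        if (coreIndex < 0 || coreIndex >= IntPtr.Size * 8 - 1)
            throw new ArgumentOutOfRangeException("coreIndex");
        return new IntPtr(1L << coreIndex);
    }

    // Pins the current process to one core and requests the real-time class.
    // Assumption: the caller is allowed to raise the priority class.
    public static void Apply(int coreIndex)
    {
        Process self = Process.GetCurrentProcess();
        self.ProcessorAffinity = MaskForCore(coreIndex);
        self.PriorityClass = ProcessPriorityClass.RealTime;
    }
}
```

Calling SingleCoreRealTime.Apply(0) at the start of Main would replace the Console.ReadLine() pause; the warning about hanging a single-core machine still applies.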

Wednesday, 8 July 2009

Domain Specific Language for WWW with Irony - Part 2

In the last post I presented the use of Irony for a file download DSL. A library and a console application were prepared and presented on CodeProject. This time I have added a GUI (WinForms) and multithreading to make the application really useful. More in the CodeProject article.

Saturday, 6 June 2009

Domain Specific Language for WWW with Irony

Recently I published an article on CodeProject. I was influenced by the idea of Domain Specific Languages for specific tasks. In the article I present a DSL used to automate some WWW operations (GET, POST, file download). To solve the problem I used Irony as the DSL interpreter. More available here. More about the great Irony project here.

Saturday, 30 May 2009

C# 4.0 (dynamic types, optional parameters, named arguments, covariance and contravariance handling)

New features of C# 4.0 are presented in several places.
Ironically, the most visible one is Doug Holland's post on Intel's blog site :) - a nice and very brief overview.
More info in the Channel9 video by the C# GoF :). The most striking facts are:
  1. dynamic typing is for better Office interaction,
  2. optional parameters and named arguments are evil, provided for VB developers to ease Office development in C#,
  3. covariance and contravariance are the only interesting (good) changes in the language, but they should have appeared in previous versions of C#.
Additionally, more about covariance and contravariance is on Eric Lippert's blog and more about dynamic programming on Chris Burrows' blog.

Thursday, 21 May 2009

volatile - what does it mean? (in C++, C, .NET and Java)

Well, it is a good feeling that my interests are also Herb Sutter's interests (in the programming area of course :). In his article on Dr. Dobb's he presents the differences between the meanings of volatile in different worlds (C++/C vs. .NET/Java).

Main points:
  • volatile in C++ is connected to optimization of access to the variable - no optimization is allowed. The ordering of operations on nearby non-volatile variables depends on the compiler (they can be moved before or after the volatile operation).
  • operations on volatile in C++ do not guarantee atomicity - the solution is atomic in C++ (or e.g. atomic_int in C), available in Boost (but I cannot find it) and planned for C++0x.
  • volatile in managed environments (.NET, Java) does not allow some operations on nearby non-volatile variables to be moved - "ordinary reads and writes can't move upward across (from after to before) an ordered atomic read, and can't move downward across (from before to after) an ordered atomic write. In brief, that could move them out of a critical section of code, and you can write programs that can tell the difference.".
volatile in .NET/Java:
  • keeps order - lock-free programming,
  • allows some code optimizations - not good for interactions with hardware - but BTW managed environments do not allow treating memory as a resource that the programmer can write to wherever he likes - unmanaged code has to be used (e.g. C++ with its volatile :).
volatile in C++/C:
  • might not keep order - depends on the compiler,
  • does not allow code optimization - good for interactions with hardware.
Generally speaking, volatile in C++ means an unoptimizable variable (optimizations of access are not allowed) and in managed worlds it means ordered atomics (other operations cannot move before/after and operations on volatile are atomic - sometimes?).
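To make the "keeps order" point for .NET concrete, here is a minimal sketch of publishing an ordinary field through a volatile flag. The class VolatilePublish and its members are hypothetical names of mine. It relies only on the ordering quoted above: the ordinary write cannot move below the volatile write, and the ordinary read cannot move above the volatile read. C# allows volatile only on types whose reads and writes are atomic (e.g. bool, int, references, but not long or double).

```csharp
using System.Threading;

class VolatilePublish
{
    private int payload_;          // ordinary, non-volatile field
    private volatile bool ready_;  // volatile flag that guards the payload

    public void Produce(int value)
    {
        payload_ = value;  // ordinary write ...
        ready_ = true;     // ... cannot be moved after this volatile write
    }

    public int Consume()
    {
        while (!ready_)    // volatile read; later reads cannot move before it
            Thread.Sleep(0);
        return payload_;
    }

    // Runs one consumer thread against one producer and returns what it saw.
    public static int Demo()
    {
        VolatilePublish box = new VolatilePublish();
        int seen = 0;
        Thread consumer = new Thread(() => seen = box.Consume());
        consumer.Start();
        box.Produce(42);
        consumer.Join();
        return seen;
    }
}
```

Without volatile on ready_, the JIT would be free to hoist the flag read out of the loop, so the consumer might spin forever.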

Not sure points:
  • atomicity of volatile operations in .NET/Java - for which variables, e.g. what about 32-bit and 64-bit architectures?,
  • "These (volatile) are suitable for nearly all lock-free code uses, except for rare examples similar to Dekker's algorithm. .NET is fixing these remaining corner cases in Visual Studio 2010" - what are the problems?

Thursday, 14 May 2009

Optex, fairness vs. convoys and critical regions

An interesting, but quite old, article on synchronization (with fairness and convoy considerations) by Jeffrey Richter can be found at http://msdn.microsoft.com/en-us/magazine/cc163642.aspx.

According to the author, fairness cannot be assured in .NET - the main reason is the GC. The GC has to suspend threads before it runs, and sometimes the suspended thread is one that is waiting in the unmanaged Win32 WaitForSingleObject or WaitForMultipleObjects function. After wake-up, its position in the waiting queue can be different from the original one. Other sources of unfairness are APCs and the debugger.

Optex synchronization mechanism:
  • the point of the presented solutions is to avoid switching into kernel mode (interlocked operations are used for this task),
  • the latter of the presented solutions allows newly arriving threads to pass ahead of threads waiting in the semaphore queue (waiting in kernel mode). This unfair solution, given a constant stream of threads requesting entry, has better throughput than the former solution, which unlocks through the kernel,
  • fairness is completely sacrificed to avoid the convoy problem.
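The idea of staying out of kernel mode when there is no contention can be sketched as follows. This is a simplified lock in the spirit of the first variant (contended hand-over goes through a kernel semaphore), not the code from the article; the class SimpleOptex and its members are names I made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

class SimpleOptex : IDisposable
{
    private int waiters_ = 0;  // threads that own or want the lock
    private readonly Semaphore waiterLock_ = new Semaphore(0, int.MaxValue);

    public void Enter()
    {
        // Uncontended case: one interlocked operation, no kernel transition.
        if (Interlocked.Increment(ref waiters_) == 1)
            return;
        // Contended case: block in the kernel until an owner releases us.
        waiterLock_.WaitOne();
    }

    public void Exit()
    {
        // Somebody is still waiting: hand the lock over through the semaphore.
        if (Interlocked.Decrement(ref waiters_) > 0)
            waiterLock_.Release();
    }

    public void Dispose() { waiterLock_.Dispose(); }

    // Several threads increment a shared counter under the lock.
    public static int Demo(int threads, int iterations)
    {
        SimpleOptex optex = new SimpleOptex();
        int counter = 0;
        List<Thread> workers = new List<Thread>();
        for (int t = 0; t < threads; ++t)
        {
            Thread worker = new Thread(() =>
            {
                for (int i = 0; i < iterations; ++i)
                {
                    optex.Enter();
                    ++counter;  // protected by the optex
                    optex.Exit();
                }
            });
            workers.Add(worker);
            worker.Start();
        }
        foreach (Thread worker in workers) worker.Join();
        optex.Dispose();
        return counter;
    }
}
```

The unfair variant described above would additionally let a newly arriving thread take the lock while the released waiter is still waking up; that part is deliberately left out here.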

Some words concerning critical regions also appear in the article:
  • "hand made" synchronization mechanisms shall invoke BeginCriticalRegion in Enter and EndCriticalRegion in Exit,
  • the mechanism informs the CLR host that a thread was interrupted (aborted) in a critical region and that some special action shall be performed (e.g. unloading the appdomain).
Question - how is the host informed?

More Jeffrey Richter on MSDN Magazine - http://msdn.microsoft.com/en-us/magazine/cc301191.aspx

Friday, 20 March 2009

asynchronous exceptions, threads and problems

Aborting a thread is evil (think of lock and using - generally speaking, asynchronous exceptions). Abort is OK when a thread invokes it on itself (synchronously).

Aborting another thread is OK when we also want to destroy its appdomain.
An interesting article on asynchronous exceptions and lock is on Eric Lippert's blog:
"You don't want to take a thread down unless you are doing so in the process of taking down its appdomain. If you cannot write a thread that can shut itself down cleanly, then the appdomain is the sensible structure to use for work that may be arbitrarily cancelled." -- Eric Lippert

Info on thread's abort (from http://www.albahari.com/threading/part4.aspx):
  • Static class constructors are never aborted part-way through (so as not to potentially poison the class for the remaining life of the application domain)
  • All catch/finally blocks are honored, and never aborted mid-stream
  • If the thread is executing unmanaged code when aborted, execution continues until the next managed code statement is reached.
As we can see from the above, the exception is delayed when it arrives during e.g. a finally block - it means that the thread might never end (if it hangs in finally).

Additionally, Suspend (and Resume) is even worse than Abort - it can lead to deadlock (a thread suspended while holding a lock). Suspend can be safely called by a thread on itself. Another thread can then invoke Resume - but it is not easy and is error prone (more at http://www.albahari.com/threading/part4.aspx).
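A thread that "can shut itself down cleanly", as the quote above puts it, can be sketched with a volatile flag that the worker polls at a safe point. This is only an illustration under my own assumptions; CooperativeWorker and its members are hypothetical names, and real work would replace the Sleep call.

```csharp
using System.Threading;

class CooperativeWorker
{
    private volatile bool stopRequested_;  // set by the controlling thread
    private int iterations_;               // touched only by the worker while it runs
    private readonly Thread thread_;

    public CooperativeWorker()
    {
        thread_ = new Thread(Run);
    }

    public void Start() { thread_.Start(); }

    // Asks the worker to finish and waits; returns false on timeout.
    public bool Stop(int timeoutMs)
    {
        stopRequested_ = true;
        return thread_.Join(timeoutMs);
    }

    // Safe to read after Stop has returned true.
    public int Iterations { get { return iterations_; } }

    private void Run()
    {
        while (!stopRequested_)
        {
            ++iterations_;    // one unit of work
            Thread.Sleep(1);  // stands in for real processing
        }
        // Cleanup runs here, at a point chosen by the worker itself,
        // so no lock or using block is ever cut in half.
    }
}
```

The worker never receives an asynchronous exception, so the problems listed above for Abort and Suspend do not arise; the price is that the worker has to check the flag often enough.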

volatile, execution order, memory model, semantics

Here is my little investigation of volatile, execution order and everything that comes next...

From C# language specification - 3.10 Execution order:
"Execution of a C# program proceeds such that the side effects of each executing thread are preserved at critical execution points. A side effect is defined as a read or write of a volatile field, a write to a non-volatile variable, a write to an external resource, and the throwing of an exception. The critical execution points at which the order of these side effects must be preserved are references to volatile fields (§10.5.3), lock statements (§8.12), and thread creation and termination. The execution environment is free to change the order of execution of a C# program, subject to the following constraints:
  • Data dependence is preserved within a thread of execution. That is, the value of each variable is computed as if all statements in the thread were executed in original program order.
  • Initialization ordering rules are preserved (§10.5.4 and §10.5.5).
  • The ordering of side effects is preserved with respect to volatile reads and writes (§10.5.3). Additionally, the execution environment need not evaluate part of an expression if it can deduce that that expression’s value is not used and that no needed side effects are produced (including any caused by calling a method or accessing a volatile field). When program execution is interrupted by an asynchronous event (such as an exception thrown by another thread), it is not guaranteed that the observable side effects are visible in the original program order."

"The memory model in .NET talks about when reads and writes "actually" happen compared with when they occur in the program's instruction sequence. Reads and writes can be reordered in any way which doesn't violate the rules given by the memory model. As well as "normal" reads and writes there are volatile reads and writes. Every read which occurs after a volatile read in the instruction sequence occurs after the volatile read in the memory model too - they can't be reordered to before the volatile read. A volatile write goes the other way round - every write which occurs before a volatile write in the instruction sequence occurs before the volatile write in the memory model too." (from http://www.yoda.arachsys.com/csharp/threads/volatility.shtml)

"Since field finished has been declared volatile, the main thread must read the actual value from the field result." (from http://msdn.microsoft.com/en-us/library/aa645755(VS.71).aspx)

According to the above, execution order in the .NET memory model is kept with volatile!
Does it mean that earlier writes to non-volatile variables will be finished (probably yes) and visible to other threads?
How does volatile influence memory operations (especially the cache)? What if each core has its own cache, or a multiprocessor is used?
Am I asking about volatile vs. cache coherence?
Some references to this problem:
"The JIT is responsible to maintain correct semantics for
a given target processor by emitting the necessary instruction for that
processor, including processor-specific memory ordering ops like
load-acquire, fences ..." (from http://www.eggheadcafe.com/conversation.aspx?messageid=30644089&threadid=30643832)
"The Whidbey memory model (Framework V2) targets both IA-32 and IA-64, this
memory model assumes that every shared write (ordinary as well as
interlocked) becomes globally visible to all other processors
simultaneously. This is implicitly true because all writes have release
semantics on IA-32 and X64 CPU's, on IA-64 it's just a matter of emitting a
st.rel (store release) instruction for every write to perform each
processor's stores in order and to make them visible in the same order to
other processors (that is, the execution environment is Processor
Consistent).
Above means that the JIT has to emit a st.rel for _name = name; when run on
IA-64 (64 bit managed code), so the other thread will actually see p.Name
pointing to the string.
Add to that that a thread creation implies a full barrier (fence), so in
this particular case it's not required to include a MemoryBarrier in the
thread procedure. Note that the CLR contains other services that implicitly
raise memory barriers, think of Monitor.Enter, Monitor.Exit, ThreadPool
services, IO services..... So I think that for all except the extreme cases,
you can live without thinking about MemoryBarriers in managed code even
when compiled for IA-64." (from http://www.eggheadcafe.com/conversation.aspx?messageid=30644255&threadid=30643832)

Well, it seems that everything is perfect :) We should rely on the JIT to generate proper code for the underlying hardware.

Double-Check Locking in C# using volatile (from http://msdn.microsoft.com/en-us/library/ms998558.aspx)
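For reference, the double-check locking pattern with volatile usually looks like the sketch below. It follows the commonly shown shape of the pattern rather than copying the linked page; the member names instance_ and syncRoot_ are mine.

```csharp
public sealed class Singleton
{
    // volatile: the reference is published only after construction is complete
    private static volatile Singleton instance_;
    private static readonly object syncRoot_ = new object();

    private Singleton() { }

    public static Singleton Instance
    {
        get
        {
            if (instance_ == null)          // first check, without the lock
            {
                lock (syncRoot_)
                {
                    if (instance_ == null)  // second check, under the lock
                        instance_ = new Singleton();
                }
            }
            return instance_;
        }
    }
}
```

The first check keeps the common path lock-free; the second one prevents two threads that both passed the first check from creating two instances.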

More about memory models and semantics at http://www.diag.com/ftp/Memory_Models.pdf

Article about volatile in C# at http://www.codeproject.com/KB/threads/volatile_look_inside.aspx

Producers-Consumers (ordered and unordered, bounded buffers) in C# on CodeProject

Recently I published a reworked version of the producers-consumers library.
Available at Multithreaded buffer poll in .NET
The code contains examples: encryption-compression and decompress-decrypt programs.

Tuesday, 24 February 2009

lambdas + threading = problem

Lambdas introduced in C# 3.0 are great, but sometimes it is tempting to use them to adapt the signature of the method run in a thread to the Thread class constructor argument, e.g.:

static void f(int i)
{
Console.WriteLine(i);
}

static void Main(string[] args)
{
for (int i = 0; i < 10; ++i)
{
new Thread(() => f(i)).Start();
}
...
}

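Interesting results, aren't they? The lambda captures the variable i itself, not a copy of its value, and the loop keeps changing it while the threads start. No copy - no good!
For one argument it can be written as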

static void f(int i)
{
Console.WriteLine(i);
}

static void Main(string[] args)
{
for (int i = 0; i < 10; ++i)
{
new Thread(a => f((int)a)).Start(i);
}
...
}
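Another way to get a copy is to declare a fresh local variable inside the loop body and capture that one instead. A minimal sketch follows; the class ClosureCopy and the collecting list are mine, added only so that the result can be checked.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

static class ClosureCopy
{
    // Starts ten threads; each one records the loop value it was given.
    public static List<int> Run()
    {
        List<int> seen = new List<int>();
        List<Thread> threads = new List<Thread>();

        for (int i = 0; i < 10; ++i)
        {
            int copy = i;  // a new variable per iteration, captured by the lambda
            Thread t = new Thread(() =>
            {
                lock (seen) seen.Add(copy);
            });
            threads.Add(t);
            t.Start();
        }

        foreach (Thread t in threads)
            t.Join();

        seen.Sort();  // threads finish in arbitrary order
        return seen;
    }
}
```

Each lambda now closes over its own copy, so every value from 0 to 9 is seen exactly once, whatever the scheduling.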

Sunday, 22 February 2009

Producers-Consumers (ordered, bounded buffer) in C# - reloaded

Well, before using new possibilities it is better to understand how they really work. Parallel Extensions introduces the Task class and, as the name states, it shall be used in the "bag of tasks" pattern - it should not matter how many of them are running at the moment. Unfortunately, the solution in the last code example followed not only the "bag of tasks" pattern but also the "pipeline" pattern. In that case it does matter how many of the tasks are running at the moment. In each of the pipeline stages at least one thread has to work. This might not be true when Task objects are used for all of them. On a single-core machine only one or two of the tasks might start working (e.g. the reader and one compressor, but no writer) and everything hangs.

Another place that has changed is the handling of the queue with data items (the full queue). The solution has been divided into two classes. The first, UnorderedItemsPool, behaves as in the previous post (multiple consumers, FIFO order) - the order in which data is obtained by the consumers does not matter. The id of the data is not stored when the data is posted to the queue. The id (ticket) is generated during data retrieval by the consumer.
The second class - OrderedItemsPool - allows a consumer to obtain data in the proper order (which might be different from the producers' posting order). Each element posted to the queue has an id (ticket - a unique and sequential number) assigned by the producer. Internally the SortedList class is used to imitate a priority queue.


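using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;

namespace ThreadingColaboration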
{
public class EndOfDataException : Exception
{
}

public abstract class ItemsPool<T> where T : new()
{
private int itemsCreated_ = 0;
private int itemsMaxNumber_;
// stack for better cache locality?
private Stack<T> emptyStack_ = new Stack<T>();

protected int lastItemIdDequeued_ = -1;
protected object fullQueueLock_ = new object();
protected bool endOfData_ = false;

// number of buffers not constrained
public ItemsPool()
{
itemsMaxNumber_ = int.MaxValue;
}

public ItemsPool(int itemsMaxNumber)
{
itemsMaxNumber_ = itemsMaxNumber;
}

public void EnqueueEmpty(T buf)
{
lock (emptyStack_)
{
emptyStack_.Push(buf);
Monitor.Pulse(emptyStack_);
}
}

public T DequeueEmpty()
{
T res;

lock (emptyStack_)
{
if (emptyStack_.Count == 0 && itemsCreated_ < itemsMaxNumber_)
{
res = new T();
++itemsCreated_;
}
else
{
while (emptyStack_.Count == 0)
Monitor.Wait(emptyStack_);
res = emptyStack_.Pop();
}
}

return res;
}

public void EnqueueEndOfData()
{
lock (fullQueueLock_)
{
endOfData_ = true;
Monitor.PulseAll(fullQueueLock_);
}
}

public abstract void EnqueueData(int itemId, T buf);

public abstract int DequeueData(out T buf);
}

public class UnorderedItemsPool<T> : ItemsPool<T> where T : new()
{
private Queue<T> fullQueue_ = new Queue<T>();

public UnorderedItemsPool()
{
}

public UnorderedItemsPool(int itemsMaxNumber)
: base(itemsMaxNumber)
{
}

public override void EnqueueData(int itemId, T buf)
{
// object cannot be used for data enqueuing after marking endOfData_
if (endOfData_)
throw new EndOfDataException();

lock (fullQueueLock_)
{
fullQueue_.Enqueue(buf);
Monitor.Pulse(fullQueueLock_);
}
}

public override int DequeueData(out T buf)
{
int res;
lock (fullQueueLock_)
{
while (fullQueue_.Count == 0)
{
// no more data exception
if (endOfData_)
throw new EndOfDataException();
Monitor.Wait(fullQueueLock_);
}
buf = fullQueue_.Dequeue();
res = ++lastItemIdDequeued_;
}
return res;
}
}

public class OrderedItemsPool<T> : ItemsPool<T> where T : new()
{
private SortedList<int, T> fullPriorQueue_ = new SortedList<int, T>();

public OrderedItemsPool()
{
}

public OrderedItemsPool(int itemsMaxNumber)
: base(itemsMaxNumber)
{
}

public override void EnqueueData(int itemId, T buf)
{
// object cannot be used for data enqueuing after marking endOfData_
if (endOfData_)
throw new EndOfDataException();

lock (fullQueueLock_)
{
fullPriorQueue_.Add(itemId, buf);
Monitor.PulseAll(fullQueueLock_);
}
}

public override int DequeueData(out T buf)
{
int res;
lock (fullQueueLock_)
{
while ((fullPriorQueue_.Count == 0) || (fullPriorQueue_.First().Key != lastItemIdDequeued_ + 1))
{
// no more data exception
if ((fullPriorQueue_.Count == 0) && endOfData_)
throw new EndOfDataException();

Monitor.Wait(fullQueueLock_);
}
buf = fullPriorQueue_.First().Value;
Debug.Assert(fullPriorQueue_.First().Key == lastItemIdDequeued_ + 1);
fullPriorQueue_.RemoveAt(0);
res = ++lastItemIdDequeued_;
Monitor.PulseAll(fullQueueLock_);
}
return res;
}
}
}


Below we can see that the Thread class is used in place of Task for the reader and the writer.
Thanks to the new buffer pools the code of the Compress tasks is simpler.


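using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Threading;
// Task comes from the Parallel Extensions CTP; the namespace below is an assumption
using System.Threading.Tasks;

class Program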
{
static int bufferSize = 64 * 1024;

public class BuffType
{
public byte[] buf;

public BuffType()
{
buf = new byte[bufferSize];
}
}

static public void Reader(Stream strmIn, ThreadingColaboration.ItemsPool<BuffType> inputBuffers)
{
int bytesRead;
do
{
BuffType res = inputBuffers.DequeueEmpty();

bytesRead = strmIn.Read(res.buf, 0, res.buf.Length);

// last buffer might be smaller
if (bytesRead != res.buf.Length)
Array.Resize(ref res.buf, bytesRead);
// itemId does not matter for unordered input buffer
inputBuffers.EnqueueData(0, res);
} while (bytesRead != 0);

inputBuffers.EnqueueEndOfData();
}

static public void Compress(
ThreadingColaboration.ItemsPool<BuffType> inputBuffers,
ThreadingColaboration.ItemsPool<MemoryStream> outputBuffers)
{
BuffType inputBuffer;
int actual;

try
{
actual = inputBuffers.DequeueData(out inputBuffer);

if (inputBuffer.buf.Length != 0)
{
MemoryStream ms = outputBuffers.DequeueEmpty();

// new instance because we need clean object to compress data
// without garbage from previous data compression (iteration)
using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
{
ds.Write(inputBuffer.buf, 0, inputBuffer.buf.Length);
}
inputBuffers.EnqueueEmpty(inputBuffer);
// put buffer into the queue without waiting
outputBuffers.EnqueueData(actual, ms);
}
}
catch (ThreadingColaboration.EndOfDataException)
{
}
}

static public void Writer(BinaryWriter writer, ThreadingColaboration.ItemsPool<MemoryStream> outputBuffers)
{
try
{
while (true)
{
MemoryStream ms;
int bufNo = outputBuffers.DequeueData(out ms);
// compressed data length - reason why we need BinaryWriter (not only Stream)
writer.Write((int)ms.Position);
// Position not Size because buffer can be larger than data stored inside
writer.Write(ms.GetBuffer(), 0, (int)ms.Position);
ms.Position = 0;
outputBuffers.EnqueueEmpty(ms);
}
}
catch (ThreadingColaboration.EndOfDataException)
{
}
}

static void Main(string[] args)
{
int bufferNo = 8;
string inputFilename = "uncompressed.in";
string outputFilename = "compressed.out";

ThreadingColaboration.ItemsPool<BuffType> inputBuffers =
new ThreadingColaboration.UnorderedItemsPool<BuffType>(bufferNo);
ThreadingColaboration.ItemsPool<MemoryStream> outputBuffers =
new ThreadingColaboration.OrderedItemsPool<MemoryStream>();

using (Stream strmIn = File.OpenRead(inputFilename))
{
new Thread(o => { Reader(strmIn, inputBuffers); }).Start();

using (Stream strmOut = File.Create(outputFilename))
{
using (BinaryWriter writer = new BinaryWriter(strmOut))
{
Thread tw = new Thread(o => { Writer(writer, outputBuffers); });
tw.Start();

List<Task> ts = new List<Task>();
FileInfo fi = new FileInfo(inputFilename);
int tn = (int)(fi.Length / bufferSize) + 1;
for (int i = 0; i < tn; ++i)
ts.Add(Task.Create(o => { Compress(inputBuffers, outputBuffers); }));
// we have to wait till all task finishes
Task.WaitAll(ts.ToArray());
// and then mark EndOfData for writer thread
outputBuffers.EnqueueEndOfData();

// wait till writer thread finishes
tw.Join();
}
}
}
}
}

Wednesday, 18 February 2009

Producers-Consumers (ordered, bounded buffer) in C#

Here is a new version of file compression in a multi-threaded manner. The presented OrderedItemsPool class within the ThreadingColaboration namespace uses the Producers-Consumers pattern for the relationship between threads. Elements with data are returned in FIFO order. Each of them is assigned a ticket with a number to synchronize further processing. Empty elements are stored in LIFO manner - it might improve cache locality.
The presented class supports the bounded or unbounded (i.e. the number of items created is not constrained) buffer pattern (constructor with a parameter and parameterless constructor).
Another problem is connected to the end of processing. In the presented code, when the last element has been put into the queue, the producer shall invoke the EnqueueEndOfData method (when there are multiple producers, the method shall be invoked after all producers finish). After the EndOfData mark is set, no more data can be posted to the queue (otherwise an exception is raised). Additionally, to inform the consumer(s), an exception is thrown when a consumer tries to get data that will never appear.


using System;
using System.Collections.Generic;
using System.Threading;

namespace ThreadingColaboration
{
public class EndOfDataException : Exception
{
}

public class OrderedItemsPool<T> where T : new()
{

private int itemsCreated_ = 0;
private int lastItemId_ = -1;
private int itemsMaxNumber_;
private Queue<T> fullQueue_ = new Queue<T>();
private Stack<T> emptyStack_ = new Stack<T>();
private bool endOfData_ = false;

// number of buffers not constrained
public OrderedItemsPool()
{
itemsMaxNumber_ = int.MaxValue;
}

public OrderedItemsPool(int itemsMaxNumber)
{
itemsMaxNumber_ = itemsMaxNumber;
}

public void EnqueueData(T buf)
{
// object cannot be used for data enqueuing after marking endOfData_
if (endOfData_)
throw new EndOfDataException();

lock (fullQueue_)
{
fullQueue_.Enqueue(buf);
Monitor.Pulse(fullQueue_);
}
}

public void EnqueueEndOfData()
{
lock (fullQueue_)
{
endOfData_ = true;
Monitor.PulseAll(fullQueue_);
}
}

public int DequeueData(out T buf)
{
int res;
lock (fullQueue_)
{
while (fullQueue_.Count == 0)
{
// if no more data - exception
if (endOfData_)
throw new EndOfDataException();
Monitor.Wait(fullQueue_);
}
buf = fullQueue_.Dequeue();
res = ++lastItemId_;
}
return res;
}

public void EnqueueEmpty(T buf)
{
lock (emptyStack_)
{
emptyStack_.Push(buf);
Monitor.Pulse(emptyStack_);
}
}

public T DequeueEmpty()
{
T res;

lock (emptyStack_)
{
if (emptyStack_.Count == 0 && itemsCreated_ < itemsMaxNumber_)
{
res = new T();
++itemsCreated_;
}
else
{
while (emptyStack_.Count == 0)
Monitor.Wait(emptyStack_);
res = emptyStack_.Pop();
}
}

return res;
}
}
}


Here is a sample program that uses the above class to compress a file using multiple threads.
The solution uses elements from .NET 3.5 and Parallel Extensions.
No thread is created explicitly - the Task class is used. Here we have one task that reads from the file and multiple tasks that obtain uncompressed data and produce compressed data. The last task saves data to the output file. The number of tasks that compress data is equal to the number of data blocks that shall be read from the file. This is one of the properties of tasks - a task is a lightweight object (compared to a thread). Creating one thread costs about 1 MB (more info in video).


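using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Threading;
// Task comes from the Parallel Extensions CTP; the namespace below is an assumption
using System.Threading.Tasks;

class Program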
{
static private ThreadingColaboration.OrderedItemsPool<BuffType> inputBuffers;
static private ThreadingColaboration.OrderedItemsPool<MemoryStream> outputBuffers;

static private int writePos = 0;
static private object o = new object();

static int bufferSize = 64 * 1024;

class BuffType
{
public byte[] buf;

public BuffType()
{
buf = new byte[bufferSize];
}
}

static public void Reader(Stream strmIn)
{
int bytesRead;
do
{
BuffType res = inputBuffers.DequeueEmpty();

bytesRead = strmIn.Read(res.buf, 0, res.buf.Length);

// last buffer might be smaller
if (bytesRead != res.buf.Length)
Array.Resize(ref res.buf, bytesRead);

inputBuffers.EnqueueData(res);

} while (bytesRead != 0);

inputBuffers.EnqueueEndOfData();
}

static public void Compress()
{
BuffType inputBuffer;
int actual;

try
{
actual = inputBuffers.DequeueData(out inputBuffer);

if (inputBuffer.buf.Length != 0)
{
MemoryStream ms = outputBuffers.DequeueEmpty();

// new instance because we need clean object to compress data
// without garbage from previous data compression (iteration)
using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
{
ds.Write(inputBuffer.buf, 0, inputBuffer.buf.Length);
}
inputBuffers.EnqueueEmpty(inputBuffer);
// enter monitor
lock (o)
{
// check order
while (actual != writePos) Monitor.Wait(o);

outputBuffers.EnqueueData(ms);

++writePos;

//wake up waiting
Monitor.PulseAll(o);
}
}
}
catch (ThreadingColaboration.EndOfDataException)
{
}
}

static public void Writer(BinaryWriter writer)
{
try
{
while (true)
{
MemoryStream ms;
int bufNo = outputBuffers.DequeueData(out ms);

// compressed data length - reason why we need BinaryWriter (not only Stream)
writer.Write((int)ms.Position);
// Position not Size because buffer can be larger than data stored inside
writer.Write(ms.GetBuffer(), 0, (int)ms.Position);

ms.Position = 0;
outputBuffers.EnqueueEmpty(ms);
}
}
catch (ThreadingColaboration.EndOfDataException)
{
}
}

static void Main(string[] args)
{
int bufferNo = 8;
string inputFilename = "uncompressed.in";
string outputFilename = "compressed.out";

inputBuffers = new ThreadingColaboration.OrderedItemsPool<BuffType>(bufferNo);
outputBuffers = new ThreadingColaboration.OrderedItemsPool<MemoryStream>();


using (Stream strmIn = File.OpenRead(inputFilename))
{
Task.Create(o => { Reader(strmIn); });

using (Stream strmOut = File.Create(outputFilename))
{
using (BinaryWriter writer = new BinaryWriter(strmOut))
{
Task tw = Task.Create(o => { Writer(writer); });

List<Task> ts = new List<Task>();
FileInfo fi = new FileInfo(inputFilename);
int tn = (int)(fi.Length / bufferSize) + 1;
for (int i = 0; i < tn; ++i)
ts.Add(Task.Create(o => { Compress(); }));
// we have to wait till all task finishes
Task.WaitAll(ts.ToArray());
// and then mark endOfData_ for writer thread
outputBuffers.EnqueueEndOfData();

// wait till writer thread finishes
tw.Wait();
}
}
}
}
}

Saturday, 14 February 2009

Multi-core file compression in .NET

Recently I came across an article on InformIT called Compressing a Very Large File and its parallel version Making It Faster, both written by Jim Mischel. Some might say that compression with the deflate algorithm is nothing special, but parallelism is what I am really interested in. I have also observed that in some situations the code does not work properly. What I mean is compressing already compressed files. In that case such a weak algorithm as deflate will produce files larger than the original. The case is quite common - almost every document format has internal compression nowadays. The code presented in the articles hangs when trying to compress an already compressed file - a buffer overflow occurs (as the author said, it is not production quality, and when used on a plain text file, like XML, it works properly).
I tried to remove the buffer overflow problem - simply by not attaching a fixed buffer to the stream and letting it grow.
I have also removed busy waiting and used a monitor for synchronization.
The order in which blocks are read is fixed in the critical section on the r object, and blocks are written in the same order under the o monitor.
The degree of parallelism (number of threads) is held in the variable tn.
In some spare time I will try to add some syntactic sugar from .NET 3.5 and new threading functionality from Microsoft Parallel Extensions.

Here is a simple solution:


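using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Threading;

class Program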
{
static private int readPos = 0;
static private int writePos = 0;
static private object r = new object();
static private object o = new object();
static private void Compress(BinaryWriter writer, Stream strmIn)
{
using (MemoryStream ms = new MemoryStream())
{
byte[] inputBuffer = new byte[bufferSize];
int bytesRead;
do
{
int actual;
lock (r)
{
bytesRead = strmIn.Read(inputBuffer, 0, inputBuffer.Length);
actual = readPos++;
}
if (bytesRead != 0)
{
// new instance because we need clean object to compress data
// without garbage from previous data compression (iteration)
using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
{
ds.Write(inputBuffer, 0, bytesRead);
}
// enter monitor
lock (o)
{
// check order
while (actual != writePos)
Monitor.Wait(o);
// compressed data length - reason why we need BinaryWriter (not only Stream)
writer.Write((int)ms.Position);
// Position not Size because buffer can be larger than data stored inside
writer.Write(ms.GetBuffer(), 0, (int)ms.Position);
++writePos;
//wake up waiting
Monitor.PulseAll(o);
}
// not to expand buffer in next iteration
ms.Position = 0;
}
} while (bytesRead != 0);
}
}

const string inputFilename = "file.in";
const string outputFilename = "file_compressed.out";
const int bufferSize = 64 * 1024;
static int tn = 2;

static void Main(string[] args)
{
if (args.Length >= 1)
tn = int.Parse(args[0]);
Stopwatch sw = new Stopwatch();
sw.Start();
using (Stream strmIn = File.OpenRead(inputFilename))
{
using (Stream strmOut = File.Create(outputFilename))
{
using (BinaryWriter writer = new BinaryWriter(strmOut))
{
List<Thread> ts = new List<Thread>();
for (int i = 0; i < tn; ++i)
ts.Add(new Thread(delegate() { Compress(writer, strmIn); }));
foreach (Thread t in ts)
t.Start();
foreach (Thread t in ts)
t.Join();

// to mark data end explicitly
writer.Write((int)0);
}
}
}
Console.WriteLine(sw.ElapsedMilliseconds);
}
}
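The output written above is a sequence of blocks, each one an int length followed by that many deflate bytes, closed by an explicit int 0. A reader for this layout could look like the sketch below. It is not part of the original program; the class BlockFormat and its methods are hypothetical, and WriteBlock and RoundTrip exist only so that the reader can be exercised without the multi-threaded compressor.

```csharp
using System;
using System.IO;
using System.IO.Compression;

static class BlockFormat
{
    // Writes one block in the same layout as the compressor above.
    public static void WriteBlock(BinaryWriter writer, byte[] data, int count)
    {
        using (MemoryStream ms = new MemoryStream())
        {
            using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
                ds.Write(data, 0, count);
            writer.Write((int)ms.Length);
            writer.Write(ms.GetBuffer(), 0, (int)ms.Length);
        }
    }

    // Reads blocks until the explicit zero-length end marker.
    public static void Decompress(BinaryReader reader, Stream output)
    {
        byte[] chunk = new byte[64 * 1024];
        int length;
        while ((length = reader.ReadInt32()) != 0)
        {
            byte[] compressed = reader.ReadBytes(length);
            if (compressed.Length != length)
                throw new EndOfStreamException("truncated block");
            // Every block is an independent deflate stream.
            using (DeflateStream ds = new DeflateStream(
                new MemoryStream(compressed), CompressionMode.Decompress))
            {
                int n;
                while ((n = ds.Read(chunk, 0, chunk.Length)) > 0)
                    output.Write(chunk, 0, n);
            }
        }
    }

    // Compresses data block by block in memory and decompresses it again.
    public static byte[] RoundTrip(byte[] data, int blockSize)
    {
        MemoryStream packed = new MemoryStream();
        BinaryWriter writer = new BinaryWriter(packed);
        for (int offset = 0; offset < data.Length; offset += blockSize)
        {
            int count = Math.Min(blockSize, data.Length - offset);
            byte[] block = new byte[count];
            Array.Copy(data, offset, block, 0, count);
            WriteBlock(writer, block, count);
        }
        writer.Write(0);  // explicit end-of-data marker
        writer.Flush();

        packed.Position = 0;
        MemoryStream unpacked = new MemoryStream();
        Decompress(new BinaryReader(packed), unpacked);
        return unpacked.ToArray();
    }
}
```

Because each block is compressed independently, decompression could be parallelized in the same way as compression; the sketch keeps it sequential for clarity. A file without the closing zero would make ReadInt32 throw EndOfStreamException.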

Friday, 13 February 2009

Hello World!

Welcome to my new blog site.

I hope that the journey will be long lasting.