Monday, 25 July 2011

Synchronizing Threads in Multithreaded application in .Net – C#



The Problem “Concurrency”
When you build multithreaded application, your program needs to ensure that shared data should be protected from against the possibility of multiple threads engagement with its value. What’s gonna happen if multiple threads were accessing the data at the same point? CLR can suspend your any thread for a while who’s going to update the value or is in the middle of updating the value and same time a thread comes to read that value which is not completely updated, that thread is reading an uncompleted/unstable data.
To illustrate the problem of concurrency let write some line of code
class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("----Synchronnization of Threads-----");
            Console.WriteLine("Main Thread {0}", Thread.CurrentThread.ManagedThreadId);
            Printer p = new Printer();

            Thread[] threads = new Thread[5];

            //Queue 5 threads
            for (int i = 0; i < 5; i++)
            {
                threads[i] = new Thread(new ThreadStart(p.PrintNumbersNonSync));
            }
            foreach (Thread t in threads)
            {
                t.Start();
            }

            Console.ReadLine();
        }
    }


    class Printer
    {
        public void PrintNumbersNonSync()
        {
            Console.WriteLine(" ");
            Console.WriteLine("Executing Thread {0}", Thread.CurrentThread.ManagedThreadId);
            for (int i = 1; i <= 10; i++)
            {
                Console.Write(i + " ");
            }

        }
           
    }

Run this program multiple times and watch the output:
Output1:

Output2:

your output could be different from these.

As you can see all the output would be different as many time you run the program.
What happening here is that all the threads are sharing the same object of Printer class and trying to execute the same function at the same time so every time the shared data is being updated in a random pattern which is an unstable state.
Synchronization of threads Solution to the problem of concurrency
Use the locks whenever there’s a Shared section. C# provides Lock keyword that can be used to lock the section that is being accessed by multiple threads. This is the very basic technique to avoid the instability in multithreaded environment while programming with c#.
The Lock keyword requires you to specify the token (an object reference) that must be acquired by a thread to enter within the lock scope. In case of private method you can lock it down by passing the reference of current type using “this” keyword.
For e.g.
        public void PrintNumbersSynchronized()
        {
            //Synchronization thread
            lock (this)
            {
                Console.WriteLine(" ");
                Console.WriteLine("Executing Thread {0}", Thread.CurrentThread.ManagedThreadId);
                for (int i = 1; i <= 10; i++)
                {
                    Console.Write(i + " ");
                }

            }
        }

However if you are locking down a region of code within a public member, it is safer (and best practice) to declare a private object member variable to server as the lock token:
    class Printer
    {
        // Synchronization token
        private Object ThreadLock = new object();

        public void PrintNumbersSynchronized()
        {
            //Synchronization thread
            lock (ThreadLock)
            {
                Console.WriteLine(" ");
                Console.WriteLine("Executing Thread {0}", Thread.CurrentThread.ManagedThreadId);
                for (int i = 1; i <= 10; i++)
                {
                    Console.Write(i + " ");
                }

            }
        }
    }
Output:

Other methods to perform the synchronization
Monitor is class in System.Threading namespace that also provides the same functionality. Actually lock is just a shorthand notation of Monitor class. Once compiler processed a lock it actually resolves to the following:
        public void PrintNumbersSync()
        {
            Monitor.Enter(ThreadLock);
            try
            {
                Console.WriteLine(" ");
                Console.WriteLine("Executing Thread {0}", Thread.CurrentThread.ManagedThreadId);
                for (int i = 1; i <= 10; i++)
                {
                    Console.Write(i + " ");
                }
            }
            finally
            {
                Monitor.Exit(ThreadLock);
            }

        }
Here little more code you can see having the try..finally block. Other than Enter() and Exit() methods, Monitor class also provides the methods like Monitor.Pulse() and PulseAll() to inform the waiting threads that its completed.
Simple operations using the Interlocked Type
To perform some basic operation it’s not necessary that you write the block using the lock or Monitor. System.Threading namespace provides an interesting class that can help you out.
Now you can replace this
            lock (ThreadLock)
            {
                intVal++;
            }

By
int newVal = Interlocked.Increment(ref intVal);

It returns the new values of updated variable as well as update the referenced value at the same time.
Similarly,
Add()
Exchange()  //for swapping
CompareExchange() // Compare a value and then exchange
Equals()
Decrement()
Read()
Are some basic operations you can perform in a multithreaded environment to make your threadsafe using the Interlocked class.
Synchronization using the [Synchronization] attribute
The Synchronization attribute is a member of System.Runtime.Remoting.Contexts namespace. In essence, this class-level attribute effectively lock downs all instance members code of the object for the thread safety. Additionally you have to derive your class from ContextBoundObject  to keep your object with in the contextual boundries.
Here’s the complete code:

    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("----Synchronnization of Threads-----");
            Console.WriteLine("Main Thread {0}", Thread.CurrentThread.ManagedThreadId);
            Printer p = new Printer();

            Thread[] threads = new Thread[5];

            //Queue 5 threads
            for (int i = 0; i < 5; i++)
            {
                threads[i] = new Thread(new ThreadStart(p.PrintNumbersNonSync));
            }
            foreach (Thread t in threads)
            {
                t.Start();
            }

            Console.ReadLine();
        }
    }

    [Synchronization]
    class Printer : ContextBoundObject
    {
        public void PrintNumbersNonSync()
        {
            Console.WriteLine(" ");
            Console.WriteLine("Executing Thread {0}", Thread.CurrentThread.ManagedThreadId);
            for (int i = 1; i <= 10; i++)
            {
                Console.Write(i + " ");
            }

        }
     
    }
Output:

This appraoch is a lazy way to write thread safe code because CLR can lock non-thread sensitive data and that could be an victim of OverLocking. So please choose this approach wisely and carefully.

Happy Threading…!!

Sunday, 24 July 2011

Optimizing wait in MultiThreading Environment – C#


While working in Multithreading environment the primary thread has to wait until it’s secondary thread to complete their execution. So in this case most of the time we use Thread.Sleep() and that is a bad practice because it blocks the primary thread for a time that might be less or exceeds than the completion time of secondary thread.  You can optimize the waiting time in multithreading by using System.Threading.AutoResetEvent class.
To achieve this we’ll utilize the WaitOne() method of AutoResetEvent class. it’ll wait until the it gets notified from secondary thread for completion and This I think should be the optimized way for wait here.
Let see the example:
using System;
using System.Threading;
namespace Threads_CSHandler_AutoResetEvent
{
    class Program
    {
        //Declare the wait handler
        private static AutoResetEvent waitHandle = new AutoResetEvent(false);

        static void Main(string[] args)
        {
            Console.WriteLine("Main() running on thread {0}", Thread.CurrentThread.ManagedThreadId);

            Program p = new Program();
            Thread t = new Thread(new ThreadStart(p.PrintNumbers));
            t.Start();
            //Wait untill get notified
            waitHandle.WaitOne();
            Console.WriteLine("\nSecondary thread is done!");
            Console.Read();
        }

        public void PrintNumbers()
        {
            Console.WriteLine(" ");
            Console.WriteLine("Executing Thread {0}", Thread.CurrentThread.ManagedThreadId);
            for (int i = 1; i <= 10; i++)
            {
                //Just to consume some time in operation
                Thread.Sleep(100);
                Console.Write(i + " ");
            }
            //Tell other other thread that we're done here
            waitHandle.Set();
        }
    }
}

Output:

Fun while it’snt done !!

Thursday, 21 July 2011

Delegate and Async Programming Part II - C# (AsyncCallback and Object State)


In the previous post we discussed about the use of delegates to call the methods asynchronously then we talked about Synchronization of threads in Multithreading environment. In the previous code example we used BeginInvoke() method make the Async call with below parameters:
Since I’m using the same example so just mentioning all the terms and code line we used in the previous example:
        //Simple delegate declaration
        public delegate int BinaryOp(int x, int y);

        //An Add() method that do some simple arithamtic operation
        public int Add(int a, int b)
        {
            Console.WriteLine("Add() running on thread {0}", Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(500);
            return (a + b);
        }

            BinaryOp bp = new BinaryOp(Add);
            IAsyncResult iftAr = bp.BeginInvoke(5, 5, null, null);

Just to remind that bp is the instance of BinaryOP delegate that pointing to simple Add() methods that perform some basic arithmetic operations. We passed the arguments to BeginInvoke() with some null values. Now instead of using null in the parameters you’ll use objects which are part of the signature of BeginInvoke() method of dynamic class of delegate BinaryOp.
BeginInvoke(int x, int y, AsyncCallback cb, Object state);
Let’s discuss what the last two parameters are for:
The AsyncCallback Delegate
Rather than polling the delegate to determine whether an asynchronous call is completed using the properties of IAsyncResult object, It would be more efficient to have the secondary inform the calling thread when  the task is finished. When you wish to enable this behavior you need to pass an instance of System.AsyncCallback delegate as a parameter to BeginInvoke(), which up until this point has been NULL.
Note: The Callback method will be called on the secondary thread not on the primary Main() thread.
The method that has to be passed into the AsyncCallback delegate should have void return type and IAsyncResult as a parameter to match the delegate signature.
        //Target of AsyncCallback delegate should match the following pattern
        public void AddComplete(IAsyncResult iftAr)

Note: In the previous Example we used an Boolean variable in two different thread which is not Thread safe. However to use such shared variable the very good rule of thumb is to ensure that data that can be shared among multiple thread is locked down.
Now when secondary thread completes the Add() operation the question should come into your mind  where I’m gonna call the EndInvoke().  Well you can call the EndInvoke in two places.
  1. Inside the main just after the condition where you’re waiting for the secondary thread to complete. But this is not solving the purpose of passing AsyncCallback.
        public delegate int BinaryOp(int x, int y);
        static void Main(string[] args)
        {
            Console.WriteLine("Main() running on thread {0}", Thread.CurrentThread.ManagedThreadId);

            Program p=new Program();
            BinaryOp bp = new BinaryOp(p.Add);
            IAsyncResult iftAr = bp.BeginInvoke(5, 5, new AsyncCallback(p.AddComplete), null);

            while (!iftAr.AsyncWaitHandle.WaitOne(100,true))
            {
                Console.WriteLine("Doing some work in Main()!");
            }
            int result = bp.EndInvoke(iftAr);
            Console.WriteLine("5 + 5 ={0}", result);
            Console.Read();
        }
//An Add() method that do some simple arithamtic operation
        public int Add(int a, int b)
        {
            Console.WriteLine("Add() running on thread {0}", Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(500);
            return (a + b);
        }

        //Target of AsyncCallback delegate should match the following pattern
        public void AddComplete(IAsyncResult iftAr)
        {
            Console.WriteLine("AddComplete() running on thread {0}", Thread.CurrentThread.ManagedThreadId);

            Console.WriteLine("Operation completed.");
        }
Output:

Here you can see the AddComplete() is called after the Main() completed its execution. If you are looking deep into the code and find iftAr.AsyncWaitHandle.WaitOne(100,true)) as something alien statement then you should look at the Delegate and Async Programming Part I - C#.

  1. Second and interesting approach is showing the result by calling EndInovke() inside the AddComplete(). Now you should have question here. The instance of BinaryOp delegate is running on the primary thread in the Main() How can we access it to secondary thread to call the EndInvoke()? Then answer is System.Runtime.Remoting.Messaging.AsyncResult class.

The AsyncResult class
The AddComplete() method of AsyncCallback takes the AsyncResult class object compatible to IAsyncResult as argument. The static AsyncDelegate property of AsyncResult class returns a reference to the original asynchronous delegate that was created elsewhere, and which is accessible through the IAsyncResult. Simply just cast the returned object by the AsyncDelegate to BinaryOp.
Update the code in the example remove the lines of EndInvoke from the Main() and let’s call it from the AddComplete() method.
//Target of AsyncCallback delegate should match the following pattern
        public void AddComplete(IAsyncResult iftAr)
        {
            Console.WriteLine("AddComplete() running on thread {0}", Thread.CurrentThread.ManagedThreadId);

            Console.WriteLine("Operation completed.");

            //Getting result
            AsyncResult ar = (AsyncResult)iftAr;
            BinaryOp bp = (BinaryOp)ar.AsyncDelegate;
            int result = bp.EndInvoke(iftAr);

            Console.WriteLine("5 + 5 ={0}", result);
            Console.WriteLine("Message recieved on thread {0}", Thread.CurrentThread.ManagedThreadId);
        }

Now the output will be:

Hope you can see the difference in the output of both the approaches here.

Passing and Receiving Custom State Data using the final argument of BeginInvoke()
Now let’s address the final argument of BeginInvoke() which has been null up to this point. This parameter allows you to pass additional state information from primary thread to secondary thread.
Because the proto type is System.Object so you can pass any object as parameter, as long as the callback knows what type of object to expect.
Now let demonstrate passing a simple string message form the primary thread in the BeginInvoke() and receiving it in the Callback on the secondary thread using the AsyncState property of IAsyncResult incoming parameter.
Now here’s the complete code:
class Program
    {
        //Simple delegate declaration
        public delegate int BinaryOp(int x, int y);
        static void Main(string[] args)
        {
            Console.WriteLine("Main() running on thread {0}", Thread.CurrentThread.ManagedThreadId);

            Program p=new Program();
            BinaryOp bp = new BinaryOp(p.Add);
            IAsyncResult iftAr = bp.BeginInvoke(5, 5, new AsyncCallback(p.AddComplete), "This message is from Main() thread "+Thread.CurrentThread.ManagedThreadId);
            while (!iftAr.AsyncWaitHandle.WaitOne(100,true))
            {
                Console.WriteLine("Doing some work in Main()!");
            }

            Console.Read();
        }

        //An Add() method that do some simple arithamtic operation
        public int Add(int a, int b)
        {
            Console.WriteLine("Add() running on thread {0}", Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(500);
            return (a + b);
        }

        //Target of AsyncCallback delegate should match the following pattern
        public void AddComplete(IAsyncResult iftAr)
        {
            Console.WriteLine("AddComplete() running on thread {0}", Thread.CurrentThread.ManagedThreadId);

            Console.WriteLine("Operation completed.");

            //Getting result
            AsyncResult ar = (AsyncResult)iftAr;
            BinaryOp bp = (BinaryOp)ar.AsyncDelegate;
            int result = bp.EndInvoke(iftAr);

            //Recieving the message from Main() thread.
            string msg = (string)iftAr.AsyncState;
            Console.WriteLine("5 + 5 ={0}", result);
            Console.WriteLine("Message recieved on thread {0}: {1}", Thread.CurrentThread.ManagedThreadId, msg);
        }

    }

Output:

I hope you enjoyed this.. I’m expecting that next time when you call BeginInvoke() you won’t pass NULL values to some useful arguments.
MultiThreading is what your multicore processor is doing!
So Cheers….