Threading Patterns: The Worker Thread Pattern

As promised, I am beginning this series of posts about threading patterns that I find useful in everyday coding. To start off, I will introduce you guys to the concept of the worker thread pattern. 

This article assumes that you know a thing or two about the concept of threading in general.

.NET Threading Basics

Like any discussion about threading and concurrency, I will start off with the obligatory Threading Basics.

For the purpose of illustration we’ll use the scenario of copying files from a given directory. We will be using a class named CopyInfo to encapsulate the details of the copy instruction.

    public class CopyInfo
    {
        private string _source;
        private bool _recursive;
        private string _destination;

        public CopyInfo(
            string source,
            string destination,
            bool recursive)
        {
            _source = source;
            _destination = destination;
            _recursive = recursive;
        }

        public string Source
        {
            get { return _source; }
        }

        public string Destination
        {
            get { return _destination; }
        }

        public bool Recursive
        {
            get { return _recursive; }
        }
    }

We will also use a function named CopyFiles, which copies files from a source directory to a destination directory.

    //requires: using System; using System.IO;
    static void CopyFiles(CopyInfo copyInfo)
    {
        if (!Directory.Exists(copyInfo.Destination))
        {
            Directory.CreateDirectory(copyInfo.Destination);
        }

        Console.WriteLine("CopyFiles from '{0}' to '{1}' {2}...",
            copyInfo.Source,
            copyInfo.Destination,
            copyInfo.Recursive ? "recursive" : "non-recursive");

        //note: File.Copy throws if the destination file already exists
        foreach (string file in
            Directory.GetFiles(copyInfo.Source))
        {
            string destination = Path.Combine(
                copyInfo.Destination,
                Path.GetFileName(file));
            File.Copy(file, destination);
        }

        if (copyInfo.Recursive)
        {
            foreach (string directory in
                Directory.GetDirectories(copyInfo.Source))
            {
                string destination = Path.Combine(
                    copyInfo.Destination,
                    Path.GetFileName(directory) //get the directory name for the path
                    );

                CopyFiles(
                    new CopyInfo(
                        directory,
                        destination,
                        copyInfo.Recursive));
            }
        }

        Console.WriteLine("CopyFiles finished.");
    }

The main class in .NET threading is the Thread class. In order to use it, you need a function that matches the signature of either the ThreadStart or the ParameterizedThreadStart delegate. For your reference, their declarations are shown below.

    public delegate void ThreadStart();
    public delegate void ParameterizedThreadStart(object obj);

Next, you pass the name of that function to the constructor of the Thread class. This function is commonly referred to as the ThreadProc. Once started, the thread executes the instructions in the ThreadProc concurrently with the calling function. You start the thread by calling one of the two overloads of the Start method. If your function matches the signature of the ThreadStart delegate, call the parameterless overload of Start. If it matches the signature of the ParameterizedThreadStart delegate, you can call either overload; the parameterless one passes null to your function.

Note that passing a parameter to the Start method when your function does not accept one is an error: the call throws an InvalidOperationException.
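To make that note concrete, here is a minimal sketch of what happens when Start is given an argument for a thread built from a ThreadStart delegate. The class and method names (StartOverloadDemo, NoArgProc, StartWithArgumentIsRejected) are made up for illustration and are not part of the sample code in this post.

```csharp
using System;
using System.Threading;

public static class StartOverloadDemo
{
    // Matches ThreadStart: takes no parameter.
    static void NoArgProc()
    {
    }

    // Returns true if Start(object) is rejected for a ThreadStart-based thread.
    public static bool StartWithArgumentIsRejected()
    {
        Thread thread = new Thread(new ThreadStart(NoArgProc));
        try
        {
            // The thread was created with ThreadStart, so it cannot take an argument.
            thread.Start("unexpected argument");
            thread.Join();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    public static void Main()
    {
        Console.WriteLine(StartWithArgumentIsRejected());
    }
}
```

The reverse case is allowed: calling the parameterless Start on a thread created with a ParameterizedThreadStart delegate simply passes null to the function.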

With this out of the way, below is an example of starting a Thread using a ThreadStart delegate:

    private static CopyInfo _copyInfo;

    static void Main(string[] args)
    {
        _copyInfo = new CopyInfo(
            Environment.GetFolderPath(Environment.SpecialFolder.MyPictures),
            @"c:\pictures",
            true);

        Thread simpleThread = new Thread(CopyFilesProc);
        simpleThread.Name = "CopyFiles";
        simpleThread.Start();
        simpleThread.Join();
    }

    static void CopyFilesProc()
    {
        CopyFiles(_copyInfo);
    }

And another example, which starts a Thread using a ParameterizedThreadStart delegate:

    private static CopyInfo _copyInfo;

    static void Main(string[] args)
    {
        _copyInfo = new CopyInfo(
            Environment.GetFolderPath(Environment.SpecialFolder.MyPictures),
            @"c:\pictures",
            true);

        Thread parameterizedThread = new Thread(ParameterizedCopyFilesProc);
        parameterizedThread.Name = "CopyFiles";
        parameterizedThread.Start(_copyInfo);
        parameterizedThread.Join();
    }

    static void ParameterizedCopyFilesProc(object state)
    {
        //the state arrives as object, so convert it back before use
        CopyInfo copyInfo = state as CopyInfo;
        if (copyInfo != null)
        {
            CopyFiles(copyInfo);
        }
    }
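Both examples above need either a static field or a cast from object to get data into the ThreadProc. As a side note, a third option is to let an anonymous method capture local variables. The sketch below is only an illustration: Describe is a hypothetical stand-in for CopyFiles that builds a string instead of touching the file system, and ClosureThreadDemo and RunOnThread are made-up names.

```csharp
using System;
using System.Threading;

public static class ClosureThreadDemo
{
    // Hypothetical stand-in for CopyFiles(CopyInfo); it only builds a message.
    static string Describe(string source, string destination)
    {
        return source + " -> " + destination;
    }

    public static string RunOnThread(string source, string destination)
    {
        string result = null;

        // The anonymous method matches ThreadStart and captures the locals,
        // so no static field and no cast from object is needed.
        Thread thread = new Thread(delegate()
        {
            result = Describe(source, destination);
        });
        thread.Start();
        thread.Join(); // wait for the thread before reading 'result'

        return result;
    }

    public static void Main()
    {
        Console.WriteLine(RunOnThread(@"c:\in", @"c:\out"));
    }
}
```

This keeps the data typed, but the thread logic still lives next to the calling code, which is the problem the next section deals with.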

The Impedance Mismatch

As you can see, the method which gets executed by the thread is defined outside of it. What's up with that? It doesn’t look natural in an OO kind of sense, and it can easily get mixed up with other parts of the application. Imagine looking at the code for a Form object sprinkled with threading and synchronization logic. It doesn’t look too pretty, huh?

In the next section, we'll refactor the code above and encapsulate the threading and synchronization logic in its own class in order to provide an object-oriented interface to the Thread class.

Worker Threads

The Worker Thread pattern operates on the concept of Thread-Per-Message. That is, each data/operation pair is executed in the context of its own thread. When you have several long-running operations, running them in parallel can reduce the total elapsed time compared to running them sequentially. The pattern can also be used to move a long-running operation onto a different thread in order to avoid freezing the UI.

Below is the code for the abstract class WorkerThreadBase. It exposes several protected constructors that advanced users can use to specify the name, priority and background/foreground behavior of the thread. It has one abstract method, Work, which represents the ThreadProc; subclasses implement this method to have their logic run on its own thread. It has two public methods, Start and Stop, that do exactly what their names suggest. The Start operation is asynchronous, while the Stop operation blocks until the thread completes execution. There is also a protected Boolean property called StopRequested, which can be polled inside the Work method to detect whether the user wants to stop the thread.

    //requires: using System; using System.Threading;
    public abstract class WorkerThreadBase : IDisposable
    {
        private Thread _workerThread;
        private ManualResetEvent _stopping;
        private ManualResetEvent _stopped;
        private bool _disposed;
        private bool _disposing;

        protected WorkerThreadBase()
            : this(null, ThreadPriority.Normal)
        {
        }

        protected WorkerThreadBase(string name)
            : this(name, ThreadPriority.Normal)
        {
        }

        protected WorkerThreadBase(string name,
            ThreadPriority priority)
            : this(name, priority, false)
        {
        }

        protected WorkerThreadBase(string name,
            ThreadPriority priority,
            bool isBackground)
        {
            _disposing = false;
            _disposed = false;
            _stopping = new ManualResetEvent(false);
            _stopped = new ManualResetEvent(false);

            _workerThread = new Thread(threadProc);
            _workerThread.Name = name == null ? GetType().Name : name;
            _workerThread.Priority = priority;
            _workerThread.IsBackground = isBackground;
        }

        protected bool StopRequested
        {
            //a zero timeout polls the event without delaying the worker
            get { return _stopping.WaitOne(0, false); }
        }

        public void Start()
        {
            ThrowIfDisposedOrDisposing();
            _workerThread.Start();
        }

        public void Stop()
        {
            ThrowIfDisposedOrDisposing();
            _stopping.Set();

            //a thread that was never started never signals _stopped
            if (isStarted())
            {
                _stopped.WaitOne();
            }
        }

        #region IDisposable Members

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        #endregion

        public static void WaitAll(params WorkerThreadBase[] threads)
        {
            WaitHandle.WaitAll(
                Array.ConvertAll<WorkerThreadBase, WaitHandle>(
                    threads,
                    delegate(WorkerThreadBase workerThread)
                    { return workerThread._stopped; }));
        }

        public static void WaitAny(params WorkerThreadBase[] threads)
        {
            WaitHandle.WaitAny(
                Array.ConvertAll<WorkerThreadBase, WaitHandle>(
                    threads,
                    delegate(WorkerThreadBase workerThread)
                    { return workerThread._stopped; }));
        }

        protected virtual void Dispose(bool disposing)
        {
            //do nothing if disposed more than once
            if (_disposed)
            {
                return;
            }

            if (disposing)
            {
                _disposing = true;

                //signal the thread directly; calling Stop() here would throw,
                //because Stop() rejects calls while _disposing is set
                _stopping.Set();

                //an unstarted thread cannot be waited on or joined
                if (isStarted())
                {
                    _stopped.WaitOne();

                    //make sure the thread joins the main thread
                    _workerThread.Join(1000);
                }

                //dispose of the waithandles
                disposeWaitHandle(_stopping);
                disposeWaitHandle(_stopped);

                _disposing = false;
                //mark as disposed
                _disposed = true;
            }
        }

        protected void ThrowIfDisposedOrDisposing()
        {
            //the messages come from the sample project's resource file
            if (_disposing)
            {
                throw new InvalidOperationException(
                    Properties.Resources.ERROR_OBJECT_DISPOSING);
            }

            if (_disposed)
            {
                throw new ObjectDisposedException(
                    GetType().Name,
                    Properties.Resources.ERROR_OBJECT_DISPOSED);
            }
        }

        protected abstract void Work();

        private bool isStarted()
        {
            return (_workerThread.ThreadState
                & System.Threading.ThreadState.Unstarted) == 0;
        }

        private void threadProc()
        {
            try
            {
                Work();
            }
            finally
            {
                //signal completion in a finally block so that an exception
                //in Work is less likely to leave waiters blocked
                _stopped.Set();
            }
        }

        private void disposeWaitHandle(WaitHandle waitHandle)
        {
            if (waitHandle != null)
            {
                waitHandle.Close();
            }
        }
    }

For your reference I’ve created a dummy worker thread class to show how to use WorkerThreadBase. Notice how it uses the StopRequested property to check if the user called the Stop method.

    public class DummyWorker : WorkerThreadBase
    {
        protected override void Work()
        {
            //a relatively long task
            System.Threading.Thread.Sleep(10000);

            if (StopRequested)
            {
                return;
            }

            //a relatively long task
            System.Threading.Thread.Sleep(5000);

            Console.WriteLine("Dummy worker completed.");
        }
    }
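DummyWorker only checks StopRequested once, between its two long tasks, so a Stop call can still wait up to ten seconds. If the work can be cut into short slices, polling at the top of a loop responds faster. Here is a self-contained sketch of that idea using a plain Thread and a ManualResetEvent instead of WorkerThreadBase; CooperativeStopDemo, RunAndStop and the 10 ms slice are assumptions made for the example.

```csharp
using System;
using System.Threading;

public static class CooperativeStopDemo
{
    // Runs up to maxSteps short slices on a worker thread, requests a stop
    // once the first slice has completed, and returns the slices completed.
    public static int RunAndStop(int maxSteps)
    {
        if (maxSteps <= 0)
        {
            return 0;
        }

        ManualResetEvent stopping = new ManualResetEvent(false);
        ManualResetEvent firstSliceDone = new ManualResetEvent(false);
        int completed = 0;

        Thread worker = new Thread(delegate()
        {
            for (int i = 0; i < maxSteps; i++)
            {
                // A zero timeout polls the event without blocking.
                if (stopping.WaitOne(0, false))
                {
                    return;
                }

                Thread.Sleep(10); // one short slice of the long task
                completed++;
                firstSliceDone.Set();
            }
        });

        worker.Start();
        firstSliceDone.WaitOne(); // let the worker make some progress
        stopping.Set();           // ask it to stop
        worker.Join();            // wait until it has actually returned

        stopping.Close();
        firstSliceDone.Close();
        return completed;
    }

    public static void Main()
    {
        Console.WriteLine("Slices completed before stop: {0}", RunAndStop(1000));
    }
}
```

The worker decides where it is safe to stop, so it never gets interrupted halfway through a slice.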

In order to use it, simply instantiate the class and call the Start and Stop methods as needed.

    DummyWorker dummy = new DummyWorker();
    dummy.Start();

    //do some stuff in the calling method

    //wait for the dummy worker to stop
    dummy.Stop();

And finally, let's refactor the copy file operation to create a CopyFileWorker. Note how we use the constructor to pass in the data that the thread operates on.

    public class CopyFileWorker : WorkerThreadBase
    {
        //an instance field, so that each worker keeps its own copy instruction
        private readonly CopyInfo _copyInfo;

        public CopyFileWorker(CopyInfo copyInfo)
        {
            _copyInfo = copyInfo;
        }

        protected override void Work()
        {
            copyFiles(_copyInfo);
        }

        private void copyFiles(CopyInfo copyInfo)
        {
            //check if the user called Stop
            if (StopRequested)
            {
                Console.WriteLine("User called Stop.");
                Console.WriteLine(
                    "Terminating thread while copying directory '{0}'.",
                    copyInfo.Source);
                return;
            }

            if (!Directory.Exists(copyInfo.Destination))
            {
                Directory.CreateDirectory(copyInfo.Destination);
            }

            Console.WriteLine("CopyFiles from '{0}' to '{1}' {2}...",
                copyInfo.Source,
                copyInfo.Destination,
                copyInfo.Recursive ? "recursive" : "non-recursive");

            foreach (string file in
                Directory.GetFiles(copyInfo.Source))
            {
                string destination = Path.Combine(
                    copyInfo.Destination,
                    Path.GetFileName(file));
                File.Copy(file, destination);
            }

            if (copyInfo.Recursive)
            {
                foreach (string directory in
                    Directory.GetDirectories(copyInfo.Source))
                {
                    string destination = Path.Combine(
                        copyInfo.Destination,
                        Path.GetFileName(directory) //get the directory name for the path
                        );

                    copyFiles(
                        new CopyInfo(
                            directory,
                            destination,
                            copyInfo.Recursive));
                }
            }

            Console.WriteLine("CopyFiles finished.");
        }
    }

To use it in your Main method:

    private static CopyInfo _copyInfo;

    static void Main(string[] args)
    {
        _copyInfo = new CopyInfo(
            Environment.GetFolderPath(Environment.SpecialFolder.MyPictures),
            @"c:\pictures",
            true);

        CopyFileWorker copyFileWorker = new CopyFileWorker(_copyInfo);
        copyFileWorker.Start();

        //do some stuff in the calling method

        //wait for the copy worker to stop
        copyFileWorker.Stop();
    }


You might be asking: what if I want to execute two or more parallel operations and wait for them to complete before I proceed? Well, you're in luck. There’s a static WaitAll method on the WorkerThreadBase class that you can use to wait for several threads at once. Below is a sample of how to use it.

    private static CopyInfo _copyInfo;

    static void Main(string[] args)
    {
        _copyInfo = new CopyInfo(
            Environment.GetFolderPath(Environment.SpecialFolder.MyPictures),
            @"c:\pictures",
            true);

        //start dummy worker
        DummyWorker dummyWorker = new DummyWorker();
        dummyWorker.Start();

        //start copy worker
        CopyFileWorker copyFileWorker = new CopyFileWorker(_copyInfo);
        copyFileWorker.Start();

        //wait for the two threads to finish
        WorkerThreadBase.WaitAll(copyFileWorker, dummyWorker);
    }
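One caveat: WaitHandle.WaitAll accepts at most 64 handles and does not support waiting on multiple handles from an STA thread, so a helper built on it inherits both limits. When you hold the Thread objects yourself, joining them one by one avoids these restrictions. The sketch below shows that alternative with plain threads rather than WorkerThreadBase; JoinAllDemo and RunWorkers are made-up names, and the 5 ms sleep just simulates work.

```csharp
using System;
using System.Threading;

public static class JoinAllDemo
{
    // Starts workerCount threads, waits for every one of them with Join,
    // and returns how many reported completion.
    public static int RunWorkers(int workerCount)
    {
        int finished = 0;
        Thread[] threads = new Thread[workerCount];

        for (int i = 0; i < workerCount; i++)
        {
            threads[i] = new Thread(delegate()
            {
                Thread.Sleep(5); // simulated work
                Interlocked.Increment(ref finished);
            });
            threads[i].Start();
        }

        // Join has no 64-handle limit; it blocks on one thread at a time.
        foreach (Thread thread in threads)
        {
            thread.Join();
        }

        return finished;
    }

    public static void Main()
    {
        Console.WriteLine("Workers finished: {0}", RunWorkers(100));
    }
}
```

The total wait is still bounded by the slowest thread, because the threads that finish early return from Join immediately.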

Some Final Notes

The Thread-Per-Message worker thread can be used to easily perform operations in parallel. However, you must be aware of the overhead of thread creation and clean-up. As such, make sure that the operations you encapsulate in this pattern are limited to long-running processes.
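For short operations, where that overhead would dominate, one common alternative is to queue the work on the built-in ThreadPool, which reuses its threads. This is not part of the worker thread pattern described in this post; the sketch below is only an illustration, and ThreadPoolDemo and SumOnPool are made-up names.

```csharp
using System;
using System.Threading;

public static class ThreadPoolDemo
{
    // Adds up the values using one short pool work item per value.
    public static int SumOnPool(int[] values)
    {
        if (values.Length == 0)
        {
            return 0;
        }

        int total = 0;
        int pending = values.Length;
        ManualResetEvent done = new ManualResetEvent(false);

        foreach (int value in values)
        {
            // The value travels to the callback as the state argument.
            ThreadPool.QueueUserWorkItem(delegate(object state)
            {
                Interlocked.Add(ref total, (int)state);

                // The last work item to finish signals the caller.
                if (Interlocked.Decrement(ref pending) == 0)
                {
                    done.Set();
                }
            }, value);
        }

        done.WaitOne();
        done.Close();
        return total;
    }

    public static void Main()
    {
        Console.WriteLine(SumOnPool(new int[] { 1, 2, 3 }));
    }
}
```

Pool threads are background threads and are shared with the rest of the process, so the dedicated worker thread remains the better fit for long-running jobs like the file copy above.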

The source code for the sample provided in this post is available here.

Comments

# modchip said:

Nice job!

Friday, December 19, 2008 6:20 PM
# cruizer said:

wow! :D please post more :)

Friday, December 19, 2008 10:36 PM
# jakelite said:

hey guys. thanks. glad you find it interesting. :D

Saturday, December 20, 2008 6:50 AM
# lamia said:

Hi! Just want to let you know that I will be seriously reviewing multi-threading concepts and application for Java and .Net. I'll be using your series of article starting with this one, starting tonight. :)

Monday, March 23, 2009 1:47 AM
# jakelite said:

nice! please let us know how it plays out. concurrency is quite unavoidable this days.

Monday, March 23, 2009 9:49 AM