Threading Patterns: The Worker Thread Pattern
As promised, I am beginning this series of posts about threading patterns that I find useful in everyday coding. To start off, I will introduce you guys to the concept of the worker thread pattern.
This article assumes that you know a thing or two about the concept of threading in general.
.Net Threading Basics
Like any discussion about threading and concurrency, I will start off with the obligatory Threading Basics.
For the purpose of illustration we’ll use the scenario of copying files from a given directory. We will be using a class named CopyInfo to encapsulate the details of the copy instruction.
1 public class CopyInfo
2 {
3 private string _source;
4 private bool _recursive;
5 private string _destination;
6
7 public CopyInfo(
8 string source,
9 string destination,
10 bool recursive)
11 {
12 _source = source;
13 _destination = destination;
14 _recursive = recursive;
15 }
16
17 public string Source
18 {
19 get { return _source; }
20 }
21
22 public string Destination
23 {
24 get { return _destination; }
25 }
26
27 public bool Recursive
28 {
29 get { return _recursive; }
30 }
31 }
And a function named CopyFiles that copies files from a source directory to a destination directory.
1 static void CopyFiles(CopyInfo copyInfo)
2 {
3 if (!Directory.Exists(copyInfo.Destination))
4 {
5 Directory.CreateDirectory(copyInfo.Destination);
6 }
7
8 Console.WriteLine("CopyFiles from '{0}' to '{1}' {2}...",
9 copyInfo.Source,
10 copyInfo.Destination,
11 copyInfo.Recursive ? "recursive" : "non-recursive");
12
13 foreach (string file in
14 Directory.GetFiles(copyInfo.Source))
15 {
16 string destination = Path.Combine(
17 copyInfo.Destination,
18 Path.GetFileName(file));
19 File.Copy(file, destination);
20 }
21
22 if (copyInfo.Recursive)
23 {
24 foreach (string directory in
25 Directory.GetDirectories(copyInfo.Source))
26 {
27 string destination = Path.Combine(
28 copyInfo.Destination,
29 Path.GetFileName(directory) //get the directory name for the path
30 );
31
32 CopyFiles(
33 new CopyInfo(
34 directory,
35 destination,
36 copyInfo.Recursive));
37
38 }
39 }
40
41 Console.WriteLine("CopyFiles finished.");
42 }
The main class in .Net threading is the Thread class. In order to use it, you need to have a function which matches the signature of either ThreadStart or ParameterizedThreadStart delegate. For your reference they’re declarations are shown below.
public delegate void ThreadStart();
public delegate void ParameterizedThreadStart(object obj);
Then, you take the name of that function and pass it as a parameter to the constructor of the Thread class. This function can be referred to as the ThreadProc. Once started, the Thread class takes the instructions contained in the ThreadProc and executes it in parallel to the calling function. You can start the thread by calling one of the two overloads of the Start method. If you have a function that matches the signature of the ThreadStart delegate, you can call the parameter-less overload of the Start method. And if your function matches the signature of the ParameterizedThreadStart delegate you can call either of the two overloads of the Start method.
Note that it is an error to pass a parameter to the Start method if your function does not accept a parameter.
With this out of the way, below is an example of starting a Thread using a ThreadStart delegate
1 private static CopyInfo _copyInfo;
2 static void Main(string[] args)
3 {
4 _copyInfo = new CopyInfo(
5 Environment.GetFolderPath(Environment.SpecialFolder.MyPictures),
6 @"c:\pictures",
7 true);
8
9 Thread simpleThread = new Thread(CopyFilesProc);
10 simpleThread.Name = "CopyFiles";
11 simpleThread.Start();
12 simpleThread.Join();
13 }
14
15 static void CopyFilesProc()
16 {
17 CopyFiles(_copyInfo);
18 }
And another example which starts a Thread using a ParameterizedThreadStart delegate
1 private static CopyInfo _copyInfo;
2 static void Main(string[] args)
3 {
4 _copyInfo = new CopyInfo(
5 Environment.GetFolderPath(Environment.SpecialFolder.MyPictures),
6 @"c:\pictures",
7 true);
8
9 Thread parameterizedThread = new Thread(ParameterizedCopyFilesProc);
10 parameterizedThread.Name = "CopyFiles";
11 parameterizedThread.Start(_copyInfo);
12 parameterizedThread.Join();
13 }
14
15 static void ParameterizedCopyFilesProc(object state)
16 {
17 if (state is CopyInfo)
18 {
19 CopyFiles(state as CopyInfo);
20 }
21 }
The Impedance Mismatch
As you can see, the method which gets executed by the thread is defined outside of it. What's up with that? It doesn’t look natural in an OO kind of sense. And it can be easily mixed up with other parts of the application. Imagine looking at code for a Form object sprinkled with threading and synchronization logic. It doesn’t look too pretty huh?
In the next section, we'll refactor the code above and encapsulate the threading and synchronization logic in its own class in order to provide an object-oriented interface to the Thread class.
Worker Threads
The Worker Thread pattern operates on the concept of Thread-Per-Message. That is, a data/operation pair is executed in the context of its own thread. It effectively speeds up the execution of multiple long running processes when compared to the time it will take them to execute sequentially. It can also be used to move some long running processes in a different thread in order to avoid freezing the ui.
Below is the code for the abstract class WorkerThreadBase. This class exposes different protected constructors which advanced users can use to specify the name, priority and background/foreground behavior of threads. It has one abstract method Work that represents the ThreadProc. Subclasses need to implement this method in order to take advantage of per thread execution. It has two public Methods Start and Stop that does exactly what their name suggests. The Start operation is asynchronous. And the Stop operation blocks until the thread completes execution. There is one Boolean protected property called StopRequested which can be polled inside the ThreadProc method to detect if the user wants to stop the thread.
1 public abstract class WorkerThreadBase : IDisposable
2 {
3 private Thread _workerThread;
4 private ManualResetEvent _stopping;
5 private ManualResetEvent _stopped;
6 private bool _disposed;
7 private bool _disposing;
8
9 protected WorkerThreadBase()
10 : this(null, ThreadPriority.Normal)
11 {
12 }
13
14 protected WorkerThreadBase(string name)
15 : this(name, ThreadPriority.Normal)
16 {
17 }
18
19 protected WorkerThreadBase(string name,
20 ThreadPriority priority)
21 : this(name, priority, false)
22 {
23 }
24
25 protected WorkerThreadBase(string name,
26 ThreadPriority priority,
27 bool isBackground)
28 {
29 _disposing = false;
30 _disposed = false;
31 _stopping = new ManualResetEvent(false);
32 _stopped = new ManualResetEvent(false);
33
34 _workerThread = new Thread(threadProc);
35 _workerThread.Name = name == null ? GetType().Name : name;
36 _workerThread.Priority = priority;
37 _workerThread.IsBackground = isBackground;
38 }
39
40 protected bool StopRequested
41 {
42 get { return _stopping.WaitOne(1, true); }
43 }
44
45 public void Start()
46 {
47 ThrowIfDisposedOrDisposing();
48 _workerThread.Start();
49 }
50
51 public void Stop()
52 {
53 ThrowIfDisposedOrDisposing();
54 _stopping.Set();
55 _stopped.WaitOne();
56 }
57
58 #region IDisposable Members
59
60 public void Dispose()
61 {
62 Dispose(true);
63 }
64
65 #endregion
66
67 public static void WaitAll(params WorkerThreadBase[] threads)
68 {
69 WaitHandle.WaitAll(
70 Array.ConvertAll<WorkerThreadBase, WaitHandle>(
71 threads,
72 delegate(WorkerThreadBase workerThread)
73 { return workerThread._stopped; }));
74 }
75
76 public static void WaitAny(params WorkerThreadBase[] threads)
77 {
78 WaitHandle.WaitAny(
79 Array.ConvertAll<WorkerThreadBase, WaitHandle>(
80 threads,
81 delegate(WorkerThreadBase workerThread)
82 { return workerThread._stopped; }));
83 }
84
85 protected virtual void Dispose(bool disposing)
86 {
87 //do nothing if disposed more than once
88 if (_disposed)
89 {
90 return;
91 }
92
93 if (disposing)
94 {
95 _disposing = disposing;
96
97 //stop the thread;
98 Stop();
99
100 //make sure the thread joins the main thread
101 _workerThread.Join(1000);
102
103 //dispose of the waithandles
104 disposeWaitHandle(_stopping);
105 disposeWaitHandle(_stopped);
106
107 _disposing = false;
108 //mark as disposed
109 _disposed = true;
110 }
111 }
112
113 protected void ThrowIfDisposedOrDisposing()
114 {
115 if (_disposing)
116 {
117 throw new InvalidOperationException(
118 Properties.Resources.ERROR_OBJECT_DISPOSING);
119 }
120
121 if (_disposed)
122 {
123 throw new ObjectDisposedException(
124 GetType().Name,
125 Properties.Resources.ERROR_OBJECT_DISPOSED);
126 }
127 }
128
129 protected abstract void Work();
130
131 private void threadProc()
132 {
133 Work();
134 _stopped.Set();
135 }
136
137 private void disposeWaitHandle(WaitHandle waitHandle)
138 {
139 if (waitHandle != null)
140 {
141 waitHandle.Close();
142 waitHandle = null;
143 }
144 }
145 }
For your reference I’ve created a dummy worker thread class to show how to use WorkerThreadBase. Notice how it uses the StopRequested property to check if the user called the Stop method.
1 public class DummyWorker:WorkerThreadBase
2 {
3 protected override void Work()
4 {
5 //a relatively long task
6 System.Threading.Thread.Sleep(10000);
7
8 if (StopRequested)
9 {
10 return;
11 }
12
13 //a relatively long task
14 System.Threading.Thread.Sleep(5000);
15
16 Console.WriteLine("Dummy worker completed.");
17 }
18 }
In order to use it, one has to simply instantiate the class and call the Start/Stop method correspondingly.
DummyWorker dummy = new DummyWorker();
dummy.Start();
//do some stuff in the calling method
//wait for the dummy worker to stop
dummy.Stop();
And finally, lets refactor the copy file operation to create a CopyFileWorker. Note how we used the constructor to pass the data that the thread operates on.
1 public class CopyFileWorker : WorkerThreadBase
2 {
3 private static CopyInfo _copyInfo;
4
5 public CopyFileWorker(CopyInfo copyInfo)
6 {
7 _copyInfo = copyInfo;
8 }
9
10 protected override void Work()
11 {
12 copyFiles(_copyInfo);
13 }
14
15 private void copyFiles(CopyInfo copyInfo)
16 {
17 //check if the user called Stop
18 if (StopRequested)
19 {
20 Console.WriteLine("User called Stop.");
21 Console.WriteLine(
22 "Terminating thread while copying directory '{0}'.",
23 copyInfo.Source);
24 return;
25 }
26
27 if (!Directory.Exists(copyInfo.Destination))
28 {
29 Directory.CreateDirectory(copyInfo.Destination);
30 }
31
32 Console.WriteLine("CopyFiles from '{0}' to '{1}' {2}...",
33 copyInfo.Source,
34 copyInfo.Destination,
35 copyInfo.Recursive ? "recursive" : "non-recursive");
36
37 foreach (string file in
38 Directory.GetFiles(copyInfo.Source))
39 {
40 string destination = Path.Combine(
41 copyInfo.Destination,
42 Path.GetFileName(file));
43 File.Copy(file, destination);
44 }
45
46 if (copyInfo.Recursive)
47 {
48 foreach (string directory in
49 Directory.GetDirectories(copyInfo.Source))
50 {
51 string destination = Path.Combine(
52 copyInfo.Destination,
53 Path.GetFileName(directory) //get the directory name for the path
54 );
55
56 copyFiles(
57 new CopyInfo(
58 directory,
59 destination,
60 copyInfo.Recursive));
61
62 }
63 }
64
65 Console.WriteLine("CopyFiles finished.");
66 }
67
68 }
To use it in your Main method:
1 private static CopyInfo _copyInfo;
2 static void Main(string[] args)
3 {
4 _copyInfo = new CopyInfo(
5 Environment.GetFolderPath(Environment.SpecialFolder.MyPictures),
6 @"c:\pictures",
7 true);
8
9 CopyFileWorker copyFileWorker = new CopyFileWorker(_copyInfo);
10 copyFileWorker.Start();
11
12 //do some stuff in the calling method
13
14 //wait for the copy worker to stop
15 copyFileWorker.Stop();
16 }
You might just be asking what If I want to execute 2 or more parallel operations and wait for them to complete before I can proceed. Well youre in luck. There’s a static WaitAll method on the WorkerThreadBase class that you can use to wait for as many threads as you like. Below is a sample of how to use this.
1 private static CopyInfo _copyInfo;
2 static void Main(string[] args)
3 {
4 _copyInfo = new CopyInfo(
5 Environment.GetFolderPath(Environment.SpecialFolder.MyPictures),
6 @"c:\pictures",
7 true);
8
9 //start dummy worker
10 DummyWorker dummyWorker = new DummyWorker();
11 dummyWorker.Start();
12
13 //start copy worker
14 CopyFileWorker copyFileWorker = new CopyFileWorker(_copyInfo);
15 copyFileWorker.Start();
16
17 //wait for the two threads to finish
18 WorkerThreadBase.WaitAll(copyFileWorker, dummyWorker);
19 }
Some Final Notes
The Thread-Per-Message worker thread can be used to easily perform operations in parallel. However, you must be be aware about the overhead of thread creation and clean-up. As such, ensure that the operations you encapsulate in this pattern is limited only to the long running processes.
Source code for the sample provided in this post are available here.