Threading Patterns: Producer-Consumer Pattern Examples
This is the third installment in my series of posts about threading patterns.
Today we will look at familiar scenarios where the producer-consumer pattern applies, in particular applications of the ThreadedQueueBase<T> class.
Usage Pattern
Note that the threaded queue classes that I provided in the last article
give you an in-memory queue of work items. Because of this, the normal
usage pattern is to start the thread during application startup and to
stop it during application shutdown. The class also provides two Stop
methods. Their signatures are as follows:
    public new void Stop();
    public void Stop(PendingItemAction pendingItemAction);
The parameterless Stop method is just a wrapper around the second Stop method, passing PendingItemAction.ProcessPendingItems as the argument.
The second Stop method accepts a parameter of type PendingItemAction.
The definition of this enumeration is shown below. Calling this method
with PendingItemAction.ProcessPendingItems causes the queue to process
all pending items before exiting its thread procedure, while passing
PendingItemAction.AbortPendingItems causes the thread to exit
immediately.
    public enum PendingItemAction
    {
        ProcessPendingItems,
        AbortPendingItems
    }
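To make the difference between the two stop modes concrete, here is a minimal sketch. SketchQueue is a hypothetical stand-in written for this illustration only; it is not the ThreadedQueueBase<T> implementation from the previous article, and its members (Start, EnqueueItem, ProcessedCount) are assumptions that merely follow the behaviour described above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Mirrors the enumeration shown in the article.
public enum PendingItemAction { ProcessPendingItems, AbortPendingItems }

// Hypothetical stand-in for a threaded queue; NOT the article's class.
public class SketchQueue
{
    private readonly Queue<int> _items = new Queue<int>();
    private readonly List<int> _processed = new List<int>();
    private readonly object _sync = new object();
    private Thread _thread;
    private bool _stopRequested;
    private bool _abortPending;

    // Call once, typically at application startup.
    public void Start()
    {
        _thread = new Thread(Run);
        _thread.Start();
    }

    public void EnqueueItem(int item)
    {
        lock (_sync)
        {
            _items.Enqueue(item);
            Monitor.Pulse(_sync); // wake the consumer if it is waiting
        }
    }

    // The parameterless overload drains the queue before stopping.
    public void Stop()
    {
        Stop(PendingItemAction.ProcessPendingItems);
    }

    // Assumes Start() was called earlier; blocks until the thread exits.
    public void Stop(PendingItemAction action)
    {
        lock (_sync)
        {
            _stopRequested = true;
            _abortPending = (action == PendingItemAction.AbortPendingItems);
            Monitor.Pulse(_sync);
        }
        _thread.Join();
    }

    public int ProcessedCount
    {
        get { lock (_sync) { return _processed.Count; } }
    }

    private void Run()
    {
        while (true)
        {
            int item;
            lock (_sync)
            {
                while (_items.Count == 0 && !_stopRequested)
                    Monitor.Wait(_sync);

                // Abort: leave at once. Drain: leave only when empty.
                if (_stopRequested && (_abortPending || _items.Count == 0))
                    return;

                item = _items.Dequeue();
            }

            Thread.Sleep(1); // simulate a little work outside the lock
            lock (_sync) { _processed.Add(item); }
        }
    }
}
```

In this sketch, calling Stop() after enqueuing five items leaves ProcessedCount at five, whereas Stop(PendingItemAction.AbortPendingItems) returns as soon as the item currently being worked on (if any) is finished and leaves the rest unprocessed.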
File Copying
We now revisit the file copying mechanism that we have been working
on for some time and modify it to use the ThreadedQueueBase<T> class.
    public class CopyFileQueue : ThreadedQueueBase<CopyInfo>
    {
        Logger _logger;

        public CopyFileQueue(Logger logger)
        {
            _logger = logger;
        }

        protected override void ProcessItem(CopyInfo item)
        {
            // Check if the user called Stop.
            if (StopRequested)
            {
                _logger.Log(LogLevel.Information, "User called Stop.");
                _logger.Log(LogLevel.Information,
                    "Terminating thread while copying directory '{0}'.",
                    item.Source);
                return;
            }

            // Make sure the destination directory exists.
            _logger.Log(LogLevel.Debug,
                "Checking if '{0}' exists...", item.Destination);
            if (!Directory.Exists(item.Destination))
            {
                _logger.Log(LogLevel.Debug,
                    "Creating '{0}'.", item.Destination);
                Directory.CreateDirectory(item.Destination);
            }

            _logger.Log(LogLevel.Information,
                "CopyFiles from '{0}' to '{1}' {2}...",
                item.Source,
                item.Destination,
                item.Recursive ? "recursive" : "non-recursive");

            // Copy the files that sit directly in the source directory.
            foreach (string file in
                Directory.GetFiles(item.Source))
            {
                string destination = Path.Combine(
                    item.Destination,
                    Path.GetFileName(file));
                _logger.Log(LogLevel.Information,
                    "Copying file from '{0}' to '{1}'...",
                    file,
                    destination);

                _logger.Log(LogLevel.Debug,
                    "Copying file from '{0}' to '{1}'...",
                    file,
                    destination);
                File.Copy(file, destination, item.Overwrite);
            }

            // Instead of recursing, enqueue each subdirectory as a new work item.
            if (item.Recursive)
            {
                foreach (string directory in
                    Directory.GetDirectories(item.Source))
                {
                    string destination = Path.Combine(
                        item.Destination,
                        Path.GetFileName(directory) // name of the subdirectory
                        );

                    _logger.Log(LogLevel.Debug,
                        "Copying directory from '{0}' to '{1}'...",
                        directory,
                        destination);
                    EnqueueItem(
                        new CopyInfo(
                            directory,
                            destination,
                            item.Recursive,
                            item.Overwrite));
                }
            }
        }
    }
As it turns out, not much needs to change in the file copying method
in order to take advantage of the new class. The difference is that this
approach enqueues a new CopyInfo for each subdirectory instead of
recursing into the directory structure. This is an example of a
non-recursive tree traversal: the work queue takes the place of the call
stack.
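Stripped of the copying and the threading, the traversal idea can be sketched on its own. The DirectoryWalker class and its BreadthFirst method are hypothetical names used only for this illustration, and the sketch assumes a first-in, first-out queue, which yields a breadth-first visiting order.

```csharp
using System.Collections.Generic;
using System.IO;

// Hypothetical helper, not part of the article's sample code.
public static class DirectoryWalker
{
    // Visits root and every directory below it without recursion.
    public static List<string> BreadthFirst(string root)
    {
        List<string> visited = new List<string>();
        Queue<string> pending = new Queue<string>();
        pending.Enqueue(root);

        while (pending.Count > 0)
        {
            string current = pending.Dequeue();
            visited.Add(current);

            // Enqueue the children instead of calling ourselves on them.
            foreach (string child in Directory.GetDirectories(current))
                pending.Enqueue(child);
        }

        return visited;
    }
}
```

Because each directory is handled only after everything enqueued before it, all directories at one depth are visited before any directory at the next depth. CopyFileQueue follows the same shape, with ProcessItem playing the role of the loop body.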
Logging
The queuing mechanism introduced in the producer-consumer
article can be used to create a logger. The logger accepts entries
asynchronously and, depending on its configuration, the log processor
thread writes them to a backing store such as a text or XML file or a
database.
LogLevel
Each log entry should specify a log level, which indicates whether
the entry was created for information, warning, debugging, or error
notification purposes.
    [Flags]
    public enum LogLevel
    {
        Information = 1,
        Debug = 2,
        Warning = 4,
        Error = 8,
        LogAll = Information | Debug | Warning | Error
    }
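Because LogLevel is a [Flags] enumeration, several levels can be combined into one configured value and tested with a bitwise AND. The sketch below shows one way a logger might decide whether to write an entry; LogFilter and IsEnabled are hypothetical names for this illustration and are not taken from the sample code.

```csharp
using System;

// Same values as the enumeration in the article.
[Flags]
public enum LogLevel
{
    Information = 1,
    Debug = 2,
    Warning = 4,
    Error = 8,
    LogAll = Information | Debug | Warning | Error
}

// Hypothetical helper, not part of the article's sample code.
public static class LogFilter
{
    // True when at least one bit of entryLevel is set in configured.
    public static bool IsEnabled(LogLevel configured, LogLevel entryLevel)
    {
        return (configured & entryLevel) != 0;
    }
}
```

With a configured value of LogLevel.Warning | LogLevel.Error, IsEnabled returns true for an Error entry and false for a Debug entry, while LogLevel.LogAll lets every level through.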
LogEntry
Each log entry contains a log level, the date of creation, and the log message. The class is shown below.
    public class LogEntry
    {
        private DateTime _dateCreated;
        private string _message;
        private LogLevel _logLevel;

        public LogEntry(LogLevel logLevel,
            string message,
            params object[] args)
            : this(logLevel, message)
        {
            _message = string.Format(message, args);
        }

        public LogEntry(LogLevel logLevel,
            string message)
        {
            _dateCreated = DateTime.Now;
            _message = message;
            _logLevel = logLevel;
        }

        public DateTime DateCreated
        {
            get { return _dateCreated; }
        }

        public string Message
        {
            get { return _message; }
        }

        public LogLevel LogLevel
        {
            get { return _logLevel; }
        }
    }
Logger
I will not bore you with the details of file I/O. If you want a closer
look, the source code is available. The file logging mechanism in the
sample code includes features such as rolling log files, deleting files
that are older than a certain number of days, and configurable settings
such as the log path, file name, and date format to use.
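As a rough idea of what the age-based cleanup could look like, here is a minimal sketch. It is not the code from the sample solution: LogRetention, DeleteOldLogs, and their parameters are hypothetical, and the sketch assumes that a file's last write time is an acceptable measure of its age.

```csharp
using System;
using System.IO;

// Hypothetical helper, not the article's file logger.
public static class LogRetention
{
    // Deletes matching files last written more than maxAgeDays ago
    // and returns how many files were removed.
    public static int DeleteOldLogs(string logDirectory,
        string searchPattern,
        int maxAgeDays)
    {
        DateTime cutoff = DateTime.Now.AddDays(-maxAgeDays);
        int deleted = 0;

        foreach (string file in
            Directory.GetFiles(logDirectory, searchPattern))
        {
            if (File.GetLastWriteTime(file) < cutoff)
            {
                File.Delete(file);
                deleted++;
            }
        }

        return deleted;
    }
}
```

A logger could call something like DeleteOldLogs(logPath, "*.log", 7) whenever it rolls over to a new file, so the cleanup runs on the log processor thread rather than on the callers' threads.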
Final Notes
The examples provided here only scratch the surface of what you can do with the ThreadedQueueBase<T> class. I have packaged the code from my last articles into a single VS2005 solution. Please feel free to incorporate my threading base classes in your own projects, and send me an email about the interesting things you build with them.
If you have any questions about using the threading base classes, just let me know and I will be happy to answer them.
Code samples can be found here.
Cheers!