Threading Patterns: Producer-Consumer Pattern Examples

This is the third installment in my series of posts about threading patterns.

Today we will look at familiar scenarios where the producer-consumer pattern applies, particularly applications of the ThreadedQueueBase<T> class.

Usage Pattern

The threaded queue classes that I provided in the last article give you an in-memory queue of work items. Because the queue lives only in memory, the normal usage pattern is to start the thread during application startup and stop it during application shutdown. The class provides two Stop methods, with the following signatures:

    public new void Stop();
    public void Stop(PendingItemAction pendingItemAction);

The parameterless Stop method is just a wrapper around the second Stop method; it passes PendingItemAction.ProcessPendingItems as the parameter.

The second Stop method accepts a parameter of type PendingItemAction, an enumeration defined below. Passing PendingItemAction.ProcessPendingItems causes the queue to process all pending items before exiting its thread-proc, while passing PendingItemAction.AbortPendingItems causes the thread to exit immediately.

    public enum PendingItemAction
    {
        ProcessPendingItems,
        AbortPendingItems
    }
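To make the difference between the two actions concrete, here is a minimal sketch of a queue with the same Start/Stop shape. It is not the ThreadedQueueBase<T> from the download: SketchQueue<T> and CountingQueue are hypothetical stand-ins written for this post, and "abort" is simplified to discarding items that have not started yet, so an item that is already being processed still finishes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Same two values as the enumeration shown above.
public enum PendingItemAction { ProcessPendingItems, AbortPendingItems }

// Hypothetical stand-in for ThreadedQueueBase<T>; NOT the author's class.
public abstract class SketchQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _sync = new object();
    private Thread _thread;
    private bool _stopRequested;

    protected abstract void ProcessItem(T item);

    // Call once, typically during application startup.
    public void Start()
    {
        _thread = new Thread(new ThreadStart(Run));
        _thread.Start();
    }

    public void EnqueueItem(T item)
    {
        lock (_sync)
        {
            _items.Enqueue(item);
            Monitor.Pulse(_sync); // wake the consumer thread
        }
    }

    // Mirrors the parameterless overload: drain the queue, then stop.
    public void Stop()
    {
        Stop(PendingItemAction.ProcessPendingItems);
    }

    public void Stop(PendingItemAction pendingItemAction)
    {
        lock (_sync)
        {
            if (pendingItemAction == PendingItemAction.AbortPendingItems)
                _items.Clear(); // discard work that has not started yet
            _stopRequested = true;
            Monitor.Pulse(_sync);
        }
        _thread.Join(); // wait for the thread-proc to exit
    }

    private void Run()
    {
        while (true)
        {
            T item;
            lock (_sync)
            {
                while (_items.Count == 0 && !_stopRequested)
                    Monitor.Wait(_sync);
                if (_items.Count == 0)
                    return; // stop requested and nothing left to do
                item = _items.Dequeue();
            }
            ProcessItem(item); // run the work outside the lock
        }
    }
}

// Tiny consumer that only counts the items it processed.
public class CountingQueue : SketchQueue<int>
{
    public int Processed;

    protected override void ProcessItem(int item)
    {
        Processed++;
    }
}
```

With this sketch, calling Start(), enqueuing a few integers and then calling Stop() processes every item before the thread exits. Calling Stop(PendingItemAction.AbortPendingItems) instead may leave some of them unprocessed, depending on how far the consumer thread got.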

File Copying

We will now revisit the file copying mechanism that we have been working on for some time now, modifying it to use the ThreadedQueueBase<T> class.

    using System.IO;

    // ThreadedQueueBase<T>, CopyInfo, Logger and LogLevel come from
    // the sample solution that accompanies this series.
    public class CopyFileQueue : ThreadedQueueBase<CopyInfo>
    {
        Logger _logger;

        public CopyFileQueue(Logger logger)
        {
            _logger = logger;
        }

        protected override void ProcessItem(CopyInfo item)
        {
            // Check if the user called Stop.
            if (StopRequested)
            {
                _logger.Log(LogLevel.Information, "User called Stop.");
                _logger.Log(LogLevel.Information,
                    "Terminating thread while copying directory '{0}'.",
                    item.Source);
                return;
            }

            // Make sure the destination directory exists.
            _logger.Log(LogLevel.Debug,
                "Checking if '{0}' exists...", item.Destination);
            if (!Directory.Exists(item.Destination))
            {
                _logger.Log(LogLevel.Debug,
                    "Creating '{0}'.", item.Destination);
                Directory.CreateDirectory(item.Destination);
            }

            _logger.Log(LogLevel.Information,
                "CopyFiles from '{0}' to '{1}' {2}...",
                item.Source,
                item.Destination,
                item.Recursive ? "recursive" : "non-recursive");

            // Copy the files that sit directly in the source directory.
            foreach (string file in
                Directory.GetFiles(item.Source))
            {
                string destination = Path.Combine(
                    item.Destination,
                    Path.GetFileName(file));
                _logger.Log(LogLevel.Information,
                    "Copying file from '{0}' to '{1}'...",
                    file,
                    destination);
                File.Copy(file, destination, item.Overwrite);
            }

            if (item.Recursive)
            {
                // Queue each subdirectory as a new work item
                // instead of recursing into it.
                foreach (string directory in
                    Directory.GetDirectories(item.Source))
                {
                    string destination = Path.Combine(
                        item.Destination,
                        Path.GetFileName(directory) // the directory name
                        );

                    _logger.Log(LogLevel.Debug,
                        "Copying directory from '{0}' to '{1}'...",
                        directory,
                        destination);
                    EnqueueItem(
                        new CopyInfo(
                            directory,
                            destination,
                            item.Recursive,
                            item.Overwrite));
                }
            }
        }
    }

As it turns out, little needs to change in the file copying method to take advantage of the new class. The main difference is that this approach enqueues a new CopyInfo for each subdirectory instead of recursing into the directory structure. This style of coding is an example of a non-recursive tree traversal.
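The same idea can be shown without any threading at all. The sketch below walks a directory tree with a plain Queue<string> in place of the call stack. DirectoryWalker and ListDirectories are hypothetical names used only for this illustration; they are not part of the sample solution.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class DirectoryWalker
{
    // Returns root and every directory beneath it, without recursion.
    public static List<string> ListDirectories(string root)
    {
        List<string> visited = new List<string>();
        Queue<string> pending = new Queue<string>();
        pending.Enqueue(root);

        while (pending.Count > 0)
        {
            string current = pending.Dequeue();
            visited.Add(current);

            // Instead of calling ourselves for each child, queue it for later.
            foreach (string child in Directory.GetDirectories(current))
                pending.Enqueue(child);
        }

        return visited;
    }
}
```

Because the queue is first-in, first-out, directories are visited level by level: the root first, then its children, then their children. CopyFileQueue does the same thing, except that the queue is drained by a worker thread.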

Logging

The queuing mechanism introduced in the producer-consumer article can also be used to create a logger. The logger accepts entries asynchronously and, depending on the configuration, the log processor thread writes them to a backing store such as a text or XML file or a database.

LogLevel

Each log entry should specify a log level, which indicates whether the entry was created for information, debugging, warning or error notification purposes.

    using System;

    [Flags]
    public enum LogLevel
    {
        Information = 1,
        Debug = 2,
        Warning = 4,
        Error = 8,
        LogAll = Information | Debug | Warning | Error
    }
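Because LogLevel is a [Flags] enumeration, several levels can be combined into one configured value and tested with a bitwise AND. The sketch below shows one way a logger could decide whether to write an entry. LogFilter and IsEnabled are hypothetical names for this illustration; the Logger in the download may filter differently.

```csharp
using System;

[Flags]
public enum LogLevel
{
    Information = 1,
    Debug = 2,
    Warning = 4,
    Error = 8,
    LogAll = Information | Debug | Warning | Error
}

public static class LogFilter
{
    // True when the entry's level is one of the levels switched on in 'configured'.
    public static bool IsEnabled(LogLevel configured, LogLevel entryLevel)
    {
        return (configured & entryLevel) != 0;
    }
}
```

For example, a configured value of LogLevel.Warning | LogLevel.Error lets Error entries through and filters out Debug entries, while LogLevel.LogAll lets everything through.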

LogEntry

Each log entry contains a log level, the date of creation and the log message. The class is shown below.

    using System;

    public class LogEntry
    {
        private DateTime _dateCreated;
        private string _message;
        private LogLevel _logLevel;

        // Formats the message with the supplied arguments.
        public LogEntry(LogLevel logLevel,
            string message,
            params object[] args)
            : this(logLevel, message)
        {
            _message = string.Format(message, args);
        }

        public LogEntry(LogLevel logLevel,
            string message)
        {
            _dateCreated = DateTime.Now;
            _message = message;
            _logLevel = logLevel;
        }

        public DateTime DateCreated
        {
            get { return _dateCreated; }
        }

        public string Message
        {
            get { return _message; }
        }

        public LogLevel LogLevel
        {
            get { return _logLevel; }
        }
    }

Logger

I will not bore you with the details of file I/O. If you want to have a look, the source code is available. The file logging mechanism in the sample code includes a rolling file mechanism, deletion of files that are more than a certain number of days old, and configurable settings such as the log path, the file name and the date format to use.
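For readers who only want the gist of the housekeeping features, here is a rough sketch of two of them: building a dated file name and deleting old log files. This is not the code from the download. LogMaintenance, BuildFileName and DeleteOldLogs are hypothetical helpers, and the file name layout (base name, underscore, date, ".log") is an assumption made for the example.

```csharp
using System;
using System.Globalization;
using System.IO;

public static class LogMaintenance
{
    // Builds a dated file name such as "app_20080115.log" (layout is assumed).
    public static string BuildFileName(string baseName, string dateFormat, DateTime date)
    {
        return baseName + "_"
            + date.ToString(dateFormat, CultureInfo.InvariantCulture)
            + ".log";
    }

    // Deletes files in logPath matching searchPattern whose last write time
    // is more than maxAgeDays before 'now'. Returns the number of files deleted.
    public static int DeleteOldLogs(string logPath, string searchPattern,
        int maxAgeDays, DateTime now)
    {
        int deleted = 0;
        DateTime cutoff = now.AddDays(-maxAgeDays);

        foreach (string file in Directory.GetFiles(logPath, searchPattern))
        {
            if (File.GetLastWriteTime(file) < cutoff)
            {
                File.Delete(file);
                deleted++;
            }
        }

        return deleted;
    }
}
```

A logger could call DeleteOldLogs once at startup, or whenever it rolls over to a new file, so that cleanup happens on the log processor thread rather than in the code that produces log entries.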

Final Notes

The examples provided here only scratch the surface of what you can do with the ThreadedQueueBase<T> class. I have packaged the code from my last articles into a single VS2005 solution. Please feel free to incorporate my threading base classes in your own projects and send me an email about the interesting things that you create with them.

If you have any questions about using the threading base classes, just let me know and I will be happy to answer them.

Code samples can be found here.

Cheers!