Threading Patterns: The Producer-Consumer Pattern

This is the second installment to my series of posts about threading patterns.

In the previous article, we looked into the worker thread pattern. It models the concept of thread-per-message by creating one thread per instruction/message pair. This approach is effective in achieving some level of parallelism, but suffers the overhead of thread creation and clean-up each time a worker is spawned.


Today we will look closely on the producer-consumer pattern using .Net threading. This is done by having a producer identify the data to be processed and adding it to a to-do list. The consumer on the other hand looks into the to-do list, takes an item from it and performs the instruction on that item. Below is an image showing this relationship.

A consumer can also act as producer when its output is stored into another consumers’ to-do list. This execution will result in pipelined execution. It also models the concept of thread-per-instruction. Thread-per-instruction decouples the data to be processed from the actual execution of instructions performed on the data.

For performance reasons, one can also create multiple producers/consumers each accessing a shared queue of work items. This approach can only do so much to increase performance. Because like the worker thread approach, it suffers from the overhead of thread creation and clean-up. Also, the shared queue becomes a point of contention. You might end up with a bunch of threads who most of the time is sitting idle waiting for the queue synchronize.

My advice is tune the number of threads every time you try this approach. And know when you are already getting diminishing returns.

Some Hand-Waving

I do not condone doing the third approach simply because I believe it is not the right tool for the job. Hence, I will introduce you to the ThreadedQueueBase<T> class. This class ties the concept of the queue to the consumer thread. Because there will be one consumer per queue, this will allow us to write programs that run like images 1 and 2 and the multiple producer version of 3.

The ThreadedQueueBase<T> Class

Our friends in the Java world have the BlockingQueue. Too bad for us .Net doesn’t have a built-in one. In this section, we will extend the WorkerThreadBase from the previous article and create a class that contains an AutoResetEvent named _itemArrived and a Queue<T> named _itemQueue.

    1 private AutoResetEvent _itemArrived;

    2 private Queue<T> _itemQueue;

When the Work method (the thread method) starts, we will do the following:

   1. wait for the signal that an item has arrived
   2. continuously de-queue  and process the items while it is not empty
   3. if the Stop or Dispose method has been called, exit the thread else go to step 1.

    1 protected override void Work()

    2 {

    3     do

    4     {

    5         _itemArrived.WaitOne();

    6         do

    7         {

    8             if (_itemQueue.Count > 0)

    9             {

   10                 lock (((System.Collections.ICollection)

   11                     _itemQueue).SyncRoot)

   12                 {

   13                     T item = _itemQueue.Dequeue();

   14                     if (item != null)

   15                     {

   16                         ProcessItem(item);

   17                     }

   18                 }

   19             }

   20         } while (_itemQueue.Count > 0);

   21         Thread.Sleep(0);

   22     } while (!Disposing && !StopRequested);

   23 }

We will also expose a public method called EnqueueItem which will accept the item, add it to the queue and send the signal to the Work method that an item has arrived.

    1 public void EnqueueItem(T item)

    2 {

    3     ThrowIfDisposedOrDisposing();

    4     _itemQueue.Enqueue(item);

    5     _itemArrived.Set();

    6 }

And lastly, we will create an abstract method called ProcessItem. This method is called by the Work method passing one item from the queue. This method should contain the instructions for processing one item.

    1 protected abstract void ProcessItem(T item);

Final Words

Today, we learned more about the ThreadedQueue class. And howit can perform just like the BlockingQueue except that it does not block the incoming items whenever the queue is full.

Next time, we will look into applying the ThreadedQueue class into familiar scenarios like logging or file copying.

Examples are located here. And source codes are located here.


12-January-2009: Reposted! Changed the title to "Producer-Consumer".  Sorry about that. =D

10-February-2009: Added links to the samples and sources


# jakelite said:

This is the third installment to my series of posts about threading patterns. Today we will look into

Tuesday, February 03, 2009 5:53 AM