CodeBetter.Com

Jean-Paul S. Boodhoo

Develop With Passion

Multithreading Question - One Solution

This morning I was asked a good question:

Q: Basically we're writing the billing processing part of our business
application.  On the 1st and 15th of the month, we bill all our
policy holders and it's usually 10,000 or so transactions that need to
be run.  The processor gateway runs as a webservice.  Now since these
10,000 don't rely on each other at all, I figured to speed it up I
could run 10-20 at a time in a job.
 
So I wrote this simple class so far, but I'm not sure if it's even
close, it just takes a processing date, loads all the queued
transactions, and sends them into the authorize and payment manager.
It seems that async threads you have to delegate to a void
parameterless method, so I built a locked incrementer for the index on
the generic list of transactions and try to process them like that.
It's the while loop stuff that is lame, I'd really like it to just
spawn X number of threads where X is configurable.  Maybe I just don't
get it, here's the code though: 

public class Billing
{
    public Billing(DateTime ProcessDate)
    {
        _ProcessDate = ProcessDate;
        PopulateTransactions();
        ExecuteBilling();
    }

    private Int32 _TransactionIndex = 0;
    private List<Transaction> _TransactionsToBill;
    private DateTime _ProcessDate;

    private void PopulateTransactions()
    {
        _TransactionsToBill = TransactionManager.GetQueuedTransactions(_ProcessDate);
    }

    private void ExecuteBilling()
    {
        do
        {
            Thread t1 = new Thread(new ThreadStart(ProcessTransaction));
            t1.Start();
            Thread t2 = new Thread(new ThreadStart(ProcessTransaction));
            t2.Start();
            Thread t3 = new Thread(new ThreadStart(ProcessTransaction));
            t3.Start();
            Thread t4 = new Thread(new ThreadStart(ProcessTransaction));
            t4.Start();
            Thread t5 = new Thread(new ThreadStart(ProcessTransaction));
            t5.Start();
        } while (_TransactionIndex < _TransactionsToBill.Count);
    }

    private void ProcessTransaction()
    {
        string Message;
        Transaction transaction = GetNextTransaction();
        if (transaction != null)
            CyberSourceManager.AuthorizeAndCapture(transaction.InvoiceID, transaction.ID, out Message);
    }

    private Transaction GetNextTransaction()
    {
        Interlocked.Increment(ref _TransactionIndex);
        if (_TransactionsToBill.Count > _TransactionIndex)
        {
            return _TransactionsToBill[_TransactionIndex - 1];
        }
        return null;
    }
}

A: The main concern you raise is: “I'd really like it to just spawn X number of threads where X is configurable”. The following code is one alternative, which uses the Monitor class to build a producer/consumer queue:

using System.Collections.Generic;
using System.Threading;

public class Billing
{
    private Queue<Transaction> transactionQueue;
    private IList<Thread> workerThreads;
    private object mutex;

    public Billing(int numberOfWorkerThreadsToUse, IEnumerable<Transaction> itemsToProcess)
    {
        mutex = new object();
        workerThreads = new List<Thread>();
        transactionQueue = new Queue<Transaction>();
        InitializeConsumers(numberOfWorkerThreadsToUse);
        QueueUp(itemsToProcess);
        QueueEmptyTransactionsForEachActiveThread();
    }

    private void QueueUp(IEnumerable<Transaction> toProcess)
    {
        foreach (Transaction transaction in toProcess)
        {
            QueueForProcessing(transaction);
        }
    }

    // One null per worker: each worker exits after dequeuing a single null.
    private void QueueEmptyTransactionsForEachActiveThread()
    {
        foreach (Thread workerThread in workerThreads)
        {
            QueueForProcessing(null);
        }
    }

    private void QueueForProcessing(Transaction transaction)
    {
        lock (mutex)
        {
            transactionQueue.Enqueue(transaction);
            Monitor.PulseAll(mutex);
        }
    }

    private void InitializeConsumers(int numberToInitialize)
    {
        for (int i = 0; i < numberToInitialize; i++)
        {
            Thread thread = new Thread(ProcessTransactions);
            workerThreads.Add(thread);
            thread.Start();
        }
    }

    private void ProcessTransactions()
    {
        while (true)
        {
            Transaction transaction = null;
            lock (mutex)
            {
                // An empty queue means "nothing to do yet", so wait for a pulse.
                while (transactionQueue.Count == 0) Monitor.Wait(mutex);
                transaction = transactionQueue.Dequeue();
            }
            if (transaction == null) return;
            Process(transaction);
        }
    }

    private void Process(Transaction transaction)
    {
        // Do your work here
    }
}

The advantage of this code over the original is that it avoids allocating an unknown number of threads and instead creates an explicit, known number of worker threads that process items on the queue. The code relies on the ability to wait on a locked object. When a transaction is queued, Monitor.PulseAll is invoked on the mutex to wake every thread waiting on it. The woken threads then compete to reacquire the lock; whichever gets it first dequeues a single transaction from the transaction queue and processes it, while the others find the queue empty and go back to waiting.

The constructor for the Billing class lets you specify how many worker threads to create. Notice that the consumer threads can start processing transactions the moment one is queued, so the queue does not need to be fully populated before processing begins. As new items are added to the queue, a worker thread picks each one up and processes it. Once all of the real transactions have been queued, a “null” transaction is placed onto the queue for each worker thread that was created. Because each worker exits after dequeuing one null, this ensures that every thread terminates.

Notice that the ProcessTransactions method does not stop processing when the number of items in the queue is 0. Items are added to the queue by one thread (the client) and removed by any of the worker threads, so the client may not yet have had the opportunity to place items on the queue when a worker evaluates the (transactionQueue.Count == 0) condition. (Again, multithreaded coding is hard.) If I used the count of items in the queue as my termination condition, a worker thread would most likely terminate too early. Hence the need for the null transaction that each thread will have to handle: once a worker dequeues a null transaction, it knows that it is time to finish.
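To make the termination argument concrete, here is a minimal, self-contained sketch of the same pattern with the billing details stripped out. The names SentinelQueueDemo, Run and Stop are hypothetical and not part of the Billing class; integers stand in for transactions, and a negative value plays the role of the null transaction. The producer deliberately sleeps between items, so the workers frequently see an empty queue, yet every item should still be processed because a worker only exits when it dequeues the sentinel.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class, not part of the Billing code.
public static class SentinelQueueDemo
{
    private const int Stop = -1; // stands in for the null transaction

    // Queues the values 1..itemCount slowly, then one Stop per worker,
    // and returns how many real items the workers processed.
    public static int Run(int workerCount, int itemCount)
    {
        object mutex = new object();
        Queue<int> queue = new Queue<int>();
        List<Thread> workers = new List<Thread>();
        int processed = 0;

        for (int i = 0; i < workerCount; i++)
        {
            Thread worker = new Thread(() =>
            {
                while (true)
                {
                    int item;
                    lock (mutex)
                    {
                        // An empty queue means "nothing yet", not "finished".
                        while (queue.Count == 0) Monitor.Wait(mutex);
                        item = queue.Dequeue();
                    }
                    if (item == Stop) return;
                    Interlocked.Increment(ref processed);
                }
            });
            workers.Add(worker);
            worker.Start();
        }

        for (int item = 1; item <= itemCount; item++)
        {
            Thread.Sleep(1); // slow producer: the queue is often empty
            lock (mutex) { queue.Enqueue(item); Monitor.PulseAll(mutex); }
        }

        // One sentinel per worker, queued behind all of the real items.
        for (int i = 0; i < workerCount; i++)
        {
            lock (mutex) { queue.Enqueue(Stop); Monitor.PulseAll(mutex); }
        }

        foreach (Thread worker in workers) worker.Join();
        return processed;
    }
}
```

Because the queue is first-in, first-out and the sentinels are added last, a call such as SentinelQueueDemo.Run(4, 50) returns 50. Had the workers exited on Count == 0 instead, most of them would likely have quit during one of the producer's pauses.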

I’m not saying this is the only way to solve this problem. I just wanted to demonstrate how you could share the processing load among a fixed number of worker threads, while using the Monitor class to perform fine-grained synchronization.

The usage for this class becomes very simple:

new Billing(5,transactionService.GetTransactionsToProcess());
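Since the Monitor-based class is not the only way to solve this, here is a hedged sketch of one alternative for comparison. It swaps in a different technique: BlockingCollection<T> from System.Collections.Concurrent, which only exists in .NET Framework 4 and later and so was not an option when this post was written. The names BlockingCollectionSketch and ProcessAll are hypothetical, and integers again stand in for transactions. Here CompleteAdding takes the place of the null transactions, and the calling thread joins the workers so it knows when everything has been processed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch, assuming .NET Framework 4 or later.
public static class BlockingCollectionSketch
{
    // Runs `process` on every item using a fixed number of worker threads
    // and returns the number of items processed.
    public static int ProcessAll(int workerCount, IEnumerable<int> items, Action<int> process)
    {
        int processed = 0;
        using (BlockingCollection<int> queue = new BlockingCollection<int>())
        {
            List<Thread> workers = new List<Thread>();
            for (int i = 0; i < workerCount; i++)
            {
                Thread worker = new Thread(() =>
                {
                    // Blocks while the queue is empty; the loop ends once
                    // CompleteAdding has been called and the queue is drained.
                    foreach (int item in queue.GetConsumingEnumerable())
                    {
                        process(item);
                        Interlocked.Increment(ref processed);
                    }
                });
                workers.Add(worker);
                worker.Start();
            }

            foreach (int item in items) queue.Add(item);
            queue.CompleteAdding(); // plays the role of the null transactions

            foreach (Thread worker in workers) worker.Join();
        }
        return processed;
    }
}
```

The locking and signalling are hidden inside the collection, which is convenient but no longer shows the Wait/PulseAll mechanics this post set out to demonstrate. Note that the sketch does no error handling: an exception thrown by process on a worker thread would go unhandled.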

 

 


Published Apr 17 2007, 01:35 PM by bitwisejp

Comments

Ayende Rahien said:

Or you could give up manual threading (recommended) and just use the ThreadPool to do it:

for (int i = 0; i < 10000; i++)
    ThreadPool.QueueUserWorkItem(ProcessTransaction);

And then use a CountdownLatch lock to wait for them to finish

# April 17, 2007 4:27 PM

bitwisejp said:

Hey Ayende,

I was just waiting for someone to post that!! I purposefully chose not to go with ThreadPool, so I could demonstrate explicit fine grained locking semantics.

As a side note, once you have queued up a work item on the ThreadPool, there is no easy way to dequeue that item if you no longer want to execute it.

Thanks

# April 17, 2007 4:43 PM

shanecourtrille said:

As a completely unrelated comment.. It would actually make the code a lot easier to read (and therefore understand) if the methods were in the same order they are called.

Beyond that little nitpick it's cool to see this level of detail.  Though I think Ayende has it right for a large percentage of the situations out there.

# April 17, 2007 6:09 PM

fatal said:

Another option would be to use the SmartThreadPool from codeproject - http://www.codeproject.com/cs/threads/smartthreadpool.asp . This provides (among many other benefits over ThreadPool) dequeuing of queued workitems if they have not yet commenced execution.

# April 17, 2007 6:54 PM

Ian Cooper said:

The other ThreadPool issue is that it does not let me throttle the number of threads that I am using on what is a shared resource. This can lead to thread exhaustion deadlock if we run out of worker threads. This can be mitigated by not performing actions on our thread which in turn spawn new threads. See the section on deadlocks in the following article: http://msdn2.microsoft.com/en-us/library/ms973903.aspx

I tend to favour wanting to configure the number of threads used by work processors over jumping into the pool. Jean-Paul's monitor based solution seems fairly elegant.

# April 18, 2007 3:18 AM

karl said:

"I figured to speed it up I could run 10-20 at a time in a job"

So is it actually faster and worth the increased complexity? I don't see anything that could really block, unless CyberSourceManager.AuthorizeAndCapture is really slow. There's also some hardware questions to take into account.

# April 18, 2007 7:01 AM

jfcantin said:

It is nice to have that ThreadPool discussion, but why is all the queuing and thread management in Billing? Billing should probably do the billing only and not manage the queue and threads. I like Ayende's recommendation, which keeps Billing clean: a ThreadPool or a home-grown one, injected in.

# April 18, 2007 8:09 AM

Garry Shutler said:

Is there a reason for using PulseAll rather than Pulse?

As far as I can work out as you add one work item, there's only a point in waking one waiting thread.

# April 20, 2007 7:38 AM

bitwisejp said:

@Garry,

Good eye. In this scenario there is no need for me to PulseAll. There is no harm in doing it, but it would definitely be more efficient to Pulse since, as you said, all of the threads are waiting on the same condition and only a single item can be removed from the queue at a time.

If each of the threads were using the same monitor but watching different queues, then that would be a scenario you would want to use PulseAll.

# April 20, 2007 9:53 AM
