Job batch controller

I would like to share a small pattern I came up with some time ago, which can be handy when launching a finite number of asynchronous jobs and waiting for all of them to finish.

Job batch controller

The basic idea is to execute a finite number of parallel jobs using a thread pool and let each one notify its parent controller after the job has finished successfully or been aborted. The controller is awakened when a new notification arrives, collects it, does something with the data, and goes back to sleep until all jobs have been executed or some condition is met. After all jobs have finished, a new batch of jobs can be executed.

A sample implementation in C# follows:

BatchController.cs:


using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

namespace BatchController
{
	
/// <summary>
/// Contract for a unit of work that a controller can configure, run on a
/// worker thread, and post-process afterwards.
/// </summary>
public interface IThreadJob
{
	// --- Configuration -------------------------------------------------

	/// <summary>Supplies the job's parameters and the queue it should use.</summary>
	/// <param name="parameters">Job-specific parameters; may be null.</param>
	/// <param name="jobs_queue">Queue handed to the job by its controller.</param>
	void SetParams(IDictionary parameters, Queue jobs_queue);

	/// <summary>Supplies the two wait handles the job uses for signalling.</summary>
	/// <param name="waitHandle">Handle the job is given to wait on.</param>
	/// <param name="toSignalHandle">Handle the job is given to signal.</param>
	void SetEventHandlers(ref EventWaitHandle waitHandle, ref EventWaitHandle toSignalHandle);

	// --- Work ----------------------------------------------------------

	/// <summary>Runs the job's main work.</summary>
	void Execute();

	/// <summary>Runs the job's follow-up step.</summary>
	void DoSomething();
}

/// <summary>
/// Example job: pretends to work by sleeping, then hands itself over to the
/// queue it was given and signals that it is done.
/// </summary>
public class DummyJob : IThreadJob
{
	private Guid _guid;
	private IDictionary _parameters;
	private Queue _jobsQueue;
	private EventWaitHandle _waitHandle;
	private EventWaitHandle _toSignalHandle;

	/// <summary>
	/// Stores the parameters and the shared queue, and assigns this job a
	/// fresh identifier used in its console output.
	/// </summary>
	public void SetParams(IDictionary parameters, Queue jobs_queue)
	{
		_parameters = parameters;
		_jobsQueue = jobs_queue;
		_guid = Guid.NewGuid();
	}

	/// <summary>
	/// Stores the handle to wait on before enqueueing and the handle to set
	/// once this job has been enqueued.
	/// </summary>
	public void SetEventHandlers(ref EventWaitHandle waitHandle,
	                             ref EventWaitHandle toSignalHandle)
	{
		_toSignalHandle = toSignalHandle;
		_waitHandle = waitHandle;
	}

	/// <summary>
	/// Simulates the job's work, then waits for permission to enqueue itself
	/// and signals completion.
	/// </summary>
	public void Execute()
	{
		// Real work would go here; the demo just sleeps.
		int pauseMillisecs = 400;
		Console.Write("Job {0} is waiting {1} millisecs.\n", _guid, pauseMillisecs);

		Thread.Sleep(pauseMillisecs);
		Console.Write("Job {0} executed.\n", _guid);

		// Block until the wait handle is signalled; only then touch the queue.
		_waitHandle.WaitOne();

		// Queue this instance so its DoSomething step can be run later.
		_jobsQueue.Enqueue(this);

		// Announce that a finished job is now sitting in the queue.
		_toSignalHandle.Set();
	}

	/// <summary>
	/// Follow-up step: sleeps briefly and reports on the console.
	/// </summary>
	public void DoSomething()
	{
		Thread.Sleep(10);
		Console.Write("Job {0} has done something.\n", _guid);
	}
}

/// <summary>
/// Launches a batch of <see cref="IThreadJob"/> instances on the thread pool
/// and runs each job's <c>DoSomething</c> step on the calling thread as the
/// jobs report in.
/// </summary>
/// <remarks>
/// Two auto-reset events form a hand-shake with the jobs. The controller sets
/// <c>readyToDoSomethingEvent</c> once per loop iteration and then waits on
/// <c>jobNotificationEvent</c>. Jobs are expected to wait on the first handle
/// before enqueueing themselves and to set the second one afterwards (as
/// <c>DummyJob</c> does), so the queue is touched by one thread at a time.
/// </remarks>
public class Controller
{
	// Pause before queueing each job so they are not all launched at once.
	const int LaunchDelayMillisecs = 200;

	readonly Queue jobsQueue;
	ArrayList jobsList;

	// Not readonly: both handles are passed by ref to IThreadJob.SetEventHandlers.
	EventWaitHandle jobNotificationEvent;
	EventWaitHandle readyToDoSomethingEvent;

	/// <summary>
	/// Creates the shared queue and the two auto-reset events, both initially
	/// unsignalled.
	/// </summary>
	public Controller()
	{
		jobsQueue = new Queue();
		jobNotificationEvent =
			new EventWaitHandle(false, EventResetMode.AutoReset);
		readyToDoSomethingEvent =
			new EventWaitHandle(false, EventResetMode.AutoReset);
	}

	/// <summary>Sets the batch of jobs to run.</summary>
	/// <param name="jobs">List whose elements implement <see cref="IThreadJob"/>.</param>
	public void SetJobList(ArrayList jobs)
	{
		jobsList = jobs;
	}

	/// <summary>
	/// Configures every job in the current list and queues it on the thread
	/// pool, pausing briefly before each launch.
	/// </summary>
	/// <exception cref="InvalidOperationException">No job list has been set.</exception>
	public void ExecuteJobs()
	{
		if (jobsList == null)
		{
			throw new InvalidOperationException(
				"SetJobList must be called before ExecuteJobs.");
		}

		foreach (IThreadJob job in jobsList)
		{
			job.SetParams(null, jobsQueue);
			job.SetEventHandlers(ref readyToDoSomethingEvent,
			                     ref jobNotificationEvent);
			Thread.Sleep(LaunchDelayMillisecs); // Do not launch too many jobs at once
			System.Threading.ThreadPool.QueueUserWorkItem(Callback_DoJob,
			                                              job);
		}
	}

	/// <summary>
	/// Blocks until every job in the current list has reported in, calling
	/// <c>DoSomething</c> on each one as it arrives.
	/// </summary>
	/// <exception cref="InvalidOperationException">No job list has been set.</exception>
	public void DoSomethingLater()
	{
		if (jobsList == null)
		{
			throw new InvalidOperationException(
				"SetJobList must be called before DoSomethingLater.");
		}

		int num = 0;
		while (num < jobsList.Count)
		{
			// Let one waiting job know we are idle and the queue is free
			readyToDoSomethingEvent.Set();
			// Go to sleep until a job signals that it has enqueued itself
			jobNotificationEvent.WaitOne();
			// Queue.Dequeue returns object, so an explicit cast is required
			IThreadJob job = (IThreadJob)jobsQueue.Dequeue();
			job.DoSomething();

			num++;
		}
		// All jobs are done
	}

	/// <summary>Thread-pool entry point: runs the job passed as state.</summary>
	/// <param name="job">The <see cref="IThreadJob"/> queued by <c>ExecuteJobs</c>.</param>
	static void Callback_DoJob(object job)
	{
		// The thread pool hands the state back as object; cast it to the job interface
		IThreadJob thread_job = (IThreadJob)job;
		thread_job.Execute();
	}

}
}

Main.cs:

using System;
using System.Collections;

namespace BatchController
{
/// <summary>
/// Console demo: builds a batch of dummy jobs, launches them through a
/// controller and waits until every job has been post-processed.
/// </summary>
class MainClass
{
	/// <summary>Program entry point.</summary>
	/// <param name="args">Command-line arguments (unused).</param>
	public static void Main(string[] args)
	{
		const int jobCount = 10;

		Console.WriteLine("***Starting demo...\n\n");

		// Build the batch first, then hand it to a fresh controller.
		ArrayList batch = new ArrayList();
		for (int remaining = jobCount; remaining > 0; remaining--)
		{
			batch.Add(new DummyJob());
		}

		Controller batchController = new Controller();
		batchController.SetJobList(batch);

		// Queue every job on the thread pool.
		batchController.ExecuteJobs();

		// This thread is free for other work here before collecting results.
		batchController.DoSomethingLater();

		Console.WriteLine("\n***Demo finished.");
	}
}
}
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s