October 20, 2011

Java and the Intrinsic Lock

I was working on converting a Java application to a multi-threaded application today and wanted to use the simplest means possible. This involves the following steps:
  1. Make the class implement Runnable or extend the Thread class
  2. Put the previously sequential code into threads
  3. Use the "synchronized" keyword on methods that should be critical sections
  4. Join the threads before working on the results
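A minimal sketch of those four steps (the Worker class and its busywork are my own illustration, not the application I was converting):

public class Worker implements Runnable {               // step 1
    private static long total = 0;                      // shared result

    @Override
    public void run() {                                 // step 2: previously sequential code
        for (int i = 0; i < 100000; i++) {
            addToTotal(1);
        }
    }

    // step 3: critical section; static matters here, as explained below
    private static synchronized void addToTotal(long n) {
        total += n;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new Worker());
        Thread t2 = new Thread(new Worker());
        t1.start();
        t2.start();
        t1.join();                                      // step 4: join before using the result
        t2.join();
        System.out.println(total);                      // reliably 200000
    }
}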
Now, there are variations on this theme. Because each object in Java has an intrinsic lock, you can use any old Object as a mutex by wrapping the critical section you want to protect in a synchronized(object){} block.
So this:
private synchronized void foo() {
  // critical section stuff
}
is the same as this:
private void foo() {
  synchronized (this) {
    // critical section stuff
  }
}

And when trying to synchronize multiple threads, both are wrong!!

The problem is that each object has an intrinsic lock, not each class. This is easiest to see in the second case: each thread was constructed with its own instance of our Runnable class, so synchronized(this) refers to a different lock in every thread, and no thread ever blocks another. The first case fails for exactly the same reason, because a synchronized instance method implicitly locks on this; the method syntax just hides the lock.
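Here is a small demonstration of the failure (the Counter class and the loop counts are hypothetical):

class Counter implements Runnable {
    static int count = 0;                  // shared by every instance

    // synchronized here locks on 'this' -- a different object per thread
    private synchronized void increment() {
        count++;
    }

    public void run() {
        for (int i = 0; i < 100000; i++) {
            increment();
        }
    }
}

Start two threads with new Thread(new Counter()), join them, and print count: because each thread holds its own lock, the read-increment-write of count interleaves freely and the result is usually well short of 200000.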

So, what should we do?
In each case, we need a lock that is shared by all instances of the class. In the first case, this can be accomplished by making the method static, since a static synchronized method locks on the Class object rather than on this:
private static synchronized void foo() {
  // critical section stuff
}
In the second case, we need some other static object to use as the lock:
private static final Object myLock = new Object();

private void foo() {
  synchronized (myLock) {
    // critical section stuff
  }
}
Why would we use the second form? Probably because we want finer-grained control over locking than the whole-method level, or because the method touches instance fields that should not be made static.
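As an example of the fine-grained case, a method might do a lot of per-instance work and only briefly touch shared state, so only that touch needs the shared lock (names here are made up):

class Recorder implements Runnable {
    private static final Object statsLock = new Object();
    private static long grandTotal = 0;    // shared across all instances

    private long localTotal = 0;           // per-instance; needs no lock

    public void run() {
        for (int i = 0; i < 100000; i++) {
            localTotal += i;               // instance work proceeds in parallel
        }
        synchronized (statsLock) {         // serialize only the shared update
            grandTotal += localTotal;
        }
    }
}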

October 4, 2011

Threaded Mergesort

I spent a little bit of time threadifying the classic mergesort. This sort works by recursively dividing the array to be sorted down to some depth and then merging the sorted pieces back together. The sorted output lands back in the original array, although, as the comment on merge() notes, each merge allocates a temporary buffer proportional to the range being merged, so it is not strictly in-place. That buffer is worth keeping in mind when you have hundreds of thousands of elements in your array. In this particular implementation, an insertion sort takes over once a subdivision reaches 100 elements or fewer.

Multi-threading *could* occur at the first n levels of bifurcation, at the bottom level only, or at some intermediate number of levels. Limiting the number of threads to a small multiple of the number of CPU cores limits the threading overhead and context switching as well. The optimal multiplier appears to be quite low, only about 2 threads per CPU core. For these reasons, I chose to create threads only down to a shallow fixed depth. Let's look at the code:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include "mergesort.h"

#define TRUE 1
#define FALSE 0

#define THREADED

// function prototypes
void merge(int A[], int p, int q, int r);
void insertion_sort(int A[], int p, int r);

const int INSERTION_SORT_CUTOFF = 100; //based on trial and error
const int THREAD_DEPTH = 1; //2^(n+1) threads created at this level

struct sortstruct {
	int p;
	int r;
	int d;	// depth
	int *A;
};

/*
 * insertion_sort(int A[], int p, int r):
 *
 * description: Sort the section of the array A[p..r].
 */
void insertion_sort(int A[], int p, int r)
{
	int j;

	for (j=p+1; j<=r; j++) {
		int key = A[j];
		int i = j-1;
		while ((i > p-1) && (A[i] > key)) {	
			A[i+1] = A[i];
			i--;
		}
		A[i+1] = key;
	}
}

void * mergesort_function(void* arg){
	struct sortstruct * s;
	s = (struct sortstruct *) arg;
	printf("In mergesort thread (%d, %d) \n", s->p, s->r);
	mergesort(s->A, s->p, s->r, s->d);
	return NULL;
}

/*
 * mergesort(int A[], int p, int r):
 *
 * description: Sort the section of the array A[p..r].
 * track depth d
 */
void mergesort(int A[], int p, int r, int d)
{
	if (r-p+1 <= INSERTION_SORT_CUTOFF)  {
		insertion_sort(A,p,r);
	} else {
		int q = (p+r)/2;

#ifdef THREADED
		if(d <= THREAD_DEPTH){
			int status;
			pthread_t thread1, thread2;
			struct sortstruct *s1, *s2;

			s1 =(struct sortstruct *) malloc(sizeof(struct sortstruct));
			s1->A=A; s1->p=p; s1->r=q; s1->d=d+1;
			status = pthread_create(&thread1, NULL, mergesort_function, (void*) s1);
			if(status) exit(status);

			s2 = (struct sortstruct*)malloc(sizeof(struct sortstruct));
			s2->A=A; s2->p=q+1; s2->r=r; s2->d=d+1;
			status = pthread_create(&thread2, NULL, mergesort_function, (void*) s2);
			if(status) exit(status);

			pthread_join(thread1, NULL);
			pthread_join(thread2, NULL);

			free(s1);
			free(s2);
		}else{
#endif
			mergesort(A, p, q, d+1);
			mergesort(A, q+1, r, d+1);
#ifdef THREADED
		}
#endif
		merge(A, p, q, r);
	}
}

/*
 * merge(int A[], int p, int q, int r):
 *
 * description: Merge two sorted sequences A[p..q] and A[q+1..r] 
 *              and place merged output back in array A. Uses extra
 *              space proportional to A[p..r].
 */     
void merge(int A[], int p, int q, int r) 
{
	int *B = (int *) malloc(sizeof(int) * (r-p+1));

	int i = p;
	int j = q+1;
	int k = 0;
	int l;

	// as long as both lists have unexamined elements
	// this loop keeps executing.
	while ((i <= q) && (j <= r)) {
		if (A[i] < A[j]) {
			B[k] = A[i];
			i++;
		} else {
			B[k] = A[j];
			j++;
		}
		k++;
	}

	// now only at most one list has unprocessed elements.

	if (i <= q) { 
		// copy remaining elements from the first list
		for (l=i; l<=q; l++) {
			B[k] = A[l];
			k++;
		}
	} else {
		// copy remaining elements from the second list
		for (l=j; l<=r; l++) {
			B[k] = A[l];
			k++;
		}
	}

	// copy merged output from array B back to array A
	k=0;
	for (l=p; l<=r; l++) {
		A[l] = B[k];
		k++;
	}

	free(B);
}
We changed the signature of the mergesort function to include a parameter, d, for depth. This means the initial call must pass 0 to prime the recursion, e.g. mergesort(A, 0, n-1, 0). The wrapper function mergesort_function exists to adapt that call to the void* signature that pthread_create expects.
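The driver is not shown here; below is a minimal sketch of what the priming call might look like (the argument handling and random fill are my assumptions, and the real binary evidently took additional arguments):

/* Hypothetical driver -- not the original main() */
#include <stdio.h>
#include <stdlib.h>
#include "mergesort.h"

int main(int argc, char *argv[])
{
	int n = (argc > 1) ? atoi(argv[1]) : 1000000;
	int *A = (int *) malloc(sizeof(int) * n);
	int i;

	for (i = 0; i < n; i++)
		A[i] = rand();		/* unsorted input */

	mergesort(A, 0, n-1, 0);	/* d = 0 primes the recursion */

	printf("first=%d last=%d\n", A[0], A[n-1]);
	free(A);
	return 0;
}

We invoked the actual program like this: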
nice -n -20 ./mergesort 10000000 2
Here are the observed results:
Number of Elements | Unthreaded Time (s) | Threaded Time (s)
1000000            | 4.330000            | 4.040000
1000000            | 4.230000            | 4.150000
1000000            | 4.150000            | 4.040000
1000000            | 4.220000            | 4.060000
average            | 4.232500            | 4.072500
There appears to be a slight advantage on my 2-CPU virtual machine when using the multi-threaded version. However, it comes nowhere near the speedup we would expect from an application using two CPUs instead of one. This is probably due to the way that VMware is managing the CPUs. I'd be interested to see how it performs on a native machine.