import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; import java.util.concurrent.atomic.*; ///-------------------------------------------------------- // Fil: OddEvenPara.java // Implements Sequential and Parallel OddEven // written by: Eric Jul, University of Oslo, 2024 // //-------------------------------------------------------- public class OddEvenPara { CyclicBarrier readyToGo, allDone; Semaphore lockEntryRegion; Semaphore[] sems; Semaphore printLock = new Semaphore(1); AtomicInteger passId; int[] arr; int[] arrPara; int[] arrOrg; int segmentLength, segmentCount; Boolean testOutput = false; OddEvenPara (int max) { } // end constructor OddEvenPara public void fillArr(int[] arr) { int seed = 123; int n = arr.length; Random r = new Random(seed); for (int i = 0; i < n; i++){ arr[i] = r.nextInt(10*n); } } // end fillArr public void copyArr(int[] arr, int[] copyArr) { for (int i = 0; i < arr.length; i++) copyArr[i] = arr[i]; } public void printArr(int[] arr) { if (!testOutput) return; int n = arr.length; try { printLock.acquire(); } catch (Exception e) {return;} for (int i = 0; i < n; i++){ System.out.println(""+i+": "+arr[i]); } try { printLock.release(); } catch (Exception e) {return;} } // end printArr public void safePrint(int id, String s) { if (!testOutput) return; try { printLock.acquire(); } catch (Exception e) {return;} System.out.println("T"+id+": "+s); try { printLock.release(); } catch (Exception e) {return;} } // end printArr public Boolean checkMonotonic(int[] arr) { int n = arr.length; Boolean result = true; for (int i=1; i < n; i++) { if (arr[i-1] > arr[i]) { result = false; break; } } return result; } public void oddEven(int[] arr) { boolean isSorted = false; // Initially array is unsorted int n = arr.length; while (!isSorted) { isSorted = true; int temp = 0; // Perform Bubble sort on odd indexed element for (int i = 1; i <= n - 2; i = i + 2) { if (arr[i] > arr[i + 1]) { temp = arr[i]; arr[i] = arr[i + 1]; arr[i + 1] = temp; isSorted = false; } } // Perform Bubble sort on even indexed element for (int i = 0; i <= n - 2; i = i + 2) { if (arr[i] > arr[i + 1]) { temp = arr[i]; arr[i] = arr[i + 1]; arr[i + 1] = temp; isSorted = false; } } } return; } public static void main(String args[]) throws InterruptedException { OddEvenPara b; int min = 12; if ( args.length != 2) { System.out.println("use: >java OddEvenPara "); System.exit(0); } int num = Integer.parseInt(args[0]); int threadCount = Integer.parseInt(args[1]); if (!( (num >= min) && (threadCount >= 2) )) { System.out.println("Bad parameter(s): Max must be at least " + min + " and threadCount at least 2"); System.exit(0); }; b = new OddEvenPara(num); b.doit(b, num, threadCount); } // end main() void doit(OddEvenPara b, int num, int threadCount) throws InterruptedException { arrOrg = new int[num]; arr = new int[num]; arrPara = new int[num]; b.fillArr(arrOrg); b.copyArr(arrOrg, arr); b.copyArr(arrOrg, arrPara); // Sequential version of OddEven b.fillArr(arr); long seqTime = System.nanoTime(); // Start sequential timing b.oddEven(arr); seqTime = System.nanoTime()-seqTime; System.out.println("Array after seq OddEven sort: "); printArr(arr); System.out.println("Check of monoticity, sequential: " + b.checkMonotonic(arr)); System.out.println("OddEven Sequential time " + (seqTime/1000000.0) + " ms\n"); // Parallel version System.out.println("Initial array:"); printArr(arrPara); long paraTime = System.nanoTime(); readyToGo = new CyclicBarrier(threadCount+1); // includes main() thread allDone = new CyclicBarrier(threadCount+1); lockEntryRegion = new Semaphore(1, true); // Divide the array into threadCount*2 segments // Create a Semaphore to protect each segment segmentCount = threadCount * 2; segmentLength = num/segmentCount; // Make sure segment lengths are EVEN - otherwise starting Odd-Even passes over the segment // will become more complicated. if (segmentLength % 2 == 1) segmentLength++; segmentCount = num/segmentLength; if (num % segmentLength >0) segmentCount++; System.out.println("XX Segment length: "+segmentLength+" segment count: "+segmentCount); if (segmentLength < 4) { System.out.println("Bad parameters: Too many threads for such a small array"); System.exit(0); } sems = new Semaphore[segmentCount]; // Create a semaphore for each segment passId = new AtomicInteger(num); // passId for (int k = 0; k < segmentCount; k++) sems[k] = new Semaphore(1, true); // fill array. b.fillArr(arrPara); // start threads int lastThread = threadCount - 1; for (int i = 0; i < threadCount; i++) { //System.out.println("Starting thread "+i); new Thread(new Para(i, b, num, arrPara)).start(); } try { readyToGo.await(); // await all threads ready to execute } catch (Exception e) {return;} // Now the threads are doing their thing try { allDone.await(); // await all worker threads DONE } catch (Exception e) {return;} // Combine results paraTime = System.nanoTime()-paraTime; System.out.println("OddEven Parallel time " + (paraTime/1000000.0) + " ms\nSpeedup "+ seqTime*1.0/paraTime); System.out.println("Check of monoticity, parallel version: " + b.checkMonotonic(arrPara)); // Check that result of sort is the same for both. testOutput = false; Arrays.sort(arrOrg); System.out.println("SEQENTIAL SORTED ARRAY:"); printArr(arr); for (int i=0; i < num; i++) { if (arrOrg[i] != arr[i]) { System.out.println ("Error: difference in seq array at position "+i); System.exit(0); } } System.out.println("SEQENTIAL SORTED ARRAY is OK"); System.out.println("PARALLEL SORTED ARRAY:"); printArr(arrPara); for (int i=0; i < num; i++) { if (arr[i] != arrPara[i]) { System.out.println ("Error: difference in parallel array at position "+i); System.exit(0); } } System.out.println("PARALLEL SORTED ARRAY is OK"); Arrays.sort(arrPara); for (int i=0; i < num; i++) { if (arr[i] != arrPara[i]) { System.out.println ("Error: difference in parallel array at position "+i); System.exit(0); } } System.out.println("Check that sequential and parallel results are the same as the original: OK"); } class Para implements Runnable { int ind; OddEvenPara b; int myPassId; int num, currentSeg, myLastSeg, temp; int[] arr; Para(int in, OddEvenPara b, int num, int[] arr) { ind = in; this.b = b; this.num = num; this.arr = arr; } // konstruktor public void run() { // Her er det som kjores i parallell: boolean isSorted = true; // local to THIS thread int arr[] = this.arr; try { readyToGo.await(); // await all threads ready to execute } catch (Exception e) {return;} // ************************ Thread code for parallel part ************************* int currentSeg; safePrint(ind, "Start"); while (passId.get() > 1) { // start up // enter entry region and grab next available pass try { lockEntryRegion.acquire(); } catch (Exception e) { return; } myPassId = passId.getAndDecrement(); isSorted = true; // local to THIS thread and THIS pass safePrint(ind, "Starting pass number: "+myPassId); if (myPassId <= 1) { // others have done the work, so quit lockEntryRegion.release(); break; } currentSeg = 0; try { sems[currentSeg].acquire(); } catch (Exception e) { return; } safePrint(ind, "XX Seg length "+segmentLength+" Seg Count: "+segmentCount); lockEntryRegion.release(); // end of entry region - protected by lockEntryRegion // Now repeatedly OddEven sort thru the segments myLastSeg = segmentCount-1; System.out.println("T"+ind+" start pass "+myPassId); for (int s = 0; s <= myLastSeg; s++) { int j; // for each segment do the OddEven bubbling int firstSegIndex = s*segmentLength+1; int segEnd = (s+1)*segmentLength-1; // Adjust end of the last segment to not reach outside the array if (segEnd >= num) segEnd = num-1; safePrint(ind, "Starting seg "+s+" segEnd: "+segEnd+" myLastSeg "+myLastSeg); // We can check, if another thread has found out that the array is sorted at this time. // If that is the case, there is no need to continue because we will find the same result if (passId.get() < 1) { safePrint(ind, "Another thread has announced termination, so finish up"); isSorted = true; // another thread found out already System.out.println("T"+ind+" *** This thread quitting because another has announced termination, pass: "+myPassId+" Releasing seg "+s+" ***"); sems[s].release(); // release the lock on the sgement break; } if (s < myLastSeg) { // Must handle overlap with next segment at boundary, so must lock both segments safePrint(ind, "XX Pass: "+myPassId+" Trying to acquire seg "+(s+1)+" Queue length: "+sems[s+1].getQueueLength()); try { sems[s+1].acquire(); } catch (Exception e) { return; } safePrint(ind, "XX Got seg "+(s+1)+" "); } // Perform Bubble sort on Odd indexed elements safePrint(ind, "OddEven sort segment: "+s+" from "+s*segmentLength+" to "+ segEnd+ " array before:"); printArr(arr); for (j = s*segmentLength+1; j < segEnd; j = j+2) { if (arr[j] > arr[j+1]) { // swap elements safePrint(ind, "Odd swamp at: "+j+" values: "+arr[j]+" <-> "+arr[j+1]); temp = arr[j]; arr[j] = arr[j+1]; arr[j+1] = temp; isSorted = false; } } if (s < myLastSeg) { if (arr[segEnd] > arr[segEnd+1]) { // swap elements safePrint(ind, "Odd boundary swamp at: "+segEnd+" values: "+arr[segEnd]+" <-> "+arr[segEnd+1]); temp = arr[segEnd+1]; arr[segEnd+1] = arr[segEnd]; arr[segEnd] = temp; isSorted = false; } } safePrint(ind, "Array after Odd bubble (seg number: "+s+") Array after:"); printArr(arr); // Perform Bubble sort on Even indexed elements for (j = s*segmentLength; j < segEnd; j = j+2) { if (arr[j] > arr[j+1]) { // swap elements safePrint(ind, "Even swamp at: "+j+" values: "+arr[j]+" <-> "+arr[j+1]); temp = arr[j]; arr[j] = arr[j+1]; arr[j+1] = temp; isSorted = false; } } safePrint(ind, "Array after Even bubble, pass: "+myPassId+" isSorted: "+isSorted); printArr(arr); // done with this segment so release the lock on the segment safePrint(ind, "XX Segment done, pass: "+myPassId+" Releasing seg "+s); safePrint(ind, "XXX Sem queuelength: "+sems[s].getQueueLength()); sems[s].release(); } // for each segment if (myPassId < 1) { System.out.println("T"+ind+" *** I have been terminated, myPassId: "+myPassId+" ***"); } // IF isSorted is true, no exchange has been done and the array is now sorted. // Therefore we need to stop all other threads. // We do this by setting passID to zero - every thread checks that the passId is non-zero // before starting a new pass. if (isSorted && myPassId >0) { System.out.println("T"+ind+" *** DECLARING SORT DONE, myPassId: "+myPassId+" ***"); printArr(arr); passId.set(0); // this stops all threads from starting a new pass break; } System.out.println("T"+ind+" Pass done: "+myPassId); } // ************************ Thread specific code done ***************************** System.out.println("T"+ind+" DONE - and awaiting all to complete "); try { allDone.await(); // await all threads done } catch (Exception e) {return;} } // end run } // end class Para } // end class OddEvenPara