import java.util.Random; import java.util.Arrays; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Semaphore; /** * Bruker strategien kombinert fra oppgave b.1 og b.4 fra forrige uke * som basis for denne oppgaven (se metoden * FindMaxWorkerThread.run()). Som forrige uke parallelliserer vi * problemet med ? finne det st?rste elementet i en heltallsarray. * * Men denne uka tester vi forskjellige m?ter ? vente p? resultatet * fra alle tr?dene. join(), CyclicBarrier og Semaphore er * implementert, og tidene skrives ut for de forskjellige metodene. * * For ? lage et tilfeldig array brukes java.util.Random for ? genere * tall. Alle metodene henter fra den globale arrayen "randomNumbers", * det er kun lesing fra denne arrayen, s? den blir ikke redigert * etter at den er fylt med tilfeldige tall. * * Vi tester for n = 1000, 100000, ..., 10 millioner elementer i arrayen * (arrayen er 10 millioner stort, men n?r vi ser p? lavere tall ser * vi bare p? mindre deler av arrayen). * * Vi tester alt sammen et gitt antall ganger (numRepetitions), for ? * ta hensyn til JIT-kompilering, og skriver deretter ut mediantiden. * * Det at det brukes String-sammenligning i forhold til enums for ? * sjekke om det er semaphore, join eller barrier har ingen ting ? si * p? kj?retiden (sjekket). */ public class Uke2_Oppg1 { public static final int TEN_MILLIONS = 10000000; private final int numThreads; private int[] randomNumbers; private int globalMax; public static void main(String[] args) { int localNumThreads = Runtime.getRuntime().availableProcessors(); System.out.println("Number of threads: " + localNumThreads + "\n"); Uke2_Oppg1 o = new Uke2_Oppg1(TEN_MILLIONS, localNumThreads); final int numRepetitons = 9; // Arrays to store the timing results: double[] sequentialNumbers = new double[numRepetitons]; double[] semaphoreNumbers = new double[numRepetitons]; double[] joinNumbers = new double[numRepetitons]; double[] barrierNumbers = new double[numRepetitons]; for (int i = 1000; i <= TEN_MILLIONS; i *= 100) { for (int j = 0; j < numRepetitons; j++) { sequentialNumbers[j] = o.sequentialFindMax(i, j); o.globalMax = 0; // Reset semaphoreNumbers[j] = o.parallelFindMax(i, "semaphore", j); o.globalMax = 0; // Reset joinNumbers[j] = o.parallelFindMax(i, "join", j); o.globalMax = 0; // Reset barrierNumbers[j] = o.parallelFindMax(i, "barrier", j); } System.out.println("\t\tNumber of elements: " + i); calculateAndPrintMedian(sequentialNumbers, semaphoreNumbers, joinNumbers, barrierNumbers); System.out.println(); } } static void calculateAndPrintMedian(double[] seq, double[] semaphore, double[] join, double[] barrier) { Arrays.sort(seq); Arrays.sort(semaphore); Arrays.sort(join); Arrays.sort(barrier); System.out.println("\t\tSequential median: " + seq[seq.length/2]); System.out.println("\t\tSemaphore median: " + semaphore[semaphore.length/2]); System.out.println("\t\tJoin median: " + join[join.length/2]); System.out.println("\t\tBarrier median: " + barrier[barrier.length/2]); } Uke2_Oppg1(int numElements, int numThreads) { this.numThreads = numThreads; randomNumbers = new int[numElements]; Random r = new Random(); // Populate the array with random numbers for (int i = 0; i < numElements; i++) { randomNumbers[i] = r.nextInt(Integer.MAX_VALUE); } } public void printAnswerAndTime(int round, String type, int elements, double time, int number) { System.out.printf("round %2d: %19s (%8d numbers checked) in time %5.2f, max number %4d\n", round, type, elements, time, number); } /** * Sekvensiell l?sning for ? finne st?rste nummer in en array. * @param round denne metoden kj?res flere ganger, og denne * variabelen sier hvilken "runde" vi er p? n? */ public double sequentialFindMax(int numElements, int round) { long start = System.nanoTime(); int localMax = randomNumbers[0]; for (int i = 1; i < numElements; i++) { if (randomNumbers[i] > localMax) { localMax = randomNumbers[i]; } } long end = System.nanoTime(); printAnswerAndTime(round, "sequential", numElements, (end-start)/1000000.0, localMax); return (end-start)/1000000.0; } /** * Fordel elementene til tr?dene og starte dem opp. * @param round denne metoden kj?res flere ganger, og denne * variabelen sier hvilken "runde" vi er p? n? */ public double parallelFindMax(int numElements, String type, int round) { long start = System.nanoTime(); final int elementsPerThread = numElements / numThreads; final int remainderForLastThread = numElements % numThreads; CyclicBarrier barrier = null; Semaphore semaphore = null; if (type.equals("barrier")) { barrier = new CyclicBarrier(numThreads + 1); // +1 for main } else if (type.equals("semaphore")) { semaphore = new Semaphore(-numThreads + 1); } Thread[] workerThreads = new Thread[numThreads]; for (int i = 0; i < numThreads; i++) { int startRange = i * elementsPerThread; int endRange = (i+1) * elementsPerThread; // Last element takes the remainder if (i == numThreads - 1) endRange += remainderForLastThread; workerThreads[i] = new Thread(new FindMaxWorkerThread(startRange, endRange, barrier, semaphore, type)); workerThreads[i].start(); } // Different mechanismes of waiting for all threads to finish: if (type.equals("barrier")) { try { barrier.await(); } catch (Exception e) { return 0.0; } } else if (type.equals("join")) { try { for (int i = 0; i < numThreads; i++) { workerThreads[i].join(); } } catch (InterruptedException e) { e.printStackTrace(); return 0.0; } } else if (type.equals("semaphore")) { try { semaphore.acquire(); } catch(Exception e) { e.printStackTrace(); return 0.0; } } long end = System.nanoTime(); printAnswerAndTime(round, "parallel" + " " + type, numElements, (end-start)/1000000.0, globalMax); return (end-start)/1000000.0; } /** * Oppdaterer globalMax om det er funnet ny st?rste. Denne er * synkronisert, s? om flere tr?der kaller p? denne tr?den vil de * bli plassert i en k? f?r de f?r l?sen. */ public synchronized void synchronizedGlobalMax(int possibleMax) { if (possibleMax > globalMax) { globalMax = possibleMax; } } class FindMaxWorkerThread implements Runnable { private final int fromIndex, toIndex; private final String type; private CyclicBarrier barrier; private Semaphore semaphore; private int localMax; FindMaxWorkerThread(int fromIndex, int toIndex, CyclicBarrier barrier, Semaphore semaphore, String type) { this.fromIndex = fromIndex; this.toIndex = toIndex; this.barrier = barrier; this.semaphore = semaphore; this.type = type; } /** * Finner det tallet som er h?yest i den sammenhengende * tallf?lgen denne tr?den ble tildelt. N?r den har funnet * dette tallet, sammenligner den med globalMax */ public void run() { for (int i = fromIndex; i < toIndex; i++) { if (randomNumbers[i] > localMax) { localMax = randomNumbers[i]; } } synchronizedGlobalMax(localMax); if (type.equals("barrier")) { try { // Wait for all threads to finish barrier.await(); } catch (Exception e) { return; } } else if (type.equals("semaphore")) { // Release the resource this thread occupy, and then // terminate. semaphore.release(); } } } }