import java.util.Random; import java.util.concurrent.CyclicBarrier; /** * b.1, b.2, b.3 og b.4 er implementert, kommenter ut den oppgaven du * vil teste i metoden AccumulateSumWorkerThread.run() * * SAMME PROBLEM SOM OPPGAVE 2, BARE AT N? SKAL VI SUMMERE ISTEDENFOR * ? FINNE MAKS * * Vi skal n? parallellisere problemet med ? finne det summen av alle * elementene i en heltallsarray a[] av lengde n - og skrive: 'int * finnSum(int a[]) {..}; * * Arrayen vi skal f?rst lage en slik array med tilfeldig innhold med * klassen Random i pakken: * java.util * hvor man nytter metoden nextInt(n) for ? trekke elementene i a[]. * * Vi skal nedenfor teste denne for n = 100, 1000, ..., 10 millioner. */ public class Uke1_Oppg3 { public static final int TEN_MILLIONS = 10000000; private final int numThreads; private int[] randomNumbers; private long globalSum; public static void main(String[] args) { // Number of threads default to available processors int localNumThreads = Runtime.getRuntime().availableProcessors(); if (args.length == 1) { // Optional argument to specify number of threads localNumThreads = Integer.parseInt(args[0]); } Uke1_Oppg3 o = new Uke1_Oppg3(TEN_MILLIONS, localNumThreads); for (int i = 100; i <= TEN_MILLIONS; i *= 10) { o.sequentialAccumulateSum(i); o.globalSum = 0; o.parallelAccumulateSum(i); System.out.println(); } } Uke1_Oppg3(int numElements, int numThreads) { this.numThreads = numThreads; randomNumbers = new int[numElements]; Random r = new Random(); // Populate the array with random numbers for (int i = 0; i < numElements; i++) { randomNumbers[i] = r.nextInt(numElements); } } public void printAnswerAndTime(String type, int elements, double time, long number) { System.out.printf("%10s (%8d numbers checked) in time %5.2f, sum %4d\n", type, elements, time, number); } /** * Sekvensiell l?sning for ? finne summen av alle elementene i en * array. */ public void sequentialAccumulateSum(int numElements) { long start = System.nanoTime(); long localSum = randomNumbers[0]; for (int i = 1; i < numElements; i++) { localSum += randomNumbers[i]; } long end = System.nanoTime(); printAnswerAndTime("sequential", numElements, (end-start)/1000000.0, localSum); } /** * Fordel elementene til tr?dene og starte dem opp. */ public void parallelAccumulateSum(int numElements) { long start = System.nanoTime(); int elementsPerThread = numElements / numThreads; int remainderForLastThread = numElements % numThreads; CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); // +1 for main for (int threadId = 0; threadId < numThreads; threadId++) { int startRange = threadId * elementsPerThread; int endRange = (threadId+1) * elementsPerThread; // Last element takes the remainder if (threadId == numThreads - 1) endRange += remainderForLastThread; new Thread(new AccumulateSumWorkerThread(threadId, numElements, startRange, endRange, barrier)).start(); } try { // Wait for all threads to finish barrier.await(); } catch (Exception e) { return; } long end = System.nanoTime(); printAnswerAndTime("parallel", numElements, (end-start)/1000000.0, globalSum); } /** * Oppdaterer globalSum. Denne er synkronisert, s? om flere tr?der * kaller p? denne tr?den vil de bli plassert i en k? f?r de f?r * l?sen. * * @param addToSum legger denne til globalSum */ public synchronized void synchronizedGlobalSum(long addToSum) { globalSum += addToSum; } class AccumulateSumWorkerThread implements Runnable { private final int fromIndex, toIndex; private final int threadId; private final int numElements; private long localSum; private CyclicBarrier barrier; AccumulateSumWorkerThread(int threadId, int numElements, int fromIndex, int toIndex, CyclicBarrier barrier) { this.fromIndex = fromIndex; this.numElements = numElements; this.toIndex = toIndex; this.barrier = barrier; this.threadId = threadId; } /** * Avkommenter den metoden du ?nsker ? teste * * b_1_3(): Oppgave b.1 og b.3 * b_2_3(): Oppgave b.2 og b.3 * b_1_4(): Oppgave b.1 og b.4 * b_2_4(): Oppgave b.2 og b.4 */ public void run() { // Using synchronizedGlobalSum() for every element: //b_1_3(); // Threads pick numbers from a range //b_2_3(); // Threads pick numbers spread // Using synchronizedGlobalSum() for localSum only: b_1_4(); // Threads pick numbers from a range //b_2_4(); // Threads pick numbers spread try { // Wait for all threads to finish barrier.await(); } catch (Exception e) { return; } } /** * Denne legger til hvert element i sin del av arrayen til den * globale summen via en synchronized-metode. * * Elementene plukkes ut i en sammehengende f?lge, * f.eks. tr?d 0 har elementene "0,1,2" og tr?d 1 har * elementene "3,4,5" */ public void b_1_3() { for (int i = fromIndex; i < toIndex; i++) { synchronizedGlobalSum(randomNumbers[i]); } } /** * Samme som b_1_3, men elementene plukkes p? f?lgende m?te: * * Deretter deler vi arrayen a[] slik at tr?d 0 tester * element: 0, k, 2k, 3k,..osv. Tr?d 1 tester element * nr. 1, k+1, 2k+1, .... Tr?d 2 tester element 2, k+2, 2k+2,.. * * OBS: Denne originale oppgaven hadde en skrivefeil, dette er * den oppdaterte og korrekte versjonen. * * numThreads = k */ public void b_2_3() { int elementIndex = threadId; int threadCounter = 1; // Only go as long as the index is lower than number of // elements to inspect while (elementIndex < numElements) { synchronizedGlobalSum(randomNumbers[elementIndex]); elementIndex = (threadCounter * numThreads) + threadId; threadCounter++; } } /** * Her summeres alle tallene opp f?rst lokalt, deretter * summeres de opp i den globale variabelen. * * Elementene plukkes ut i en sammehengende f?lge, * f.eks. tr?d 0 har elementene "0,1,2" og tr?d 1 har * elementene "3,4,5" */ public void b_1_4() { for (int i = fromIndex; i < toIndex; i++) { localSum += randomNumbers[i]; } synchronizedGlobalSum(localSum); } /** * Samme som b_1_4, men elementene plukkes p? f?lgende m?te: * * Deretter deler vi arrayen a[] slik at tr?d 0 tester * element: 0, k, 2k, 3k,..osv. Tr?d 1 tester element * nr. 1, k+1, 2k+1, .... Tr?d 2 tester element 2, k+2, 2k+2,.. * * OBS: Denne originale oppgaven hadde en skrivefeil, dette er * den oppdaterte og korrekte versjonen. * * numThreads = k */ public void b_2_4() { int elementIndex = threadId; int threadCounter = 1; while (elementIndex < numElements) { localSum += randomNumbers[elementIndex]; elementIndex = (threadCounter * numThreads) + threadId; threadCounter++; } synchronizedGlobalSum(localSum); } } }