Java OCPJP7: il Fork/Join Framework e la risoluzione parallela di problemi divide-and-conquer

Il framework Fork/Join è un meccanismo di alto livello, introdotto proprio con con Java7 all’interno del package java.util.concurrent, per la gestione del multithreading, ovvero dell’esecuzione parallela di un task da parte di più processori. In particolare il suo utilizzo è ideale per semplificare la risoluzione di problemi che possono essere affrontati utilizzando una strategia di tipo divide-and-conquer, cioè di quei problemi che possono essere scomposti in sottoproblemi uguali ma di dimensione inferiore che vengono risolti individualmente e le cui singole soluzioni vengono poi combinate per produrre la soluzione globale.

La classe principale del framework fork/join è ForkJoinPool, un’implementazione dell’interfaccia ExecutorService che fornisce un efficiente pool di threads in grado di eseguire dei ForkJoinTask. Efficiente perché, come indicato nelle API, impiega una strategia di work-stealing che prevede che un thread, quando ha concluso l’esecuzione del suo task, possa prendersi in carico (“fregare”) dei tasks creati da un altro thread che è ancora impegnato nell’esecuzione.

Come detto, i threads del ForkJoinPool possono eseguire dei ForkJoinTask. Questa è un’astrazione (classe astratta) a sua volta estesa da due classi astratte, RecursiveAction e RecursiveTask, utilizzate per differenziare tra i tasks che devono restituire un risultato al termine della loro computazione (RecursiveTask) e quelli che invece devono solo svolgere delle operazioni ma senza ritornare nessun valore (RecursiveAction). La differenza tra le 2 classi, che deve essere tenuta ben presente in occasione dell’esame OCPJP, è quindi nel valore di ritorno del metodo compute() utilizzato per invocare l’esecuzione del task.

La signature del metodo per la classe RecursiveTask è: protected abstract V compute()
Mentre per la classe RecursiveAction è: protected abstract void compute()

Nella pratica, quello che occorre fare per implementare una soluzione basata su una strategia divide-and-conquer è definire una dimensione del problema accettabile per essere risolta da un singolo thread e, ad ogni invocazione, controllare se tale dimensione è stata raggiunta dal processo di scomposizione in sottoproblemi. In questo caso il thread può procedere con le operazioni di calcolo della soluzione del problema, altrimenti, se la dimensione del problema è ancora troppo grande e deve essere ulteriormente scomposta, la si suddivide nuovamente e si indirizzano ricorsivamente i nuovi tasks verso altri threads. Al termine delle operazioni di calcolo effettuate dai vari thread sui diversi sottoproblemi, si combinano i risultati per costruire la soluzione globale al problema originale.

Per fare questo si utilizza il metodo fork(), fornito nella classe astratta ForkJoinTask, per mandare in esecuzione il thread che eseguirà il task su un sottoproblema, si invocherà nel thread corrente il metodo compute() per l’esecuzione del task sull’altro sottoproblema, e si combinerà poi questo risultato con quello fornito dal task forkato, invocando su di esso il metodo join(), che indica al thread corrente di aspettare la sua terminazione.

Ma passiamo ad una dimostrazione concreta di utilizzo, che vale forse più di mille spiegazioni.
Nell’esempio seguente applichiamo, tramite l’utilizzo del framework fork/join, una strategia divide-and-conquer alla risoluzione del problema della ricerca del numero di occorrenze di un elemento X all’interno di un array di interi.

Una cosa da ricordare ancora per l’esame OCPJP, prima di procedere con l’esempio, è che il ForkJoinPool può essere istanziato passando al suo costruttore un intero rappresentante il numero di processi paralleli che si vogliono utilizzare. Se viene invece invocato il costruttore senza parametri e quindi questo valore non gli viene fornito, il ForkJoinPool viene creato di default con un grado di parallelismo pari al numero di processori disponibili sulla macchina, ricavabile con la chiamata Runtime.getRuntime().availableProcessors().

Nell’esempio seguente, ricaviamo esplicitamente il numero di processori disponibili, tramite il metodo appena indicato, e utilizziamo tale valore nella creazione del ForkJoinPool anche se in questo caso avremmo appunto potuto crearlo tramite il costruttore di default. Dopodichè calcoliamo la dimensione massima del problema indirizzabile al singolo thread, dividendo il numero totale di elementi presenti nell’array per il numero di processori disponibili. A questo punto manderemo in esecuzione nel ForkJoinPool, tramite il metodo invoke(), il task rappresentante il problema iniziale, con dimensione pari alla lunghezza dell’intero array. Suddivideremo poi ricorsivamente il problema a metà, fino a raggiungere la dimensione risolvibile dai singoli thread, che a quel punto calcoleranno la soluzione parziale.


import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinFindXOccurrences {

    public static int   numOfThreads;
    public static final int[] SEARCH_ARRAY = { 5, 1, 3, 1, 20, 7, 8, 4, 22, 1, 45, 21, 9, 11, 34, 10, 14, 1 };
    public static final int   TO_FIND      = 1;

    public static void main(final String[] args) {

        // recupero il numero di processori disponibili sulla macchina
        System.out.println("Numero di processori: "
                           + Runtime.getRuntime().availableProcessors());
        numOfThreads = Runtime.getRuntime().availableProcessors();

        System.out.println("Numero di elementi dell'array: "
                           + SEARCH_ARRAY.length);

        // calcolo la dimensione dei blocchi in cui scomporre il problema
        int dim = (int) Math.ceil((double) SEARCH_ARRAY.length / numOfThreads);
        System.out.println("Dimensione blocchi: " + dim);

        // creo un fork-join pool con il numero di processori disponibili
        // in questo caso, essendo il numero di processori il valore di default
        // avrei potuto invocare semplicemente il costruttore senza paremetri
        final ForkJoinPool pool = new ForkJoinPool(numOfThreads);

        // lancio il calcolo tramite il pool
        final int n = SEARCH_ARRAY.length;
        final int numOfOcc = pool.invoke(new RecursiveCountOfX(0, n - 1, dim));

        // stampa del risultato FINALE
        System.out.printf("[" + Thread.currentThread().getName() + "] Numero totale di occorrenze dell'elemento "
                          + TO_FIND + ": " + numOfOcc);
    }


    static class RecursiveCountOfX extends RecursiveTask {

        int start, end, dim;

        // "start" e "end" sono gli estremi del range in cui effettuare la ricerca
        // "dim" rappresenta la dimensione dei blocchi da processare
        public RecursiveCountOfX(final int start, final int end, final int dim) {
            this.start = start;
            this.end = end;
            this.dim = dim;
        }

        @Override
        public Integer compute() {
            // se il range di elementi e' inferiore alla dimensione consentita lo processo
            if ((this.end - this.start) < this.dim) {
                int ris = 0;
                for (int i = this.start; i <= this.end; i++) {
                    if (SEARCH_ARRAY[i] == TO_FIND) {
                        ris++;
                    }
                }
                // mostro il risultato parziale del sottoproblema ed il thread che l'ha eseguito
                System.out.println("\t [" + Thread.currentThread().getName() + "]: " + ris + " occorrenze di " + TO_FIND
                                   + " nel range da " + this.start + " a "
                                   + this.end);
                return ris;
            }

            // altrimenti, se il range e' ancora troppo grande lo divido ulteriormente a meta'
            final int mid = (this.start + this.end) / 2;
            System.out.printf("Fork del calcolo in due ranges: "
                                      + "da %d a %d e da %d a %d %n",
                              this.start,
                              mid,
                              mid + 1,
                              this.end);

            // assegno la computazione della prima meta' ad un task
            final RecursiveCountOfX subTask1 = new RecursiveCountOfX(this.start,
                                                                      mid,
                                                                      this.dim);
            // e lo invoco, forkando un nuovo processo
            subTask1.fork();

            // assegno la computazione della seconda meta' ad un altro task
            final RecursiveCountOfX subTask2 = new RecursiveCountOfX(mid + 1,
                                                                       this.end,
                                                                       this.dim);
            // e lo calcolo
            final int resultSecond = subTask2.compute();

            // aspetto la terminazione del processo forkato e unisco i risultati
            return subTask1.join() + resultSecond;

        }
    }
}

Il codice del programma è ampiamente commentato, in modo da descrivere in dettaglio ogni passo della sua esecuzione.
L'output fornito è quello che segue, in cui sono indicati i range in cui il problema è stato scomposto, il thread che si è preso in carico la computazione di ciascuno, i risultati parziali calcolati sui sottoproblemi e la soluzione finale ottenuta aggregando di volta in volta le soluzioni dei sottoproblemi.

Numero di processori: 4
Numero di elementi dell array: 18
Dimensione blocchi: 5
Fork del calcolo in due ranges: da 0 a 8 e da 9 a 17 
Fork del calcolo in due ranges: da 9 a 13 e da 14 a 17 
Fork del calcolo in due ranges: da 0 a 4 e da 5 a 8 
	 [ForkJoinPool-1-worker-3]: 1 occorrenze di 1 nel range da 9 a 13
	 [ForkJoinPool-1-worker-1]: 1 occorrenze di 1 nel range da 14 a 17
	 [ForkJoinPool-1-worker-3]: 2 occorrenze di 1 nel range da 0 a 4
	 [ForkJoinPool-1-worker-2]: 0 occorrenze di 1 nel range da 5 a 8
[main] Numero totale di occorrenze dell elemento 1: 4

Ok, abbiamo realizzato il nostro primo programma utilizzando il fork/join framework.
Ora aggiungiamo ancora qualche considerazione, alcune direttamente legate all'esame OCPJP, altre meno.
Per prima cosa notiamo che i 4 sottotask in cui il problema è stato suddiviso, sono in realtà stati processati da 3 differenti threads, o workers come vengono chiamati dal ForkJoinPool, con il worker-3 che ne ha processati 2.
Questo perché, essendo il carico computazionale richiesto per completare il task pressoché nullo, il worker si è liberato e ha potuto prendersi carico di un altro sottoproblema, senza che per tale scopo venisse allocato un nuovo worker. Per verificare se effettivamente il numero di workers allocati corrisponde a quelli indicati in fase di creazione del ForkJoinPool, quindi 4 nel nostro caso, introduciamo un'operazione di sleep per prolungare il tempo di risoluzione del task da parte del singolo worker.
Modifichiamo quindi il codice, aggiungendo una chiamata Thread.sleep(1000) prima del ritorno del risultato della computazione effettiva del worker, che introduce un'attesa di un secondo (il parametro della sleep è espresso in millisecondi) prima della terminazione del task.


import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinFindXOccurrences {

    public static int   numOfThreads;
    public static final int[] SEARCH_ARRAY = { 5, 1, 3, 1, 20, 7, 8, 4, 22, 1, 45, 21, 9, 11, 34, 10, 14, 1 };
    public static final int   TO_FIND      = 1;

    public static void main(final String[] args) {

        // recupero il numero di processori disponibili sulla macchina
        System.out.println("Numero di processori: "
                           + Runtime.getRuntime().availableProcessors());
        numOfThreads = Runtime.getRuntime().availableProcessors();

        System.out.println("Numero di elementi dell'array: "
                           + SEARCH_ARRAY.length);

        // calcolo la dimensione dei blocchi in cui scomporre il problema
        int dim = (int) Math.ceil((double) SEARCH_ARRAY.length / numOfThreads);
        System.out.println("Dimensione blocchi: " + dim);

        // creo un fork-join pool con il numero di processori disponibili
        // in questo caso, essendo il numero di processori il valore di default
        // avrei potuto invocare semplicemente il costruttore senza paremetri
        final ForkJoinPool pool = new ForkJoinPool(numOfThreads);

        // lancio il calcolo tramite il pool
        final int n = SEARCH_ARRAY.length;
        final int numOfOcc = pool.invoke(new RecursiveCountOfX(0, n - 1, dim));

        // stampa del risultato FINALE
        System.out.printf("[" + Thread.currentThread().getName() + "] Numero totale di occorrenze dell'elemento "
                          + TO_FIND + ": " + numOfOcc);
    }


    static class RecursiveCountOfX extends RecursiveTask {

        int start, end, dim;

        // "start" e "end" sono gli estremi del range in cui effettuare la ricerca
        // "dim" rappresenta la dimensione dei blocchi da processare
        public RecursiveCountOfX(final int start, final int end, final int dim) {
            this.start = start;
            this.end = end;
            this.dim = dim;
        }

        @Override
        public Integer compute() {
            // se il range di elementi e' inferiore alla dimensione consentita lo processo
            if ((this.end - this.start) < this.dim) {
                int ris = 0;
                for (int i = this.start; i <= this.end; i++) {
                    if (SEARCH_ARRAY[i] == TO_FIND) {
                        ris++;
                    }
                }
                // mostro il risultato parziale del sottoproblema ed il thread che l'ha eseguito
                System.out.println("\t [" + Thread.currentThread().getName() + "]: " + ris + " occorrenze di " + TO_FIND
                                   + " nel range da " + this.start + " a "
                                   + this.end);

                try {
                    // simuliamo uno sforzo computazionale del task più intenso
                    Thread.sleep(1000);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }

                return ris;
            }

            // altrimenti, se il range e' ancora troppo grande lo divido ulteriormente a meta'
            final int mid = (this.start + this.end) / 2;
            System.out.printf("Fork del calcolo in due ranges: "
                                      + "da %d a %d e da %d a %d %n",
                              this.start,
                              mid,
                              mid + 1,
                              this.end);

            // assegno la computazione della prima meta' ad un task
            final RecursiveCountOfX subTask1 = new RecursiveCountOfX(this.start,
                                                                      mid,
                                                                      this.dim);
            // e lo invoco, forkando un nuovo processo
            subTask1.fork();

            // assegno la computazione della seconda meta' ad un altro task
            final RecursiveCountOfX subTask2 = new RecursiveCountOfX(mid + 1,
                                                                       this.end,
                                                                       this.dim);
            // e lo calcolo
            final int resultSecond = subTask2.compute();

            // aspetto la terminazione del processo forkato e unisco i risultati
            return subTask1.join() + resultSecond;

        }
    }
}

Di seguito vediamo l'output della nuova esecuzione in cui possiamo verificare l'utilizzo di 4 workers diversi per la risoluzione dei 4 sottoproblemi in cui abbiamo scomposto il problema originale, con il ricorso anche al worker-4 che in precedenza non era risultato necessario.

Numero di processori: 4
Numero di elementi dell array: 18
Dimensione blocchi: 5
Fork del calcolo in due ranges: da 0 a 8 e da 9 a 17 
Fork del calcolo in due ranges: da 9 a 13 e da 14 a 17 
Fork del calcolo in due ranges: da 0 a 4 e da 5 a 8 
	 [ForkJoinPool-1-worker-3]: 1 occorrenze di 1 nel range da 9 a 13
	 [ForkJoinPool-1-worker-1]: 1 occorrenze di 1 nel range da 14 a 17
	 [ForkJoinPool-1-worker-2]: 0 occorrenze di 1 nel range da 5 a 8
	 [ForkJoinPool-1-worker-4]: 2 occorrenze di 1 nel range da 0 a 4
[main] Numero totale di occorrenze dell elemento 1: 4

Un'altra cosa su cui è bene soffermarsi, e soprattutto su cui occorre prestare attenzione in sede di esame OCPJP7, è l'ordine delle chiamate dei metodi fondamentali del framework fork/join.
Vediamo il perché.
Iniziamo con analizzare le seguenti righe di codice, in cui assembliamo i risultati parziali dei 2 sottoproblemi risolti:

final int resultSecond = subTask2.compute();

// aspetto la terminazione del processo forkato e unisco i risultati
return subTask1.join() + resultSecond;

In questo caso, non sarebbe necessario assegnare ad una variabile dedicata il risultato della computazione del subTask2, ma potremmo utilizzare direttamente il valore restituito nello statement di return. Ed è proprio qua dove la cosa si può fare pericolosa, perché occorre assicurarsi che l'invocazione di compute() sul subTask2 venga effettuata prima della chiamata join() sul subTask1.
Quindi bisogna prestare attenzione che l'ordine delle invocazioni sia come segue:

// prima lancio la computazione poi chiamo la join sull'altro thread
return subTask2.compute() + subTask1.join();

Per semplicità non riportiamo nuovamente l'intero codice. Il risultato dell'esecuzione del programma con il risultato assemblato nell'unica riga indicata sopra è in linea con quello ottenuto precedentemente:

Numero di processori: 4
Numero di elementi dell array: 18
Dimensione blocchi: 5
Fork del calcolo in due ranges: da 0 a 8 e da 9 a 17 
Fork del calcolo in due ranges: da 9 a 13 e da 14 a 17 
Fork del calcolo in due ranges: da 0 a 4 e da 5 a 8 
	 [ForkJoinPool-1-worker-3]: 1 occorrenze di 1 nel range da 9 a 13
	 [ForkJoinPool-1-worker-3]: 2 occorrenze di 1 nel range da 0 a 4
	 [ForkJoinPool-1-worker-2]: 0 occorrenze di 1 nel range da 5 a 8
	 [ForkJoinPool-1-worker-1]: 1 occorrenze di 1 nel range da 14 a 17
[main] Numero totale di occorrenze dell elemento 1: 4

Ma cosa succede se invertiamo l'ordine delle chiamate compute() e join()? Succede che aspettiamo la terminazione dell'altro thread lanciato SENZA partire in parallelo con il calcolo nel thread corrente. Quindi, sostanzialmente, stiamo vanificando gli effetti del processamento parallelo e stiamo a tutti gli effetti procedendo in modo sequenziale, con un unico thread.
Non ci resta che verificare. Sostituiamo quindi la riga di codice del return, invertendo le chiamate compute() e join() in questo modo:

return subTask1.join() + subTask2.compute();

Ed eseguiamo nuovamente il programma. Il risultato ottenuto è il seguente:

Numero di processori: 4
Numero di elementi dell array: 18
Dimensione blocchi: 5
Fork del calcolo in due ranges: da 0 a 8 e da 9 a 17 
Fork del calcolo in due ranges: da 0 a 4 e da 5 a 8 
	 [ForkJoinPool-1-worker-1]: 2 occorrenze di 1 nel range da 0 a 4
	 [ForkJoinPool-1-worker-1]: 0 occorrenze di 1 nel range da 5 a 8
Fork del calcolo in due ranges: da 9 a 13 e da 14 a 17 
	 [ForkJoinPool-1-worker-1]: 1 occorrenze di 1 nel range da 9 a 13
	 [ForkJoinPool-1-worker-1]: 1 occorrenze di 1 nel range da 14 a 17
[main] Numero totale di occorrenze dell elemento 1: 4

Dall'output generato notiamo che TUTTI i sottoproblemi sono stati calcolati dallo stesso worker! Quindi il nostro programma non ha eseguito in modo concorrente ma in modalità single-threaded.
Notiamo, allo stesso modo, che il risultato finale non cambia. Non otteniamo né deadlock o altri problemi legati alla concorrenza, né tanto meno otteniamo un risultato errato o non prevedibile. Quello che otteniamo è lo stesso risultato, calcolato con delle performances peggiori, dovute al fatto di non sfruttare il multithreading.
Per cui, ancora una volta, è importante prestare attenzione a questa possibile confusione e conoscere qual è il comportamente del programma in questo caso.

Un'ultima cosa su cui vale la pena spendere ancora 2 parole, anche se non strettamente legata al framework fork/join, è il metodo in cui abbiamo calcolato la dimensione accettabile dei task.
La riga:

// calcolo la dimensione dei blocchi in cui scomporre il problema
int dim = (int) Math.ceil((double) SEARCH_ARRAY.length / numOfThreads);

arrotonda all'intero superiore la divisione. Questo ci serve per definire una dimensione dei sottoproblemi che sia gestibile dal numero di threads a disposizione. In questo caso, con 4 threads e 18 elementi da analizzare, devo creare degli intervalli da 5 elementi.
Se avessi utilizzato semplicemente la divisione intera, avrei ottenuto dei blocchi di dimensione 4, che non sarebbe stato possibile gestire con solo 4 tasks (essendo sempre 18 gli elementi). In questo caso avrei ottenuto delle suddivisioni ulteriori dei sottoproblemi e dei nuovi tasks creati ed invocati.

Ancora una volta verifichiamo, eliminando la chiamata al metodo statico Math.ceil() nel calcolo della dimensione dei sottoproblemi, che diventa quindi:

int dim = SEARCH_ARRAY.length / numOfThreads;

Eseguendo il programma otteniamo il seguente output:

Numero di processori: 4
Numero di elementi dell array: 18
Dimensione blocchi: 4
Fork del calcolo in due ranges: da 0 a 8 e da 9 a 17 
Fork del calcolo in due ranges: da 9 a 13 e da 14 a 17 
Fork del calcolo in due ranges: da 0 a 4 e da 5 a 8 
Fork del calcolo in due ranges: da 9 a 11 e da 12 a 13 
	 [ForkJoinPool-1-worker-1]: 1 occorrenze di 1 nel range da 14 a 17
	 [ForkJoinPool-1-worker-3]: 0 occorrenze di 1 nel range da 12 a 13
Fork del calcolo in due ranges: da 0 a 2 e da 3 a 4 
	 [ForkJoinPool-1-worker-4]: 1 occorrenze di 1 nel range da 3 a 4
	 [ForkJoinPool-1-worker-4]: 1 occorrenze di 1 nel range da 0 a 2
	 [ForkJoinPool-1-worker-2]: 0 occorrenze di 1 nel range da 5 a 8
	 [ForkJoinPool-1-worker-1]: 1 occorrenze di 1 nel range da 9 a 11
[main] Numero totale di occorrenze dell elemento 1: 4

in cui notiamo che:

- la riga "Dimensione blocchi" ora mostra il valore 4 invece che 5
- il numero di sottoproblemi in cui è stato scomposto il problema è 6 e non più 4
- i workers 4 e 1 risolvono 2 tasks a testa

2 thoughts on “Java OCPJP7: il Fork/Join Framework e la risoluzione parallela di problemi divide-and-conquer

  1. Pingback: Appunti OCPJP7: Localization | Dede Blog

  2. Ciao,

    intanto grazie mille del materiale!

    Nel primo esempio che hai pubblicato, ho dovuto fare un cast (int) nel return del metodo Integer compute(). Potresti darmi spiegarmi come mai? Con la parallelizzazione sono alle prime armi.
    Grazie

Leave a Reply to Niccolò Cancel reply

Your email address will not be published. Required fields are marked *