Analisi di più file e relativi contenuti in Java utilizzando il multithreading senza ExecutorService

3

Sto imparando la concorrenza in Java e ho seguito le esercitazioni sul sito Web Oracle. Mentre ne ho capito un po ', una parte più grande mi sfugge. Stavo pensando a un problema ipotetico (sebbene possa essere o non sia un buon caso di utilizzare più thread) dove ho 100 file di testo e ho bisogno di cercare una parola particolare in tutti loro. Se implemento una coda di blocco e non voglio utilizzare ThreadPool con un servizio executor:

  1. Come dovrei risolvere il problema (pensando algoritmicamente)?
  2. Come dovrei farlo se voglio implementare il BlockingQueue con un modello per più produttori, più consumatori, in cui ho 100 thread per put () 100 contenuti di testo nel BlockingQueue e un altro da 100 a take () e cerca in essi una parola particolare?

Quello che ho scritto può o non ha senso, ma io sono solo un principiante e voglio saperne di più su questo problema, poiché questo problema è apparso frequentemente nelle interviste di programmazione.

    
posta Shehryar 30.11.2016 - 20:12
fonte

1 risposta

1

Ottima domanda! Ho scritto un piccolo esempio (utilizza solo 6 thread, ma può essere facilmente espanso) per illustrare come leggere più file (un thread per leggere ciascun file) ed elaborare i dati con più thread.

Quindi iniziamo con Controller , che è fondamentalmente solo il direttore responsabile della creazione e della gestione degli altri thread. Noterai che fornisce ad ogni thread un riferimento alla coda che consente ai thread di svolgere il proprio lavoro: aggiungere elementi o rimuovere elementi dalla coda. Noterai inoltre che conserva due raccolte di thread: uno per i thread di produzione e l'altro per tutti i thread. La raccolta del thread del produttore viene utilizzata per fornire ai thread dei consumatori un modo per sapere se devono continuare ad attendere ulteriori input. La raccolta contenente tutti i thread viene utilizzata per impedire l'uscita del controller prima che tutti i produttori e i consumatori abbiano completato il proprio lavoro.

package multithreading.producer_consumer.blockingQueue;

import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class Controller {

    private static final int NUMBER_OF_CONSUMERS = 3;
    private static final int NUMBER_OF_PRODUCERS = 3;
    private static final int QUEUE_SIZE = 2;
    private static BlockingQueue<String> queue;
    private static Collection<Thread> producerThreadCollection, allThreadCollection;

    public static void main(String[] args) {
        producerThreadCollection = new ArrayList<Thread>();
        allThreadCollection = new ArrayList<Thread>();
        queue = new LinkedBlockingDeque<String>(QUEUE_SIZE);

        createAndStartProducers();
        createAndStartConsumers();

        for(Thread t: allThreadCollection){
            try {
                t.join();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        System.out.println("Controller finished");
    }

    private static void createAndStartProducers(){
        for(int i = 1; i <= NUMBER_OF_PRODUCERS; i++){
            Producer producer = new Producer(Paths.get("./src/multithreading/producer_consumer/blockingQueue/file"+i+".txt"), queue);
            Thread producerThread = new Thread(producer,"producer-"+i);
            producerThreadCollection.add(producerThread);
            producerThread.start();
        }
        allThreadCollection.addAll(producerThreadCollection);
    }

    private static void createAndStartConsumers(){
        for(int i = 0; i < NUMBER_OF_CONSUMERS; i++){
            Thread consumerThread = new Thread(new Consumer(queue), "consumer-"+i);
            allThreadCollection.add(consumerThread);
            consumerThread.start();
        }
    }

    public static boolean isProducerAlive(){
        for(Thread t: producerThreadCollection){
            if(t.isAlive())
                return true;
        }
        return false;
    }
}

Successivamente, ecco il codice per la classe Producer che verrà utilizzata per creare tutti i thread il cui lavoro è di leggere un singolo file ciascuno. Vedrai che il produttore legge un file specifico riga per riga e aggiunge quelle righe alla coda in quanto c'è spazio disponibile facendo uso del metodo put .

package multithreading.producer_consumer.blockingQueue;

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable{

    private Path fileToRead;
    private BlockingQueue<String> queue;

    public Producer(Path filePath, BlockingQueue<String> q){
        fileToRead = filePath;
        queue = q;
    }

    @Override
    public void run() {
        try {
            BufferedReader reader = Files.newBufferedReader(fileToRead);
            String line;
            while((line = reader.readLine()) != null){
                try {
                    queue.put(line);
                    System.out.println(Thread.currentThread().getName() + " added \"" + line + "\" to queue, queue size: " + queue.size());             
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            System.out.println(Thread.currentThread().getName()+" finished");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

Infine, ecco la classe Consumer che sarà responsabile della lettura dei dati dalla coda e dell'elaborazione in modo appropriato. Si noti che questa classe non utilizza il metodo take . L'ho scritto in questo modo in modo che il programma terminasse dopo aver elaborato tutti i file. Se vuoi che i consumatori rimangano vivi, puoi sostituire poll con take (insieme ad alcune piccole modifiche al metodo run come passare il InterruptedException che potrebbe verificarsi mentre aspetti che take restituisca un valore valore).

package multithreading.producer_consumer.blockingQueue;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable{
    private BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> q){
        queue = q;
    }

    public void run(){
        while(true){
            String line = queue.poll();

            if(line == null && !Controller.isProducerAlive())
                return;

            if(line != null){
                System.out.println(Thread.currentThread().getName()+" processing line: "+line);
                //Do something with the line here like see if it contains a string
            }

        }
    }
}

Ecco i 3 file di input che ho usato:

file1.txt

file #1 line 1
file #1 line 2
file #1 line 3
file #1 line 4
file #1 line 5

file2.txt

This is file #2 line 1
This is file #2 line 2
This is file #2 line 3
This is file #2 line 4
This is file #2 line 5

file3.txt

Lastly we have file #3 line 1
Lastly we have file #3 line 2
Lastly we have file #3 line 3
Lastly we have file #3 line 4
Lastly we have file #3 line 5

Ecco alcuni esempi di output dal programma. Tieni presente che System.out.println non è sincronizzato, quindi l'output non è in ordine.

consumer-0 processing line: Lastly we have file #3 line 1
consumer-0 processing line: This is file #2 line 1
producer-2 added "This is file #2 line 1" to queue, queue size: 1
producer-2 added "This is file #2 line 2" to queue, queue size: 1
producer-2 added "This is file #2 line 3" to queue, queue size: 1
producer-2 added "This is file #2 line 4" to queue, queue size: 2
consumer-1 processing line: file #1 line 1
consumer-1 processing line: This is file #2 line 4
consumer-1 processing line: This is file #2 line 5
producer-1 added "file #1 line 1" to queue, queue size: 1
producer-1 added "file #1 line 2" to queue, queue size: 0
producer-3 added "Lastly we have file #3 line 1" to queue, queue size: 0
producer-1 added "file #1 line 3" to queue, queue size: 1
consumer-1 processing line: file #1 line 2
producer-2 added "This is file #2 line 5" to queue, queue size: 0
producer-1 added "file #1 line 4" to queue, queue size: 2
producer-2 finished
consumer-2 processing line: This is file #2 line 3
consumer-2 processing line: Lastly we have file #3 line 2
consumer-2 processing line: file #1 line 4
consumer-2 processing line: file #1 line 5
consumer-0 processing line: This is file #2 line 2
producer-1 added "file #1 line 5" to queue, queue size: 0
producer-1 finished
consumer-1 processing line: file #1 line 3
producer-3 added "Lastly we have file #3 line 2" to queue, queue size: 2
producer-3 added "Lastly we have file #3 line 3" to queue, queue size: 1
producer-3 added "Lastly we have file #3 line 4" to queue, queue size: 1
producer-3 added "Lastly we have file #3 line 5" to queue, queue size: 0
producer-3 finished
consumer-0 processing line: Lastly we have file #3 line 3
consumer-2 processing line: Lastly we have file #3 line 5
consumer-1 processing line: Lastly we have file #3 line 4
Controller finished

Spero che questo sia utile per illustrare come è possibile eseguire l'attività senza utilizzare ExecutorService. Divertiti!

    
risposta data 02.12.2016 - 04:06
fonte

Leggi altre domande sui tag