Ottima domanda! Ho scritto un piccolo esempio (utilizza solo 6 thread, ma può essere facilmente espanso) per illustrare come leggere più file (un thread per leggere ciascun file) ed elaborare i dati con più thread.
Quindi iniziamo con Controller
, che è fondamentalmente solo il direttore responsabile della creazione e della gestione degli altri thread. Noterai che fornisce ad ogni thread un riferimento alla coda che consente ai thread di svolgere il proprio lavoro: aggiungere elementi o rimuovere elementi dalla coda. Noterai inoltre che conserva due raccolte di thread: uno per i thread di produzione e l'altro per tutti i thread. La raccolta del thread del produttore viene utilizzata per fornire ai thread dei consumatori un modo per sapere se devono continuare ad attendere ulteriori input. La raccolta contenente tutti i thread viene utilizzata per impedire l'uscita del controller prima che tutti i produttori e i consumatori abbiano completato il proprio lavoro.
package multithreading.producer_consumer.blockingQueue;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class Controller {
private static final int NUMBER_OF_CONSUMERS = 3;
private static final int NUMBER_OF_PRODUCERS = 3;
private static final int QUEUE_SIZE = 2;
private static BlockingQueue<String> queue;
private static Collection<Thread> producerThreadCollection, allThreadCollection;
public static void main(String[] args) {
producerThreadCollection = new ArrayList<Thread>();
allThreadCollection = new ArrayList<Thread>();
queue = new LinkedBlockingDeque<String>(QUEUE_SIZE);
createAndStartProducers();
createAndStartConsumers();
for(Thread t: allThreadCollection){
try {
t.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("Controller finished");
}
private static void createAndStartProducers(){
for(int i = 1; i <= NUMBER_OF_PRODUCERS; i++){
Producer producer = new Producer(Paths.get("./src/multithreading/producer_consumer/blockingQueue/file"+i+".txt"), queue);
Thread producerThread = new Thread(producer,"producer-"+i);
producerThreadCollection.add(producerThread);
producerThread.start();
}
allThreadCollection.addAll(producerThreadCollection);
}
private static void createAndStartConsumers(){
for(int i = 0; i < NUMBER_OF_CONSUMERS; i++){
Thread consumerThread = new Thread(new Consumer(queue), "consumer-"+i);
allThreadCollection.add(consumerThread);
consumerThread.start();
}
}
public static boolean isProducerAlive(){
for(Thread t: producerThreadCollection){
if(t.isAlive())
return true;
}
return false;
}
}
Successivamente, ecco il codice per la classe Producer
che verrà utilizzata per creare tutti i thread il cui lavoro è di leggere un singolo file ciascuno. Vedrai che il produttore legge un file specifico riga per riga e aggiunge quelle righe alla coda in quanto c'è spazio disponibile facendo uso del metodo put
.
package multithreading.producer_consumer.blockingQueue;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable{
private Path fileToRead;
private BlockingQueue<String> queue;
public Producer(Path filePath, BlockingQueue<String> q){
fileToRead = filePath;
queue = q;
}
@Override
public void run() {
try {
BufferedReader reader = Files.newBufferedReader(fileToRead);
String line;
while((line = reader.readLine()) != null){
try {
queue.put(line);
System.out.println(Thread.currentThread().getName() + " added \"" + line + "\" to queue, queue size: " + queue.size());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+" finished");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Infine, ecco la classe Consumer
che sarà responsabile della lettura dei dati dalla coda e dell'elaborazione in modo appropriato. Si noti che questa classe non utilizza il metodo take
. L'ho scritto in questo modo in modo che il programma terminasse dopo aver elaborato tutti i file. Se vuoi che i consumatori rimangano vivi, puoi sostituire poll
con take
(insieme ad alcune piccole modifiche al metodo run
come passare il InterruptedException
che potrebbe verificarsi mentre aspetti che take
restituisca un valore valore).
package multithreading.producer_consumer.blockingQueue;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable{
private BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> q){
queue = q;
}
public void run(){
while(true){
String line = queue.poll();
if(line == null && !Controller.isProducerAlive())
return;
if(line != null){
System.out.println(Thread.currentThread().getName()+" processing line: "+line);
//Do something with the line here like see if it contains a string
}
}
}
}
Ecco i 3 file di input che ho usato:
file1.txt
file #1 line 1
file #1 line 2
file #1 line 3
file #1 line 4
file #1 line 5
file2.txt
This is file #2 line 1
This is file #2 line 2
This is file #2 line 3
This is file #2 line 4
This is file #2 line 5
file3.txt
Lastly we have file #3 line 1
Lastly we have file #3 line 2
Lastly we have file #3 line 3
Lastly we have file #3 line 4
Lastly we have file #3 line 5
Ecco alcuni esempi di output dal programma. Tieni presente che System.out.println non è sincronizzato, quindi l'output non è in ordine.
consumer-0 processing line: Lastly we have file #3 line 1
consumer-0 processing line: This is file #2 line 1
producer-2 added "This is file #2 line 1" to queue, queue size: 1
producer-2 added "This is file #2 line 2" to queue, queue size: 1
producer-2 added "This is file #2 line 3" to queue, queue size: 1
producer-2 added "This is file #2 line 4" to queue, queue size: 2
consumer-1 processing line: file #1 line 1
consumer-1 processing line: This is file #2 line 4
consumer-1 processing line: This is file #2 line 5
producer-1 added "file #1 line 1" to queue, queue size: 1
producer-1 added "file #1 line 2" to queue, queue size: 0
producer-3 added "Lastly we have file #3 line 1" to queue, queue size: 0
producer-1 added "file #1 line 3" to queue, queue size: 1
consumer-1 processing line: file #1 line 2
producer-2 added "This is file #2 line 5" to queue, queue size: 0
producer-1 added "file #1 line 4" to queue, queue size: 2
producer-2 finished
consumer-2 processing line: This is file #2 line 3
consumer-2 processing line: Lastly we have file #3 line 2
consumer-2 processing line: file #1 line 4
consumer-2 processing line: file #1 line 5
consumer-0 processing line: This is file #2 line 2
producer-1 added "file #1 line 5" to queue, queue size: 0
producer-1 finished
consumer-1 processing line: file #1 line 3
producer-3 added "Lastly we have file #3 line 2" to queue, queue size: 2
producer-3 added "Lastly we have file #3 line 3" to queue, queue size: 1
producer-3 added "Lastly we have file #3 line 4" to queue, queue size: 1
producer-3 added "Lastly we have file #3 line 5" to queue, queue size: 0
producer-3 finished
consumer-0 processing line: Lastly we have file #3 line 3
consumer-2 processing line: Lastly we have file #3 line 5
consumer-1 processing line: Lastly we have file #3 line 4
Controller finished
Spero che questo sia utile per illustrare come è possibile eseguire l'attività senza utilizzare ExecutorService. Divertiti!