Come funziona la soluzione consumatore-produttore?

0

Sono solo un principiante e il mio libro non tratta questo argomento. Ho studiato il mio problema e ho scoperto che un'implementazione del modello consumatore-produttore è la soluzione ideale e l'ho googolata, ho letto quello che potevo, ho tentato di enunciare esempi ... ma non sono stato fortunato. Gradirei davvero un po 'di orientamento.

Sto scrivendo in Ruby, se questo fa la differenza.

Sfondo

Sto scrivendo uno script che scrobbling il mio backlog della libreria musicale su Last.FM. Per chiunque non abbia familiarità con quel servizio, ciò significa semplicemente che sto facendo molte richieste POST a un'API HTTP.

Comincio con un array, ogni elemento è un hash / dizionario come {artist: "The Cure", track: "Siamese Twins"} . Eseguo le chiamate eseguendo un'iterazione sull'array e inviando la semplice chiamata di metodo lastfm.track.scrobble artist: song[:artist], track: song[:track] .

Problema

Fare questo in uno stile di blocco alla volta semplice funziona perfettamente, ma è molto molto lento, perché sto aspettando ~ 2 secondi per ogni richiesta HTTP di viaggiare per il mondo e ritornare. Potrei finire 5 volte più velocemente se avessi inviato contemporaneamente 5 richieste HTTP.

Ho chiesto su Stack Overflow quale sarebbe stata la soluzione migliore. Dividi l'array in cinque parti e dai a parte un thread che esegue una richiesta alla volta? Qualcosa come XMLHttpRequest di JavaScript che ha un ciclo di eventi e una funzione di callback? Mi hanno detto che questa scelta sarebbe più adatta ai programmatori SE, e che probabilmente desidero il modello del produttore-consumatore. L'ho cercato e sembra il tipo di cosa di cui ho bisogno.

Il mio tentativo in Ruby, basato su un post di StackOverflow

# "lastfm" is an object representing my Last.FM profile, 
# its .track.scrobble method handles a POST request and waits until it returns
# "songs" is an array of hashes like {artist: "The Cure", track: "One Hundred Years"}

queue = SizedQueue.new(10) # this will only allow 10 items on the queue at once

p1 = Thread.new do 
  songs.each do |song|
      scrobble = lastfm.track.scrobble artist: song[:artist], track: song[:track]
      puts "Scrobbled #{song[:track]} by #{song[:artist]}."
      queue << scrobble
  end
  queue << "done"
end

consumer = Thread.new do
  blocker = queue.pop(true) # don't block when zero items are in queue
  Thread.exit if blocker == "done"
  process(blocker)
end

# wait for the consumer to finish
consumer.join

Il mio errore

Errore nella chiamata al metodo "pop" con errore "coda vuota".

Non ne so abbastanza di questa roba per capire veramente cosa sta succedendo. Sembra che un thread stia compilando una coda con le chiamate API da eseguire, mai più di 10 alla volta, mentre un altro thread sta consumando da quella coda e sta eseguendo. Ma perché pop? Dove viene mai effettivamente eseguito? Perché aggiungo "completato" alla coda?

Sono preoccupato per la riga scrobble = lastfm.track.scrobble artist: song[:artist], track: song[:track] . Questo non chiamerà il metodo lastfm.track.scrobble a titolo definitivo e memorizzerà il suo valore di ritorno come scrobble ? Dovrei usare invece proc o lambda e chiamare quel proc / lambda nel consumatore?

    
posta GreenTriangle 26.10.2014 - 07:38
fonte

2 risposte

2

La risposta di scriptin sta dando una soluzione ragionevole, ma non risponde alle tue domande sul fallimento dell'implementazione che hai avuto.

  • Viene visualizzato un errore "coda vuota" quando chiami pop su una coda Ruby in modalità non bloccante ( true passato) e non ci sono elementi in coda.
  • Il tuo thread "produttore" viene creato e il tuo thread "consumatore" viene creato, ma verranno eseguiti indipendentemente e inizieranno ad essere eseguiti ogni volta che Ruby avrà voglia.
  • Se il tuo "consumatore" viene eseguito prima che il "produttore" abbia inserito qualcosa nella coda, ovviamente il "consumatore" poperà una coda vuota e genererà un'eccezione.

La soluzione a questo è facile; il consumatore non dovrebbe usare la modalità non bloccante. Non passare true , chiama solo pop() . Il punto intero del thread consumatore è che può bloccare una coda in attesa del produttore, senza che il resto dell'applicazione si fermi.

Come chiamare lastfm.track.scrobble - sì, questo è l'esecuzione di quel metodo immediatamente e spinge il risultato sulla coda, che non è esattamente quello che vuoi :-D ed è in effetti il motivo per cui "in modo affidabile" il tuo thread di consumo arriva a fare la coda prima che il produttore abbia la possibilità di spingere qualsiasi cosa (anche se il thread del produttore viene eseguito per primo, fa una lenta chiamata "scrobble" e si aspetta che le richieste / risposte HTTP siano completate; il thread consumatore viene eseguito e tenta di leggere dalla coda vuota).

Vuoi spingere i parametri a scrobble sulla coda, poi fare in modo che il thread del consumatore esegua lo scrobbling. È "più bello" / più strutturato per creare piccole classi che incapsulano il lavoro e spingere le istanze di quelle sulla tua coda, ma una soluzione poco costosa e malvagia usa semplicemente l'hash che scrobble prende come input. Dobbiamo anche ricordarci di eseguire il ciclo sopra le voci della coda all'interno del consumatore, invece di elaborarne solo una! Quindi ( UNTESTED ) potremmo avere:

queue = SizedQueue.new(10)

Thread.new do 
  songs.each do |song|
      queue << { artist: song[:artist], track: song[:track] }
  end
  queue << "done"
end

consumer = Thread.new do
  loop do
    scrobble_args = queue.pop()
    Thread.exit if scrobble_args == "done"
    lastfm.track.scrobble(scrobble_args)
    puts "Scrobbled #{scrobble_args[:track] by scrobble_args[:artist]}."
  end
end

consumer.join

Questo dovrebbe lavorare tecnicamente, ma è totalmente inutile, perché quel thread solitario per il consumatore sta solo elaborando la coda in serie. Quindi cerchiamo di semplificare questo. In realtà, vuoi solo scaricare tutte le "canzoni" in coda e fare in modo che il thread del consumatore faccia il lavoro. Poiché vogliamo eseguire in parallelo, creeremo una serie di thread.

# I'm not bothering with the producer thread here so the queue size is unlimited.
# You could just as well put the 'songs.each' inside a producer thread with a
# a sized queue, as per your original code, if you wanted.
#
queue = Queue.new
songs.each do | song |
  queue << song
end

consumers = []

1.upto( 10 ) do
  consumers << Thread.new do

    # Keep looping until asked to exit
    #
    loop do
      song = queue.pop()
      Thread.exit if song == 'done'
      lastfm.track.scrobble( artist: song[ :artist ], track: song[ :track ] )
      puts "Scrobbled #{ song[ :track ] } by #{ song[ :artist ] }."
    end
  end

  # Push one 'done' exit instruction for each consumer
  #
  queue << 'done'
end

# Wait for all consumer threads to complete
#
consumers.each do | consumer |
  consumer.join()
end

... e quello dovrebbe fare il trucco. La gemma "Parallela" probabilmente farà lo stesso genere di cose sotto il cofano.

    
risposta data 14.01.2015 - 05:29
fonte
1

Sembra che ogni canzone sia elaborata indipendentemente, nel qual caso hai solo bisogno di una raccolta parallela. Google veloce ha trovato questo gioiello: link

Il codice sarebbe qualcosa del genere:

Parallel.each(songs) do |song|
  lastfm.track.scrobble artist: song[:artist], track: song[:track]
  puts "Scrobbled #{song[:track]} by #{song[:artist]}."
end

o quello, se vuoi vedere i risultati:

scrobbled_songs = Parallel.map(songs) do |song|
  scrobbled_song = lastfm.track.scrobble artist: song[:artist], track: song[:track]
  puts "Scrobbled #{song[:track]} by #{song[:artist]}."
  scrobbled_song
end

Come dice il documento, puoi regolare il numero di processi / thread in modo che corrisponda al numero di core della CPU.

Sta ancora bloccando ogni richiesta HTTP (supponendo che lastfm.track.scrobble stia bloccando l'operazione), lo stai facendo in parallelo usando più core (supponendo che ne hai più di uno).

Se hai bisogno di requisitoria HTTP asincrona, potresti semplicemente cercare uno strumento valido, come em-http-request o simili.

    
risposta data 26.10.2014 - 12:41
fonte

Leggi altre domande sui tag