Ricerca di linee guida sul thread Safe Scripting

4

Sto provando a scrivere uno script che richiederà più percorsi ai file su vari server, li cercherà tutti contemporaneamente e restituirà un singolo elenco di risultati a un utente. Inizialmente, stavo usando solo i thread Python per fare ciò, tuttavia ho riscontrato alcuni problemi noti:

  1. Non stavo controllando quanti thread potevano essere avviati. Quindi se qualcuno inviava 100 file per interrogare un server, avresti 100 thread avviati su quella macchina, che erano cattive notizie.

  2. I risultati che stavo tornando erano incompleti e variati drasticamente. Se eseguissi le ricerche in modo lineare (senza discussioni), otterrei risultati completi, ma ci sarebbe voluto molto tempo. Ho concluso basandomi su questa e alcune ricerche personali che non stavo adottando un approccio sicuro per i thread e ho iniziato a esaminare il modulo Queue.

Ho finito con qualcosa di simile ...

def worker():
   while q.qsize != 0:
      cmd = q.get()
      # kick off a bash command that zgreps files from different servers
      p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
      results.extend(''.join(p.stdout.readlines()).split('\n')[:-1])
      q.task_done()

NUM_WORKER_THREADS = 10
results = []

q = Queue.Queue()
for i in range(NUM_WORKER_THREADS):
   t = threading.Thread(target=worker)
   t.daemon = True
   t.start()

""" Code to generate input commands needed here """

for c in commands:
   q.put(c)

q.join()

""" Post processing of collected *results* array"""

Dopo aver inserito alcuni vincoli del pool di thread attorno al mio programma e aver controllato ogni thread se c'è ancora qualcosa nella coda, i miei risultati sono in linea con quello che mi aspetterei. Dopo il test, i risultati corrispondono all'output di un singolo approccio filettato (tranne che è molto più veloce).

Le mie domande sono le seguenti:

  1. Il mio approccio è thread-safe? C'è qualche possibilità che uno dei 10 thread di lavoro possa sovrascrivere un tentativo di estendere l'array di risultati? Sono preoccupato che abbia appena diminuito le possibilità di sovrascrittura, allocando un pool di thread più piccolo per gestire l'input, ma in realtà non abbiamo risolto il problema.

  2. Capisco dalla lettura che le code dovrebbero essere thread-safe. Tuttavia, quando elimino i pool di thread e non controllo la dimensione della coda nei thread, posso riprodurre lo stesso problema che avevo prima con grandi volumi di input. Qualcuno può spiegare perché è così?

posta Elias51 15.10.2014 - 16:19
fonte

1 risposta

3

1a. Is my approach thread-safe?

No perché i tuoi risultati non sono thread-safe ma vengono letti e scritti da molti thread. Prendi in considerazione la possibilità di trasformarlo in una coda.

1b. Is there any chance one of the 10 worker threads could overwrite anothers attempt to extend the results array?

Sì, è esattamente ciò che potrebbe accadere, motivo per cui dovresti evitare l'uso di un array in questo modo durante il threading.

2. I understand from reading that queues are supposed to be thread-safe. However, when I eliminate thread pools and don't check for the queue size in my threads, I can reproduce the same problem I was having before with large volumes of input. Can someone explain why that is?

Dovresti davvero usare il pool di threading prebaked per fare il tuo lavoro. Dovrebbe rendere la tua vita molto più semplice. Prende una funzione per chiamare, e una lista che contiene tutte le variabili con cui dovrebbe essere chiamata, e quindi distribuisce l'elenco di lavoro tra i thread. L'unico problema è che la funzione che stai chiamando può essere passata solo con un parametro, ma dal momento che ne stai usando solo uno puoi adattare il tuo codice abbastanza facilmente

Ad esempio:

from multiprocessing import Pool

def do_work(cmd):
  # kick off a bash command that zgreps files from different server
  # Not sure if this can be done better. Not clear what command you're running
  p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
  return ''.join(p.stdout.readlines()).split('\n')[:-1]

NUM_WORKER_THREADS = 10

p = Pool(NUM_WORKER_THREADS)
results = p.map(do_work, commands)

Come nota a margine, puoi sempre aggirare il limite di 1 argomento semplicemente comprimendo tutti gli argomenti necessari in una tupla e poi scompattandoli all'inizio del metodo. Le tuple sono economiche in python.

Ad esempio se avevi una funzione do_work che richiedeva due comandi come segue:

def do_work(cmd_one, cmd_two):
  p = subprocess.Popen(cmd_one, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
  p2 = subprocess.Popen(cmd_two, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
  return ''.join(p.stdout.readlines()) + ''.join(p2.stdout.readlines())

potremmo riscriverlo come segue:

def do_work(commands):
  cmd_one, cmd_two= commands
  p = subprocess.Popen(cmd_one, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
  p2 = subprocess.Popen(cmd_two, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
  return ''.join(p.stdout.readlines()) + ''.join(p2.stdout.readlines())

e cambia il nostro metodo principale in modo che assomigli al seguente:

from multiprocessing import Pool

NUM_WORKER_THREADS = 10

p = Pool(NUM_WORKER_THREADS)
results = p.map(do_work, [(x, y) for x in command1_list for y in command2_list])

In questo modo, impacchettiamo entrambe le nostre variabili in una tupla per la chiamata, e quindi le disimballiamo immediatamente quando ne abbiamo bisogno.

    
risposta data 15.10.2014 - 17:15
fonte

Leggi altre domande sui tag