Formazione distribuita di molti piccoli modelli ML

1

Ho un'applicazione di data science che prevede l'addestramento di decine di migliaia di piccoli modelli Gaussiani individuali. Con "piccolo", intendo che qualsiasi modello individuale può essere facilmente addestrato su uno dei nostri server di lavoro. In effetti, dovremmo essere in grado di addestrare diversi contemporaneamente su ciascun nodo.

Ho esplorato Spark con Yarn, ma Spark sembra davvero destinato alla formazione di grandi modelli su più macchine, non su molti piccoli modelli contenuti su singole macchine.

Sto immaginando un flusso di lavoro che assomiglia a:

  1. raggruppa i dati per chiave
  2. invia interi gruppi di dati alle singole macchine worker,
  3. forma modelli per gruppi sulla stessa macchina worker.
  4. Segnala indietro o salva modelli addestrati.

Potrei usare alcune indicazioni su come implementarlo. L'allenamento del modello è imbarazzantemente parallelizzabile.

    
posta andrew 17.09.2018 - 20:22
fonte

2 risposte

0

Ecco un esempio MCVE di come ho finito per implementarlo. Questo funziona interamente in PySpark V2.1.1 con l'aiuto di scikit-learn sotto alcune rigorose ipotesi (vedi Requisito 2).

Requisiti:

  1. scikit-learn è installato su ogni macchina worker
  2. per un singolo modello, tutti i dati e l'overhead di formazione possono essere contenuti su una singola macchina worker

Il flusso di lavoro generale è:

  1. Trasforma il DataFrame in un RDD [(K, V)] in cui le chiavi sono ID di gruppo e i valori sono osservazioni di dati individuali
  2. Utilizza groupByKey per mescolare tutti i dati per una singola chiave su una macchina singola worker
  3. Formi i modelli per ciascun tasto sulle macchine worker
  4. Raccogli i modelli addestrati per la serializzazione per il successivo recupero

Per la mia applicazione, questo approccio funziona in pochi minuti su un grande cluster Spark. Sto allenando decine di migliaia di modelli.

import numpy as np
from pyspark.sql import SparkSession
from sklearn.mixture import GaussianMixture

##
# Make the example PySpark DataFrame
##

# Generate example data
nsamps = 500
cv1_1 = np.array([[1,0],[0,1]])
cv1_2 = np.array([[2,0],[0,0.5]])
cv2_1 = np.array([[2, -1.5,],[-1.5, 2]])
cv2_2 = np.array([[2, 1.5,],[1.5, 2]])

mu1_1 = np.array([0,0])
mu1_2 = np.array([0,3])

mu2_1 = mu1_1 + np.array([5,5])
mu2_2 = mu2_1 + np.array([5,5])

# Group 1 data
x1_1 = np.matmul(np.random.randn(nsamps,2), cv1_1) + mu1_1
x1_2 = np.matmul(np.random.randn(nsamps,2), cv1_2) + mu1_2
X1 = np.concatenate([x1_1, x1_2])

# Group 2 data
# X2 = np.matmul(np.random.randn(nsamps,2), cv2_1) + mu2_1
x2_1 = np.matmul(np.random.randn(nsamps,2), cv2_1) + mu2_1
x2_2 = np.matmul(np.random.randn(nsamps,2), cv2_2) + mu2_2
X2 = np.concatenate([x2_1, x2_2])

# Group lables
labs = 2*nsamps*["a"] + 2*nsamps*["b"]

# Create the data frame
X = np.concatenate([X1, X2]).tolist()
dat = [(i, x[0], x[1]) for (i, x) in zip(labs, X)]
cols = ["id", "x", "y"]
df = spark.createDataFrame(dat, cols)

##
# Shuffle groups to individual workers and train models
##

# group by ids
kv = df.rdd.map(lambda r: (r.id, [r.x, r.y]))
# create a distrributed RDD where each group is localized on a single worker node
groups = kv.groupByKey()
# a single group is a tuple of id and an iterable with the data
# e.g. (u'a', <pyspark.resultiterable.ResultIterable at 0x7effd7debb90>)

# helper function to train GMMs on the data iterables
def trainGMM(data_itr):
    # Returns a trained GMM
    X = np.array(data_itr.data).astype(np.float64)
    gmm = GaussianMixture(n_components=2, covariance_type='full', tol=0.001, reg_covar=1e-06, max_iter=100,
                          n_init=1, init_params='kmeans', weights_init=None, means_init=None, precisions_init=None,
                          random_state=None, warm_start=False, verbose=0, verbose_interval=10)
    gmm.fit(X)
    return gmm

# Train GMMs
gmms = groups.mapValues(trainGMM)  # still just a transformation

##
# Collect and Serialize GMMs
##

# the trained models are small, so we can collect to a single machine
collected_gmms = gmms.collect()

# pickle models for restoring later
outRoot = "local/output/dir/"

for tup in collected_gmms:
    id = tup[0]
    gmm = tup[1]
    with open("%s/%s_gmm.pkl" % (outRoot, id), 'w') as fout:
        pickle.dump(gmm, fout)
    
risposta data 25.09.2018 - 18:54
fonte
2

Non c'è un modo giusto o sbagliato per farlo perché questo dipende dai progetti e se è possibile sfruttare la struttura dei dati per renderli efficienti.

es. per un progetto una tantum, è possibile installare il software necessario su tutti i server, preparare pacchetti di lavoro, preparare un accesso SSH per tutti i server e quindi utilizzare GNU Parallel per mantenere tutti i server impegnati nell'elaborazione dei pacchetti di lavoro. Questo è particolarmente adatto per un approccio ad-hoc in cui i dati di input e i modelli di output sono memorizzati come file semplici e se si ha familiarità con la riga di comando.

Se si desidera addestrare regolarmente e automaticamente i nuovi modelli, potrebbe essere preferibile creare una coda di elementi di lavoro, ovvero un database condiviso che contenga tutti gli elementi di lavoro e i risultati. Quindi si utilizza un software di gestione per distribuire il software del server di lavoro personalizzato su tutti i nodi del cluster. Questo server di lavoro attende la coda per i pacchetti di lavoro e scrive nuovamente i risultati nel database. Questo potrebbe anche essere combinato con un intelligente auto-ridimensionamento per adattare il numero di lavoratori alla quantità di lavoro in sospeso, ma potrebbe essere eccessivo per un progetto semplice.

In entrambi i casi:

  1. Inizia scrivendo un semplice software per i lavoratori in grado di addestrare i modelli localmente.
  2. Estendi il software in modo che più operatori del tuo computer locale possano allenarsi in parallelo - non utilizzare più thread. Questo può comportare un database o un software come GNU Parallelo per sincronizzare i lavoratori.
  3. Trova un modo per eseguire quei lavoratori distribuiti su più computer. Il tuo software worker è già in grado di farlo, questo passaggio è principalmente un problema sysadmin ("ops").
risposta data 18.09.2018 - 13:57
fonte

Leggi altre domande sui tag