Ecco un esempio MCVE di come ho finito per implementarlo. Questo funziona interamente in PySpark V2.1.1 con l'aiuto di scikit-learn sotto alcune rigorose ipotesi (vedi Requisito 2).
Requisiti:
- scikit-learn è installato su ogni macchina worker
- per un singolo modello, tutti i dati e l'overhead di formazione possono essere contenuti su una singola macchina worker
Il flusso di lavoro generale è:
- Trasforma il DataFrame in un RDD [(K, V)] in cui le chiavi sono ID di gruppo e i valori sono osservazioni di dati individuali
- Utilizza
groupByKey
per mescolare tutti i dati per una singola chiave su una macchina singola worker
- Formi i modelli per ciascun tasto sulle macchine worker
- Raccogli i modelli addestrati per la serializzazione per il successivo recupero
Per la mia applicazione, questo approccio funziona in pochi minuti su un grande cluster Spark. Sto allenando decine di migliaia di modelli.
import numpy as np
from pyspark.sql import SparkSession
from sklearn.mixture import GaussianMixture
##
# Make the example PySpark DataFrame
##
# Generate example data
nsamps = 500
cv1_1 = np.array([[1,0],[0,1]])
cv1_2 = np.array([[2,0],[0,0.5]])
cv2_1 = np.array([[2, -1.5,],[-1.5, 2]])
cv2_2 = np.array([[2, 1.5,],[1.5, 2]])
mu1_1 = np.array([0,0])
mu1_2 = np.array([0,3])
mu2_1 = mu1_1 + np.array([5,5])
mu2_2 = mu2_1 + np.array([5,5])
# Group 1 data
x1_1 = np.matmul(np.random.randn(nsamps,2), cv1_1) + mu1_1
x1_2 = np.matmul(np.random.randn(nsamps,2), cv1_2) + mu1_2
X1 = np.concatenate([x1_1, x1_2])
# Group 2 data
# X2 = np.matmul(np.random.randn(nsamps,2), cv2_1) + mu2_1
x2_1 = np.matmul(np.random.randn(nsamps,2), cv2_1) + mu2_1
x2_2 = np.matmul(np.random.randn(nsamps,2), cv2_2) + mu2_2
X2 = np.concatenate([x2_1, x2_2])
# Group lables
labs = 2*nsamps*["a"] + 2*nsamps*["b"]
# Create the data frame
X = np.concatenate([X1, X2]).tolist()
dat = [(i, x[0], x[1]) for (i, x) in zip(labs, X)]
cols = ["id", "x", "y"]
df = spark.createDataFrame(dat, cols)
##
# Shuffle groups to individual workers and train models
##
# group by ids
kv = df.rdd.map(lambda r: (r.id, [r.x, r.y]))
# create a distrributed RDD where each group is localized on a single worker node
groups = kv.groupByKey()
# a single group is a tuple of id and an iterable with the data
# e.g. (u'a', <pyspark.resultiterable.ResultIterable at 0x7effd7debb90>)
# helper function to train GMMs on the data iterables
def trainGMM(data_itr):
# Returns a trained GMM
X = np.array(data_itr.data).astype(np.float64)
gmm = GaussianMixture(n_components=2, covariance_type='full', tol=0.001, reg_covar=1e-06, max_iter=100,
n_init=1, init_params='kmeans', weights_init=None, means_init=None, precisions_init=None,
random_state=None, warm_start=False, verbose=0, verbose_interval=10)
gmm.fit(X)
return gmm
# Train GMMs
gmms = groups.mapValues(trainGMM) # still just a transformation
##
# Collect and Serialize GMMs
##
# the trained models are small, so we can collect to a single machine
collected_gmms = gmms.collect()
# pickle models for restoring later
outRoot = "local/output/dir/"
for tup in collected_gmms:
id = tup[0]
gmm = tup[1]
with open("%s/%s_gmm.pkl" % (outRoot, id), 'w') as fout:
pickle.dump(gmm, fout)