Ho un caso d'uso per il quale ho bisogno di costruire una pipeline di elaborazione dei dati
- Il contatto con il cliente conduce a dati provenienti da diverse fonti di dati come csv, data base, api deve essere prima mappato a campi di uno schema universale. Potrebbero esserci circa 100.000 file ogni giorno che devono essere elaborate.
- Quindi alcuni campi devono essere puliti, convalidati e arricchiti. Ad esempio, il campo email deve essere convalidato chiamando una API esterna per verificare se è valida e non rimbalza, il campo indirizzo deve essere standardizzato in un formato particolare. Ci sono altre operazioni come la stima della città, lo stato da zip, la convalida del numero di telefono. Le operazioni di Atleast 20 sono già pianificate, altre in futuro
- Le regole precedenti sono non fisse e possono cambiare in base a ciò che l'utente desidera fare con i suoi dati (salvati dall'interfaccia utente). Ad esempio, per un dato particolare, un utente può solo scegliere di standardizzare il suo numero di telefono, ma non verificare se è valido: quindi le operazioni eseguite sui dati sono dinamiche.
Ecco cosa sto facendo attualmente:
-
Carica i dati come frame dati panda (hai preso in considerazione la scintilla, ma il data set non è così grande [max 200 mb-] per usare la scintilla). Avere un elenco di operazioni definite dall'utente che devono essere eseguite su ogni campo come
actions = {"phone_number": ['cleanse', 'standardize'], "zip": ["arricchisci", "convalida"]}
Come accennato in precedenza, le azioni sono dinamiche e variano dall'origine dati all'origine dati in base a ciò che l'utente sceglie di fare su ciascun campo. Ci sono molte attività personalizzate come questa che possono essere applicate specificamente a un campo specifico.
- Ho una funzione personalizzata per ogni operazione che l'utente può definire per i campi. Li chiamo in base al dizionario "azioni" e trasferisco il frame di dati alla funzione: la funzione applica la logica scritta al frame di dati e restituisce il frame di dati modificato.
def cleanse_phone_no(df, configs): # Logic return modified_df
Non sono sicuro se questo sia l'approccio giusto per farlo. Le cose si complicheranno quando dovrò chiamare API esterne per arricchire determinati campi in futuro. Quindi sto considerando un modello produttore-consumatore
a. Avere un modulo produttore che crei tale suddivisione ogni riga nel file (1 record di contatto) come singolo messaggio in una coda come AMQ o Kafka
b. Avere la logica per elaborare i dati nei consumatori: prenderanno un messaggio alla volta e li elaboreranno
c. Il vantaggio che vedo con questo approccio è: semplifica il processo di elaborazione dei dati, i dati vengono elaborati un record alla volta. C'è più controllo e granularità Lo svantaggio è che creerà un sovraccarico in termini di calcolo come un record elaborato uno ad uno - che posso superare in misura maggiore utilizzando più consumatori
Ecco le mie domande:
- Qual è la tua opinione sull'approccio? Hai qualche suggerimento per un approccio migliore?
- C'è un motivo più elegante che posso usare per applicare le regole personalizzate al set di dati che attualmente sto utilizzando
- Sta usando un modello produttore-consumatore per elaborare i dati una riga alla volta rispetto all'intero insieme di dati consigliabile (considerando tutta la complessità della logica che verrebbe in futuro)? Se è così dovrei usare AMQ o Kafka?