Come ottimizzare / parallelizzare il seguente algoritmo di clustering / join:

5

Ho un algoritmo relativamente piccolo che occupa circa il 60% del tempo di esecuzione totale del mio codice scientifico (57 righe di 3600), quindi mi piacerebbe trovare un modo per ottimizzare ciò che sto facendo e fare il codice indipendente dall'ordine in modo da poter applicare una stringa parallela cilk_for .

Ecco cosa fa, verbalmente : ho un std::vector di puntatori agli oggetti personalizzati chiamati Segment ( vector<Segment*> newSegment ). Ogni Segment contiene un std::vector di numeri interi (indici mesh). In questa funzione, vorrei trovare qualsiasi Segment che si sovrapponga a qualsiasi altro, con la sovrapposizione definita come il membro indices che si sovrappone alla riga del numero. Se si sovrappongono, mi piacerebbe unirli insieme (inserire A.indices in B.indices ) ed eliminarne uno (elimina A ).

es. 1: A.indices = {1,2,3} B.indices = {4,5,6} non si sovrappongono; non fare nulla

es. 2:   A.indices = {1,2,4} B.indices = {3,5,6} si sovrappongono; A = eliminato B.indices = {1,2,3,4,5,6}

Le sovrapposizioni sono sparse, ma esistenti.

Ecco il codice corrente :

Algoritmo principale:

//make sure segments don't overlap
for (unsigned i = 0; i < newSegment.size(); ++i) {
    if (newSegment[i]->size() == 0) continue;
    for (unsigned j = i + 1; j < newSegment.size(); ++j) {
        if (newSegment[i]->size() == 0) continue;
        if (newSegment[j]->size() == 0) continue;
        int i1 = newSegment[i]->begin();
        int i2 = static_cast<int>(newSegment[i]->end());
        int j1 = newSegment[j]->begin();
        int j2 = static_cast<int>(newSegment[j]->end());
        int L1 = abs(i1 - i2); 
        int L2 = abs(j1 - j2); 
        int dist = max(i1,i2,j1,j2) - min(i1,i2,j1,j2);

        //if overlap, fold segments together
        //copy indices from shorter segment to taller segment
        if (dist <= L1 + L2) {
            unsigned more, less;
            if (newSegment[i]->slope == newSegment[j]->slope) {
                if (value_max[i] > value_max[j]) {
                    more = i;
                    less = j;
                } else {
                    more = j;
                    less = i;
                }
            } else if (newSegment[i]->size() == 1) {
                more = j; less = i;
            } else if (newSegment[j]->size() == 1) {
                more = i; less = j;
            } else assert(1 == 0);
              while(!newSegment[less]->indices.empty()) {
                unsigned index = newSegment[less]->indices.back();
                newSegment[less]->indices.pop_back();
                newSegment[more]->indices.push_back(index);
            }
        }
    }

}//end overlap check

//delete empty segments
vector<unsigned> delList;
for (unsigned i = 0; i < newSegment.size(); ++i) {
    if (newSegment[i]->size() == 0) {                            //delete empty
        delList.push_back(i);
        continue;
    }
}
while (delList.size() > 0) {
    unsigned index = delList.back();
    delete newSegment.at(index);
    newSegment.erase(newSegment.begin() + index);
    delList.pop_back();
}

Rilevante Segment definizione della classe dell'oggetto e funzioni membro:

class Segment{

    public:
    Segment();
    ~Segment();

    unsigned size();
    int begin();
    unsigned end();
    std::vector<int> indices;
    double slope;
};

int Segment::begin() {
    if (!is_sorted(indices.begin(),indices.end()))      std::sort(indices.begin(),indices.end());
    if (indices.size() == 0) return -1; 
    return indices[0];
}

unsigned Segment::end() {
    if (!is_sorted(indices.begin(),indices.end()))    std::sort(indices.begin(),indices.end());
    return indices.back();
}

unsigned Segment::size() {
    unsigned indSize = indices.size();
    if (indSize == 1) {
        if (indices[0] == -1) return 0;
    }   
    return indSize;
}

idee :

  1. Dato che non mi interessa l'ordine degli oggetti Segment , potrebbero trovarsi in un contenitore privo di ordine?
  2. Nel mio algoritmo, trovo la sovrapposizione guardando il primo e l'ultimo indices di ogni segmento. Faccio un std::is_sorted (e poi forse un std::sort ) quando prendo il indices perché l'elenco può cambiare quando vengono inseriti più indici. Forse potrei inserire indices in std::set anziché std::vector per salvare l'ordinamento / controllo di ordinamento esplicito?
  3. Sono abbastanza sicuro che modificando il indices mentre procedo, questo lo rende dipendente dall'ordine. Forse, potrei suddividere il codice nella seguente organizzazione usando il concetto di un grafo non orientato per renderlo indipendente dall'ordine:

    • scoperta del bordo (senza modificare indices )
    • unire i cluster dei nodi connessi ( Segment oggetti che si sovrappongono) utilizzando un attraversamento grafico
    • cancella gli oggetti Segment vuoti

Domande

  1. Le idee sopra sono valide o trascurabili per le prestazioni?
  2. In quale altro modo posso ottimizzarlo?
  3. Come (se non sopra) posso rendere l'algoritmo indipendente dall'ordine?
posta Stershic 08.11.2015 - 16:15
fonte

2 risposte

4

La funzione is_sorted() è probabilmente costosa, quindi dovresti evitarlo. Perché non ordinare tutto in una volta all'inizio, prima di entrare nei loop?

Il modo migliore per ottimizzare il tuo codice è inventare un nuovo algoritmo che eviti i loop nidificati di N, perché ha una complessità di O (N ^ 2) (vedi "big-Oh notation".) Vedi Bart van Ingen Il commento di Schenau su come ottenere questo risultato.

    
risposta data 08.11.2015 - 18:52
fonte
0

Sono arrivato a un algoritmo identico a @BartVanIngenSchenau in questo commento In pratica ordina il set di segmenti in base all'elemento min di ogni segmento. Quindi due elementi adiacenti si sovrappongono se e solo se Segment[i].max >= Segment[i+1].min

Ma vorrei aggiungere che l'ordinamento sembra assolutamente non necessario e che mantiene solo l'elemento max e min. Basta aggiornarli quando si uniscono i segmenti. (segment1+segment2).min = min(segment1.min,segment2.min) e (segment1+segment2).max = max(segment1.max,segment2.max) Inoltre se il segmento è ordinato per elemento min hai (Segment[i]+Segment[i+1]).min = segment[i].min (ma quest'ultima cosa potrebbe essere l'ottimizzazione prematura.) Ho notato + l'unione di due segmenti.

Per la localizzazione della cache, la soluzione migliore per unire potrebbe essere quella di avere un layout simile al seguente layout

ptr_to_2nd_segment
n_elt_of_1st_segment,
min_elt_of_1st_segment,
[
[other_elts_of_1st_segment,]
max_elt_of_1st_segment,]

ptr_to_3rd_segment
n_elt_of_2nd_segment,
min_elt_of_2nd_segment,
[
[other_elts_of_2nd_segment,]
max_elt_of_2nd_segment,]

...

Unire due elementi in questa configurazione sarebbe abbastanza semplice, lo sarebbe è solo questione di aggiornare ptr al prossimo elemento, aggiungendo il numero di elementi, spostando i secondi elementi del segmento e scambiando gli elementi massimi se necessario. Ciò consentirebbe una certa quantità di spazzatura dopo ogni unione (8 byte su architettura a 32 bit e 16 byte su architettura a 64 bit). Sapere se è possibile supportare tale posta indesiderata dipende dall'applicazione (inoltre, si potrebbe fare una sorta di garbage collection tra due iterazioni dell'algoritmo.)

Per parallelizzare, una volta che il set di segmenti è ordinato per min elemento, puoi dividere in una parte l'insieme di segmenti e fare l'unione in modo indipendente. Quindi unire solo al bordo di ciascuna parte. Ma come dice @MikeNakis in questo commento poiché la fusione è abbastanza legata alla memoria, potrebbe non essere ben parallelizzata

    
risposta data 10.11.2015 - 19:19
fonte

Leggi altre domande sui tag