Come simulare il join interno su file molto grandi in java (senza esaurire la memoria)

3

Sto provando a simulare i join SQL usando java e file di testo molto grandi (INNER, RIGHT OUTER e LEFT OUTER). I file sono già stati ordinati utilizzando una routine di ordinamento esterna.

Il problema che ho è che sto cercando di trovare il modo più efficiente per gestire la parte di unner dell'algoritmo. In questo momento sto usando due elenchi per memorizzare le righe che hanno la stessa chiave e scorrere l'insieme di righe nel file giusto una volta per ogni riga nel file di sinistra (a condizione che le chiavi corrispondano ancora). In altre parole, la chiave di join non è univoca in ogni file, quindi è necessario tenere conto delle situazioni del prodotto cartesiano ...

left_01, 1
left_02, 1
right_01, 1
right_02, 1
right_03, 1

left_01 joins to right_01 using key 1
left_01 joins to right_02 using key 1
left_01 joins to right_03 using key 1
left_02 joins to right_01 using key 1
left_02 joins to right_02 using key 1
left_02 joins to right_03 using key 1

La mia preoccupazione è quella della memoria. Mancherò di memoria se utilizzo l'approccio in basso ma voglio comunque che la parte inner join funzioni abbastanza velocemente. Qual è l'approccio migliore per affrontare la parte di join INNER tenendo presente che questi file potrebbero potenzialmente essere enormi

public class Joiner {

  private void join(BufferedReader left, BufferedReader right, BufferedWriter output) throws Throwable {

    BufferedReader _left = left;
    BufferedReader _right = right;
    BufferedWriter _output = output;
    Record _leftRecord;
    Record _rightRecord;

    _leftRecord = read(_left);
    _rightRecord = read(_right);

    while( _leftRecord != null && _rightRecord != null ) {

      if( _leftRecord.getKey() < _rightRecord.getKey() ) {
        write(_output, _leftRecord, null);
        _leftRecord = read(_left);
      }
      else if( _leftRecord.getKey() > _rightRecord.getKey() ) {
        write(_output, null, _rightRecord);
        _rightRecord = read(_right);
      }
      else {
        List<Record> leftList = new ArrayList<Record>();
        List<Record> rightList = new ArrayList<Record>();

        _leftRecord = readRecords(leftList, _leftRecord, _left);
        _rightRecord = readRecords(rightList, _rightRecord, _right);

        for( Record equalKeyLeftRecord : leftList ){
          for( Record equalKeyRightRecord : rightList ){
            write(_output, equalKeyLeftRecord, equalKeyRightRecord);
          }
        }
      }
    }

    if( _leftRecord != null ) {
      write(_output, _leftRecord, null);
      _leftRecord = read(_left);
      while(_leftRecord != null) {
        write(_output, _leftRecord, null);
        _leftRecord = read(_left);        
      }
    }
    else {  
      if( _rightRecord != null ) {
        write(_output, null, _rightRecord);
        _rightRecord = read(_right);
        while(_rightRecord != null) {
          write(_output, null, _rightRecord);
          _rightRecord = read(_right);
        }
      }     
    }
    _left.close();
    _right.close();
    _output.flush();
    _output.close();
  }

  private Record read(BufferedReader reader) throws Throwable {
    Record record = null;

    String data = reader.readLine();
    if( data != null ) {
      record = new Record(data.split("\t"));
    }
    return record;
  }

  private Record readRecords(List<Record> list, Record record, BufferedReader reader) throws Throwable {
    int key = record.getKey();
    list.add(record);
    record = read(reader);
    while( record != null && record.getKey() == key) {
      list.add(record);
      record = read(reader);
    }
    return record;    
  }

  private void write(BufferedWriter writer, Record left, Record right) throws Throwable {   
    String leftKey = (left == null ? "null" : Integer.toString(left.getKey()));
    String leftData = (left == null ? "null" : left.getData());
    String rightKey = (right == null ? "null" : Integer.toString(right.getKey()));
    String rightData = (right == null ? "null" : right.getData());

    writer.write("[" + leftKey + "][" + leftData + "][" + rightKey + "][" + rightData + "]\n");
  }

  public static void main(String[] args) {
        try {

          BufferedReader leftReader = new BufferedReader(new FileReader("LEFT.DAT"));
          BufferedReader rightReader = new BufferedReader(new FileReader("RIGHT.DAT"));
          BufferedWriter output = new BufferedWriter(new FileWriter("OUTPUT.DAT"));

      Joiner joiner = new Joiner();
      joiner.join(leftReader, rightReader, output);
    } 
        catch (Throwable e) {
      e.printStackTrace();
    }
  }
}

Dopo aver applicato le idee dalla risposta proposta, ho cambiato il loop in questo

private void join(RandomAccessFile left, RandomAccessFile right, BufferedWriter output) throws Throwable {

    long _pointer = 0;

    RandomAccessFile _left = left;
    RandomAccessFile _right = right;

    BufferedWriter _output = output;

    Record _leftRecord;
    Record _rightRecord;

    _leftRecord = read(_left);
    _rightRecord = read(_right);

    while( _leftRecord != null && _rightRecord != null ) {

        if( _leftRecord.getKey() < _rightRecord.getKey() ) {
            write(_output, _leftRecord, null);
            _leftRecord = read(_left);
        }
        else if( _leftRecord.getKey() > _rightRecord.getKey() ) {
            write(_output, null, _rightRecord);             
            _pointer = _right.getFilePointer();
            _rightRecord = read(_right);
        }
        else {          
            long _tempPointer = 0;
            int key = _leftRecord.getKey();                                                         
            while( _leftRecord != null && _leftRecord.getKey() == key ) {
                _right.seek(_pointer);
                _rightRecord = read(_right);
                while( _rightRecord != null && _rightRecord.getKey() == key ) {
                    write(_output, _leftRecord, _rightRecord );
                    _tempPointer = _right.getFilePointer();
                    _rightRecord = read(_right); 
                }
                _leftRecord = read(_left);
            }
            _pointer = _tempPointer;
        }
    }

    if( _leftRecord != null ) {
        write(_output, _leftRecord, null);
        _leftRecord = read(_left);
        while(_leftRecord != null) {
            write(_output, _leftRecord, null);
            _leftRecord = read(_left);              
        }
    }
    else {  
        if( _rightRecord != null ) {
            write(_output, null, _rightRecord);
            _rightRecord = read(_right);
            while(_rightRecord != null) {
                write(_output, null, _rightRecord);
                _rightRecord = read(_right);
            }
        }           
    }
    _left.close();
    _right.close();
    _output.flush();
    _output.close();
}

UPDATE Mentre questo approccio funzionava, era terribilmente lento e quindi ho modificato questo per creare file come buffer e questo funziona molto bene. Ecco l'aggiornamento ...

private long getMaxBufferedLines(File file) throws Throwable {
    long freeBytes = Runtime.getRuntime().freeMemory() / 2;     
    return (freeBytes / (file.length() / getLineCount(file)));
}

private void join(File left, File right, File output, JoinType joinType) throws Throwable {

    BufferedReader leftFile = new BufferedReader(new FileReader(left));
    BufferedReader rightFile = new BufferedReader(new FileReader(right));
    BufferedWriter outputFile = new BufferedWriter(new FileWriter(output));
    long maxBufferedLines = getMaxBufferedLines(right);

    Record leftRecord;
    Record rightRecord;

    leftRecord = read(leftFile);
    rightRecord = read(rightFile);

    while( leftRecord != null && rightRecord != null ) {

        if( leftRecord.getKey().compareTo(rightRecord.getKey()) < 0) {
            if( joinType == JoinType.LeftOuterJoin 
                    || joinType == JoinType.LeftExclusiveJoin
                    || joinType == JoinType.FullExclusiveJoin
                    || joinType == JoinType.FullOuterJoin ) { 
                write(outputFile, leftRecord, null);
            }
            leftRecord = read(leftFile);
        }
        else if( leftRecord.getKey().compareTo(rightRecord.getKey()) > 0 ) {
            if( joinType == JoinType.RightOuterJoin 
                    || joinType == JoinType.RightExclusiveJoin
                    || joinType == JoinType.FullExclusiveJoin
                    || joinType == JoinType.FullOuterJoin ) {
                write(outputFile, null, rightRecord);
            }
            rightRecord = read(rightFile);
        }
        else if( leftRecord.getKey().compareTo(rightRecord.getKey()) == 0 ) {
            String key = leftRecord.getKey(); 
            List<File> rightRecordFileList = new ArrayList<File>();
            List<Record> rightRecordList = new ArrayList<Record>();
            rightRecordList.add(rightRecord);
            rightRecord = consume(key, rightFile, rightRecordList, rightRecordFileList, maxBufferedLines);

            while( leftRecord != null && leftRecord.getKey().compareTo(key) == 0 ) {
                processRightRecords(outputFile, leftRecord, rightRecordFileList, rightRecordList, joinType);
                leftRecord = read(leftFile);
            }

            // need a dispose for deleting files in list
        }
        else {
            throw new Exception("DATA IS NOT SORTED");
        }
    }

    if( leftRecord != null ) {
        if( joinType == JoinType.LeftOuterJoin 
                || joinType == JoinType.LeftExclusiveJoin
                || joinType == JoinType.FullExclusiveJoin
                || joinType == JoinType.FullOuterJoin ) {
            write(outputFile, leftRecord, null);
        }
        leftRecord = read(leftFile);
        while(leftRecord != null) {
            if( joinType == JoinType.LeftOuterJoin 
                    || joinType == JoinType.LeftExclusiveJoin
                    || joinType == JoinType.FullExclusiveJoin
                    || joinType == JoinType.FullOuterJoin ) {
                write(outputFile, leftRecord, null);
            }
            leftRecord = read(leftFile);              
        }
    }
    else {  
        if( rightRecord != null ) {
            if( joinType == JoinType.RightOuterJoin 
                    || joinType == JoinType.RightExclusiveJoin
                    || joinType == JoinType.FullExclusiveJoin
                    || joinType == JoinType.FullOuterJoin ) {
                write(outputFile, null, rightRecord);
            }
            rightRecord = read(rightFile);
            while(rightRecord != null) {
                if( joinType == JoinType.RightOuterJoin 
                        || joinType == JoinType.RightExclusiveJoin
                        || joinType == JoinType.FullExclusiveJoin
                        || joinType == JoinType.FullOuterJoin ) {
                    write(outputFile, null, rightRecord);
                }
                rightRecord = read(rightFile);
            }
        }           
    }
    leftFile.close();
    rightFile.close();
    outputFile.flush();
    outputFile.close();
}

public void processRightRecords(BufferedWriter outputFile, Record leftRecord, List<File> rightFiles, List<Record> rightRecords, JoinType joinType) throws Throwable {

    for(File rightFile : rightFiles) {
        BufferedReader rightReader = new BufferedReader(new FileReader(rightFile));
        Record rightRecord = read(rightReader);
        while(rightRecord != null){

            if( joinType == JoinType.LeftOuterJoin 
                    || joinType == JoinType.RightOuterJoin
                    || joinType == JoinType.FullOuterJoin
                    || joinType == JoinType.InnerJoin ) {               

                write(outputFile, leftRecord, rightRecord);
            }
            rightRecord = read(rightReader);
        }
        rightReader.close();
    }

    for(Record rightRecord : rightRecords) {
        if( joinType == JoinType.LeftOuterJoin 
                || joinType == JoinType.RightOuterJoin
                || joinType == JoinType.FullOuterJoin
                || joinType == JoinType.InnerJoin ) {               

            write(outputFile, leftRecord, rightRecord);
        }
    }
}

/**
 * consume all records having key (either to a single list or multiple files) each file will 
 * store a buffer full of data. The right record returned represents the outside flow (key is 
 * already positioned to next one or null) so we can't use this record in below while loop or 
 * within this block in general when comparing current key. The trick is to keep consuming 
 * from a List. When it becomes empty, re-fill it from the next file until all files have 
 * been consumed (and the last node in the list is read). The next outside iteration will be 
 * ready to be processed (either it will be null or it points to the next biggest key
 * @throws Throwable 
 * 
 */
private Record consume(String key, BufferedReader reader, List<Record> records, List<File> files, long bufferMaxRecordLines ) throws Throwable {
    boolean processComplete = false;
    Record record = records.get(records.size() - 1);

    while(!processComplete){
        long recordCount = records.size();
        if( record.getKey().compareTo(key) == 0 ){
            record = read(reader);                
            while( record != null && record.getKey().compareTo(key) == 0 && recordCount < bufferMaxRecordLines ) {
                records.add(record);
                recordCount++;
                record = read(reader);
            }
        }
        processComplete = true;
        // if record is null, we are done
        if( record != null ) {

            // if the key has changed, we are done
            if( record.getKey().compareTo(key) == 0 ) {

                // Same key means we have exhausted the buffer.
                // Dump entire buffer into a file. The list of file 
                // pointers will keep track of the files ...                    
                processComplete = false;
                dumpBufferToFile(records, files);
                records.clear();
                records.add(record);
            }
        }
    }
    return record;
}

/**
 * Dump all records in List of Record objects to a file. Then, add that 
 * file to List of File objects
 * 
 * NEED TO PLACE A LIMIT ON NUMBER OF FILE POINTERS (check size of file list)
 * 
 * @param records
 * @param files
 * @throws Throwable 
 */
private void dumpBufferToFile(List<Record> records, List<File> files) throws Throwable {
    String prefix = "joiner_" + files.size() + 1;
    String suffix = ".dat";
    File file = File.createTempFile(prefix, suffix, new File("cache"));
    BufferedWriter writer = new BufferedWriter(new FileWriter(file));       

    for( Record record : records ) {
        writer.write( record.dump() );
    }

    files.add(file);
    writer.flush();
    writer.close();
}
    
posta Constantin 06.09.2013 - 15:24
fonte

2 risposte

3

puoi utilizzare un RandomAccessFile

a caso.
private void join(RandomAccessFile left, RandomAccessFile right, BufferedWriter output) throws Throwable {

    long rightIndex = right.getFilePointer();

    Record leftRecord = read(left);
    Record rightRecord;

    while((leftRecord )!=null){
        right.seek(rightIndex);

        while((rightRecord = read(right))!=null && leftRecord.getKey() > rightRecord.getKey()){
            //skip right until it can match left
            // store pointer because we don't need to return to earlier
            rightIndex = right.getFilePointer();
        }

        while(rightRecord !=null && leftRecord.getKey()==rightRecord.getKey()){
            write(_output, leftRecord, rightRecord );
            rightRecord = read(right))
        }
        leftRecord = read(left);
        //you can test the need to skip this left record if you keep the previous key and the current rightRecord.getKey()
    }
}

questo sarà un tempo O (n + m) e un algoritmo di spazio O (1)

    
risposta data 06.09.2013 - 16:26
fonte
0

Perché vorresti usare "File Cache"?

Si suppone che il caching venga utilizzato quando si dispone di un livello con una capacità di accesso più rapida per ridurre il tempo sui livelli più lenti.

Quindi, se leggi i file attraverso la rete, la memorizzazione nella cache tramite file system ha senso perché il file system locale sarà più veloce della rete. Se si sta lavorando su file system, il livello più veloce sarà la memoria e BufferedReader gestirà la memorizzazione nella cache della memoria. È possibile scrivere il proprio wrapper CustomBufferredReader per BufferedReader per controllare con maggiore precisione quanti record si desidera leggere in anticipo e mettere in memoria. Per fare ciò correttamente è necessario effettuare una ricerca sull'hardware / disco che si sta utilizzando per capire come ottimizzare i record.

Nel tuo caso, stai memorizzando nella cache le operazioni del file system sulle operazioni del file system. Non ha senso per me. Non chiamerei affatto questo caching. Questo ti aiuterà solo quando stai memorizzando nella cache il lavoro intermedio (come quando fai la programmazione dinamica), ma non è il caso perché i record sono stati ordinati esternamente tramite l'ordinamento esterno come hai menzionato. Non c'è lavoro intermedio.

E si menziona il prodotto cartesiano nella parte interna della join, se i record sono tutti ordinati per ordinamento esterno, non è necessario preoccuparsi dei prodotti cartesiani, basta fare una scansione lineare a zig-zag tra la lista dei record di sinistra e la destra recrod list.

    
risposta data 05.06.2014 - 04:34
fonte

Leggi altre domande sui tag