Lazy Processing of Streams

3

Ho il seguente problema:

  • Ho un file di testo e devo leggerlo e dividerlo in righe.
  • Potrebbe essere necessario eliminare alcune righe (in base a criteri non risolti).
  • Le righe che non vengono rilasciate devono essere analizzate in alcuni record predefiniti.
  • I record che non sono validi devono essere eliminati.
  • I record duplicati possono esistere e, in tal caso, sono consecutivi. Se esistono duplicati / più record, è necessario conservare solo un elemento.
  • I record rimanenti devono essere raggruppati in base al valore contenuto in un campo; tutti i record appartenenti allo stesso gruppo appaiono uno dopo l'altro (ad esempio AAAABBBBCCDEEEFF e così via).
  • Le registrazioni di ciascun gruppo devono essere numerate (1, 2, 3, 4, ...). Per ogni gruppo la numerazione inizia da 1.
  • I record devono quindi essere salvati da qualche parte / consumati nello stesso ordine in cui sono stati prodotti.

Devo implementarlo in Java o C ++.

La mia prima idea era di definire funzioni / metodi come:

  • Un metodo per ottenere tutte le linee dal file.
  • Un metodo per filtrare le linee indesiderate.
  • Un metodo per analizzare le righe filtrate in record validi.
  • Un metodo per rimuovere i record duplicati.
  • Un metodo per raggruppare i record e numerarli.

Il problema è che i dati che ho intenzione di leggere possono essere troppo grandi e potrebbero non adattarsi alla memoria principale: quindi non posso semplicemente costruire tutte queste liste e applicare le mie funzioni una dopo l'altra.

D'altra parte, penso di non aver bisogno di inserire tutti i dati nella memoria principale in una sola volta perché una volta che un record è stato consumato tutti i suoi dati sottostanti (fondamentalmente le righe di testo tra il record precedente e il record corrente, e il disco stesso) possono essere eliminati.

Con la poca conoscenza che ho di Haskell ho immediatamente pensato a una sorta di valutazione lazy, in cui invece di applicare funzioni a liste che sono state completamente calcolate, ho flussi di dati diversi che sono costruiti uno sopra l'altro e, in ogni momento, solo la porzione necessaria di ogni flusso si materializza nella memoria principale.

Ma devo implementarlo in Java o C ++. Quindi la mia domanda è quale modello di progettazione o altra tecnica possa permettermi di implementare questa elaborazione lenta di flussi in uno di questi linguaggi.

    
posta Giorgio 15.04.2012 - 01:21
fonte

2 risposte

6

Dovresti esaminare gli iteratori se decidi di usare Java.

Scrivi un Iterator<String> che legge una riga dal file. Scrivi un iteratore di filtro che accetta l'iteratore sopra nel costruttore e genera solo quelle linee a cui sei interessato. Scrivi una percentuale% co_de che accetta un iteratore di stringa nel costruttore e divide ciascuna riga in un record. E così via.

Molto probabilmente scoprirai che eseguirai l'elaborazione in "ci sono di più?" sezione per ottenere la logica giusta.

    
risposta data 15.04.2012 - 02:24
fonte
2

Anche se questa domanda ha già una risposta accettata, ho recentemente trovato un'altra soluzione interessante, che vorrei condividere qui. Sarei interessato a conoscere altre tecniche per implementare i flussi in Java.

Le prime idee per la mia soluzione sono venute da un corso su Scala in cui ho potuto sperimentare i flussi di libreria di Scala ( scala.collection.immutable.Stream ). Il corso ha spiegato che uno stream in Scala è molto simile a un elenco (sequenza di celle cons) in cui la funzione / metodo tail () viene calcolata pigramente: la sequenza di coda viene calcolata su richiesta quando tail () viene chiamato per la prima volta e memorizzato nella cache per ulteriori accessi.

Naturalmente, anche la coda è calcolata pigramente, cioè viene generata solo la prima cella di controllo della coda quando viene chiamato tail (): gli elementi successivi del flusso coda vengono generati quando tail () viene chiamato su questo flusso, e così via.

La seconda parte della soluzione proviene da questa domanda e alcune delle sue risposte: l'unica estensione di cui avevo bisogno era implementare le celle contro con code pigre.

Sulla base di questa soluzione, è stato facile definire le solite funzioni (metodi) take e drop , e le funzioni di ordine superiore map , filter , dropWhile , takeWhile che anche lavorare pigramente sui flussi. Con questi strumenti il problema originale di manipolare i flussi di stringhe è molto più facile da risolvere.

La mia soluzione

Ho implementato per la prima volta gli elenchi come indicato nella domanda programmers e implementato un tipo di dati simile Stream<T> . Una cella contro consiste di un oggetto di classe Cons<T> contenente

  1. Un valore di tipo T (la testa) e
  2. Una chiusura (oggetto funzione) con un metodo apply che restituisce il flusso di coda.

Ecco le definizioni chiave:

// An interface to store functions (closures) that are
// used to produce a stream's tail on demand.
public interface IStreamFunction<A>
{
  public Stream<A> apply();
}

// Wrapper object that invokes a stream function
// and then stores the result for future invocations of tail().
public class Memoizer<Stream<A>>
{
  private final IStreamFunction<A> _f;

  private Stream<A> _v;

  public Memoizer(IStreamFunction<A> f)
  {
    _f = f;

    _v = null;
  }

  public Stream<A> value()
  {
    if (_v == null)
    {
      _v = _f.apply();
    }

    return _v;
  }
}

// Stream class with two subclasses implementing the empty stream
// and a cons cell, respectively.
public abstract class Stream<A>
{
  // Private constructor cannot be called by any subclasses except inner classes.
  private Stream()
  {
  }

  public abstract A head();

  public abstract Stream<A> tail();

  public abstract boolean isEmpty();

  // Empty stream.
  public static final class Nil<A> extends Stream<A>
  {
    public Nil()
    {
    }

    public A head()
    {
      throw new NoSuchElementException("Nil.head");
    }

    public Stream<A> tail()
    {
      throw new NoSuchElementException("Nil.tail");
    }

    public boolean isEmpty()
    {
      return true;
    }
  }

  // Cons cell.
  public static final class Cons<A> extends Stream<A>
  {
    private final A _h;
    private final Memoizer<Stream<A>> _m;

    public Cons(A h, IStreamFunction<A> f)
    {
      _h = h;
      _m = new Memoizer<Stream<A>>(f);
    }

    public A head()
    {
      return _h;
    }

    public Stream<A> tail()
    {
      return _m.value();
    }

    public boolean isEmpty()
    {
      return false;
    }
  }
}
    
risposta data 14.11.2012 - 23:44
fonte

Leggi altre domande sui tag