Gestione dei flussi in pausa con RxJS

1

Leggere un flusso del nodo Voglio poter ricevere un flusso di testo e attivare la continuazione del mio flusso.

Il seguente codice risolve i miei scopi, ma di recente ho letto che stiamo usando il soggetto troppo quando non abbiamo bisogno di farlo. Userò questo codice per leggere i file dal disco o da S3.

È possibile sostituire i miei soggetti con osservabili o è possibile farlo senza usare il soggetto?

La migliore pratica è l'uso Soggetto con parsimonia a causa di motivi diversi: il mio importante è che non è riutilizzabile, se qualcosa non riesce nell'elaborazione non posso ricominciare daccapo.

Penso di poter eseguire il mio continuaStream utilizzando un emettitore di eventi come origine dati e digerirlo tramite un Observable.from . Vale la pena seguire questo percorso?

import { split, mapSync } from 'event-stream'

import { Subject } from 'rxjs/Subject'
import 'rxjs/add/operator/finally'
import 'rxjs/add/operator/catch'

// This will receive your Node Stream
// It will return two observable, one for getting the lines
// and another to continue pulling from the stream
function streamProcessing (stream: NodeJS.ReadableStream) {
  const source = new Subject()
  const continueStream = new Subject()
  const s = stream
    .pipe(split())
    .on('data', (line) => {
      s.pause()
      source.next({
        line: line
      })
    })
    .on('error', (err) => {
      source.error(err)
    })
    .on('end', () => {
      source.complete()
    })
  continueStream.subscribe(() => {
    s.resume()
  })
  return {
    text$: source,
    continue$: continueStream
  }
}

// this is my control function, basically what I'll
// use on my unit test or on my main function
export function processTxt (asciiStream: NodeJS.ReadableStream) {
  return new Promise((res, rej) => {
    const { text$, continue$ } = streamProcessing(asciiStream)
    text$
      .finally(() => {
        console.log('ended')
        res('ended')
      })
      .subscribe((lineOfText) => {
        console.log(lineOfText)
        continue$.next()
      })
  })
}
    
posta Claudiordgz 22.01.2018 - 18:52
fonte

1 risposta

1

Ho trovato la soluzione che utilizza solo 1 oggetto per pause .

Sembra qualcosa del tipo:

A un certo punto ho deciso che sarebbe stato bello mettere in pausa, riprendere lo streaming con la stessa funzione ed essere molto molto molto attento dove lo chiamo effettivamente. Questo è essenziale, la maggior parte dei problemi che ho avuto, perché stavo facendo una pausa, riprendendo nel posto sbagliato.

enum StreamEventType {
  PAUSE,
  RESUME
}

function pauseStream (s: NodeJS.ReadableStream,
                      flag: StreamEventType) {
  switch (flag) {
    case StreamEventType.PAUSE:
      s.pause()
      break
    case StreamEventType.RESUME:
      s.resume()
      break
  }
}

Ritorna sia il mio stream di testo, sia la mia pausa osservabile ( Subject ) come segue:

import { split, map } from 'event-stream'

function streamProcessing (stream: NodeJS.ReadableStream) {
 const playPauseStream = new Subject()
  const sourceObservable = Observable.create((observer) => {
    const s = stream
      .pipe(split())
      .on('data', (line) => { observer.next(line) })
      .on('error', (err) => { observer.error(err) })
      .on('end', () => { observer.complete() })
    playPauseStream.subscribe(f => pauseStream(s, (f as StreamEventType)))
  })
  return {
    text$: sourceObservable,
    playPause$: playPauseStream
  }
}

Il Pausa / Riprendi (❚❚ ►) Osservabile è più essenziale (e delicato) di quello che ti aspetti.

Per consumare questo flusso ho 2 casi d'uso, uno è XML e l'altro è solo \f interruzioni di pagina. Quest'ultimo è più facile, quindi l'ho fatto per primo. Abbiamo bisogno di qualcosa che recuperi il testo riga per riga, controlli \f e si interrompa di conseguenza, e restituisci semplicemente l'interruttore playPause con il buffer ottenuto. Prima di pubblicare il buffer, salveremo in da qualche parte nel mondo noi pause lo stream.

Una pagina di testo non è molto più di una preoccupazione 'memoria-saggia'. Creeremo una funzione che fa questo con l'argomento observer , e quindi useremo quell'osservatore per creare un osservabile:

function documentHandlers (asciiStream: NodeJS.ReadableStream) {
  const { text$, playPause$ } = streamProcessing(asciiStream)
  function obs (observer) {
    let buffer: string[] = []
    text$
      .finally(() => { observer.complete() })
      .catch((err) => Observable.throw(err))
      .subscribe((l) => {
        const line = (l as string)
        const firstChar = line.charAt(0)
        if (firstChar !== '\f' || buffer.length === 0) {
          buffer.push(line)
        } else {
          const bf = buffer.slice()
          buffer = [line]
          playPause$.next(StreamEventType.PAUSE)
          observer.next(bf)
        }
      })
  }
  return {
    documentObserver: obs,
    playPauseObservable: playPause$
  }
}

Ora è il momento di creare l'osservabile. Presta attenzione all'argomento cbPromise , che restituisce una promessa, una volta che la promessa è stata risolta, allora resume lo stream:

const documentStream = (asciiStream, cbPromise) => {
  const { documentObserver, playPauseObservable } = documentHandlers(asciiStream)
  const document$ = Observable.create(documentObserver)
    .map(document =>
      Observable.fromPromise(cbPromise(document)
        .then(_ => {
          playPauseObservable.next(StreamEventType.RESUME)
          return true
        })
        .catch(e => false)
      )
    )
  return document$
}

Questo ci porta finalmente al punto di partenza, ora implementiamo una funzione che l'unità verifica questo comportamento (o integra i test?):

  • Leggi il file dal disco, salva sul disco
  • Leggi il file da S3 locale, Salva su S3 locale
  • Leggi file da S3, Salva su S3

Il motivo per cui è stato così importante scomporlo è che dobbiamo limitare il nostro salvataggio di S3 a meno di 300 richieste al secondo, questa cosa può gestire migliaia al secondo e La mia implementazione non deve preoccuparsi di questo . Inoltre i miei test di unità possono:

  • Controlla se la cosa controlla le richieste a 300 al secondo o meno.
  • Verifica se l'output dei file è corretto e guarda come previsto.

Inoltre non vogliamo fare molte impostazioni per fare questo test, quindi nei miei test di unità faccio la funzione che salverà su disco, S3 locale e web S3 con la seguente firma:

const processFileTest = (document: string[]) => {
  return new Promise((res, rej) => {
    // save to disk right here
    res()
  })
}

Poi passa alla mia funzione e iscriviti per eseguire il processo sullo stream:

const documentObservable: Observable<any> = documentStream(stream, processFileTest).subscribe()

Mi piace molto farlo in questo modo perché mi dà flessibilità di controllo sul flusso. Non esitate a lasciare commenti.

Se ti stai chiedendo Che diavolo è lo stream , nel caso di un file locale è il seguente:

const stream = createReadStream('C:\Some Path\To File.txt')
      .setEncoding('ascii')
    
risposta data 23.01.2018 - 15:01
fonte

Leggi altre domande sui tag