Ho trovato la soluzione che utilizza solo 1 oggetto per pause
.
Sembra qualcosa del tipo:
A un certo punto ho deciso che sarebbe stato bello mettere in pausa, riprendere lo streaming con la stessa funzione ed essere molto molto molto attento dove lo chiamo effettivamente. Questo è essenziale, la maggior parte dei problemi che ho avuto, perché stavo facendo una pausa, riprendendo nel posto sbagliato.
enum StreamEventType {
PAUSE,
RESUME
}
function pauseStream (s: NodeJS.ReadableStream,
flag: StreamEventType) {
switch (flag) {
case StreamEventType.PAUSE:
s.pause()
break
case StreamEventType.RESUME:
s.resume()
break
}
}
Ritorna sia il mio stream di testo, sia la mia pausa osservabile ( Subject
) come segue:
import { split, map } from 'event-stream'
function streamProcessing (stream: NodeJS.ReadableStream) {
const playPauseStream = new Subject()
const sourceObservable = Observable.create((observer) => {
const s = stream
.pipe(split())
.on('data', (line) => { observer.next(line) })
.on('error', (err) => { observer.error(err) })
.on('end', () => { observer.complete() })
playPauseStream.subscribe(f => pauseStream(s, (f as StreamEventType)))
})
return {
text$: sourceObservable,
playPause$: playPauseStream
}
}
Il Pausa / Riprendi (❚❚ ►) Osservabile è più essenziale (e delicato) di quello che ti aspetti.
Per consumare questo flusso ho 2 casi d'uso, uno è XML e l'altro è solo \f
interruzioni di pagina. Quest'ultimo è più facile, quindi l'ho fatto per primo. Abbiamo bisogno di qualcosa che recuperi il testo riga per riga, controlli \f
e si interrompa di conseguenza, e restituisci semplicemente l'interruttore playPause con il buffer ottenuto. Prima di pubblicare il buffer, salveremo in da qualche parte nel mondo noi pause
lo stream.
Una pagina di testo non è molto più di una preoccupazione 'memoria-saggia'. Creeremo una funzione che fa questo con l'argomento observer
, e quindi useremo quell'osservatore per creare un osservabile:
function documentHandlers (asciiStream: NodeJS.ReadableStream) {
const { text$, playPause$ } = streamProcessing(asciiStream)
function obs (observer) {
let buffer: string[] = []
text$
.finally(() => { observer.complete() })
.catch((err) => Observable.throw(err))
.subscribe((l) => {
const line = (l as string)
const firstChar = line.charAt(0)
if (firstChar !== '\f' || buffer.length === 0) {
buffer.push(line)
} else {
const bf = buffer.slice()
buffer = [line]
playPause$.next(StreamEventType.PAUSE)
observer.next(bf)
}
})
}
return {
documentObserver: obs,
playPauseObservable: playPause$
}
}
Ora è il momento di creare l'osservabile. Presta attenzione all'argomento cbPromise
, che restituisce una promessa, una volta che la promessa è stata risolta, allora resume
lo stream:
const documentStream = (asciiStream, cbPromise) => {
const { documentObserver, playPauseObservable } = documentHandlers(asciiStream)
const document$ = Observable.create(documentObserver)
.map(document =>
Observable.fromPromise(cbPromise(document)
.then(_ => {
playPauseObservable.next(StreamEventType.RESUME)
return true
})
.catch(e => false)
)
)
return document$
}
Questo ci porta finalmente al punto di partenza, ora implementiamo una funzione che l'unità verifica questo comportamento (o integra i test?):
- Leggi il file dal disco, salva sul disco
- Leggi il file da S3 locale, Salva su S3 locale
- Leggi file da S3, Salva su S3
Il motivo per cui è stato così importante scomporlo è che dobbiamo limitare il nostro salvataggio di S3 a meno di 300 richieste al secondo, questa cosa può gestire migliaia al secondo e La mia implementazione non deve preoccuparsi di questo . Inoltre i miei test di unità possono:
- Controlla se la cosa controlla le richieste a 300 al secondo o meno.
- Verifica se l'output dei file è corretto e guarda come previsto.
Inoltre non vogliamo fare molte impostazioni per fare questo test, quindi nei miei test di unità faccio la funzione che salverà su disco, S3 locale e web S3 con la seguente firma:
const processFileTest = (document: string[]) => {
return new Promise((res, rej) => {
// save to disk right here
res()
})
}
Poi passa alla mia funzione e iscriviti per eseguire il processo sullo stream:
const documentObservable: Observable<any> = documentStream(stream, processFileTest).subscribe()
Mi piace molto farlo in questo modo perché mi dà flessibilità di controllo sul flusso. Non esitate a lasciare commenti.
Se ti stai chiedendo Che diavolo è lo stream , nel caso di un file locale è il seguente:
const stream = createReadStream('C:\Some Path\To File.txt')
.setEncoding('ascii')