Secuencias de lectura de nodos: ¿cómo puedo limitar la cantidad de arc

Secuencias de lectura de nodos: ¿cómo puedo limitar la cantidad de archivos abiertos?

Me encuentro con AggregateError: EMFILE: too many open files mientras transmito varios archivos.

Detalles de la máquina: macos monterrey, MacBook Pro (14 pulgadas, 2021), microprocesador apple m1 pro, Memoria 16GB, Nodo v16.13.0

He intentado aumentar los límites sin suerte. Idealmente, me gustaría poder establecer el límite de la cantidad de archivos abiertos a la vez o resolverlos cerrando los archivos tan pronto como se hayan utilizado.

Código a continuación. Intenté eliminar el código no relacionado y reemplazarlo con '//...'.

const MultiStream = require('multistream');
const fs = require('fs-extra'); // Also tried graceful-fs and the standard fs
const { fdir } = require("fdir");
// Also have a require for the bz2 and split2 functions but editing from phone right now

//...

let files = [];

//...

(async() => {

  const crawler = await new fdir()
  .filter((path, isDirectory) => path.endsWith(".bz2"))
  .withFullPaths()
  .crawl("Dir/Sub Dir")
  .withPromise();

  for(const file of crawler){
    files = [...files, fs.createReadStream(file)]
  }

  multi = await new MultiStream(files)
    // Unzip
    .pipe(bz2())
    // Create chunks from lines
    .pipe(split2())
    .on('data', function (obj) {
      // Code to filter data and extract what I need
      //...
    })
    .on("error", function(error) {
      // Handling parsing errors
      //...
    })
    .on('end', function(error) {
      // Output results
      //...
    })

})();
Mostrar la mejor respuesta

¿Alguna razón por la que no está simplemente empujando el flujo de lectura a la matriz de archivos? también, ¿cuántos archivos estás tratando de leer? todo tiene sus limites

¿Cuántos archivos?

Mi código original funcionó durante un mes de datos, que contenían 28 000 archivos de NDJSON, pero falló cuando intenté procesar los datos de un año, que contenían 323 000 archivos de NDJSON. Probando el código sugerido ahora :D

Para evitar la preapertura de un identificador de archivo para cada uno de los archivos de su conjunto, solo debe abrir los archivos cuando se solicite cuando sea el turno de transmisión de ese archivo en particular. Y puede hacerlo con transmisión múltiple.

Según el doc de flujo múltiple, puede crear lentamente los flujos de lectura cambiando esto:

  for(const file of crawler){
    files = [...files, fs.createReadStream(file)]
  }

a esto:

  let files = crawler.map((f) => {
      return function() {
          return fs.createReadStream(f);
      }
  });

joder, fui demasiado lento. También el uso del mapa aquí es fluido.

Guau, un cambio tan fácil de hacer en el código y se realizó a la primera sin errores. Reduzca unos minutos también al no tener que recorrer los archivos. ¡Gracias!

Después de leer la página de npm para multistream, creo que encontré algo que me ayudará. . También he editado dónde está agregando la secuencia a la matriz de archivos, ya que no veo la necesidad de instanciar una nueva matriz y distribuir los elementos existentes como lo está haciendo.

Para crear flujos de forma perezosa, envuélvalos en una función:

    var streams = [
      fs.createReadStream(__dirname + '/numbers/1.txt'),
      function () { // will be executed when the stream is active
        return fs.createReadStream(__dirname + '/numbers/2.txt')
      },
      function () { // same
        return fs.createReadStream(__dirname + '/numbers/3.txt')
      }
    ]
    
    new MultiStream(streams).pipe(process.stdout) // => 123 ```

Con eso podemos actualizar su lógica para incluir esta funcionalidad simplemente envolviendo los flujos de lectura en funciones, de esta manera los flujos no se crearán hasta que se necesiten. Esto evitará que tengas demasiados abiertos a la vez. Podemos hacer esto simplemente actualizando su bucle de archivos:

for(const file of crawler){
    files.push(function() {
        return fs.createReadStream(file)
    })
}

Esto también funcionó, ¡gracias! Miré los documentos pero no entendí la situación lo suficiente como para darme cuenta de que esta era la solución. Después de leer sobre graceful-fs y cómo trató de abordar los errores de EMFILE, pensé con certeza que era donde me estaba equivocando, no cerraba los archivos o algo así. ¡Gracias de nuevo por ayudarme a entender la situación!