Comment obtenir un onFinish unifié à partir de stream séparés (certains créés à partir du stream d’origine)

J’ai un processus de stream comme ceci:

Incomming file via HTTP (original stream) -> Check if zipfile - Yes -> push through an unzip2-stream - No -> push to S3 

Lorsque le stream unzip2 trouve des entrées zip, celles-ci sont poussées dans la même chaîne de stream, c.-à-d.

 Incomming file entry from zip file ("child" stream) -> Check if zipfile - Yes -> push through an unzip2-stream - No -> push to S3 

Merci à https://stackoverflow.com/users/3580261/eljefedelrodeodeljefe J’ai réussi à résoudre le problème principal après cette conversation: comment redirect un stream vers un autre stream en fonction des données contenues dans le premier bloc?

Le problème avec la création de nouveaux stream “enfants” pour chaque entrée zip est que ceux-ci n’auront aucune connexion au stream d’origine, je ne peux donc pas obtenir un onFinish unifié pour tous les stream.

Je ne veux pas envoyer de 202 à l’expéditeur avant d’avoir traité (décompressé et envoyé à S3) chaque fichier. Comment puis-je accomplir cela?

Je pense que j’ai peut-être besoin d’une sorte d’object de contrôle qui attend onFinish pour tous les stream enfants et force le processus à demeurer dans l’événement onFinish d’origine jusqu’à ce que tous les fichiers soient traités. Est-ce que ce serait exagéré? Y a-t-il une solution plus simple?

J’ai fini par faire un compteur séparé pour les stream. Il y a probablement une meilleure solution, mais cela fonctionne.

J’envoie l’object compteur en tant qu’argument au premier appel de ma fonction saveFile (). Le compteur est transmis au stream décompressé afin qu’il puisse être transmis à saveFile pour chaque entrée de fichier.

  • Juste avant le début d’un stream (c’est-à-dire une canalisation), j’appelle streamCounter.streamStarted ().
  • Dans le dernier onFinish de la chaîne de tuyaux, j’appelle streamCounter.streamFinished ()
  • Au cas où un stream deviendrait mauvais, j’appellerais streamCounter.streamFailed ()

Juste avant d’envoyer le 202 sous la forme post route, j’attends la résolution de streamCounter.streamPromise .

Je ne suis pas très fier de la solution setInterval. Ce serait probablement mieux avec une sorte d’événement émettant.

 module.exports.streamCounter = function() { let streamCount = 0; let isStarted = false; let errors = []; this.streamStarted = function(options) { isStarted = true; streamCount += 1; log.debug(`Stream started for ${options.filename}. New streamCount: ${streamCount}`); }; this.streamFinished = function(options) { streamCount -= 1; log.debug(`Finished stream for ${options.filename}. New streamCount: ${streamCount}`); }; this.streamFailed = function(err) { streamCount -= 1; errors.push(err); log.debug(`Failed stream because (${err.message}). New streamCount: ${streamCount}`); }; this.streamPromise = new Promise(function(resolve, reject) { let interval = setInterval(function() { if(isStarted && streamCount === 0) { clearInterval(interval); if(errors.length === 0) { log.debug('StreamCounter back on 0. Resolving streamPromise'); resolve(); } else { log.debug('StreamCounter back on 0. Errors encountered.. Rejecting streamPromise'); reject(errors[errors.length-1]); } } }, 100); }); }; 

Au début, j’ai essayé ce concept avec un tableau de promesses et ai attendu Promise.all () avant d’envoyer le statut 202. Mais Promise.all () ne fonctionne qu’avec des tableaux statiques pour autant que je sache. Mon “streamCount” évolue pendant la diffusion, il me fallait donc un “Promise.all” plus dynamic.