Limite de simultanéité dans les promesses Q – nœud

Existe-t-il une méthode pour limiter la simultanéité des promesses en utilisant la bibliothèque Q promises?

Cette question est un peu liée à Comment puis-je limiter la simultanéité de la promesse Q?

mais le problème est que j’essaie de faire quelque chose comme ceci:

for (var i = 0; i <= 1000; i++) { return Q.all([ task1(i), task2(i) ]); // <-- limit this to 2 at a time. } 

Le cas d’utilisation réel est:

  1. Récupérer les messages de la firebase database
  2. Boucle chaque poste dans la firebase database comme posts.forEach(function(post) {}
  3. Pour chaque message, tâche1, tâche2, tâche3 (récupérer des compteurs sociaux, récupérer le nombre de commentaires, etc.)
  4. Enregistrer les nouvelles données de publication dans la firebase database.

Mais le problème est que le noeud exécute toutes les tâches pour tous les messages en même temps, comme demander à Facebook le “nombre de” J’aime “pour 500 messages en même temps.

Comment je peux limiter Q.all() afin que seulement 2 postes à la fois exécutent leurs tâches? Ou quelles autres solutions possibles peuvent s’appliquer ici?

Remarque: la plupart des tâches (si pas toutes) dépendent de la bibliothèque de demandes.

Grâce à Dan, sa réponse et son aide pour l’intégrer à mon code, cela peut être fait en utilisant son essence et un extrait comme celui-ci:

 var qlimit = require('../libs/qlimit'); var test = function(id) { console.log('Running ' + id); return Q.nfcall(request, 'some dummy url which takes some time to process, for example a php file with sleep(5)').spread(function(response, body) { console.log('Response ' + id); return body; }); } test = qlimit.limitConcurrency(test, 1); var data = [0, 1, 2]; data.forEach(function(id) { console.log('Starting item ' + id); Q.all([ test(id) ]); }); 

De cette façon, vous obtenez quelque chose comme:

  • Point de départ 0
  • Point de départ 1
  • Point de départ 2
  • Courir 0
  • Réponse 0
  • En cours d’exécution 1
  • Réponse 1
  • En cours d’exécution 2
  • Réponse 2

Ce qui est clairement 1 demande à la fois.

Le point entier qui me manquait dans la mise en œuvre est que vous devez re-déclarer la fonction en utilisant limitConcurrency AVANT de démarrer la boucle, pas à l’intérieur.

J’ai posé une question très similaire il y a quelques jours: Node.js / Express et les files d’attente parallèles

La solution que j’ai trouvée (voir ma propre réponse) était d’utiliser l’async de Caolan . Il vous permet de créer des “files d’opérations” et vous pouvez en limiter le nombre: voir la méthode “file”.

Dans votre cas, la boucle principale du nœud extrairait des éléments de Q et créerait une tâche dans la queue pour chacun d’eux. Vous pouvez également limiter cela (pour ne pas recréer la queue en dehors de Q), par exemple en ajoutant N nouveaux éléments à la queue uniquement lorsque le dernier élément est en cours d’exécution (le rappel “vide” pour la méthode “queue” ).

Voici le code que j’utilise pour étrangler les q promesses.

Je viens de l’extraire d’un projet pour lequel j’en avais besoin. S’il y a plus de personnes intéressées, je pourrais en faire un module ou quelque chose du genre.

Découvrez les méthodes spex.page et spex.sequence . Ils ont été conçus pour mettre en œuvre toute stratégie possible de limitation des données et d’équilibrage de la charge des promesses.

Voir ci-dessous quelques exemples de la documentation du projet.

Page équilibrée

L’exemple ci-dessous utilise la méthode method pour initier une séquence de 5 pages, puis enregistre les données résolues dans la console. La fonction source sert chaque page avec un délai d’une demi-seconde.

 var $q = require('q'); var spex = require('spex')($q); function source(index, data, delay) { return new $q.Promise(function (resolve, reject) { setTimeout(function () { resolve([ "page-" + index, // simple value; $q.resolve(Date.now()) // promise value; ]) }, 500); // wait 1/2 second before serving the next page; }); } function logger(index, data, delay) { console.log("LOG:", data); } spex.page(source, {dest: logger, limit: 5}) .then(function (data) { console.log("FINISHED:", data); }); 

Sortie:

 LOG: [ 'page-0', 1446050705823 ] LOG: [ 'page-1', 1446050706327 ] LOG: [ 'page-2', 1446050706834 ] LOG: [ 'page-3', 1446050707334 ] LOG: [ 'page-4', 1446050707839 ] FINISHED: { pages: 5, total: 10, duration: 2520 } 

Récepteur de séquence équilibré

Dans l’exemple suivant, nous avons une séquence qui renvoie des données lorsque l’index est inférieur à 5 et la fonction de destination qui applique un délai d’une seconde lors du traitement de chaque donnée résolue à partir de la source.

 var $q = require('q'); var spex = require('spex')($q); function source(index, data, delay) { console.log("SOURCE:", index, data, delay); if (index < 5) { return $q.resolve(index); } } function dest(index, data, delay) { console.log("DEST:", index, data, delay); return new $q.Promise(function (resolve, reject) { setTimeout(function () { resolve(); }, 1000); }); } spex.sequence(source, dest) .then(function (data) { console.log("DATA:", data); }); 

Sortie:

 SOURCE: 0 undefined undefined DEST: 0 0 undefined SOURCE: 1 0 1011 DEST: 1 1 1001 SOURCE: 2 1 1001 DEST: 2 2 1001 SOURCE: 3 2 1000 DEST: 3 3 1000 SOURCE: 4 3 1001 DEST: 4 4 1001 SOURCE: 5 4 1000 DATA: { total: 5, duration: 5013 }