Impossible de renseigner une grande quantité de données sur mongodb à l’aide de Node.js

On me demande d’importer une grande quantité de données météorologiques collectées à partir de nombreux sites dans toute la ville. Chaque site a 1 ordinateur avec un dossier, qui est synchronisé sur un serveur central toutes les 5 minutes. Chaque jour, un nouveau fichier est créé. Donc, fondamentalement, la structure est comme ça. Un fichier txt a le format csv, la 1ère ligne étant le champ, le rest étant des nombres.

folder_on_server
| __ site1 __ date1.txt
| | __ date2.txt
|
| __ site2 __ date1.txt
| __ date2.txt
J’ai écrit une petite application node.js pour renseigner ces données sur mongoDB. Cependant, pour le moment, nous n’avons que 3 sites, mais chaque site contient près de 900 fichiers txt, chaque fichier contient 24 * 20 = 288 lignes (les données étant enregistrées toutes les 5 minutes). J’ai essayé d’exécuter l’application de nœud, mais après avoir lu environ 100 fichiers du premier dossier, le programme se bloque avec une erreur concernant un échec d’allocation de mémoire.

J’ai essayé de nombreuses façons d’améliorer cela:

  1. Augmentez la taille de la mémoire de nodejs à 8 Go => un peu mieux, plus de fichiers seront lus mais ne pourront toujours pas passer au dossier suivant.
  2. Définir une variable sur null et undefined à la fin de la boucle _.forEach (j’utilise un trait de soulignement) => n’aide pas.
  3. Décalez le tableau de fichiers (utilisez fs.readdir), de sorte que le premier élément soit supprimé => ne sert à rien

Existe-t-il un moyen de forcer js à nettoyer la mémoire chaque fois qu’il a fini de lire un fichier? Merci

Mise à jour 1: J’ai fini par append 100 fichiers dans chaque dossier à la fois. Cela semble être fastidieux, mais cela a fonctionné, et c’est comme un travail ponctuel. Cependant, je veux toujours trouver une solution pour cela.

Comme Robbie l’a dit, les ruisseaux sont la solution. fs.createReadStream() doit être utilisé à la place de .readFileSync() . Je commencerais par créer un lecteur de ligne qui prend un chemin et quelle que soit la chaîne / regex sur laquelle vous souhaitez scinder:

linereader.js

 var fs = require("fs"); var util = require("util"); var EventEmitter = require("events").EventEmitter; function LineReader(path, splitOn) { var readStream = fs.createReadStream(path); var self = this; var lineNum = 0; var buff = "" var chunk; readStream.on("readable", function() { while( (chunk = readStream.read(100)) !== null) { buff += chunk.toSsortingng(); var lines = buff.split(splitOn); for (var i = 0; i < lines.length - 1; i++) { self.emit("line",lines[i]); lineNum += 1; } buff = lines[lines.length - 1]; } }); readStream.on("close", function() { self.emit("line", buff); self.emit("close") }); readStream.on("error", function(err) { self.emit("error", err); }) } util.inherits(LineReader, EventEmitter); module.exports = LineReader; 

Cela lira un fichier texte et émettra des événements "ligne" pour chaque ligne lue, de sorte que vous ne les aurez pas tous en mémoire en même temps. Ensuite, en utilisant le paquet async (ou la boucle async que vous voulez utiliser), parcourez les fichiers insérant chaque document:

app.js

 var LineReader = require("./linereader.js"); var async = require("async"); var paths = ["./text1.txt", "./text2.txt", "./path1/text3.txt"]; var reader; async.eachSeries(paths, function(path, callback) { reader = new LineReader(path, /\n/g); reader.on("line", function(line) { var doc = turnTextIntoObject(line); db.collection("mycollection").insert(doc); }) reader.on("close", callback); reader.on("error", callback); }, function(err) { // handle error and finish; }) 

Essayez d’utiliser des stream au lieu de charger chaque fichier en mémoire.

Je vous ai envoyé une demande d’extraction avec une implémentation utilisant des stream et une entrée / sortie asynchrone.

C’est la majeure partie:

 var Async = require('async'); var Csv = require('csv-streamify'); var Es = require('event-stream'); var Fs = require('fs'); var Mapping = require('./folder2siteRef.json'); var MongoClient = require('mongodb').MongoClient; var sourcePath = '/hnet/incoming/' + new Date().getFullYear(); Async.auto({ db: function (callback) { console.log('opening db connection'); MongoClient.connect('mongodb://localhost:27017/test3', callback); }, subDirectory: function (callback) { // read the list of subfolder, which are sites Fs.readdir(sourcePath, callback); }, loadData: ['db', 'subDirectory', function (callback, results) { Async.each(results.subDirectory, load(results.db), callback); }], cleanUp: ['db', 'loadData', function (callback, results) { console.log('closing db connection'); results.db.close(callback); }] }, function (err) { console.log(err || 'Done'); }); var load = function (db) { return function (directory, callback) { var basePath = sourcePath + '/' + directory; Async.waterfall([ function (callback) { Fs.readdir(basePath, callback); // array of files in a directory }, function (files, callback) { console.log('loading ' + files.length + ' files from ' + directory); Async.each(files, function (file, callback) { Fs.createReadStream(basePath + '/' + file) .pipe(Csv({objectMode: true, columns: true})) .pipe(transform(directory)) .pipe(batch(200)) .pipe(insert(db).on('end', callback)); }, callback); } ], callback); }; }; var transform = function (directory) { return Es.map(function (data, callback) { data.siteRef = Mapping[directory]; data.epoch = parseInt((data.TheTime - 25569) * 86400) + 6 * 3600; callback(null, data); }); }; var insert = function (db) { return Es.map( function (data, callback) { if (data.length) { var bulk = db.collection('hnet').initializeUnorderedBulkOp(); data.forEach(function (doc) { bulk.insert(doc); }); bulk.execute(callback); } else { callback(); } } ); }; var batch = function (batchSize) { batchSize = batchSize || 1000; var batch = []; return Es.through( function write (data) { batch.push(data); if (batch.length === batchSize) { this.emit('data', batch); batch = []; } }, function end () { if (batch.length) { this.emit('data', batch); batch = []; } this.emit('end'); } ); }; 

J’ai mis à jour votre script tomongo.js à l’aide de stream. Je l’ai également modifié pour utiliser asynchrone au lieu de sync pour son entrée / sortie de fichier.

Je l’ai testé par rapport à la structure définie dans votre code avec de petits ensembles de données et cela a très bien fonctionné. J’ai fait quelques tests limités contre 3xdirs avec 900xfiles et 288xlines. Je ne suis pas sûr de la taille de chaque rangée de vos données, j’ai donc ajouté quelques propriétés aléatoires. C’est assez rapide. Voyez comment cela se passe avec vos données. Si cela pose des problèmes, vous pouvez essayer de le limiter avec des problèmes d’écriture différents lors de l’exécution de l’opération d’insertion en bloc.

Consultez également certains de ces liens pour plus d’informations sur les stream dans node.js:

http://nodestreams.com – un outil écrit par John Resig avec de nombreux exemples de stream.

Et event-stream est un module de stream très utile.