Проблема с асинхронной синхронизацией Nodejs

#javascript #jquery #ajax #node.js #mongodb

#javascript #jquery #ajax #node.js #mongodb

Вопрос:

В настоящее время я работаю над проектом Nodejs, который использует MongoDB и внимательно отслеживает поток данных сервера Nodejs.

Код моего сервера узлов выглядит следующим образом:

Receive.js
1. Сервер узла получает текстовый файл в формате JSON
2. Сервер узла сообщает MongoDB (upsert), что мы получили файл

Process.js
3. Сервер узла обновляет JSON-файл в MongoDB
4. Сервер узла сообщает MongoDB, что мы обработали указанный файл.

Проблема в том, что иногда # 4 появляется перед # 2, несмотря на то, что # 1 всегда появляется перед # 3. Моя программа мониторинга начинает отображать, что обработано больше файлов, чем получено. Есть ли способ устранить эту проблему, не делая этот сервер полностью синхронным?

Например, если я отправлю своему узловому серверу 500 текстовых файлов JSON, самый последний раз, когда узел выполняет # 2, должен быть до последнего выполнения # 4.

** Примечание: App.js вызовы .получать от Receive.js и Receive.js calls .сохранить из Process.js **

app.js

 http.createServer(app).listen(app.get('port'), function(){
  mongodb.createConnection(function(db){
    if(db){
      console.log('success connecting to mongodb!');
    }
  });
});  
app.post('/log', logCollection.receive);   
  


Receive.js

 exports.receive = function(req, res, next){
  var usagelog = req.body.log;

  if(!usagelog){
    log4js.logger.error("Usagelog is Null!");
    res.end("fail");
  }else{      
    count  ;

    //Upsert the Receive Count for Monitor  
      mongodb.getConnection(function(db){

          dateFormat.initDate();
          //Upsert trip (usage logs)
          var currentDateTime =  moment().format('YYYY-MM-DD hh:mm:ss');
          db.collection("ReceiveCount").update( {"date":dateFormat.currentDate(), "pid":process.pid }, 
                    {"date":dateFormat.currentDate(), "IPAddress": ip.address(), "pid":process.pid, "countReceive" : count, "LastReceivedTime": currentDateTime}, 
                    {upsert:true}, function(err, result) {            
            });
          });

    //End Receive count
    var usagelogJSON = convertToJson(usagelog);
    var usagelogWithCurrentDate = addUpdateTime(usagelogJSON);
    usagelogDao.save(usagelogWithCurrentDate, res);
  }
};  
  

Process.js

 exports.save = function(usagelog, res){
  var selector = upsertCondition(usagelog);

  mongodb.getConnection(function(db){
    //Upsert trip (usage logs)
    db.collection(collectionName()).update(selector, usagelog, {upsert:true, fullResult:true}, function(err, result) {
      if(err){
        log4js.logger.error(err);
        res.end("fail");
      }else{

        if(result.nModified == 0)
            countInsert  ;
        else
            countUpdate  ;

        //Upsert Processed Count (both Updated and Inserted     
        db.collection("ReceiveCount").find({"date":dateFormat.currentDate(), "pid":process.pid },{}).toArray(function (err, docs) {

            var receivecount = docs[0].countReceive;
            var receivetime = docs[0].LastReceivedTime;
            var currentDateTime = moment().format('YYYY-MM-DD hh:mm:ss');
            var MongoCount=0;

            db.collection("raw_" dateFormat.currentDate()).count(function(err, count){
                console.log("raw_" dateFormat.currentDate());

                MongoCount = count;
                console.log("Mongo count is :"  MongoCount);
            });

            db.collection("ReceiveCount").update( {"date":dateFormat.currentDate(), "pid":process.pid }, 
                {"date":dateFormat.currentDate(), "IPAddress": ip.address(), "pid":process.pid, "countUpdate" : countUpdate, "countInsert":countInsert, "countTotal":countUpdate countInsert, "LastProcessTime": currentDateTime,
                "countReceive":receivecount, "LastReceivedTime":receivetime, "MongoCount":MongoCount}, {upsert:true}, function(err, result) {      
            }); 
        });
        res.end("success");
      }
    });
  });
};
  

Комментарии:

1. Несмотря на положительные отзывы, здесь не очень ясный вопрос в терминах SO. Есть способы справиться с этим с помощью асинхронной обработки и библиотек, которые также помогают. Но нам нужно увидеть некоторый код, чтобы на этот вопрос можно было ответить и решить. Покажите свой код, чтобы мы могли видеть, где вы ошибаетесь. Голоса за!= ответы.

2. Лучше для обновления, но я думаю, что контекст того, как вы вызываете свой экспорт «получать» и «сохранять», который является более важной частью вашей проблемы прямо здесь.

Ответ №1:

Вы можете использовать модуль async (https://github.com/caolan/async). Вот псевдокод:

 async.parallel([
  fun1(cb) {
    tellDBYouReceivedFile(..., onFinished() {
      cb();
    });
  },
  fun2(cb) {
    saveYourFileToDB(..., onFinished() {
      cb();
    });
  },
], function() {
  tellDBYouProcessedFile();
});
  

То, чего вы пытаетесь достичь, не может быть выполнено полностью асинхронно, потому что последний шаг (# 4) должен дождаться завершения # 2 и # 3 первыми. Приведенный выше пример может сделать # 2 и # 3 асинхронными и гарантировать, что # 4 выполняется последним.