Эффективный способ оптимизации кода Scala для чтения большого файла, который не помещается в памяти

#scala #optimization #file-io

#scala #оптимизация #file-io

Вопрос:

Постановка проблемы ниже,

У нас есть большой файл журнала, в котором хранятся взаимодействия пользователя с приложением. Записи в файле журнала следуют следующей схеме: {Идентификатор пользователя, временная метка, ActionType}, где ActionType — одно из двух возможных значений: [открыть, закрыть]

Ограничения:

  1. Файл журнала слишком велик, чтобы поместиться в памяти на одном компьютере. Также предположим, что агрегированные данные не помещаются в память.
  2. Код должен быть способен выполняться на одной машине.
  3. Не следует использовать готовую реализацию mapreduce или сторонней базы данных; не предполагайте, что у нас есть Hadoop, Spark или другая платформа распределенных вычислений.
  4. Для каждого пользователя может быть несколько записей каждого типа ActionType, и в файле журнала могут отсутствовать записи. Таким образом, пользователь может пропустить закрытую запись между двумя открытыми записями или наоборот.
  5. Временные метки будут располагаться в строго возрастающем порядке.

Для решения этой проблемы нам необходимо реализовать класс / классы, которые вычисляют среднее время, затраченное каждым пользователем между открытием и закрытием. Имейте в виду, что у некоторых пользователей отсутствуют записи, поэтому нам придется выбирать, как обрабатывать эти записи при выполнении наших вычислений. Код должен следовать согласованной политике в отношении того, как мы делаем этот выбор.

Желаемый результат для решения должен быть [{userId, timeSpent}, ….] для всех пользователей в файле журнала.

Пример файла журнала (текстовый файл, разделенный запятыми)

 1,1435456566,open 
2,1435457643,open 
3,1435458912,open 
1,1435459567,close 
4,1435460345,open 
1,1435461234,open 
2,1435462567,close 
1,1435463456,open 
3,1435464398,close 
4,1435465122,close 
1,1435466775,close
 

Подход

Ниже приведен код, который я написал на Python и Scala, который кажется неэффективным и соответствует ожиданиям данного сценария, я хотел бы получить отзывы от сообщества разработчиков на этом форуме о том, как лучше мы могли бы оптимизировать этот код в соответствии с заданным сценарием.

Реализация Scala

 import java.io.FileInputStream
import java.util.{Scanner, Map, LinkedList}
import java.lang.Long
import scala.collection.mutable

object UserMetrics extends App {
  if (args.length == 0) {
    println("Please provide input data file name for processing")
  } 
  val userMetrics = new UserMetrics()
  userMetrics.readInputFile(args(0),if (args.length == 1) 600000 else args(1).toInt)
}

case class UserInfo(userId: Integer, prevTimeStamp: Long, prevStatus: String, timeSpent: Long, occurence: Integer)

class UserMetrics {

  val usermap = mutable.Map[Integer, LinkedList[UserInfo]]()

  def readInputFile(stArr:String, timeOut: Int) {
    var inputStream: FileInputStream = null
    var sc: Scanner = null
    try {
      inputStream = new FileInputStream(stArr);
      sc = new Scanner(inputStream, "UTF-8");
      while (sc.hasNextLine()) {
        val line: String = sc.nextLine();
        processInput(line, timeOut)
      }
      
      for ((key: Integer, userLs: LinkedList[UserInfo]) <- usermap) {
        val userInfo:UserInfo = userLs.get(0)
        val timespent = if (userInfo.occurence>0) userInfo.timeSpent/userInfo.occurence else 0
        println("{"   key  "," timespent   "}")
      }

      if (sc.ioException() != null) {
        throw sc.ioException();
      }
    } finally {
      if (inputStream != null) {
        inputStream.close();
      }
      if (sc != null) {
        sc.close();
      }
    }
  }

  def processInput(line: String, timeOut: Int) {
    val strSp = line.split(",")

    val userId: Integer = Integer.parseInt(strSp(0))
    val curTimeStamp = Long.parseLong(strSp(1))
    val status = strSp(2)
    val uInfo: UserInfo = UserInfo(userId, curTimeStamp, status, 0, 0)
    val emptyUserInfo: LinkedList[UserInfo] = new LinkedList[UserInfo]()

    val lsUserInfo: LinkedList[UserInfo] = usermap.getOrElse(userId, emptyUserInfo)

    if (lsUserInfo != null amp;amp; lsUserInfo.size() > 0) {
      val lastUserInfo: UserInfo = lsUserInfo.get(lsUserInfo.size() - 1)
      val prevTimeStamp: Long = lastUserInfo.prevTimeStamp
      val prevStatus: String = lastUserInfo.prevStatus
      
      if (prevStatus.equals("open")) {
        if (status.equals(lastUserInfo.prevStatus)) {
           val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
           val timeDiff = lastUserInfo.timeSpent   timeSelector
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence   1))
        } else if(!status.equals(lastUserInfo.prevStatus)){
          val timeDiff = lastUserInfo.timeSpent   curTimeStamp - prevTimeStamp
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence   1))
        }
      } else if(prevStatus.equals("close")) {
        if (status.equals(lastUserInfo.prevStatus)) {
          lsUserInfo.remove()
          val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent   timeSelector, lastUserInfo.occurence 1))
        }else if(!status.equals(lastUserInfo.prevStatus))
          {     
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent, lastUserInfo.occurence))
        }
      }
    }else if(lsUserInfo.size()==0){
      lsUserInfo.add(uInfo)
    }
    usermap.put(userId, lsUserInfo)
  }

}
 

Реализация на Python

 import sys

def fileBlockStream(fp, number_of_blocks, block):
    #A generator that splits a file into blocks and iterates over the lines of one of the blocks.
 
    assert 0 <= block and block < number_of_blocks #Assertions to validate number of blocks given
    assert 0 < number_of_blocks
 
    fp.seek(0,2) #seek to end of file to compute block size
    file_size = fp.tell() 
 
    ini = file_size * block / number_of_blocks #compute start amp; end point of file block
    end = file_size * (1   block) / number_of_blocks
 
    if ini <= 0:
        fp.seek(0)
    else:
        fp.seek(ini-1)
        fp.readline()
 
    while fp.tell() < end:
        yield fp.readline() #iterate over lines of the particular chunk or block

def computeResultDS(chunk,avgTimeSpentDict,defaultTimeOut):
    countPos,totTmPos,openTmPos,closeTmPos,nextEventPos = 0,1,2,3,4
    for rows in chunk.splitlines():
        if len(rows.split(",")) != 3:
            continue
        userKeyID = rows.split(",")[0]
        try:
            curTimeStamp = int(rows.split(",")[1])
        except ValueError:
            print("Invalid Timestamp for ID:"   str(userKeyID))
            continue
        curEvent = rows.split(",")[2]
        if userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==1 and curEvent == "close": 
        #Check if already existing userID with expected Close event 0 - Open; 1 - Close
        #Array value within dictionary stores [No. of pair events, total time spent (Close tm-Open tm), Last Open Tm, Last Close Tm, Next expected Event]
            curTotalTime = curTimeStamp - avgTimeSpentDict[userKeyID][openTmPos]
            totalTime = curTotalTime   avgTimeSpentDict[userKeyID][totTmPos]
            eventCount = avgTimeSpentDict[userKeyID][countPos]   1
            avgTimeSpentDict[userKeyID][countPos] = eventCount
            avgTimeSpentDict[userKeyID][totTmPos] = totalTime
            avgTimeSpentDict[userKeyID][closeTmPos] = curTimeStamp
            avgTimeSpentDict[userKeyID][nextEventPos] = 0 #Change next expected event to Open
        
        elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==0 and curEvent == "open":
            avgTimeSpentDict[userKeyID][openTmPos] = curTimeStamp
            avgTimeSpentDict[userKeyID][nextEventPos] = 1 #Change next expected event to Close
        
        elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==1 and curEvent == "open":
            curTotalTime,closeTime = missingHandler(defaultTimeOut,avgTimeSpentDict[userKeyID][openTmPos],curTimeStamp)
            totalTime = curTotalTime   avgTimeSpentDict[userKeyID][totTmPos]
            avgTimeSpentDict[userKeyID][totTmPos]=totalTime
            avgTimeSpentDict[userKeyID][closeTmPos]=closeTime
            avgTimeSpentDict[userKeyID][openTmPos]=curTimeStamp
            eventCount = avgTimeSpentDict[userKeyID][countPos]   1
            avgTimeSpentDict[userKeyID][countPos] = eventCount          

        elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==0 and curEvent == "close": 
            curTotalTime,openTime = missingHandler(defaultTimeOut,avgTimeSpentDict[userKeyID][closeTmPos],curTimeStamp)
            totalTime = curTotalTime   avgTimeSpentDict[userKeyID][totTmPos]
            avgTimeSpentDict[userKeyID][totTmPos]=totalTime
            avgTimeSpentDict[userKeyID][openTmPos]=openTime
            eventCount = avgTimeSpentDict[userKeyID][countPos]   1
            avgTimeSpentDict[userKeyID][countPos] = eventCount

        elif curEvent == "open":
            #Initialize userid with Open event
            avgTimeSpentDict[userKeyID] = [0,0,curTimeStamp,0,1]
        
        elif curEvent == "close":
            #Initialize userid with missing handler function since there is no Open event for this User
            totaltime,OpenTime = missingHandler(defaultTimeOut,0,curTimeStamp)
            avgTimeSpentDict[userKeyID] = [1,totaltime,OpenTime,curTimeStamp,0]

def missingHandler(defaultTimeOut,curTimeVal,lastTimeVal):
    if lastTimeVal - curTimeVal > defaultTimeOut:
        return defaultTimeOut,curTimeVal
    else:
        return lastTimeVal - curTimeVal,curTimeVal

def computeAvg(avgTimeSpentDict,defaultTimeOut):
    resDict = {}
    for k,v in avgTimeSpentDict.iteritems():
        if v[0] == 0:
            resDict[k] = 0
        else:
            resDict[k] = v[1]/v[0]
    return resDict

if __name__ == "__main__":
    avgTimeSpentDict = {}
    if len(sys.argv) < 2:
        print("Please provide input data file name for processing")
        sys.exit(1)
        
    fileObj = open(sys.argv[1])
    number_of_chunks = 4 if len(sys.argv) < 3 else int(sys.argv[2])
    defaultTimeOut = 60000 if len(sys.argv) < 4 else int(sys.argv[3])
    for chunk_number in range(number_of_chunks):
        for chunk in fileBlockStream(fileObj, number_of_chunks, chunk_number):
            computeResultDS(chunk, avgTimeSpentDict, defaultTimeOut)
    print (computeAvg(avgTimeSpentDict,defaultTimeOut))
    avgTimeSpentDict.clear() #Nullify dictionary 
    fileObj.close #Close the file object

 

Обе вышеприведенные программы дают желаемый результат, но для данного конкретного сценария важна эффективность. Дайте мне знать, если у вас есть что-нибудь получше или какие-либо предложения по существующей реализации.

Заранее спасибо!!

Комментарии:

1. codereview.stackexchange.com это лучшее место для такого рода вопросов

Ответ №1:

То, что вам нужно, — это использование итератора. Я не собираюсь переписывать ваш код заново, но хитрость здесь, скорее всего, заключается в использовании an iterator . К счастью, Scala предоставляет достойный готовый инструментарий для этой работы.

 import scala.io.Source
object ReadBigFiles {
  def read(fileName: String): Unit = {
    val lines: Iterator[String] = Source.fromFile(fileName).getLines
    // now you get iterator semantics for the file line traversal
    // that means you can only go through the lines once, but you don't incur a penalty on heap usage
  }
}
 

Для вашего варианта использования вам, похоже, требуется a lastUser , поэтому вы имеете дело с группами из 2 записей. Я думаю, у вас есть два варианта: либо выбрать iterator.sliding(2) , который будет создавать итераторы для каждой пары, либо просто добавить рекурсию в микс, используя опции.

 def navigate(source: Iterator[String], last: Option[User]): ResultType = {
  if (source.hasNext) {
    val current = source.next()
    last match {
      case Some(existing) => // compare with previous user etc
      case None => navigate(source, Some(current))
    }
  } else {
    // exit recursion, return result
  }
}
 

Вы можете избежать всего кода, который вы написали для чтения файла и так далее. Если вам нужно подсчитать вхождения, просто создайте a Map внутри вашей рекурсии и увеличивайте вхождения на каждом шаге на основе вашей бизнес-логики.

Комментарии:

1. Спасибо за ответ, это может быть очень простой вопрос, но я хочу четко понять. Я не совсем понял, когда вы говорите lastUser? Мне нужно вернуть среднее время использования для всех пользователей, и записи журнала для этих пользователей расположены в порядке возрастания на основе метки времени, поэтому пары (открытие / закрытие записи для каждого пользователя) могут находиться далеко друг от друга. Запись этого в рекурсии не займет много памяти?

2. Затем создайте карту со средними значениями по идентификатору пользователя и вычисляйте их на постоянной основе, это довольно легко сделать. Вас не волнует расстояние между парами, это lastUser был всего лишь пример, я действительно не тратил время на то, чтобы понять вашу бизнес-логику, потому что в данном случае это не меняет подхода.

Ответ №2:

 from queue import LifoQueue, Queue

def averageTime() -> float:

   logs = {}
   records = Queue()

   with open("log.txt") as fp:
       lines = fp.readlines()   
       for line in lines:
           if line[0] not in logs:
               logs[line[0]] = LifoQueue()
               logs[line[0]].put((line[1], line[2]))
           else:
               logs[line[0]].put((line[1], line[2]))

   for k in logs:
       somme = 0
       count = 0
       while not logs[k].empty():
           l = logs[k].get()
           somme = (somme   l[0]) if l[1] == "open" else (somme - l[0])
           count = count   1
       records.put([k, somme, count//2])

   while not records.empty():
       record = records.get()
       print(f"UserId={record[0]} Avg={record[1]/record[2]}")