#scala #optimization #file-io
#scala #оптимизация #file-io
Вопрос:
Постановка проблемы ниже,
У нас есть большой файл журнала, в котором хранятся взаимодействия пользователя с приложением. Записи в файле журнала следуют следующей схеме: {Идентификатор пользователя, временная метка, ActionType}, где ActionType — одно из двух возможных значений: [открыть, закрыть]
Ограничения:
- Файл журнала слишком велик, чтобы поместиться в памяти на одном компьютере. Также предположим, что агрегированные данные не помещаются в память.
- Код должен быть способен выполняться на одной машине.
- Не следует использовать готовую реализацию mapreduce или сторонней базы данных; не предполагайте, что у нас есть Hadoop, Spark или другая платформа распределенных вычислений.
- Для каждого пользователя может быть несколько записей каждого типа ActionType, и в файле журнала могут отсутствовать записи. Таким образом, пользователь может пропустить закрытую запись между двумя открытыми записями или наоборот.
- Временные метки будут располагаться в строго возрастающем порядке.
Для решения этой проблемы нам необходимо реализовать класс / классы, которые вычисляют среднее время, затраченное каждым пользователем между открытием и закрытием. Имейте в виду, что у некоторых пользователей отсутствуют записи, поэтому нам придется выбирать, как обрабатывать эти записи при выполнении наших вычислений. Код должен следовать согласованной политике в отношении того, как мы делаем этот выбор.
Желаемый результат для решения должен быть [{userId, timeSpent}, ….] для всех пользователей в файле журнала.
Пример файла журнала (текстовый файл, разделенный запятыми)
1,1435456566,open
2,1435457643,open
3,1435458912,open
1,1435459567,close
4,1435460345,open
1,1435461234,open
2,1435462567,close
1,1435463456,open
3,1435464398,close
4,1435465122,close
1,1435466775,close
Подход
Ниже приведен код, который я написал на Python и Scala, который кажется неэффективным и соответствует ожиданиям данного сценария, я хотел бы получить отзывы от сообщества разработчиков на этом форуме о том, как лучше мы могли бы оптимизировать этот код в соответствии с заданным сценарием.
Реализация Scala
import java.io.FileInputStream
import java.util.{Scanner, Map, LinkedList}
import java.lang.Long
import scala.collection.mutable
object UserMetrics extends App {
if (args.length == 0) {
println("Please provide input data file name for processing")
}
val userMetrics = new UserMetrics()
userMetrics.readInputFile(args(0),if (args.length == 1) 600000 else args(1).toInt)
}
case class UserInfo(userId: Integer, prevTimeStamp: Long, prevStatus: String, timeSpent: Long, occurence: Integer)
class UserMetrics {
val usermap = mutable.Map[Integer, LinkedList[UserInfo]]()
def readInputFile(stArr:String, timeOut: Int) {
var inputStream: FileInputStream = null
var sc: Scanner = null
try {
inputStream = new FileInputStream(stArr);
sc = new Scanner(inputStream, "UTF-8");
while (sc.hasNextLine()) {
val line: String = sc.nextLine();
processInput(line, timeOut)
}
for ((key: Integer, userLs: LinkedList[UserInfo]) <- usermap) {
val userInfo:UserInfo = userLs.get(0)
val timespent = if (userInfo.occurence>0) userInfo.timeSpent/userInfo.occurence else 0
println("{" key "," timespent "}")
}
if (sc.ioException() != null) {
throw sc.ioException();
}
} finally {
if (inputStream != null) {
inputStream.close();
}
if (sc != null) {
sc.close();
}
}
}
def processInput(line: String, timeOut: Int) {
val strSp = line.split(",")
val userId: Integer = Integer.parseInt(strSp(0))
val curTimeStamp = Long.parseLong(strSp(1))
val status = strSp(2)
val uInfo: UserInfo = UserInfo(userId, curTimeStamp, status, 0, 0)
val emptyUserInfo: LinkedList[UserInfo] = new LinkedList[UserInfo]()
val lsUserInfo: LinkedList[UserInfo] = usermap.getOrElse(userId, emptyUserInfo)
if (lsUserInfo != null amp;amp; lsUserInfo.size() > 0) {
val lastUserInfo: UserInfo = lsUserInfo.get(lsUserInfo.size() - 1)
val prevTimeStamp: Long = lastUserInfo.prevTimeStamp
val prevStatus: String = lastUserInfo.prevStatus
if (prevStatus.equals("open")) {
if (status.equals(lastUserInfo.prevStatus)) {
val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
val timeDiff = lastUserInfo.timeSpent timeSelector
lsUserInfo.remove()
lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence 1))
} else if(!status.equals(lastUserInfo.prevStatus)){
val timeDiff = lastUserInfo.timeSpent curTimeStamp - prevTimeStamp
lsUserInfo.remove()
lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence 1))
}
} else if(prevStatus.equals("close")) {
if (status.equals(lastUserInfo.prevStatus)) {
lsUserInfo.remove()
val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent timeSelector, lastUserInfo.occurence 1))
}else if(!status.equals(lastUserInfo.prevStatus))
{
lsUserInfo.remove()
lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent, lastUserInfo.occurence))
}
}
}else if(lsUserInfo.size()==0){
lsUserInfo.add(uInfo)
}
usermap.put(userId, lsUserInfo)
}
}
Реализация на Python
import sys
def fileBlockStream(fp, number_of_blocks, block):
#A generator that splits a file into blocks and iterates over the lines of one of the blocks.
assert 0 <= block and block < number_of_blocks #Assertions to validate number of blocks given
assert 0 < number_of_blocks
fp.seek(0,2) #seek to end of file to compute block size
file_size = fp.tell()
ini = file_size * block / number_of_blocks #compute start amp; end point of file block
end = file_size * (1 block) / number_of_blocks
if ini <= 0:
fp.seek(0)
else:
fp.seek(ini-1)
fp.readline()
while fp.tell() < end:
yield fp.readline() #iterate over lines of the particular chunk or block
def computeResultDS(chunk,avgTimeSpentDict,defaultTimeOut):
countPos,totTmPos,openTmPos,closeTmPos,nextEventPos = 0,1,2,3,4
for rows in chunk.splitlines():
if len(rows.split(",")) != 3:
continue
userKeyID = rows.split(",")[0]
try:
curTimeStamp = int(rows.split(",")[1])
except ValueError:
print("Invalid Timestamp for ID:" str(userKeyID))
continue
curEvent = rows.split(",")[2]
if userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==1 and curEvent == "close":
#Check if already existing userID with expected Close event 0 - Open; 1 - Close
#Array value within dictionary stores [No. of pair events, total time spent (Close tm-Open tm), Last Open Tm, Last Close Tm, Next expected Event]
curTotalTime = curTimeStamp - avgTimeSpentDict[userKeyID][openTmPos]
totalTime = curTotalTime avgTimeSpentDict[userKeyID][totTmPos]
eventCount = avgTimeSpentDict[userKeyID][countPos] 1
avgTimeSpentDict[userKeyID][countPos] = eventCount
avgTimeSpentDict[userKeyID][totTmPos] = totalTime
avgTimeSpentDict[userKeyID][closeTmPos] = curTimeStamp
avgTimeSpentDict[userKeyID][nextEventPos] = 0 #Change next expected event to Open
elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==0 and curEvent == "open":
avgTimeSpentDict[userKeyID][openTmPos] = curTimeStamp
avgTimeSpentDict[userKeyID][nextEventPos] = 1 #Change next expected event to Close
elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==1 and curEvent == "open":
curTotalTime,closeTime = missingHandler(defaultTimeOut,avgTimeSpentDict[userKeyID][openTmPos],curTimeStamp)
totalTime = curTotalTime avgTimeSpentDict[userKeyID][totTmPos]
avgTimeSpentDict[userKeyID][totTmPos]=totalTime
avgTimeSpentDict[userKeyID][closeTmPos]=closeTime
avgTimeSpentDict[userKeyID][openTmPos]=curTimeStamp
eventCount = avgTimeSpentDict[userKeyID][countPos] 1
avgTimeSpentDict[userKeyID][countPos] = eventCount
elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==0 and curEvent == "close":
curTotalTime,openTime = missingHandler(defaultTimeOut,avgTimeSpentDict[userKeyID][closeTmPos],curTimeStamp)
totalTime = curTotalTime avgTimeSpentDict[userKeyID][totTmPos]
avgTimeSpentDict[userKeyID][totTmPos]=totalTime
avgTimeSpentDict[userKeyID][openTmPos]=openTime
eventCount = avgTimeSpentDict[userKeyID][countPos] 1
avgTimeSpentDict[userKeyID][countPos] = eventCount
elif curEvent == "open":
#Initialize userid with Open event
avgTimeSpentDict[userKeyID] = [0,0,curTimeStamp,0,1]
elif curEvent == "close":
#Initialize userid with missing handler function since there is no Open event for this User
totaltime,OpenTime = missingHandler(defaultTimeOut,0,curTimeStamp)
avgTimeSpentDict[userKeyID] = [1,totaltime,OpenTime,curTimeStamp,0]
def missingHandler(defaultTimeOut,curTimeVal,lastTimeVal):
if lastTimeVal - curTimeVal > defaultTimeOut:
return defaultTimeOut,curTimeVal
else:
return lastTimeVal - curTimeVal,curTimeVal
def computeAvg(avgTimeSpentDict,defaultTimeOut):
resDict = {}
for k,v in avgTimeSpentDict.iteritems():
if v[0] == 0:
resDict[k] = 0
else:
resDict[k] = v[1]/v[0]
return resDict
if __name__ == "__main__":
avgTimeSpentDict = {}
if len(sys.argv) < 2:
print("Please provide input data file name for processing")
sys.exit(1)
fileObj = open(sys.argv[1])
number_of_chunks = 4 if len(sys.argv) < 3 else int(sys.argv[2])
defaultTimeOut = 60000 if len(sys.argv) < 4 else int(sys.argv[3])
for chunk_number in range(number_of_chunks):
for chunk in fileBlockStream(fileObj, number_of_chunks, chunk_number):
computeResultDS(chunk, avgTimeSpentDict, defaultTimeOut)
print (computeAvg(avgTimeSpentDict,defaultTimeOut))
avgTimeSpentDict.clear() #Nullify dictionary
fileObj.close #Close the file object
Обе вышеприведенные программы дают желаемый результат, но для данного конкретного сценария важна эффективность. Дайте мне знать, если у вас есть что-нибудь получше или какие-либо предложения по существующей реализации.
Заранее спасибо!!
Комментарии:
1. codereview.stackexchange.com это лучшее место для такого рода вопросов
Ответ №1:
То, что вам нужно, — это использование итератора. Я не собираюсь переписывать ваш код заново, но хитрость здесь, скорее всего, заключается в использовании an iterator
. К счастью, Scala предоставляет достойный готовый инструментарий для этой работы.
import scala.io.Source
object ReadBigFiles {
def read(fileName: String): Unit = {
val lines: Iterator[String] = Source.fromFile(fileName).getLines
// now you get iterator semantics for the file line traversal
// that means you can only go through the lines once, but you don't incur a penalty on heap usage
}
}
Для вашего варианта использования вам, похоже, требуется a lastUser
, поэтому вы имеете дело с группами из 2 записей. Я думаю, у вас есть два варианта: либо выбрать iterator.sliding(2)
, который будет создавать итераторы для каждой пары, либо просто добавить рекурсию в микс, используя опции.
def navigate(source: Iterator[String], last: Option[User]): ResultType = {
if (source.hasNext) {
val current = source.next()
last match {
case Some(existing) => // compare with previous user etc
case None => navigate(source, Some(current))
}
} else {
// exit recursion, return result
}
}
Вы можете избежать всего кода, который вы написали для чтения файла и так далее. Если вам нужно подсчитать вхождения, просто создайте a Map
внутри вашей рекурсии и увеличивайте вхождения на каждом шаге на основе вашей бизнес-логики.
Комментарии:
1. Спасибо за ответ, это может быть очень простой вопрос, но я хочу четко понять. Я не совсем понял, когда вы говорите lastUser? Мне нужно вернуть среднее время использования для всех пользователей, и записи журнала для этих пользователей расположены в порядке возрастания на основе метки времени, поэтому пары (открытие / закрытие записи для каждого пользователя) могут находиться далеко друг от друга. Запись этого в рекурсии не займет много памяти?
2. Затем создайте карту со средними значениями по идентификатору пользователя и вычисляйте их на постоянной основе, это довольно легко сделать. Вас не волнует расстояние между парами, это
lastUser
был всего лишь пример, я действительно не тратил время на то, чтобы понять вашу бизнес-логику, потому что в данном случае это не меняет подхода.
Ответ №2:
from queue import LifoQueue, Queue
def averageTime() -> float:
logs = {}
records = Queue()
with open("log.txt") as fp:
lines = fp.readlines()
for line in lines:
if line[0] not in logs:
logs[line[0]] = LifoQueue()
logs[line[0]].put((line[1], line[2]))
else:
logs[line[0]].put((line[1], line[2]))
for k in logs:
somme = 0
count = 0
while not logs[k].empty():
l = logs[k].get()
somme = (somme l[0]) if l[1] == "open" else (somme - l[0])
count = count 1
records.put([k, somme, count//2])
while not records.empty():
record = records.get()
print(f"UserId={record[0]} Avg={record[1]/record[2]}")