Aerospike аномальное увеличение потребления памяти при использовании UDF через Java-клиент (агрегация запросов)

#java #lua #aerospike

#java #lua #aerospike

Вопрос:

Итак, позвольте мне сначала объяснить. У меня есть пример фрагмента кода из Java, который написан следующим образом:

 Statement statement = new Statement();
statement.setNamespace("foo");
statement.setSetName("bar");
statement.setAggregateFunction(Thread.currentThread().getContextClassLoader(),
    "udf/resource/path","udfFilename","udfFunctionName",
    "args1","args2","args3","args4");
ResultSet rs = aerospikeClient.getClient().queryAggregate(null,statement);

while(rs.next()){
   //insert logic code here 
}
  

С этим фрагментом примера кода я смог использовать UDF, написанный на lua, как указано в документации Aerospike. UDF просто выполняет поиск по нескольким ячейкам и возвращает свои результаты, он никогда не сохраняется и не преобразует какие-либо данные.

Теперь дело в том, что когда функция, использующая этот код, который вызывает UDF, в AMC (консоль управления Aerospike) она порождает задания агрегации, которые помечаются как «выполнено (ok)», но никогда не помечаются как завершенные, и по-прежнему находятся в таблице «Выполняемые задания», а не в таблице «Завершенные задания». (см. Рисунок ниже)

Задания в таблице Выполняемых заданий

Задания в заполненной таблице Jos

и по команде терминала Bash «Top» я увидел, что процент использования памяти сервером Aerospike продолжает расти по мере увеличения числа заданий до тех пор, пока сервер Aerospike не выйдет из строя, поскольку он максимально использовал память компьютера.

Мои вопросы,

  1. возможно ли, чтобы задания освобождали эти ресурсы (если они действительно возникают при ненормальном увеличении памяти)?
  2. если виноваты не задания, то что?

*** ОТРЕДАКТИРОВАНО: Пример кода Lua:

 local function map_request(record)
    return map {response = record.response,
                templateId = record.templateId, 
                id = record.id, requestSent = record.requestSent,
                dateReplied = record.dateReplied}
end

function checkResponse(stream, responseFilter, templateId, validtyPeriod, currentDate)
       local function filterResponse(record)
            if responseFilter ~= "FOO" and validityPeriod > 0 then
                   return (record.response == responseFilter) and 
                          (record.templateId == templateId) and 
                          (record.dateReplied   validityPeriod) > currentDate
            else
                   return (record.response == responseFilter) and 
                          (record.templateId == templateId)
            end
        end
        return stream:filter (filterResponse):map(map_request)
end
  

Комментарии:

1. Прежде всего, напишите свой код lua, если вы пометили свой вопрос как lua

2. привет @Vyacheslav, извини за это, приятель