Как создать список хронологических событий в Splunk

#events #splunk #chronological

#Мероприятия #splunk #хронологический

Вопрос:

У меня есть индекс, в котором перечислены (среди прочего) устройство, дата события и уровень (1-4). Устройства меняют уровни через случайные промежутки времени. Мне нужно создать поиск, который показывает, как долго конкретное устройство находится на определенном уровне, но я не могу выполнить простой подсчет; если устройство находится на уровне 1 в течение трех дней, переходит на уровень 2 в течение пяти дней, затем возвращается на уровень 1 в течение двух дней, подсчет будетпоказать пять дней, что явно неверно. Как я могу сгенерировать поле «Последовательные дни на текущем уровне»?

Мне нужен запрос, который сообщает устройство, дату, уровень и дни на текущем уровне. Спасибо!

Комментарии:

1. Привет, вы пробовали выполнять вложенные запросы?

Ответ №1:

Я подозреваю, что транзакция — это то, что вы ищете. Это группирует события с полями, которые соответствуют (и / или другим критериям), и сообщает о количестве событий и продолжительности.

К сожалению, неясно, как выглядят ваши данные…. Сначала я подумал, что это может быть приближением к вашим данным … устройства регистрируются в случайное время, предоставляя отчет о том, на каком уровне они находятся в данный момент:

 | makeresults count=100 
| eval decrementsecs=random(), decrementday=random()%2,state=random()%4 1,device=random() 1,device=case(device=1,"A",device=2,"B",device=3,"C",device=4,"D",device=5,"E",device=6,"F",device=7,"G",device=8,"H",device=9,"I",device=10,"J") 
| streamstats sum(decrementday) as days sum(decrementsecs) as secs 
| eval _time=_time-secs-(24*60*60*days) 
| fields - days secs decrement* 
  

В этом мире выборки вы ищете количество времени, прошедшего с тех пор, как устройство сообщило о состоянии, отличном от самого последнего сообщенного состояния … что-то вроде:

 <base search>
| streamstats current=f global=f window=2 last(state) as last by device 
| where isnull(last) OR last!=state 
| dedup 2 device 
| transaction device
| eval days_at_current_level=if(eventcount=2,round(duration/(24*60*60)),"unknown")
  

Если, однако, эти образцы не являются образцами и фактически являются самими переходами, то:

 <base search>
| stats max(_time) as _time latest(state) as level by device
| eval days_at_current_level=round((now()-_time)/(24*60*60))
  

Но затем, исходя из того, как вы описывали вещи, кажется, что у вас может быть X количество устройств, каждое из которых ежедневно сообщает о своем состоянии… в этом случае ваши данные на самом деле, вероятно, выглядят так:

 | makeresults count=10
| streamstats count
| eval _time=_time-(24*60*60*(count-1)), device=split("A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P",",")
| mvexpand device
| eval level=random()%4 1
  

И в этом случае, поскольку я не беспокоюсь об изменениях в течение нескольких дней, мой поиск может выглядеть так просто, как:

 <base search>
| transaction device level
  

(Предостережение в отношении транзакций, помните о необходимых вам данных и ограничьте количество данных, поступающих в транзакцию, для повышения производительности… если вам нужны только _time, device и level, используйте поля, чтобы избавиться от всего остального до транзакции …. например ... | fields - _raw | fields _time device level , ограничивая данные перед транзакцией, вы ограничиваете данные, возвращаемые из индексаторов, и вы ограничиваете объем данных, которые команда транзакции должна хранить вместе/ отслеживайте и, таким образом, вы получаете лучшую производительность)