Какой более быстрый способ прочитать двоичный файл как uint8, создать скользящее окно uint32 и найти индексы, где значение uint32 == x?

#python #numpy

#python #numpy

Вопрос:

У меня есть некоторые двоичные данные для анализа, и мне нужно найти, где начинаются пакеты. Все пакеты начинаются с одного и того же заголовка, но размер пакета является переменным. Заголовок представляет собой 32-разрядное целое число без знака.

Ниже приведена моя реализация, но она медленная. Есть ли какая-то функциональность numpy или другие опции, чтобы ускорить эту операцию?

 """Example of binary data:

d9 37 b2 a5 08 31 03 ... 46 00 00 01 b9 1e 43 ... d9 37 b2 a5 30 90 06 00 cb... 08 00 30 43 d9 37 b2 a5 ... 04 01 c8 f4 ...

"""

def sliding_window(iterable, n=2):
    """Return a zipped object where each item is a sliding group of n elements from iterable.

    Example: 

    in = [1,2,3,4,5,6,7,8]
    out = [[1,2,3,4],[2,3,4,5],[3,4,5,6],[4,5,6,7],[5,6,7,8]]

    """

    iterables = itertools.tee(iterable, n)

    for iterable, num_skipped in zip(iterables, itertools.count()):
        for _ in range(num_skipped):
            next(iterable, None)

    return zip(*iterables)


packet_header = 0xd937b2a5

dat_file = "path/to/file"
dat = np.fromfile(file=dat_file,dtype = np.uint8)
sw_u8s = sliding_window(dat,4)

#this is really slow
sw_u32s = [struct.unpack('>I', bytes(bb)) for bb in sw_u8s]

# then do something like 
# packet_start = np.argwhere(sw_u32s == packet_header)
# to find the indices of the packet headers
  

Ответ №1:

Прочитайте ваш массив как np.uint8 :

 import numpy as np
#                                      -v- packet header starts at byte 3
data_bytes = bytes([0x32, 0x12, 0x8a, 0xd9, 0x37, 0xb2, 0xa5, 0xf8, 0x3d])
dat = np.frombuffer(data_bytes, dtype=np.uint8)
  

Создайте представление значений «windowed int32»:

 dat_32win = np.ndarray((len(dat) - 3,), dtype=np.uint32, buffer=dat, strides=(1,))
  

Теперь вы берете заголовок своего пакета в порядке байтов вашего компьютера. При вероятной вероятности того, что он имеет малый порядковый номер, вам нужно изменить порядок байтов (вы можете сделать это программно с int.to_bytes помощью and int.from_bytes , но, похоже, это не стоит того):

 packet_header_le = 0xa5b237d9
  

Теперь вам просто нужно найти индекс этого значения:

 idx = np.argmax(dat_32win == packet_header_le)
print(idx)
# 3
  

Одно примечание: если заголовок пакета не находится в последовательности байтов, то этот последний np.argmax вернет 0 (потому что его аргументом будет массив, полный False значений), что является тем же результатом, который вы получили бы, если бы заголовок пакета начинался с первого байта. Возможно, вы захотите обработать это условие ошибки, проверив, что значение заголовка пакета действительно существует.

Комментарии:

1. Это здорово! Почему вы решили использовать argmax вместо argwhere?

2. @bass2 Я предположил, что вам нужен индекс первого вхождения заголовка пакета, что argmax и даст вам (если заголовка пакета вообще нет в данных, как уже отмечалось).). argwhere даст вам индекс всех вхождений. Я не уверен, что для вас более полезно, но идея остается той же.

3. @bass2 Я немного изменил ответ, в предыдущем ответе были отброшены некоторые последние байты (например, если у вас было 11 байт, последние три будут отброшены), поэтому, если заголовок пакета оказался там частично, он не будет обнаружен.