#python #numpy
#python #numpy
Вопрос:
У меня есть некоторые двоичные данные для анализа, и мне нужно найти, где начинаются пакеты. Все пакеты начинаются с одного и того же заголовка, но размер пакета является переменным. Заголовок представляет собой 32-разрядное целое число без знака.
Ниже приведена моя реализация, но она медленная. Есть ли какая-то функциональность numpy или другие опции, чтобы ускорить эту операцию?
"""Example of binary data:
d9 37 b2 a5 08 31 03 ... 46 00 00 01 b9 1e 43 ... d9 37 b2 a5 30 90 06 00 cb... 08 00 30 43 d9 37 b2 a5 ... 04 01 c8 f4 ...
"""
def sliding_window(iterable, n=2):
"""Return a zipped object where each item is a sliding group of n elements from iterable.
Example:
in = [1,2,3,4,5,6,7,8]
out = [[1,2,3,4],[2,3,4,5],[3,4,5,6],[4,5,6,7],[5,6,7,8]]
"""
iterables = itertools.tee(iterable, n)
for iterable, num_skipped in zip(iterables, itertools.count()):
for _ in range(num_skipped):
next(iterable, None)
return zip(*iterables)
packet_header = 0xd937b2a5
dat_file = "path/to/file"
dat = np.fromfile(file=dat_file,dtype = np.uint8)
sw_u8s = sliding_window(dat,4)
#this is really slow
sw_u32s = [struct.unpack('>I', bytes(bb)) for bb in sw_u8s]
# then do something like
# packet_start = np.argwhere(sw_u32s == packet_header)
# to find the indices of the packet headers
Ответ №1:
Прочитайте ваш массив как np.uint8
:
import numpy as np
# -v- packet header starts at byte 3
data_bytes = bytes([0x32, 0x12, 0x8a, 0xd9, 0x37, 0xb2, 0xa5, 0xf8, 0x3d])
dat = np.frombuffer(data_bytes, dtype=np.uint8)
Создайте представление значений «windowed int32»:
dat_32win = np.ndarray((len(dat) - 3,), dtype=np.uint32, buffer=dat, strides=(1,))
Теперь вы берете заголовок своего пакета в порядке байтов вашего компьютера. При вероятной вероятности того, что он имеет малый порядковый номер, вам нужно изменить порядок байтов (вы можете сделать это программно с int.to_bytes
помощью and int.from_bytes
, но, похоже, это не стоит того):
packet_header_le = 0xa5b237d9
Теперь вам просто нужно найти индекс этого значения:
idx = np.argmax(dat_32win == packet_header_le)
print(idx)
# 3
Одно примечание: если заголовок пакета не находится в последовательности байтов, то этот последний np.argmax
вернет 0
(потому что его аргументом будет массив, полный False
значений), что является тем же результатом, который вы получили бы, если бы заголовок пакета начинался с первого байта. Возможно, вы захотите обработать это условие ошибки, проверив, что значение заголовка пакета действительно существует.
Комментарии:
1. Это здорово! Почему вы решили использовать argmax вместо argwhere?
2. @bass2 Я предположил, что вам нужен индекс первого вхождения заголовка пакета, что
argmax
и даст вам (если заголовка пакета вообще нет в данных, как уже отмечалось).).argwhere
даст вам индекс всех вхождений. Я не уверен, что для вас более полезно, но идея остается той же.3. @bass2 Я немного изменил ответ, в предыдущем ответе были отброшены некоторые последние байты (например, если у вас было 11 байт, последние три будут отброшены), поэтому, если заголовок пакета оказался там частично, он не будет обнаружен.