#python #gnuradio
#python #gnuradio
Вопрос:
Я работал с помощью руководств по Gnuradio, чтобы понять, как создаются блоки.
Я пытаюсь создать блок интерполяции (1 вход: 2 выхода), который возвращает входные значения, умноженные на константу в обоих выходных потоках. Я реализовал это как класс multiply_out_fff(), который наследуется от interp_block
import numpy
from gnuradio import gr
class multiply_out_fff(gr.interp_block):
"""
docstring for block multiply_out_fff
"""
def __init__(self, multiple):
gr.interp_block.__init__(self,
name="multiply_out_fff",
in_sig=[numpy.float32],
out_sig=[numpy.float32], interp = 2)
self.multiple = multiple
def work(self, input_items, output_items):
in0 = input_items[0]
out = output_items[0]
print("the data coming in: ", in0)
print("in shape ",in0.shape)
for i in range(0,len(out)/2):
out[i] = in0[i] * self.multiple
for i in range(len(out)/2,len(out)):
out[i] = in0[i-len(out)] * self.multiple
print("the data going out: ", out)
print("out shape ", out.shape)
return len(output_items[0])
Я написал тест для этого блока, и мне удалось заставить его пройти, но не так, как я изначально думал, что это сработает
from gnuradio import gr, gr_unittest
from gnuradio import blocks
from multiply_out_fff import multiply_out_fff
class qa_multiply_out_fff (gr_unittest.TestCase):
def setUp (self):
self.tb = gr.top_block ()
def tearDown (self):
self.tb = None
def test_001_t (self):
# set up fg
self.data = (0,1,-2,5.5)
self.expected_result = (0,2,-4,11,0,2,-4,11)
print("---------testing----------")
print("test data: ", self.data)
#make blocks
self.src = blocks.vector_source_f(self.data)
self.mult = multiply_out_fff(2)
self.snk1 = blocks.vector_sink_f()
self.snk2 = blocks.vector_sink_f()
#make connections
self.tb.connect((self.src,0),(self.mult,0))
self.tb.connect((self.mult,0),(self.snk1,0))
#self.tb.connect((self.mult,1),(self.snk2,0))
self.tb.run ()
self.result_data1 = self.snk1.data()
#self.result_data2 = self.snk2.data()
print("The output data: ", self.result_data1)
print("--------test complete-------------n")
# check data
self.assertFloatTuplesAlmostEqual(self.expected_result, self.result_data1)
#self.assertFloatTuplesAlmostEqual(self.expected_result, self.result_data2)
if __name__ == '__main__':
gr_unittest.run(qa_multiply_out_fff, "qa_multiply_out_fff.xml")
запуск тестового скрипта дает:
~/gnuradio/gr-tutorial/python$ python qa_multiply_out_fff.py
---------testing----------
('test data: ', (0, 1, -2, 5.5))
('the data coming in: ', array([ 0. , 1. , -2. , 5.5], dtype=float32))
('in shape ', (4,))
('the data going out: ', array([ 0., 2., -4., 11., 0., 2., -4., 11.], dtype=float32))
('out shape ', (8,))
('The output data: ', (0.0, 2.0, -4.0, 11.0, 0.0, 2.0, -4.0, 11.0))
--------test complete-------------
.
----------------------------------------------------------------------
Ran 1 test in 0.002s
OK
Концепция, которую я изо всех сил пытаюсь понять, заключается в том, как мой блок возвращает данные. Прямо сейчас он возвращает оба выходных потока интерполяции в одном массиве с удвоенной длиной входных данных. Я думал, что он должен иметь два независимых выходных массива и позволять мне подключать отдельный приемник к каждому выходу, вот так (прокомментировано в тесте):
self.tb.connect((self.mult,0),(self.snk1,0))
self.tb.connect((self.mult,1),(self.snk2,0))
Вместо этого все мои данные поступают в snk1. Если я раскомментирую второе соединение, я получу сообщение об ошибке, информирующее меня о том, что блок self.mult не может иметь больше выходных подключений.
ValueError: port number 1 exceeds max of 0
Как я могу создать блок интерполяции, с помощью которого я могу выполнять несколько выходных подключений?
Ответ №1:
Похоже, я неправильно понял параметр ‘interp’, который определяет соотношение между длиной входного вектора и длиной выходного вектора. Правильный способ создания нескольких выходных данных для этого блока — изменить параметр ‘out_sig’, чтобы он был списком типов выходных данных, как показано ниже в модифицированном классе multiply_out_fff.
import numpy
from gnuradio import gr
class multiply_out_fff(gr.interp_block):
"""
docstring for block multiply_out_fff
"""
def __init__(self, multiple):
gr.interp_block.__init__(self,
name="multiply_out_fff",
in_sig=[numpy.float32],
out_sig=[numpy.float32,numpy.float32], interp = 1)
self.multiple = multiple
def work(self, input_items, output_items):
in0 = input_items[0]
out1 = output_items[0]
out2 = output_items[1]
# print("the data coming in: ", in0)
# print("in shape ",in0.shape)
for i in range(len(in0)):
out1[i] = in0[i] * self.multiple
for i in range(len(in0)):
out2[i] = in0[i] * self.multiple * 2
# print("the data going out: ", out)
# print("out shape ", out.shape)
return len(output_items[0])
Gnuradio — «Если бы нам нужны были векторы, мы могли бы определить их как in_sig=[(numpy.float32,4),numpy.float32]. Это означает, что есть два входных порта, один для векторов с 4 плавающими значениями, а другой для скаляров.»