#streaming #dolphindb
#потоковый #dolphindb
Вопрос:
Например, две таблицы измерений: потоковая таблица A (идентификатор, дата, время, col1, col2, … col10) и таблица dfs B (идентификатор, … значение). Я хотел бы выполнить вычисления для записей в таблице A, где столбец идентификатора соответствует диапазону идентификатора в таблице измерений B при значенииgt;5. Какую функцию я могу использовать для достижения этой цели в DolphinDB?
Ответ №1:
Предположим, у нас есть таблица A и таблица B
n = 5000 tableB = table("no." string(1..n) as `id, rand(1..10, n) as value) rowA = 100000000 rowA = 100000000 tableA = table(rand("no." string(1..n), rowA) as `id, rand(1..100000, rowA) as `col1, rand(1..100000, rowA) as `col2, rand(1..100000, rowA) as `col3)
Используйте общий метод объединения для таблиц измерений:
select tableA.* from lj(tableA, tableB, `id) where tableB.value gt; 5
используйте метод словаря вместо метода объединения. Результат тот же, что и в приведенном выше примере.
dict_B = dict(tableB.id, tableB.value) select * from tableA where dict_B[tableA.id] gt; 5
Методы объединения таблиц измерений могут быть заменены словарем, что не только повышает производительность, но и делает код более кратким. Он также может быть использован в потоковом вычислительном движке.
// Simulated table A, table B n = 5000 tableB = table("no." string(1..n) as `id, rand(1..10, n) as value) rowA = 1000000 streamtableA_temp = streamTable(rand("no." string(1..n), rowA) as `id, rand(1..100000, rowA) as `col1, rand(1..100000, rowA) as `col2, rand(1..100000, rowA) as `col3, rand(1..100000, rowA) as `col4, rand(1..100000, rowA) as `col5, rand(1..100000, rowA) as `col6) share streamtableA_temp as streamtableA // create a cross-sectional engine, filter table A for calculation // Here, dict_B[id] is passed in the filter parameter, which realizes filtering table A according to the records in table B output = table(1:0, `code`v1`v2`v3`b_value, [STRING,INT, INT,DOUBLE,INT]) rse = createReactiveStateEngine(name="ReactiveStateEngine1", metrics =[lt;col1gt;, lt;col2-col3gt;, lt;(col4 col5)col6gt;, lt;dict_B[id]gt;], dummyTable=streamtableA, outputTable=output, keyColumn=`id, filter= lt; dict_B[id]gt;5 gt;) // subscribe to table A subscribeTable(tableName = "streamtableA", actionName="filterAndCalculate", offset=0, handler=append!{rse}, msgAsTable=true, reconnect=true)