Как эффективно объединять таблицы измерений, особенно для потоковых вычислений?

#streaming #dolphindb

#потоковый #dolphindb

Вопрос:

Например, две таблицы измерений: потоковая таблица A (идентификатор, дата, время, col1, col2, … col10) и таблица dfs B (идентификатор, … значение). Я хотел бы выполнить вычисления для записей в таблице A, где столбец идентификатора соответствует диапазону идентификатора в таблице измерений B при значенииgt;5. Какую функцию я могу использовать для достижения этой цели в DolphinDB?

Ответ №1:

Предположим, у нас есть таблица A и таблица B

 n = 5000 tableB = table("no." string(1..n) as `id, rand(1..10, n) as value) rowA = 100000000 rowA = 100000000 tableA = table(rand("no." string(1..n), rowA) as `id, rand(1..100000, rowA) as `col1, rand(1..100000, rowA) as `col2, rand(1..100000, rowA) as `col3)  

Используйте общий метод объединения для таблиц измерений:

 select tableA.* from lj(tableA, tableB, `id) where tableB.value gt; 5  

используйте метод словаря вместо метода объединения. Результат тот же, что и в приведенном выше примере.

 dict_B = dict(tableB.id, tableB.value) select * from tableA where dict_B[tableA.id] gt; 5  

Методы объединения таблиц измерений могут быть заменены словарем, что не только повышает производительность, но и делает код более кратким. Он также может быть использован в потоковом вычислительном движке.

 // Simulated table A, table B n = 5000 tableB = table("no." string(1..n) as `id, rand(1..10, n) as value) rowA = 1000000 streamtableA_temp = streamTable(rand("no." string(1..n), rowA) as `id, rand(1..100000, rowA) as `col1, rand(1..100000, rowA) as `col2, rand(1..100000, rowA) as `col3, rand(1..100000, rowA) as `col4, rand(1..100000, rowA) as `col5, rand(1..100000, rowA) as `col6) share streamtableA_temp as streamtableA  // create a cross-sectional engine, filter table A for calculation  // Here, dict_B[id] is passed in the filter parameter, which realizes filtering table A according to the records in table B output = table(1:0, `code`v1`v2`v3`b_value, [STRING,INT, INT,DOUBLE,INT]) rse = createReactiveStateEngine(name="ReactiveStateEngine1", metrics =[lt;col1gt;, lt;col2-col3gt;, lt;(col4 col5)col6gt;, lt;dict_B[id]gt;], dummyTable=streamtableA, outputTable=output, keyColumn=`id, filter= lt; dict_B[id]gt;5 gt;)   // subscribe to table A subscribeTable(tableName = "streamtableA", actionName="filterAndCalculate", offset=0, handler=append!{rse}, msgAsTable=true, reconnect=true)