Фрейм данных с несколькими списками преобразуется в столбец в структурированной потоковой передаче spark

#pyspark #spark-structured-streaming

Вопрос:

У меня следующая проблема, у меня есть фрейм данных в структурированной потоковой передаче spark, который содержит два столбца со списком словарей. Схема, которую я создал для структуры данных, которая у меня есть, выглядит следующим образом:

         tick_by_tick_schema = StructType([
            StructField('localSymbol', StringType()),
            StructField('tickByTicks', ArrayType(StructType([
                StructField('price', StringType()),
                StructField('size', StringType()),
                StructField('specialConditions', StringType()),
            ]))),
            StructField('domBids', ArrayType(StructType([
                StructField('price_bid', StringType()),
                StructField('size_bid', StringType()),
                StructField('marketMaker_bid', StringType()),
            ]))),
            StructField('domAsks', ArrayType(StructType([
                StructField('price_ask', StringType()),
                StructField('size_ask', StringType()),
                StructField('marketMaker_ask', StringType()),
            ])))
        ])

 

Мой фрейм данных таков:

  ----------- ------------------ ---------------------------------------------------------------------------------------------------- ------------------------------------------------------------------------------------------------- 
|localSymbol|tickByTicks       |domBids                                                                                             |domAsks                                                                                          |
 ----------- ------------------ ---------------------------------------------------------------------------------------------------- ------------------------------------------------------------------------------------------------- 
|BABA       |[{213.73, 134, T}]|[{213.51, 1, ARCA}, {213.51, 1, NSDQ}, {213.5, 12, NSDQ}, {213.06, 1, ARCA}, {213.01, 10, DRCTEDGE}]|[{213.75, 45, ARCA}, {213.95, 1, DRCTEDGE}, {214.0, 1, ARCA}, {214.0, 1, NSDQ}, {214.1, 1, NSDQ}]|
 ----------- ------------------ ---------------------------------------------------------------------------------------------------- ------------------------------------------------------------------------------------------------- 
 

Теперь я хотел бы получить что-то вроде этого:

  ----------- ------ --------- --------- 
|localSymbol|price |price_bid|price_ask|
 ----------- ------ --------- --------- 
|BABA       |213.73|213.51   |213.75   |
|BABA       |213.73|213.51   |213.95   |
|BABA       |213.73|213.5    |214.0    |
|BABA       |213.73|213.06   |214.0    |
|BABA       |213.73|213.01   |214.1    | 
 ----------- ------ --------- --------- 
 

я попробовал это:

         df = self.tick_by_tick_data_processed
            .withColumn('price', f.explode(f.col('tickByTicks.price'))) 
            .withColumn('price_ask', f.explode(f.col('domAsks.price_ask'))) 
            .withColumn('price_bid', f.explode(f.col('domBids.price_bid'))).select('localSymbol','price','price_bid','price_ask')
 

но не работает, я бы не хотел группироваться по временному окну, поэтому я бы не хотел делать группировку по

не могли бы вы мне помочь?

Спасибо!

Комментарии:

1. @Кафельс, не могли бы вы мне помочь?

2. Просто обратите внимание, переполнение стека не уведомляет пользователя, даже если вы ставите @[имя пользователя], и я комментирую здесь, потому что это было совпадение

Ответ №1:

Мое решение состоит в том, что:

         df1 = self.tick_by_tick_data_processed
            .select(
                f.col('localSymbol').alias('localSymbol_bids'),
                f.col('time').cast('string').alias('time_bids'),
                f.posexplode('domBids.price_bid').alias('id_bids','price_bids')
            )

        df2 = self.tick_by_tick_data_processed
            .select(
                f.col('localSymbol').alias('localSymbol_asks'),
                f.col('time').cast('string').alias('time_asks'),
                f.posexplode('domAsks.price_ask').alias('id_asks','price_asks')
            )

        join_expr = "id_bids=id_asks AND localSymbol_bids=localSymbol_asks AND time_bids=time_asks"
        join_type = "inner"
        join_df = df1.join(df2, f.expr(join_expr), join_type)
 

и в результате получается:

  ---------------- -------------------------- ------- ---------- ---------------- -------------------------- ------- ---------- 
|localSymbol_bids|time_bids                 |id_bids|price_bids|localSymbol_asks|time_asks                 |id_asks|price_asks|
 ---------------- -------------------------- ------- ---------- ---------------- -------------------------- ------- ---------- 
|BABA            |2021-06-10 13:23:50.701279|0      |219.35    |BABA            |2021-06-10 13:23:50.701279|0      |213.45    |
|BABA            |2021-06-10 13:23:50.701279|1      |214.0     |BABA            |2021-06-10 13:23:50.701279|1      |213.46    |
|BABA            |2021-06-10 13:23:50.701279|4      |213.5     |BABA            |2021-06-10 13:23:50.701279|4      |213.5     |
|BABA            |2021-06-10 13:23:50.701279|2      |213.6     |BABA            |2021-06-10 13:23:50.701279|2      |213.5     |
|BABA            |2021-06-10 13:23:50.701279|3      |213.55    |BABA            |2021-06-10 13:23:50.701279|3      |213.5     |
 ---------------- -------------------------- ------- ---------- ---------------- -------------------------- ------- ----------