PySpark добавляет строки в набор данных в потоковой передаче

#python #apache-spark #pyspark #global-variables #spark-streaming

Вопрос:

Я новичок в мире Spark и, в частности, в pyspark. У меня есть приложение, которое получает данные от Кафки в потоковом режиме и должно каким-то образом обрабатывать эти данные. Что мне нужно, так это добавить новую строку в исходный набор данных, в нескольких словах, мой набор данных в моем приложении расширяется. Вот моя попытка:

 universe = spark_session.read.csv('./dataset/data_v4', header=True, inferSchema=True)
universe = universe.withColumn('id', monotonically_increasing_id())
universe.createOrReplaceTempView('universe')

universe.show()

ssc = StreamingContext(spark_session.sparkContext, 5)

quiet_logs(spark_context)

brokers = [0]
topic = 'mytopic'
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams={"bootstrap.servers": 'myAmazonServer'})

print('Attendo i dati...')
directKafkaStream.foreachRDD(compute_rdd)
ssc.start()
ssc.awaitTermination()
 

В функции compute_rdd я пытаюсь сделать это:

 def compute_rdd(time, rdd):

    if rdd.count() > 0:
       
        print('New data...')

        stream_data = rdd.collect()
        data = json.loads(stream_data[0][1])
        date_format = '%Y-%m-%dT%H:%M:%S'

        new_data = {
            'id': universe.select('id').collect()[-1]['id']   1, # cambiare con funzione random o qualche funzione di spark
            'timestamp': datetime.strptime(data['timestamp'], date_format),
            'vessel': str(data['vessel']),
            'velocity': float(data['velocity']),
            'distance': float(data['distance']),
            'drift_angle': float(data['drift_angle']),
            'decision': int(data['decision'])

        }

        
        times = {}
        if universe.where(f'vessel=="{new_data["vessel"]}"').rdd.isEmpty():

            print('Add data:', new_data)
            new_row = spark_session.createDataFrame([new_data], schema=universe.schema)

            universe = universe.union(new_row) # here the error
 

Проблема в том, что я не могу добавить новую строку, используя классический sintax:

 df = df.union(new_row)
 

Возникшая ошибка заключается в UnboundLocalError: local variable 'universe' referenced before assignment
Я попытался сделать свою переменную universe , содержащую мой набор данных, глобальной, написав эти функции:

 def read_universe(datafile):
    if 'universe' not in globals():
        universe = get_spark_context().read.csv(datafile, header=True, inferSchema=True)
        universe = universe.withColumn(ID_COLUMN_NAME, monotonically_increasing_id())
        universe.createOrReplaceTempView('universe')

        globals()['universe'] = universe
        return globals()['universe']

    return globals()['universe']


def get_universe():
    return globals()['universe']

def universe_update(new_universe):
    globals()['universe'] = new_universe

    return globals()['universe']
 

In this way, using these functions, I solved my problem, but is it the right way to reasoning in pyspark framework? How works Global variables in the spark environment? Global variables have impact on scalability and parallelism in Spark?

Of course, I’m here also for suggestions and other solutions.