Преобразование csv в .avro с использованием зависимостей PySpark — отсутствуют

#python #csv #apache-spark #pyspark #avro

Вопрос:

У меня есть скрипт на Python, который использует fastavro библиотеку для преобразования csv-файла и сериализует его в соответствии с предоставленной схемой:

 from fastavro import writer
from fastavro.schema import load_schema
import csv

schema = load_schema('schema.avsc')

def csv_reader():
    with open('data.csv') as f:
        yield from csv.DictReader(f)
with open('data.snappy.avro', 'wb') as out:
    writer(out, schema, csv_reader(), codec='snappy')
 

Вышесказанное прекрасно работает с небольшими файлами, но ужасно медленно с большими файлами. Сериализация csv — файла размером 185 МБ заняла 4,5 минуты, а размер некоторых моих файлов приближается к 5 ГБ.

Итак, я решил проверить и посмотреть, как Spark обрабатывает преобразования csv в avro с помощью PySpark 2.4.3:

 from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro

spark = SparkSession 
    .builder 
    .appName("Avro testing") 
    .getOrCreate()

schema = open("schema.avsc", "r").read()

df = spark.read.csv(path="/data/data.csv",
                    header=True)
output = df
  .select(from_avro("value", schema).alias("user"))
  .where('user.favorite_color == "red"')
  .select(to_avro("user.name").alias("value"))
 

но это возвращает следующую ошибку:

ModuleNotFoundError: Нет модуля с именем «pyspark.sql.avro»

Хорошо, я понимаю, что библиотека avro не включена по умолчанию, и имеет смысл, что я получаю ошибку.

Документация Spark предлагает запустить ./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.1.2 ... , но я не запускаю spark-отправить непосредственно в командной строке — он вызывается из моего кода Python.

Мой вопрос: как мне изменить свой код Python, чтобы задание Spark по-прежнему вызывалось из кода, но также включало отсутствующую внешнюю библиотеку avro at pyspark.sql.avro ?

Ответ №1:

Если я правильно понимаю вопрос, вы ищете переменную окружения

 import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.12:3.1.2' 

spark = SparkSession 
    .builder 
    .appName("Avro testing") 
    .getOrCreate()
 

Однако, если вы читаете CSV-файл, вам не следует использовать from_avro

Если вы хотите преобразовать CSV в Avro, вам нужно только to_avro

И если вы запускаете это только локально, на одной машине, я был бы удивлен, если бы Spark был хоть немного быстрее, чем простой подход Python