#python #csv #apache-spark #pyspark #avro
Вопрос:
У меня есть скрипт на Python, который использует fastavro
библиотеку для преобразования csv-файла и сериализует его в соответствии с предоставленной схемой:
from fastavro import writer
from fastavro.schema import load_schema
import csv
schema = load_schema('schema.avsc')
def csv_reader():
with open('data.csv') as f:
yield from csv.DictReader(f)
with open('data.snappy.avro', 'wb') as out:
writer(out, schema, csv_reader(), codec='snappy')
Вышесказанное прекрасно работает с небольшими файлами, но ужасно медленно с большими файлами. Сериализация csv — файла размером 185 МБ заняла 4,5 минуты, а размер некоторых моих файлов приближается к 5 ГБ.
Итак, я решил проверить и посмотреть, как Spark обрабатывает преобразования csv в avro с помощью PySpark 2.4.3:
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
spark = SparkSession
.builder
.appName("Avro testing")
.getOrCreate()
schema = open("schema.avsc", "r").read()
df = spark.read.csv(path="/data/data.csv",
header=True)
output = df
.select(from_avro("value", schema).alias("user"))
.where('user.favorite_color == "red"')
.select(to_avro("user.name").alias("value"))
но это возвращает следующую ошибку:
ModuleNotFoundError: Нет модуля с именем «pyspark.sql.avro»
Хорошо, я понимаю, что библиотека avro не включена по умолчанию, и имеет смысл, что я получаю ошибку.
Документация Spark предлагает запустить ./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.1.2 ...
, но я не запускаю spark-отправить непосредственно в командной строке — он вызывается из моего кода Python.
Мой вопрос: как мне изменить свой код Python, чтобы задание Spark по-прежнему вызывалось из кода, но также включало отсутствующую внешнюю библиотеку avro at pyspark.sql.avro
?
Ответ №1:
Если я правильно понимаю вопрос, вы ищете переменную окружения
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.12:3.1.2'
spark = SparkSession
.builder
.appName("Avro testing")
.getOrCreate()
Однако, если вы читаете CSV-файл, вам не следует использовать from_avro
Если вы хотите преобразовать CSV в Avro, вам нужно только to_avro
И если вы запускаете это только локально, на одной машине, я был бы удивлен, если бы Spark был хоть немного быстрее, чем простой подход Python