Как я могу выполнить PySpark Stream с Azure Storage directory в качестве источника потоковых данных

#azure #pyspark #azure-blob-storage #spark-streaming

#azure #pyspark #azure-blob-хранилище #spark-streaming

Вопрос:

Я хотел выполнить задания Spark Stream с Azure Blob Storage в качестве источника для моего потока. Как я могу это сделать с помощью Python

Комментарии:

1. Пожалуйста, отредактируйте свой вопрос, чтобы показать свою работу: что вы пробовали, где вы застряли, ошибки, проблемы с выводом и т.д. В настоящее время это слишком широко, без каких-либо подробностей.

2. Пожалуйста, предоставьте достаточно кода, чтобы другие могли лучше понять или воспроизвести проблему.

Ответ №1:

Мы можем запускать задания spark в Azure batch с помощью Azure container и blob-сервиса. Пакет Azure используется для выполнения заданий, поскольку они имеют низкую стоимость.

Для этого нам потребуется несколько необходимых настроек, таких как учетная запись хранилища, реестр контейнеров и пакет Azure для выполнения заданий.

Ниже приведен пример кода python для запуска простого задания spark:

 import argparse  
  
from pyspark.sql import SparkSession  
  
import config  
  
  
def get_azure_spark_connection(storage_account_name, storage_account_key):  
    spark = (  
        SparkSession.builder  
            .config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:2.7.3')  
            .config('spark.hadoop.fs.azure', "org.apache.hadoop.fs.azure.NativeAzureFileSystem")  
            .config("spark.hadoop.fs.azure.account.key."   storage_account_name   ".blob.core.windows.net",  
                    storage_account_key)  
            .appName("AzureSparkDemo")  
            .getOrCreate())  
  
    (spark.sparkContext._jsc.hadoopConfiguration().set("fs.wasbs.impl",  
                                                       "org.apache.hadoop.fs.azure.NativeAzureFileSystem"))  
    return spark  
  
  
if __name__ == '__main__':  
    parser = argparse.ArgumentParser()  
    parser.add_argument("-i", "--input", help="input file to parse", type=str)  
    parser.add_argument("-o", "--output", help="result file to write", type=str)  
    args = parser.parse_args()  
    spark = get_azure_spark_connection(config.STORAGE_ACCOUNT_NAME, config.STORAGE_ACCOUNT_KEY)  
    df = (spark.read.option("header", "true")  
          .option("delimiter", ",")  
          .option("inferSchema", "true")  
          .csv(args.input))  
    df.registerTempTable("airlines")  
    result = spark.sql("""  
      select Year, Month, DayofMonth, _avg_(ArrDelay) as avg_ArrDelay, _avg_(DepDelay) as avg_DepDelay  
      from airlines   
      group by Year, Month, DayofMonth  
""")  
    result.repartition(1).write.mode("overwrite").parquet(args.output)
 

Ниже приведены требования, которые используются:

 azure  
azure-storage  
azure-storage-blob  
pyspark==2.4.0
 

Вы можете обратиться к этим блогам, чтобы узнать больше о выполнении заданий с Azure Storage с использованием python.