Cannot write data from Kafka to HDFS

#apache-spark #pyspark #apache-kafka #parquet #spark-structured-streaming

Question:

I am new to Spark. I am using Spark version 2.3.0.cloudera2 and JDK version 1.8.0_232. I am working on a project where I need to use Spark Structured Streaming to read streaming data from a Kafka topic into HDFS in parquet format. I define a schema, read the raw data into a dataframe, and parse it. The directories get created in HDFS, but no parquet files are ever written. The data in the Kafka topic is in JSON format. I am not sure whether I need to include any additional options in the writeStream code. Any help would be appreciated.

INPUT CODE:

 # Importing libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as F


# Establishing Spark session
spark = SparkSession \
        .builder \
        .appName("StructuredSocketRead") \
        .getOrCreate()
spark.sparkContext.setLogLevel('ERROR')


# Reading data from the given Kafka server & topic name
raw_data = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "healthcheck") \
        .option("failOnDataLoss", "false") \
        .option("startingOffsets", "earliest") \
        .load()
raw_data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Defining schema
Data_schema = StructType([
                StructField("heartBeat", IntegerType()),
                StructField("bp", IntegerType()),
                StructField("customerId", IntegerType()),
                StructField("timestamp", TimestampType())
                ])

# Parsing the raw value column as JSON
in_df1 = raw_data.select(from_json(col("value").cast("string"),
                                   Data_schema).alias("parsed"))

# Parsed DF
in_df2 = in_df1.select("parsed.*")

df_1 = in_df2.select("heartBeat", "bp", "customerId", "timestamp")

# Query to write streaming data to parquet
query = in_df2.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("format", "append") \
    .option("path", "patients_ vital_info") \
    .option("checkpointLocation", "patients_vital_info_parquet") \
    .trigger(processingTime="1 minute") \
    .start()

query_1 = df_1 \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
query_1.awaitTermination()
 

Below are the directories created at the HDFS path.

 hadoop fs -ls patients_ vital_info
Found 1 items
drwxr-xr-x   - ec2-user ec2-user          0 2021-05-25 17:44 patients_ vital_info/_spark_metadata
 

Output from the Kafka topic being read:

 {"customerId": 4, "heartBeat": 72, "bp": 160}
{"customerId": 5, "heartBeat": 66, "bp": 171}
{"customerId": 1, "heartBeat": 78, "bp": 172}
{"customerId": 2, "heartBeat": 71, "bp": 172}
{"customerId": 3, "heartBeat": 68, "bp": 169}
 
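Incidentally, none of these sample messages contains the `timestamp` field declared in `Data_schema`, so `from_json` will emit null for that column. A quick plain-Python check of one message makes this visible (stdlib `json` only; this illustrates the missing key, not Spark's parsing itself):

```python
import json

# One of the sample messages from the topic (no "timestamp" key).
msg = '{"customerId": 4, "heartBeat": 72, "bp": 160}'

record = json.loads(msg)
# Fields declared in Data_schema; "timestamp" is absent from the payload,
# so from_json would yield null for it (here: None via dict.get).
parsed = {f: record.get(f) for f in ("heartBeat", "bp", "customerId", "timestamp")}
print(parsed)
```

If that column matters downstream, the producer needs to emit it, or it could be filled in on the Spark side (for example with `current_timestamp()`).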

Log file from the spark2-submit output:

 [ec2-user@ip-172-30-25-220 ~]$ export SPARK_KAFKA_VERSION=0.10
[ec2-user@ip-172-30-25-220 ~]$ spark2-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 kafka_spark_patient_vitals.py
Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
:: loading settings :: url = jar:file:/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
        confs: [default]
        found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.0 in central
        found org.apache.kafka#kafka-clients;0.10.0.1 in central
        found net.jpountz.lz4#lz4;1.3.0 in central
        found org.xerial.snappy#snappy-java;1.1.2.6 in central
        found org.slf4j#slf4j-api;1.7.16 in central
        found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 1066ms :: artifacts dl 24ms
        :: modules in use:
        net.jpountz.lz4#lz4;1.3.0 from central in [default]
        org.apache.kafka#kafka-clients;0.10.0.1 from central in [default]
        org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.0 from central in [default]
        org.slf4j#slf4j-api;1.7.16 from central in [default]
        org.spark-project.spark#unused;1.0.0 from central in [default]
        org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   6   |   0   |   0   |   0   ||   6   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
        confs: [default]
        0 artifacts copied, 6 already retrieved (0kB/10ms)
21/05/30 18:25:44 INFO spark.SparkContext: Running Spark version 2.3.0.cloudera2
21/05/30 18:25:44 INFO spark.SparkContext: Submitted application: StructuredSocketRead
21/05/30 18:25:44 INFO spark.SecurityManager: Changing view acls to: ec2-user
21/05/30 18:25:44 INFO spark.SecurityManager: Changing modify acls to: ec2-user
21/05/30 18:25:44 INFO spark.SecurityManager: Changing view acls groups to:
21/05/30 18:25:44 INFO spark.SecurityManager: Changing modify acls groups to:
21/05/30 18:25:44 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(ec2-user); groups with view permissions: Set(); users  with modify permissions: Set(ec2-user); groups with modify permissions: Set()
21/05/30 18:25:45 INFO util.Utils: Successfully started service 'sparkDriver' on port 45282.
21/05/30 18:25:45 INFO spark.SparkEnv: Registering MapOutputTracker
21/05/30 18:25:45 INFO spark.SparkEnv: Registering BlockManagerMaster
21/05/30 18:25:45 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/05/30 18:25:45 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/05/30 18:25:45 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-9f529fb8-9527-42d5-a9ce-7d08b5ee9147
21/05/30 18:25:45 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB
21/05/30 18:25:46 INFO spark.SparkEnv: Registering OutputCommitCoordinator
21/05/30 18:25:46 INFO util.log: Logging initialized @14948ms
21/05/30 18:25:46 INFO server.Server: jetty-9.3.z-SNAPSHOT
21/05/30 18:25:46 INFO server.Server: Started @15059ms
21/05/30 18:25:46 INFO server.AbstractConnector: Started ServerConnector@268a4ae3{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
21/05/30 18:25:46 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1ef38de2{/jobs,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@45baf6cf{/jobs/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1a1d037c{/jobs/job,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1d94a97e{/jobs/job/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@76dd292b{/stages,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@31a48512{/stages/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@43582072{/stages/stage,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7be89d8{/stages/stage/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5e64296d{/stages/pool,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@132bb741{/stages/pool/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@225559e1{/storage,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4c946773{/storage/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@30949078{/storage/rdd,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@789a752{/storage/rdd/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@19609de3{/environment,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@74a14074{/environment/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7a1893f3{/executors,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7c7a9bb9{/executors/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@71d10562{/executors/threadDump,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6d070b7b{/executors/threadDump/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1f64b6af{/static,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5f9fd900{/,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@371e3a17{/api,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@45ceb74{/jobs/job/kill,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@e39441{/stages/stage/kill,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://ip-172-30-25-220.ec2.internal:4040
21/05/30 18:25:47 INFO util.Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
21/05/30 18:25:48 INFO client.RMProxy: Connecting to ResourceManager at ip-172-30-25-220.ec2.internal/172.30.25.220:8032
21/05/30 18:25:48 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers
21/05/30 18:25:48 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
21/05/30 18:25:48 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
21/05/30 18:25:48 INFO yarn.Client: Setting up container launch context for our AM
21/05/30 18:25:48 INFO yarn.Client: Setting up the launch environment for our AM container
21/05/30 18:25:48 INFO yarn.Client: Preparing resources for our AM container
21/05/30 18:25:48 INFO yarn.Client: Uploading resource file:/home/ec2-user/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.0.jar -> hdfs://ip-172-30-25-220.ec2.internal:8020/user/ec2-user/.sparkStaging/application_1622393510796_0005/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.0.jar
21/05/30 18:25:49 INFO yarn.Client: Uploading resource file:/home/ec2-user/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar -> hdfs://ip-172-30-25-220.ec2.internal:8020/user/ec2-user/.sparkStaging/application_1622393510796_0005/org.apache.kafka_kafka-clients-0.10.0.1.jar
21/05/30 18:25:49 INFO yarn.Client: Uploading resource file:/home/ec2-user/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar -> hdfs://ip-172-30-25-220.ec2.internal:8020/user/ec2-user/.sparkStaging/application_1622393510796_0005/org.spark-project.spark_unused-1.0.0.jar
21/05/30 18:25:49 INFO yarn.Client: Uploading resource file:/home/ec2-user/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar -> hdfs://ip-172-30-25-220.ec2.internal:8020/user/ec2-user/.sparkStaging/application_1622393510796_0005/net.jpountz.lz4_lz4-1.3.0.jar
21/05/30 18:25:50 INFO yarn.Client: Uploading resource file:/home/ec2-user/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar -> hdfs://ip-172-30-25-220.ec2.internal:8020/user/ec2-user/.sparkStaging/application_1622393510796_0005/org.xerial.snappy_snappy-java-1.1.2.6.jar
21/05/30 18:25:50 INFO yarn.Client: Uploading resource file:/home/ec2-user/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar -> hdfs://ip-172-30-25-220.ec2.internal:8020/user/ec2-user/.sparkStaging/application_1622393510796_0005/org.slf4j_slf4j-api-1.7.16.jar
21/05/30 18:25:50 WARN yarn.Client: Same path resource file:///home/ec2-user/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.0.jar added multiple times to distributed cache.
21/05/30 18:25:50 WARN yarn.Client: Same path resource file:///home/ec2-user/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar added multiple times to distributed cache.
21/05/30 18:25:50 WARN yarn.Client: Same path resource file:///home/ec2-user/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar added multiple times to distributed cache.
21/05/30 18:25:50 WARN yarn.Client: Same path resource file:///home/ec2-user/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar added multiple times to distributed cache.
21/05/30 18:25:50 WARN yarn.Client: Same path resource file:///home/ec2-user/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar added multiple times to distributed cache.
21/05/30 18:25:50 WARN yarn.Client: Same path resource file:///home/ec2-user/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar added multiple times to distributed cache.
21/05/30 18:25:51 INFO yarn.Client: Uploading resource file:/tmp/spark-5eef8b91-a5bc-4d8f-9c6e-05f413b7d5bb/__spark_conf__4453488105854780815.zip -> hdfs://ip-172-30-25-220.ec2.internal:8020/user/ec2-user/.sparkStaging/application_1622393510796_0005/__spark_conf__.zip
21/05/30 18:25:51 INFO spark.SecurityManager: Changing view acls to: ec2-user
21/05/30 18:25:51 INFO spark.SecurityManager: Changing modify acls to: ec2-user
21/05/30 18:25:51 INFO spark.SecurityManager: Changing view acls groups to:
21/05/30 18:25:51 INFO spark.SecurityManager: Changing modify acls groups to:
21/05/30 18:25:51 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(ec2-user); groups with view permissions: Set(); users  with modify permissions: Set(ec2-user); groups with modify permissions: Set()
21/05/30 18:25:53 INFO yarn.Client: Submitting application application_1622393510796_0005 to ResourceManager
21/05/30 18:25:53 INFO impl.YarnClientImpl: Submitted application application_1622393510796_0005
21/05/30 18:25:53 INFO cluster.SchedulerExtensionServices: Starting Yarn extension services with app application_1622393510796_0005 and attemptId None
21/05/30 18:25:54 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:25:54 INFO yarn.Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: root.users.ec2-user
         start time: 1622399153678
         final status: UNDEFINED
         tracking URL: http://ip-172-30-25-220.ec2.internal:8088/proxy/application_1622393510796_0005/
         user: ec2-user
21/05/30 18:25:55 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:25:56 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:25:57 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:25:58 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:25:59 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:26:01 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:26:02 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:26:03 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:26:03 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> ip-172-30-25-220.ec2.internal, PROXY_URI_BASES -> http://ip-172-30-25-220.ec2.internal:8088/proxy/application_1622393510796_0005), /proxy/application_1622393510796_0005
21/05/30 18:26:03 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/05/30 18:26:04 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:26:04 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
21/05/30 18:26:05 INFO yarn.Client: Application report for application_1622393510796_0005 (state: RUNNING)
21/05/30 18:26:05 INFO yarn.Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: 172.30.25.220
         ApplicationMaster RPC port: 0
         queue: root.users.ec2-user
         start time: 1622399153678
         final status: UNDEFINED
         tracking URL: http://ip-172-30-25-220.ec2.internal:8088/proxy/application_1622393510796_0005/
         user: ec2-user
21/05/30 18:26:05 INFO cluster.YarnClientSchedulerBackend: Application application_1622393510796_0005 has started running.
21/05/30 18:26:05 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 39624.
21/05/30 18:26:05 INFO netty.NettyBlockTransferService: Server created on ip-172-30-25-220.ec2.internal:39624
21/05/30 18:26:05 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/05/30 18:26:05 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ip-172-30-25-220.ec2.internal, 39624, None)
21/05/30 18:26:05 INFO storage.BlockManagerMasterEndpoint: Registering block manager ip-172-30-25-220.ec2.internal:39624 with 366.3 MB RAM, BlockManagerId(driver, ip-172-30-25-220.ec2.internal, 39624, None)
21/05/30 18:26:05 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ip-172-30-25-220.ec2.internal, 39624, None)
21/05/30 18:26:05 INFO storage.BlockManager: external shuffle service port = 7337
21/05/30 18:26:05 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, ip-172-30-25-220.ec2.internal, 39624, None)
21/05/30 18:26:05 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@8d5187e{/metrics/json,null,AVAILABLE,@Spark}
21/05/30 18:26:05 INFO scheduler.EventLoggingListener: Logging events to hdfs://ip-172-30-25-220.ec2.internal:8020/user/spark/spark2ApplicationHistory/application_1622393510796_0005
21/05/30 18:26:05 INFO util.Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
21/05/30 18:26:05 INFO spark.SparkContext: Registered listener com.cloudera.spark.lineage.NavigatorAppListener
21/05/30 18:26:05 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
21/05/30 18:26:07 INFO internal.SharedState: loading hive config file: file:/etc/spark2/conf.cloudera.spark2_on_yarn/yarn-conf/hive-site.xml
21/05/30 18:26:07 INFO internal.SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('/user/hive/warehouse').
21/05/30 18:26:07 INFO internal.SharedState: Warehouse path is '/user/hive/warehouse'.
21/05/30 18:26:07 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@78c44c49{/SQL,null,AVAILABLE,@Spark}
21/05/30 18:26:07 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@492f0586{/SQL/json,null,AVAILABLE,@Spark}
21/05/30 18:26:07 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@486064db{/SQL/execution,null,AVAILABLE,@Spark}
21/05/30 18:26:07 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@78259e96{/SQL/execution/json,null,AVAILABLE,@Spark}
21/05/30 18:26:07 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@56063ce9{/static/sql,null,AVAILABLE,@Spark}
21/05/30 18:26:08 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
 

Comments:

1. It looks like you have mixed up "patients_vital_info_parquet" and "patients_ vital_info". Parquet files are written to the location given by the path option, not to the checkpoint location. Note: the string in the path option contains a space.

2. I also could not find any parquet files in the path directory.

3. Why do you have .option("format","append")? Have you tried removing that line? The shared logs only cover job startup; can you share the logs after deploying the application in client mode, e.g. spark2-submit --deploy-mode client --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 kafka_spark_patient_vitals.py? Or collect your logs from Cloudera Manager or the EMR logs.
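Putting comments 1 and 3 together, the fix would be to drop both the stray space in the path and the redundant `.option("format","append")`. Below is a minimal plain-Python sketch of the corrected option set, kept Spark-free so it can be sanity-checked without a cluster; in the job itself these would be passed to the writer, e.g. `in_df2.writeStream.outputMode("append").format("parquet").options(**sink_opts).trigger(processingTime="1 minute").start()`:

```python
# Corrected sink options: the path no longer contains an embedded space,
# and the bogus "format" option is gone -- the output format is chosen by
# .format("parquet") on the DataStreamWriter, not by an option.
sink_opts = {
    "path": "patients_vital_info",                        # was "patients_ vital_info"
    "checkpointLocation": "patients_vital_info_parquet",
}

# Guard against the typo class from comment 1: no whitespace in any value.
bad = [k for k, v in sink_opts.items() if any(ch.isspace() for ch in v)]
print(bad)  # -> []
```

Keeping the checkpoint location distinct from the data path, as the question already does, is correct; the key point is that the parquet files will land under the path value, space and all.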