#apache-spark #pyspark #apache-kafka #parquet #spark-structured-streaming
Question:
I am new to Spark. I am using Spark version 2.3.0.cloudera2 and JDK version 1.8.0_232. I am working on a project where I need to use Spark Structured Streaming to read streaming data from a Kafka topic and write it to HDFS in parquet format. I define a schema, read the raw data into a dataframe, and parse it. The directories are created in HDFS, but no data is written to a parquet file. The data in the Kafka topic is in JSON format. I am not sure whether I need to include any additional options in the writeStream code. Any help would be appreciated.
INPUT CODE:
#Importing Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as F
# Establishing Spark Session
spark = SparkSession \
    .builder \
    .appName("StructuredSocketRead") \
    .getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
# Reading data from the given Kafka server & topic name
raw_data = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "healthcheck") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()
raw_data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Defining Schema
Data_schema = StructType([
    StructField("heartBeat", IntegerType()),
    StructField("bp", IntegerType()),
    StructField("customerId", IntegerType()),
    StructField("timestamp", TimestampType())
])
# Casting the raw value to string and parsing it as JSON with the schema
in_df1 = raw_data.select(from_json(col("value").cast("string"), Data_schema).alias("parsed"))
# Parsed DF
in_df2 = in_df1.select("parsed.*")
df_1 = in_df2.select("heartBeat","bp","customerId","timestamp")
# Query to write streaming data to parquet
query = in_df2.writeStream \
    .outputMode("Append") \
    .format("parquet") \
    .option("format", "append") \
    .option("path", "patients_ vital_info") \
    .option("checkpointLocation", "patients_vital_info_parquet") \
    .trigger(processingTime="1 minute") \
    .start()
query_1 = df_1 \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
query.awaitTermination()
query_1.awaitTermination()
Below are the directories created at the HDFS path.
hadoop fs -ls patients_ vital_info
Found 1 items
drwxr-xr-x - ec2-user ec2-user 0 2021-05-25 17:44 patients_ vital_info/_spark_metadata
Sample messages read from the Kafka topic:
{"customerId": 4, "heartBeat": 72, "bp": 160}
{"customerId": 5, "heartBeat": 66, "bp": 171}
{"customerId": 1, "heartBeat": 78, "bp": 172}
{"customerId": 2, "heartBeat": 71, "bp": 172}
{"customerId": 3, "heartBeat": 68, "bp": 169}
Log file from the spark2-submit output
[ec2-user@ip-172-30-25-220 ~]$ export SPARK_KAFKA_VERSION=0.10
[ec2-user@ip-172-30-25-220 ~]$ spark2-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 kafka_spark_patient_vitals.py
Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
:: loading settings :: url = jar:file:/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.0 in central
found org.apache.kafka#kafka-clients;0.10.0.1 in central
found net.jpountz.lz4#lz4;1.3.0 in central
found org.xerial.snappy#snappy-java;1.1.2.6 in central
found org.slf4j#slf4j-api;1.7.16 in central
found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 1066ms :: artifacts dl 24ms
:: modules in use:
net.jpountz.lz4#lz4;1.3.0 from central in [default]
org.apache.kafka#kafka-clients;0.10.0.1 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.0 from central in [default]
org.slf4j#slf4j-api;1.7.16 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 6 | 0 | 0 | 0 || 6 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 6 already retrieved (0kB/10ms)
21/05/30 18:25:44 INFO spark.SparkContext: Running Spark version 2.3.0.cloudera2
21/05/30 18:25:44 INFO spark.SparkContext: Submitted application: StructuredSocketRead
21/05/30 18:25:44 INFO spark.SecurityManager: Changing view acls to: ec2-user
21/05/30 18:25:44 INFO spark.SecurityManager: Changing modify acls to: ec2-user
21/05/30 18:25:44 INFO spark.SecurityManager: Changing view acls groups to:
21/05/30 18:25:44 INFO spark.SecurityManager: Changing modify acls groups to:
21/05/30 18:25:44 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ec2-user); groups with view permissions: Set(); users with modify permissions: Set(ec2-user); groups with modify permissions: Set()
21/05/30 18:25:45 INFO util.Utils: Successfully started service 'sparkDriver' on port 45282.
21/05/30 18:25:45 INFO spark.SparkEnv: Registering MapOutputTracker
21/05/30 18:25:45 INFO spark.SparkEnv: Registering BlockManagerMaster
21/05/30 18:25:45 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/05/30 18:25:45 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/05/30 18:25:45 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-9f529fb8-9527-42d5-a9ce-7d08b5ee9147
21/05/30 18:25:45 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB
21/05/30 18:25:46 INFO spark.SparkEnv: Registering OutputCommitCoordinator
21/05/30 18:25:46 INFO util.log: Logging initialized @14948ms
21/05/30 18:25:46 INFO server.Server: jetty-9.3.z-SNAPSHOT
21/05/30 18:25:46 INFO server.Server: Started @15059ms
21/05/30 18:25:46 INFO server.AbstractConnector: Started ServerConnector@268a4ae3{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
21/05/30 18:25:46 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1ef38de2{/jobs,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@45baf6cf{/jobs/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1a1d037c{/jobs/job,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1d94a97e{/jobs/job/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@76dd292b{/stages,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@31a48512{/stages/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@43582072{/stages/stage,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7be89d8{/stages/stage/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5e64296d{/stages/pool,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@132bb741{/stages/pool/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@225559e1{/storage,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4c946773{/storage/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@30949078{/storage/rdd,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@789a752{/storage/rdd/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@19609de3{/environment,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@74a14074{/environment/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7a1893f3{/executors,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7c7a9bb9{/executors/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@71d10562{/executors/threadDump,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6d070b7b{/executors/threadDump/json,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1f64b6af{/static,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5f9fd900{/,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@371e3a17{/api,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@45ceb74{/jobs/job/kill,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@e39441{/stages/stage/kill,null,AVAILABLE,@Spark}
21/05/30 18:25:46 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://ip-172-30-25-220.ec2.internal:4040
21/05/30 18:25:47 INFO util.Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
21/05/30 18:25:48 INFO client.RMProxy: Connecting to ResourceManager at ip-172-30-25-220.ec2.internal/172.30.25.220:8032
21/05/30 18:25:48 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers
21/05/30 18:25:48 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
21/05/30 18:25:48 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
21/05/30 18:25:48 INFO yarn.Client: Setting up container launch context for our AM
21/05/30 18:25:48 INFO yarn.Client: Setting up the launch environment for our AM container
21/05/30 18:25:48 INFO yarn.Client: Preparing resources for our AM container
21/05/30 18:25:48 INFO yarn.Client: Uploading resource file:/home/ec2-user/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.0.jar -> hdfs://ip-172-30-25-220.ec2.internal:8020/user/ec2-user/.sparkStaging/application_1622393510796_0005/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.0.jar
21/05/30 18:25:49 INFO yarn.Client: Uploading resource file:/home/ec2-user/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar -> hdfs://ip-172-30-25-220.ec2.internal:8020/user/ec2-user/.sparkStaging/application_1622393510796_0005/org.apache.kafka_kafka-clients-0.10.0.1.jar
21/05/30 18:25:49 INFO yarn.Client: Uploading resource file:/home/ec2-user/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar -> hdfs://ip-172-30-25-220.ec2.internal:8020/user/ec2-user/.sparkStaging/application_1622393510796_0005/org.spark-project.spark_unused-1.0.0.jar
21/05/30 18:25:49 INFO yarn.Client: Uploading resource file:/home/ec2-user/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar -> hdfs://ip-172-30-25-220.ec2.internal:8020/user/ec2-user/.sparkStaging/application_1622393510796_0005/net.jpountz.lz4_lz4-1.3.0.jar
21/05/30 18:25:50 INFO yarn.Client: Uploading resource file:/home/ec2-user/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar -> hdfs://ip-172-30-25-220.ec2.internal:8020/user/ec2-user/.sparkStaging/application_1622393510796_0005/org.xerial.snappy_snappy-java-1.1.2.6.jar
21/05/30 18:25:50 INFO yarn.Client: Uploading resource file:/home/ec2-user/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar -> hdfs://ip-172-30-25-220.ec2.internal:8020/user/ec2-user/.sparkStaging/application_1622393510796_0005/org.slf4j_slf4j-api-1.7.16.jar
21/05/30 18:25:50 WARN yarn.Client: Same path resource file:///home/ec2-user/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.0.jar added multiple times to distributed cache.
21/05/30 18:25:50 WARN yarn.Client: Same path resource file:///home/ec2-user/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar added multiple times to distributed cache.
21/05/30 18:25:50 WARN yarn.Client: Same path resource file:///home/ec2-user/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar added multiple times to distributed cache.
21/05/30 18:25:50 WARN yarn.Client: Same path resource file:///home/ec2-user/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar added multiple times to distributed cache.
21/05/30 18:25:50 WARN yarn.Client: Same path resource file:///home/ec2-user/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar added multiple times to distributed cache.
21/05/30 18:25:50 WARN yarn.Client: Same path resource file:///home/ec2-user/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar added multiple times to distributed cache.
21/05/30 18:25:51 INFO yarn.Client: Uploading resource file:/tmp/spark-5eef8b91-a5bc-4d8f-9c6e-05f413b7d5bb/__spark_conf__4453488105854780815.zip -> hdfs://ip-172-30-25-220.ec2.internal:8020/user/ec2-user/.sparkStaging/application_1622393510796_0005/__spark_conf__.zip
21/05/30 18:25:51 INFO spark.SecurityManager: Changing view acls to: ec2-user
21/05/30 18:25:51 INFO spark.SecurityManager: Changing modify acls to: ec2-user
21/05/30 18:25:51 INFO spark.SecurityManager: Changing view acls groups to:
21/05/30 18:25:51 INFO spark.SecurityManager: Changing modify acls groups to:
21/05/30 18:25:51 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ec2-user); groups with view permissions: Set(); users with modify permissions: Set(ec2-user); groups with modify permissions: Set()
21/05/30 18:25:53 INFO yarn.Client: Submitting application application_1622393510796_0005 to ResourceManager
21/05/30 18:25:53 INFO impl.YarnClientImpl: Submitted application application_1622393510796_0005
21/05/30 18:25:53 INFO cluster.SchedulerExtensionServices: Starting Yarn extension services with app application_1622393510796_0005 and attemptId None
21/05/30 18:25:54 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:25:54 INFO yarn.Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: root.users.ec2-user
start time: 1622399153678
final status: UNDEFINED
tracking URL: http://ip-172-30-25-220.ec2.internal:8088/proxy/application_1622393510796_0005/
user: ec2-user
21/05/30 18:25:55 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:25:56 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:25:57 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:25:58 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:25:59 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:26:01 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:26:02 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:26:03 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:26:03 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> ip-172-30-25-220.ec2.internal, PROXY_URI_BASES -> http://ip-172-30-25-220.ec2.internal:8088/proxy/application_1622393510796_0005), /proxy/application_1622393510796_0005
21/05/30 18:26:03 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/05/30 18:26:04 INFO yarn.Client: Application report for application_1622393510796_0005 (state: ACCEPTED)
21/05/30 18:26:04 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
21/05/30 18:26:05 INFO yarn.Client: Application report for application_1622393510796_0005 (state: RUNNING)
21/05/30 18:26:05 INFO yarn.Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 172.30.25.220
ApplicationMaster RPC port: 0
queue: root.users.ec2-user
start time: 1622399153678
final status: UNDEFINED
tracking URL: http://ip-172-30-25-220.ec2.internal:8088/proxy/application_1622393510796_0005/
user: ec2-user
21/05/30 18:26:05 INFO cluster.YarnClientSchedulerBackend: Application application_1622393510796_0005 has started running.
21/05/30 18:26:05 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 39624.
21/05/30 18:26:05 INFO netty.NettyBlockTransferService: Server created on ip-172-30-25-220.ec2.internal:39624
21/05/30 18:26:05 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/05/30 18:26:05 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ip-172-30-25-220.ec2.internal, 39624, None)
21/05/30 18:26:05 INFO storage.BlockManagerMasterEndpoint: Registering block manager ip-172-30-25-220.ec2.internal:39624 with 366.3 MB RAM, BlockManagerId(driver, ip-172-30-25-220.ec2.internal, 39624, None)
21/05/30 18:26:05 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ip-172-30-25-220.ec2.internal, 39624, None)
21/05/30 18:26:05 INFO storage.BlockManager: external shuffle service port = 7337
21/05/30 18:26:05 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, ip-172-30-25-220.ec2.internal, 39624, None)
21/05/30 18:26:05 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@8d5187e{/metrics/json,null,AVAILABLE,@Spark}
21/05/30 18:26:05 INFO scheduler.EventLoggingListener: Logging events to hdfs://ip-172-30-25-220.ec2.internal:8020/user/spark/spark2ApplicationHistory/application_1622393510796_0005
21/05/30 18:26:05 INFO util.Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
21/05/30 18:26:05 INFO spark.SparkContext: Registered listener com.cloudera.spark.lineage.NavigatorAppListener
21/05/30 18:26:05 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
21/05/30 18:26:07 INFO internal.SharedState: loading hive config file: file:/etc/spark2/conf.cloudera.spark2_on_yarn/yarn-conf/hive-site.xml
21/05/30 18:26:07 INFO internal.SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('/user/hive/warehouse').
21/05/30 18:26:07 INFO internal.SharedState: Warehouse path is '/user/hive/warehouse'.
21/05/30 18:26:07 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@78c44c49{/SQL,null,AVAILABLE,@Spark}
21/05/30 18:26:07 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@492f0586{/SQL/json,null,AVAILABLE,@Spark}
21/05/30 18:26:07 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@486064db{/SQL/execution,null,AVAILABLE,@Spark}
21/05/30 18:26:07 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@78259e96{/SQL/execution/json,null,AVAILABLE,@Spark}
21/05/30 18:26:07 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@56063ce9{/static/sql,null,AVAILABLE,@Spark}
21/05/30 18:26:08 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
Comments:
1. It looks like you have mixed up "patients_vital_info_parquet" with "patients_ vital_info". Parquet files are written to the location given by the "path" option, not to the checkpoint location. Note: the string in the "path" option contains a space.
2. I also could not find any parquet files in the "path" directory.
3. Why do you have .option("format","append")? Have you tried removing that line? The logs you shared only cover job startup; can you share the logs produced after deploying the application in client mode, e.g. spark2-submit --deploy-mode client --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 kafka_spark_patient_vitals.py? Alternatively, pull your logs from Cloudera Manager or from the EMR logs.
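Putting the comment suggestions together, a corrected sink might look like the sketch below (assumptions: the space is removed from the data path, the stray .option("format","append") is dropped, and a separate hypothetical checkpoint directory name "patients_vital_info_checkpoint" is used so the data path and checkpoint location no longer overlap; this fragment needs a live Kafka broker and HDFS, so it is not runnable standalone):

```python
# Sketch of a corrected parquet sink based on the comment suggestions.
# "patients_vital_info_checkpoint" is an illustrative name, not from the
# original question. Requires the in_df2 streaming dataframe defined above.
query = in_df2.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "patients_vital_info") \
    .option("checkpointLocation", "patients_vital_info_checkpoint") \
    .trigger(processingTime="1 minute") \
    .start()

query.awaitTermination()
```

With the space removed, hadoop fs -ls patients_vital_info should show part-*.parquet files alongside the _spark_metadata directory once a trigger has fired with data available.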