Проблема со структурированным потоковым потоком записи Spark для консоли

#windows #pyspark #jupyter-notebook #spark-structured-streaming

Вопрос:

Я использую ноутбук jupyter и работаю над Windows, чтобы написать простое структурированное потоковое приложение spark.

Вот мой код:

 import sys
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark=SparkSession.builder.appName("StructuredNetworkCount").getOrCreate()

lines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()
words=lines.select(explode(split(lines.value," ")).alias("word"))
wordCounts=words.groupBy("word").count()

query=wordCounts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
 

Я получаю следующую ошибку, которую я не могу решить

 Py4JJavaError: An error occurred while calling o183.start.
: java.io.IOException: (null) entry in command string: null chmod 0644 C:UsersfhaouAppDataLocalTemptemporary-f4c98508-2cbe-4009-bdae-cf93d5b722be.metadata.0666cc4c-27d8-4ccc-bf10-6ec00474859b.tmp
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
    at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1017)
    at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:99)
    at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:352)
    at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:399)
    at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:584)
    at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:686)
    at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:682)
    at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
    at org.apache.hadoop.fs.FileContext.create(FileContext.java:688)
    at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:311)
    at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
    at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
    at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:318)
    at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:78)
    at org.apache.spark.sql.execution.streaming.StreamExecution$anonfun$2.apply(StreamExecution.scala:125)
    at org.apache.spark.sql.execution.streaming.StreamExecution$anonfun$2.apply(StreamExecution.scala:123)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:123)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:48)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:281)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:322)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
 

Не могли бы вы посоветовать, пожалуйста. Пожалуйста, обратите внимание, что я запускаю другие потоковые приложения Spark совершенно нормально, но когда я решил перейти на структурированную потоковую передачу, я столкнулся с этой проблемой.

Комментарии:

1. Вы пробовали display(lines) ? Вы видите какие-нибудь выходные данные?

2. Я попробовал отобразить(строки). Я получаю этот фрейм данных[значение: строка], и даже отображение(слова) и отображение(подсчеты слов) работают совершенно нормально.

3. Отображение этого wordCounts тоже работает?

4. Спасибо @XXavier. Проблема решена Мне просто нужно было исправить HADOOP_HOME в переменных пользователей, чтобы указать путь, по которому binwinutils.exe существует. Я также добавил %HADOOP_HOME%bin в путь к системным переменным.