#windows #pyspark #jupyter-notebook #spark-structured-streaming
Вопрос:
Я использую ноутбук jupyter и работаю над Windows, чтобы написать простое структурированное потоковое приложение spark.
Вот мой код:
import sys
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark=SparkSession.builder.appName("StructuredNetworkCount").getOrCreate()
lines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()
words=lines.select(explode(split(lines.value," ")).alias("word"))
wordCounts=words.groupBy("word").count()
query=wordCounts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
Я получаю следующую ошибку, которую я не могу решить
Py4JJavaError: An error occurred while calling o183.start.
: java.io.IOException: (null) entry in command string: null chmod 0644 C:UsersfhaouAppDataLocalTemptemporary-f4c98508-2cbe-4009-bdae-cf93d5b722be.metadata.0666cc4c-27d8-4ccc-bf10-6ec00474859b.tmp
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1017)
at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:99)
at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:352)
at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:399)
at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:584)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:686)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:682)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.create(FileContext.java:688)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:311)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:318)
at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:78)
at org.apache.spark.sql.execution.streaming.StreamExecution$anonfun$2.apply(StreamExecution.scala:125)
at org.apache.spark.sql.execution.streaming.StreamExecution$anonfun$2.apply(StreamExecution.scala:123)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:123)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:48)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:281)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:322)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Не могли бы вы посоветовать, пожалуйста. Пожалуйста, обратите внимание, что я запускаю другие потоковые приложения Spark совершенно нормально, но когда я решил перейти на структурированную потоковую передачу, я столкнулся с этой проблемой.
Комментарии:
1. Вы пробовали
display(lines)
? Вы видите какие-нибудь выходные данные?2. Я попробовал отобразить(строки). Я получаю этот фрейм данных[значение: строка], и даже отображение(слова) и отображение(подсчеты слов) работают совершенно нормально.
3. Отображение этого
wordCounts
тоже работает?4. Спасибо @XXavier. Проблема решена Мне просто нужно было исправить HADOOP_HOME в переменных пользователей, чтобы указать путь, по которому binwinutils.exe существует. Я также добавил %HADOOP_HOME%bin в путь к системным переменным.