Потоковая передача PySpark из Cloudkarfka в Google Dataproc

# #pyspark #spark-streaming #google-cloud-dataproc

Вопрос:

Сервис: GCP Dataproc и DataprocHub

Сервис: Cloudkarafka (www.cloudkarafka.com), простой и быстрый способ запустить свой сервис kafka.

1: GCP-DataprocHub из коробки дает искру 2.4.8.

2. Создайте кластер Dataproc с определенной версией.

 gcloud shell:
gcloud dataproc clusters create dataproc-spark312  --image-version=2.0-ubuntu18  --region=us-central1 --single-node
 

3: Экспортируйте его и сохраните в корзине gs

 gcloud dataproc clusters export dataproc-spark312 --destination dataproc-spark312.yaml --region us-central1
gsutil cp dataproc-spark312.yaml gs://gcp-learn-lib/
 

4: Создайте файл ENV

 DATAPROC_CONFIGS=gs://gcp-learn-lib/dataproc-spark312.yaml
NOTEBOOKS_LOCATION=gs://gcp-learn-notebooks/notebooks
DATAPROC_LOCATIONS_LIST=a,b,c
 

Сохраните как: dataproc-hub-config.env и загрузите в gs: bucket

5. Создайте концентратор данных и свяжите вышеупомянутый кластер. Раздел: «пользовательская настройка env» ключ:значение файла контейнера env: gs://gcp-learn-lib/dataproc-концентратор-конфигурация.env

6: Завершите создание

7: Нажмите на ссылку Jupyter

8: Он должен отображать ваш кластер, выберите его. и выберите регион (такой же, как исходный кластер на шаге 2)

9: Dataproc -> Кластер ->> Нажмите на Кластер (начинается с концентратора -) — > > > Экземпляры виртуальных машин ->>>> SSH Загрузите и скопируйте банки кафки в

 cd /usr/lib/spark/jars/
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.6.2/commons-pool2-2.6.2.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.6.0/kafka-clients-2.6.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.1.2/spark-sql-kafka-0-10_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.1.2/spark-streaming-kafka-0-10_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.12/3.1.2/spark-streaming-kafka-0-10-assembly_2.12-3.1.2.jar
 

10: Подготовьте файл JAAS и CA

 vi /tmp/cloudkarafka_gcp_oct2021.jaas
vi /tmp/cloudkarafka_gcp_oct2021.ca
 

11: Добавить в конец файла:

 sudo vi /etc/spark/conf/spark-defaults.conf
spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/cloudkarafka_gcp_oct2021.jaas -Dsasl.jaas.config=/tmp/cloudkarafka_gcp_oct2021.jaas -Dssl.ca.location=/tmp/cloudkarafka_gcp_oct2021.ca
spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/cloudkarafka_gcp_oct2021.jaas
 

12: Возвращайся на Юпитерхаб. Перезагрузите ядро — Проверьте свой код

 spark.readStream.format("kafka") 
.option("kafka.bootstrap.servers", "server1:9094,serer2:9094,server3:9094") 
.option("subscribe", "foo-default") 
.option("kafka.security.protocol", "SASL_SSL") 
.option("kafka.sasl.mechanism", "SCRAM-SHA-256") 
.load() 
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 
.writeStream.format("kafka") 
.option("kafka.bootstrap.servers", "server1:9094,serer2:9094,server3:9094") 
.option("topic", "foo-test") 
.option("kafka.security.protocol", "SASL_SSL") 
.option("kafka.sasl.mechanism", "SCRAM-SHA-256") 
.option("checkpointLocation", "/tmp/stream/kafkatest") 
.start()
 

13: Наслаждайтесь

==============

Комментарии:

1. Это не вопрос, верно?