# #pyspark #spark-streaming #google-cloud-dataproc
Вопрос:
Сервис: GCP Dataproc и DataprocHub
Сервис: Cloudkarafka (www.cloudkarafka.com), простой и быстрый способ запустить свой сервис kafka.
1: GCP-DataprocHub из коробки дает искру 2.4.8.
2. Создайте кластер Dataproc с определенной версией.
gcloud shell:
gcloud dataproc clusters create dataproc-spark312 --image-version=2.0-ubuntu18 --region=us-central1 --single-node
3: Экспортируйте его и сохраните в корзине gs
gcloud dataproc clusters export dataproc-spark312 --destination dataproc-spark312.yaml --region us-central1
gsutil cp dataproc-spark312.yaml gs://gcp-learn-lib/
4: Создайте файл ENV
DATAPROC_CONFIGS=gs://gcp-learn-lib/dataproc-spark312.yaml
NOTEBOOKS_LOCATION=gs://gcp-learn-notebooks/notebooks
DATAPROC_LOCATIONS_LIST=a,b,c
Сохраните как: dataproc-hub-config.env и загрузите в gs: bucket
5. Создайте концентратор данных и свяжите вышеупомянутый кластер. Раздел: «пользовательская настройка env» ключ:значение файла контейнера env: gs://gcp-learn-lib/dataproc-концентратор-конфигурация.env
6: Завершите создание
7: Нажмите на ссылку Jupyter
8: Он должен отображать ваш кластер, выберите его. и выберите регион (такой же, как исходный кластер на шаге 2)
9: Dataproc -> Кластер ->> Нажмите на Кластер (начинается с концентратора -) — > > > Экземпляры виртуальных машин ->>>> SSH Загрузите и скопируйте банки кафки в
cd /usr/lib/spark/jars/
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.6.2/commons-pool2-2.6.2.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.6.0/kafka-clients-2.6.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.1.2/spark-sql-kafka-0-10_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.1.2/spark-streaming-kafka-0-10_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.12/3.1.2/spark-streaming-kafka-0-10-assembly_2.12-3.1.2.jar
10: Подготовьте файл JAAS и CA
vi /tmp/cloudkarafka_gcp_oct2021.jaas
vi /tmp/cloudkarafka_gcp_oct2021.ca
11: Добавить в конец файла:
sudo vi /etc/spark/conf/spark-defaults.conf
spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/cloudkarafka_gcp_oct2021.jaas -Dsasl.jaas.config=/tmp/cloudkarafka_gcp_oct2021.jaas -Dssl.ca.location=/tmp/cloudkarafka_gcp_oct2021.ca
spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/cloudkarafka_gcp_oct2021.jaas
12: Возвращайся на Юпитерхаб. Перезагрузите ядро — Проверьте свой код
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "server1:9094,serer2:9094,server3:9094")
.option("subscribe", "foo-default")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream.format("kafka")
.option("kafka.bootstrap.servers", "server1:9094,serer2:9094,server3:9094")
.option("topic", "foo-test")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("checkpointLocation", "/tmp/stream/kafkatest")
.start()
13: Наслаждайтесь
==============
Комментарии:
1. Это не вопрос, верно?