#apache-spark #apache-kafka #spark-streaming
#apache-spark #apache-kafka #искровая потоковая передача
Вопрос:
У меня есть многоузловой кластер spark, и я отправляю свою программу spark на узел, где находится master.
Когда задание отправляется на рабочие узлы, параметр имени ХОСТА выдает нулевое значение. Вот строка, в которой свойства считываются как null .
System.getenv(ИМЯ ХОСТА) не считывается с рабочего узла.
System.out.println("line 76 System.getenv(HOSTNAME)=" System.getenv("HOSTNAME"));
AUDIT_USER, AUDIT_PASSWORD также имеют значение null при чтении (они оба были в файле свойств).
Если я отправляю задание с одним узлом, у меня нет проблем с этими параметрами. Но, если вы отправляете задание в автономном режиме с 6 узлами, я получаю эту проблему.
Я создал одну и ту же папку для файла свойств на всех узлах.
Вот мой код. не могли бы вы сообщить мне, почему System.env не выдает значение null, а мои свойства равны null?
package com.fb.cpd.myapp;
import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Future;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
public class GenericLogic implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private static final Logger logger = LogManager.getLogger(GenericLogic.class);
private PropertiesConfiguration props;
private Producer<String, String> producer = null;
private Future<RecordMetadata> receipt = null;
private RecordMetadata receiptInfo = null;
private ConnectToRDBMS auditor = null;
private ConnectToRDBMS df = null;
private static String myId = null;
private Map<TopicAndPartition, Long> getOffsets(String topic) throws SQLException {
String appName = "myapp";
String TopicName = topic;
Map<TopicAndPartition, Long> topicMap = new HashMap<>(); //
System.out.println("line 64 before making connection");
try {
props = new PropertiesConfiguration("/app/lock/conf/empty.properties");
} catch (ConfigurationException e) { // TODO Auto-generated catch block
System.out.println("Line 70");
e.printStackTrace();
}
try {
System.out.println("line 76 System.getenv(HOSTNAME)=" System.getenv("HOSTNAME"));
auditor = new ConnectToRDBMS(System.getenv("HOSTNAME"), "lockSparkCollector", null, null, null, null, null,
0, props.getString("AUDIT_USER"), props.getString("AUDIT_PASSWORD"),
props.getString("AUDIT_DB_URL"));
} catch (SQLException e) {
logger.error("ASSERT: run() ERROR CONNECTING TO AUDIT DB " e.getMessage());
}
System.out.println("line 64 after making connection");
Statement stmt = null;
String query = "select va_application, topic_name, partition_id, from_offset,until_offset from lock_spark_offsets where va_application = "
"'" appName "'" " and topic_name= " "'" TopicName "'";
System.out.println("query" query);
System.out.println("before query exection");
try {
stmt = auditor.dbConnection.createStatement();
System.out.println("line 81");
ResultSet rs = stmt.executeQuery(query);
System.out.println("line 83");
while (rs.next()) {
System.out.println("pass 1 of Resultset");
System.out.println("getOffsets=" topic.trim() " " rs.getInt("partition_id") " "
rs.getString("until_offset") " " rs.getString("until_offset"));
Integer partition = rs.getInt("partition_id");
TopicAndPartition tp = new TopicAndPartition(topic.trim(), partition);
System.out.println("102");
Long.parseLong(rs.getString("until_offset"));
topicMap.put(tp, Long.parseLong(rs.getString("until_offset")));
System.out.println("105");
}
System.out.println("after populating topic map");
} catch (
SQLException e) {
System.out.println("printing exception");
e.printStackTrace();
} finally {
if (stmt != null) {
System.out.println("closing statement");
stmt.close();
}
}
return topicMap;
}
public void setDefaultProperties() {
FileChangedReloadingStrategy strategy = new FileChangedReloadingStrategy();
strategy.setRefreshDelay(10000);
System.out.println("Line 45");
// supply the properties file.
try {
props = new PropertiesConfiguration("/app/lock/conf/empty.properties");
} catch (ConfigurationException e) {
// TODO Auto-generated catch block
System.out.println("Line 51");
e.printStackTrace();
}
props.setReloadingStrategy(strategy);
System.out.println("Line 56");
// Producer configs
if (!props.containsKey("acks")) {
props.setProperty("acks", "1");
}
if (!props.containsKey("retries")) {
props.setProperty("retries", "1000");
}
if (!props.containsKey("compression.type")) {
props.setProperty("compression.type", "gzip");
}
if (!props.containsKey("request.timeout.ms")) {
props.setProperty("request.timeout.ms", "600000");
}
if (!props.containsKey("batch.size")) {
props.setProperty("batch.size", "32768");
}
if (!props.containsKey("buffer.memory")) {
props.setProperty("buffer.memory", "134217728");
}
if (!props.containsKey("block.on.buffer.full")) {
props.setProperty("block.on.buffer.full", "true");
}
if (!props.containsKey("SHUTDOWN")) {
props.setProperty("SHUTDOWN", "false");
}
if (!props.containsKey("producer.topic")) {
props.setProperty("producer.topic", "mytopic1");
}
Properties producer_props = ConfigurationConverter.getProperties(props);
producer_props.setProperty("bootstrap.servers", props.getString("target.bootstrap.servers"));
producer_props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer_props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // ????
this.producer = new KafkaProducer<String, String>(producer_props);
System.out.println("Line 107");
}
public void PublishMessages(String st) {
try {
System.out.println("Line 111");
String key = UUID.randomUUID().toString().replace("-", "");
System.out.println("Started Producing...");
receipt = producer.send(new ProducerRecord<String, String>(props.getString("producer.topic"), key, // Key
st));
System.out.println("After Completion of Producing Producing");
} catch (Exception e) {
e.printStackTrace();
System.out.println("Exception in PublishMessages ");
}
}
public void DBConnect() {
try {
auditor = new ConnectToRDBMS(System.getenv("HOSTNAME"), "myapp", props.getString("consumer.topic"), null,
null, null, null, 0, props.getString("AUDIT_USER"), props.getString("AUDIT_PASSWORD"),
props.getString("AUDIT_DB_URL"));
} catch (SQLException e) {
logger.error("ASSERT: run() ERROR CONNECTING TO AUDIT DB " e.getMessage());
return;
}
}
private void writeToDB(Long startTime, Integer partnId, String fromOffset, String untilOffset, Integer count) {
this.auditor.audit(startTime, partnId, fromOffset, untilOffset, count);
}
/**
*
* @param jsc
* @param topicSet
* @throws Exception
*/
public static void main(String[] args) {
String topicNames = "MySourceTopic";
GenericLogic ec = new GenericLogic();
Map<TopicAndPartition, Long> topicMap = null;
try {
topicMap = ec.getOffsets("MySourceTopic");
} catch (SQLException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
boolean clusterMode = false;
Integer batchDuration = Integer.parseInt("30000");
JavaSparkContext sparkConf = new JavaSparkContext("abcd.net:7077", "Kafka-Spark-Integration");
sparkConf.getConf().set("spark.local.ip", "lock-dt-a4d.xyz.com");
sparkConf.getConf().set("spark.eventLog.enabled", "false");
sparkConf.getConf().set("spark.shuffle.blockTransferService", "nio");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(10000));
Map<String, String> kafkaParams = new HashMap<String, String>();
String pollInterval = "10000";
String zookeeper = "lock-dt-a5d.xyz.com:2181,lock-dt-a6d.xyz.com:2181";
kafkaParams.put("metadata.broker.list", "lock-dt-a5d.xyz.com:9092,lock-dt-a6d.xyz.com:9092");
kafkaParams.put("group.id", "Consumer");
kafkaParams.put("client.id", "Consumer");
kafkaParams.put("zookeeper.connect", zookeeper);
JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc, String.class, byte[].class,
StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, topicMap,
(Function<MessageAndMetadata<String, byte[]>, byte[]>) MessageAndMetadata::message);
directKafkaStream.foreachRDD(rdd -> {
if (rdd.isEmpty()) {
System.out.println("No events polled in last " pollInterval " milli seconds");
return;
}
rdd.foreachPartition(itr -> {
Integer partnId = TaskContext.get().partitionId();
Long systime = System.nanoTime();
Map<String, String> hmap = new HashMap<String, String>();
GenericLogic ec2 = new GenericLogic();
ec2.setDefaultProperties();
ec2.DBConnect();
try {
while (itr.hasNext()) {
System.out.println("232");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
});
});
jsc.start();
jsc.awaitTermination();
}
}
Комментарии:
1. в какой файл конфигурации вы экспортируете ИМЯ ХОСТА?
2. вы также поместили тот же файл свойств на рабочие машины?
3. у меня есть файл свойств на всех узлах. ИМЯ ХОСТА должно быть общим, верно ???? мне не нужно указывать его в файле свойств. Не так ли?
4. когда вы запускаете его локально, какое значение присваивается System.env (ИМЯ ХОСТА)
5. Я не пробовал это локально. Но когда я выполнил задание, System.env (ИМЯ ХОСТА) печатает адрес главного узла.
Ответ №1:
Я начал мази с start-slaves.sh . В этом проблема. Я должен запустить workers, указав основные адреса.
Ответ №2:
Не могли бы вы сообщить нам, пожалуйста, ОС всех узлов, и если вы убедились, что запись на главном узле экспортирует ИМЯ ХОСТА. Ответ на ваш вопрос будет лучше, если вы сообщите нам о деталях вашей операционной системы.
Может быть неправильно связано с вашим контекстом, но только для информационной системы.getenv(«ИМЯ ХОСТА») может не указывать имя хоста на всех платформах (например, Ubuntu или Mac).
Лучше, почему бы не экспортировать ИМЯ ХОСТА.
Я предполагаю, что вы уже проверили, что props не равен null или empty? Если нет, выполните отладку и проверьте, загружены ли файлы свойств или нет, и если загружены, это не пустой файл свойств, и, следовательно, он загрузил свойства из файла.
Глядя на вашу проблему (не только переменная среды, но и свойства также не возвращаются, возможно, что-то не так с файлом свойств или его относительным расположением на разных компьютерах. Если это не точная копия, которая размещена на разных компьютерах, пожалуйста, также проверьте, подходит ли этот файл для Linux (не записывается и не редактируется в Windows и не помещается в Linux).