System.env и значения в файле свойств не считываются на рабочих узлах кластера Spark

#apache-spark #apache-kafka #spark-streaming

#apache-spark #apache-kafka #искровая потоковая передача

Вопрос:

У меня есть многоузловой кластер spark, и я отправляю свою программу spark на узел, где находится master.

Когда задание отправляется на рабочие узлы, параметр имени ХОСТА выдает нулевое значение. Вот строка, в которой свойства считываются как null .

System.getenv(ИМЯ ХОСТА) не считывается с рабочего узла.

         System.out.println("line 76 System.getenv(HOSTNAME)="   System.getenv("HOSTNAME"));
  

AUDIT_USER, AUDIT_PASSWORD также имеют значение null при чтении (они оба были в файле свойств).

Если я отправляю задание с одним узлом, у меня нет проблем с этими параметрами. Но, если вы отправляете задание в автономном режиме с 6 узлами, я получаю эту проблему.

Я создал одну и ту же папку для файла свойств на всех узлах.

Вот мой код. не могли бы вы сообщить мне, почему System.env не выдает значение null, а мои свойства равны null?

 package com.fb.cpd.myapp;

import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Future;

import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;

public class GenericLogic implements Serializable {
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LogManager.getLogger(GenericLogic.class);
    private PropertiesConfiguration props;
    private Producer<String, String> producer = null;
    private Future<RecordMetadata> receipt = null;
    private RecordMetadata receiptInfo = null;
    private ConnectToRDBMS auditor = null;
    private ConnectToRDBMS df = null;

    private static String myId = null;

    private Map<TopicAndPartition, Long> getOffsets(String topic) throws SQLException {
        String appName = "myapp";
        String TopicName = topic;
        Map<TopicAndPartition, Long> topicMap = new HashMap<>(); //
        System.out.println("line 64 before making connection");

        try {
            props = new PropertiesConfiguration("/app/lock/conf/empty.properties");
        } catch (ConfigurationException e) { // TODO Auto-generated catch block
            System.out.println("Line 70");
            e.printStackTrace();
        }

        try {
            System.out.println("line 76 System.getenv(HOSTNAME)="   System.getenv("HOSTNAME"));
            auditor = new ConnectToRDBMS(System.getenv("HOSTNAME"), "lockSparkCollector", null, null, null, null, null,
                    0, props.getString("AUDIT_USER"), props.getString("AUDIT_PASSWORD"),
                    props.getString("AUDIT_DB_URL"));
        } catch (SQLException e) {
            logger.error("ASSERT: run() ERROR CONNECTING TO AUDIT DB "   e.getMessage());
        }
        System.out.println("line 64 after making connection");

        Statement stmt = null;

        String query = "select va_application, topic_name, partition_id, from_offset,until_offset from lock_spark_offsets where va_application = "
                  "'"   appName   "'"   " and topic_name= "   "'"   TopicName   "'";
        System.out.println("query"   query);
        System.out.println("before query exection");
        try {
            stmt = auditor.dbConnection.createStatement();
            System.out.println("line 81");

            ResultSet rs = stmt.executeQuery(query);
            System.out.println("line 83");
            while (rs.next()) {
                System.out.println("pass 1 of Resultset");
                System.out.println("getOffsets="   topic.trim()   " "   rs.getInt("partition_id")   " "
                          rs.getString("until_offset")   " "   rs.getString("until_offset"));
                Integer partition = rs.getInt("partition_id");

                TopicAndPartition tp = new TopicAndPartition(topic.trim(), partition);
                System.out.println("102");
                Long.parseLong(rs.getString("until_offset"));
                topicMap.put(tp, Long.parseLong(rs.getString("until_offset")));
                System.out.println("105");

            }
            System.out.println("after populating topic map");

        } catch (

        SQLException e) {
            System.out.println("printing exception");
            e.printStackTrace();
        } finally {
            if (stmt != null) {
                System.out.println("closing statement");
                stmt.close();
            }
        }
        return topicMap;
    }

    public void setDefaultProperties() {
        FileChangedReloadingStrategy strategy = new FileChangedReloadingStrategy();
        strategy.setRefreshDelay(10000);
        System.out.println("Line 45");
        // supply the properties file.
        try {
            props = new PropertiesConfiguration("/app/lock/conf/empty.properties");
        } catch (ConfigurationException e) {
            // TODO Auto-generated catch block
            System.out.println("Line 51");
            e.printStackTrace();
        }
        props.setReloadingStrategy(strategy);
        System.out.println("Line 56");

        // Producer configs
        if (!props.containsKey("acks")) {
            props.setProperty("acks", "1");
        }

        if (!props.containsKey("retries")) {
            props.setProperty("retries", "1000");
        }

        if (!props.containsKey("compression.type")) {
            props.setProperty("compression.type", "gzip");
        }

        if (!props.containsKey("request.timeout.ms")) {
            props.setProperty("request.timeout.ms", "600000");
        }

        if (!props.containsKey("batch.size")) {
            props.setProperty("batch.size", "32768");
        }

        if (!props.containsKey("buffer.memory")) {
            props.setProperty("buffer.memory", "134217728");
        }

        if (!props.containsKey("block.on.buffer.full")) {
            props.setProperty("block.on.buffer.full", "true");
        }

        if (!props.containsKey("SHUTDOWN")) {
            props.setProperty("SHUTDOWN", "false");
        }

        if (!props.containsKey("producer.topic")) {
            props.setProperty("producer.topic", "mytopic1");
        }

        Properties producer_props = ConfigurationConverter.getProperties(props);

        producer_props.setProperty("bootstrap.servers", props.getString("target.bootstrap.servers"));
        producer_props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer_props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // ????

        this.producer = new KafkaProducer<String, String>(producer_props);
        System.out.println("Line 107");

    }

    public void PublishMessages(String st) {

        try {
            System.out.println("Line 111");
            String key = UUID.randomUUID().toString().replace("-", "");
            System.out.println("Started Producing...");

            receipt = producer.send(new ProducerRecord<String, String>(props.getString("producer.topic"), key, // Key
                    st));
            System.out.println("After Completion of Producing Producing");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Exception in PublishMessages ");
        }

    }

    public void DBConnect() {
        try {
            auditor = new ConnectToRDBMS(System.getenv("HOSTNAME"), "myapp", props.getString("consumer.topic"), null,
                    null, null, null, 0, props.getString("AUDIT_USER"), props.getString("AUDIT_PASSWORD"),
                    props.getString("AUDIT_DB_URL"));
        } catch (SQLException e) {
            logger.error("ASSERT: run() ERROR CONNECTING TO AUDIT DB "   e.getMessage());
            return;
        }
    }

    private void writeToDB(Long startTime, Integer partnId, String fromOffset, String untilOffset, Integer count) {
        this.auditor.audit(startTime, partnId, fromOffset, untilOffset, count);

    }

    /**
     * 
     * @param jsc
     * @param topicSet
     * @throws Exception
     */
    public static void main(String[] args) {
        String topicNames = "MySourceTopic";
        GenericLogic ec = new GenericLogic();
        Map<TopicAndPartition, Long> topicMap = null;
        try {

            topicMap = ec.getOffsets("MySourceTopic");

        } catch (SQLException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        boolean clusterMode = false;

        Integer batchDuration = Integer.parseInt("30000");
        JavaSparkContext sparkConf = new JavaSparkContext("abcd.net:7077", "Kafka-Spark-Integration");

        sparkConf.getConf().set("spark.local.ip", "lock-dt-a4d.xyz.com");
        sparkConf.getConf().set("spark.eventLog.enabled", "false");
        sparkConf.getConf().set("spark.shuffle.blockTransferService", "nio");

        JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(10000));
        Map<String, String> kafkaParams = new HashMap<String, String>();
        String pollInterval = "10000";
        String zookeeper = "lock-dt-a5d.xyz.com:2181,lock-dt-a6d.xyz.com:2181";

        kafkaParams.put("metadata.broker.list", "lock-dt-a5d.xyz.com:9092,lock-dt-a6d.xyz.com:9092");
        kafkaParams.put("group.id", "Consumer");
        kafkaParams.put("client.id", "Consumer");
        kafkaParams.put("zookeeper.connect", zookeeper);

        JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc, String.class, byte[].class,
                StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, topicMap,
                (Function<MessageAndMetadata<String, byte[]>, byte[]>) MessageAndMetadata::message);

        directKafkaStream.foreachRDD(rdd -> {
            if (rdd.isEmpty()) {
                System.out.println("No events polled in last "   pollInterval   " milli seconds");
                return;
            }

            rdd.foreachPartition(itr -> {
                Integer partnId = TaskContext.get().partitionId();
                Long systime = System.nanoTime();
                Map<String, String> hmap = new HashMap<String, String>();

                GenericLogic ec2 = new GenericLogic();
                ec2.setDefaultProperties();
                ec2.DBConnect();

                try {

                    while (itr.hasNext()) {
                        System.out.println("232");
                    }

                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }

            });
        });
        jsc.start();
        jsc.awaitTermination();
    }

}
  

Комментарии:

1. в какой файл конфигурации вы экспортируете ИМЯ ХОСТА?

2. вы также поместили тот же файл свойств на рабочие машины?

3. у меня есть файл свойств на всех узлах. ИМЯ ХОСТА должно быть общим, верно ???? мне не нужно указывать его в файле свойств. Не так ли?

4. когда вы запускаете его локально, какое значение присваивается System.env (ИМЯ ХОСТА)

5. Я не пробовал это локально. Но когда я выполнил задание, System.env (ИМЯ ХОСТА) печатает адрес главного узла.

Ответ №1:

Я начал мази с start-slaves.sh . В этом проблема. Я должен запустить workers, указав основные адреса.

Ответ №2:

Не могли бы вы сообщить нам, пожалуйста, ОС всех узлов, и если вы убедились, что запись на главном узле экспортирует ИМЯ ХОСТА. Ответ на ваш вопрос будет лучше, если вы сообщите нам о деталях вашей операционной системы.

Может быть неправильно связано с вашим контекстом, но только для информационной системы.getenv(«ИМЯ ХОСТА») может не указывать имя хоста на всех платформах (например, Ubuntu или Mac).

Лучше, почему бы не экспортировать ИМЯ ХОСТА.

Я предполагаю, что вы уже проверили, что props не равен null или empty? Если нет, выполните отладку и проверьте, загружены ли файлы свойств или нет, и если загружены, это не пустой файл свойств, и, следовательно, он загрузил свойства из файла.

Глядя на вашу проблему (не только переменная среды, но и свойства также не возвращаются, возможно, что-то не так с файлом свойств или его относительным расположением на разных компьютерах. Если это не точная копия, которая размещена на разных компьютерах, пожалуйста, также проверьте, подходит ли этот файл для Linux (не записывается и не редактируется в Windows и не помещается в Linux).