соединитель kafka HDFS, подключающийся к частному IP вместо имени хоста при настройке с несколькими постоянными токами

#hadoop #hortonworks-data-platform #confluent-platform #apache-kafka-connect

#hadoop #hortonworks-платформа данных #confluent-платформа #apache-kafka-connect

Вопрос:

У меня есть 2 кластера:

  • один в доме с confluent (3.0.0-1)
  • один в AWS, с hadoop (hdp 2.4)

Я пытаюсь использовать соединитель hdfs для записи из confluent в hadoop.

Короче говоря: соединитель пытается подключиться к частному IP-адресу кластера hadoop вместо использования имени хоста. Во внутреннем кластере /etc/hosts был обновлен, чтобы преобразовать внутренние имена хостов hadoop в соответствующий общедоступный IP.

Я использую распределенный соединитель, у меня есть несколько файлов JSON соединителя следующим образом:

 {
   "name": "sent-connector",

   "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
   "tasks.max": "1",
   "topics": "sent",

   "topics.dir":"/kafka-connect/topics",
   "logs.dir":"/kafka-connect/wal",
   "hdfs.url": "hdfs://ambari:8020",

   "hadoop.conf.dir": "/etc/hadoop/conf",
   "hadoop.home": "/usr/hdp/current/hadoop-client",

   "flush.size": "100",

   "hive.integration":"true",
   "hive.metastore.uris":"thrift://ambari:9083",
   "hive.database":"events",
   "hive.home": "/usr/hdp/current/hive-client",
   "hive.conf.dir": "/etc/hive/conf",

   "schema.compatibility":"FULL",

   "partitioner.class": "io.confluent.connect.hdfs.partitioner.HourlyPartitioner",
   "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
   "locale": "C",
   "timezone": "UTC",

   "rotate.interval.ms": "2000"

}
  

и рабочий определяется как таковой:

 rest.port=8083
bootstrap.servers=<eth0 IP of the server>:9092
group.id=dp2hdfs
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=schemareg.dpe.webpower.io
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=schemareg.dpe.webpower.io
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
config.storage.topic=k2hdfs-configs
offset.storage.topic=k2hdfs-offsets
status.storage.topic=k2hdfs-statuses
debug=true
  

Несколько замечаний:

  • / kafka-connect существует в hdfs, доступен для записи во всем мире
  • 3 темы (*.storage.topic) действительно существуют
  • У меня есть один рабочий, работающий на каждом (3) серверах с kafka broker (на всех брокерах также есть реестр схем, rest API и сервер zookeeper)
  • Я установил для dfs.client.use.datanode.hostname значение true, и это свойство настраивается на клиенте в $HADOOP_HOME/hdfs-site.xml

Я вижу, что подкаталоги /kafka-connect создаются так же, как и метаданные hive. Когда я запускаю соединитель, появляется сообщение:

ИНФОРМАЦИОННОЕ исключение в createBlockOutputStream (org.apache.hadoop.hdfs.DFSClient: 1471) org.apache.hadoop.net.ConnectTimeoutException: тайм-аут 60000 миллис в ожидании готовности канала к подключению. ch: java.nio.channels.SocketChannel[ожидающее подключения удаление в org.apache.hadoop.net.NetUtils.connect(NetUtils.java: 533) в org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1610) в org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1408) в org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1361) в org.apache.hadoop.hdfs.DFSOutputStream $DataStreamer.run(DFSOutputStream.java:588) ИНФОРМАЦИЯ об отказе от BP-429601535-10.0.0.167-1471011443948 :blk_1073742319_1495 (org.apache.hadoop.hdfs.DFSClient:1364) ИНФОРМАЦИЯ, исключая datanode 10.0.0.231:50010 (org.apache.hadoop.hdfs.DFSClient: 1368) [повторите и повторите с другими узлами данных]

Есть идеи, как это исправить? Похоже, что confluent получает IP напрямую, а не имя хоста.

Комментарии:

1. Короткая история: клиент HDFS связывается с NameNode (используя его общедоступный адрес) и запрашивает создание нового файла; NN отвечает с помощью host:порт DataNode, который был назначен в качестве основного записывающего устройства, известного как NameNode ; бум.

2. Если у вас есть доступ к конфигурации вашего маршрутизатора, вы можете попробовать привязать (или, скорее, похлопать) все возможные IP: порты для AWS DataNodes к их общедоступным эквивалентам, чтобы ваш клиент следовал (тупой) Инструкции NN, но трафик автоматически туннелируется в (правильное) место назначения. Фу.

3. Моя проблема в том, что NN должен использовать hostname:port, а не IP: port. Это должно работать так, как в /etc/hosts кластера kafka, я привязал эти имена хостов к общедоступному IP-адресу DNs.

4. В конфигурации NN указаны настройки DNS . Не уверен, что это волнует /etc/hosts , потому что в любом случае он не будет работать с Kerberos (Krb по умолчанию требует канонических DNS-имен, чтобы избежать атак с подменой IP)