#hadoop #hortonworks-data-platform #confluent-platform #apache-kafka-connect
#hadoop #hortonworks-платформа данных #confluent-платформа #apache-kafka-connect
Вопрос:
У меня есть 2 кластера:
- один в доме с confluent (3.0.0-1)
- один в AWS, с hadoop (hdp 2.4)
Я пытаюсь использовать соединитель hdfs для записи из confluent в hadoop.
Короче говоря: соединитель пытается подключиться к частному IP-адресу кластера hadoop вместо использования имени хоста. Во внутреннем кластере /etc/hosts был обновлен, чтобы преобразовать внутренние имена хостов hadoop в соответствующий общедоступный IP.
Я использую распределенный соединитель, у меня есть несколько файлов JSON соединителя следующим образом:
{
"name": "sent-connector",
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "sent",
"topics.dir":"/kafka-connect/topics",
"logs.dir":"/kafka-connect/wal",
"hdfs.url": "hdfs://ambari:8020",
"hadoop.conf.dir": "/etc/hadoop/conf",
"hadoop.home": "/usr/hdp/current/hadoop-client",
"flush.size": "100",
"hive.integration":"true",
"hive.metastore.uris":"thrift://ambari:9083",
"hive.database":"events",
"hive.home": "/usr/hdp/current/hive-client",
"hive.conf.dir": "/etc/hive/conf",
"schema.compatibility":"FULL",
"partitioner.class": "io.confluent.connect.hdfs.partitioner.HourlyPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
"locale": "C",
"timezone": "UTC",
"rotate.interval.ms": "2000"
}
и рабочий определяется как таковой:
rest.port=8083
bootstrap.servers=<eth0 IP of the server>:9092
group.id=dp2hdfs
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=schemareg.dpe.webpower.io
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=schemareg.dpe.webpower.io
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
config.storage.topic=k2hdfs-configs
offset.storage.topic=k2hdfs-offsets
status.storage.topic=k2hdfs-statuses
debug=true
Несколько замечаний:
- / kafka-connect существует в hdfs, доступен для записи во всем мире
- 3 темы (*.storage.topic) действительно существуют
- У меня есть один рабочий, работающий на каждом (3) серверах с kafka broker (на всех брокерах также есть реестр схем, rest API и сервер zookeeper)
- Я установил для dfs.client.use.datanode.hostname значение true, и это свойство настраивается на клиенте в $HADOOP_HOME/hdfs-site.xml
Я вижу, что подкаталоги /kafka-connect создаются так же, как и метаданные hive. Когда я запускаю соединитель, появляется сообщение:
ИНФОРМАЦИОННОЕ исключение в createBlockOutputStream (org.apache.hadoop.hdfs.DFSClient: 1471) org.apache.hadoop.net.ConnectTimeoutException: тайм-аут 60000 миллис в ожидании готовности канала к подключению. ch: java.nio.channels.SocketChannel[ожидающее подключения удаление в org.apache.hadoop.net.NetUtils.connect(NetUtils.java: 533) в org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1610) в org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1408) в org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1361) в org.apache.hadoop.hdfs.DFSOutputStream $DataStreamer.run(DFSOutputStream.java:588) ИНФОРМАЦИЯ об отказе от BP-429601535-10.0.0.167-1471011443948 :blk_1073742319_1495 (org.apache.hadoop.hdfs.DFSClient:1364) ИНФОРМАЦИЯ, исключая datanode 10.0.0.231:50010 (org.apache.hadoop.hdfs.DFSClient: 1368) [повторите и повторите с другими узлами данных]
Есть идеи, как это исправить? Похоже, что confluent получает IP напрямую, а не имя хоста.
Комментарии:
1. Короткая история: клиент HDFS связывается с NameNode (используя его общедоступный адрес) и запрашивает создание нового файла; NN отвечает с помощью host:порт DataNode, который был назначен в качестве основного записывающего устройства, известного как NameNode ; бум.
2. Если у вас есть доступ к конфигурации вашего маршрутизатора, вы можете попробовать привязать (или, скорее, похлопать) все возможные IP: порты для AWS DataNodes к их общедоступным эквивалентам, чтобы ваш клиент следовал (тупой) Инструкции NN, но трафик автоматически туннелируется в (правильное) место назначения. Фу.
3. Моя проблема в том, что NN должен использовать hostname:port, а не IP: port. Это должно работать так, как в /etc/hosts кластера kafka, я привязал эти имена хостов к общедоступному IP-адресу DNs.
4. В конфигурации NN указаны настройки DNS . Не уверен, что это волнует
/etc/hosts
, потому что в любом случае он не будет работать с Kerberos (Krb по умолчанию требует канонических DNS-имен, чтобы избежать атак с подменой IP)