Невозможно отправить сообщение в тему kafka из потока neo4j

#java #apache-kafka #neo4j #docker-compose #confluent-platform

#java #apache-kafka #neo4j #docker-compose #confluent-платформа

Вопрос:

Я пытаюсь отправлять сообщения из потока neo4j в тему kafka.

это моя конфигурация docker-compose для neo4j

 version: '3'
services:
  neo4j:
    image: neo4j:3.5.8-enterprise
    hostname: klinks
    container_name: klinks
    volumes:
      - ./plugins:/plugins
    ports:
    - "7474:7474"
    - "7687:7687"
    environment:
      NEO4J_dbms_logs_query_threshold: 0s
      NEO4J_dbms_logs_query_enabled: "true"
      NEO4J_cypher_lenient__create__relationship: "true"
      NEO4J_apoc_trigger_enabled: "true"
      NEO4J_dbms_security_procedures_unrestricted: "apoc.*,algo.*"
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_dbms_memory_heap_max__size: 2G
      NEO4J_dbms_memory_heap_initial__size: 1G
      NEO4J_dbms_memory_pagecache_size: 1G
      NEO4J_AUTH: neo4j/admin  #la password è admin
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_kafka_group_id: p2
      NEO4J_streams_sink_topic_cypher_friends: "
        MERGE (p1:Person { name: event.initiated }) 
        MERGE (p2:Person { name: event.accepted }) 
        CREATE (p1)-[:FRIENDS { when: event.date }]->(p2)
      "
      NEO4J_streams_sink_enabled: "true"
      NEO4J_streams_procedures_enabled: "true"
      NEO4J_streams_source_enabled: "true"
      NEO4J_kafka_zookeeper_connect: docker.for.win.localhost:2181
      NEO4J_kafka_bootstrap_servers: docker.for.win.localhost:9092
      NEO4J_streams_source_topic_nodes_recommendations: Person{*}
      NEO4J_streams_source_schema_polling_interval: 10000
      NEO4J_kafka_acks: 1
      NEO4J_kafka_num_partitions: 1
      NEO4J_kafka_retries: 2
      NEO4J_kafka_batch_size: 16384
      NEO4J_kafka_buffer_memory: 33554432
      NEO4J_kafka_session_timeout_ms: 15000
      NEO4J_kafka_reindex_batch_size: 1000
      NEO4J_kafka_connection_timeout_ms: 10000
      NEO4J_kafka_replication: 1
      NEO4J_kafka_linger_ms: 1
  

мой окончательный поток должен быть

1 [OK]. Отправьте сообщение в friends тему kafka с производителем Java

2 [OK]. Опубликуйте этот friends поток с neo4j помощью соединителя приемника neo4j, применяя это правило

   MERGE (p1:Person { name: event.initiated }) 
        MERGE (p2:Person { name: event.accepted }) 
        CREATE (p1)-[:FRIENDS { when: event.date }]->(p2)
  

3 [Не работает]. Поток обратно к kafka некоторым данным из neo4j запроса или прослушивание на каком-то узле
В моей конфигурации (только для целей тестирования) я прослушиваю узел типа Person{*}

 ...
  NEO4J_streams_source_topic_nodes_recommendations: Person{*}
...
  

Шаги 1 и 2 работают как ожидалось, шаг 3 завершается неудачей, я вижу эти журналы из neo4j

 klinks   | 2020-10-02 11:01:17.402 0000 DEBUG Trying to send a transaction event with txId 8 and txEventId 8 to kafka -> recommendations
klinks   | 2020-10-02 11:02:17.404 0000 DEBUG Sent record in partition null offset null data null key size null
  

я думаю, что я что-то упустил в конфигурации потока neo4j. Может кто-нибудь помочь мне понять.

Комментарии:

1. Я пытаюсь использовать полный стек docker-compose .. включая службы kafka, и он работает так, как ожидалось. Поэтому я думаю, что в этом случае ошибка конфигурации сети.

2. Кстати, в вашем вопросе не отображается шаг 2.

3. Это было просто отсутствующее `

4. отсутствует ‘ где????

5. В вопросе, рядом с пунктом 2, но это была просто ошибка форматирования, не связанная с самим вопросом.