Как транслировать все записи с Кассандры?

#scala #cassandra #akka

Вопрос:

Мне нужно транслировать все записи с Кассандры. В настоящее время я использую akka-persistence-cassandra для потоковой передачи данных:

 val querier =
        PersistenceQuery(system)
          .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
      
      val selectDistinctPersistenceIds = new SimpleStatement(
      "SELECT DISTINCT persistence_id, partition_nr FROM messages")
        .setFetchSize(100000)

        querier.session.select(selectDistinctPersistenceIds).map { row =>
          val id = row.getString(0)
          id
        }
 

Это прекрасно работает, когда количество записей составляет около 1,5 миллионов. Но когда количество записей превышает > 1,5 миллиона записей, я получаю > read timeout ошибку.

Я использую:

 "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.58"
"com.typesafe.akka" %% "akka-persistence" % "2.6.12"
"com.typesafe.akka" %% "akka-persistence-query" % "2.6.12"
 

Редактировать:
Журналы ошибок:

 com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response", exceptionStackTrace="java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response
    at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:552)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513)
    at akka.persistence.cassandra.package$ListenableFutureConverter$anon$2.$anonfun$run$2(package.scala:25)
...
 

Ответ №1:

Проблема в том, что ваш сеанс драйвера отключен, настройте его в соответствии с вашими потребностями.

Может возникнуть проблема с таймаутом или увеличением количества повторных попыток. и настройка тайм-аута .

Комментарии:

1. К сожалению, я не могу изменить какие-либо настройки, так как не уверен, сколько данных содержится в Cassandra.

2. У вас есть с собой журналы регистрации. в чем ошибка, это даст некоторую подсказку, иначе отладить проблему невозможно

3. «Я не уверен, сколько данных есть в Кассандре». Это действительно то, что вы должны знать, прежде чем пытаться передать все это в потоковом режиме. @himanshuIIITian

4. Для оценки вышеприведенный код завершается неудачей для записей размером ~8 метров. Следовательно, данные составляют >8 млн записей.

5. здесь это явно указывает на исключение OperationTimedOutException, поэтому вам придется увеличить настройку тайм-аута

Ответ №2:

Я смог устранить эту проблему, установив более высокое значение cassandra-journal.socket.read-timeout-millis , чем значение по умолчанию 12000 мс.

 cassandra-journal {
  ...

  socket {
    # the per-host read timeout in milliseconds. Should be higher than the timeout settings
    # used on the Cassandra side.
    read-timeout-millis = 30000
}