#scala #cassandra #akka
Вопрос:
Мне нужно транслировать все записи с Кассандры. В настоящее время я использую akka-persistence-cassandra
для потоковой передачи данных:
val querier =
PersistenceQuery(system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val selectDistinctPersistenceIds = new SimpleStatement(
"SELECT DISTINCT persistence_id, partition_nr FROM messages")
.setFetchSize(100000)
querier.session.select(selectDistinctPersistenceIds).map { row =>
val id = row.getString(0)
id
}
Это прекрасно работает, когда количество записей составляет около 1,5 миллионов. Но когда количество записей превышает > 1,5 миллиона записей, я получаю > read timeout
ошибку.
Я использую:
"com.typesafe.akka" %% "akka-persistence-cassandra" % "0.58"
"com.typesafe.akka" %% "akka-persistence" % "2.6.12"
"com.typesafe.akka" %% "akka-persistence-query" % "2.6.12"
Редактировать:
Журналы ошибок:
com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response", exceptionStackTrace="java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response
at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:552)
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513)
at akka.persistence.cassandra.package$ListenableFutureConverter$anon$2.$anonfun$run$2(package.scala:25)
...
Ответ №1:
Проблема в том, что ваш сеанс драйвера отключен, настройте его в соответствии с вашими потребностями.
Может возникнуть проблема с таймаутом или увеличением количества повторных попыток. и настройка тайм-аута .
Комментарии:
1. К сожалению, я не могу изменить какие-либо настройки, так как не уверен, сколько данных содержится в Cassandra.
2. У вас есть с собой журналы регистрации. в чем ошибка, это даст некоторую подсказку, иначе отладить проблему невозможно
3. «Я не уверен, сколько данных есть в Кассандре». Это действительно то, что вы должны знать, прежде чем пытаться передать все это в потоковом режиме. @himanshuIIITian
4. Для оценки вышеприведенный код завершается неудачей для записей размером ~8 метров. Следовательно, данные составляют >8 млн записей.
5. здесь это явно указывает на исключение OperationTimedOutException, поэтому вам придется увеличить настройку тайм-аута
Ответ №2:
Я смог устранить эту проблему, установив более высокое значение cassandra-journal.socket.read-timeout-millis
, чем значение по умолчанию 12000 мс.
cassandra-journal {
...
socket {
# the per-host read timeout in milliseconds. Should be higher than the timeout settings
# used on the Cassandra side.
read-timeout-millis = 30000
}