Проблемы с использованием потоков с Redis (Jedis) в Scala

#multithreading #scala #apache-spark #redis #jedis

Вопрос:

У меня есть программа Scala, использующая потоковую обработку (на самом деле Spark) и Redis (Jedis). Я определил object для своих операций Redis, где у меня есть a Lazy val для подключения. Мне нужно, чтобы каждый поток открывал соединение с Redis и работал с ним параллельно.
Объект подключения:

 object redisOp{
  lazy val r = new Jedis("127.0.0.1",6379,30)
  def find(u: Long): Option[Long] = Option(r.get(s"p$u")).flatMap(p => if (p.toLong == u) Some(u) else find(p.toLong))
  // and other functions
}
 

Когда я использую его с одним потоком, он работает хорошо. Но когда несколько потоков используют его, я получаю ошибки. Сначала я попал Unknown replay: 4 в каждый поток, где «4» — случайный символ ( redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply: ).
Затем из redis-cli я попытался установить config set timeout 30000 и 30000 , как я также видел, redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream а иногда redis.clients.jedis.exceptions.JedisDataException: ERR Protocol error: invalid multibulk length и в журналах. И теперь в некоторых запусках(при переключении на 2 потока вместо 4) программа застревает на этапе навсегда без ошибок! Я проверил Spark-UI, чтобы проверить журнал исполнителей, но не могу найти ничего полезного: https://pastebin.com/iJMeBD0D

Я думаю, что проблема заключается в определении и использовании связи с Redis. Кроме того, пожалуйста, сообщите мне, как правильно закрыть соединение, если это необходимо.

Комментарии:

1. ИМХО, вместо обновления вопроса вам следует создать новый вопрос (только с обновленной частью).

Ответ №1:

Объект Jedis не является потокобезопасным. Вы должны использовать какой-то пул объектов/соединений в многопоточной среде. Для этой цели Jedis предоставляет JedisPool. Более подробную информацию можно найти в Jedis Wiki.

Основная идея состоит в том, чтобы получить объект Джедая JedisPool.getResource() и вернуть этот объект Jedis.close() .

Комментарии:

1. Спасибо. Таким образом, у меня будет экземпляр «пула» в моем синглтоне, затем для каждой функции я getResource() сначала выполняю работу, а затем .close ее, верно? Разве не дорого для каждого потока открывать/закрывать соединение при каждом вызове функции?

2. Соединения не будут открываться/закрываться при каждом вызове функции, а будут использоваться повторно. Если Jedis является частью JedisPool, за дверью, .close() на самом деле не закроет соединение, но объявит, что оно свободно для повторного использования. Он (.close) включен в этот процесс, чтобы воспользоваться преимуществами парадигмы Java «попробуй с ресурсами».

3. У меня все еще есть проблемы, когда мне нужно увеличить .setMaxIdle(8000) , а .setMaxTotal(8000) в противном случае программа зависает! Мне нужна помощь! теперь это моя связь: pastebin.com/Z8LKuXTF