#multithreading #scala #apache-spark #redis #jedis
Вопрос:
У меня есть программа Scala, использующая потоковую обработку (на самом деле Spark) и Redis (Jedis). Я определил object
для своих операций Redis, где у меня есть a Lazy val
для подключения. Мне нужно, чтобы каждый поток открывал соединение с Redis и работал с ним параллельно.
Объект подключения:
object redisOp{
lazy val r = new Jedis("127.0.0.1",6379,30)
def find(u: Long): Option[Long] = Option(r.get(s"p$u")).flatMap(p => if (p.toLong == u) Some(u) else find(p.toLong))
// and other functions
}
Когда я использую его с одним потоком, он работает хорошо. Но когда несколько потоков используют его, я получаю ошибки. Сначала я попал Unknown replay: 4
в каждый поток, где «4» — случайный символ ( redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply:
).
Затем из redis-cli
я попытался установить config set timeout 30000
и 30000
, как я также видел, redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream
а иногда redis.clients.jedis.exceptions.JedisDataException: ERR Protocol error: invalid multibulk length
и в журналах. И теперь в некоторых запусках(при переключении на 2 потока вместо 4) программа застревает на этапе навсегда без ошибок! Я проверил Spark-UI, чтобы проверить журнал исполнителей, но не могу найти ничего полезного: https://pastebin.com/iJMeBD0D
Я думаю, что проблема заключается в определении и использовании связи с Redis. Кроме того, пожалуйста, сообщите мне, как правильно закрыть соединение, если это необходимо.
Комментарии:
1. ИМХО, вместо обновления вопроса вам следует создать новый вопрос (только с обновленной частью).
Ответ №1:
Объект Jedis не является потокобезопасным. Вы должны использовать какой-то пул объектов/соединений в многопоточной среде. Для этой цели Jedis предоставляет JedisPool. Более подробную информацию можно найти в Jedis Wiki.
Основная идея состоит в том, чтобы получить объект Джедая JedisPool.getResource()
и вернуть этот объект Jedis.close()
.
Комментарии:
1. Спасибо. Таким образом, у меня будет экземпляр «пула» в моем синглтоне, затем для каждой функции я
getResource()
сначала выполняю работу, а затем.close
ее, верно? Разве не дорого для каждого потока открывать/закрывать соединение при каждом вызове функции?2. Соединения не будут открываться/закрываться при каждом вызове функции, а будут использоваться повторно. Если Jedis является частью JedisPool, за дверью,
.close()
на самом деле не закроет соединение, но объявит, что оно свободно для повторного использования. Он (.close) включен в этот процесс, чтобы воспользоваться преимуществами парадигмы Java «попробуй с ресурсами».3. У меня все еще есть проблемы, когда мне нужно увеличить
.setMaxIdle(8000)
, а.setMaxTotal(8000)
в противном случае программа зависает! Мне нужна помощь! теперь это моя связь: pastebin.com/Z8LKuXTF