#scala #jdbc #akka #akka-persistence
#scala #jdbc #akka #akka-постоянство
Вопрос:
Я пытаюсь использовать сохранение AKKA вместе с плагином JDBC. Однако, экспериментируя с постоянными участниками, я сталкиваюсь с проблемой, заключающейся в том, что мой постоянный участник не получает сообщения, которые он должен получать.
Вот мой файл application.conf, который я использую для плагина JDBC.
akka {
loglevel = DEBUG
actor {
#provider = cluster
}
persistence {
journal.plugin = "jdbc-journal"
snapshot-store.plugin = "jdbc-snapshot-store"
}
}
jdbc-journal {
slick = ${slick}
}
# the akka-persistence-snapshot-store in use
jdbc-snapshot-store {
slick = ${slick}
}
jdbc-read-journal {
slick = ${slick}
}
slick {
profile = "slick.jdbc.MySQLProfile$"
dataSourceClass = "slick.jdbc.DriverDataSource"
db {
dataSourceClass = "slick.jdbc.DriverDataSource"
driver = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/db_persistence"
user = "root"
password = ""
numThreads = 5
maxConnections = 5
minConnections = 1
}
}
Это код моего постоянного субъекта (основанный на коде документации AKKA)
case class Cmd(data: String)
case class Evt(data: String)
case class ExampleState(events: List[String] = Nil) {
def updated(evt: Evt): ExampleState = copy(evt.data :: events)
def size: Int = events.length
override def toString: String = events.reverse.toString
}
class ExampleActor extends PersistentActor {
override def persistenceId = "sample-id-1"
var state = ExampleState()
def updateState(event: Evt): Unit = {
state = state.updated(event)
}
def numEvents =
state.size
override def receiveRecover: Receive = {
case evt: Evt => updateState(evt)
case SnapshotOffer(_, snapshot: ExampleState) => state = snapshot
}
val snapShotInterval = 1000
val receiveCommand: Receive = {
case Cmd(data) => {
println("in the command code block")
persist(Evt(s"${data}-${numEvents}")) { event => {
updateState(event)
context.system.eventStream.publish(event)
if (lastSequenceNr % snapShotInterval == 0 amp;amp; lastSequenceNr != 0)
saveSnapshot(state)
}
}
}
case "print"=>println(state)
}
}
наконец, мой тест, в котором я нахожу проблему.
"The persistent actor" should {
"Receive Command" in {
val persistentActor = system.actorOf(Props[ExampleActor](),"persistentActorOne")
Thread.sleep(2000)
println("before the send")
persistentActor ! Cmd("foo")
persistentActor ! Cmd("bar")
persistentActor ! Cmd("fizz")
persistentActor ! Cmd("buzz")
persistentActor ! "print"
Thread.sleep(10000)
println("after messages should be sent and received")
}
}
}
Кто-нибудь может понять причину, по которой мой постоянный участник не получает мои сообщения / команды?
Заранее спасибо!
Комментарии:
1. Можете ли вы попробовать
override def receiveCommand: Receive = ...
вместоval receiveCommand: Receive = ...
?2. К сожалению, это дает тот же результат
3.Используя плагин сохранения памяти, этот код работает и печатает:
before the send
in the command code block
in the command code block
in the command code block
in the command code block
List(foo-0, bar-1, fizz-2, buzz-3)
after messages should be sent and received
4. Да, однако, при использовании плагина jdbc я вообще не получаю сообщений, поэтому я не вижу, где я допустил ошибку.
5. Ну, вы сказали, что не получили никаких полученных команд, поэтому я предположил, что вы не получили никаких выходных данных от своих команд печати. но теперь, похоже, вы говорите, что да, но ваш плагин сохранения не работает? Что вы получаете на выходе? Что-нибудь? Вы пытались запустить его не из test, а из «main» и посмотреть, работает ли это так?