Постоянный субъект AKKA не получает командных сообщений в функции приема команд

#scala #jdbc #akka #akka-persistence

#scala #jdbc #akka #akka-постоянство

Вопрос:

Я пытаюсь использовать сохранение AKKA вместе с плагином JDBC. Однако, экспериментируя с постоянными участниками, я сталкиваюсь с проблемой, заключающейся в том, что мой постоянный участник не получает сообщения, которые он должен получать.

Вот мой файл application.conf, который я использую для плагина JDBC.

 akka {
  loglevel = DEBUG

  actor {
    #provider = cluster
  }

  persistence {
    journal.plugin = "jdbc-journal"
    snapshot-store.plugin = "jdbc-snapshot-store"
  }
}

  jdbc-journal {
  slick = ${slick}
}

# the akka-persistence-snapshot-store in use
jdbc-snapshot-store {
  slick = ${slick}
}

  jdbc-read-journal {
  slick = ${slick}
}

  slick {
  profile = "slick.jdbc.MySQLProfile$"
  dataSourceClass = "slick.jdbc.DriverDataSource"
  db {
  dataSourceClass = "slick.jdbc.DriverDataSource"
  driver = "com.mysql.cj.jdbc.Driver"
  url = "jdbc:mysql://localhost:3306/db_persistence"
  user = "root"
  password = ""
  numThreads = 5
  maxConnections = 5
  minConnections = 1
}
}
 

Это код моего постоянного субъекта (основанный на коде документации AKKA)

 case class Cmd(data: String)
case class Evt(data: String)

case class ExampleState(events: List[String] = Nil) {
  def updated(evt: Evt): ExampleState = copy(evt.data :: events)
  def size: Int = events.length
  override def toString: String = events.reverse.toString
}

class ExampleActor extends PersistentActor {
  override def persistenceId = "sample-id-1"

   var state = ExampleState()

  def updateState(event: Evt): Unit = {
    state = state.updated(event)
  }

  def numEvents =
    state.size

  override def receiveRecover: Receive = {
    case evt: Evt                                 => updateState(evt)
    case SnapshotOffer(_, snapshot: ExampleState) => state = snapshot
  }

  val snapShotInterval = 1000

  val receiveCommand: Receive = {
    case Cmd(data) => {
      println("in the command code block")
      persist(Evt(s"${data}-${numEvents}")) { event => {
        updateState(event)
        context.system.eventStream.publish(event)
        if (lastSequenceNr % snapShotInterval == 0 amp;amp; lastSequenceNr != 0)
          saveSnapshot(state)
      }
      }
    }
    case "print"=>println(state)
  }
}
 

наконец, мой тест, в котором я нахожу проблему.

 "The persistent actor" should {
    "Receive Command" in {
      val persistentActor = system.actorOf(Props[ExampleActor](),"persistentActorOne")
      Thread.sleep(2000)
      println("before the send")
      persistentActor ! Cmd("foo")
      persistentActor ! Cmd("bar")
      persistentActor ! Cmd("fizz")
      persistentActor ! Cmd("buzz")
      persistentActor ! "print"

      Thread.sleep(10000)
      println("after messages should be sent and received")
    }
  }
}
 

Кто-нибудь может понять причину, по которой мой постоянный участник не получает мои сообщения / команды?

Заранее спасибо!

Комментарии:

1. Можете ли вы попробовать override def receiveCommand: Receive = ... вместо val receiveCommand: Receive = ... ?

2. К сожалению, это дает тот же результат

3.Используя плагин сохранения памяти, этот код работает и печатает: before the send in the command code block in the command code block in the command code block in the command code block List(foo-0, bar-1, fizz-2, buzz-3) after messages should be sent and received

4. Да, однако, при использовании плагина jdbc я вообще не получаю сообщений, поэтому я не вижу, где я допустил ошибку.

5. Ну, вы сказали, что не получили никаких полученных команд, поэтому я предположил, что вы не получили никаких выходных данных от своих команд печати. но теперь, похоже, вы говорите, что да, но ваш плагин сохранения не работает? Что вы получаете на выходе? Что-нибудь? Вы пытались запустить его не из test, а из «main» и посмотреть, работает ли это так?