Как вызвать batchWriteItemAsync для загрузки DynamoDB в scala/spark

#scala #apache-spark #amazon-dynamodb

Вопрос:

Я пытаюсь запустить batchwriteAsync для загрузки данных в DynamoDB с помощью Scala(Spark), но при запуске встречаю ошибку ниже, могу ли я спросить, как правильно обрабатывать batchwriteAsync в Scala?

Код

 import com.amazonaws.services.dynamodbv2.model._
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDBAsyncClientBuilder, AmazonDynamoDBAsync, AmazonDynamoDBClientBuilder, AmazonDynamoDB}
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext,Future}

val POOL_SIZE = 3
val client: AmazonDynamoDBAsync = AmazonDynamoDBAsyncClientBuilder.standard().withRegion(REGION).build()
// init threads pool
val jobExecutorPool = Executors.newFixedThreadPool(POOL_SIZE)
// create the implicit ExecutionContext based on our thread pool
implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(jobExecutorPool)

var contentSeq = ...
val batchWriteItems: java.util.List[WriteRequest] = contentSeq.asJava

def batchWriteFuture(tableName: String, batchWriteItems: java.util.List[WriteRequest])(implicit xc: ExecutionContext): Future[BatchWriteItemResult] = {
    client.batchWriteItemAsync(
      (new BatchWriteItemRequest()).withRequestItems(Map(tableName -> batchWriteItems).asJava)
    )
  }

batchWriteFuture(tableName, batchWriteItems)
 

Ошибка:

 error: type mismatch;
 found   : java.util.concurrent.Future[com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult]
 required: scala.concurrent.Future[com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult]
    client.batchWriteItemAsync(
 

Ответ №1:

client.batchWriteItemAsync является Java API, поэтому он возвращает java.util.concurrent.Future

Ваш метод batchWriteFuture имеет возвращаемый тип scala.concurrent.Future

Попробуйте преобразовать вызов DynamoDB в scala, например

 def batchWriteFuture(tableName: String, batchWriteItems: java.util.List[WriteRequest])(implicit xc: ExecutionContext): Future[BatchWriteItemResult] = {
    client.batchWriteItemAsync(
      (new BatchWriteItemRequest()).withRequestItems(Map(tableName -> batchWriteItems).asJava)
    ).asScala // <---------- converting to scala future
  }