#scala #apache-spark #amazon-dynamodb
Вопрос:
Я пытаюсь запустить batchwriteAsync для загрузки данных в DynamoDB с помощью Scala(Spark), но при запуске встречаю ошибку ниже, могу ли я спросить, как правильно обрабатывать batchwriteAsync в Scala?
Код
import com.amazonaws.services.dynamodbv2.model._
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDBAsyncClientBuilder, AmazonDynamoDBAsync, AmazonDynamoDBClientBuilder, AmazonDynamoDB}
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext,Future}
val POOL_SIZE = 3
val client: AmazonDynamoDBAsync = AmazonDynamoDBAsyncClientBuilder.standard().withRegion(REGION).build()
// init threads pool
val jobExecutorPool = Executors.newFixedThreadPool(POOL_SIZE)
// create the implicit ExecutionContext based on our thread pool
implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(jobExecutorPool)
var contentSeq = ...
val batchWriteItems: java.util.List[WriteRequest] = contentSeq.asJava
def batchWriteFuture(tableName: String, batchWriteItems: java.util.List[WriteRequest])(implicit xc: ExecutionContext): Future[BatchWriteItemResult] = {
client.batchWriteItemAsync(
(new BatchWriteItemRequest()).withRequestItems(Map(tableName -> batchWriteItems).asJava)
)
}
batchWriteFuture(tableName, batchWriteItems)
Ошибка:
error: type mismatch;
found : java.util.concurrent.Future[com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult]
required: scala.concurrent.Future[com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult]
client.batchWriteItemAsync(
Ответ №1:
client.batchWriteItemAsync
является Java API, поэтому он возвращает java.util.concurrent.Future
Ваш метод batchWriteFuture
имеет возвращаемый тип scala.concurrent.Future
Попробуйте преобразовать вызов DynamoDB в scala, например
def batchWriteFuture(tableName: String, batchWriteItems: java.util.List[WriteRequest])(implicit xc: ExecutionContext): Future[BatchWriteItemResult] = {
client.batchWriteItemAsync(
(new BatchWriteItemRequest()).withRequestItems(Map(tableName -> batchWriteItems).asJava)
).asScala // <---------- converting to scala future
}