#amazon-web-services #apache-beam #amazon-sqs #credentials #assume-role
Вопрос:
Я хотел бы получить доступ к AWS SQS с кратковременными учетными данными от Apache Beam Pipleline.
В AWS IAM я создал роль со следующими отношениями доверия:
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:sts::xxxxxx:assumed-role/gcp_role/gcp-project-session-name",
"Service": "sqs.amazonaws.com"
},
"Action": "sts:AssumeRole"
},
С помощью этой роли я могу получить доступ к SQS с моего локального компьютера.
Я использовал AWS BasicSessionCredentials следующим образом:
BasicSessionCredentials refreshedAWSCredentials = new BasicSessionCredentials(
refreshedCredentials.getAccessKeyId(),
refreshedCredentials.getSecretAccessKey(),
refreshedCredentials.getSessionToken());
AWSSecurityTokenService service = AWSSecurityTokenServiceClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(refreshedAWSCredentials))
.withRegion(options.getAwsRegion()).build();
Я добавляю объект учетных данных в параметры конвейера:
options.setAwsSessionToken(refreshedAWSCredentials.getSessionToken());
options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(refreshedAWSCredentials));
return Pipeline.create(options);
В конце я всегда сталкиваюсь со следующей ошибкой:
Вызвано: org.apache.beam.sdk.util.Исключение пользователя: com.amazonaws.services.sqs.модель.Исключение AmazonSQSException: Маркер безопасности, включенный в запрос, недействителен. (Служба: AmazonSQS; Код состояния: 403; Код ошибки: InvalidClientTokenId; Идентификатор запроса: 501e9869-ea58-5e80-9ec1-c1exxxx; Прокси: null
Я предполагаю, что поставщик AWSStaticCredentialsProvider не знает об AWS_SECRET_TOKEN. Вот почему я настраиваю STSAssumeRoleSessionCredentialsProvider, который должен работать с временными учетными данными
STSAssumeRoleSessionCredentialsProvider stsSessionProvider = new STSAssumeRoleSessionCredentialsProvider
.Builder(awsRoleArn, awsRoleSession)
.withStsClient(service)
.build();
This is the associated pipeline code
p.apply(SqsIO.read().withQueueUrl(options.getSourceQueueUrl())
.withMaxNumRecords(options.getNumberOfRecords()))
.apply(ParDo.of(new SqsMessageToJson()))
.apply(TextIO.write()
.to(options.getDestinationBucketUrl() "/purchase_intent/")
.withSuffix(".json"));
Даже если я использовал вышеуказанного поставщика, который также работал локально, я получил исключение sam, показанное выше. Итак, мне интересно, как настроить SqsIO с временными учетными данными.