Квс.Ввод-вывод в Apache Beam и учетные данные сеанса

#amazon-web-services #apache-beam #amazon-sqs #credentials #assume-role

Вопрос:

Я хотел бы получить доступ к AWS SQS с кратковременными учетными данными от Apache Beam Pipleline.

В AWS IAM я создал роль со следующими отношениями доверия:

 {
  "Effect": "Allow",
  "Principal": {
    "AWS": "arn:aws:sts::xxxxxx:assumed-role/gcp_role/gcp-project-session-name",
    "Service": "sqs.amazonaws.com"
  },
  "Action": "sts:AssumeRole"
},
 

С помощью этой роли я могу получить доступ к SQS с моего локального компьютера.
Я использовал AWS BasicSessionCredentials следующим образом:

   BasicSessionCredentials refreshedAWSCredentials = new BasicSessionCredentials(
            refreshedCredentials.getAccessKeyId(),
            refreshedCredentials.getSecretAccessKey(),
            refreshedCredentials.getSessionToken());

    AWSSecurityTokenService service = AWSSecurityTokenServiceClientBuilder.standard()
            .withCredentials(new AWSStaticCredentialsProvider(refreshedAWSCredentials))
            .withRegion(options.getAwsRegion()).build();
 

Я добавляю объект учетных данных в параметры конвейера:

      options.setAwsSessionToken(refreshedAWSCredentials.getSessionToken());
     options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(refreshedAWSCredentials));      
    return Pipeline.create(options); 
 

В конце я всегда сталкиваюсь со следующей ошибкой:

Вызвано: org.apache.beam.sdk.util.Исключение пользователя: com.amazonaws.services.sqs.модель.Исключение AmazonSQSException: Маркер безопасности, включенный в запрос, недействителен. (Служба: AmazonSQS; Код состояния: 403; Код ошибки: InvalidClientTokenId; Идентификатор запроса: 501e9869-ea58-5e80-9ec1-c1exxxx; Прокси: null

Я предполагаю, что поставщик AWSStaticCredentialsProvider не знает об AWS_SECRET_TOKEN. Вот почему я настраиваю STSAssumeRoleSessionCredentialsProvider, который должен работать с временными учетными данными

 STSAssumeRoleSessionCredentialsProvider stsSessionProvider = new STSAssumeRoleSessionCredentialsProvider
            .Builder(awsRoleArn, awsRoleSession)
            .withStsClient(service)
            .build();
 

This is the associated pipeline code

 p.apply(SqsIO.read().withQueueUrl(options.getSourceQueueUrl())
            .withMaxNumRecords(options.getNumberOfRecords()))
            .apply(ParDo.of(new SqsMessageToJson()))
            .apply(TextIO.write()
                    .to(options.getDestinationBucketUrl()   "/purchase_intent/")
                    .withSuffix(".json"));
 

Даже если я использовал вышеуказанного поставщика, который также работал локально, я получил исключение sam, показанное выше. Итак, мне интересно, как настроить SqsIO с временными учетными данными.