Пустые пункты назначения — Как правильно настроить поток доставки Firehose на красное смещение с помощью boto3

#amazon-web-services #amazon-redshift #boto3 #amazon-kinesis-firehose #localstack

Вопрос:

Я пытаюсь настроить два потока доставки Firehose: один для «хороших» проверенных образцов и один для «плохих» поврежденных образцов, которые не прошли проверку. Просто для практических целей я хочу отправить хорошие в Redshift, а плохие в S3. Они оба будут получать данные из потока данных Kinesis.

Я также пытаюсь использовать LocalStack для разработки всего этого, поэтому я не могу просто перейти на консоль AWS и создавать ресурсы через веб — интерфейс-вся настройка должна выполняться либо в сценариях оболочки с командами cli, либо на Python.

В настоящее время пожарный шланг с назначением S3, похоже, работает, по крайней Destinations мере, файл describe-delivery-stream не пуст, и в S3 list-objects будет что-то в нем, когда данные будут отправлены в поток. Но не для Красного Смещения. При описании этого поля я получаю пустой список для Destinations поля. Я надеюсь получить несколько советов о том, как мне следует отлаживать это.

Одна вещь, которая пришла мне в голову: я не использовал для этого группы безопасности, только роли IAM. Я предоставил всем доступ к Красному смещению. Должен ли доступ к кластерам красного смещения осуществляться через группы безопасности?

Кроме того, я использую строку подключения к БД Postgres вместо красного смещения ClusterJDBCURL для локального тестирования, это будет проблемой?

Пример кода примерно того, как я настроил поток:

 kinesis_client = aws_client('kinesis', endpoint_url=localstack_url)
firehose_client = aws_client('firehose', endpoint_url=localstack_url)
iam_client = aws_client('iam', endpoint_url=localstack_url)
s3_client = aws_client('s3', endpoint_url=localstack_url)
redshift_client = aws_client('redshift', endpoint_url=localstack_url)

# create the intermediate s3 bucket
s3_client.create_bucket(Bucket='interim', CreateBucketConfiguration='ca-central-1')
s3_arn = "arn:aws:s3:::interim"

# create IAM role for firehose
firehose_assume_role = {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Sid': '',
                'Effect': 'Allow',
                'Principal': {
                    'Service': 'firehose.amazonaws.com'
                },
                'Action': 'sts:AssumeRole'
            }
        ]
    }

result = iam_client.create_role(RoleName=iam_role_name,
                                AssumeRolePolicyDocument=json.dumps(firehose_assume_role))
firehose_role_arn = result['Role']['Arn']

# add kinesis access for kinesis data stream as source
kinesis_access = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": sid,
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords"
                ],
                "Resource": [
                    f"{stream_name}"
                ]
            }
        ]
    }
iam_client.put_role_policy(RoleName=iam_role_name,
                           PolicyName='firehose_from_kinesis,
                           PolicyDocument=json.dumps(kinesis_access))


# add s3 access to the role 
s3_access = {
        "Version": '2012-10-17',
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:AbortMultipartUpload",
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:PutObject"
                ],
                "Resource": [
                    f"{s3_bucket_arn}/*",
                    f"{s3_bucket_arn}"
                ]
            }
        ]
}

iam_client.put_role_policy(RoleName=iam_role_name,
                           PolicyName='firehose_to_s3',
                           PolicyDocument=json.dumps(s3_access))

# Then I'm not sure if I need this, but I also added access to Redshift for the role
redshift_access = {
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "redshift:*",
            "Resource": "*"
        }
}

iam_client.put_role_policy(RoleName=iam_role_name,
                           PolicyName='firehose_to_redshift',
                           PolicyDocument=json.dumps(redshift_access))

# create the Redshift cluster
redshift_client.create_cluster(
            DBName=db_name,
            ClusterIdentifier=cluster_name,
            NodeType=REDSHIFT['node-type'],
            MasterUsername=REDSHIFT['username'],
            MasterUserPassword=REDSHIFT['password'],
            PubliclyAccessible=True,  # TODO: set up VPC later?
            IamRoles=(iam_role_arn,)
        )

#### create the delivery stream

# set up config
source_stream_config = {
        'KinesisStreamARN': firehose_src_stream_arn,
        'RoleARN': iam_role_arn,
}

s3_config = {
        'BucketARN': s3_bucket_arn,
        'RoleARN': iam_role_arn,
        'BufferingHints': {
            'IntervalInSeconds': buffer_interval,
        },
}

redshift_config = {
                'RoleARN': iam_role_arn,
                'ClusterJDBCURL': current_app.config['SQLALCHEMY_DATABASE_URI'],  # TODO: might need to change later
                'CopyCommand': {
                    'DataTableName': REDSHIFT['database']
                },
                'Username': REDSHIFT['username'],
                'Password': REDSHIFT['password'],
                'S3Configuration': s3_config
}

firehose_client.create_delivery_stream(
                DeliveryStreamName=firehose_name,
                DeliveryStreamType='KinesisStreamAsSource',
                KinesisStreamSourceConfiguration=source_stream_config,
                RedshiftDestinationConfiguration=redshift_config
)

# and code to wait till it's active...

 

Then when I run the program (it’s all part of a Flask app btw) along with LocalStack container running in the background, I see the streams, firehoses, s3 buckets and redshift cluster I created, but the firehose doesn’t seem to connect to the cluster or the interim s3.
Below is the CLI output ( awslocal is just alias for aws --endpoint-url localhost:4566 )

 awslocal firehose describe-delivery-stream --delivery-stream-name firehose_to_redshift
{
    "DeliveryStreamDescription": {
        "DeliveryStreamName": "firehose_to_redshift",
        "DeliveryStreamARN": "arn:aws:firehose:ca-central-1:000000000000:deliverystream/firehose_to_redshift",
        "DeliveryStreamStatus": "ACTIVE",
        "DeliveryStreamType": "KinesisStreamAsSource",
        "VersionId": "1",
        "CreateTimestamp": 1633503444.4631286,
        "Destinations": [],
        "HasMoreDestinations": false
    }
}
 

Notice how the Destinations has an empty list of [] .

Whereas the firehose delivery stream I set up for S3 destination seems to work better:

 awslocal firehose describe-delivery-stream --delivery-stream-name firehose_to_s3
{
    "DeliveryStreamDescription": {
        "DeliveryStreamName": "firehose_to_s3",
        "DeliveryStreamARN": "arn:aws:firehose:ca-central-1:000000000000:deliverystream/firehose_to_s3",
        "DeliveryStreamStatus": "ACTIVE",
        "DeliveryStreamType": "KinesisStreamAsSource",
        "VersionId": "1",
        "CreateTimestamp": 1633503011.3702254,
        "Destinations": [
            {
                "DestinationId": "19f4231c",
                "S3DestinationDescription": {
                    "RoleARN": "arn:aws:iam::000000000000:role/kinesis_to_firehose_to_s3",
                    "BucketARN": "arn:aws:s3:::ingestion",
                    "BufferingHints": {
                        "IntervalInSeconds": 60
                    }
                }
            }
        ],
        "HasMoreDestinations": false
    }
}
 

For the S3 one I used:

 firehose_client.create_delivery_stream(
                DeliveryStreamName=firehose_name,
                DeliveryStreamType='KinesisStreamAsSource',
                KinesisStreamSourceConfiguration=source_stream_config,
                ExtendedS3DestinationConfiguration=s3_config)
 

Я совсем новичок в AWS. Любая обратная связь/совет/предложение приветствуется!

Спасибо.