#amazon-web-services #amazon-redshift #boto3 #amazon-kinesis-firehose #localstack
Вопрос:
Я пытаюсь настроить два потока доставки Firehose: один для «хороших» проверенных образцов и один для «плохих» поврежденных образцов, которые не прошли проверку. Просто для практических целей я хочу отправить хорошие в Redshift, а плохие в S3. Они оба будут получать данные из потока данных Kinesis.
Я также пытаюсь использовать LocalStack для разработки всего этого, поэтому я не могу просто перейти на консоль AWS и создавать ресурсы через веб — интерфейс-вся настройка должна выполняться либо в сценариях оболочки с командами cli, либо на Python.
В настоящее время пожарный шланг с назначением S3, похоже, работает, по крайней Destinations
мере, файл describe-delivery-stream
не пуст, и в S3 list-objects
будет что-то в нем, когда данные будут отправлены в поток. Но не для Красного Смещения. При описании этого поля я получаю пустой список для Destinations
поля. Я надеюсь получить несколько советов о том, как мне следует отлаживать это.
Одна вещь, которая пришла мне в голову: я не использовал для этого группы безопасности, только роли IAM. Я предоставил всем доступ к Красному смещению. Должен ли доступ к кластерам красного смещения осуществляться через группы безопасности?
Кроме того, я использую строку подключения к БД Postgres вместо красного смещения ClusterJDBCURL
для локального тестирования, это будет проблемой?
Пример кода примерно того, как я настроил поток:
kinesis_client = aws_client('kinesis', endpoint_url=localstack_url)
firehose_client = aws_client('firehose', endpoint_url=localstack_url)
iam_client = aws_client('iam', endpoint_url=localstack_url)
s3_client = aws_client('s3', endpoint_url=localstack_url)
redshift_client = aws_client('redshift', endpoint_url=localstack_url)
# create the intermediate s3 bucket
s3_client.create_bucket(Bucket='interim', CreateBucketConfiguration='ca-central-1')
s3_arn = "arn:aws:s3:::interim"
# create IAM role for firehose
firehose_assume_role = {
'Version': '2012-10-17',
'Statement': [
{
'Sid': '',
'Effect': 'Allow',
'Principal': {
'Service': 'firehose.amazonaws.com'
},
'Action': 'sts:AssumeRole'
}
]
}
result = iam_client.create_role(RoleName=iam_role_name,
AssumeRolePolicyDocument=json.dumps(firehose_assume_role))
firehose_role_arn = result['Role']['Arn']
# add kinesis access for kinesis data stream as source
kinesis_access = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": sid,
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords"
],
"Resource": [
f"{stream_name}"
]
}
]
}
iam_client.put_role_policy(RoleName=iam_role_name,
PolicyName='firehose_from_kinesis,
PolicyDocument=json.dumps(kinesis_access))
# add s3 access to the role
s3_access = {
"Version": '2012-10-17',
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject"
],
"Resource": [
f"{s3_bucket_arn}/*",
f"{s3_bucket_arn}"
]
}
]
}
iam_client.put_role_policy(RoleName=iam_role_name,
PolicyName='firehose_to_s3',
PolicyDocument=json.dumps(s3_access))
# Then I'm not sure if I need this, but I also added access to Redshift for the role
redshift_access = {
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Action": "redshift:*",
"Resource": "*"
}
}
iam_client.put_role_policy(RoleName=iam_role_name,
PolicyName='firehose_to_redshift',
PolicyDocument=json.dumps(redshift_access))
# create the Redshift cluster
redshift_client.create_cluster(
DBName=db_name,
ClusterIdentifier=cluster_name,
NodeType=REDSHIFT['node-type'],
MasterUsername=REDSHIFT['username'],
MasterUserPassword=REDSHIFT['password'],
PubliclyAccessible=True, # TODO: set up VPC later?
IamRoles=(iam_role_arn,)
)
#### create the delivery stream
# set up config
source_stream_config = {
'KinesisStreamARN': firehose_src_stream_arn,
'RoleARN': iam_role_arn,
}
s3_config = {
'BucketARN': s3_bucket_arn,
'RoleARN': iam_role_arn,
'BufferingHints': {
'IntervalInSeconds': buffer_interval,
},
}
redshift_config = {
'RoleARN': iam_role_arn,
'ClusterJDBCURL': current_app.config['SQLALCHEMY_DATABASE_URI'], # TODO: might need to change later
'CopyCommand': {
'DataTableName': REDSHIFT['database']
},
'Username': REDSHIFT['username'],
'Password': REDSHIFT['password'],
'S3Configuration': s3_config
}
firehose_client.create_delivery_stream(
DeliveryStreamName=firehose_name,
DeliveryStreamType='KinesisStreamAsSource',
KinesisStreamSourceConfiguration=source_stream_config,
RedshiftDestinationConfiguration=redshift_config
)
# and code to wait till it's active...
Then when I run the program (it’s all part of a Flask app btw) along with LocalStack container running in the background, I see the streams, firehoses, s3 buckets and redshift cluster I created, but the firehose doesn’t seem to connect to the cluster or the interim s3.
Below is the CLI output ( awslocal
is just alias for aws --endpoint-url localhost:4566
)
awslocal firehose describe-delivery-stream --delivery-stream-name firehose_to_redshift
{
"DeliveryStreamDescription": {
"DeliveryStreamName": "firehose_to_redshift",
"DeliveryStreamARN": "arn:aws:firehose:ca-central-1:000000000000:deliverystream/firehose_to_redshift",
"DeliveryStreamStatus": "ACTIVE",
"DeliveryStreamType": "KinesisStreamAsSource",
"VersionId": "1",
"CreateTimestamp": 1633503444.4631286,
"Destinations": [],
"HasMoreDestinations": false
}
}
Notice how the Destinations
has an empty list of []
.
Whereas the firehose delivery stream I set up for S3 destination seems to work better:
awslocal firehose describe-delivery-stream --delivery-stream-name firehose_to_s3
{
"DeliveryStreamDescription": {
"DeliveryStreamName": "firehose_to_s3",
"DeliveryStreamARN": "arn:aws:firehose:ca-central-1:000000000000:deliverystream/firehose_to_s3",
"DeliveryStreamStatus": "ACTIVE",
"DeliveryStreamType": "KinesisStreamAsSource",
"VersionId": "1",
"CreateTimestamp": 1633503011.3702254,
"Destinations": [
{
"DestinationId": "19f4231c",
"S3DestinationDescription": {
"RoleARN": "arn:aws:iam::000000000000:role/kinesis_to_firehose_to_s3",
"BucketARN": "arn:aws:s3:::ingestion",
"BufferingHints": {
"IntervalInSeconds": 60
}
}
}
],
"HasMoreDestinations": false
}
}
For the S3 one I used:
firehose_client.create_delivery_stream(
DeliveryStreamName=firehose_name,
DeliveryStreamType='KinesisStreamAsSource',
KinesisStreamSourceConfiguration=source_stream_config,
ExtendedS3DestinationConfiguration=s3_config)
Я совсем новичок в AWS. Любая обратная связь/совет/предложение приветствуется!
Спасибо.