#amazon-web-services #aws-lambda #serverless-framework #aws-msk
Вопрос:
Я пытаюсь настроить кластер Amazon MSK и подключиться к нему с помощью лямбда-функции. Функция lambda будет производителем сообщений, а не потребителем.
Я использую бессерверную платформу для обеспечения всего и в моем бессерверном.yml Я добавил следующее, и это, кажется, работает нормально.
MSK:
Type: AWS::MSK::Cluster
Properties:
ClusterName: kafkaOne
KafkaVersion: 2.2.1
NumberOfBrokerNodes: 3
BrokerNodeGroupInfo:
InstanceType: kafka.t3.small
ClientSubnets:
- Ref: PrivateSubnet1
- Ref: PrivateSubnet2
- Ref: PrivateSubnet3
Но при попытке подключиться к этому кластеру для фактической отправки сообщений я не уверен, как получить строку подключения здесь? Я предполагаю, что это должна быть строка zookeeperconnect?
Я новичок в кафке/msk, так что, возможно, я не вижу чего-то очевидного.
Любой совет очень ценен. Ваше здоровье.
Ответ №1:
Я не знаю, какую базу кода вы используете, поэтому я добавлю свой код, который написал в GO.
По сути, вы должны подключиться к кластеру MSK так же, как вы подключились бы к какому-либо отдельному экземпляру Кафки. Мы используем брокеров для «подключения» или, лучше сказать, записи в кластер MSK.
Я использую библиотеку segmentio/kafka-go. Моя функция отправки события в кластер MSK выглядит следующим образом
// Add event
func addEvent(ctx context.Context, requestBody RequestBodyType) (bool, error) {
// Prepare dialer
dialer := amp;kafka.Dialer{
Timeout: 2 * time.Second,
DualStack: true,
}
brokers := []string{os.Getenv("KAFKA_BROKER_1"), os.Getenv("KAFKA_BROKER_2"), os.Getenv("KAFKA_BROKER_3"), os.Getenv("KAFKA_BROKER_4")}
// Prepare writer config
kafkaConfig := kafka.WriterConfig{
Brokers: brokers,
Topic: os.Getenv("KAFKA_TOPIC"),
Balancer: amp;kafka.Hash{},
Dialer: dialer,
}
// Prepare writer
w := kafka.NewWriter(kafkaConfig)
// Convert struct to json string
event, err := json.Marshal(requestBody)
if err != nil {
fmt.Println("Convert struct to json for writing to KAFKA failed")
panic(err)
}
// Write message
writeError := w.WriteMessages(ctx,
kafka.Message{
Key: []byte(requestBody.Event),
Value: []byte(event),
},
)
if writeError != nil {
fmt.Println("ERROR WRITING EVENT TO KAFKA")
panic("could not write message " err.Error())
}
return true, nil
}
Мой бессерверный.yml
Верхний код (addEvent) принадлежит функциям -> postEvent в файле serverless.yml… Если вы потребляете из кафки, то вам следует проверить функции -> Событие процесса. Потреблять событие довольно просто, но настраивать все для создания Кафки-это безумие. Мы, вероятно, работаем над этим в течение полутора месяцев и все еще выясняем, как все должно быть настроено. К сожалению, бессерверный не все делает за вас, поэтому вам придется «нажимать» вручную в AWS, но мы сравнили его с другими фреймворками, и бессерверный все еще лучший прямо сейчас
provider:
name: aws
runtime: go1.x
stage: dev
profile: ${env:AWS_PROFILE}
region: ${env:REGION}
apiName: my-app-${sls:stage}
lambdaHashingVersion: 20201221
environment:
ENV: ${env:ENV}
KAFKA_TOPIC: ${env:KAFKA_TOPIC}
KAFKA_BROKER_1: ${env:KAFKA_BROKER_1}
KAFKA_BROKER_2: ${env:KAFKA_BROKER_2}
KAFKA_BROKER_3: ${env:KAFKA_BROKER_3}
KAFKA_BROKER_4: ${env:KAFKA_BROKER_4}
KAFKA_ARN: ${env:KAFKA_ARN}
ACCESS_CONTROL_ORIGINS: ${env:ACCESS_CONTROL_ORIGINS}
ACCESS_CONTROL_HEADERS: ${env:ACCESS_CONTROL_HEADERS}
ACCESS_CONTROL_METHODS: ${env:ACCESS_CONTROL_METHODS}
BATCH_SIZE: ${env:BATCH_SIZE}
SLACK_API_TOKEN: ${env:SLACK_API_TOKEN}
SLACK_CHANNEL_ID: ${env:SLACK_CHANNEL_ID}
httpApi:
cors: true
apiGateway:
resourcePolicy:
- Effect: Allow
Action: '*'
Resource: '*'
Principal: '*'
vpc:
securityGroupIds:
- sg-*********
subnetIds:
- subnet-******
- subnet-*******
functions:
postEvent:
handler: bin/postEvent
package:
patterns:
- bin/postEvent
events:
- http:
path: event
method: post
cors:
origin: ${env:ACCESS_CONTROL_ORIGINS}
headers:
- Content-Type
- Content-Length
- Accept-Encoding
- Origin
- Referer
- Authorization
- X-CSRF-Token
- X-Amz-Date
- X-Api-Key
- X-Amz-Security-Token
- X-Amz-User-Agent
allowCredentials: false
methods:
- OPTIONS
- POST
processEvent:
handler: bin/processEvent
package:
patterns:
- bin/processEvent
events:
- msk:
arn: ${env:KAFKA_ARN}
topic: ${env:KAFKA_TOPIC}
batchSize: ${env:BATCH_SIZE}
startingPosition: LATEST
resources:
Resources:
GatewayResponseDefault4XX:
Type: 'AWS::ApiGateway::GatewayResponse'
Properties:
ResponseParameters:
gatewayresponse.header.Access-Control-Allow-Origin: "'*'"
gatewayresponse.header.Access-Control-Allow-Headers: "'*'"
ResponseType: DEFAULT_4XX
RestApiId:
Ref: 'ApiGatewayRestApi'
myDefaultRole:
Type: AWS::IAM::Role
Properties:
Path: /
RoleName: my-app-dev-eu-serverless-lambdaRole-${sls:stage} # required if you want to use 'serverless deploy --function' later on
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action: sts:AssumeRole
# note that these rights are needed if you want your function to be able to communicate with resources within your vpc
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
- arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
Я должен предупредить вас, что мы тратим много времени на выяснение того, как правильно настроить VPC и другие сетевые / разрешительные функции. Мой коллаж напишет пост в блоге, как только он вернется из отпуска. 🙂 Я надеюсь, что это вам как-то поможет. Желаю удачи 😉
Обновить
Если вы используете javascript, то вы бы подключились к Кафке примерно так
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'order-app',
brokers: [
'broker1:port',
'broker2:port',
],
ssl: true, // false
})