Подключение к кластеру Amazon MSK

#amazon-web-services #aws-lambda #serverless-framework #aws-msk

Вопрос:

Я пытаюсь настроить кластер Amazon MSK и подключиться к нему с помощью лямбда-функции. Функция lambda будет производителем сообщений, а не потребителем.

Я использую бессерверную платформу для обеспечения всего и в моем бессерверном.yml Я добавил следующее, и это, кажется, работает нормально.

     MSK:
      Type: AWS::MSK::Cluster
      Properties:
        ClusterName: kafkaOne
        KafkaVersion: 2.2.1
        NumberOfBrokerNodes: 3
        BrokerNodeGroupInfo:
          InstanceType: kafka.t3.small
          ClientSubnets:
            - Ref: PrivateSubnet1
            - Ref: PrivateSubnet2
            - Ref: PrivateSubnet3
 

Но при попытке подключиться к этому кластеру для фактической отправки сообщений я не уверен, как получить строку подключения здесь? Я предполагаю, что это должна быть строка zookeeperconnect?
Я новичок в кафке/msk, так что, возможно, я не вижу чего-то очевидного.

Любой совет очень ценен. Ваше здоровье.

Ответ №1:

Я не знаю, какую базу кода вы используете, поэтому я добавлю свой код, который написал в GO.

По сути, вы должны подключиться к кластеру MSK так же, как вы подключились бы к какому-либо отдельному экземпляру Кафки. Мы используем брокеров для «подключения» или, лучше сказать, записи в кластер MSK.

Я использую библиотеку segmentio/kafka-go. Моя функция отправки события в кластер MSK выглядит следующим образом

 // Add event
func addEvent(ctx context.Context, requestBody RequestBodyType) (bool, error) {

    // Prepare dialer
    dialer := amp;kafka.Dialer{
        Timeout:   2 * time.Second,
        DualStack: true,
    }

    brokers := []string{os.Getenv("KAFKA_BROKER_1"), os.Getenv("KAFKA_BROKER_2"), os.Getenv("KAFKA_BROKER_3"), os.Getenv("KAFKA_BROKER_4")}


    // Prepare writer config
    kafkaConfig := kafka.WriterConfig{
        Brokers:  brokers,
        Topic:    os.Getenv("KAFKA_TOPIC"),
        Balancer: amp;kafka.Hash{},
        Dialer:   dialer,
    }

    // Prepare writer
    w := kafka.NewWriter(kafkaConfig)


    // Convert struct to json string
    event, err := json.Marshal(requestBody)
    if err != nil {
        fmt.Println("Convert struct to json for writing to KAFKA failed")
        panic(err)
    }

    // Write message
    writeError := w.WriteMessages(ctx,
        kafka.Message{
            Key:   []byte(requestBody.Event),
            Value: []byte(event),
        },
    )
    if writeError != nil {
        fmt.Println("ERROR WRITING EVENT TO KAFKA")
        panic("could not write message "   err.Error())
    }

    return true, nil
}
 

Мой бессерверный.yml

Верхний код (addEvent) принадлежит функциям -> postEvent в файле serverless.yml… Если вы потребляете из кафки, то вам следует проверить функции -> Событие процесса. Потреблять событие довольно просто, но настраивать все для создания Кафки-это безумие. Мы, вероятно, работаем над этим в течение полутора месяцев и все еще выясняем, как все должно быть настроено. К сожалению, бессерверный не все делает за вас, поэтому вам придется «нажимать» вручную в AWS, но мы сравнили его с другими фреймворками, и бессерверный все еще лучший прямо сейчас

 provider:
  name: aws
  runtime: go1.x
  stage: dev
  profile: ${env:AWS_PROFILE}
  region: ${env:REGION}
  apiName: my-app-${sls:stage}
  lambdaHashingVersion: 20201221
  environment:
    ENV: ${env:ENV}
    KAFKA_TOPIC: ${env:KAFKA_TOPIC}
    KAFKA_BROKER_1: ${env:KAFKA_BROKER_1}
    KAFKA_BROKER_2: ${env:KAFKA_BROKER_2}
    KAFKA_BROKER_3: ${env:KAFKA_BROKER_3}
    KAFKA_BROKER_4: ${env:KAFKA_BROKER_4}
    KAFKA_ARN: ${env:KAFKA_ARN}
    ACCESS_CONTROL_ORIGINS: ${env:ACCESS_CONTROL_ORIGINS}
    ACCESS_CONTROL_HEADERS: ${env:ACCESS_CONTROL_HEADERS}
    ACCESS_CONTROL_METHODS: ${env:ACCESS_CONTROL_METHODS}
    BATCH_SIZE: ${env:BATCH_SIZE}
    SLACK_API_TOKEN: ${env:SLACK_API_TOKEN}
    SLACK_CHANNEL_ID: ${env:SLACK_CHANNEL_ID}
  httpApi:
    cors: true
  apiGateway:
    resourcePolicy:
      - Effect: Allow
        Action: '*'
        Resource: '*'
        Principal: '*'
  vpc:
    securityGroupIds:
      - sg-*********
    subnetIds:
      - subnet-******
      - subnet-*******

functions:
  postEvent:
    handler: bin/postEvent
    package:
      patterns:
        - bin/postEvent
    events:
      - http:
          path: event
          method: post
          cors:
            origin: ${env:ACCESS_CONTROL_ORIGINS}
            headers:
              - Content-Type
              - Content-Length
              - Accept-Encoding
              - Origin
              - Referer
              - Authorization
              - X-CSRF-Token
              - X-Amz-Date
              - X-Api-Key
              - X-Amz-Security-Token
              - X-Amz-User-Agent
            allowCredentials: false
            methods:
              - OPTIONS
              - POST
  processEvent:
    handler: bin/processEvent
    package:
      patterns:
        - bin/processEvent
    events:
      - msk:
          arn: ${env:KAFKA_ARN}
          topic: ${env:KAFKA_TOPIC}
          batchSize: ${env:BATCH_SIZE}
          startingPosition: LATEST
resources:
  Resources:
    GatewayResponseDefault4XX:
      Type: 'AWS::ApiGateway::GatewayResponse'
      Properties:
        ResponseParameters:
          gatewayresponse.header.Access-Control-Allow-Origin: "'*'"
          gatewayresponse.header.Access-Control-Allow-Headers: "'*'"
        ResponseType: DEFAULT_4XX
        RestApiId:
          Ref: 'ApiGatewayRestApi'
    myDefaultRole:
      Type: AWS::IAM::Role
      Properties:
        Path: /
        RoleName: my-app-dev-eu-serverless-lambdaRole-${sls:stage} # required if you want to use 'serverless deploy --function' later on
        AssumeRolePolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Principal:
                Service:
                  - lambda.amazonaws.com
              Action: sts:AssumeRole
        # note that these rights are needed if you want your function to be able to communicate with resources within your vpc
        ManagedPolicyArns:
          - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
          - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
 

Я должен предупредить вас, что мы тратим много времени на выяснение того, как правильно настроить VPC и другие сетевые / разрешительные функции. Мой коллаж напишет пост в блоге, как только он вернется из отпуска. 🙂 Я надеюсь, что это вам как-то поможет. Желаю удачи 😉

Обновить

Если вы используете javascript, то вы бы подключились к Кафке примерно так

 const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'order-app',
  brokers: [
    'broker1:port',
    'broker2:port',
  ],
  ssl: true, // false
})