Эмулятор PubSub — (поддержка публикации / получения сообщений в буфере Proto)

# #google-cloud-pubsub

Вопрос:

Я разрабатываю решение для использования общей библиотеки Proto Buffer для отправки и получения сообщений, используя прямую сериализацию proto buffer (ByteString) и десериализацию из a (ByteString) непосредственно в тот же класс Proto Buffer. Мое решение до сих пор не работает. Просто когда я использую настоящий PubSub.

На основе документа: локальное тестирование приложений с использованием информации об эмуляторе и более подробной информации в разделе «Знание ограничений«:

  • Эмулятор не обеспечивает поддержку схемы для буферов протокола.

Хотя я не использую какое-либо определение схемы в теме / подписке. Просто программно используя общую библиотеку proto buffer. Я боюсь, что существует ограничение эмуляции Pubsub, и по этой причине мое решение не работает с эмулятором.

Ниже моего тестового класса любые разъяснения, которые мы будем очень рады.

 package com.example.pubsubgcpspringapplications;


import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.alpian.common.pubsub.messages.OnfidoVerificationEvent;
import com.example.pubsubgcpspringapplications.config.PubSubTestConfig;
import com.example.pubsubgcpspringapplications.services.MessageRealGcpService;
import com.example.pubsubgcpspringapplications.util.DataGenerationUtils;
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

//@ActiveProfiles("test")
public class EmulatorPubSubWithSpringTest {

  @BeforeAll
  static void startUpTests() throws IOException {
    PubSubTestConfig.setupPubSubEmulator();
  }

  @SneakyThrows
  @Test
  void successfulTest() throws InterruptedException {

    var status = DataGenerationUtils.STATUS_COMPLETE;
    var result = DataGenerationUtils.RESULT_CLEAR;
    var subResult = DataGenerationUtils.SUB_RESULT_CLEAR;

    var documentReport = DataGenerationUtils.generateOnfidoDocumentReport(status, result, subResult);
    var facialSimilarityReport = DataGenerationUtils
        .generateOnfidoFacialSimiliratyVideoReport(status, result, subResult);

    OnfidoVerificationEvent.Builder builder = OnfidoVerificationEvent.newBuilder();
    builder.setCheckId(DataGenerationUtils.FAKE_CHECK_ID);
    builder.setApplicantId(DataGenerationUtils.FAKE_APPLICANT_ID);
    builder.setDocument(documentReport);
    builder.setFacialSimilarityVideo(facialSimilarityReport);
    OnfidoVerificationEvent onfidoVerificationEvent = builder.build();

    publishProtoMessageTest(onfidoVerificationEvent);

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          ByteString data = message.getData();

          // Get the schema encoding type.
          String encoding = message.getAttributesMap().get("googclient_schemaencoding");

          block:
          try {
            switch (encoding) {
              case "BINARY":
                // Obtain an object of the generated proto class.
                OnfidoVerificationEvent state = OnfidoVerificationEvent.parseFrom(data);
                System.out.println("Received a BINARY-formatted message: "   state);
                break;

              case "JSON":
                OnfidoVerificationEvent.Builder stateBuilder = OnfidoVerificationEvent.newBuilder();
                JsonFormat.parser().merge(data.toStringUtf8(), stateBuilder);
                System.out.println("Received a JSON-formatted message:"   stateBuilder.build());
                break;

              default:
                break block;
            }
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }

          consumer.ack();
          System.out.println("Ack'ed the message");
        };

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(PubSubTestConfig.PROJECT_ID, PubSubTestConfig.SUBSCRIPTION_NAME);

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

    try {
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:n", subscriptionName);
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      subscriber.stopAsync();
    }

    Thread.sleep(15000);

  }

  public static void publishProtoMessageTest(OnfidoVerificationEvent onfidoVerificationEvent)
      throws IOException, ExecutionException, InterruptedException {

    Publisher publisher = null;

    block:
    try {
      publisher = Publisher.newBuilder("projects/my-project-id/topics/topic-one").build();
      PubsubMessage.Builder message = PubsubMessage.newBuilder();
      // Prepare an appropriately formatted message based on topic encoding.
      message.setData(onfidoVerificationEvent.toByteString());
      System.out.println("Publishing a BINARY-formatted message:n"   message);

      // Publish the message.
      ApiFuture<String> future = publisher.publish(message.build());
      //System.out.println("Published message ID: "   future.get());

    } finally {
      if (publisher != null) {
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }


}
 

Примечание: Пожалуйста, я просто скопировал некоторый сокращенный код из руководства Google и изменил его. Я не хочу использовать JSON, просто публиковать и получать сообщения с использованием файлов proto.

Заранее большое спасибо!

Комментарии:

1. Когда вы говорите, что ваше решение не работает, что вы имеете в виду? Вы получаете сообщение об ошибке?

2. Да, по какой-то причине я могу опубликовать «‘ Идентификатор опубликованного сообщения: 3372434705961298 «‘, но я не могу получить сообщение о том, что «‘ Прослушивание сообщений на втором: 2021-10-28 14:12:57.210 ОШИБКА 4570 — [bscriber-SE-2-1] c.g.c.p.v.StreamingSubscriberConnection : потоковая передача прекращена с исключением, вызванным: com.google.api.gax.rpc.NotFoundException: исключение ввода-вывода.grpc.StatusRuntimeException: NOT_FOUND: ресурс не найден (ресурс = sub-two). «Тем не менее, подписка была создана. Я знаю это, потому что я могу сделать это, используя в другом тесте. Но с использованием формата Json и PubSubTemplate.

3. Хотя в вашем сообщении об ошибке указано, что подписка может быть не создана, возможно, приложение обращается к неправильному экземпляру pub sub. добавьте следующую строку, также получите сведения об окружающей среде и распечатайте ее, чтобы проверить, находитесь ли вы в правильном экземпляре PubSub : string emulatorHostAndPort = Environment.GetEnvironmentVariable("PUBSUB_EMULATOR_HOST");

4. Все мои классы используют один и тот же ХОСТ. Я сделал то, что вы предложили. Спасибо!

Ответ №1:

РЕДАКТИРОВАТЬ: Лучшее разъяснение о симуляторе в комментариях и в другом опубликованном ответе.


Как вы указали, эмулятор PubSub в настоящее время не поддерживает сообщения use os protobuffer, и это то, что вы используете в своем коде (фрагменты из сообщений публикации / получения типа схемы protobuf), и в настоящее время оно не поддерживается. Вы можете попробовать использовать тип схемы Avro или открыть feature request в Google issue tracker для работы со схемами протобуферов в эмуляторе PubSub.

Комментарии:

1. Да, но мне все еще немного неудобно из-за этого. Потому что, когда Google говорит. «Поддержка схемы для буферов протокола» — это ограничение. Это означает, что эмулятор PubSub не позволяет определять схему при создании темы эмулятора или мы не можем протестировать сообщения Protobuf с помощью эмулятора PubSub? Потому что в моем случае я бы хотел просто отправлять и получать сообщения с помощью protobuf. Но я не могу сделать это с помощью Protobuf. Мой код просто работает с использованием реального PubSub. Есть ли у вас какие-либо подтверждения по этому поводу?

2. Как следует из фрагментов, швы как для создания фильтров с использованием схемы protobuf, так и для публикации / получения сообщений не поддерживаются. Использование эмулятора PubSub для тестирования производственных сред не рекомендуется. Лучший способ — создать для этого тестовый проект.

3. Ограничение, заключающееся в том, что схема не работает с буферами протокола в эмуляторе, применяется только к случаю, когда вы создаете схему и присоединяете ее к пабу / подтеме. Ничто не мешает вам фактически отправлять буфер сериализованного протокола в качестве данных в сообщении. Ограничение в эмуляторе связано с тем фактом, что анализатор .proto доступен только на C , в то время как эмулятор написан на Java.

4. Спасибо за разъяснения @KamalAboul-Hosn. Александр, пожалуйста, переместите или удалите согласие, если другое разъяснение поможет вам лучше.

Ответ №2:

Проблема «ресурс не найден» не будет иметь никакого отношения к Pub / Sub эмулятору, не поддерживающему схемы буфера протокола. Если бы вы попытались использовать буферы протокола неподдерживаемым способом (что означало бы создание Schema объекта, который используется PROTCOL_BUFFER как свой type ), то вы бы получили сообщение об ошибке, в частности, об отсутствии поддержки схем буферов протокола в эмуляторе.

Ваша проблема больше похожа на одну из следующих:

  1. Название подписки не совпадает с названием созданной вами подписки.
  2. На самом деле вы не создавали подписку в эмуляторе, а вместо этого создали ее в реальном пабе / вспомогательном сервисе.
  3. Вы не указали своему подписчику на эмулятор, установив PUBSUB_EMULATOR_HOST переменную среды.

Вы должны убедиться, что подписка существует в эмуляторе. Вы можете сделать это, запустив gcloud против него инструмент. Предположим, вы запустили свой эмулятор с помощью следующей команды:

 gcloud beta emulators pubsub start --project=my-test-project
 

Если при этом ваш эмулятор запускается на порту 8085, вы можете проверить, существует ли ваша подписка, выполнив:

 > CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB=http://localhost:8085/ gcloud --project my-test-topic pubsub subscriptions list
 

Если ваша подписка не существует при запуске этой команды, то это означает, что вы, вероятно, не создавали подписку в эмуляторе, а вместо этого создали ее в реальном сервисе. Если вы это видите, то, скорее всего, это означает, что ваш подписчик не отправляет запросы на эмулятор, а фактически отправляет запросы самой службе Pub / Sub.