Недокументированное Ограничение? публикация в теме *из* триггера pubsub

# #google-cloud-pubsub

Вопрос:

Я не знаю, схожу ли я с ума, или это ограничение, которое просто не задокументировано (я просмотрел документы GCP API):

Возможно ли создать облачную функцию с триггером pubsub в «теме A» и внутри этой облачной функции опубликовать сообщение в «теме B».

Я перепробовал все другие триггеры с одинаковым запущенным кодом (облачные функции, такие как триггеры HTTP, триггеры облачного хранилища, триггеры Firebase), и все они успешно публикуются в разделах. Но в тот момент, когда я (почти буквально) копирую свой код в триггер pubsub, после использования сообщения, когда оно пытается опубликовать свое собственное сообщение в следующей теме, оно просто зависает. Функция просто отключается при попытке публикации.

Итак, резюмируя, возможно ли следующее в GCP?

Тема PubSub A —> >Облачная функция —> Pubsub >Тема B

Заранее спасибо за любые разъяснения! Все это в Java 11. Вот код:

 ...<bunch of imports>

public class SignedURLGenerator implements BackgroundFunction<PubSubMessage> {
  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
  private static final Logger logger = Logger.getLogger(SignedURLGenerator.class.getName());

  /**
  * Handle the incoming PubsubMessage
  **/
 @Override
  public void accept(PubSubMessage message, Context context) throws IOException, InterruptedException {
    String data = new String(Base64.getDecoder().decode(message.data));
    System.out.println("The input message is: "   data.toString());

    //Do a bunch of other stuff not relevant to the issue at hand...

    publishSignedURL(url.toString());
  }

  //Here's the interesting part
  public static void publishSignedURL(String message) throws IOException, InterruptedException {
    String topicName = "url-ready-notifier";
    String responseMessage;
    Publisher publisher = null;
    

    try {
      // Create the PubsubMessage object
      ByteString byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
      PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();
      System.out.println("Message Constructed:"   message); 
      //This part works fine, the message gets constructed

      publisher = Publisher.newBuilder(ProjectTopicName.of(PROJECT_ID, topicName)).build();
      System.out.println("Publisher Created.");
      //This part also works fine, the publisher gets created

      publisher.publish(pubsubApiMessage).get();
      responseMessage = "Message published.";
      //The code NEVER GETS HERE.  The message is never published.  And eventually the cloud function time's out :(    

    } catch (InterruptedException | ExecutionException e) {
        System.out.println("Something went wrong with publishing: "   e.getMessage());
      }

    System.out.println("Everything wrapped up.");

  }
 

Редактировать
Как и было запрошено, это мой текущий ПОМ

     <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>cloudfunctions</groupId>
      <artifactId>pubsub-function</artifactId>
      <version>1.0-SNAPSHOT</version>
    
      <properties>
        <maven.compiler.target>11</maven.compiler.target>
        <maven.compiler.source>11</maven.compiler.source>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>libraries-bom</artifactId>
          <version>20.6.0</version>
          <type>pom</type>
          <scope>import</scope>
      </dependency>
        <dependency>
          <groupId>com.google.cloud.functions</groupId>
          <artifactId>functions-framework-api</artifactId>
          <version>1.0.1</version>
          <type>jar</type>
        </dependency>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>google-cloud-storage</artifactId>
          <version>1.117.1</version>
        </dependency>
        <dependency>
         <groupId>com.google.cloud</groupId>
          <artifactId>google-cloud-pubsub</artifactId>
          <version>1.113.4</version>
        </dependency>
        <dependency>
          <groupId>com.google.api</groupId>
          <artifactId>gax</artifactId>
          <version>1.66.0</version>
        </dependency>
        <dependency>
          <groupId>com.google.api</groupId>
          <artifactId>gax-grpc</artifactId>
          <version>1.66.0</version>
        </dependency>
        <dependency>
          <groupId>org.threeten</groupId>
          <artifactId>threetenbp</artifactId>
          <version>0.7.2</version>
        </dependency>    
      </dependencies>
    </project>

 

Ответ №1:

Можете ли вы попытаться явно задать параметр управления потоком в клиенте издателя? как это

        publisher = Publisher.newBuilder(ProjectTopicName.of(PROJECT_ID, topicName)).setBatchingSettings(BatchingSettings.newBuilder()
                .setDelayThreshold(Duration.of(10, ChronoUnit.SECONDS))
                .setElementCountThreshold(1L)
                .setIsEnabled(true)
                .build()).build();
 

Я не знаю, что происходит, может быть, стандартная и глобальная конфигурация PubSub. Если это не так, я удалю этот ответ.


ПРАВКА 1

Здесь снимок экрана класса builder в родительском классе издателя

введите описание изображения здесь

У вас есть все значения библиотеки по умолчанию. Однако поведение, которое вы наблюдаете, ненормально. Значение по умолчанию должно оставаться значением по умолчанию, даже если вы находитесь в триггере PubSub. Я открою проблему и направлю ее непосредственно команде.

Комментарии:

1. Эй! это сработало! это потрясающе. Но также и очень сбивает с толку. На ум приходят два вопроса: 1. Зачем этому конкретному коду понадобились параметры управления потоком? почему ни один из других триггеров не нуждался в этом для публикации? 2. Каковы размеры пакетов GCP по умолчанию? Прежде чем опубликовать этот комментарий, я просмотрел несколько документов GCP, чтобы посмотреть значения по умолчанию, но все, что говорит Google, — это «есть значения по умолчанию», а не то, что по умолчанию… есть какие-нибудь идеи?

2. Я поделился снимком с экрана. Класс интересен для понимания базового механизма, но то, что вы видите в облачных функциях, ненормально!

3. Для получения информации: issuetracker.google.com/issues/193263948

4. Спасибо, Гийом, я буду следить за этим вопросом и, надеюсь, он будет решен. Кроме того, чтение javadoc для класса издателя [ googleapis.dev/java/gax/последняя версия/com/google/api/gax/пакетирование/… показывает, что размер пакета по умолчанию равен 1.

5. Можете ли вы поделиться своей версией зависимостей function framework и PubSub (или своей полной pom.xml файл). Кроме того, можете ли вы попробовать использовать последнюю версию всех библиотек? И на основе этого примера: github.com/GoogleCloudPlatform/java-docs-samples/tree/…