Какое правильное ограничение следует использовать для разбитого DoFn, считывающего неограниченную итерацию?

# #java #mongodb #google-cloud-dataflow #apache-beam

Вопрос:

Я пишу разделяемый DoFn для чтения потока изменений MongoDB. Это позволяет мне наблюдать события, описывающие изменения в коллекции, и я могу начать чтение с произвольной метки времени кластера, которую я хочу, при условии, что у oplog достаточно истории. Метки времени кластера-это секунды, прошедшие с эпохи, в сочетании с серийным номером операции за данную секунду.

Я просмотрел другие примеры SDF, но все, что я видел до сих пор, предполагает «доступный» источник данных (раздел Кафки, файл Parquet/Avro и т. Д.).

Интерфейс, предоставляемый MongoDB, является простым итерируемым, поэтому я не могу seek получить точное смещение (кроме получения нового итерируемого, начинающегося после метки времени), и события, создаваемые им, имеют только метки времени кластера — опять же, нет точного смещения, связанного с выходным элементом.

Для настройки SDF я использую следующий класс в качестве типа элемента ввода:

   public static class StreamConfig implements Serializable {
    public final String databaseName;
    public final String collectionName;
    public final Instant startFrom;

  ...
  }
 

В качестве ограничения я использую OffsetRange , так как я могу конвертировать эти временные метки в длинные значения и обратно.
Для отслеживания смещения я выбрал растущий Offsetrangetracker, так как он может обрабатывать потенциально бесконечный диапазон.

У меня возникли проблемы с оценкой конечного диапазона — в конце концов, я предположил now() , что это будет максимальная потенциальная временная метка, поскольку самое быстрое, что мы можем прочитать поток,-это в реальном времени.

   @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element StreamConfig element) {
    final int fromEpochSecond =
        (int) (Optional.ofNullable(element.startFrom).orElse(Instant.now()).getMillis() / 1000);
    final BsonTimestamp bsonTimestamp = new BsonTimestamp(fromEpochSecond, 0);
    return new OffsetRange(bsonTimestamp.getValue(), Long.MAX_VALUE);
  }

  @NewTracker
  public GrowableOffsetRangeTracker newTracker(@Restriction OffsetRange restriction) {
    return new GrowableOffsetRangeTracker(restriction.getFrom(), new MongoChangeStreamEstimator());
  }

  public static class MongoChangeStreamEstimator implements RangeEndEstimator {
    @Override
    public long estimate() {
      // estimating the range to current timestamp since we're reading them in real-time
      return new BsonTimestamp((int) (Instant.now().getMillis() / 1000L), Integer.MAX_VALUE)
          .getValue();
    }
  }

 

Есть ли лучший выбор типа ограничения в такой ситуации — бесконечный поток элементов
с метками времени, но без назначенного смещения?

Кроме того, эта реализация, похоже, потребляет много процессора при запуске на DirectRunner — tryClaim возвращает false , что, похоже, открывает множество новых итераторов.

Есть ли способ сказать Beam, чтобы он не разделял ограничение или не распараллеливал эту операцию менее агрессивно?

 
@ProcessElement
  public ProcessContinuation process(
      @Element StreamConfig element,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<ChangeStreamDocument<BsonDocument>> outputReceiver) {
    final BsonTimestamp restrictionStart =
        new BsonTimestamp(tracker.currentRestriction().getFrom());

    final MongoCollection<BsonDocument> collection = getCollection(element);

    final ChangeStreamIterable<BsonDocument> iterable =
        collection.watch().startAtOperationTime(restrictionStart);
    final long restrictionEnd = tracker.currentRestriction().getTo();

    try {

      final MongoCursor<ChangeStreamDocument<BsonDocument>> iterator = iterable.iterator();
      while (iterator.hasNext()) {
        ChangeStreamDocument<BsonDocument> changeStreamDocument = iterator.next();
        final BsonTimestamp clusterTime = changeStreamDocument.getClusterTime();
        final long clusterTimeValue = clusterTime.getValue();

        if (clusterTimeValue >= restrictionEnd) {
          LOGGER.warn(
              "breaking out: "   clusterTimeValue   " outside restriction "   restrictionEnd);
          break;
        }

        if (!tracker.tryClaim(clusterTimeValue)) {
          LOGGER.warn("failed to claim "   clusterTimeValue);
          iterator.close();
          return ProcessContinuation.stop();
        }

        final int epochSecondsClusterTs = clusterTime.getTime();

        outputReceiver.outputWithTimestamp(
            changeStreamDocument, Instant.ofEpochSecond(epochSecondsClusterTs));
      }
    } catch (MongoNodeIsRecoveringException | MongoChangeStreamException | MongoSocketException e) {

      LOGGER.warn("Failed to open change stream, retrying", e);
      return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(10L));
    }

    return ProcessContinuation.resume();
  }
 

Комментарии:

1. Обратите внимание, что clusterTimeValue >= restrictionEnd условие является избыточным при повторном требовании.

Ответ №1:

Использование метки времени в качестве смещения-это прекрасная вещь для использования в качестве ограничения, если вы можете гарантировать, что сможете прочитать все элементы до заданной метки времени. (Приведенный выше цикл предполагает, что итератор выдает элементы в порядке временных меток, в частности, что, как только вы увидите временную метку за пределами диапазона, вы можете выйти из цикла и не беспокоиться о более ранних элементах в более поздних частях итератора.)

Что касается того, почему tryClaim так часто терпит неудачу, это, вероятно, связано с тем, что прямой бегун делает довольно агрессивное расщепление: https://github.com/apache/beam/blob/release-2.33.0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178

Комментарии:

1.Похоже, что элементы будут поступать в порядке временных меток в соответствии с документами MongoDB: потоки изменений обеспечивают общий порядок изменений по сегментам за счет использования глобальных логических часов. MongoDB гарантирует, что порядок изменений сохраняется, и уведомления о потоке изменений могут быть безопасно интерпретированы в полученном порядке. Например, курсор потока изменений, открытый в кластере с 3 сегментами, возвращает уведомления об изменениях с учетом общего порядка этих изменений во всех трех shards.docs.mongodb.com/manual/administration /…

2. Будет ли иметь смысл предоставлять мой собственный DoFn.splitrestrictionв этом случае использования? Если я правильно понимаю, я мог бы затем построить свою собственную логику для контрольной точки, скажем, каждые 1 минуту (на основе метки времени кластера).

3. DoFn.Разделение влияет только на начальные ограничения.

4. Однако вы могли бы заставить большие фрагменты округлять временные метки до более низкой степени детализации. например, вы могли бы требовать только временные метки, округленные до ближайших 10, 60 или любых других секунд.