AWS Kinesis NextShardIterator Никогда не получает значение Null

#amazon-web-services #.net-core #amazon-kinesis

#amazon-веб-сервисы #.net-ядро #amazon-kinesis

Вопрос:

Контекст: я пытаюсь получить записи из потока Kinesis со ссылками на API. Я использую .Net Core (версия 3.1).

Я записываю данные с помощью API в Kinesis Stream. Эта проблема не имеет никаких проблем. Но у меня есть некоторые проблемы с чтением данных. Я поместил метод GetRecord в цикл do-while. Условие while значение nextShardIterator равно null или нет? Но это значение никогда не становится нулевым. Я не могу разорвать цикл.

Некоторые ответы включают фразу: «NextShardIterator — Следующая позиция в сегменте, с которой можно начать последовательное чтение записей данных. Если установлено значение null, сегмент закрыт, и запрошенный итератор больше не возвращает никаких данных «.

У меня есть только 1 поток и 1 сегмент. Я добавляю 2 записи. После этого я выполняю метод чтения. Он получает эти записи. Но после этого nextShardIterator никогда не получает null, даже если записи израсходованы.

  • Список потоков (у меня только 1 поток)
  • Опишите поток и получите сегмент (у меня есть только 1 сегмент)
  • Получите ShardIterator с типом TRIM_HORIZON
  • Получить запись в цикле (в то время как{if nextShardIterator != null})
  • После нахождения записи я пытаюсь использовать итератор сегментов AFTER_SEQUENCE
  • Если он не может получить какую-либо запись, я пытаюсь использовать ПОСЛЕДНИЙ итератор сегментов

Код PutRecord такой:

 public async Task<ResponseModel> PutRecord(string orderFlow, string documentId)
        {
            byte[] bytedata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(orderFlow));
            using MemoryStream memoryStream = new MemoryStream(bytedata);
            var putRecordRequest = new PutRecordRequest();
            putRecordRequest.StreamName = myStreamName;
            putRecordRequest.PartitionKey = "partition"   documentId;
            putRecordRequest.Data = memoryStream;
            try
            {
                var putRecordResponse = await kinesisClient.PutRecordAsync(putRecordRequest);
                return new ResponseModel
                {
                    Data = putRecordResponse,
                    Status = ResponseStatus.Success,
                    Message = "Successfully put record!"
                };
            }
            catch (Exception e)
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Error,
                    Message = "PutRecord Error: "   e.Message
                };
            }
        }
  

Но у меня есть некоторые проблемы с чтением данных. Код GetRecord такой:

 public async Task<ResponseModel> GetRecords()
        {
            var listStreams = await ListStreams();
            if (listStreams.StreamNames.Count == 0)
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "Do not have any Stream!"
                };
            }
            myStreamName = listStreams.StreamNames[0];

            var describeStreams = await DescribeStream(listStreams.StreamNames[0]);
            if (describeStreams.StreamDescription.StreamStatus != "ACTIVE")
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "Stream status: "   describeStreams.StreamDescription.StreamStatus
                };
            }

            var shards = describeStreams.StreamDescription.Shards;
            if (shards.Count == 0)
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "Do not have any Shard (or data)!"
                };
            }
            var shardId = shards[0].ShardId;

            var shardIterator = await GetShardIterator(shardId);
            if (string.IsNullOrWhiteSpace(shardIterator.ShardIterator))
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "ShardIterator is null or empty!"
                };
            }

            var getRecords = await GetRecords(shardIterator.ShardIterator);
            Console.WriteLine("First Iterator: "   shardIterator);
            var dataList = new List<Record>();

            do
            {
                if (getRecords.Records.Count == 0)
                {
                    Console.WriteLine("Records are empty!");
                    var nextShardIterator = GetShardLatest(shardId).Result.ShardIterator;
                    getRecords = await GetRecords(nextShardIterator);
                    Console.WriteLine("Latest Iterator: "   nextShardIterator);
                }
                else
                {
                    Console.WriteLine("We have records!");
                    foreach (var record in getRecords.Records)
                    {
                        dataList.Add(record);
                    }
                    var nextShardIterator = GetShardIteratorWithSequence(shardId, getRecords.Records[getRecords.Records.Count-1].SequenceNumber).Result.ShardIterator;
                    Console.WriteLine("AfterSequence Iterator: "   nextShardIterator);
                    getRecords = await GetRecords(nextShardIterator);
                }
            } while (getRecords.NextShardIterator != null);

            return new ResponseModel
            {
                Data = dataList,
                Status = ResponseStatus.Success,
                Message = "Successfull"
            };
        }
  

Ответ №1:

Поток Kinesis — это потенциально бесконечная последовательность записей, которые могут быть добавлены несколькими производителями в любое время. В результате итератор сегментов открытого потока никогда не будет иметь значения null.

Если вы хотите выйти из цикла, когда дойдете до «конца» потока, посмотрите на MillisBehindLatest поле в ответе GetRecords. Процитировать документацию:

Значение, равное нулю, указывает на то, что обработка записей завершена, и в данный момент нет новых записей для обработки.

Однако будьте осторожны, что новые записи могут быть добавлены в любое время. Если вы все-таки выйдете из цикла, обязательно сохраните возвращенные данные SequenceNumber из последней обработанной записи, чтобы продолжить с того места, где вы остановились.

Комментарии:

1. Большое спасибо! Миллисекунды решили мою проблему. Как вы говорите, я сохраню порядковый номер и получу записи с итератором сегментов типа AFTER_SEQUENCE_NUMBER.