Как выполнить массовую вставку в Cosmos DB с помощью .NET Core 2.1 и Stream API

#c# #azure-cosmosdb #bulkinsert #asp.net-core-2.1 #azure-cosmosdb-sqlapi

#c# #azure-cosmosdb #bulkinsert #asp.net-core-2.1 #azure-cosmosdb-sqlapi

Вопрос:

Я пытаюсь реализовать массовую вставку с этим примером CosmosDB. Этот пример создан с использованием .NET Core 3.* и поддержкой System.Text.Json.

При использовании метода CreateItemAsync он работает отлично:

     var concurrentTasks = new List<Task<ItemResponse<Notification>>>();
    foreach (var entity in entities)
    {
        entity.Id = GenerateId(entity);

        var requestOptions = new ItemRequestOptions();
        requestOptions.EnableContentResponseOnWrite = false; // We don't need to get the entire body returend.
        concurrentTasks.Add(Container.CreateItemAsync(entity, new PartitionKey(entity.UserId), requestOptions));
    }

    await Task.WhenAll(concurrentTasks);
  

Тем не менее, я пытаюсь выяснить, могу ли я уменьшить количество RU путем потоковой передачи данных непосредственно в CosmosDB, надеясь, что CosmosDB не взимает с меня плату за десериализацию самого JSON.

Я работаю в .NET Core 2.1 и Newtonsoft.Json. Это мой код, который не возвращает код успешного состояния. Код дополнительного статуса в заголовке ответа равен «0».

     Notification[] notifications = entities.ToArray();
    var itemsToInsert = new Dictionary<PartitionKey, Stream>();

    foreach (var notification in notifications)
    {
        MemoryStream ms = new MemoryStream();
        StreamWriter writer = new StreamWriter(ms);
        JsonTextWriter jsonWriter = new JsonTextWriter(writer);
        JsonSerializer ser = new JsonSerializer();
                
        ser.Serialize(jsonWriter, notification);

        await jsonWriter.FlushAsync();
        await writer.FlushAsync();

        itemsToInsert.Add(new PartitionKey(notification.UserId), ms);
    }

    List<Task> tasks = new List<Task>(notifications.Length);
    foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
    {
        tasks.Add(Container.CreateItemStreamAsync(item.Value, item.Key)
            .ContinueWith((Task<ResponseMessage> task) =>
            {
                using (ResponseMessage response = task.Result)
                {
                    if (!response.IsSuccessStatusCode)
                    {
                        Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
                    }
                    else
                    {
                    }
                }
            }));
    }

    // Wait until all are done
    await Task.WhenAll(tasks);
  

ответ.Код состояния: неверный запрос
ответ.Сообщение об ошибке: null

Я предполагаю, что я неправильно сериализую в поток. У кого-нибудь есть подсказка?

Обновить

Я обнаружил, что новый System.Text.Пакет Json также реализует .NET Standard 2.0, поэтому я установил его из NuGet. Теперь я могу скопировать пример кода с Github, упомянутый ранее.

         Notification[] notifications = entities.ToArray();
        var itemsToInsert = new List<Tuple<PartitionKey, Stream>>();

        foreach (var notification in notifications)
        {
            notification.id = $"{notification.UserId}:{Guid.NewGuid()}";

            MemoryStream stream = new MemoryStream();
            await JsonSerializer.SerializeAsync(stream, notification);

            itemsToInsert.Add(new Tuple<PartitionKey, Stream>(new PartitionKey(notification.RoleId), stream));
        }

        List<Task> tasks = new List<Task>(notifications.Length);
        foreach (var item in itemsToInsert)
        {
            tasks.Add(Container.CreateItemStreamAsync(item.Item2, item.Item1)
                .ContinueWith((Task<ResponseMessage> task) =>
                {
                    using (ResponseMessage response = task.Result)
                    {
                        if (!response.IsSuccessStatusCode)
                        {
                            Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
                        }
                        else
                        {
                        }
                    }
                }));
        }

        // Wait until all are done
        await Task.WhenAll(tasks);
  

Я дважды проверил, что BulkInsert включен (иначе первый метод также не будет работать). Все еще есть неверный запрос и значение NULL для сообщения об ошибке.

Я также проверил, что данные не добавляются в контейнер из-за неправильного запроса.

Комментарии:

1. Ключ уже в базе данных?

2. Возможно, стоит установить нулевую позицию потока памяти перед его добавлением. ms.Position = 0;

3. @jdweng спасибо, ключи являются идентификаторами GUID, поэтому они всегда (глобально) уникальны.

4. Если вы дважды запустите код в существующей базе данных, вы дважды введете одни и те же значения в базу данных.

5. @jdweng Я знаю, поэтому идентификатор генерируется следующим образом: $»{уведомление. Идентификатор пользователя}:{Guid. NewGuid()}»; Каждый раз генерируется новый идентификатор GUID. Чтобы быть уверенным, я очистил базу данных от документов для этого ключа раздела.

Ответ №1:

Я нашел проблему.

Я настроил свой контекст Cosmos со следующими параметрами:

 var cosmosSerializationOptions = new CosmosSerializationOptions();
cosmosSerializationOptions.PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase;

CosmosClientOptions cosmosClientOptions = new CosmosClientOptions();
cosmosClientOptions.SerializerOptions = cosmosSerializationOptions;
  

Отсюда CamelCase соглашение. В моем первом (рабочем) примере кода я бы позволил десериализовать контекст CosmosDB в JSON. Он будет сериализован с помощью этого соглашения camelCase, поэтому мой разделительный ключ UserId будет сериализован в userId .

Однако, чтобы уменьшить некоторые RU, я буду использовать CreateItemStreamAsync , что делает меня ответственным за сериализацию. И произошла ошибка, мое свойство было определено как:

 public int UserId { get; set; }
  

Таким образом, он был бы сериализован в json UserId: 1 .

Однако ключ раздела определяется как /userId . Итак, если я добавлю атрибут JsonPropertyName, это сработает:

 [JsonPropertyName("userId")]
public int UserId { get; set; } 
  

… если бы только сообщение об ошибке сообщило мне об этом.

При использовании этого CreateItemStream метода экономия составляет около 3%. Однако, я думаю, со временем это постепенно сэкономило бы некоторые RU в общей сложности.

Ответ №2:

Похоже, что поток не читается. Отсюда и плохой запрос. Я бы внес небольшие изменения в то, как MemoryStream создается:

 foreach (var notification in notifications)
    {
        
        itemsToInsert.Add(new PartitionKey(notification.UserId), new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(notification))));
    }
  

Конечно, я использую Newtonsoft.json для JsonConvert.

Комментарии:

1. Спасибо! Я попробовал ваше предложение, но все еще получаю неверные запросы. Но с этим ответом я, по крайней мере, узнал более читаемый / компактный способ сериализации в поток с помощью Newtonsoft.

2. Позвольте мне повторить пример еще раз.

3. Смотрите мой ответ / решение ниже. Ваше предложение также работает, если мои атрибуты сериализации верны