#c# #azure-cosmosdb #bulkinsert #asp.net-core-2.1 #azure-cosmosdb-sqlapi
#c# #azure-cosmosdb #bulkinsert #asp.net-core-2.1 #azure-cosmosdb-sqlapi
Вопрос:
Я пытаюсь реализовать массовую вставку с этим примером CosmosDB. Этот пример создан с использованием .NET Core 3.* и поддержкой System.Text.Json.
При использовании метода CreateItemAsync он работает отлично:
var concurrentTasks = new List<Task<ItemResponse<Notification>>>();
foreach (var entity in entities)
{
entity.Id = GenerateId(entity);
var requestOptions = new ItemRequestOptions();
requestOptions.EnableContentResponseOnWrite = false; // We don't need to get the entire body returend.
concurrentTasks.Add(Container.CreateItemAsync(entity, new PartitionKey(entity.UserId), requestOptions));
}
await Task.WhenAll(concurrentTasks);
Тем не менее, я пытаюсь выяснить, могу ли я уменьшить количество RU путем потоковой передачи данных непосредственно в CosmosDB, надеясь, что CosmosDB не взимает с меня плату за десериализацию самого JSON.
Я работаю в .NET Core 2.1 и Newtonsoft.Json. Это мой код, который не возвращает код успешного состояния. Код дополнительного статуса в заголовке ответа равен «0».
Notification[] notifications = entities.ToArray();
var itemsToInsert = new Dictionary<PartitionKey, Stream>();
foreach (var notification in notifications)
{
MemoryStream ms = new MemoryStream();
StreamWriter writer = new StreamWriter(ms);
JsonTextWriter jsonWriter = new JsonTextWriter(writer);
JsonSerializer ser = new JsonSerializer();
ser.Serialize(jsonWriter, notification);
await jsonWriter.FlushAsync();
await writer.FlushAsync();
itemsToInsert.Add(new PartitionKey(notification.UserId), ms);
}
List<Task> tasks = new List<Task>(notifications.Length);
foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
{
tasks.Add(Container.CreateItemStreamAsync(item.Value, item.Key)
.ContinueWith((Task<ResponseMessage> task) =>
{
using (ResponseMessage response = task.Result)
{
if (!response.IsSuccessStatusCode)
{
Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
}
else
{
}
}
}));
}
// Wait until all are done
await Task.WhenAll(tasks);
ответ.Код состояния: неверный запрос
ответ.Сообщение об ошибке: null
Я предполагаю, что я неправильно сериализую в поток. У кого-нибудь есть подсказка?
Обновить
Я обнаружил, что новый System.Text.Пакет Json также реализует .NET Standard 2.0, поэтому я установил его из NuGet. Теперь я могу скопировать пример кода с Github, упомянутый ранее.
Notification[] notifications = entities.ToArray();
var itemsToInsert = new List<Tuple<PartitionKey, Stream>>();
foreach (var notification in notifications)
{
notification.id = $"{notification.UserId}:{Guid.NewGuid()}";
MemoryStream stream = new MemoryStream();
await JsonSerializer.SerializeAsync(stream, notification);
itemsToInsert.Add(new Tuple<PartitionKey, Stream>(new PartitionKey(notification.RoleId), stream));
}
List<Task> tasks = new List<Task>(notifications.Length);
foreach (var item in itemsToInsert)
{
tasks.Add(Container.CreateItemStreamAsync(item.Item2, item.Item1)
.ContinueWith((Task<ResponseMessage> task) =>
{
using (ResponseMessage response = task.Result)
{
if (!response.IsSuccessStatusCode)
{
Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
}
else
{
}
}
}));
}
// Wait until all are done
await Task.WhenAll(tasks);
Я дважды проверил, что BulkInsert включен (иначе первый метод также не будет работать). Все еще есть неверный запрос и значение NULL для сообщения об ошибке.
Я также проверил, что данные не добавляются в контейнер из-за неправильного запроса.
Комментарии:
1. Ключ уже в базе данных?
2. Возможно, стоит установить нулевую позицию потока памяти перед его добавлением.
ms.Position = 0;
3. @jdweng спасибо, ключи являются идентификаторами GUID, поэтому они всегда (глобально) уникальны.
4. Если вы дважды запустите код в существующей базе данных, вы дважды введете одни и те же значения в базу данных.
5. @jdweng Я знаю, поэтому идентификатор генерируется следующим образом: $»{уведомление. Идентификатор пользователя}:{Guid. NewGuid()}»; Каждый раз генерируется новый идентификатор GUID. Чтобы быть уверенным, я очистил базу данных от документов для этого ключа раздела.
Ответ №1:
Я нашел проблему.
Я настроил свой контекст Cosmos со следующими параметрами:
var cosmosSerializationOptions = new CosmosSerializationOptions();
cosmosSerializationOptions.PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase;
CosmosClientOptions cosmosClientOptions = new CosmosClientOptions();
cosmosClientOptions.SerializerOptions = cosmosSerializationOptions;
Отсюда CamelCase
соглашение. В моем первом (рабочем) примере кода я бы позволил десериализовать контекст CosmosDB в JSON. Он будет сериализован с помощью этого соглашения camelCase, поэтому мой разделительный ключ UserId
будет сериализован в userId
.
Однако, чтобы уменьшить некоторые RU, я буду использовать CreateItemStreamAsync
, что делает меня ответственным за сериализацию. И произошла ошибка, мое свойство было определено как:
public int UserId { get; set; }
Таким образом, он был бы сериализован в json UserId: 1
.
Однако ключ раздела определяется как /userId
. Итак, если я добавлю атрибут JsonPropertyName, это сработает:
[JsonPropertyName("userId")]
public int UserId { get; set; }
… если бы только сообщение об ошибке сообщило мне об этом.
При использовании этого CreateItemStream
метода экономия составляет около 3%. Однако, я думаю, со временем это постепенно сэкономило бы некоторые RU в общей сложности.
Ответ №2:
Похоже, что поток не читается. Отсюда и плохой запрос. Я бы внес небольшие изменения в то, как MemoryStream
создается:
foreach (var notification in notifications)
{
itemsToInsert.Add(new PartitionKey(notification.UserId), new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(notification))));
}
Конечно, я использую Newtonsoft.json для JsonConvert.
Комментарии:
1. Спасибо! Я попробовал ваше предложение, но все еще получаю неверные запросы. Но с этим ответом я, по крайней мере, узнал более читаемый / компактный способ сериализации в поток с помощью Newtonsoft.
2. Позвольте мне повторить пример еще раз.
3. Смотрите мой ответ / решение ниже. Ваше предложение также работает, если мои атрибуты сериализации верны