Отправка большого словаря через вызов API прерывает сервер разработки

#python #django #dictionary #django-rest-framework

#python #django #словарь #django-rest-framework

Вопрос:

Я запускаю приложение django с базой данных PostgreSQL и пытаюсь отправить в базу данных очень большой словарь (состоящий из данных временных рядов).

Моя цель — записать мои данные в БД как можно быстрее. Я использую запросы библиотеки для отправки данных через API-вызов (созданный с помощью django REST):

Мой API-вид прост:

 @api_view(["POST"])
def CreateDummy(request):

    for elem, ts in request.data['time_series'] :
        TimeSeries.objects.create(data_json=ts)

    msg = {"detail": "Created successfully"}
    return Response(msg, status=status.HTTP_201_CREATED)
  

request.data['time_series'] является ли огромный словарь структурированным следующим образом:

 {Building1: {1:123, 2: 345, 4:567 .... 31536000: 2345}, .... Building30: {..... }}
  

Это означает, что у меня есть 30 ключей с 30 значениями, тогда как каждое значение является dict с 31536000 элементами.

Мой запрос API выглядит следующим образом (где данные — это мой словарь, описанный выше):

  payload = {
            "time_series": data,
           } 

 requests.request(
        "post", url=endpoint, json=payload
    )
  

Код сохраняет данные временных рядов в jsonb-поле в серверной части. Теперь это работает, если я перебираю только первые 4 элемента словаря. Я могу получить эти данные примерно через 1 минуту. Но когда я перебираю весь dict, мой сервер разработки отключается. Я думаю, это потому, что памяти недостаточно. Я получаю requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response')) . Сохраняется ли весь dict в памяти до начала итерации? Я сомневаюсь в этом, потому что я читал, что в python3 цикл with .items() возвращает итератор и является предпочтительным способом сделать это.

Есть ли лучший способ справиться с массивными dicts в django / python? Должен ли я перебирать половину, а затем другую половину? Или есть более быстрый способ? Может быть, с помощью pandas ? Или, может быть, отправка данных по-другому? Я думаю, я ищу наиболее эффективный способ сделать это.

Рад предоставить больше кода, если это необходимо.

Любая помощь, подсказки или руководства очень ценятся! Заранее спасибо

EDIT2: я думаю, что это не из-за использования моей оперативной памяти или размера dict. У меня все еще остается 5 ГБ оперативной памяти, когда сервер выключается. ~~ И размер dict составляет 1176bytes ~~ Dict намного больше, см. Комментарии

EDIT3: я даже не могу распечатать огромный dict. Затем он также отключается

EDIT4: когда данные разделяются и отправляются не все сразу, сервер может их обработать. Но когда я пытаюсь запросить его обратно, сервер снова прерывается. Он прерывается на моем производственном сервере (настройка nginx AWS RDS) и прерывается на моем локальном сервере разработки. Я почти уверен, что это потому, что django не может обрабатывать такие большие запросы при моей текущей настройке. Но как я мог это решить?

EDIT5: Итак, я ищу решение из двух частей. Один для создания данных и один для запроса данных. Создание данных, которые я описал выше. Но даже если я получу все эти данные в базу данных, у меня все равно будут проблемы с их извлечением.

Я попробовал это, создав данные не все вместе, а каждый временной ряд отдельно. Итак, давайте предположим, что у меня есть эти огромные данные в моей БД, и я пытаюсь запросить их обратно. Все объекты временных рядов принадлежат сети, поэтому я попробовал это так:

 
class TimeSeriesByTypeAndCreationMethod(ListAPIView):
    """Query time-series in specific network."""

    serializer_class = TimeSeriesSerializer

    def get_queryset(self):
        """Query time-series

        Query by name of network, type of data, creation method and
        source.
        """

        network = self.kwargs["name_network"]

        if TimeSeries.objects.filter(
            network_element__network__name=network,
        ).exists():
            time_series = TimeSeries.objects.filter(
                network_element__network__name=network,
            )
            return time_series
        else:
            raise NotFound()

  

Но запрос прерывает работу сервера, как и при создании данных ранее. Я также думаю, что это слишком большая нагрузка на данные. Я думал, что смогу использовать необработанный sql, чтобы избежать взлома сервера… Или есть лучший способ?

ПРАВКА6: соответствующие модели:

 
class TimeSeries(models.Model):

    TYPE_DATA_CHOICES = [
        ....many choices...
    ]

    CREATION_METHOD_CHOICES = [
        ....many choices...
    ]

    description = models.CharField(
        max_length=120,
        null=True,
        blank=True,
    )

    network_element = models.ForeignKey(
        Building,
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )
    type_data = models.CharField(
        null=True,
        blank=True,
        max_length=30,
        choices=TYPE_DATA_CHOICES,
    )

    creation_method = models.CharField(
        null=True,
        blank=True,
        max_length=30,
        choices=CREATION_METHOD_CHOICES,
    )

    source = models.CharField(
        null=True,
        blank=True,
        max_length=300
    )

    data_json = JSONField(
        help_text="Data for time series in JSON format. Valid JSON expected."
    )

    creation_date = models.DateTimeField(auto_now=True, null=True, blank=True)

    def __str__(self):
        return f"{self.creation_method}:{self.type_data}"



class Building(models.Model):

    USAGE_CHOICES = [
        ...
    ]

    name = models.CharField(
        max_length=120,
        null=True,
        blank=True,
    )
    street = models.CharField(
        max_length=120,
        null=True,
        blank=True,
    )
    house_number = models.CharField(
        max_length=20,
        null=True,
        blank=True,
    )
    zip_code = models.CharField(
        max_length=5,
        null=True,
        blank=True,
    )
    city = models.CharField(
        max_length=120,
        null=True,
        blank=True,
    )
    usage = models.CharField(
        max_length=120,
        choices=USAGE_CHOICES,
        null=True,
        blank=True,
    )
    .....many more fields....
   
  

Комментарии:

1. Завершает ли Ngnix (или аналогичный сервер) запрос?

2. Попробуйте использовать bulk_create() 🙂 надеюсь, это поможет

3. Ваш dict не может быть 1176 байт, поскольку он содержит 946 миллионов записей (30 ключей * 31 536 000). Всего один dict с 31 536 000 элементов составляет 1,25 ГБ. Каждый раз, когда в dict заканчивается место, он удваивает объем памяти. Dict с 22 300 000 записей занимает 0,625 ГБ, но 22 400 000 записей увеличиваются до 1,25 ГБ. >>> print(sys.getsizeof({x: x для x в диапазоне(31536000)}) / 1024.0 / 1024.0 / 1024.0, » ГБ»)

4. Хорошо, спасибо, я вас понял! Я напечатал это с sys.getsizeof(request.data["time_series"]) помощью, и это то, что появилось. Будет запущен еще один тест… Но является ли dict плохой структурой данных для этого варианта использования, если он удваивает объем памяти?

5. n 1 запрос, нет разбивки на страницы и индексов в базе данных для эффективного поиска. Пожалуйста, добавьте соответствующие модели для получения рекомендаций по правильной индексации

Ответ №1:

Вы можете решить свои проблемы, используя два метода.

Создание данных

Используйте bulk_create для вставки большого количества записей, Если ошибка SQL возникает из-за большого размера запроса и т. Д., Затем Укажите batch_size in bulk_create .

 records = []
for elem, ts in request.data['time_series'] :
    records.append(
         TimeSeries(data_json=ts)
    )

# setting batch size t 1000

TimeSeries.objects.bulk_create(records, batch_size=1000)
  

С bulk_create есть некоторые предостережения, например, он не будет генерировать сигналы, а другие подробнее см. в документе

Извлечение данных

Настройте rest framework для использования конфигурации разбивки на страницы по умолчанию

 REST_FRAMEWORK = {
    'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.LimitOffsetPagination',
    'PAGE_SIZE': 100
}
  

Для использования пользовательской конфигурации

 class TimeSeriesResultsSetPagination(PageNumberPagination):
    page_size = 50
    page_size_query_param = 'page_size'
    max_page_size = 10000

class BillingRecordsView(generics.ListAPIView):
   serializer_class = TimeSeriesSerializer
   pagination_class = TimeSeriesResultsSetPagination


   def get_queryset(self):
    """Query time-series

    Query by name of network, type of data, creation method and
    source.
    """

    network = self.kwargs["name_network"]

    if TimeSeries.objects.filter(
        network_element__network__name=network,
    ).exists():
        time_series = TimeSeries.objects.filter(
            network_element__network__name=network,
        )
        return time_series
    else:
        raise NotFound()
   
  

Смотрите другие методы разбивки на страницы по https://www.django-rest-framework.org/api-guide/pagination /

Комментарии:

1. спасибо sonus21 и извините за поздний ответ. Позвольте мне попробовать это и отчитаться!

Ответ №2:

@micromegas когда ваше решение теоретически верно, однако, вызывая create() много раз в цикле, я считаю, что это вызывает исключение ConnectionError.

попробуйте выполнить рефакторинг на что-то вроде:

 big_data_holder = []
for elem, ts in request.data['time_series'] :
    big_data_holder.append(
         TimeSeries(data_json=ts)
    )

# examine the structure 
print(big_data_holder) 

TimeSeries.objects.bulk_create(big_data_holder)
  

пожалуйста, проверьте некоторые недостатки этого метода
Django Docs bulk_create

Комментарии:

1. Спасибо, я попробую это сейчас и отчитаюсь. Это решило бы часть создания. Я думаю, что это все равно сломает мой сервер, когда я запрошу эти данные обратно

2. Приведите пример кода и чего вы хотите достичь, и я постараюсь сделать все возможное, чтобы помочь. Обычно, когда мы запрашиваем DB, мы пытаемся ограничить поиск по pk или определенному столбцу, в то время как Postgres предлагает широкий выбор поиска по строкам. Личный совет: постарайтесь сделать его как можно более легким, разделите куски с учетом ограничения скорости соединения, и если это действительно невозможно, вы должны учитывать параллелизм.

3. Большое спасибо. Я отредактировал вопрос (Edit5). Я бы опубликовал отдельный вопрос, но я думаю, что это на самом деле та же проблема. Ввод и вывод данных нарушают работу сервера. Как вы думаете, я должен использовать необработанный SQL и поиск по первичному ключу вместо сетевого имени? И как я мог бы разбить запрос на фрагменты или сделать его более производительным? Еще раз спасибо

4. Теперь после этих попыток вы, возможно, захотите перепроектировать саму модель. не могу получить полную картину, но, как вы говорите, даже после разбивки на страницы и разделения все еще прерывается, обрабатывайте исключения, чтобы узнать, какая ошибка возникла с помощью блоков try except, кроме «у вас будет лучшее сообщение об ошибках». Вот некоторые мысли: — Отдельная БД для записи и чтения — Отдельная БД для поиска только с одним или двумя реляционными ключами к основной большой БД с кэшированием и публикацией изменений только при наличии изменений. — Перейдите в асинхронный режим и дайте ему время «проверить django и gevent» для обработки параллелизма. «Я бы рекомендовал это»

5. используйте индексы базы данных и разбивку на страницы для оптимизации поиска. Кроме того, используйте асинхронный маршрут с фоновыми рабочими и при необходимости увеличьте ресурсы БД. Увеличение времени ожидания должно быть последним средством, поскольку само по себе это не масштабируемо.