Python InfluxDB2 — write_api.write(…) Как проверить на успех?

#python #pandas #dataframe #influxdb-python #influxdb-2

Вопрос:

Мне нужно записать исторические данные в InfluxDB (я использую Python, который в данном случае не является обязательным, поэтому я, возможно, готов принять решения, отличные от Python). Я настроил API записи следующим образом

 write_api = client.write_api(write_options=ASYNCHRONOUS)
 

Данные поступают из фрейма данных с меткой времени в качестве ключа, поэтому я записываю их в базу данных следующим образом

 result = write_api.write(bucket=bucket, data_frame_measurement_name=field_key, record=a_data_frame)
 

Этот вызов не генерирует исключение, даже если сервер InfluxDB не работает. result имеет защищенный атрибут _success , который является логическим значением при отладке, но я не могу получить к нему доступ из кода.

Как мне проверить, прошла ли запись успешно?

Комментарии:

1. Rant: Как я ненавижу этот дивный новый мир, основанный на событиях, где все «в конечном итоге согласовано, а может быть, и никогда».

Ответ №1:

Если вы используете пакетную обработку в фоновом режиме, вы можете добавить пользовательские обратные вызовы, ошибки и повторные вызовы.

 from influxdb_client import InfluxDBClient

def success_cb(details, data):
    url, token, org = details
    print(url, token, org)
    data = data.decode('utf-8').split('n')
    print('Total Rows Inserted:', len(data))  

def error_cb(details, data, exception):
    print(exc)

def retry_cb(details, data, exception):
    print('Retrying because of an exception:', exc)    


with InfluxDBClient(url, token, org) as client:
    with client.write_api(success_callback=success_cb,
                          error_callback=error_cb,
                          retry_callback=retry_cb) as write_api:

        write_api.write(...)
 

Если вы хотите протестировать все обратные вызовы и не хотите ждать, пока все попытки будут завершены, вы можете переопределить интервал и количество попыток.

 from influxdb_client import InfluxDBClient, WriteOptions

with InfluxDBClient(url, token, org) as client:
    with client.write_api(success_callback=success_cb,
                          error_callback=error_cb,
                          retry_callback=retry_cb,
                          write_options=WriteOptions(retry_interval=60,
                                                     max_retries=2),
                          ) as write_api:
        ...
 

Ответ №2:

если вы хотите немедленно записать данные в базу данных, используйте СИНХРОННУЮ версию write_api https://github.com/influxdata/influxdb-client-python/blob/58343322678dd20c642fdf9d0a9b68bc2c09add9/examples/example.py#L12

Асинхронная запись должна быть «вызвана» вызовом .get() https://github.com/influxdata/influxdb-client-python#asynchronous-client

С уважением

Комментарии:

1. Ваш ответ не совсем правильный, но он привел меня к правильному решению.

Ответ №3:

write_api.write() возвращает multiprocessing.pool.AsyncResult или multiprocessing.pool.AsyncResult (оба одинаковые).

С помощью этого возвращаемого объекта вы можете проверить асинхронный запрос несколькими способами. Смотрите здесь: https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool .AsyncResult

Если вы можете использовать блокирующий запрос, то write_api = client.write_api(write_options=SYNCRONOUS) его можно использовать.