#python #pandas #dataframe #influxdb-python #influxdb-2
Вопрос:
Мне нужно записать исторические данные в InfluxDB (я использую Python, который в данном случае не является обязательным, поэтому я, возможно, готов принять решения, отличные от Python). Я настроил API записи следующим образом
write_api = client.write_api(write_options=ASYNCHRONOUS)
Данные поступают из фрейма данных с меткой времени в качестве ключа, поэтому я записываю их в базу данных следующим образом
result = write_api.write(bucket=bucket, data_frame_measurement_name=field_key, record=a_data_frame)
Этот вызов не генерирует исключение, даже если сервер InfluxDB не работает. result
имеет защищенный атрибут _success
, который является логическим значением при отладке, но я не могу получить к нему доступ из кода.
Как мне проверить, прошла ли запись успешно?
Комментарии:
1. Rant: Как я ненавижу этот дивный новый мир, основанный на событиях, где все «в конечном итоге согласовано, а может быть, и никогда».
Ответ №1:
Если вы используете пакетную обработку в фоновом режиме, вы можете добавить пользовательские обратные вызовы, ошибки и повторные вызовы.
from influxdb_client import InfluxDBClient
def success_cb(details, data):
url, token, org = details
print(url, token, org)
data = data.decode('utf-8').split('n')
print('Total Rows Inserted:', len(data))
def error_cb(details, data, exception):
print(exc)
def retry_cb(details, data, exception):
print('Retrying because of an exception:', exc)
with InfluxDBClient(url, token, org) as client:
with client.write_api(success_callback=success_cb,
error_callback=error_cb,
retry_callback=retry_cb) as write_api:
write_api.write(...)
Если вы хотите протестировать все обратные вызовы и не хотите ждать, пока все попытки будут завершены, вы можете переопределить интервал и количество попыток.
from influxdb_client import InfluxDBClient, WriteOptions
with InfluxDBClient(url, token, org) as client:
with client.write_api(success_callback=success_cb,
error_callback=error_cb,
retry_callback=retry_cb,
write_options=WriteOptions(retry_interval=60,
max_retries=2),
) as write_api:
...
Ответ №2:
если вы хотите немедленно записать данные в базу данных, используйте СИНХРОННУЮ версию write_api
— https://github.com/influxdata/influxdb-client-python/blob/58343322678dd20c642fdf9d0a9b68bc2c09add9/examples/example.py#L12
Асинхронная запись должна быть «вызвана» вызовом .get()
— https://github.com/influxdata/influxdb-client-python#asynchronous-client
С уважением
Комментарии:
1. Ваш ответ не совсем правильный, но он привел меня к правильному решению.
Ответ №3:
write_api.write()
возвращает multiprocessing.pool.AsyncResult
или multiprocessing.pool.AsyncResult
(оба одинаковые).
С помощью этого возвращаемого объекта вы можете проверить асинхронный запрос несколькими способами. Смотрите здесь: https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool .AsyncResult
Если вы можете использовать блокирующий запрос, то write_api = client.write_api(write_options=SYNCRONOUS)
его можно использовать.