#mysql #python-3.x
#mysql #python-3.x
Вопрос:
У меня есть достаточно большой набор данных, состоящий примерно из 6 000 000 строк X 60 столбцов, которые я пытаюсь вставить в базу данных. Я разбиваю их на фрагменты и вставляю по 10 000 за раз в базу данных mysql, используя класс, который я написал, и pymysql. Проблема в том, что я иногда отключаю сервер во время записи, поэтому я изменил свой вызов executemany для повторного подключения при ошибках. Это отлично работает, когда я теряю соединение один раз, но если я теряю ошибку во второй раз, я получаю pymysql.InternalException, указывающее, что превышен тайм-аут ожидания блокировки. Мне было интересно, как я мог бы изменить следующий код, чтобы перехватить это и полностью уничтожить транзакцию, прежде чем пытаться снова.
Я пытался вызвать rollback () для соединения, но это вызывает другое InternalException, если соединение разрушено, потому что курсора больше нет.
Буду признателен за любую помощь (я также не понимаю, почему я получаю таймауты для начала, но данные относительно большие.)
class Database:
def __init__(self, **creds):
self.conn = None
self.user = creds['user']
self.password = creds['password']
self.host = creds['host']
self.port = creds['port']
self.database = creds['database']
def connect(self, type=None):
self.conn = pymysql.connect(
host = self.host,
user = self.user,
password = self.password,
port = self.port,
database = self.database
)
def executemany(self, sql, data):
while True:
try:
with self.conn.cursor() as cursor:
cursor.executemany(sql, data)
self.conn.commit()
break
except pymysql.err.OperationalError:
print('Connection error. Reconnecting to database.')
time.sleep(2)
self.connect()
continue
return cursor
и я называю это так:
for index, chunk in enumerate(dataframe_chunker(df), start=1):
print(f"Writing chunkt{index}t{timer():.2f}")
db.executemany(insert_query, chunk.values.tolist())
Ответ №1:
Взгляните на то, что делает MySQL. Тайм-ауты ожидания блокировки вызваны тем, что вставки не могут быть выполнены до завершения чего-либо другого, что может быть вашим собственным кодом.
SELECT * FROM `information_schema`.`innodb_locks`;
Отобразятся текущие блокировки.
select * from information_schema.innodb_trx where trx_id = [lock_trx_id];
Покажет задействованные транзакции
SELECT * FROM INFORMATION_SCHEMA.PROCESSLIST where id = [trx_mysql_thread_id];
Покажет задействованное соединение и может показать запрос, блокировка которого приводит к таймауту ожидания блокировки. Возможно, произошла незафиксированная транзакция.
Скорее всего, это ваш собственный код из-за взаимодействия с вашей executemany
функцией, которая улавливает исключения и повторно подключается к базе данных. Что с предыдущим подключением? Прерывает ли тайм-аут ожидания блокировки предыдущее соединение? Это, хотя и true, будет проблемой.
Для кода, вызываемого executemany
при подключении к БД, будьте более защищенными при попытке / за исключением чего-то вроде:
def executemany(self, sql, data):
while True:
try:
with self.conn.cursor() as cursor:
cursor.executemany(sql, data)
self.conn.commit()
break
except pymysql.err.OperationalError:
print('Connection error. Reconnecting to database.')
if self.conn.is_connected():
connection.close()
finally:
time.sleep(2)
self.connect()
Но решением здесь будет не вызывать тайм-ауты ожидания блокировки, если нет других клиентов базы данных.
Комментарии:
1. Я полагал, что это был мой собственный код, но я не совсем уверен, как я буду прерывать соединение, если получу первую ошибку начального тайм-аута, которая требует повторной попытки подключения. Кроме того, раздел information_schema пуст — это то, что мне нужно было бы настроить?
2. Вам нужно посмотреть на блокировки во время выполнения вашего кода и истечения времени ожидания. Это сложно сделать, но именно здесь вы увидите блокировки. Я обновил ответ, чтобы включить закрытие соединения перед повторной попыткой.
3. В таблице information_schema для lock_timeouts установлено максимальное значение в два, поэтому, если скрипт завершается с ошибкой один раз, но затем повторно подключается и выполняется, проблемы нет, но второй сбой вызовет ошибку timeout_lock. Существует ли опасность увеличения этого предела ожидания блокировки, если это хранилище данных будет использовано программным обеспечением viz позже? Процесс дампа происходит всего лишь по ночам, но ежедневно могут быть сотни / тысячи людей, извлекающих данные из местоположения, которое загружается этими данными.
4.
show PROCESSLIST;
также может быть полезно при устранении неполадок. Что касается оставшихся точек на многих устройствах чтения / записи с одним устройством, то это вопрос архитектуры базы данных, который лучше всего решать администратору базы данных.5. ПОКАЗАТЬ ПОЛНЫЙ СПИСОК ПРОЦЕССОВ показывает две строки, если он повторно подключается дважды, так что это определенно не завершает транзакцию, как я сейчас написал. Несколько быстрых вопросов, 1. у вас есть это, если в блоке except, заключенном в круглые скобки. Есть ли причина для этого? 2. Я бы хотел вызвать close () для self.conn, а не для подключения, верно? 3. было бы лучше проверить, открыто ли соединение, вызвать conn.rollback(), затем conn.close(), чтобы правильно закрыть соединение и создать новое соединение? 4. есть ли какие-либо накладные расходы на создание нового объекта conn для каждой подобной транзакции, если я делаю ~ 600 записей в базу данных?