#python #mysql #etl #data-warehouse
Вопрос:
У меня есть требование, в котором мне нужно получить огромную таблицу, которая в настоящее время хранится в базе данных MySQL с использованием python. Я пробовал использовать Ограничение и смещение, но это продолжает замедляться с увеличением числа итераций.
Я использую mysqlclient
для установления связей.
Мое текущее требование — записывать его в файл после каждой итерации. Для тестирования я использую CSV, но я переключусь на parquet
формат файла, как только найду хорошее решение
select * from table_name limit {} ,{}
Пожалуйста, предложите мне некоторые из методов, которые вы используете, и они работают безупречно.
фрагмент кода, который я использую :
i = 1
limit = 1000000
curs.execute("select * from table_name limit 1")
col = [col_[0] for col_ in curs.description]
print(col)
while True:
start_ = time.time()
query = f'select * from table_name limit {i} ,{limit} '
print(query)
curs.execute(query)
rows = curs.fetchall()
if not len(rows):
print("done")
break
print("###########")
print(i)
print("###########")
end_ = time.time()
print(f"Runtime of the program is {end_ - start_}")
df_temp = pd.DataFrame(rows, columns=col)
df_temp.to_csv(f"test_{i}.csv", header=True, index=False)
# df_total.append(df_temp)
i = i limit
Или мне следует рассмотреть возможность использования разных языков, таких как scala?
Комментарии:
1. Приведи их и что сделай? Где именно находится узкое место? Какой разъем вы используете?
2. @deceze , поэтому я извлекаю и записываю его в csv прямо сейчас (но в будущем могу переключиться на parquet) . Я использую mysqlclient для установления соединений
3. Панды, похоже, здесь перебор. Просто используйте
csv
модуль Python, выберите все строки без ограничений, аfor..in
затем по строкам результатов и запишите их в CSV? Возможно, измените пакетное считывание курсора, чтобы этот процесс был как можно более быстрым.4. Проблема в том, что вы используете
fetchall
для чтения всех строк в память сразу. Это было бы ненужно, если бы вы читали строки одну за другой и записывали их в CSV одну за другой.5. Вы можете легко перебирать курсор, чтобы получить только одну строку за раз. Это очень эффективно, потому что базовый драйвер БД будет получать данные по частям и, следовательно, использовать минимум памяти. Однако, похоже, вам нужны все 50 миллионов строк в кадре данных в памяти. Я думаю, что именно здесь у вас, скорее всего, возникнут проблемы, если у вас нет большого количества оперативной памяти (виртуальной или другой). Вам следует вообще забыть об использовании pandas и написать CSV-файл самостоятельно (т. е. простое форматирование строк).
Ответ №1:
Во-первых: строки в Mysql и других СУБД не имеют внутреннего порядка. Таким образом, результирующий набор со СМЕЩЕНИЕМ 100 от ПРЕДЕЛА не гарантированно даст следующие строки после результирующего набора со СМЕЩЕНИЕМ 0 от ПРЕДЕЛА 100. Если ваш запрос не содержит предложение ORDER BY, СУБД выдает результаты в непредсказуемом порядке. Непредсказуемое хуже случайного; при случайном заказе у вас есть шанс обнаружить проблему в тесте. Таким образом, ваш текущий пакетный подход неверен.
Во-вторых: Не ЗАКАЗЫВАЙТЕ набор результатов, подобный этому, за исключением, возможно, первичного ключа. В противном случае вашему серверу потребуется очень много времени, чтобы заказать пятьдесят мегаватт. И ему придется делать это снова для каждой партии. Увидимся в следующем году! -:)
В-третьих: Очень быстрый способ создания файла .csv-это ВЫБОР … В OUTFILE …. Вам следует подумать об этом, а затем заставить вашу программу python прочитать файл .csv. Возможно, вы даже сможете найти программный инструмент или скрипт для преобразования файла .csv в формат parquet.
Четвертое: вы хотите транслировать этот набор результатов, а не глотать его. Вы хотите обрабатывать его строка за строкой, чтобы не потерять оперативную память вашей программы на python. Используйте необработанный курсор r и используйте for (...) in cursor:
его для извлечения строк по одной. Обработайте каждую строку, прежде чем выбрать следующую.
Пятое: Не беспокойтесь о пятидесяти миллионах итераций вашего цикла for; ваша программа все равно должна прочитать и написать столько строк.
Шестое: если ваша база данных занята во время выполнения этой операции, другие пользователи могут быть заблокированы. Чтобы устранить эту блокировку, выполните инструкцию SQL, УСТАНОВИТЕ УРОВЕНЬ ИЗОЛЯЦИИ ТРАНЗАКЦИЙ, ПРОЧИТАЙТЕ НЕЗАФИКСИРОВАННОЕ прямо перед инструкцией SELECT.
Один из пунктов всей этой технологии СУБД заключается в том, чтобы позволить программам использовать наборы данных, которые на порядки больше доступной оперативной памяти. Потоковая передача данных-единственный способ сделать это, когда вам нужно обработать весь набор данных.
Комментарии:
1. Отличный список предложений . Большое спасибо. Еще один вопрос: если я выберу outfile , смогу ли я сохранить csv в своей локальной системе ? Мой sql работает в AWS RDS .
2. О, нет, INTO OUTFILE экспортирует свой файл в локальную файловую систему сервера MySQL. Но вполне возможно, что племя амазонок, управляющее своей службой баз данных, предоставило способ записать ее в корзину S3. Поищите его или обратитесь к ним за помощью.
Ответ №2:
Вот пример того, как можно эффективно извлечь все содержимое таблицы базы данных MySQL и создать файл CSV без помощи панд. В моей таблице (ips) всего 5 столбцов, 3 из которых каждый объявлен как VARCHAR(255), один двойной и один int. Таблица содержит 1 миллион строк. В таблице нет никаких ограничений. Этот код выполняется за ~1,75 секунды:-
from MySQLdb import _mysql
CSVFILE = '/Users/andy/PrivateStuff/iplist.csv'
TABLE = 'IPS'
CHUNK = 10_000
CONFIG = {
'user': 'andy',
'passwd': 'monster',
'db': 'andy'
}
class Connection():
def __init__(self, config):
self._config = config
self._db = None
@property
def db(self):
if not self._db:
self._db = _mysql.connection(**self._config)
return self._db
def __enter__(self):
return self
def __exit__(self, *args):
if self._db:
self._db.close()
self._db = None
def decode(items):
return ', '.join([item.decode() if isinstance(item, bytes) else item if item else 'NULL' for item in items])
with open(CSVFILE, 'w') as csvfile:
with Connection(CONFIG) as conn:
conn.db.query(f'SHOW COLUMNS FROM {TABLE}')
r = conn.db.store_result()
cols = [col[0] for col in r.fetch_row(maxrows=0)]
print(decode(cols), file=csvfile)
conn.db.query(f'SELECT * FROM {TABLE}')
r = conn.db.use_result()
while (rows := r.fetch_row(maxrows=CHUNK)):
for row in rows:
print(decode(row), file=csvfile)
Ответ №3:
Почему ты хочешь свалить на Паркет? Использовать его в ситуации с озером данных? Если это так, рассмотрите возможность использования Presto или Spark для подключения к MySQL и создания файла Parquet для вас.
С учетом сказанного, я уже создавал скрипт на Python для сброса результата запроса в CSV. Если вы все сделаете правильно, вы сможете за несколько минут сбросить 50 миллионов строк данных в файл CSV (в зависимости от среднего размера ваших строк в байтах).
Панды, вероятно, замедляют вас здесь. Честно говоря, я бы даже не стал использовать CSV-пакет Python. Я бы просто сделал некоторые базовые манипуляции со строками, как это:
# create a file to append to
my_file = with open("mytable.csv", "a") as my_file:
# get sql result
my_result = my_cursor.fetchall()
# get amount of columns in result
row_len = len(my_result[0])
# iterate over results
for row in my_result:
# reset string to append to file
row_string = ""
# iterate over all columns except last one
for col_num in range(row_len - 1):
# escape double quotes with two double quotes (common CSV pattern)
escaped_col_val = col.replace(""", """")
# encapsulate the column within double quotes, separate with a comma
row_string = row_string ""{escaped_col_val}"" ","
# same escaping as before, just for last column
escaped_col_val = col.replace(""", """")
# last column uses a newline instead of comma, to indicate a new row
row_string = row_string ""{escaped_col_val}"" "n"
# append the result to a string
my_file.write(row_string)
Как бы то ни было, я создаю SQLPipe нового продукта, который можно легко расширить, чтобы отправить результат запроса в CSV. Возможно, проверьте мой профиль для получения контактной информации.
Комментарии:
1. ты говоришь об этом ? pypi.org/project/sqlpipe
2. Извините, надо было выразиться яснее. Добавлена ссылка на проект (sqlpipe.com) в ответе.
Ответ №4:
с ограничением и смещением, но это продолжает замедляться с увеличением числа итераций.
Да, это факт.
Вместо этого «вспомните, на чем вы остановились»; это будет намного быстрее. Более подробная информация: http://mysql.rjweb.org/doc.php/pagination
Между тем, LIMIT
» без ORDER BY
» требует непредсказуемых результатов. Кроме того, при вставках и удалениях такое разбиение на фрагменты может привести к дублированию и/или отсутствию строк!
Если ваша конечная цель-скопировать его в CSV-файл, просто сделайте это
SELECT ... INTO OUTFILE "..." ...
Это займет один проход-быстрее, чем все, что вы можете сделать с циклами на Python или любом другом языке.
Также подумайте о том, чтобы запустить это удаленно (то есть с вашего клиентского компьютера, а не с сервера AWS).
mysql -h ... --batch -e "SELECT * FROM tbl" > foo.csv
(Для этого может потребоваться больше параметров.) (Это приведет к появлению вкладок между столбцами и начальной строки с именами столбцов.)
Ответ №5:
Вы можете заказать по первичному ключу и запросить что-то вроде:
select * from table order by id limit 1000
в первый раз- а затем выполните
select * from table where id > %(last_id_you_saw)s order by id limit 1000
Это не должно замедляться с течением времени.