#python #function #with-statement #modularization
#python #функция #with-statement #модульность
Вопрос:
У меня есть этот код, который я использую для получения информации из базы данных mysql
def query_result_connect(_query):
with SSHTunnelForwarder((ssh_host, ssh_port),
ssh_password=ssh_password,
ssh_username=ssh_user,
remote_bind_address=('127.0.0.1', 3306)) as server:
connection = mdb.connect(user=sql_username,
passwd=sql_password,
db=sql_main_database,
host='127.0.0.1',
port=server.local_bind_port)
cursor = connection.cursor()
cursor.execute(_query)
connection.commit()
try:
y = pd.read_sql(_query, connection)
return y
except TypeError as e:
x = cursor.fetchall()
return x
Я хотел бы создать функцию, которая включает в себя следующую часть.
with SSHTunnelForwarder((ssh_host, ssh_port),
ssh_password=ssh_password,
ssh_username=ssh_user,
remote_bind_address=('127.0.0.1', 3306)) as server:
connection = mdb.connect(user=sql_username,
passwd=sql_password,
db=sql_main_database,
host='127.0.0.1',
port=server.local_bind_port)
и выполнить его в функции query_result_connect() . Проблема в том, что я не знаю, как включить больше кода в оператор ‘with’. Код должен выглядеть примерно так:
# Maybe introduce some arguments
def db_connection():
with SSHTunnelForwarder((ssh_host, ssh_port),
ssh_password=ssh_password,
ssh_username=ssh_user,
remote_bind_address=('127.0.0.1', 3306)) as server:
connection = mdb.connect(user=sql_username,
passwd=sql_password,
db=sql_main_database,
host='127.0.0.1',
port=server.local_bind_port)
# Maybe return something
def query_result_connect(_query):
# call the db_connection() function somehow.
# Write the following code in a way that is within the 'with' statement of the db_connection() function.
cursor = connection.cursor()
cursor.execute(_query)
connection.commit()
try:
y = pd.read_sql(_query, connection)
return y
except TypeError as e:
x = cursor.fetchall()
return x
Спасибо
Ответ №1:
Что собирается сделать «do_connection» самим контекстным менеджером?
@contextmanager
def do_connection():
# prepare connection
# yield connection
# close connection (__exit__). Perhaps you even want to call "commit" here.
Затем вы будете использовать его следующим образом:
with do_connection() as connection:
cursor = connection.cursor()
...
Это обычный подход к использованию контекстных менеджеров для создания соединений с БД.
Ответ №2:
Вы могли бы создать свой собственный класс подключения, который работает как менеджер conext.
__enter__
настраивает ssh-туннель и подключение к БД.
__exit__
пытается закрыть курсор, соединение с БД и туннель ssh.
from sshtunnel import SSHTunnelForwarder
import psycopg2, traceback
class MyDatabaseConnection:
def __init__(self):
self.ssh_host = '...'
self.ssh_port = 22
self.ssh_user = '...'
self.ssh_password = '...'
self.local_db_port = 59059
def _connect_db(self, dsn):
try:
self.con = psycopg2.connect(dsn)
self.cur = self.con.cursor()
except:
traceback.print_exc()
def _create_tunnel(self):
try:
self.tunnel = SSHTunnelForwarder(
(self.ssh_host, self.ssh_port),
ssh_password=self.ssh_password,
ssh_username=self.ssh_user,
remote_bind_address=('localhost', 5959),
local_bind_address=('localhost', self.local_db_port)
)
self.tunnel.start()
if self.tunnel.local_bind_port == self.local_db_port:
return True
except:
traceback.print_exc()
def __enter__(self):
if self._create_tunnel():
self._connect_db(
"dbname=mf port=%s host='localhost' user=mf_usr" %
self.local_db_port
)
return self
def __exit__(self, *args):
for c in ('cur', 'con', 'tunnel'):
try:
obj = getattr(self, c)
obj.close()
obj = None
del obj
except:
pass
with MyDatabaseConnection() as db:
print(db)
db.cur.execute('Select count(*) from platforms')
print(db.cur.fetchone())
Out:
<__main__.MyDatabaseConnection object at 0x1017cb6d0>
(8,)
Примечание:
Я подключаюсь к Postgres, но это тоже должно работать mysql
. Вероятно, вам нужно настроить в соответствии с вашими собственными потребностями.