Как модулировать код, содержащий ‘with’ в Python

#python #function #with-statement #modularization

#python #функция #with-statement #модульность

Вопрос:

У меня есть этот код, который я использую для получения информации из базы данных mysql

 def query_result_connect(_query):
    with SSHTunnelForwarder((ssh_host, ssh_port),
                            ssh_password=ssh_password,
                            ssh_username=ssh_user,
                            remote_bind_address=('127.0.0.1', 3306)) as server:
        connection = mdb.connect(user=sql_username,
                                 passwd=sql_password,
                                 db=sql_main_database,
                                 host='127.0.0.1',
                                 port=server.local_bind_port)
        cursor = connection.cursor()

        cursor.execute(_query)
        connection.commit()
        try:
            y = pd.read_sql(_query, connection)
            return y
        except TypeError as e:
            x = cursor.fetchall()
            return x
 

Я хотел бы создать функцию, которая включает в себя следующую часть.

 with SSHTunnelForwarder((ssh_host, ssh_port),
                            ssh_password=ssh_password,
                            ssh_username=ssh_user,
                            remote_bind_address=('127.0.0.1', 3306)) as server:
        connection = mdb.connect(user=sql_username,
                                 passwd=sql_password,
                                 db=sql_main_database,
                                 host='127.0.0.1',
                                 port=server.local_bind_port)
 

и выполнить его в функции query_result_connect() . Проблема в том, что я не знаю, как включить больше кода в оператор ‘with’. Код должен выглядеть примерно так:

 # Maybe introduce some arguments
def db_connection():
    with SSHTunnelForwarder((ssh_host, ssh_port),
                            ssh_password=ssh_password,
                            ssh_username=ssh_user,
                            remote_bind_address=('127.0.0.1', 3306)) as server:
        connection = mdb.connect(user=sql_username,
                                 passwd=sql_password,
                                 db=sql_main_database,
                                 host='127.0.0.1',
                                 port=server.local_bind_port)
    #     Maybe return something
    

def query_result_connect(_query):
        # call the db_connection() function somehow.
        
        # Write the following code in a way that is within the 'with' statement of the db_connection() function.
        cursor = connection.cursor()

        cursor.execute(_query)
        connection.commit()
        try:
            y = pd.read_sql(_query, connection)
            return y
        except TypeError as e:
            x = cursor.fetchall()
            return x
 

Спасибо

Ответ №1:

Что собирается сделать «do_connection» самим контекстным менеджером?

 @contextmanager
def do_connection():
    # prepare connection
    # yield connection
    # close connection (__exit__). Perhaps you even want to call "commit" here.
 

Затем вы будете использовать его следующим образом:

 with do_connection() as connection:
    cursor = connection.cursor()
    ...
 

Это обычный подход к использованию контекстных менеджеров для создания соединений с БД.

Ответ №2:

Вы могли бы создать свой собственный класс подключения, который работает как менеджер conext.

__enter__ настраивает ssh-туннель и подключение к БД.
__exit__ пытается закрыть курсор, соединение с БД и туннель ssh.

 from sshtunnel import SSHTunnelForwarder
import psycopg2, traceback


class MyDatabaseConnection:
    def __init__(self):
        self.ssh_host = '...'
        self.ssh_port = 22
        self.ssh_user = '...'
        self.ssh_password = '...'
        self.local_db_port = 59059

    def _connect_db(self, dsn):
        try:
            self.con = psycopg2.connect(dsn)
            self.cur = self.con.cursor()
        except:
            traceback.print_exc()

    def _create_tunnel(self):
        try:
            self.tunnel = SSHTunnelForwarder(
                (self.ssh_host, self.ssh_port),
                ssh_password=self.ssh_password,
                ssh_username=self.ssh_user,
                remote_bind_address=('localhost', 5959),
                local_bind_address=('localhost', self.local_db_port)
            )
            self.tunnel.start()
            if self.tunnel.local_bind_port == self.local_db_port:
                return True
        except:
            traceback.print_exc()

    def __enter__(self):
        if self._create_tunnel():
            self._connect_db(
                "dbname=mf port=%s host='localhost' user=mf_usr" %
                self.local_db_port
            )
            return self

    def __exit__(self, *args):
        for c in ('cur', 'con', 'tunnel'):
            try:
                obj = getattr(self, c)
                obj.close()
                obj = None
                del obj
            except:
                pass


with MyDatabaseConnection() as db:
    print(db)
    db.cur.execute('Select count(*) from platforms')
    print(db.cur.fetchone())
 

Out:

 <__main__.MyDatabaseConnection object at 0x1017cb6d0>
(8,)
 

Примечание:

Я подключаюсь к Postgres, но это тоже должно работать mysql . Вероятно, вам нужно настроить в соответствии с вашими собственными потребностями.