Копирование больших двоичных объектов из исходного контейнера и асинхронная загрузка в целевой контейнер

#python #azure-blob-storage #python-asyncio

#python #azure-blob-хранилище #python-asyncio

Вопрос:

Я пытаюсь использовать azure.storage.blob.aio для загрузки больших двоичных объектов из одного контейнера в другой контейнер, но если функция работает должным образом, я продолжаю получать предупреждающее сообщение:

 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f6a2ebf6e50>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7f6a2e3553a0>, 32966.677798886)]']
connector: <aiohttp.connector.TCPConnector object at 0x7f6a2ebf6eb0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f6a2ec03400>
Unclosed connector
...
  

Что не так с моим кодом и как я могу исправить приведенное выше сообщение?

 async def upload_blob(blob_args):
    # srource blob
    src_container_client = ContainerClient.from_connection_string(blob_args["src_connect_str"], blob_args["src_container"])

    # destination blob
    target_container_client = ContainerClient.from_connection_string(blob_args["target_connect_str"], blob_args["target_container"])
  
    tasks = []
    print(f"nReading blobs from the container:")
    async for source_blob in src_container_client.list_blobs():
        # push data into specified stream
        source_blob_client = src_container_client.get_blob_client(source_blob.name)
        stream_downloader = await source_blob_client.download_blob()
        stream = await stream_downloader.readall()
            
        # create the BlobClient from the ContainerClient to interact with a specific blob
        target_blob_client = target_container_client.get_blob_client(source_blob.name)

        print(f"t- Transfering {source_blob.name}.")
        tasks.append(asyncio.create_task(target_blob_client.upload_blob(stream)))

    await asyncio.gather(*tasks)
    print("Finished")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(upload_blob(blob_args))
  

Ответ №1:

Что касается варинга, вы не запускаете async context manager со своим ContainerClient .

Например

 import io
from azure.storage.blob.aio import ContainerClient
import asyncio


async def copy_blobs():
    # srource blob
    source_container_client = ContainerClient.from_connection_string(
        conn_str, 'upload')

    # destination blob
    target_container_client = ContainerClient.from_connection_string(
        conn_str, 'copy')
    async with source_container_client, target_container_client:
        tasks = []
        print(f"nReading blobs from the container:")
        async for source_blob in source_container_client.list_blobs():
            source_blob_client = src_container_client.get_blob_client(source_blob.name)
            stream_downloader = await source_blob_client.download_blob()
            stream = await stream_downloader.readall()
            
            tasks.append(asyncio.create_task(target_blob_client.upload_blob(stream)))
            
            print(f"t- Transfering {source_blob.name}.")

        await asyncio.gather(*tasks)
        print("Finished")

if __name__ == "__main__":
    asyncio.set_event_loop(asyncio.new_event_loop())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(copy_blobs())
  

введите описание изображения здесь

Комментарии:

1. кстати, я обнаружил ошибку в своем коде, она создаст файл в целевом контейнере без данных, пожалуйста, проверьте раздел кода в моем вопросе еще раз и обновите свой код (для тех, кто хочет использовать его повторно). Заранее благодарю