#python #asynchronous #boto3 #python-asyncio
Вопрос:
Я хочу найти все уникальные пути в корзине S3, мне нужны все пути непосредственно перед уровнем файла. Глубина каталогов может варьироваться, поэтому не все файлы находятся на одной глубине, например, у меня могут быть эти файлы :
data/subdir1/subdir2/file.csv data/subdir1/subdir3/subdir4/subdir5/file2.csv data/subdir6/subdir7/subdir8/file3.csv
и мне нужны эти каталоги:
data/subdir1/subdir2/ data/subdir1/subdir3/subdir4/subdir5/ data/subdir6/subdir7/subdir8/
Я использую приведенный ниже код, чтобы получить их. Я использую async for
цикл с пагинатором, потому что я думал, что они будут обрабатываться одновременно, но я не уверен, что это так. Это кажется очень медленным, поэтому я думаю, что они все еще делаются последовательно:
subfolders = set() current_path = None paginator = self.s3_client.get_paginator("list_objects") async for result in paginator.paginate(Bucket=bucket, Prefix=prefix): for file in result.get("Contents", []): current_path = os.path.dirname(file.get("Key")) if current_path not in subfolders: subfolders.add(current_path) print(f"Part Done") return subfolders
Мой клиент s3_client-это aioboto3
клиент.
Есть ли способ ускорить процесс поиска и сохранения каталогов ?
Примечание: Я понял, что этот метод не приносит мне всех результатов, только те, которые получены из текущего пагинатора. Могу ли я асинхронно получить следующий пагинатор?
Ответ №1:
Я не нашел способа распараллелить возвращаемые объекты, но я распараллелил их, используя множество начальных префиксов, подобных этому:
subfolders = set() prefix_tasks = [get_subfolders(bucket, prefix) for prefix in prefixes] try: for prefix_future in asyncio.as_completed(prefix_tasks): prefix_subfolders = await prefix_future subfolders.update(prefix_subfolders) except KeyError as exc: print(f"Scanning origin bucket failed due to: {exc}") raise exc
где моя get_subfolders
функция:
async def get_subfolders(self, bucket: str, prefix: str) -gt; List[str]: subfolders = set() result = await self.s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix) objects = result.get("Contents") subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4)) # Use next continuation token for pagination for truncated results. while result["IsTruncated"]: result = await self.s3_client.list_objects_v2( Bucket=bucket, Prefix=prefix, ContinuationToken=result["NextContinuationToken"], ) objects = result.get("Contents") subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4)) return subfolders
и моя get_paths_by_depth()
функция заключается в :
async def get_paths_by_depth(self, objects: dict, depth: int) -gt; Set[str]: subfolders = set() current_path = None try: # Get only paths with depth equal to 'depth' levels for bucket_object in objects: current_path = os.path.dirname(bucket_object["Key"]) if current_path.count("/") == depth: subfolders.add(current_path) except Exception as exc: print(f"Getting subfolders failed due to error: {exc}") raise exc return subfolders