Как одновременно перечислять объекты в корзине S3 с помощью aioboto3

#python #asynchronous #boto3 #python-asyncio

Вопрос:

Я хочу найти все уникальные пути в корзине S3, мне нужны все пути непосредственно перед уровнем файла. Глубина каталогов может варьироваться, поэтому не все файлы находятся на одной глубине, например, у меня могут быть эти файлы :

 data/subdir1/subdir2/file.csv data/subdir1/subdir3/subdir4/subdir5/file2.csv data/subdir6/subdir7/subdir8/file3.csv  

и мне нужны эти каталоги:

 data/subdir1/subdir2/ data/subdir1/subdir3/subdir4/subdir5/ data/subdir6/subdir7/subdir8/  

Я использую приведенный ниже код, чтобы получить их. Я использую async for цикл с пагинатором, потому что я думал, что они будут обрабатываться одновременно, но я не уверен, что это так. Это кажется очень медленным, поэтому я думаю, что они все еще делаются последовательно:

 subfolders = set()  current_path = None   paginator = self.s3_client.get_paginator("list_objects")   async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):  for file in result.get("Contents", []):  current_path = os.path.dirname(file.get("Key"))  if current_path not in subfolders:  subfolders.add(current_path)  print(f"Part Done")   return subfolders  

Мой клиент s3_client-это aioboto3 клиент.

Есть ли способ ускорить процесс поиска и сохранения каталогов ?

Примечание: Я понял, что этот метод не приносит мне всех результатов, только те, которые получены из текущего пагинатора. Могу ли я асинхронно получить следующий пагинатор?

Ответ №1:

Я не нашел способа распараллелить возвращаемые объекты, но я распараллелил их, используя множество начальных префиксов, подобных этому:

 subfolders = set()  prefix_tasks = [get_subfolders(bucket, prefix) for prefix in prefixes]  try:   for prefix_future in asyncio.as_completed(prefix_tasks):  prefix_subfolders = await prefix_future  subfolders.update(prefix_subfolders)   except KeyError as exc:  print(f"Scanning origin bucket failed due to: {exc}")  raise exc  

где моя get_subfolders функция:

 async def get_subfolders(self, bucket: str, prefix: str) -gt; List[str]:   subfolders = set()   result = await self.s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)  objects = result.get("Contents")  subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4))   # Use next continuation token for pagination for truncated results.  while result["IsTruncated"]:  result = await self.s3_client.list_objects_v2(  Bucket=bucket,  Prefix=prefix,  ContinuationToken=result["NextContinuationToken"],  )  objects = result.get("Contents")  subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4))   return subfolders  

и моя get_paths_by_depth() функция заключается в :

 async def get_paths_by_depth(self, objects: dict, depth: int) -gt; Set[str]:  subfolders = set()  current_path = None  try:  # Get only paths with depth equal to 'depth' levels  for bucket_object in objects:  current_path = os.path.dirname(bucket_object["Key"])  if current_path.count("/") == depth:  subfolders.add(current_path)   except Exception as exc:  print(f"Getting subfolders failed due to error: {exc}")  raise exc   return subfolders