Создание больших zip-файлов в AWS S3 по частям

#python #python-3.x #amazon-s3 #boto3

#python #python-3.x #amazon-s3 #boto3

Вопрос:

Итак, этот вопрос в конечном итоге касается как python, так и S3.

Допустим, у меня есть корзина S3 с этими файлами :

 file1 --------- 2GB
file2 --------- 3GB
file3 --------- 1.9GB
file4 --------- 5GB
  

Эти файлы были загружены с использованием предварительно указанного URL-адреса публикации для S3

Что мне нужно сделать, так это предоставить клиенту возможность загружать их все в ZIP (или аналогичном формате), но я не могу сделать это ни в памяти, ни в серверном хранилище, поскольку это бессерверная настройка.

Насколько я понимаю, в идеале сервер должен:

  1. Запустите задание multipartupload на S3
  2. Вероятно, необходимо отправить фрагмент в составное задание в качестве заголовка zip-файла;
  3. Загружайте каждый файл в корзине по частям каким-либо потоком, чтобы не переполнять память
  4. Используйте указанный выше поток, чтобы создать для них zip-фрагмент и отправить его в составном задании
  5. Завершите составное задание и zip-файл

Честно говоря, я понятия не имею, как этого добиться и возможно ли это вообще, но некоторые вопросы :

  • Как мне загрузить файл в S3 по частям? Предпочтительно использовать boto3 или botocore
  • Как мне создать zip-файл по частям при освобождении памяти?
  • Как мне подключить все это к многопользовательской загрузке?

Редактировать: Теперь, когда я думаю об этом, может быть, мне даже не нужно помещать ZIP-файл в S3, я могу просто напрямую передавать поток клиенту, верно? На самом деле это было бы намного лучше

Вот некоторый гипотетический код, предполагающий мою правку выше :

   #Let's assume Flask
  @app.route(/'download_bucket_as_zip'):
  def stream_file():
    def stream():
      #Probably needs to yield zip headers/metadata?
      for file in getFilesFromBucket():
         for chunk in file.readChunk(4000):
            zipchunk = bytesToZipChunk(chunk)
            yield zipchunk
    return Response(stream(), mimetype='application/zip')
  

Комментарии:

1. Вы когда-нибудь получали рабочее решение для этого? Я пытаюсь сделать то же самое и попробовал ответ, который вы приняли, и продолжаю получать проблемы. Спасибо

2. @pycode81 насколько я помню, я решил проблему с помощью aiozipstream и aiobotocore, но у меня больше нет исходного кода

Ответ №1:

Ваш вопрос чрезвычайно сложен, потому что его решение может привести вас к множеству кроличьих нор.

Я считаю, что Рахул Айер на правильном пути, потому что ИМХО было бы проще запустить новый экземпляр EC2, сжать файлы в этом экземпляре и переместить их обратно в корзину S3, которая обслуживает zip-файлы только клиенту.

Если бы ваши файлы были меньше, вы могли бы использовать AWS Cloudfront для обработки архивирования, когда клиент запрашивает файл.

Во время моего исследования я заметил, что другие языки, такие как .Net и Java, имели API, которые обрабатывают потоковую передачу в zip-файлы. Я также посмотрел на zipstream, который был разветвлен несколько раз. Неясно, как zipstream можно использовать для потоковой передачи файла для архивирования.

Приведенный ниже код разбивает файл на части и записывает фрагменты в zip-файл. Входные файлы были размером около 12 ГБ, а выходной файл — почти 5 ГБ.

Во время тестирования я не видел каких-либо серьезных проблем с использованием памяти или большими скачками.

Я добавил некоторый псевдокод S3 в один из постов ниже. Я думаю, что требуется дополнительное тестирование, чтобы понять, как этот код работает с файлами в S3.

 from io import RawIOBase
from zipfile import ZipFile
from zipfile import ZipInfo
from zipfile import ZIP_DEFLATED

# This module is needed for ZIP_DEFLATED
import zlib


class UnseekableStream(RawIOBase):
def __init__(self):
    self._buffer = b''

def writable(self):
    return True

def write(self, b):
    if self.closed:
        raise ValueError('The stream was closed!')
    self._buffer  = b
    return len(b)

def get(self):
    chunk = self._buffer
    self._buffer = b''
    return chunk


def zipfile_generator(path, stream):
   with ZipFile(stream, mode='w') as zip_archive:
       z_info = ZipInfo.from_file(path)
       z_info.compress_type = ZIP_DEFLATED
       with open(path, 'rb') as entry, zip_archive.open(z_info, mode='w') as dest: 
          for chunk in iter(lambda: entry.read(16384), b''): # 16384 is the maximum size of an SSL/TLS buffer.
             dest.write(chunk)
             yield stream.get()
 yield stream.get()


stream = UnseekableStream()
# each on the input files was 4gb
files = ['input.txt', 'input2.txt', 'input3.txt']
with open("test.zip", "wb") as f:
   for item in files:
      for i in zipfile_generator(item, stream):
         f.write(i)
         f.flush()
stream.close()
f.close()
  

псевдокод s3 /почтовый индекс

Этот код является строго гипотетическим, поскольку он нуждается в тестировании.

 from io import RawIOBase
from zipfile import ZipFile
from zipfile import ZipInfo
from zipfile import ZIP_DEFLATED
import os

import boto3

# This module is needed for ZIP_DEFLATED
import zlib

session = boto3.Session(
aws_access_key_id='XXXXXXXXXXXXXXXXXXXXXXX',
aws_secret_access_key='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
region_name='XXXXXXXXXX')

s3 = session.resource('s3')
bucket_name = s3.Bucket('bucket name')

class UnseekableStream(RawIOBase):
   def __init__(self):
      self._buffer = b''

   def writable(self):
      return True

   def write(self, b):
      if self.closed:
        raise ValueError('The stream was closed!')
    self._buffer  = b
    return len(b)

    def get(self):
      chunk = self._buffer
      self._buffer = b''
      return chunk


def zipfile_generator(path, stream):
   with ZipFile(stream, mode='w') as zip_archive:
       z_info = ZipInfo.from_file(path)
       z_info.compress_type = ZIP_DEFLATED
       with open(path, 'rb') as entry, zip_archive.open(z_info, mode='w') as dest:
           for chunk in iter(lambda: entry.read(16384), b''):
            dest.write(chunk)
              yield stream.get()
    yield stream.get()


stream = UnseekableStream()
with open("test.zip", "wb") as f:
   for file in bucket_name.objects.all():
     obj = s3.get_object(Bucket=bucket_name, Key=file.key)
     for i in zipfile_generator(obj.get(), stream):
        f.write(i)
        f.flush()
stream.close()
f.close()
  

Ответ №2:

Что мне нужно сделать, так это предоставить клиенту возможность загружать их все в ZIP (или аналогичном формате), но я не могу сделать это ни в памяти, ни в серверном хранилище, поскольку это бессерверная настройка.

Когда вы говорите «меньше сервера», если вы имеете в виду, что хотели бы использовать Lambda для создания zip-файла в S3, вы столкнетесь с несколькими ограничениями:

  • У Lambda есть ограничение по времени, в течение которого функции могут выполняться.
  • Поскольку Lambda имеет ограничение по объему памяти, у вас могут возникнуть проблемы со сборкой большого файла в лямбда-функции
  • У Lambda есть ограничение на максимальный размер вызова PUT.

По вышеуказанным причинам я думаю, что следующий подход лучше:

  • Когда файлы потребуются, создайте экземпляр EC2 «на лету». Возможно, ваша лямбда-функция может инициировать создание экземпляра EC2.
  • скопируйте все файлы в хранилище экземпляров компьютера или даже EFS.
  • Сжимайте файлы в zip
  • Загрузите файл обратно в S3 или отправьте файл напрямую
  • Уничтожьте экземпляр EC2.

На мой взгляд, это значительно упростило бы код, который вам нужно написать, поскольку любой код, который выполняется на вашем ноутбуке / настольном компьютере, вероятно, будет работать на экземпляре EC2. У вас также не будет ограничений по времени / пространству, характерных для lambda.

Поскольку вы можете избавиться от экземпляра EC2 после загрузки zip-файла обратно в S3, вам не нужно беспокоиться о стоимости постоянно работающего сервера — просто запустите его, когда вам это нужно, и отключите, когда закончите.

Код для сжатия нескольких файлов в папке может быть таким простым, как :

От:https://code.tutsplus.com/tutorials/compressing-and-extracting-files-in-python—cms-26816

 import os
import zipfile
 
fantasy_zip = zipfile.ZipFile('C:\Stories\Fantasy\archive.zip', 'w')
 
for folder, subfolders, files in os.walk('C:\Stories\Fantasy'):
 
    for file in files:
        if file.endswith('.pdf'):
            fantasy_zip.write(os.path.join(folder, file), os.path.relpath(os.path.join(folder,file), 'C:\Stories\Fantasy'), compress_type = zipfile.ZIP_DEFLATED)
 
fantasy_zip.close()
  

Комментарии:

1. что касается части EC2, я предлагаю вам ознакомиться с ECS, которая является именно тем, что вы описали.

Ответ №3:

 import io


class S3File(io.RawIOBase):
    def __init__(self, s3_object):
        self.s3_object = s3_object
        self.position = 0

    def __repr__(self):
        return "<%s s3_object=%r>" % (type(self).__name__, self.s3_object)

    @property
    def size(self):
        return self.s3_object.content_length

    def tell(self):
        return self.position

    def seek(self, offset, whence=io.SEEK_SET):
        if whence == io.SEEK_SET:
            self.position = offset
        elif whence == io.SEEK_CUR:
            self.position  = offset
        elif whence == io.SEEK_END:
            self.position = self.size   offset
        else:
            raise ValueError("invalid whence (%r, should be %d, %d, %d)" % (
                whence, io.SEEK_SET, io.SEEK_CUR, io.SEEK_END
            ))

        return self.position

    def seekable(self):
        return True

    def read(self, size=-1):
        if size == -1:
            # Read to the end of the file
            range_header = "bytes=%d-" % self.position
            self.seek(offset=0, whence=io.SEEK_END)
        else:
            new_position = self.position   size

            # If we're going to read beyond the end of the object, return
            # the entire object.
            if new_position >= self.size:
                return self.read()

            range_header = "bytes=%d-%d" % (self.position, new_position - 1)
            self.seek(offset=size, whence=io.SEEK_CUR)

        return self.s3_object.get(Range=range_header)["Body"].read()

    def readable(self):
        return True


if __name__ == "__main__":
    import zipfile

    import boto3

    s3 = boto3.resource("s3")
    s3_object = s3.Object(bucket_name="bukkit", key="bagit.zip")

    s3_file = S3File(s3_object)

    with zipfile.ZipFile(s3_file) as zf:
        print(zf.namelist())
  

Ответ №4:

Может быть, лучше использовать один из написанных на javascript zip-кодировщиков, например JSZip . Или что-то подобное node-lz4