Синтаксический анализ строк, содержащих «n» , с помощью «ReadFromText» Apache Beam

#python #apache-beam

Вопрос:

У меня есть .csv файл, который я пытаюсь прочитать, используя apache_beam.io.ReadFromText() в конвейере ( beam это псевдоним для apache_beam ):

 reader = beam.io.ReadFromText(csv_path, skip_header_lines=1)
 

У меня есть класс чтения строк, подобный следующему:

 class RowReader(beam.DoFn):
    def process(self, row):
        row_delimited = row.split(",")
        text_1 = str(row_delimited[1]).encode()
        text_2 = str(row_delimited[4]).encode()
        image_1 = tf.io.read_file(row_delimited[-3]).numpy()
        image_2 = tf.io.read_file(row_delimited[-2]).numpy()
        label = row_delimited[-1]
        
        yield {"text_1": text_1, "image_1": image_1,
                "text_2": text_2, "image_2": image_2,
                "label": label} 
 

В одной из строк csv-файла есть запись, подобная следующей:

введите описание изображения здесь

Но beam.io.ReadFromText() расщепляется на основе n s. Но проблема в том, что строка именно такая:

 1357240852939218946,"#IndiaFightsCorona:
 

В то время как фактический первый элемент после , :

 #IndiaFightsCorona:nnNearly 4.5 million beneficiaries vaccinated against #COVID19 in 19 days.nnIndia is the fastest country to cross landmark of vaccinating 4 million beneficiaries in merely 18 days.nn#StaySafe #IndiaWillWin #Unite2FightCorona
 

Я пытался перейти strip_trailing_newlines=False к ReadFromText() , но это не помогло. Как заставить beam игнорировать n разделение?

Есть какие-нибудь обходные пути?

Ответ №1:

Похоже, вы хотите использовать skip_trailing_newlines флаг в соответствии с документами:

 reader = beam.io.ReadFromText(csv_path, skip_header_lines=1, skip_trailing_newlines=False)
 

Работает ли это сейчас?

Комментарии:

1. На самом деле нет. Я, вероятно, неправильно понял этот аргумент. Скорее всего, это не возымеет никакого эффекта.

2. Хорошо, спасибо, что проверили. Можете ли вы распечатать в виде блока кода фактическую строку, которую вы пытаетесь проанализировать, вместо того, чтобы показывать ее в виде скриншота? Я думаю, что так было бы проще помочь

3. Добавлен. Посмотрим, имеет ли это смысл сейчас.

Ответ №2:

apache_beam.io.ReadFromText предполагается, что все новые строки обозначают записи. Если в вашем csv-файле есть записи, охватывающие строки, есть несколько вариантов.

(1) Создайте коллекцию имен файлов (например, используя файлы соответствия или просто объединяя и используя beam.Create ), за которой следует DoFn файл, открывающий файл (возможно, используя вызовы файловых систем для поддержки GCS, HDFS и т. Д.) И выводящий строки с помощью программы чтения CSV Python. (Недостатком ReadFromText является то, что это заставляет весь файл считываться одним работником.)

или (2) Используйте apache_beam.dataframe.io.read_csv API-интерфейсы фреймов данных Beam. Это даст отложенный фрейм данных, который вы можете использовать, как если бы это был фрейм данных Pandas, или преобразовать в коллекцию ПК apache_beam.dataframe.convert.to_pcollection .

Комментарии:

1. Итак, вы имеете в виду что-то вроде того, что я сначала прочитал CSV, apache_beam.dataframe.io.read_csv а затем сделал его объектом PCollection. Затем я передаю его в качестве компонента трубопровода при построении своего трубопровода?

2. Для первого варианта я сделал что-то вроде: gcs_file = beam.io.filesystems.FileSystems.open(csv_path) а потом csv_reader = csv.reader(io.TextIOWrapper(gcs_file)) . В данном случае у нас проблема с заголовком.

Ответ №3:

Вот что я сделал. Определил RowReader как следующее:

 class RowReader(beam.DoFn):
    def process(self, row):
        text_1 = str(row[1]).encode()
        text_2 = str(row[4]).encode()
        image_1 = tf.io.read_file(row[7]).numpy()
        image_2 = tf.io.read_file(row[8]).numpy()
        label = int(row[9])

        yield {"text_1": text_1, "image_1": image_1,
                "text_2": text_2, "image_2": image_2,
                "label": label} 
 

Затем определил конвейер следующим образом:

 with beam.Pipeline(args.runner, options=options) as pipe:
  df = (
        pipe 
        | read_csv(csv_path)
      )
  pc = to_pcollection(df)
  _ = (
        pc
        | 'Decode from CSV' >> beam.ParDo(RowReader())
        ...
      )
 

read_csv импортируется как from apache_beam.dataframe.io import read_csv и to_pcollection импортируется как from apache_beam.dataframe.convert import to_pcollection .