#python #apache-beam
Вопрос:
У меня есть .csv
файл, который я пытаюсь прочитать, используя apache_beam.io.ReadFromText()
в конвейере ( beam
это псевдоним для apache_beam
):
reader = beam.io.ReadFromText(csv_path, skip_header_lines=1)
У меня есть класс чтения строк, подобный следующему:
class RowReader(beam.DoFn):
def process(self, row):
row_delimited = row.split(",")
text_1 = str(row_delimited[1]).encode()
text_2 = str(row_delimited[4]).encode()
image_1 = tf.io.read_file(row_delimited[-3]).numpy()
image_2 = tf.io.read_file(row_delimited[-2]).numpy()
label = row_delimited[-1]
yield {"text_1": text_1, "image_1": image_1,
"text_2": text_2, "image_2": image_2,
"label": label}
В одной из строк csv-файла есть запись, подобная следующей:
Но beam.io.ReadFromText()
расщепляется на основе n
s. Но проблема в том, что строка именно такая:
1357240852939218946,"#IndiaFightsCorona:
В то время как фактический первый элемент после ,
:
#IndiaFightsCorona:nnNearly 4.5 million beneficiaries vaccinated against #COVID19 in 19 days.nnIndia is the fastest country to cross landmark of vaccinating 4 million beneficiaries in merely 18 days.nn#StaySafe #IndiaWillWin #Unite2FightCorona
Я пытался перейти strip_trailing_newlines=False
к ReadFromText()
, но это не помогло. Как заставить beam
игнорировать n
разделение?
Есть какие-нибудь обходные пути?
Ответ №1:
Похоже, вы хотите использовать skip_trailing_newlines
флаг в соответствии с документами:
reader = beam.io.ReadFromText(csv_path, skip_header_lines=1, skip_trailing_newlines=False)
Работает ли это сейчас?
Комментарии:
1. На самом деле нет. Я, вероятно, неправильно понял этот аргумент. Скорее всего, это не возымеет никакого эффекта.
2. Хорошо, спасибо, что проверили. Можете ли вы распечатать в виде блока кода фактическую строку, которую вы пытаетесь проанализировать, вместо того, чтобы показывать ее в виде скриншота? Я думаю, что так было бы проще помочь
3. Добавлен. Посмотрим, имеет ли это смысл сейчас.
Ответ №2:
apache_beam.io.ReadFromText
предполагается, что все новые строки обозначают записи. Если в вашем csv-файле есть записи, охватывающие строки, есть несколько вариантов.
(1) Создайте коллекцию имен файлов (например, используя файлы соответствия или просто объединяя и используя beam.Create
), за которой следует DoFn
файл, открывающий файл (возможно, используя вызовы файловых систем для поддержки GCS, HDFS и т. Д.) И выводящий строки с помощью программы чтения CSV Python. (Недостатком ReadFromText является то, что это заставляет весь файл считываться одним работником.)
или (2) Используйте apache_beam.dataframe.io.read_csv
API-интерфейсы фреймов данных Beam. Это даст отложенный фрейм данных, который вы можете использовать, как если бы это был фрейм данных Pandas, или преобразовать в коллекцию ПК apache_beam.dataframe.convert.to_pcollection
.
Комментарии:
1. Итак, вы имеете в виду что-то вроде того, что я сначала прочитал CSV,
apache_beam.dataframe.io.read_csv
а затем сделал его объектом PCollection. Затем я передаю его в качестве компонента трубопровода при построении своего трубопровода?2. Для первого варианта я сделал что-то вроде:
gcs_file = beam.io.filesystems.FileSystems.open(csv_path)
а потомcsv_reader = csv.reader(io.TextIOWrapper(gcs_file))
. В данном случае у нас проблема с заголовком.
Ответ №3:
Вот что я сделал. Определил RowReader
как следующее:
class RowReader(beam.DoFn):
def process(self, row):
text_1 = str(row[1]).encode()
text_2 = str(row[4]).encode()
image_1 = tf.io.read_file(row[7]).numpy()
image_2 = tf.io.read_file(row[8]).numpy()
label = int(row[9])
yield {"text_1": text_1, "image_1": image_1,
"text_2": text_2, "image_2": image_2,
"label": label}
Затем определил конвейер следующим образом:
with beam.Pipeline(args.runner, options=options) as pipe:
df = (
pipe
| read_csv(csv_path)
)
pc = to_pcollection(df)
_ = (
pc
| 'Decode from CSV' >> beam.ParDo(RowReader())
...
)
read_csv
импортируется как from apache_beam.dataframe.io import read_csv
и to_pcollection
импортируется как from apache_beam.dataframe.convert import to_pcollection
.