#java #apache-beam
Вопрос:
Я пытаюсь прочитать записи из файла CSV и отфильтровать записи на основе даты. Я реализовал это следующим образом. Но правильно ли это?
Шаги следующие:
- Создание конвейера
- Считывание данных из файла
- Выполните необходимую фильтрацию
- Создайте объект MapElement и преобразуйте запрос заказа в строку
- Сопоставление сущности OrderRequest со строкой
- Запишите выходные данные в файл
Код:
// Creating pipeline
Pipeline pipeline = Pipeline.create();
// For transformations Reading from a file
PCollection<String> orderRequest = pipeline
.apply(TextIO.read().from("src/main/resources/ST/STCheck/OrderRequest.csv"));
PCollection<OrderRequest> pCollectionTransformation = orderRequest
.apply(ParDo.of(new DoFn<String, OrderRequest>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String rowString = c.element();
if (!rowString.contains("order_id")) {
String[] strArr = rowString.split(",");
OrderRequest orderRequest = new OrderRequest();
orderRequest.setOrder_id(strArr[0]);
// Condition to check if the
String source1 = strArr[1];
DateTimeFormatter fmt1 = DateTimeFormat.forPattern("mm/dd/yyyy");
DateTime d1 = fmt1.parseDateTime(source1);
System.out.println(d1);
String source2 = "4/24/2017";
DateTimeFormatter fmt2 = DateTimeFormat.forPattern("mm/dd/yyyy");
DateTime d2 = fmt2.parseDateTime(source2);
System.out.println(d2);
orderRequest.setOrder_date(strArr[1]);
System.out.println(strArr[1]);
orderRequest.setAmount(Double.valueOf(strArr[2]));
orderRequest.setCounter_id(strArr[3]);
if (DateTimeComparator.getInstance().compare(d1, d2) > -1) {
c.output(orderRequest);
}
}
}
}));
// Create a MapElement Object and convert the OrderRequest to String
MapElements<OrderRequest, String> mapElements = MapElements.into(TypeDescriptors.strings())
.via((OrderRequest orderRequestType) -> orderRequestType.getOrder_id() " "
orderRequestType.getOrder_date() " " orderRequestType.getAmount() " "
orderRequestType.getCounter_id());
// Mapping the OrderRequest Entity to String
PCollection<String> pStringList = pCollectionTransformation.apply(mapElements);
// Now Writing the elements to a file
pStringList.apply(TextIO.write().to("src/main/resources/ST/STCheck/OrderRequestOut.csv").withNumShards(1)
.withSuffix(".csv"));
// To run pipeline
pipeline.run();
System.out.println("We are done!!");
Класс Pojo:
public class OrderRequest implements Serializable{
String order_id;
String order_date;
double amount;
String counter_id;
}
Хотя я получаю правильный результат, это правильный путь? Две мои главные проблемы-это
1) How to i access individual columns? So that, I can specify conditions based on that column value.
2) Can we specify headers when reading the data?
Ответ №1:
Да, вы можете обрабатывать CSV-файлы, подобные этому, при TextIO.read()
условии, что они не содержат полей, встраивающих новые строки, и вы можете распознавать/пропускать строки заголовка. Ваш конвейер выглядит хорошо, хотя в качестве незначительной проблемы со стилем я бы, вероятно, попросил первого ПарДо выполнить только синтаксический анализ, а затем фильтр, который смотрел на дату, чтобы отфильтровать вещи.
Если вы хотите автоматически выводить строки заголовка, вы можете открыть чтение первой строки в своей основной программе (используя стандартные библиотеки java или класс файловых систем Beams) и извлечь ее вручную, передав в свой DoFn для синтаксического анализа.
Я согласен, что более столбчатый подход был бы более естественным. У нас есть это в Python в качестве нашего API фреймов данных, который теперь доступен для общего использования. Вы бы написали что-то вроде
with beam.Pipeline() as p:
df = p | beam.dataframe.io.read_csv("src/main/resources/ST/STCheck/OrderRequest.csv")
filtered = df[df.order_date > limit]
filtered.write_csv("src/main/resources/ST/STCheck/OrderRequestOut.csv")