Apache Beam как фильтровать данные на основе значения даты

#java #apache-beam

Вопрос:

Я пытаюсь прочитать записи из файла CSV и отфильтровать записи на основе даты. Я реализовал это следующим образом. Но правильно ли это?

Шаги следующие:

  1. Создание конвейера
  2. Считывание данных из файла
  3. Выполните необходимую фильтрацию
  4. Создайте объект MapElement и преобразуйте запрос заказа в строку
  5. Сопоставление сущности OrderRequest со строкой
  6. Запишите выходные данные в файл

Код:

 // Creating pipeline
Pipeline pipeline = Pipeline.create();

// For transformations Reading from a file
PCollection<String> orderRequest = pipeline
        .apply(TextIO.read().from("src/main/resources/ST/STCheck/OrderRequest.csv"));

PCollection<OrderRequest> pCollectionTransformation = orderRequest
        .apply(ParDo.of(new DoFn<String, OrderRequest>() {

            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) {
                String rowString = c.element();
                if (!rowString.contains("order_id")) {
                    String[] strArr = rowString.split(",");
                    OrderRequest orderRequest = new OrderRequest();
                    orderRequest.setOrder_id(strArr[0]);
                    // Condition to check if the

                    String source1 = strArr[1];
                    DateTimeFormatter fmt1 = DateTimeFormat.forPattern("mm/dd/yyyy");
                    DateTime d1 = fmt1.parseDateTime(source1);
                    System.out.println(d1);

                    String source2 = "4/24/2017";
                    DateTimeFormatter fmt2 = DateTimeFormat.forPattern("mm/dd/yyyy");
                    DateTime d2 = fmt2.parseDateTime(source2);
                    System.out.println(d2);

                    orderRequest.setOrder_date(strArr[1]);
                    System.out.println(strArr[1]);

                    orderRequest.setAmount(Double.valueOf(strArr[2]));
                    orderRequest.setCounter_id(strArr[3]);
                    if (DateTimeComparator.getInstance().compare(d1, d2) > -1) {
                        c.output(orderRequest);
                    }
                }
            }
        }));

// Create a MapElement Object and convert the OrderRequest to String
MapElements<OrderRequest, String> mapElements = MapElements.into(TypeDescriptors.strings())
        .via((OrderRequest orderRequestType) -> orderRequestType.getOrder_id()   " "
                  orderRequestType.getOrder_date()   " "   orderRequestType.getAmount()   " "
                  orderRequestType.getCounter_id());

// Mapping the OrderRequest Entity to String
PCollection<String> pStringList = pCollectionTransformation.apply(mapElements);

// Now Writing the elements to a file
pStringList.apply(TextIO.write().to("src/main/resources/ST/STCheck/OrderRequestOut.csv").withNumShards(1)
        .withSuffix(".csv"));

// To run pipeline
pipeline.run();

System.out.println("We are done!!");
 

Класс Pojo:

 public class OrderRequest  implements Serializable{
    String order_id;
    String order_date;
    double amount;
    String counter_id;
}
 

Хотя я получаю правильный результат, это правильный путь? Две мои главные проблемы-это

 1) How to i access individual columns? So that, I can specify conditions based on that column value.
2) Can we specify headers when reading the data?
 

Ответ №1:

Да, вы можете обрабатывать CSV-файлы, подобные этому, при TextIO.read() условии, что они не содержат полей, встраивающих новые строки, и вы можете распознавать/пропускать строки заголовка. Ваш конвейер выглядит хорошо, хотя в качестве незначительной проблемы со стилем я бы, вероятно, попросил первого ПарДо выполнить только синтаксический анализ, а затем фильтр, который смотрел на дату, чтобы отфильтровать вещи.

Если вы хотите автоматически выводить строки заголовка, вы можете открыть чтение первой строки в своей основной программе (используя стандартные библиотеки java или класс файловых систем Beams) и извлечь ее вручную, передав в свой DoFn для синтаксического анализа.

Я согласен, что более столбчатый подход был бы более естественным. У нас есть это в Python в качестве нашего API фреймов данных, который теперь доступен для общего использования. Вы бы написали что-то вроде

 with beam.Pipeline() as p:
   df = p | beam.dataframe.io.read_csv("src/main/resources/ST/STCheck/OrderRequest.csv")
   filtered = df[df.order_date > limit]
   filtered.write_csv("src/main/resources/ST/STCheck/OrderRequestOut.csv")