Как выполнить потоковую вставку массива JSON в таблицу BigQuery в Apache Beam

#json #google-bigquery #apache-beam

#json #google-bigquery #apache-beam

Вопрос:

Мое приложение apache beam получает сообщение в массиве JSON, но вставляет каждую строку в таблицу BigQuery. Как я могу поддержать этот вариант использования в ApacheBeam? Могу ли я разделить каждую строку и вставить ее в таблицу одну за другой?

Пример сообщения в формате JSON:

 [
  {"id": 1, "name": "post1", "price": 10},
  {"id": 2, "name": "post2", "price": 20},
  {"id": 3, "name": "post3", "price": 30}
]
  

Схема таблицы BigQuery:

 [
    {
      "mode": "REQUIRED",
      "name": "id",
      "type": "INT64"
    },
    {
      "mode": "REQUIRED",
      "name": "name",
      "type": "STRING"
    },
    {
      "mode": "REQUIRED",
      "name": "price",
      "type": "INT64"
    }
]
  

Комментарии:

1. Привет, Йошей, извините, но не был уверен в описании проблемы. Вы спрашиваете, получаете ли вы массив JSON в apache Bean, как вы можете его обработать и вставлять строку за строкой в BigQuery. Итак, в вашем примере 3 строки?

2. да, поскольку я описываю вопрос

3. @Yohei, ты пробовал beam.apache.org/documentation/io/built-in/google-bigquery / … ?

Ответ №1:

Вот мое решение. Я преобразовал строку JSON в список один раз, затем c.выводите один за другим. Мой код в Scala, но вы можете сделать то же самое в Java.

     case class MyTranscationRecord(id: String, name: String, price: Int)
    case class MyTranscation(recordList: List[MyTranscationRecord])
    class ConvertJSONTextToMyRecord extends DoFn[KafkaRecord[java.lang.Long, String], MyTranscation]() {
      private val logger: Logger = LoggerFactory.getLogger(classOf[ConvertJSONTextToMyRecord])
      @ProcessElement
      def processElement(c: ProcessContext): Unit = {
        try {
          val mapper: ObjectMapper = new ObjectMapper()
            .registerModule(DefaultScalaModule)
          val messageText = c.element.getKV.getValue
          val transaction: MyRecord = mapper.readValue(messageText, classOf[MyTranscation])
          logger.info(s"successfully converted to an EPC transaction = $transaction")
          for (record <- transaction.recordList) {
              c.output(record)
          }
        } catch {
          case e: Exception =>
            val message = e.getLocalizedMessage   e.getStackTrace
            logger.error(message)
        }
      }
    }