Как отправить схему Avro в GCP BigQuery с помощью Java?

#java #spring-boot #google-bigquery #avro

#java #весенняя загрузка #google-bigquery #avro

Вопрос:

Я пытаюсь отправить схему avro в GCP BigQuery, используя Java 11 и Spring 2. Я изучил много информации, но я не нашел примера, как отправить файловую схему avro в формате:

 {"namespace": "example.gcp",
  "type": "record",
  "name": "Client",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "phone", "type": ["string", "null"]},
    {"name": "address", "type": ["string", "null"]}
  ]
}
  

Я могу отправить как двоичный файл в формате .avro , используя этот фрагмент кода:

 @PostMapping("/uploadFileAvro")
public ModelAndView handleFileUpload(
        @RequestParam("file") MultipartFile file, @RequestParam("tableName") String tableName)
        throws IOException {

    ListenableFuture<Job> loadJob = this.bigQueryTemplate.writeDataToTable(
            tableName, file.getInputStream(), FormatOptions.avro());

    return getResponse(loadJob, tableName);
}
  

и в форматном .csv файле с использованием кода, подобного этому:

 @PostMapping("/uploadFileCSV")
public ModelAndView handleFileUploadCSV(
        @RequestParam("file") MultipartFile file, @RequestParam("tableName") String tableName)
        throws IOException {

    ListenableFuture<Job> loadJob = this.bigQueryTemplate.writeDataToTable(
            tableName, file.getInputStream(), FormatOptions.csv());

    return getResponse(loadJob, tableName);
}
  

но когда я пытаюсь отправить файл схемы, а не двоичный файл в формате .avro , я получаю сообщение об ошибке:

 java.util.concurrent.ExecutionException: org.springframework.cloud.gcp.bigquery.core.BigQueryException: Error while reading data, error message: The Apache Avro library failed to parse the header with the following error: Invalid data file. Magic does not match: gs://bigquery-prod-upload-us/prod-scotty-fa6aadb4-b3d6-40db-a39f-2025f1a99019
    at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:119)
    at org.springframework.cloud.springbootbigqueryapp.controller.WebController.getResponse(WebController.java:107)
    at org.springframework.cloud.springbootbigqueryapp.controller.WebController.handleFileUpload(WebController.java:63)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:652)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.cloud.gcp.bigquery.core.BigQueryException: Error while reading data, error message: The Apache Avro library failed to parse the header with the following error: Invalid data file. Magic does not match: gs://bigquery-prod-upload-us/prod-scotty-fa6aadb4-b3d6-40db-a39f-2025f1a99019
    at org.springframework.cloud.gcp.bigquery.core.BigQueryTemplate.lambda$createJobFuture$0(BigQueryTemplate.java:170)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
  

Пожалуйста, может кто-нибудь просветить меня, как отправить файл схемы avro в GCP BigQuery?

PS Как я правильно понимаю из видео, я могу сделать это программно каким-то образом.

Комментарии:

1. Файл схемы — это JSON, а не Avro, поэтому попытка записать его как Avro (или с параметрами формата CSV) не имеет смысла… BigQuery уже будет знать схему из записанных в нее двоичных данных Avro (которые последующие запросы будут отклонять на основе известной им схемы, таким образом, ошибка)

2. Спасибо за ваш ответ. Я добавил формат csv в качестве примера, который работает правильно для меня, когда я отправляю csv-файл в GCP BigQuery. Итак, как я вас понимаю, я могу отправлять только в формате JSON вместо avro?

3. Это может сработать, но я не уверен в варианте использования, потому что в идеале у вас должно быть всего несколько схем, а BigQuery — это инструмент аналитики для больших наборов данных

Ответ №1:

В BigQuery нет концепции самого реестра схем, поэтому вы, вероятно, захотите использовать schema либо для создания таблицы, либо для загрузки данных в bigquery.

В случае создания таблицы вам нужно преобразовать схему avro в схему bigquery. BigQuery не использует представления схемы avro напрямую. Вот пример на Java создания таблиц с явной схемой: https://cloud.google.com/bigquery/docs/samples/bigquery-create-table#bigquery_create_table-java

В случае загрузки нет необходимости передавать схему независимо от данных. Структура Avro OCF обычно включает схему как часть заголовка, а также набор блоков данных. Просто укажите URI для файлов avro: https://cloud.google.com/bigquery/docs/samples/bigquery-load-table-gcs-avro#bigquery_load_table_gcs_avro-java