#java #spring-boot #google-bigquery #avro
#java #весенняя загрузка #google-bigquery #avro
Вопрос:
Я пытаюсь отправить схему avro в GCP BigQuery, используя Java 11 и Spring 2. Я изучил много информации, но я не нашел примера, как отправить файловую схему avro в формате:
{"namespace": "example.gcp",
"type": "record",
"name": "Client",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "phone", "type": ["string", "null"]},
{"name": "address", "type": ["string", "null"]}
]
}
Я могу отправить как двоичный файл в формате .avro
, используя этот фрагмент кода:
@PostMapping("/uploadFileAvro")
public ModelAndView handleFileUpload(
@RequestParam("file") MultipartFile file, @RequestParam("tableName") String tableName)
throws IOException {
ListenableFuture<Job> loadJob = this.bigQueryTemplate.writeDataToTable(
tableName, file.getInputStream(), FormatOptions.avro());
return getResponse(loadJob, tableName);
}
и в форматном .csv
файле с использованием кода, подобного этому:
@PostMapping("/uploadFileCSV")
public ModelAndView handleFileUploadCSV(
@RequestParam("file") MultipartFile file, @RequestParam("tableName") String tableName)
throws IOException {
ListenableFuture<Job> loadJob = this.bigQueryTemplate.writeDataToTable(
tableName, file.getInputStream(), FormatOptions.csv());
return getResponse(loadJob, tableName);
}
но когда я пытаюсь отправить файл схемы, а не двоичный файл в формате .avro
, я получаю сообщение об ошибке:
java.util.concurrent.ExecutionException: org.springframework.cloud.gcp.bigquery.core.BigQueryException: Error while reading data, error message: The Apache Avro library failed to parse the header with the following error: Invalid data file. Magic does not match: gs://bigquery-prod-upload-us/prod-scotty-fa6aadb4-b3d6-40db-a39f-2025f1a99019
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:119)
at org.springframework.cloud.springbootbigqueryapp.controller.WebController.getResponse(WebController.java:107)
at org.springframework.cloud.springbootbigqueryapp.controller.WebController.handleFileUpload(WebController.java:63)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:652)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.cloud.gcp.bigquery.core.BigQueryException: Error while reading data, error message: The Apache Avro library failed to parse the header with the following error: Invalid data file. Magic does not match: gs://bigquery-prod-upload-us/prod-scotty-fa6aadb4-b3d6-40db-a39f-2025f1a99019
at org.springframework.cloud.gcp.bigquery.core.BigQueryTemplate.lambda$createJobFuture$0(BigQueryTemplate.java:170)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Пожалуйста, может кто-нибудь просветить меня, как отправить файл схемы avro в GCP BigQuery?
PS Как я правильно понимаю из видео, я могу сделать это программно каким-то образом.
Комментарии:
1. Файл схемы — это JSON, а не Avro, поэтому попытка записать его как Avro (или с параметрами формата CSV) не имеет смысла… BigQuery уже будет знать схему из записанных в нее двоичных данных Avro (которые последующие запросы будут отклонять на основе известной им схемы, таким образом, ошибка)
2. Спасибо за ваш ответ. Я добавил формат
csv
в качестве примера, который работает правильно для меня, когда я отправляю csv-файл в GCP BigQuery. Итак, как я вас понимаю, я могу отправлять только в формате JSON вместо avro?3. Это может сработать, но я не уверен в варианте использования, потому что в идеале у вас должно быть всего несколько схем, а BigQuery — это инструмент аналитики для больших наборов данных
Ответ №1:
В BigQuery нет концепции самого реестра схем, поэтому вы, вероятно, захотите использовать schema либо для создания таблицы, либо для загрузки данных в bigquery.
В случае создания таблицы вам нужно преобразовать схему avro в схему bigquery. BigQuery не использует представления схемы avro напрямую. Вот пример на Java создания таблиц с явной схемой: https://cloud.google.com/bigquery/docs/samples/bigquery-create-table#bigquery_create_table-java
В случае загрузки нет необходимости передавать схему независимо от данных. Структура Avro OCF обычно включает схему как часть заголовка, а также набор блоков данных. Просто укажите URI для файлов avro: https://cloud.google.com/bigquery/docs/samples/bigquery-load-table-gcs-avro#bigquery_load_table_gcs_avro-java