Почему при доступе к хранилищу данных происходит сбой потока данных?

#google-app-engine #google-cloud-datastore #google-cloud-dataflow #app-engine-flexible

#google-app-engine #google-cloud-хранилище данных #google-cloud-dataflow #app-engine-гибкий

Вопрос:

В большинстве случаев мой простой конвейер потока данных успешно копирует несколько типов из хранилища данных одного проекта в другое. Но в определенных типах (около 5% из них) мы всегда получаем эти ошибки.

Поток данных повторяется 4-8 раз с задержкой около 75 секунд, а затем конвейер завершается сбоем.

Как я могу диагностировать и устранить это?

РЕДАКТИРОВАТЬ: Ответ включает в себя: (1) в библиотеке хранилища данных, используемой Dataflow, была ошибка; после того, как они исправили эту ошибку, вы можете увидеть основную причину и (2) размер пакета по умолчанию для размещения объектов в этой библиотеке равен 500, что также является максимальным, и это превышает ограничение API хранилища данных в 10 Мб.

(очень просто) Конвейер выглядит следующим образом:

 Query.Builder qb = Query.newBuilder();
qb.addKindBuilder().setName(kindName);
Query query = qb.build();
Read dsRead = DatastoreIO.v1().read().withProjectId(inputProject).withQuery(query);
Write dsWrite = DatastoreIO.v1().write().withProjectId(outputProject);
PCollection<Entity> sourceEntities = pipeline.apply("read", dsRead);
Bound<Entity, Entity> entityFromSrcToTarget = ParDo.of(new EntityDoFn());/*Simple DoFn that copies Entities for insertion to target*/
PCollection<Entity> clonedEntities = sourceEntities.apply("clone-entity", entityFromSrcToTarget);
clonedEntities.apply("write-to-ds", dsWrite);
  

Первая трассировка стека

 com.google.datastore.v1.client.DatastoreException: I/O error, code=UNAVAILABLE at
com.google.datastore.v1.client.RemoteRpc.makeException(RemoteRpc.java:126) at
com.google.datastore.v1.client.RemoteRpc.call(RemoteRpc.java:95) at
com.google.datastore.v1.client.Datastore.commit(Datastore.java:84) at
com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$DatastoreWriterFn.flushBatch(DatastoreV1.java:925) at
com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$DatastoreWriterFn.finishBundle(DatastoreV1.java:899) Caused by: java.io.IOException: insufficient data written at
sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.close(HttpURLConnection.java:3500) at
com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:81) at
com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981) at
com.google.datastore.v1.client.RemoteRpc.call(RemoteRpc.java:87) at
com.google.datastore.v1.client.Datastore.commit(Datastore.java:84) at
com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$DatastoreWriterFn.flushBatch(DatastoreV1.java:925) at
com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$DatastoreWriterFn.finishBundle(DatastoreV1.java:899) at
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.finishBundle(DoFnRunnerBase.java:158) at
com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:196) at
com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.finishBundle(ForwardingParDoFn.java:47) at
com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.finish(ParDoOperation.java:65) at
com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:80) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:287) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:223) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at
java.lang.Thread.run(Thread.java:745)
  

Также

 (9908b474b1492772): java.lang.RuntimeException: 
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: 
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: 
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: 
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: 
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: 
com.google.cloud.dataflow.sdk.util.UserCodeException: 
com.google.datastore.v1.client.DatastoreException: I/O error, code=UNAVAILABLE at 
com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:162) at 
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287) at 
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:283) at 
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext$1.outputWindowedValue(DoFnRunnerBase.java:507) at 
com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaIteratorsDoFn.processElement(GroupAlsoByWindowsViaIteratorsDoFn.java:125) at 
com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) at 
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) at 
com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190) at 
com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47) at 
com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55) at 
com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) at 
com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:202) at 
com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:143) at 
com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:72) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:287) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:223) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160) at
 java.util.concurrent.FutureTask.run(FutureTask.java:266) at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at
 java.lang.Thread.run(Thread.java:745)
  

Комментарии:

1. Это выглядит либо как регулирование, либо как перегрузка на стороне хранилища данных. Не могли бы вы предоставить идентификатор работы?

2. Судя по трассировке стека, основной причиной является — com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$DatastoreWriterFn.finishBundle(DatastoreV1.java:899) Caused by: java.io.IOException: insufficient data written . Я нашел похожую ошибку на github.com/GoogleCloudPlatform/google-cloud-java/pull/1187 , который был отмечен как исправленный путем добавления опции повтора. Используете ли вы последнюю версию API?

3. @SaiPullabhotla Я использую com.google.cloud.dataflow.google-cloud-dataflow-java-sdk-all версии 1.7.0 (от maven).

4. @SaiPullabhotla «который был отмечен как исправленный путем добавления опции повтора». Как мне это сделать? Но также обратите внимание, что поток данных повторяет вид 4-8 раз с интервалом ~ 75 секунд между каждой повторной попыткой.

5. @SamMcVeety Идентификатор задания 2016-10-20_06_37_36-1944046122903944603