Google App Engine cannot run Dataflow jobs

#java #google-app-engine #google-cloud-platform #google-cloud-dataflow


Question:

Here is the error message that gets printed when it fails. I am running the local development server and navigating to http://localhost:8080/dataflow/schedule, which calls doGet() to launch the Dataflow pipeline. I am also using the default App Engine service account (@appspot.gserviceaccount.com) for credentials.

Here is my code that launches the job,

 @WebServlet(name = "dataflowscheduler", value = "/dataflow/schedule")
public class DataflowSchedulingServlet extends HttpServlet {
    @Override
    public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {


        Properties properties = System.getProperties();


        try{
            String[] args = {
                    "--project=<project_name>",
                    "--runner=DataflowRunner",
                    "--stagingLocation=gs://<project_name>/temp/cronjobs",
                    "--maxNumWorkers=1",
                    "--gcpTempLocation=gs://<project_name>/temp/gcpTempLocation",
                    "--tempLocation=gs://<project_name>/temp/tempLocation",
                    "--driverClassName=org.postgresql.Driver",
                    "--connectionURL=jdbc:postgresql://example.com:port/production",
                    "--username=<username>",
                    "--password=<password>",
                    "--driverJars=gs://<project_name>/temp/postgresql-42.2.5.jar",
                    "--bigQueryLoadingTemporaryDirectory=gs://<project_name>/temp/",
                    "--connectionProperties='unicode=true&characterEncoding=UTF-8'",
                    "--query=SELECT * FROM public.view_relations",
                    "--datasetname=flyhomes_production",
                    "--schemaname=public",
                    "--tablename=view_relations",
                    "--outputTable=<project_name>:<dataset>.view_relations",
                    "--dataflowJobFile=/Users/annasun/GoogleCloudProject/postgres-to-bigquery/out.json"};

            JdbcToBigQuery.main(args);

            System.out.println("STARTJOB() !!! ");

        } catch (InterruptedException e) {
            response.getWriter().println("Exception: " + Arrays.toString(e.getStackTrace()));
        }

        response.setContentType("text/plain");
        response.getWriter().println("Hello App Engine - Standard using "
                //     + SystemProperty.version.get()
                + " Java " + properties.get("java.specification.version"));
    }
}
  

And here is the main function,

 public static void main(String[] args) throws IOException, InterruptedException {
    System.out.println("HelloWorld()!" );

    // Parse the user options passed from the command-line
    JdbcConverters.JdbcToBigQueryOptions options =
            PipelineOptionsFactory.fromArgs(args)
                    .withValidation()
                    .as(JdbcConverters.JdbcToBigQueryOptions.class);

    String datasetName = options.getDatasetname().toString();
    String jobName = options.getJobName();
    String tableName =  options.getTablename().toString().replace("_", "-");
    jobName = jobName + "-" + tableName;
    options.setJobName(jobName);

        System.out.println("run_updateTable_production(options)");
        run_updateTable_production(options);
        System.out.println("AFTER -- run_updateTable_production(options) ");

}


private static void run_updateTable_production(JdbcConverters.JdbcToBigQueryOptions options)
        throws InterruptedException{

    Timestamp lastUpdatedTime = SchemaCreator.getLastUpdatedTimeFromBigQuery(options);
    System.out.println("LAST UPDATED TIME IS " + lastUpdatedTime);

     if(lastUpdatedTime != null ) {
         System.out.println("!! LAST UPDATED TIME IS " + lastUpdatedTime);

         String query_base = options.getQuery().toString();
         String query_update = query_base + " WHERE updated_at > '" + lastUpdatedTime
                 + "' ORDER BY updated_at, id ";
         String jobName = options.getJobName();
       //  select * from public.listings WHERE updated_at > lastUpdatedTime
       //  ORDER BY updated_at, id OFFSET 100 LIMIT 50

         options.setQuery(ValueProvider.StaticValueProvider.of(query_update));
         System.out.println("QUERY IS : " + options.getQuery());
         options.setJobName(jobName + "-UPDATE-"
                 + lastUpdatedTime.toString().replace(":", "-").replace(".", "-"));
         System.out.println(jobName + "-UPDATE-"
                 + lastUpdatedTime.toString().replace(":", "-").replace(".", "-"));
         run(options);

     } else {
         run_createTable_Recursive(options);
     }

    System.out.println("FINISHED -- run_updateTable_production(options) ");
}



/**
 * Runs the pipeline with the supplied options.
 *
 * @param options The execution parameters to the pipeline.
 * @return The result of the pipeline execution.
 */
private static PipelineResult run(JdbcConverters.JdbcToBigQueryOptions options) {
    System.out.println("BEFORE Pipeline.create!!!!");
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    System.out.println("AFTER Pipeline.create!!!!");

    /*
     * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
     *        2) Append TableRow to BigQuery via BigQueryIO
     */
    pipeline
            /*
             * Step 1: Read records via JDBC and convert to TableRow
             *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
             */
            .apply(
                    "Read from JdbcIO",
                    DynamicJdbcIO.<TableRow>read()
                            .withDataSourceConfiguration(
                                    DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                                            options.getDriverClassName(), options.getConnectionURL())
                                            .withUsername(options.getUsername())
                                            .withPassword(options.getPassword())
                                            .withDriverJars(options.getDriverJars())
                                            .withConnectionProperties(options.getConnectionProperties()))
                            .withQuery(options.getQuery())
                            .withCoder(TableRowJsonCoder.of())
                            .withRowMapper(JdbcConverters.getResultSetToTableRow()))
            /*
             * Step 2: Append TableRow to an existing BigQuery table
             */
            .apply(
                    "Write to BigQuery",
                    BigQueryIO.writeTableRows()
                            .withoutValidation()
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                            .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                            .to(options.getOutputTable()));

    System.out.println("AFTER Pipeline.APPLY!!!!");
    // Execute the pipeline and return the result.
    return pipeline.run();
}
  

However, I got a server error.

HTTP ERROR 500 Problem accessing /dataflow/schedule.

 Server Error
Caused by:

    java.lang.RuntimeException: Error while staging packages
            at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:398)
            at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:271)
            at org.apache.beam.runners.dataflow.util.GcsStager.stageFiles(GcsStager.java:80)
            at org.apache.beam.runners.dataflow.util.GcsStager.stageDefaultFiles(GcsStager.java:68)
            at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:713)
            at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:179)
            at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
            at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
            at com.flyhomes.cloud.dataflow.JdbcToBigQuery.run(JdbcToBigQuery.java:258)
            at com.flyhomes.cloud.dataflow.JdbcToBigQuery.run_updateTable_production(JdbcToBigQuery.java:140)
            at com.flyhomes.cloud.dataflow.JdbcToBigQuery.main(JdbcToBigQuery.java:104)
            at com.flyhomes.cloud.dataflow.DataflowSchedulingServlet.doGet(DataflowSchedulingServlet.java:64)
            at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
            at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
            at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:865)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1655)
            at com.google.appengine.tools.development.ResponseRewriterFilter.doFilter(ResponseRewriterFilter.java:134)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
            at com.google.appengine.tools.development.HeaderVerificationFilter.doFilter(HeaderVerificationFilter.java:34)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
            at com.google.appengine.api.blobstore.dev.ServeBlobFilter.doFilter(ServeBlobFilter.java:63)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
            at com.google.apphosting.utils.servlet.TransactionCleanupFilter.doFilter(TransactionCleanupFilter.java:48)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
            at com.google.appengine.tools.development.jetty9.StaticFileFilter.doFilter(StaticFileFilter.java:123)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
            at com.google.appengine.tools.development.DevAppServerModulesFilter.doDirectRequest(DevAppServerModulesFilter.java:366)
            at com.google.appengine.tools.development.DevAppServerModulesFilter.doDirectModuleRequest(DevAppServerModulesFilter.java:349)
            at com.google.appengine.tools.development.DevAppServerModulesFilter.doFilter(DevAppServerModulesFilter.java:116)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
            at com.google.appengine.tools.development.DevAppServerRequestLogFilter.doFilter(DevAppServerRequestLogFilter.java:44)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1634)
            at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:533)
            at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:146)
            at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:524)
            at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
            at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:257)
            at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1595)
            at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
            at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1317)
            at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
            at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
            at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1564)
            at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
            at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1219)
            at com.google.appengine.tools.development.jetty9.DevAppEngineWebAppContext.doScope(DevAppEngineWebAppContext.java:94)
            at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
            at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
            at com.google.appengine.tools.development.jetty9.JettyContainerService$ApiProxyHandler.handle(JettyContainerService.java:595)
            at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
            at org.eclipse.jetty.server.Server.handle(Server.java:531)
            at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:352)
            at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
            at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281)
            at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102)
            at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
            at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
            at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
            at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
            at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:132)
            at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
            at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.NullPointerException
            at com.google.appengine.tools.development.RequestEndListenerHelper.getListeners(RequestEndListenerHelper.java:52)
            at com.google.appengine.tools.development.RequestEndListenerHelper.register(RequestEndListenerHelper.java:39)
            at com.google.appengine.tools.development.RequestThreadFactory$1$1.start(RequestThreadFactory.java:65)
            at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
            at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1357)
            at org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.util.concurrent.MoreExecutors$ListeningDecorator.execute(MoreExecutors.java:530)
            at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
            at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
            at org.apache.beam.sdk.util.MoreFutures.supplyAsync(MoreFutures.java:101)
            at org.apache.beam.runners.dataflow.util.PackageUtil.stagePackage(PackageUtil.java:170)
            at org.apache.beam.runners.dataflow.util.PackageUtil.lambda$stageClasspathElements$2(PackageUtil.java:359)
            at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
            at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
            at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
            at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
            at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
            at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
            at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
  

and the error message in the log,

 {
 "error": {
  "errors": [
   {
    "domain": "global",
    "reason": "required",
    "message": "Anonymous caller does not have storage.buckets.list access to project <project_number>.",
    "locationType": "header",
    "location": "Authorization"
   }
  ],
  "code": 401,
  "message": "Anonymous caller does not have storage.buckets.list access to project <project_number>."
 }
}
  

Comments:

1. Please be careful about posting your database connection URL, username, and password in a public setting like this. I have edited your post to remove them, but the edit history will still be publicly visible. I recommend that you delete the question and repost it without that information.

2. thanks @Jofre, I created a new post [superuser.com/questions/1416478/… and I will delete this one soon

Answer #1:

Regarding the error you are seeing: your App Engine instance runs as a particular user, i.e. a "service account". You need to grant the storage.buckets.list permission to that account. The default service account you are using most likely does not have this permission; you can find instructions here. I also recommend first running the pipeline outside of App Engine and confirming that it launches successfully. It may also be easier to create a templated pipeline and launch it that way.
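As a quick sanity check, here is a minimal sketch (using the google-cloud-storage Java client, assuming it is on your classpath; this is not part of your original code) that verifies the credentials your environment picks up are not anonymous and can actually list buckets:

 import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class BucketListCheck {
    public static void main(String[] args) {
        // Uses Application Default Credentials: the service account on
        // App Engine, or GOOGLE_APPLICATION_CREDENTIALS when run locally.
        Storage storage = StorageOptions.getDefaultInstance().getService();

        // Listing buckets requires storage.buckets.list on the project.
        // A 401 "Anonymous caller" here means no credentials were found,
        // which is the same failure your staging step is hitting.
        for (Bucket bucket : storage.list().iterateAll()) {
            System.out.println(bucket.getName());
        }
    }
}

Note that the local development server does not supply the App Engine default service account's credentials the way production does, so when testing on localhost you will likely need to point GOOGLE_APPLICATION_CREDENTIALS at a service account key file.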

Also, FYI, it may be worth double-checking everything in this blog post:

The blog shows how to launch a templated Dataflow job; it does so as a cron job, whereas you would instead launch it from an RPC. The instructions there should at least help with most of the setup.

https://amygdala.github.io/dataflow/app_engine/2017/10/24/gae_dataflow.html
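
For reference, here is a rough sketch of what launching a pre-staged template from a request handler could look like, assuming the google-api-services-dataflow client library is available; the project ID, template path, job name, and parameter names below are placeholders, not values from the question. Because the template is already staged in GCS, nothing is uploaded at request time, which sidesteps the staging failure above:

 import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
import com.google.api.services.dataflow.model.LaunchTemplateResponse;

public class TemplateLauncher {

    // Hypothetical placeholders: substitute your project and staged template path.
    private static final String PROJECT_ID = "<project_name>";
    private static final String TEMPLATE_PATH = "gs://<project_name>/templates/jdbc-to-bigquery";

    public static void main(String[] args) throws Exception {
        // Application Default Credentials, scoped for Google Cloud APIs.
        GoogleCredential credential = GoogleCredential.getApplicationDefault()
                .createScoped(Collections.singletonList(
                        "https://www.googleapis.com/auth/cloud-platform"));

        Dataflow dataflow = new Dataflow.Builder(
                        GoogleNetHttpTransport.newTrustedTransport(),
                        JacksonFactory.getDefaultInstance(),
                        credential)
                .setApplicationName("dataflow-scheduler")
                .build();

        // Runtime parameters for the template's ValueProvider options.
        Map<String, String> params = new HashMap<>();
        params.put("query", "SELECT * FROM public.view_relations");
        params.put("outputTable", "<project_name>:<dataset>.view_relations");

        LaunchTemplateParameters launchParams = new LaunchTemplateParameters()
                .setJobName("jdbctobigquery-update")
                .setParameters(params);

        // Launch the template staged at TEMPLATE_PATH.
        LaunchTemplateResponse response = dataflow.projects().templates()
                .launch(PROJECT_ID, launchParams)
                .setGcsPath(TEMPLATE_PATH)
                .execute();

        System.out.println("Launched Dataflow job: " + response.getJob().getId());
    }
}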