Переместите значение метки текущего времени spark в метку времени postgres со столбцом часового пояса

#postgresql #scala #apache-spark #hadoop #apache-spark-sql

Вопрос:

Мне нужно ввести значение «start_time» в таблицу postgres, в которой есть столбец с типом данных «отметка времени с часовым поясом». Мне нужно решение только в соединении java jdbc только, пожалуйста.

 import spark.implicits._  val df1 =Seq(("Process Name","Process Description"))  .toDF("process_nm","process_desc")  val df2 = df1.withColumn("start_time",current_timestamp)  df2.show(false)  df2.printSchema  df2.collect().foreach(row=gt;  {   println("Before calling-" row.getString(0) " " row.getString(1) "   " row.getTimestamp(2))    var process_name:String=row.getString(0)  var process_description:String=row.getString(1)   var start_time=row.getTimestamp(2)  var insertSql="""insert into test_log(process_nm,start_time,process_desc)   values('$process_name','$start_time','$process_description')"""    import com.typesafe.config.ConfigFactory  import org.apache.spark.sql.SparkSession  import org.apache.spark.sql.functions.{concat, lit}   import java.io.File  import java.sql.{Connection, DriverManager}    var db_conn_string = "jdbc:"   db_type   "://"   db_host   ":"   db_port   "/"   db_database  val direct_conn = DriverManager.getConnection(db_conn_string, db_user, db_pass)  val statement = direct_conn.createStatement()  val result=statement.executeUpdate(insertSql)  println("inserted-" result)   println("insert_sql-" insertSql)  })   

Получение ошибки ниже при нажатии start_time в таблицу postgres

 Before calling-Process Name Process Description 2021-12-06 05:51:12.278559 org.postgresql.util.PSQLException: ERROR: invalid input syntax for type timestamp with time zone: "$start_time"  Position: 93  at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2552)  at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2284)  at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322)  at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481)  at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401)  at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:322)  at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:308)  at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:284)  at org.postgresql.jdbc.PgStatement.executeUpdate(PgStatement.java:258)  at $anonfun$res19$1(lt;pastiegt;:57)  at $anonfun$res19$1$adapted(lt;pastiegt;:32)  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)  

Ответ №1:

Воспользуйся:

 var insertSql="insert into test_log(process_nm,start_time,process_desc)   values('$process_name','$start_time','$process_description')"  

Вы не хотите использовать»»», так как он не допускает подстановку переменных.

В качестве отступления, где это возможно, я бы также предложил использовать «val» вместо » var » для повышения производительности.

Комментарии:

1. Я считаю, что он не жалуется на цитаты. Spark не может преобразовать «start_time» в метку времени postgres с типом данных часового пояса. Есть ли способ преобразовать его в метку времени в формате часового пояса?

2. Я не жалуюсь на цитаты, с которыми я согласен, и вы правы в том, что postgress не может интерпретировать то, что вы отправляете, но проблема в том, что вы буквально отправляете «$start_time». В вашем сообщении об ошибке говорится об этом выше. К вашему сведению, посмотрите на «»» это необработанный интерполятор для scala и не выполняет интерполяцию переменных.

3. Если вы чувствуете, что этот ответ был полезным и вам удобно, не могли бы вы отметить его как правильный?

4. Я попробовал с одинарными двойными кавычками и все та же проблема.

5. Можете ли вы обновить вопрос своим обновленным кодом?