Как записать данные в таблицы Apache Iceberg с помощью Spark SQL?

#apache-spark #apache-spark-sql #iceberg

#apache-spark #apache-spark-sql #iceberg

Вопрос:

Я пытаюсь ознакомиться с Apache Iceberg, и у меня возникли некоторые проблемы с пониманием того, как записать некоторые внешние данные в таблицу с использованием Spark SQL.

  • У меня есть файл one.csv, который находится в каталоге /data
  • мой каталог Iceberg настроен так, чтобы указывать на этот каталог, /warehouse
  • Я хочу записать этот файл.csv в таблицу Iceberg Apache (желательно с использованием Spark SQL)

Возможно ли вообще считывать внешние данные с помощью Spark SQL? А затем записать их в таблицы iceberg? Должен ли я использовать scala или python для этого? Я просмотрел документацию Iceberg и Spark 3.0.1, но, возможно, я что-то упускаю.

Обновление кода

Вот некоторый код, который, я надеюсь, поможет

 spark.conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
spark.conf.set("spark.sql.catalog.spark_catalog.type", "hive")
spark.conf.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.local.type", "hadoop")
spark.conf.set("spark.sql.catalog.local.warehouse", "data/warehouse")
 

У меня есть данные, которые мне нужны для использования, в каталоге /one/one.csv

Как мне перенести их в таблицу Iceberg с помощью Spark? Можно ли все это сделать с помощью SparkSQL?

 spark.sql(
"""
CREATE or REPLACE TABLE local.db.one
USING iceberg
AS SELECT * FROM `/one/one.csv`
"""
)
 

Тогда цель состоит в том, чтобы я мог работать с этой таблицей iceberg напрямую, например:

 select * from local.db.one
 

и это дало бы мне все содержимое из файла / one / one.csv.

Комментарии:

1. как записать в таблицу, задокументировано или нет? iceberg.apache.org/getting-started — таким образом, вы должны иметь возможность загружать CSV-файл в Spark Dataframe и вставлять его в таблицу Iceberg — что именно не работает? Вы должны показать пример кода

2. @UninformedUser Я добавил некоторый код для уточнения, надеюсь, это поможет.

Ответ №1:

Чтобы использовать SparkSQL, считайте файл в фрейм данных, а затем регистрируйте его как временное представление. Это временное представление теперь может быть передано в SQL как:

 var df = spark.read.format("csv").load("/data/one.csv")
df.createOrReplaceTempView("tempview");

spark.sql("CREATE or REPLACE TABLE local.db.one USING iceberg AS SELECT * FROM tempview");
 

Чтобы ответить на ваш другой вопрос, Scala или Python не требуются; приведенный выше пример написан на Java.

Комментарии:

1. обратите внимание, что если вы собираетесь перезаписывать, временное представление и sql не нужны, java api также может это сделать

Ответ №2:

 val sparkConf = new SparkConf()
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
sparkConf.set("spark.sql.catalog.spark_catalog.type", "hive")
sparkConf.set("spark.sql.catalog.hive_catalog", "org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.hive_catalog.type", "hadoop")
sparkConf.set("spark.sql.catalog.hive_catalog.warehouse", "hdfs://host:port/user/hive/warehouse")
sparkConf.set("hive.metastore.uris", "thrift://host:19083")
sparkConf.set("spark.sql.catalog.hive_prod", " org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.hive_prod.type", "hive")
sparkConf.set("spark.sql.catalog.hive_prod.uri", "thrift://host:19083")
sparkConf.set("hive.metastore.warehouse.dir", "hdfs://host:port/user/hive/warehouse")
val spark: SparkSession = SparkSession.builder()
  .enableHiveSupport()
  .config(sparkConf)
  .master("yarn")
  .appName("kafkaTableTest")
  .getOrCreate()

spark.sql(
  """
    |
    |create table if not exists hive_catalog.icebergdb.kafkatest1(
    |    company_id int,
    |    event string,
    |    event_time timestamp,
    |    position_id int,
    |    user_id int
    |)using iceberg
    |PARTITIONED BY (days(event_time))
    |""".stripMargin)

import spark.implicits._



val df: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka_server")
  .option("subscribe", "topic")
  .option("startingOffsets", "latest")
  .load()
//.selectExpr("cast (value as string)")

val value: DataFrame = df.selectExpr("CAST(value AS STRING)")
  .as[String]
  .map(data => {
    val json_str: JSONObject = JSON.parseObject(data)
    val company_id: Integer = json_str.getInteger("company_id")
    val event: String = json_str.getString("event")
    val event_time: String = json_str.getString("event_time")
    val position_id: Integer = json_str.getInteger("position_id")
    val user_id: Integer = json_str.getInteger("user_id")
    (company_id, event, event_time, position_id, user_id)
  })
  .toDF("company_id", "event", "event_time", "position_id", "user_id")



value.createOrReplaceTempView("table")

spark.sql(
  """
    |select
    | company_id,
    | event,
    | to_timestamp(event_time,'yyyy-MM-dd HH:mm:ss') as event_time,
    | position_id,
    | user_id
    |from table
    |""".stripMargin)
  .writeStream
  .format("iceberg")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
  .option("path","hive_catalog.icebergdb.kafkatest1") // tablePath: catalog.db.tableName
  .option("checkpointLocation","hdfspath")
  .start()
  .awaitTermination()
 

В этом примере выполняется чтение данных из Kafka и запись данных в таблицу Iceberg