#apache-atlas #spline-data-lineage-tracker
Вопрос:
Я пытался запустить следующий код с новым сплайном jsr: za.co.absa.spline.agent.spark:spark-3.0-spline-agent-bundle_2.12:0.6.0, но получал ошибки, характерные для UserExtraMetadataProvider, который был устаревшим в более новых версиях. Я также попытался заменить UserExtraMetadataProvider на UserExtraAppendingPostProcessingFilter, используя код, показанный ниже этого первого блока кода, но все равно получаю ошибки. Не могли бы вы проверить и поделиться тем, как правильно написать код фильтра после обработки, используя новый пакет сплайнов.
%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import scala.util.parsing.json.JSON
val splineConf: Configuration = StandardSplineConfigurationStack(spark)
spark.enableLineageTracking(new DefaultSplineConfigurer(splineConf) {
//override protected def userExtraMetadataProvider = new UserExtraMetaDataProvider {
//val test = dbutils.notebook.getContext.notebookPath
val notebookInformationJson = dbutils.notebook.getContext.toJson
val outerMap = JSON.parseFull(notebookInformationJson).getOrElse(0).asInstanceOf[Map[String,String]]
val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]
val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
val notebookPath = extraContextMap("notebook_path").split("/")
val notebookURL = tagMap("browserHostName") "/?o=" tagMap("orgId") tagMap("browserHash")
val user = tagMap("user")
val name = notebookPath(notebookPath.size-1)
val notebookInfo = Map("notebookURL" -> notebookURL,
"user" -> user,
"name" -> name,
"mounts" -> dbutils.fs.ls("/mnt").map(_.path),
"timestamp" -> System.currentTimeMillis)
val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)
override protected def userExtraMetadataProvider: UserExtraMetadataProvider = new UserExtraMetadataProvider {
override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson) // tilføj mount info til searchAndReplace denne funktion indeholder infoen
override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
}
})
Вот обновленный код, в котором все еще есть ошибки
%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import play.api.libs.json._
val splineConf: Configuration = StandardSplineConfigurationStack(spark)
spark.enableLineageTracking(new DefaultSplineConfigurer(splineConf) {
val notebookInformationJson = Json.toJson(dbutils.notebook.getContext)
val outerMap = Json.toJson(notebookInformationJson).getOrElse(0).asInstanceOf[Map[String,String]]
val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]
val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
val notebookPath = extraContextMap("notebook_path").split("/")
val notebookURL = tagMap("browserHostName") "/?o=" tagMap("orgId") tagMap("browserHash")
val user = tagMap("user")
val name = notebookPath(notebookPath.size-1)
val notebookInfo = Map("notebookURL" -> Json.toJson(notebookURL),
"user" -> Json.toJson(user),
"name" -> Json.toJson(name),
"mounts" -> Json.toJson(dbutils.fs.ls("/mnt").map(_.path)),
"timestamp" -> Json.toJson(System.currentTimeMillis))
val notebookInfoJson = Json.toJson(notebookInfo)
def userExtraMetadataProvider: UserExtraAppendingPostProcessingFilter
= new UserExtraAppendingPostProcessingFilter
{
def processExecutionEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
def processExecutionPlan (plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
def processReadOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
def processWriteOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
def processDataOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
}
})
Вот в чем ошибка:
command-2044409137370707:12: error: not enough arguments for constructor DefaultSplineConfigurer: (sparkSession: org.apache.spark.sql.SparkSession, userConfiguration: org.apache.commons.configuration.Configuration)za.co.absa.spline.harvester.conf.DefaultSplineConfigurer.
Unspecified value parameter userConfiguration.
spark.enableLineageTracking(new DefaultSplineConfigurer(splineConf) {
^
command-2044409137370707:32: error: not found: type UserExtraAppendingPostProcessingFilter
def userExtraMetadataProvider: UserExtraAppendingPostProcessingFilter
^
command-2044409137370707:33: error: not found: type UserExtraAppendingPostProcessingFilter
= new UserExtraAppendingPostProcessingFilter
^
command-2044409137370707:37: error: not found: type ExecutionEvent
def processExecutionEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
^
command-2044409137370707:38: error: not found: type ExecutionPlan
def processExecutionPlan (plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
^
command-2044409137370707:39: error: not found: type ReadOperation
def processReadOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
^
command-2044409137370707:40: error: not found: type WriteOperation
def processWriteOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
^
command-2044409137370707:41: error: not found: type DataOperation
def processDataOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
^
command-2044409137370707:36: warning: a pure expression does nothing in statement position; multiline expressions may require enclosing parentheses
{
^
Ответ №1:
Ваш код не компилируется по нескольким причинам:
- Вы пропустили некоторые импортные операции (это ясно видно из журнала ошибок):
import za.co.absa.spline.producer.model.v1_1._ import za.co.absa.spline.harvester.extra.UserExtraAppendingPostProcessingFilter
- Правильной подписью для дополнительного поставщика метаданных является следующая:
protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider]
UserExtraAppendingPostProcessingFilter
это просто адаптер для устаревшихUserExtraMetadataProvider
. Поэтому вам все равно нужно создать экземпляр:new UserExtraAppendingPostProcessingFilter(new UserExtraMetadataProvider() { // ??? })
Пожалуйста, обратите внимание, что мы работаем над декларативным решением для сбора дополнительных метаданных, чтобы большинство правил и значений можно было определить в конфигурации, и для этого практически не потребуется кодирование. Видишь https://github.com/AbsaOSS/spline-spark-agent/issues/169
А пока просто используйте UserExtraMetadataProvider
Для получения более подробной информации см. https://github.com/AbsaOSS/spline-spark-agent/discussions/228#discussioncomment-819620