Проблема с Scala StructType для потоковой передачи в блоках данных

#scala #parquet

#скала #parquet #scala

Вопрос:

Я борюсь с типом данных и подумал, может ли кто-нибудь мне помочь. Я создаю структурированный поток, используя сетку событий, Azure Databricks в Azure Synapse. Рассматриваемый файл имеет формат parquet, и я использую scala.

Когда я выполняю схему печати в файле, есть два столбца, которые отображаются как десятичные ( 6,2 ), один из которых вызывает проблему. Столбец, который выдает проблемы, имеет нулевые значения, другой десятичный столбец имеет целочисленные значения.

Я создаю StructType и для этих двух полей я использую тип DoubleType для двух десятичных файлов. Когда я делаю это, я получаю следующее сообщение об ошибке для десятичного столбца, который содержит нули

 Error while reading file wasbs:REDACTED_LOCAL_PART@storage.blob.core.windows.net/part-00000-xxx-xxx-xx-xxxx-xxx.c000.snappy.parquet. Parquet column cannot be converted. Column: [FlUsed], Expected: DoubleType, Found: INT32
  

Затем, когда я изменяю структуру на IntegerType, как предложено в сообщении об ошибке, и запускаю поток, я получаю следующее сообщение об ошибке.

 java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
  

Если я сначала конвертирую parquet в csv, тогда все работает отлично, но это не долгосрочное решение. Из того, что я прочитал, это явно проблема с типом данных, но я не знаю, что делать, чтобы ее решить.

Любая помощь была бы высоко оценена.

Ответ №1:

Трудно точно определить решение, не увидев какой-нибудь пример кода, однако вам может потребоваться привести столбец. Вы можете сделать это, используя что-то вроде следующего:

 col("myColumn").cast(DoubleType)
  

Я бы предположил из того, что вы сказали, что вы должны установить тип ввода либо в DecimalType (6,2), либо в IntegerType. Тип вывода, вероятно, двухтипный.

Комментарии:

1. Привет, Ник, спасибо, я попробую это .. Я не против поделиться кодом.

Ответ №2:

Спасибо, я постараюсь привести это в действие, … размещение кода здесь в качестве комментариев не допускает достаточного количества символов.

 val schema = StructType(Seq(
   StructField("field1 ", StringType),
   StructField("field2 ", TimestampType),
   StructField("field5", StringType),                       
   StructField("field6", IntegerType), // spark.read and schema show reads the schema as follows decimal(6,2) – field actually contains integers 
   StructField("field7", IntegerType), // spark.read and schema show reads the schema as follows decimal(6,2) – field actually contains nulls .. this is the one giving the problem .. explicitly setting null ability value to true doesn't help
   StructField("field8", StringType), 
   StructField("field14", IntegerType)
))

 val stream = spark.readStream
   .format("abs-aqs")
   .option("fileFormat", "parquet")
   .option("queueName", "inbound-events")
   .option("connectionString", s"xxx")
   .schema(schema)
   .load()
  

Комментарии:

1. Поместите это в свой первоначальный вопрос 👍