Как вставить шестнадцатеричные данные в столбец типа данных blob в Cassandra через pyspark

#apache-spark #pyspark #cassandra #blob #spark-cassandra-connector

#apache-spark #pyspark #cassandra #blob #spark-cassandra-connector

Вопрос:

Я пытаюсь вставить шестнадцатеричную строку в таблицу Cassandra со столбцом типа данных blob. Структура таблицы Cassandra выглядит следующим образом:

СОЗДАЙТЕ ТАБЛИЦУ mob.sample (ПЕРВИЧНЫЙ КЛЮЧ id text, большой двоичный объект данных);

Вот мой код:

 from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import udf


def hexstrtohexnum(hexstr):
    ani = int(hexstr[2:],16)
    return(ani)

# Create a DataFrame using SparkSession
spark = (SparkSession.builder
         .appName('SampleLoader')
         .appName('SparkCassandraApp')
         .getOrCreate())


schema = StructType([StructField("id",StringType(),True),
                     StructField("data",StringType(),True)])

# Create a DataFrame
df = spark.createDataFrame([("key1", '0x546869732069732061206669727374207265636f7264'),
                                 ("key2", '0x546865207365636f6e64207265636f7264'),
                                 ("key3", '0x546865207468697264207265636f7264')],schema)

hexstr2hexnum = udf(lambda z: hexstrtohexnum(z),IntegerType())
spark.udf.register("hexstr2hexnum", hexstr2hexnum)
df.withColumn("data",hexstr2hexnum("data"))
df.write.format("org.apache.spark.sql.cassandra").options(keyspace='mob',table='sample').save(mode="append")
  

Когда я запускаю приведенный выше код, он выдает ошибку:

 WARN  2020-09-03 19:41:57,902 org.apache.spark.scheduler.TaskSetManager: Lost task 3.0 in stage 17.0 (TID 441, 10.37.122.156, executor 2): com.datastax.spark.connector.types.TypeConversionException: Cannot convert object 0x546869732069732061206669727374207265636f7264 of type class java.lang.String to java.nio.ByteBuffer.
    at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:44)
    at com.datastax.spark.connector.types.TypeConverter$ByteBufferConverter$$anonfun$convertPF$11.applyOrElse(TypeConverter.scala:258)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:42)
    at com.datastax.spark.connector.types.TypeConverter$ByteBufferConverter$.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:255)
  

Вот содержимое фрейма данных.

 >>> df.show(3)
 ---- -------------------- 
|  id|                data|
 ---- -------------------- 
|key1|0x546869732069732...|
|key2|0x546865207365636...|
|key3|0x546865207468697...|
 ---- -------------------- 
  

Кто-нибудь может мне помочь, что не так с моим кодом? Есть ли что-то, чего мне не хватает?

Ответ №1:

При чтении тестовой записи тип большого двоичного объекта отображается как BinaryType, а не StringType

 >>> table1 = spark.read.format("org.apache.spark.sql.cassandra").options(table="blobtest",keyspace="test").load()
>>> table1.show()
 ---- -------------------- 
|  f1|                  f2|
 ---- -------------------- 
|1234|[54 68 69 73 20 6...|
 ---- -------------------- 

>>> print(table1.schema)
StructType(List(StructField(f1,StringType,false),StructField(f2,BinaryType,true)))
  

Измените свою схему на BinaryType, и вы сможете ее записать

 >>> string = "This is a test."
>>> arr = bytearray(string, 'utf-8')
>>> schema = StructType([StructField("f1",StringType(),True),StructField("f2",BinaryType(),True)])
>>> df = spark.createDataFrame([("key3",arr)],schema)
>>> df.show()
         ---- -------------------- 
        |  f1|                  f2|
         ---- -------------------- 
        |key3|[54 68 69 73 20 6...|
         ---- -------------------- 
        
>>> df.write.format("org.apache.spark.sql.cassandra").options(keyspace='test',table='blobtest2').save(mode="append")
  

Комментарии:

1. Решение работает, только если строка является строкой ascii. Но когда строка представляет собой шестнадцатеричную строку большого двоичного объекта, и вы хотите, чтобы шестнадцатеричная строка большого двоичного объекта сохранялась как есть в столбце большого двоичного объекта, это не работает. Есть ли возможность сохранить шестнадцатеричную строку blob как есть в столбце blob?

2. Используйте bytearray.fromhex для преобразования шестнадцатеричной строки в bytearray, например df = spark.createDataFrame([(«key4»,bytearray.fromhex(«546869732069732061206669727374207265636f7264»))],schema)

Ответ №2:

Я наконец нашел способ решить эту проблему.

 from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
import binascii

def hextobinary(hexstr):
    a = binascii.unhexlify(hexstr[2:])
    return(a)

# Create a DataFrame using SparkSession
spark = (SparkSession.builder
         .appName('SampleLoader')
         .appName('SparkCassandraApp')
         .getOrCreate())


schema = StructType([StructField("id",StringType(),True),
                     StructField("data",StringType(),True)])

# Create a DataFrame
df = spark.createDataFrame([("key1", '0x546869732069732061206669727374207265636f7264'),
                            ("key2", '0x546865207365636f6e64207265636f7264'),
                            ("key3", '0x546865207468697264207265636f7264')],schema)
print(df)

tobinary = udf(lambda z: hextobinary(z),BinaryType())
spark.udf.register("tobinary", tobinary)
df1 = df.withColumn("data",tobinary("data"))
print(df1)
df1.write.format("org.apache.spark.sql.cassandra").options(keyspace='mob',table='sample').save(mode="append")