#apache-spark #pyspark #cassandra #blob #spark-cassandra-connector
#apache-spark #pyspark #cassandra #blob #spark-cassandra-connector
Вопрос:
Я пытаюсь вставить шестнадцатеричную строку в таблицу Cassandra со столбцом типа данных blob. Структура таблицы Cassandra выглядит следующим образом:
СОЗДАЙТЕ ТАБЛИЦУ mob.sample (ПЕРВИЧНЫЙ КЛЮЧ id text, большой двоичный объект данных);
Вот мой код:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
def hexstrtohexnum(hexstr):
ani = int(hexstr[2:],16)
return(ani)
# Create a DataFrame using SparkSession
spark = (SparkSession.builder
.appName('SampleLoader')
.appName('SparkCassandraApp')
.getOrCreate())
schema = StructType([StructField("id",StringType(),True),
StructField("data",StringType(),True)])
# Create a DataFrame
df = spark.createDataFrame([("key1", '0x546869732069732061206669727374207265636f7264'),
("key2", '0x546865207365636f6e64207265636f7264'),
("key3", '0x546865207468697264207265636f7264')],schema)
hexstr2hexnum = udf(lambda z: hexstrtohexnum(z),IntegerType())
spark.udf.register("hexstr2hexnum", hexstr2hexnum)
df.withColumn("data",hexstr2hexnum("data"))
df.write.format("org.apache.spark.sql.cassandra").options(keyspace='mob',table='sample').save(mode="append")
Когда я запускаю приведенный выше код, он выдает ошибку:
WARN 2020-09-03 19:41:57,902 org.apache.spark.scheduler.TaskSetManager: Lost task 3.0 in stage 17.0 (TID 441, 10.37.122.156, executor 2): com.datastax.spark.connector.types.TypeConversionException: Cannot convert object 0x546869732069732061206669727374207265636f7264 of type class java.lang.String to java.nio.ByteBuffer.
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:44)
at com.datastax.spark.connector.types.TypeConverter$ByteBufferConverter$$anonfun$convertPF$11.applyOrElse(TypeConverter.scala:258)
at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:42)
at com.datastax.spark.connector.types.TypeConverter$ByteBufferConverter$.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:255)
Вот содержимое фрейма данных.
>>> df.show(3)
---- --------------------
| id| data|
---- --------------------
|key1|0x546869732069732...|
|key2|0x546865207365636...|
|key3|0x546865207468697...|
---- --------------------
Кто-нибудь может мне помочь, что не так с моим кодом? Есть ли что-то, чего мне не хватает?
Ответ №1:
При чтении тестовой записи тип большого двоичного объекта отображается как BinaryType, а не StringType
>>> table1 = spark.read.format("org.apache.spark.sql.cassandra").options(table="blobtest",keyspace="test").load()
>>> table1.show()
---- --------------------
| f1| f2|
---- --------------------
|1234|[54 68 69 73 20 6...|
---- --------------------
>>> print(table1.schema)
StructType(List(StructField(f1,StringType,false),StructField(f2,BinaryType,true)))
Измените свою схему на BinaryType, и вы сможете ее записать
>>> string = "This is a test."
>>> arr = bytearray(string, 'utf-8')
>>> schema = StructType([StructField("f1",StringType(),True),StructField("f2",BinaryType(),True)])
>>> df = spark.createDataFrame([("key3",arr)],schema)
>>> df.show()
---- --------------------
| f1| f2|
---- --------------------
|key3|[54 68 69 73 20 6...|
---- --------------------
>>> df.write.format("org.apache.spark.sql.cassandra").options(keyspace='test',table='blobtest2').save(mode="append")
Комментарии:
1. Решение работает, только если строка является строкой ascii. Но когда строка представляет собой шестнадцатеричную строку большого двоичного объекта, и вы хотите, чтобы шестнадцатеричная строка большого двоичного объекта сохранялась как есть в столбце большого двоичного объекта, это не работает. Есть ли возможность сохранить шестнадцатеричную строку blob как есть в столбце blob?
2. Используйте bytearray.fromhex для преобразования шестнадцатеричной строки в bytearray, например df = spark.createDataFrame([(«key4»,bytearray.fromhex(«546869732069732061206669727374207265636f7264»))],schema)
Ответ №2:
Я наконец нашел способ решить эту проблему.
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
import binascii
def hextobinary(hexstr):
a = binascii.unhexlify(hexstr[2:])
return(a)
# Create a DataFrame using SparkSession
spark = (SparkSession.builder
.appName('SampleLoader')
.appName('SparkCassandraApp')
.getOrCreate())
schema = StructType([StructField("id",StringType(),True),
StructField("data",StringType(),True)])
# Create a DataFrame
df = spark.createDataFrame([("key1", '0x546869732069732061206669727374207265636f7264'),
("key2", '0x546865207365636f6e64207265636f7264'),
("key3", '0x546865207468697264207265636f7264')],schema)
print(df)
tobinary = udf(lambda z: hextobinary(z),BinaryType())
spark.udf.register("tobinary", tobinary)
df1 = df.withColumn("data",tobinary("data"))
print(df1)
df1.write.format("org.apache.spark.sql.cassandra").options(keyspace='mob',table='sample').save(mode="append")