Converting HQL to SparkSQL

#apache-spark #hive #hiveql

Question:

I am trying to convert HQL to Spark.

I have the following query (it works in Hue with the Hive editor):

 select reflect('java.util.UUID', 'randomUUID') as id,
    tt.employee, 
    cast( from_unixtime(unix_timestamp (date_format(current_date(),'dd/MM/yyyy HH:mm:ss'), 'dd/MM/yyyy HH:mm:ss')) as timestamp) as insert_date,
    collect_set(tt.employee_detail) as employee_details,
    collect_set( tt.emp_indication ) as employees_indications,
    named_struct ('employee_info', collect_set(tt.emp_info),
        'employee_mod_info', collect_set(tt.emp_mod_info),
        'employee_comments', collect_set(tt.emp_comment) )
        as emp_mod_details
    from (
        select views_ctr.employee,
        if ( views_ctr.employee_details.so is not null, views_ctr.employee_details, null ) employee_detail,
        if ( views_ctr.employee_info.so is not null, views_ctr.employee_info, null ) emp_info,
        if ( views_ctr.employee_comments.so is not null, views_ctr.employee_comments, null ) emp_comment,
        if ( views_ctr.employee_mod_info.so is not null, views_ctr.employee_mod_info, null ) emp_mod_info,
        if ( views_ctr.emp_indications.so is not null, views_ctr.emp_indications, null ) emp_indication
        from 
        ( select * from views_sta where emp_partition=0 and employee is not null ) views_ctr
        ) tt
        group by employee
        distribute by employee
  

First, I tried to write it with spark.sql as follows:

 sparkSession.sql("select reflect('java.util.UUID', 'randomUUID') as id, tt.employee,    cast( from_unixtime(unix_timestamp (date_format(current_date(),'dd/MM/yyyy HH:mm:ss'), 'dd/MM/yyyy HH:mm:ss')) as timestamp) as insert_date,    collect_set(tt.employee_detail) as employee_details,    collect_set( tt.emp_indication ) as employees_indications,  named_struct ('employee_info', collect_set(tt.emp_info),        'employee_mod_info', collect_set(tt.emp_mod_info),      'employee_comments', collect_set(tt.emp_comment) )      as emp_mod_details, from (      select views_ctr.employee,      if ( views_ctr.employee_details.so is not null, views_ctr.employee_details, null ) employee_detail,     if ( views_ctr.employee_info.so is not null, views_ctr.employee_info, null ) emp_info,      if ( views_ctr.employee_comments.so is not null, views_ctr.employee_comments, null ) emp_comment,       if ( views_ctr.employee_mod_info.so is not null, views_ctr.employee_mod_info, null ) emp_mod_info,      if ( views_ctr.emp_indications.so is not null, views_ctr.emp_indications, null ) employees_indication,      from        ( select * from views_sta where emp_partition=0 and employee is not null ) views_ctr        ) tt        group by employee       distribute by employee")
  

But I got the following exception:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.unsafe.types.UTF8String$IntWrapper
    - object not serializable (class: org.apache.spark.unsafe.types.UTF8String$IntWrapper, value: org.apache.spark.unsafe.types.UTF8String$IntWrapper@30cfd641)

If I run the query without collect_set it works, so could the failure be caused by the struct-typed columns in my table?
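
One way to test that hypothesis is a minimal reproduction that aggregates a struct column with collect_set; this is only a hedged sketch, and the schema, names, and values below are invented for illustration rather than taken from the real views_sta table:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{collect_set, struct}

    // Minimal local check: group a struct-typed column with collect_set.
    val spark = SparkSession.builder().appName("collect-set-struct-check").master("local[*]").getOrCreate()
    import spark.implicits._

    // Toy data shaped roughly like the question: one struct column per employee.
    val df = Seq(
      ("e1", "so-1", "info-1"),
      ("e1", "so-2", "info-2"),
      ("e2", "so-3", "info-3")
    ).toDF("employee", "so", "info")
      .select($"employee", struct($"so", $"info").as("employee_details"))

    // If this fails with the same NotSerializableException, collect_set over a struct
    // column is the trigger; if it succeeds, the problem lies elsewhere in the query.
    df.groupBy($"employee")
      .agg(collect_set($"employee_details").as("employee_details_set"))
      .show(truncate = false)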

How can I write my HQL query in Spark / fix this exception?
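
For reference, here is a hedged sketch of how the same grouping could be expressed with the DataFrame API instead of one long SQL string. It assumes views_sta is available as a table, reuses the column names from the question, generates the UUID with a plain Scala UDF, and omits the IF/null handling from the inner select, so it is not a drop-in replacement for the original HQL:

    import org.apache.spark.sql.functions.{col, collect_set, current_timestamp, struct, udf}

    // Hypothetical rewrite of the aggregation, under the assumptions stated above.
    val makeUuid = udf(() => java.util.UUID.randomUUID().toString)

    val viewsCtr = sparkSession.table("views_sta")
      .where(col("emp_partition") === 0 && col("employee").isNotNull)

    val result = viewsCtr
      .groupBy(col("employee"))
      .agg(
        collect_set(col("employee_details")).as("employee_details"),
        collect_set(col("emp_indications")).as("employees_indications"),
        struct(
          collect_set(col("employee_info")).as("employee_info"),
          collect_set(col("employee_mod_info")).as("employee_mod_info"),
          collect_set(col("employee_comments")).as("employee_comments")
        ).as("emp_mod_details")
      )
      .withColumn("id", makeUuid())
      .withColumn("insert_date", current_timestamp())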