Обработка декодированных данных нескольких типов с помощью goavro

#go #serialization #apache-kafka #avro #debezium

#Вперед #сериализация #apache-kafka #avro #debezium

Вопрос:

Что я пытаюсь сделать?

Для каждого изменения в базе данных я пытаюсь преобразовать событие debezium в CSV значений базы данных для загрузки в Redshift.

Для приведенных ниже изменений 110 я пытаюсь создать файл csv: 110,vck,desc,221.1

 mysql> select * from products;
 ----- ------------- --------------------------------------------------------- -------- 
| id  | name        | description                                             | weight |
 ----- ------------- --------------------------------------------------------- -------- 
| 110 | vck         | desc                                                    |  221.1 |
 ----- ------------- --------------------------------------------------------- -------- 
  

Вот моя попытка сделать это с помощью goavro.

Программа

https://play.golang.org/p/A8Wd0sZPUEQ

 package main

import (
    "fmt"
    "encoding/json"
)

func main() {
    debeziumEvent := `{"before":null,"after":{"datapipe.inventory.products.Value":{"id":110,"name":"vck","description":{"string":"desc"},"weight":{"double":221.10000610351562}}},"source":{"query":null,"snapshot":{"string":"true"},"server_id":0,"gtid":null,"name":"datapipe","thread":null,"ts_ms":0,"file":"mysql-bin.000049","version":"1.2.1.Final","connector":"mysql","pos":154,"table":{"string":"products"},"row":0,"db":"inventory"},"op":"c","ts_ms":{"long":1597649700266},"transaction":null}`
    
    var data map[string]interface{}
        err := json.Unmarshal([]byte(debeziumEvent), amp;data)
        if err != nil {
            panic(err)
        }
    
    after := data["after"].(map[string]interface{})
    csv := make([]interface{}, 0)
    
    for _, v := range after {
        for _, v2 := range v.(map[string]interface{}) {
            switch stype := v2.(type) {
            case map[string]interface{}:
                for _, v3 := range v2.(map[string]interface{}) {
                    csv = append(csv, v3)
                }
            case string:
                csv = append(csv, v2)
            case int:
                csv = append(csv, v2)
            case float64:
                csv = append(csv, v2)
            default:
                fmt.Printf("type %s not handledn", stype) 
                panic("unhandled type")
            }
            
        }
    }
    
    fmt.Println(csv)
}
  

Есть ли способ сделать это лучше? Для каждого типа данных мне нужно было бы иметь здесь инструкцию switch….

Связанная проблема с GoAVRO:https://github.com/linkedin/goavro/issues/217

Ответ №1:

fmt.Sprintf может использоваться для преобразования интерфейсов в строку. str := fmt.Sprintf("%v", v)

Выполнение этого уменьшает количество операторов case до 2:

 package main

import (
    "fmt"
    "encoding/json"
)

func main() {
    debeziumEvent := `{"before":null,"after":{"datapipe.inventory.products.Value":{"id":110,"name":"vck","description":{"string":"desc"},"weight":{"double":221.10000610351562}}},"source":{"query":null,"snapshot":{"string":"true"},"server_id":0,"gtid":null,"name":"datapipe","thread":null,"ts_ms":0,"file":"mysql-bin.000049","version":"1.2.1.Final","connector":"mysql","pos":154,"table":{"string":"products"},"row":0,"db":"inventory"},"op":"c","ts_ms":{"long":1597649700266},"transaction":null}`
    
    var data map[string]interface{}
        err := json.Unmarshal([]byte(debeziumEvent), amp;data)
        if err != nil {
            panic(err)
        }
    //fmt.Printf("data=%vn", data)
    
    after := data["after"].(map[string]interface{})
    csv := []string{}
    
    for _, v := range after {
        for _, v2 := range v.(map[string]interface{}) {
            switch v2.(type) {
            case map[string]interface{}:
                for _, v3 := range v2.(map[string]interface{}) {
                    csv = append(csv, fmt.Sprintf("%v", v3))
                }
            default:
                csv = append(csv, fmt.Sprintf("%v", v2))
            }
            
        }
    }
    
    fmt.Println(csv)
}