Узкое место в сохранении 10 млн записей в базе данных Redis

# #go #redis #slice

Вопрос:

У меня есть пример использования, в котором я должен прочитать данные из Google BigTable и сохранить их в базе данных redis, которая в дальнейшем будет использоваться командой для внутренней обработки.

Для того же самого написан приведенный ниже код

 func (sS *someStruct) GetAndSetDataForCurrentDay() error {

    tm := time.Now()
    currDay := getPrefix(tm) //Some algo to get the prefix for BigTable Keys

    client, err := sS.GetClient("ClientName") // Big Table Initilisation
    tbl := client.Open("TableName")

    var redisRd []class_ad.RedisData //This might be culprit :(

    err = tbl.ReadRows(
        context.Background(),
        bt.PrefixRange(currDay),
        func(row bt.Row) bool {
            data, _ := sS.readAllRowData(row)
            rd, _ := sS.parseAllRows(data)
            redisRd = append(redisRd, rd...) // redisRd is global slice which I feel might be causing bottleneck
            return true
        },
    )
    redisClient, err := redisv2.GetConnectionx("redisConnName") //Redis Coonection Established Outside
    saveToRedis(redisClient, redisRd, time.Now())
    defer redisClient.Close()

    return err
}

func (sS *someStruct) parseAllRows(data []ColumnData) ([]class_ad.RedisData, error) {

    for _, v := range data {
        if v.ColumnName == "SomeColumn1" {
            someValue = string(v.Value)
            continue
        }

        if v.ColumnName == "SomeColumn2" {
            someOtherValue, err = v
            continue
        }
        .
        .
        .
        .
        .
        .
        .
        .
    }
    return []class_ad.RedisData{"key1#Parallal", "Key2#Parallal"}, err
}

        
func (sS *someStruct) readAllRowData(row bt.Row) ([]ColumnData, error) {
    rowData := make([]ColumnData, 0)
    for columnFamilyName, columnFamilyData := range row {
        for _, column := range columnFamilyData {
            singleColumnData := ColumnData{
                Key:              column.Row,
                ColumnFamilyName: columnFamilyName,
                ColumnName:       column.Column,
                Value:            column.Value,
            }
            rowData = append(rowData, singleColumnData)
        }
    }
    return rowData, nil
}
    
func saveToRedis(redisClient *redisv2.Connectionx, rowData []class_ad.RedisData, now time.Time) {
    channel := make(chan bool) //Channel created
    for _, data := range rowData {
        go setKeyDataInRedis(redisClient, data, now, channel)
    }
    success, fail := 0, 0
    for range rowData {
        if <-channel {
            success  
            continue
        }
        fail  
    }
    close(channel)

    log.Printf("Success %v", success)
    log.Printf("Failure %v", fail)
}

func setKeyDataInRedis(redisClient *redisv2.Connectionx, data class_ad.RedisData, now time.Time, channel chan bool) {
    redisClient.ExpireAt(data.Key, endDate.Unix())
    if redisClient.HMSet(data.Key, map[string]string{
        "COLUMN_1": strconv.FormatFloat(data.Column1, 'f', -1, 64),
        "COLUMN_2": strconv.FormatFloat(data.Conversion, 'f', -1, 64),
        .
        .
        .
        .
        "CREATION_DATE":     now.String(),
        "MODIFICATION_DATE": now.String()}) != nil {
        channel <- false
        return
    }
    channel <- true
}
 

Количество записей, которые мы читаем, составляет около 10 миллионов в день. Приведенный выше код следует за очень высокой загрузкой процессора, что приводит к гибели службы. Любое решение по этой линии. Массовое сохранение данных в базе данных Redis.

Комментарии:

1. «Очень высокая загрузка процессора» является естественным следствием эффективной работы при большой рабочей нагрузке. Это не проблема. Похоже, что в этом сценарии проблема заключается в том, что «приводит к гибели службы».

Ответ №1:

Текущий дизайн является неоптимальным. Я вижу две проблемы:

  1. У вас хранится огромный список redisRd , потому что вы читаете все строки, прежде чем начнете запись в redis. Для этого требуется огромный объем памяти. Уже представленный код не оправдывает необходимость одновременного хранения всех строк в памяти.
  2. Например, цикл, в saveToRedis котором вы повторяете rowData , порождает несколько миллионов горотинов почти в один и тот же момент. Все они начнут пытаться связаться с redis. Вероятно, конфигурация клиента защищает ваш сервер от массовых ошибок подключения. Но это не меняет того факта, что такое количество гороутов требует координации и нескольких ГБ памяти.

Вышеуказанные проблемы, вероятно, также приводят к дополнительной загрузке процессора из-за GC.

Вы можете заменить свой глобальный список redisRd на chan class_ad.RedisData . Канал должен быть буферизован и записан с помощью tbl.ReadRows обратного вызова. Прежде чем вы начнете читать из таблицы BigQuery, вам следует настроить небольшое количество гороутов (вы можете начать с 50), которые будут отвечать за чтение данных из этого канала и запись их в redis. Количество таких гороутов будет определять количество одновременно выполняемых операций набора. Конечно, результат такой операции не должен быть сообщен обратно tbl.ReadRows , а должен быть отправлен на другой канал. Вы также можете выполнить конвейерную обработку, чтобы уменьшить количество пакетов, отправляемых в redis. Все эти изменения должны значительно сократить использование памяти и ускорить весь процесс. Ниже приведен мой вариант того, как это может выглядеть:

 package main

import (
    "context"
    "log"
    "sync"
)

type (
    // Row is a structure returned by big query client
    Row                struct{}
    bigQueryRepository interface {
        ReadRows(ctx context.Context, callback func(Row) bool) error
    }
    BigQueryReader struct {
        client bigQueryRepository
    }
    // Record is a structure used by the redis repository
    Record      struct{}
    RedisWriter struct{}

    RowsWriter interface {
        Write(ctx context.Context, row []Row) error
    }
    WorkDistributor struct {
        OnError func(err error)
        Workers []RowsWriter
        rows    chan Row
    }
)

func (r BigQueryReader) ReadRows(ctx context.Context, writer RowsWriter) error {
    return r.client.ReadRows(ctx, func(row Row) bool {
        if err := writer.Write(ctx, []Row{row}); err != nil {
            // Since there is only one goroutine which reads the data from bigquery,
            // translation to Record (potentially heavy operation) is done by workers. 
            // There are multiple workers, so they can make better use of multiple CPU cores. 
            log.Printf("Error writing a row: %s", err)
            return false
        }
        return true
    })
}

func (r RedisWriter) convertRowToRecord(row Row) []Record {
    // convert row to records
    return nil
}

// Write multiple rows. Method guarantees that either all rows are written or error occurs.
func (r RedisWriter) Write(ctx context.Context, rows []Row) error {
    // Use MULTI / EXEC commands to simulate transaction since we want to write all rows or none.
    // Send MULTI to redis
    for i := range rows {
        records := r.convertRowToRecord(rows[i])
        for j := range records {
            // Send SET operations using pipeline
            _ = j
        }
    }
    // Send EXEC to redis
    return nil
}

func (w WorkDistributor) Write(ctx context.Context, rows []Row) error {
    for i := range rows {
        w.rows <- rows[i]
    }
    return nil
}

func (w WorkDistributor) Run() {
    wg := sync.WaitGroup{}
    for i := 0; i < len(w.Workers); i   {
        wg.Add(1)
        go func(idx int, worker RowsWriter) {
            defer wg.Done()
            for row := range w.rows {
                // Instead of writing single row there could be some batching mechanism to gather multiple rows together and pass them to worker.
                err := worker.Write(context.Background(), []Row{row})
                if err != nil {
                    w.OnError(err)
                }
            }
        }(i, w.Workers[i])
    }
    wg.Wait()
}

const numberOfWorkers = 50

func main() {
    wrks := make([]RowsWriter, numberOfWorkers)
    for i := 0; i < numberOfWorkers; i   {
        wrks[i] = RedisWriter{}
    }
    writer := WorkDistributor{Workers: wrks, OnError: func(err error) {
        log.Printf("Error writing to redis: %s", err)
    }, rows: make(chan Row, numberOfWorkers)}
    bigQuery := BigQueryReader{}
    err := bigQuery.ReadRows(context.Background(), writer)
    if err != nil {
        log.Fatal(err)
    }
}
 

Существует 3 основных элемента: средство чтения строк, средство записи строк и распределитель работ. Благодаря этому вы можете легко писать модульные тесты для каждого элемента.

Что стоит упомянуть, так это то, что высокая загрузка процессора не означает, что что-то не так. Вы должны следить за тем, тратится ли процессор на выполнение логики приложения (что хорошо) или на GC, обширное выделение памяти и т. Д., Что может быть признаком того, что ваше приложение может быть оптимизировано.

Комментарии:

1. Спасибо @Jaroslaw. У меня есть небольшая идея. Не могли бы вы дополнить свой ответ каким-нибудь фиктивным фрагментом кода?

2. Я добавил несколько примеров. Не стесняйтесь спрашивать, если у вас есть какие-либо вопросы.

3. Конвейер-это то, что я не смогу использовать из-за некоторых ограничений. Я думаю, смогу ли я использовать пакетную обработку redisData в пределах tbl.ReadRows( . Что СЛУЧИЛОСЬ?

4. Не могли бы вы подробнее рассказать о том, почему вы не можете использовать конвейер? Как tbl.ReadRows поможет дозирование?

5. ПОЭТОМУ для Redis мы избежали этого в оболочке, которую мы создали для соединений Redis. ПОЭТОМУ я исключил это. Для Дозирования. Я имею в виду, что, как только количество пакетов увеличится, я вызову Redis для сохранения этих записей.