# #go #redis #slice
Вопрос:
У меня есть пример использования, в котором я должен прочитать данные из Google BigTable и сохранить их в базе данных redis, которая в дальнейшем будет использоваться командой для внутренней обработки.
Для того же самого написан приведенный ниже код
func (sS *someStruct) GetAndSetDataForCurrentDay() error {
tm := time.Now()
currDay := getPrefix(tm) //Some algo to get the prefix for BigTable Keys
client, err := sS.GetClient("ClientName") // Big Table Initilisation
tbl := client.Open("TableName")
var redisRd []class_ad.RedisData //This might be culprit :(
err = tbl.ReadRows(
context.Background(),
bt.PrefixRange(currDay),
func(row bt.Row) bool {
data, _ := sS.readAllRowData(row)
rd, _ := sS.parseAllRows(data)
redisRd = append(redisRd, rd...) // redisRd is global slice which I feel might be causing bottleneck
return true
},
)
redisClient, err := redisv2.GetConnectionx("redisConnName") //Redis Coonection Established Outside
saveToRedis(redisClient, redisRd, time.Now())
defer redisClient.Close()
return err
}
func (sS *someStruct) parseAllRows(data []ColumnData) ([]class_ad.RedisData, error) {
for _, v := range data {
if v.ColumnName == "SomeColumn1" {
someValue = string(v.Value)
continue
}
if v.ColumnName == "SomeColumn2" {
someOtherValue, err = v
continue
}
.
.
.
.
.
.
.
.
}
return []class_ad.RedisData{"key1#Parallal", "Key2#Parallal"}, err
}
func (sS *someStruct) readAllRowData(row bt.Row) ([]ColumnData, error) {
rowData := make([]ColumnData, 0)
for columnFamilyName, columnFamilyData := range row {
for _, column := range columnFamilyData {
singleColumnData := ColumnData{
Key: column.Row,
ColumnFamilyName: columnFamilyName,
ColumnName: column.Column,
Value: column.Value,
}
rowData = append(rowData, singleColumnData)
}
}
return rowData, nil
}
func saveToRedis(redisClient *redisv2.Connectionx, rowData []class_ad.RedisData, now time.Time) {
channel := make(chan bool) //Channel created
for _, data := range rowData {
go setKeyDataInRedis(redisClient, data, now, channel)
}
success, fail := 0, 0
for range rowData {
if <-channel {
success
continue
}
fail
}
close(channel)
log.Printf("Success %v", success)
log.Printf("Failure %v", fail)
}
func setKeyDataInRedis(redisClient *redisv2.Connectionx, data class_ad.RedisData, now time.Time, channel chan bool) {
redisClient.ExpireAt(data.Key, endDate.Unix())
if redisClient.HMSet(data.Key, map[string]string{
"COLUMN_1": strconv.FormatFloat(data.Column1, 'f', -1, 64),
"COLUMN_2": strconv.FormatFloat(data.Conversion, 'f', -1, 64),
.
.
.
.
"CREATION_DATE": now.String(),
"MODIFICATION_DATE": now.String()}) != nil {
channel <- false
return
}
channel <- true
}
Количество записей, которые мы читаем, составляет около 10 миллионов в день. Приведенный выше код следует за очень высокой загрузкой процессора, что приводит к гибели службы. Любое решение по этой линии. Массовое сохранение данных в базе данных Redis.
Комментарии:
1. «Очень высокая загрузка процессора» является естественным следствием эффективной работы при большой рабочей нагрузке. Это не проблема. Похоже, что в этом сценарии проблема заключается в том, что «приводит к гибели службы».
Ответ №1:
Текущий дизайн является неоптимальным. Я вижу две проблемы:
- У вас хранится огромный список
redisRd
, потому что вы читаете все строки, прежде чем начнете запись в redis. Для этого требуется огромный объем памяти. Уже представленный код не оправдывает необходимость одновременного хранения всех строк в памяти. - Например, цикл, в
saveToRedis
котором вы повторяетеrowData
, порождает несколько миллионов горотинов почти в один и тот же момент. Все они начнут пытаться связаться с redis. Вероятно, конфигурация клиента защищает ваш сервер от массовых ошибок подключения. Но это не меняет того факта, что такое количество гороутов требует координации и нескольких ГБ памяти.
Вышеуказанные проблемы, вероятно, также приводят к дополнительной загрузке процессора из-за GC.
Вы можете заменить свой глобальный список redisRd
на chan class_ad.RedisData
. Канал должен быть буферизован и записан с помощью tbl.ReadRows
обратного вызова. Прежде чем вы начнете читать из таблицы BigQuery, вам следует настроить небольшое количество гороутов (вы можете начать с 50), которые будут отвечать за чтение данных из этого канала и запись их в redis. Количество таких гороутов будет определять количество одновременно выполняемых операций набора. Конечно, результат такой операции не должен быть сообщен обратно tbl.ReadRows
, а должен быть отправлен на другой канал. Вы также можете выполнить конвейерную обработку, чтобы уменьшить количество пакетов, отправляемых в redis. Все эти изменения должны значительно сократить использование памяти и ускорить весь процесс. Ниже приведен мой вариант того, как это может выглядеть:
package main
import (
"context"
"log"
"sync"
)
type (
// Row is a structure returned by big query client
Row struct{}
bigQueryRepository interface {
ReadRows(ctx context.Context, callback func(Row) bool) error
}
BigQueryReader struct {
client bigQueryRepository
}
// Record is a structure used by the redis repository
Record struct{}
RedisWriter struct{}
RowsWriter interface {
Write(ctx context.Context, row []Row) error
}
WorkDistributor struct {
OnError func(err error)
Workers []RowsWriter
rows chan Row
}
)
func (r BigQueryReader) ReadRows(ctx context.Context, writer RowsWriter) error {
return r.client.ReadRows(ctx, func(row Row) bool {
if err := writer.Write(ctx, []Row{row}); err != nil {
// Since there is only one goroutine which reads the data from bigquery,
// translation to Record (potentially heavy operation) is done by workers.
// There are multiple workers, so they can make better use of multiple CPU cores.
log.Printf("Error writing a row: %s", err)
return false
}
return true
})
}
func (r RedisWriter) convertRowToRecord(row Row) []Record {
// convert row to records
return nil
}
// Write multiple rows. Method guarantees that either all rows are written or error occurs.
func (r RedisWriter) Write(ctx context.Context, rows []Row) error {
// Use MULTI / EXEC commands to simulate transaction since we want to write all rows or none.
// Send MULTI to redis
for i := range rows {
records := r.convertRowToRecord(rows[i])
for j := range records {
// Send SET operations using pipeline
_ = j
}
}
// Send EXEC to redis
return nil
}
func (w WorkDistributor) Write(ctx context.Context, rows []Row) error {
for i := range rows {
w.rows <- rows[i]
}
return nil
}
func (w WorkDistributor) Run() {
wg := sync.WaitGroup{}
for i := 0; i < len(w.Workers); i {
wg.Add(1)
go func(idx int, worker RowsWriter) {
defer wg.Done()
for row := range w.rows {
// Instead of writing single row there could be some batching mechanism to gather multiple rows together and pass them to worker.
err := worker.Write(context.Background(), []Row{row})
if err != nil {
w.OnError(err)
}
}
}(i, w.Workers[i])
}
wg.Wait()
}
const numberOfWorkers = 50
func main() {
wrks := make([]RowsWriter, numberOfWorkers)
for i := 0; i < numberOfWorkers; i {
wrks[i] = RedisWriter{}
}
writer := WorkDistributor{Workers: wrks, OnError: func(err error) {
log.Printf("Error writing to redis: %s", err)
}, rows: make(chan Row, numberOfWorkers)}
bigQuery := BigQueryReader{}
err := bigQuery.ReadRows(context.Background(), writer)
if err != nil {
log.Fatal(err)
}
}
Существует 3 основных элемента: средство чтения строк, средство записи строк и распределитель работ. Благодаря этому вы можете легко писать модульные тесты для каждого элемента.
Что стоит упомянуть, так это то, что высокая загрузка процессора не означает, что что-то не так. Вы должны следить за тем, тратится ли процессор на выполнение логики приложения (что хорошо) или на GC, обширное выделение памяти и т. Д., Что может быть признаком того, что ваше приложение может быть оптимизировано.
Комментарии:
1. Спасибо @Jaroslaw. У меня есть небольшая идея. Не могли бы вы дополнить свой ответ каким-нибудь фиктивным фрагментом кода?
2. Я добавил несколько примеров. Не стесняйтесь спрашивать, если у вас есть какие-либо вопросы.
3. Конвейер-это то, что я не смогу использовать из-за некоторых ограничений. Я думаю, смогу ли я использовать пакетную обработку redisData в пределах
tbl.ReadRows(
. Что СЛУЧИЛОСЬ?4. Не могли бы вы подробнее рассказать о том, почему вы не можете использовать конвейер? Как
tbl.ReadRows
поможет дозирование?5. ПОЭТОМУ для Redis мы избежали этого в оболочке, которую мы создали для соединений Redis. ПОЭТОМУ я исключил это. Для Дозирования. Я имею в виду, что, как только количество пакетов увеличится, я вызову Redis для сохранения этих записей.