Поток While(true) Сущность

#c# #multithreading #entity-framework #while-loop #concurrent-queue

#c# #многопоточность #сущность-фреймворк #цикл while #параллельная очередь

Вопрос:

Я создаю регистратор свечей (Binance Crypto), интересный для 1-минутных свечей, включая данные внутри свечи для изучения рынка (но в конечном итоге я мог бы использовать этот же код, чтобы на самом деле следить за тем, что происходит на рынке)

Чтобы избежать возможной задержки / производительности EF / SQL и т. Д. Я решил сделать это, используя два потока.

Один получает подписанные (асинхронные) токены от Binance и помещает их в ConcurrentQueue, в то время как другой продолжает пытаться отменить очередь и сохранить данные в MSSQL

Мой вопрос касается второго потока, цикла while (true). Каков наилучший подход для сохранения более 200 информации в секунду в SQL, пока эта информация поступает по отдельности (иногда 300 информации за 300 мс, иногда меньше) с использованием EF:

Должен ли я открывать SQL con каждый раз, когда хочу сохранить? (Производительность). Каков наилучший подход для достижения этой цели?

— ОТРЕДАКТИРОВАНО — В какой-то момент я получил 600k в очереди, поэтому я столкнулся с проблемами при вставке в SQL, измененный с Linq на SQL на EF

Вот мой фактический код:

 //Initialize 
        public void getCoinsMoves()
        {
            Thread THTransferDatatoSQL = new Thread(TransferDatatoSQL);
            THTransferDatatoSQL.Name = "THTransferDatatoSQL";
            THTransferDatatoSQL.SetApartmentState(ApartmentState.STA);
            THTransferDatatoSQL.IsBackground = true;
            THTransferDatatoSQL.Start();

            List<string> SymbolsMap;
            using(DBBINANCEEntities lSQLBINANCE = new DBBINANCEEntities())
            {
                SymbolsMap = lSQLBINANCE.TB_SYMBOLS_MAP.Select(h => h.SYMBOL).ToList();
            }

            socketClient.Spot.SubscribeToKlineUpdatesAsync(SymbolsMap, Binance.Net.Enums.KlineInterval.OneMinute, h =>
            {
                RecordCandles(h);
            });
        }

//Enqueue Data
        public void RecordCandles(Binance.Net.Interfaces.IBinanceStreamKlineData Candle)
        {
            FRACTIONED_CANDLES.Enqueue(new TB_FRACTIONED_CANDLES_DATA()
            {
                BASE_VOLUME = Candle.Data.BaseVolume,
                CLOSE_TIME = Candle.Data.CloseTime.AddHours(-3),
                MONEY_VOLUME = Candle.Data.QuoteVolume,
                PCLOSE = Candle.Data.Close,
                PHIGH = Candle.Data.High,
                PLOW = Candle.Data.Low,
                POPEN = Candle.Data.Open,
                SYMBOL = Candle.Symbol,
                TAKER_BUY_BASE_VOLUME = Candle.Data.TakerBuyBaseVolume,
                TAKER_BUY_MONEY_VOLUME = Candle.Data.TakerBuyQuoteVolume,
                TRADES = Candle.Data.TradeCount,
                IS_LAST_CANDLE = Candle.Data.Final
            });
        }

//Transfer Data to SQL
        public void TransferDatatoSQL()
        {
            while (true)
            {
                TB_FRACTIONED_CANDLES_DATA NewData;
                if (FRACTIONED_CANDLES.TryDequeue(out NewData))
                {
                    using (DBBINANCEEntities LSQLBINANCE = new DBBINANCEEntities())
                    {
                        LSQLBINANCE.TB_FRACTIONED_CANDLES_DATA.Add(NewData);
                        if (NewData.IS_LAST_CANDLE)
                            LSQLBINANCE.TB_CANDLES_DATA.Add(new TB_CANDLES_DATA()
                            {
                                BASE_VOLUME = NewData.BASE_VOLUME,
                                CLOSE_TIME = NewData.CLOSE_TIME,
                                IS_LAST_CANDLE = NewData.IS_LAST_CANDLE,
                                MONEY_VOLUME = NewData.MONEY_VOLUME,
                                PCLOSE = NewData.PCLOSE,
                                PHIGH = NewData.PHIGH,
                                PLOW = NewData.PLOW,
                                POPEN = NewData.POPEN,
                                SYMBOL = NewData.SYMBOL,
                                TAKER_BUY_BASE_VOLUME = NewData.TAKER_BUY_BASE_VOLUME,
                                TAKER_BUY_MONEY_VOLUME = NewData.TAKER_BUY_MONEY_VOLUME,
                                TRADES = NewData.TRADES
                            });
                        LSQLBINANCE.SaveChanges();
                    }
                }
                Thread.Sleep(1);
            }            
        }
 

Спасибо в Adv

Рафаэль

Комментарии:

1. Не используйте LINQ для SQL; он древний и устаревший. Не используйте потоки; вы очень быстро отклеитесь. Не беспокойтесь об открытии и закрытии соединений; это не то, что вы думаете (прочитайте, что такое объединение пулов соединений). Не беспокойтесь о производительности вставки 300 элементов в секунду, пока вы на самом деле не докажете, что это проблема…

2. Привет, Кайус, спасибо за ответ. Я на самом деле сталкиваюсь с проблемами, которые я знаю. В какой-то момент моя очередь получила более 600 тыс. элементов, поэтому второй поток отстает для выполнения операции вставки. Я запускаю MSSQL на обычном рабочем столе I5 с 16 ГБ оперативной памяти. Есть предложения?

3. InsertOnSubmit работает очень медленно. Используйте SqlBulkCopy вместо этого и запускайте его каждые несколько секунд

4. Я думал об этом, но несколько секунд на 1-минутной свече (для онлайн-целей, когда и если я собираюсь ее использовать), это все равно, что ослепнуть

Ответ №1:

Я вижу одну ошибку в вашем коде, вы переводите фоновый поток в спящий режим после каждой вставки, не переходите в спящий режим, если есть больше данных. Вместо:

 if (FRACTIONED_CANDLES.TryDequeue(out NewData))
{
    using (DBBINANCEEntities LSQLBINANCE = new DBBINANCEEntities())
    {
        LSQLBINANCE.TB_FRACTIONED_CANDLES_DATA.Add(NewData);
        if (NewData.IS_LAST_CANDLE)
            LSQLBINANCE.TB_CANDLES_DATA.Add(new TB_CANDLES_DATA()
            {
                BASE_VOLUME = NewData.BASE_VOLUME,
                CLOSE_TIME = NewData.CLOSE_TIME,
                IS_LAST_CANDLE = NewData.IS_LAST_CANDLE,
                MONEY_VOLUME = NewData.MONEY_VOLUME,
                PCLOSE = NewData.PCLOSE,
                PHIGH = NewData.PHIGH,
                PLOW = NewData.PLOW,
                POPEN = NewData.POPEN,
                SYMBOL = NewData.SYMBOL,
                TAKER_BUY_BASE_VOLUME = NewData.TAKER_BUY_BASE_VOLUME,
                TAKER_BUY_MONEY_VOLUME = NewData.TAKER_BUY_MONEY_VOLUME,
                TRADES = NewData.TRADES
            });
        LSQLBINANCE.SaveChanges();
    }
}
Thread.Sleep(1);
 

Измените последнюю строку на:

 else
    Thread.Sleep(1);
 

Это может решить вашу проблему.

Комментарии:

1. Кроме того, totally мог видеть, что это потенциально выгодно за счет использования более простой объемной вставки вместо одной за раз, поскольку вы не манипулируете вставленными данными / не нужно их отслеживать. Очевидно, что мало пользы, если одновременно выполняется только одна ожидающая запись, но все равно это приведет к удалению отслеживания объектов EF. Я использую github.com/borisdj/EFCore . Объемные расширения для такого рода задач. Вы могли бы переписать фоновый поток, чтобы удалять из очереди ВСЕ ожидающие данные и каждый раз вставлять в один оператор, чтобы избавиться / уменьшить накладные расходы на отслеживание объектов EF при вашем текущем подходе.

2. Это действительно помогло! Thx, измененный на bulk, решает проблему. Просто захватите все, даже если все просто 1 или 1.000