Многопоточность кода с использованием System.Расширения Reactive

#c# #multithreading #system.reactive #.net-5

#c# #многопоточность #system.reactive #.net-5

Вопрос:

Приведенный ниже код загружает исторические данные OHLCV из Binance с даты начала до даты окончания. Поскольку Binance позволяет нам загружать только 1000 свечей одновременно, я сделал DownloadAsync так, как есть. Также приветствуются любые рекомендации по коду.

На самом деле вопрос заключается в том, чтобы сделать DownloadAsync многопоточным, чтобы ускорить процесс, потому что представьте, что вы загружаете свечи с 2018 по 2021 год с интервалом 5m. Я бы предпочел использовать System.Reactive , но, думаю, другие решения тоже приветствуются, поскольку сложно представить код в многопоточной версии.

Приведенный ниже код можно протестировать.

 using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Linq;
using System.Text.RegularExpressions;
using System.Reactive.Linq;
using System.Threading;

namespace DownloadCandleDataTest
{
    public class DataProvider
    {
        private Exchange _exchange;

        public DataProvider(Exchange exchange)
        {
            _exchange = exchange;
        }

        public async Task<List<OHLCV>> DownloadAsync(string pair, KlineInterval interval, DateTime startDate, DateTime endDate, int startupCandleCount)
        {
            DateTime start = startDate;
            DateTime end = endDate;

            var tempStartDate = start;
            var tempEndDate = end;
            var tempList = new List<OHLCV>();

            for (int i = 0; tempStartDate < tempEndDate; i  )
            {
                var candles = await _exchange.GetCandlesAsync(pair, interval, tempStartDate, tempEndDate, 100).ConfigureAwait(false);

                if (candles.Count > 0)
                {
                    // Remove the first candle when i > 0, to prevent duplicates
                    if (i > 0)
                    {
                        candles.RemoveAt(0);
                    }

                    var first = candles.First();
                    var last = candles.Last();
                    Console.WriteLine($"First: {first.Timestamp} | Last: {last.Timestamp}");

                    tempList.AddRange(candles);

                    tempStartDate = last.Timestamp;
                }
            }

            // Find duplicates
            var groups = tempList.GroupBy(g => g.Timestamp).Where(e => e.Skip(1).Any());

            foreach (var group in groups)
            {
                Console.WriteLine(group.Key);
                foreach (var ohclv in group)
                {
                    Console.WriteLine("t"   ohclv.Timestamp);
                }
            }


            return null;
        }
    }

    class Program
    {
        public static void StartBackgroundWork()
        {
            Console.WriteLine("Shows use of Start to start on a background thread:");
            var o = Observable.Start(() =>
            {
                //This starts on a background thread.
                Console.WriteLine("From background thread. Does not block main thread.");
                Console.WriteLine("Calculating...");
                Thread.Sleep(3000);
                Console.WriteLine("Background work completed.");
            }).Finally(() => Console.WriteLine("Main thread completed."));
            Console.WriteLine("rnt In Main Thread...rn");
            o.Wait();   // Wait for completion of background operation.
        }

        static async Task Main(string[] args)
        {
            using var exchange = new Exchange();
            var dataProvider = new DataProvider(exchange);
            await dataProvider.DownloadAsync("TRXUSDT", KlineInterval.FiveMinutes, new DateTime(2019, 1, 1), new DateTime(2019, 1, 2), 100).ConfigureAwait(false);

            Console.ReadLine();
        }
    }

    public class OHLCV
    {
        public DateTime Timestamp { get; set; }
        public decimal Open { get; set; }
        public decimal High { get; set; }
        public decimal Low { get; set; }
        public decimal Close { get; set; }
        public decimal Volume { get; set; }
    }

    public static class Extensions
    {
        public static OHLCV ToCandle(this IBinanceKline candle)
        {
            return new OHLCV
            {
                Timestamp = candle.OpenTime,
                Open = candle.Open,
                High = candle.High,
                Low = candle.Low,
                Close = candle.Close,
                Volume = candle.BaseVolume,
            };
        }
    }

    public class Exchange : IDisposable
    {
        private readonly IBinanceClient _client;

        public Exchange()
        {
            _client = new BinanceClient();
        }

        public async Task<List<OHLCV>> GetCandlesAsync(string pair, KlineInterval interval, DateTime? startTime = null, DateTime? endTime = null, int? limit = null)
        {
            var result = await _client.Spot.Market.GetKlinesAsync(pair, interval, startTime, endTime, limit).ConfigureAwait(false);

            if (result.Success)
            {
                return result.Data?.Select(e => e.ToCandle()).ToList();
            }

            return null;
        }

        public void Dispose()
        {
            if (_client != null)
            {
                _client.Dispose();
            }
        }
    }
}

 

Комментарии:

1. В чем собственно вопрос, хотите ли вы делать больше загрузок параллельно? Занимает ли обработка после загрузки процессорное время, и вы хотите ее распараллелить?

2. Я хочу больше параллельных загрузок, да. В Python (ccxt) он загружает свечи так быстро из-за многопоточности. Python (ccxt lib) использует asyncio. github.com/freqtrade/freqtrade/blob /…

Ответ №1:

Вы серьезно переоцениваете это.

Поскольку вы получаете равномерно распределенные свечи и знаете, сколько вы получаете за вызов GetKlinesAsync , вы можете вычислить все требуемые даты начала.

     var pair = "TRXUSDT";
    var interval = KlineInterval.FiveMinutes;
    var startDate = new DateTime(2019, 1, 1);
    var endDate = new DateTime(2019, 1, 2);
    var gap = 5.0; // same as `interval` for purpose of computing start dates.
    var limit = 100;

    IObservable<DateTime> startDates =
        Observable
            .Generate(startDate, x => x <= endDate, x => x.AddMinutes(gap * limit), x => x);
 

Теперь сгенерировать ваш запрос довольно просто:

     IObservable<OHLCV> query =
        from s in startDates
        from rs in
            Observable
                .Using(
                    () => new BinanceClient(),
                    bc => Observable.FromAsync(ct =>
                        bc.Spot.Market.GetKlinesAsync(pair, interval, s, endDate, limit, ct)))
        where rs.Success
        from c in rs.Data.Select(x => x.ToCandle())
        select c;
 

Поскольку это параллельный запрос, возможно, даже вероятно, что вы получите свои результаты не по порядку, поэтому вам нужно выполнить .ToArray() запрос, чтобы иметь возможность обрабатывать все данные, полученные в конце, а не по мере поступления каждой свечи.

     IDisposable subscription =
        query
            .ToArray()
            .Select(xs => xs.OrderBy(x => x.Timestamp).ToArray())
            .Subscribe(cs =>
            {
                /* candles downloaded using multiple threads */
                /* and sorted in `Timestamp` order  */
            });
 

Это создает все свечи по порядку, используя несколько потоков, без каких-либо дубликатов.

Если вы хотите использовать его в качестве DownLoadAsync метода, тогда вы делаете это:

 public async Task<List<OHLCV>> DownloadAsync(string pair, KlineInterval interval, double gap, DateTime startDate, DateTime endDate, int limit)
{
    IObservable<DateTime> startDates =
        Observable
            .Generate(startDate, x => x <= endDate, x => x.AddMinutes(gap * limit), x => x);

    IObservable<OHLCV> query =
        from s in startDates
        from rs in
            Observable
                .Using(
                    () => new BinanceClient(),
                    bc => Observable.FromAsync(ct =>
                        bc.Spot.Market.GetKlinesAsync(pair, interval, s, endDate, limit, ct)))
        where rs.Success
        from c in rs.Data.Select(x => x.ToCandle())
        select c;

    return await query.ToArray().Select(xs => xs.OrderBy(x => x.Timestamp).ToList());
}
 

Обратите внимание, что подпись немного изменилась.

Комментарии:

1. Спасибо за ваш исчерпывающий ответ! Как мне вернуть свечи в метод DownloadAsync?

2. Большое спасибо! Это то, чего я хотел.

3. @nop — Или, в полном объеме, это: pastebin.com/4GuNavFc

4. @nop — класс, который вы сделали накладным, который потенциально просто добавляет ошибок. И поскольку он реализует IDisposable , вы должны использовать Observable.Using его в любом случае. Предиктор ошибок номер один — это код. Чем больше кода вы пишете, тем больше у вас ошибок. Так что пишите меньше кода.

5. @nop — Вы можете делать все, что хотите, но я стараюсь писать меньше кода, и я стараюсь создавать и распоряжаться доступными ресурсами так быстро, как только могу.

Ответ №2:

Ключом к параллельному выполнению большего количества веб-запросов является создание множества задач и ожидание их всех Task.WhenAll() вместо ожидания каждого из них внутри цикла (циклов).

Если вы ожидаете каждого из них в цикле, они будут обрабатываться последовательно (хотя поток пользовательского интерфейса не будет заблокирован во время выполнения веб-запроса).

Комментарии:

1. Я знаю эту концепцию. Мне нужен фрагмент с моим методом DownloadAsync, потому что посмотрите на него. Даже параллельный. For не может быть использован на нем прямо сейчас