#c# #multithreading #system.reactive #.net-5
#c# #многопоточность #system.reactive #.net-5
Вопрос:
Приведенный ниже код загружает исторические данные OHLCV из Binance с даты начала до даты окончания. Поскольку Binance позволяет нам загружать только 1000 свечей одновременно, я сделал DownloadAsync
так, как есть. Также приветствуются любые рекомендации по коду.
На самом деле вопрос заключается в том, чтобы сделать DownloadAsync
многопоточным, чтобы ускорить процесс, потому что представьте, что вы загружаете свечи с 2018 по 2021 год с интервалом 5m. Я бы предпочел использовать System.Reactive
, но, думаю, другие решения тоже приветствуются, поскольку сложно представить код в многопоточной версии.
Приведенный ниже код можно протестировать.
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Linq;
using System.Text.RegularExpressions;
using System.Reactive.Linq;
using System.Threading;
namespace DownloadCandleDataTest
{
public class DataProvider
{
private Exchange _exchange;
public DataProvider(Exchange exchange)
{
_exchange = exchange;
}
public async Task<List<OHLCV>> DownloadAsync(string pair, KlineInterval interval, DateTime startDate, DateTime endDate, int startupCandleCount)
{
DateTime start = startDate;
DateTime end = endDate;
var tempStartDate = start;
var tempEndDate = end;
var tempList = new List<OHLCV>();
for (int i = 0; tempStartDate < tempEndDate; i )
{
var candles = await _exchange.GetCandlesAsync(pair, interval, tempStartDate, tempEndDate, 100).ConfigureAwait(false);
if (candles.Count > 0)
{
// Remove the first candle when i > 0, to prevent duplicates
if (i > 0)
{
candles.RemoveAt(0);
}
var first = candles.First();
var last = candles.Last();
Console.WriteLine($"First: {first.Timestamp} | Last: {last.Timestamp}");
tempList.AddRange(candles);
tempStartDate = last.Timestamp;
}
}
// Find duplicates
var groups = tempList.GroupBy(g => g.Timestamp).Where(e => e.Skip(1).Any());
foreach (var group in groups)
{
Console.WriteLine(group.Key);
foreach (var ohclv in group)
{
Console.WriteLine("t" ohclv.Timestamp);
}
}
return null;
}
}
class Program
{
public static void StartBackgroundWork()
{
Console.WriteLine("Shows use of Start to start on a background thread:");
var o = Observable.Start(() =>
{
//This starts on a background thread.
Console.WriteLine("From background thread. Does not block main thread.");
Console.WriteLine("Calculating...");
Thread.Sleep(3000);
Console.WriteLine("Background work completed.");
}).Finally(() => Console.WriteLine("Main thread completed."));
Console.WriteLine("rnt In Main Thread...rn");
o.Wait(); // Wait for completion of background operation.
}
static async Task Main(string[] args)
{
using var exchange = new Exchange();
var dataProvider = new DataProvider(exchange);
await dataProvider.DownloadAsync("TRXUSDT", KlineInterval.FiveMinutes, new DateTime(2019, 1, 1), new DateTime(2019, 1, 2), 100).ConfigureAwait(false);
Console.ReadLine();
}
}
public class OHLCV
{
public DateTime Timestamp { get; set; }
public decimal Open { get; set; }
public decimal High { get; set; }
public decimal Low { get; set; }
public decimal Close { get; set; }
public decimal Volume { get; set; }
}
public static class Extensions
{
public static OHLCV ToCandle(this IBinanceKline candle)
{
return new OHLCV
{
Timestamp = candle.OpenTime,
Open = candle.Open,
High = candle.High,
Low = candle.Low,
Close = candle.Close,
Volume = candle.BaseVolume,
};
}
}
public class Exchange : IDisposable
{
private readonly IBinanceClient _client;
public Exchange()
{
_client = new BinanceClient();
}
public async Task<List<OHLCV>> GetCandlesAsync(string pair, KlineInterval interval, DateTime? startTime = null, DateTime? endTime = null, int? limit = null)
{
var result = await _client.Spot.Market.GetKlinesAsync(pair, interval, startTime, endTime, limit).ConfigureAwait(false);
if (result.Success)
{
return result.Data?.Select(e => e.ToCandle()).ToList();
}
return null;
}
public void Dispose()
{
if (_client != null)
{
_client.Dispose();
}
}
}
}
Комментарии:
1. В чем собственно вопрос, хотите ли вы делать больше загрузок параллельно? Занимает ли обработка после загрузки процессорное время, и вы хотите ее распараллелить?
2. Я хочу больше параллельных загрузок, да. В Python (ccxt) он загружает свечи так быстро из-за многопоточности. Python (ccxt lib) использует asyncio. github.com/freqtrade/freqtrade/blob /…
Ответ №1:
Вы серьезно переоцениваете это.
Поскольку вы получаете равномерно распределенные свечи и знаете, сколько вы получаете за вызов GetKlinesAsync
, вы можете вычислить все требуемые даты начала.
var pair = "TRXUSDT";
var interval = KlineInterval.FiveMinutes;
var startDate = new DateTime(2019, 1, 1);
var endDate = new DateTime(2019, 1, 2);
var gap = 5.0; // same as `interval` for purpose of computing start dates.
var limit = 100;
IObservable<DateTime> startDates =
Observable
.Generate(startDate, x => x <= endDate, x => x.AddMinutes(gap * limit), x => x);
Теперь сгенерировать ваш запрос довольно просто:
IObservable<OHLCV> query =
from s in startDates
from rs in
Observable
.Using(
() => new BinanceClient(),
bc => Observable.FromAsync(ct =>
bc.Spot.Market.GetKlinesAsync(pair, interval, s, endDate, limit, ct)))
where rs.Success
from c in rs.Data.Select(x => x.ToCandle())
select c;
Поскольку это параллельный запрос, возможно, даже вероятно, что вы получите свои результаты не по порядку, поэтому вам нужно выполнить .ToArray()
запрос, чтобы иметь возможность обрабатывать все данные, полученные в конце, а не по мере поступления каждой свечи.
IDisposable subscription =
query
.ToArray()
.Select(xs => xs.OrderBy(x => x.Timestamp).ToArray())
.Subscribe(cs =>
{
/* candles downloaded using multiple threads */
/* and sorted in `Timestamp` order */
});
Это создает все свечи по порядку, используя несколько потоков, без каких-либо дубликатов.
Если вы хотите использовать его в качестве DownLoadAsync
метода, тогда вы делаете это:
public async Task<List<OHLCV>> DownloadAsync(string pair, KlineInterval interval, double gap, DateTime startDate, DateTime endDate, int limit)
{
IObservable<DateTime> startDates =
Observable
.Generate(startDate, x => x <= endDate, x => x.AddMinutes(gap * limit), x => x);
IObservable<OHLCV> query =
from s in startDates
from rs in
Observable
.Using(
() => new BinanceClient(),
bc => Observable.FromAsync(ct =>
bc.Spot.Market.GetKlinesAsync(pair, interval, s, endDate, limit, ct)))
where rs.Success
from c in rs.Data.Select(x => x.ToCandle())
select c;
return await query.ToArray().Select(xs => xs.OrderBy(x => x.Timestamp).ToList());
}
Обратите внимание, что подпись немного изменилась.
Комментарии:
1. Спасибо за ваш исчерпывающий ответ! Как мне вернуть свечи в метод DownloadAsync?
2. Большое спасибо! Это то, чего я хотел.
3. @nop — Или, в полном объеме, это: pastebin.com/4GuNavFc
4. @nop — класс, который вы сделали накладным, который потенциально просто добавляет ошибок. И поскольку он реализует
IDisposable
, вы должны использоватьObservable.Using
его в любом случае. Предиктор ошибок номер один — это код. Чем больше кода вы пишете, тем больше у вас ошибок. Так что пишите меньше кода.5. @nop — Вы можете делать все, что хотите, но я стараюсь писать меньше кода, и я стараюсь создавать и распоряжаться доступными ресурсами так быстро, как только могу.
Ответ №2:
Ключом к параллельному выполнению большего количества веб-запросов является создание множества задач и ожидание их всех Task.WhenAll()
вместо ожидания каждого из них внутри цикла (циклов).
Если вы ожидаете каждого из них в цикле, они будут обрабатываться последовательно (хотя поток пользовательского интерфейса не будет заблокирован во время выполнения веб-запроса).
Комментарии:
1. Я знаю эту концепцию. Мне нужен фрагмент с моим методом DownloadAsync, потому что посмотрите на него. Даже параллельный. For не может быть использован на нем прямо сейчас