#c# #.net-core #system.reactive
#c# #.net-ядро #system.reactive
Вопрос:
У меня есть неизвестное количество подписок, которые я хотел бы удалить сразу, потому что их может стать много. Существует ли механизм для одновременного использования их всех System.Reactive
? Возможно, их обертывание Observable.Using(() => Disposable.Create...
сработает?
client.Streams.PongStream.Subscribe(x =>
Log.Information($"Pong received ({x.Message})"));
client.Streams.FundingStream.Subscribe(response =>
{
var funding = response.Data;
Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] "
$"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} "
$"index price {funding.IndexPrice}");
});
client.Streams.AggregateTradesStream.Subscribe(response =>
{
var trade = response.Data;
Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] "
$"price: {trade.Price} size: {trade.Quantity}");
});
client.Streams.TradesStream.Subscribe(response =>
{
var trade = response.Data;
Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] "
$"price: {trade.Price} size: {trade.Quantity}");
});
client.Streams.OrderBookPartialStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Order book snapshot [{ob.Symbol}] "
$"bid: {ob.Bids.FirstOrDefault()?.Price:F} "
$"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
Task.Delay(500).Wait();
//OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
});
client.Streams.OrderBookDiffStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Order book diff [{ob.Symbol}] "
$"bid: {ob.Bids.FirstOrDefault()?.Price:F} "
$"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
});
client.Streams.BookTickerStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Book ticker [{ob.Symbol}] "
$"Best ask price: {ob.BestAskPrice} "
$"Best ask qty: {ob.BestAskQty} "
$"Best bid price: {ob.BestBidPrice} "
$"Best bid qty: {ob.BestBidQty}");
});
client.Streams.KlineStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Kline [{ob.Symbol}] "
$"Kline start time: {ob.Data.StartTime} "
$"Kline close time: {ob.Data.CloseTime} "
$"Interval: {ob.Data.Interval} "
$"First trade ID: {ob.Data.FirstTradeId} "
$"Last trade ID: {ob.Data.LastTradeId} "
$"Open price: {ob.Data.OpenPrice} "
$"Close price: {ob.Data.ClosePrice} "
$"High price: {ob.Data.HighPrice} "
$"Low price: {ob.Data.LowPrice} "
$"Base asset volume: {ob.Data.BaseAssetVolume} "
$"Number of trades: {ob.Data.NumberTrades} "
$"Is this kline closed?: {ob.Data.IsClose} "
$"Quote asset volume: {ob.Data.QuoteAssetVolume} "
$"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} "
$"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} "
$"Ignore: {ob.Data.Ignore} ");
});
client.Streams.MiniTickerStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Mini-ticker [{ob.Symbol}] "
$"Open price: {ob.OpenPrice} "
$"Close price: {ob.ClosePrice} "
$"High price: {ob.HighPrice} "
$"Low price: {ob.LowPrice} "
$"Base asset volume: {ob.BaseAssetVolume} "
$"Quote asset volume: {ob.QuoteAssetVolume}");
});
Вот что такое эти подписки на самом деле.
public class BinanceClientStreams
{
internal readonly Subject<PongResponse> PongSubject = new Subject<PongResponse>();
internal readonly Subject<TradeResponse> TradesSubject = new Subject<TradeResponse>();
internal readonly Subject<AggregatedTradeResponse> TradeBinSubject = new Subject<AggregatedTradeResponse>();
internal readonly Subject<OrderBookPartialResponse> OrderBookPartialSubject =
new Subject<OrderBookPartialResponse>();
internal readonly Subject<OrderBookDiffResponse> OrderBookDiffSubject = new Subject<OrderBookDiffResponse>();
internal readonly Subject<FundingResponse> FundingSubject = new Subject<FundingResponse>();
internal readonly Subject<BookTickerResponse> BookTickerSubject = new Subject<BookTickerResponse>();
internal readonly Subject<KlineResponse> KlineSubject = new Subject<KlineResponse>();
internal readonly Subject<MiniTickerResponse> MiniTickerSubject = new Subject<MiniTickerResponse>();
// PUBLIC
/// <summary>
/// Response stream to every ping request
/// </summary>
public IObservable<PongResponse> PongStream => PongSubject.AsObservable();
/// <summary>
/// Trades stream - emits every executed trade on Binance
/// </summary>
public IObservable<TradeResponse> TradesStream => TradesSubject.AsObservable();
/// <summary>
/// Chunk of trades - emits grouped trades together
/// </summary>
public IObservable<AggregatedTradeResponse> AggregateTradesStream => TradeBinSubject.AsObservable();
/// <summary>
/// Partial order book stream - emits small snapshot of the order book
/// </summary>
public IObservable<OrderBookPartialResponse> OrderBookPartialStream => OrderBookPartialSubject.AsObservable();
/// <summary>
/// Order book difference stream - emits small snapshot of the order book
/// </summary>
public IObservable<OrderBookDiffResponse> OrderBookDiffStream => OrderBookDiffSubject.AsObservable();
/// <summary>
/// Mark price and funding rate stream - emits mark price and funding rate for a single symbol pushed every 3 seconds or every second
/// </summary>
public IObservable<FundingResponse> FundingStream => FundingSubject.AsObservable();
/// <summary>
/// The best bid or ask's price or quantity in real-time for a specified symbol
/// </summary>
public IObservable<BookTickerResponse> BookTickerStream => BookTickerSubject.AsObservable();
/// <summary>
/// The Kline/Candlestick subscription, provide symbol and chart intervals
/// </summary>
public IObservable<KlineResponse> KlineStream => KlineSubject.AsObservable();
/// <summary>
/// Mini-ticker specified symbol statistics for the previous 24hrs
/// </summary>
public IObservable<MiniTickerResponse> MiniTickerStream => MiniTickerSubject.AsObservable();
}
Ответ №1:
Я думаю, что вы ищете CompositeDisposable
. Вам нужно создать экземпляр этого класса и добавить к нему все ваши подписки.
var compDisp = new CompositeDisposable();
compDisp.Add(client.Streams.PongStream.Subscribe(x =>
Log.Information($"Pong received ({x.Message})")));
compDisp.Add(client.Streams.FundingStream.Subscribe(response =>
{
var funding = response.Data;
Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] "
$"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} "
$"index price {funding.IndexPrice}");
}));
compDisp.Add(client.Streams.AggregateTradesStream.Subscribe(response =>
{
var trade = response.Data;
Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] "
$"price: {trade.Price} size: {trade.Quantity}");
}));
compDisp.Add(client.Streams.TradesStream.Subscribe(response =>
{
var trade = response.Data;
Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] "
$"price: {trade.Price} size: {trade.Quantity}");
}));
compDisp.Add(client.Streams.OrderBookPartialStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Order book snapshot [{ob.Symbol}] "
$"bid: {ob.Bids.FirstOrDefault()?.Price:F} "
$"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
Task.Delay(500).Wait();
//OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
}));
compDisp.Add(client.Streams.OrderBookDiffStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Order book diff [{ob.Symbol}] "
$"bid: {ob.Bids.FirstOrDefault()?.Price:F} "
$"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
}));
compDisp.Add(client.Streams.BookTickerStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Book ticker [{ob.Symbol}] "
$"Best ask price: {ob.BestAskPrice} "
$"Best ask qty: {ob.BestAskQty} "
$"Best bid price: {ob.BestBidPrice} "
$"Best bid qty: {ob.BestBidQty}");
}));
compDisp.Add(client.Streams.KlineStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Kline [{ob.Symbol}] "
$"Kline start time: {ob.Data.StartTime} "
$"Kline close time: {ob.Data.CloseTime} "
$"Interval: {ob.Data.Interval} "
$"First trade ID: {ob.Data.FirstTradeId} "
$"Last trade ID: {ob.Data.LastTradeId} "
$"Open price: {ob.Data.OpenPrice} "
$"Close price: {ob.Data.ClosePrice} "
$"High price: {ob.Data.HighPrice} "
$"Low price: {ob.Data.LowPrice} "
$"Base asset volume: {ob.Data.BaseAssetVolume} "
$"Number of trades: {ob.Data.NumberTrades} "
$"Is this kline closed?: {ob.Data.IsClose} "
$"Quote asset volume: {ob.Data.QuoteAssetVolume} "
$"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} "
$"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} "
$"Ignore: {ob.Data.Ignore} ");
}));
compDisp.Add(client.Streams.MiniTickerStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Mini-ticker [{ob.Symbol}] "
$"Open price: {ob.OpenPrice} "
$"Close price: {ob.ClosePrice} "
$"High price: {ob.HighPrice} "
$"Low price: {ob.LowPrice} "
$"Base asset volume: {ob.BaseAssetVolume} "
$"Quote asset volume: {ob.QuoteAssetVolume}");
}));
Все подписки будут удалены, как только compDisp
экземпляр будет удален. Когда это будет сделано, конечно, зависит от контекста вашего приложения.
Редактировать: в зависимости от архитектуры вашего приложения WhenActivated
вам также может быть интересен метод расширения. Он определен в ActivatableView
интерфейсе ActivatableViewModel
and и принимает функцию, которая вызывается каждый раз, когда активируется представление (модель) (т. Е. В основном, когда оно отображается на экране). Эта функция также имеет CompositeDisposable
параметр as, который добавляется каждый раз, когда представление (модель) деактивируется.
Редактировать 2 Только что понял, что DiposeWith
метод на самом деле является частью ReactiveUI
фреймворка, а также методом WhenAcitvated
расширения, а не частью реактивного расширения, на котором основан этот фреймворк. Таким образом, вы не можете писать такие вещи, как myObservable.Subscribe(x => ...).DisposeWith(compDisp)
без использования этой платформы, но compDisp.Add(myObservable.Subscribe(x => ...))
должны работать. Я соответствующим образом скорректировал приведенный выше код.
Комментарии:
1. Спасибо за ваш ответ! Вот что я сделал, основываясь на вашем ответе: controlc.com/4d3cb01d . Можете ли вы это проверить?
2. Я не знал, что CompositeDisposable также может быть сконструирован таким образом (как уже упоминалось, я обычно использую метод DisposeWith от ReactuveUI). Однако я только что изменил класс модели представления в своем приложении для тестирования, и, похоже, он работает так, как вы это сделали.
3. Большое вам спасибо! 🙂 Я добавил методы расширения в проект, и в настоящее время я использую
.DisposeWith