Система.Реактивный — одновременное удаление неизвестного количества подписок

#c# #.net-core #system.reactive

#c# #.net-ядро #system.reactive

Вопрос:

У меня есть неизвестное количество подписок, которые я хотел бы удалить сразу, потому что их может стать много. Существует ли механизм для одновременного использования их всех System.Reactive ? Возможно, их обертывание Observable.Using(() => Disposable.Create... сработает?

 client.Streams.PongStream.Subscribe(x =>
    Log.Information($"Pong received ({x.Message})"));

client.Streams.FundingStream.Subscribe(response =>
{
    var funding = response.Data;
    Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] "  
                    $"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} "  
                    $"index price {funding.IndexPrice}");
});

client.Streams.AggregateTradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] "  
                    $"price: {trade.Price} size: {trade.Quantity}");
});

client.Streams.TradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] "  
                    $"price: {trade.Price} size: {trade.Quantity}");
});

client.Streams.OrderBookPartialStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book snapshot [{ob.Symbol}] "  
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} "  
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
    Task.Delay(500).Wait();
    //OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
});

client.Streams.OrderBookDiffStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book diff [{ob.Symbol}] "  
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} "  
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
});

client.Streams.BookTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Book ticker [{ob.Symbol}] "  
                    $"Best ask price: {ob.BestAskPrice} "  
                    $"Best ask qty: {ob.BestAskQty} "  
                    $"Best bid price: {ob.BestBidPrice} "  
                    $"Best bid qty: {ob.BestBidQty}");
});

client.Streams.KlineStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Kline [{ob.Symbol}] "  
                    $"Kline start time: {ob.Data.StartTime} "  
                    $"Kline close time: {ob.Data.CloseTime} "  
                    $"Interval: {ob.Data.Interval} "  
                    $"First trade ID: {ob.Data.FirstTradeId} "  
                    $"Last trade ID: {ob.Data.LastTradeId} "  
                    $"Open price: {ob.Data.OpenPrice} "  
                    $"Close price: {ob.Data.ClosePrice} "  
                    $"High price: {ob.Data.HighPrice} "  
                    $"Low price: {ob.Data.LowPrice} "  
                    $"Base asset volume: {ob.Data.BaseAssetVolume} "  
                    $"Number of trades: {ob.Data.NumberTrades} "  
                    $"Is this kline closed?: {ob.Data.IsClose} "  
                    $"Quote asset volume: {ob.Data.QuoteAssetVolume} "  
                    $"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} "  
                    $"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} "  
                    $"Ignore: {ob.Data.Ignore} ");
});

client.Streams.MiniTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Mini-ticker [{ob.Symbol}] "  
                    $"Open price: {ob.OpenPrice} "  
                    $"Close price: {ob.ClosePrice} "  
                    $"High price: {ob.HighPrice} "  
                    $"Low price: {ob.LowPrice} "  
                    $"Base asset volume: {ob.BaseAssetVolume} "  
                    $"Quote asset volume: {ob.QuoteAssetVolume}");
});
 

Вот что такое эти подписки на самом деле.

 public class BinanceClientStreams
{
    internal readonly Subject<PongResponse> PongSubject = new Subject<PongResponse>();

    internal readonly Subject<TradeResponse> TradesSubject = new Subject<TradeResponse>();
    internal readonly Subject<AggregatedTradeResponse> TradeBinSubject = new Subject<AggregatedTradeResponse>();

    internal readonly Subject<OrderBookPartialResponse> OrderBookPartialSubject =
        new Subject<OrderBookPartialResponse>();

    internal readonly Subject<OrderBookDiffResponse> OrderBookDiffSubject = new Subject<OrderBookDiffResponse>();
    internal readonly Subject<FundingResponse> FundingSubject = new Subject<FundingResponse>();

    internal readonly Subject<BookTickerResponse> BookTickerSubject = new Subject<BookTickerResponse>();
    
    internal readonly Subject<KlineResponse> KlineSubject = new Subject<KlineResponse>();
    
    internal readonly Subject<MiniTickerResponse> MiniTickerSubject = new Subject<MiniTickerResponse>();
    
    // PUBLIC

    /// <summary>
    /// Response stream to every ping request
    /// </summary>
    public IObservable<PongResponse> PongStream => PongSubject.AsObservable();

    /// <summary>
    /// Trades stream - emits every executed trade on Binance
    /// </summary>
    public IObservable<TradeResponse> TradesStream => TradesSubject.AsObservable();

    /// <summary>
    /// Chunk of trades - emits grouped trades together
    /// </summary>
    public IObservable<AggregatedTradeResponse> AggregateTradesStream => TradeBinSubject.AsObservable();

    /// <summary>
    /// Partial order book stream - emits small snapshot of the order book
    /// </summary>
    public IObservable<OrderBookPartialResponse> OrderBookPartialStream => OrderBookPartialSubject.AsObservable();

    /// <summary>
    /// Order book difference stream - emits small snapshot of the order book
    /// </summary>
    public IObservable<OrderBookDiffResponse> OrderBookDiffStream => OrderBookDiffSubject.AsObservable();

    /// <summary>
    /// Mark price and funding rate stream - emits mark price and funding rate for a single symbol pushed every 3 seconds or every second
    /// </summary>
    public IObservable<FundingResponse> FundingStream => FundingSubject.AsObservable();

    /// <summary>
    ///  The best bid or ask's price or quantity in real-time for a specified symbol
    /// </summary>
    public IObservable<BookTickerResponse> BookTickerStream => BookTickerSubject.AsObservable();

    /// <summary>
    /// The Kline/Candlestick subscription, provide symbol and chart intervals
    /// </summary>
    public IObservable<KlineResponse> KlineStream => KlineSubject.AsObservable();

    /// <summary>
    /// Mini-ticker specified symbol statistics for the previous 24hrs
    /// </summary>
    public IObservable<MiniTickerResponse> MiniTickerStream => MiniTickerSubject.AsObservable();
}
 

Ответ №1:

Я думаю, что вы ищете CompositeDisposable . Вам нужно создать экземпляр этого класса и добавить к нему все ваши подписки.

 var compDisp = new CompositeDisposable();

compDisp.Add(client.Streams.PongStream.Subscribe(x =>
    Log.Information($"Pong received ({x.Message})")));

compDisp.Add(client.Streams.FundingStream.Subscribe(response =>
{
    var funding = response.Data;
    Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] "  
                    $"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} "  
                    $"index price {funding.IndexPrice}");
}));

compDisp.Add(client.Streams.AggregateTradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] "  
                    $"price: {trade.Price} size: {trade.Quantity}");
}));

compDisp.Add(client.Streams.TradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] "  
                    $"price: {trade.Price} size: {trade.Quantity}");
}));

compDisp.Add(client.Streams.OrderBookPartialStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book snapshot [{ob.Symbol}] "  
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} "  
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
    Task.Delay(500).Wait();
    //OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
}));

compDisp.Add(client.Streams.OrderBookDiffStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book diff [{ob.Symbol}] "  
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} "  
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
}));

compDisp.Add(client.Streams.BookTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Book ticker [{ob.Symbol}] "  
                    $"Best ask price: {ob.BestAskPrice} "  
                    $"Best ask qty: {ob.BestAskQty} "  
                    $"Best bid price: {ob.BestBidPrice} "  
                    $"Best bid qty: {ob.BestBidQty}");
}));

compDisp.Add(client.Streams.KlineStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Kline [{ob.Symbol}] "  
                    $"Kline start time: {ob.Data.StartTime} "  
                    $"Kline close time: {ob.Data.CloseTime} "  
                    $"Interval: {ob.Data.Interval} "  
                    $"First trade ID: {ob.Data.FirstTradeId} "  
                    $"Last trade ID: {ob.Data.LastTradeId} "  
                    $"Open price: {ob.Data.OpenPrice} "  
                    $"Close price: {ob.Data.ClosePrice} "  
                    $"High price: {ob.Data.HighPrice} "  
                    $"Low price: {ob.Data.LowPrice} "  
                    $"Base asset volume: {ob.Data.BaseAssetVolume} "  
                    $"Number of trades: {ob.Data.NumberTrades} "  
                    $"Is this kline closed?: {ob.Data.IsClose} "  
                    $"Quote asset volume: {ob.Data.QuoteAssetVolume} "  
                    $"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} "  
                    $"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} "  
                    $"Ignore: {ob.Data.Ignore} ");
}));

compDisp.Add(client.Streams.MiniTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Mini-ticker [{ob.Symbol}] "  
                    $"Open price: {ob.OpenPrice} "  
                    $"Close price: {ob.ClosePrice} "  
                    $"High price: {ob.HighPrice} "  
                    $"Low price: {ob.LowPrice} "  
                    $"Base asset volume: {ob.BaseAssetVolume} "  
                    $"Quote asset volume: {ob.QuoteAssetVolume}");
}));
 

Все подписки будут удалены, как только compDisp экземпляр будет удален. Когда это будет сделано, конечно, зависит от контекста вашего приложения.

Редактировать: в зависимости от архитектуры вашего приложения WhenActivated вам также может быть интересен метод расширения. Он определен в ActivatableView интерфейсе ActivatableViewModel and и принимает функцию, которая вызывается каждый раз, когда активируется представление (модель) (т. Е. В основном, когда оно отображается на экране). Эта функция также имеет CompositeDisposable параметр as, который добавляется каждый раз, когда представление (модель) деактивируется.

Редактировать 2 Только что понял, что DiposeWith метод на самом деле является частью ReactiveUI фреймворка, а также методом WhenAcitvated расширения, а не частью реактивного расширения, на котором основан этот фреймворк. Таким образом, вы не можете писать такие вещи, как myObservable.Subscribe(x => ...).DisposeWith(compDisp) без использования этой платформы, но compDisp.Add(myObservable.Subscribe(x => ...)) должны работать. Я соответствующим образом скорректировал приведенный выше код.

Комментарии:

1. Спасибо за ваш ответ! Вот что я сделал, основываясь на вашем ответе: controlc.com/4d3cb01d . Можете ли вы это проверить?

2. Я не знал, что CompositeDisposable также может быть сконструирован таким образом (как уже упоминалось, я обычно использую метод DisposeWith от ReactuveUI). Однако я только что изменил класс модели представления в своем приложении для тестирования, и, похоже, он работает так, как вы это сделали.

3. Большое вам спасибо! 🙂 Я добавил методы расширения в проект, и в настоящее время я использую .DisposeWith