Отмена вложенного объединенного издателя

#swift #combine

#swift #объединить

Вопрос:

Я пытаюсь переключить свое приложение на использование конвейера объединения. Я надеялся упростить управление потоками, но столкнулся с неожиданным поведением Combine.

Я предположил, что, несмотря на то, что я подписываюсь на DispatchQueue.global(), отмена основного конвейера приведет к отмене вложенной подписки.

Вот моя игровая площадка:

 import Cocoa
import Combine

let folders = ["folder1", "folder2", "folder3", "folder4"]

class OneByOnePublisher: Publisher {
    typealias Output = String
    typealias Failure = Never
    
    let input: [String]
    init(input: [String]) {
        self.input = input
    }
  
    func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Input == Output, Downstream.Failure == Failure {
        let subject = PassthroughSubject<String, Never>()
        subject.receive(subscriber: subscriber)
        for value in input {
            subject.send(value)
        }
        subject.send(completion: .finished)
    }
}

func uppercase(_ character: Character) -> String {
    print("Uppercasing (character)")
    Thread.sleep(forTimeInterval: 0.5)
    return character.uppercased()
}

func uppercasePublisher(_ folder: String) -> AnyPublisher<String, Never> {
    return folder.publisher
//        .handleEvents(receiveCancel: { print("Received cancel in nested") })
        .map{uppercase($0)}
        .collect()
        .map{$0.joined()}
        .eraseToAnyPublisher()
}


let stringPublisher = PassthroughSubject<String, Never>()
let oneByOnePublisher = OneByOnePublisher(input: folders)


let cancelable = oneByOnePublisher
    .subscribe(on: DispatchQueue.global())
    .handleEvents(receiveCancel: { print("Received cancel in main") })
    .flatMap{uppercasePublisher($0)}
    .receive(on: DispatchQueue.main)
    .sink { (completion) in
        print("Received completion: (completion)")
    } receiveValue: { (value) in
        print("Received value: (value)")
    }

Thread.sleep(forTimeInterval: 2)
cancelable.cancel()
Thread.sleep(forTimeInterval: 2)
print("Done")
  

Вывод этого

 Uppercasing f
Uppercasing o
Uppercasing l
Uppercasing d
Received cancel in main
Uppercasing e
Uppercasing r
Uppercasing 1
Done

  

Однако, если я раскомментирую строку с

 //        .handleEvents(receiveCancel: { print("Received cancel in nested") })
  

Тогда результат — это то, что я ожидал бы в первую очередь

 Uppercasing f
Uppercasing o
Uppercasing l
Uppercasing d
Received cancel in nested
Received cancel in main
Done
  

Чего мне не хватает? Почему в первом случае вложенная подписка не отменяется немедленно? Почему добавление handleEvents() изменяет поток отмены?

Комментарии:

1. «Немедленно» здесь запутывается, потому что вы подписываетесь на параллельную очередь.

2. Не связано, но на самом деле вам не нужно создавать OneByOnePublisher издателя… Простое folders.publisher выполнение приведет к тому же результату. Кроме того, вам не нужно uppercasePublisher — вы можете добиться этого с помощью простого .map { uppercase($0) }

3. Да, я создал OneByOnePublisher просто в качестве примера. Он точно имитирует фактический дизайн, в котором есть вложенный издатель, и все выполняется в глобальной очереди.

4. Для немедленного, я не против немедленного прекращения, но он все равно должен отменить вложенный издатель в середине потока. По какой-то причине этого не происходит, и вложенный издатель продолжает отправлять данные.

5. Тогда почему он приходит вовремя, когда у меня есть handleEvents()? Из вывода отладки я вижу, что в последнем случае вложенный издатель перестает выдавать значения.

Ответ №1:

Вероятно, есть ошибка с Publishers.Sequence издателем, которая является результатом folder.publisher вызова. И добавление handleEvents вызова скрывает эту ошибку, поскольку вызов приводит к издателю-оболочке, который, похоже, правильно обрабатывает отмену.

Чтобы проверить эту теорию, давайте адаптируем OneByOnePublisher ее для работы с любыми последовательностями (это также заставит ее работать с исходным массивом строк):

 class OneByOnePublisher<Seq: Sequence>: Publisher {
    typealias Output = Seq.Element
    typealias Failure = Never
    
    let input: Seq
    init(input: Seq) {
        self.input = input
    }
  
    func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Input == Output, Downstream.Failure == Failure {
        let subject = PassthroughSubject<Output, Failure>()
        subject.receive(subscriber: subscriber)
        for value in input {
            subject.send(value)
        }
        subject.send(completion: .finished)
    }
}

extension Sequence {
    var oneByOne: OneByOnePublisher<Self> { OneByOnePublisher(input: self) }
}
  

Теперь, если мы изменим uppercasePublisher использование расширенного издателя

 func uppercasePublisher(_ folder: String) -> AnyPublisher<String, Never> {
    return folder.oneByOne
        // .handleEvents(receiveCancel: { NSLog("Received cancel in nested") })
        .map{uppercase($0)}
        .collect()
        .map{$0.joined()}
        .eraseToAnyPublisher()
}
  

мы видим, что отмена работает как ожидалось, независимо handleEvents от того, прокомментирована строка или нет. Это указывает на то, что исходная folder.publisher строка была источником проблемы, и, более конкретно, это был Publishers.Sequence экземпляр, вызывающий проблему.

Комментарии:

1. Попал в точку! Спасибо, что разобрались с этой головоломкой. Я не уверен, в чем именно заключается ошибка, но на самом деле это не только .handleEvents(), который позволяет избежать ошибки, но и print() . В любом случае, еще раз спасибо!

2. @JkMu на какой print вызов вы ссылаетесь? Если это тот, который находится в receiveCancel закрытии, то это не меняет правила игры, вы можете просто .handleEvents() без аргументов и увидеть то же поведение.

3. Я имею в виду, что если вы замените .handleEvents() на .print() , это будет иметь тот же эффект при отмене.