#swift #combine
#swift #объединить
Вопрос:
Я пытаюсь переключить свое приложение на использование конвейера объединения. Я надеялся упростить управление потоками, но столкнулся с неожиданным поведением Combine.
Я предположил, что, несмотря на то, что я подписываюсь на DispatchQueue.global(), отмена основного конвейера приведет к отмене вложенной подписки.
Вот моя игровая площадка:
import Cocoa
import Combine
let folders = ["folder1", "folder2", "folder3", "folder4"]
class OneByOnePublisher: Publisher {
typealias Output = String
typealias Failure = Never
let input: [String]
init(input: [String]) {
self.input = input
}
func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Input == Output, Downstream.Failure == Failure {
let subject = PassthroughSubject<String, Never>()
subject.receive(subscriber: subscriber)
for value in input {
subject.send(value)
}
subject.send(completion: .finished)
}
}
func uppercase(_ character: Character) -> String {
print("Uppercasing (character)")
Thread.sleep(forTimeInterval: 0.5)
return character.uppercased()
}
func uppercasePublisher(_ folder: String) -> AnyPublisher<String, Never> {
return folder.publisher
// .handleEvents(receiveCancel: { print("Received cancel in nested") })
.map{uppercase($0)}
.collect()
.map{$0.joined()}
.eraseToAnyPublisher()
}
let stringPublisher = PassthroughSubject<String, Never>()
let oneByOnePublisher = OneByOnePublisher(input: folders)
let cancelable = oneByOnePublisher
.subscribe(on: DispatchQueue.global())
.handleEvents(receiveCancel: { print("Received cancel in main") })
.flatMap{uppercasePublisher($0)}
.receive(on: DispatchQueue.main)
.sink { (completion) in
print("Received completion: (completion)")
} receiveValue: { (value) in
print("Received value: (value)")
}
Thread.sleep(forTimeInterval: 2)
cancelable.cancel()
Thread.sleep(forTimeInterval: 2)
print("Done")
Вывод этого
Uppercasing f
Uppercasing o
Uppercasing l
Uppercasing d
Received cancel in main
Uppercasing e
Uppercasing r
Uppercasing 1
Done
Однако, если я раскомментирую строку с
// .handleEvents(receiveCancel: { print("Received cancel in nested") })
Тогда результат — это то, что я ожидал бы в первую очередь
Uppercasing f
Uppercasing o
Uppercasing l
Uppercasing d
Received cancel in nested
Received cancel in main
Done
Чего мне не хватает? Почему в первом случае вложенная подписка не отменяется немедленно? Почему добавление handleEvents() изменяет поток отмены?
Комментарии:
1. «Немедленно» здесь запутывается, потому что вы подписываетесь на параллельную очередь.
2. Не связано, но на самом деле вам не нужно создавать
OneByOnePublisher
издателя… Простоеfolders.publisher
выполнение приведет к тому же результату. Кроме того, вам не нужноuppercasePublisher
— вы можете добиться этого с помощью простого.map { uppercase($0) }
3. Да, я создал OneByOnePublisher просто в качестве примера. Он точно имитирует фактический дизайн, в котором есть вложенный издатель, и все выполняется в глобальной очереди.
4. Для немедленного, я не против немедленного прекращения, но он все равно должен отменить вложенный издатель в середине потока. По какой-то причине этого не происходит, и вложенный издатель продолжает отправлять данные.
5. Тогда почему он приходит вовремя, когда у меня есть handleEvents()? Из вывода отладки я вижу, что в последнем случае вложенный издатель перестает выдавать значения.
Ответ №1:
Вероятно, есть ошибка с Publishers.Sequence
издателем, которая является результатом folder.publisher
вызова. И добавление handleEvents
вызова скрывает эту ошибку, поскольку вызов приводит к издателю-оболочке, который, похоже, правильно обрабатывает отмену.
Чтобы проверить эту теорию, давайте адаптируем OneByOnePublisher
ее для работы с любыми последовательностями (это также заставит ее работать с исходным массивом строк):
class OneByOnePublisher<Seq: Sequence>: Publisher {
typealias Output = Seq.Element
typealias Failure = Never
let input: Seq
init(input: Seq) {
self.input = input
}
func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Input == Output, Downstream.Failure == Failure {
let subject = PassthroughSubject<Output, Failure>()
subject.receive(subscriber: subscriber)
for value in input {
subject.send(value)
}
subject.send(completion: .finished)
}
}
extension Sequence {
var oneByOne: OneByOnePublisher<Self> { OneByOnePublisher(input: self) }
}
Теперь, если мы изменим uppercasePublisher
использование расширенного издателя
func uppercasePublisher(_ folder: String) -> AnyPublisher<String, Never> {
return folder.oneByOne
// .handleEvents(receiveCancel: { NSLog("Received cancel in nested") })
.map{uppercase($0)}
.collect()
.map{$0.joined()}
.eraseToAnyPublisher()
}
мы видим, что отмена работает как ожидалось, независимо handleEvents
от того, прокомментирована строка или нет. Это указывает на то, что исходная folder.publisher
строка была источником проблемы, и, более конкретно, это был Publishers.Sequence
экземпляр, вызывающий проблему.
Комментарии:
1. Попал в точку! Спасибо, что разобрались с этой головоломкой. Я не уверен, в чем именно заключается ошибка, но на самом деле это не только .handleEvents(), который позволяет избежать ошибки, но и print() . В любом случае, еще раз спасибо!
2. @JkMu на какой
receiveCancel
закрытии, то это не меняет правила игры, вы можете просто.handleEvents()
без аргументов и увидеть то же поведение.3. Я имею в виду, что если вы замените .handleEvents() на .print() , это будет иметь тот же эффект при отмене.