When writing asynchronous code using Combine, we might sometimes want to share the result of a given set of concurrent operations, rather than performing duplicate work for each one. Let’s take a look at how the share
operator can enable us to do that in a really neat way.
As an example, let’s say that we’re working on the following ArticleLoader
, which uses URLSession
to load an Article
model from a given URL:
class ArticleLoader {
private let urlSession: URLSession
private let decoder: JSONDecoder
init(urlSession: URLSession = .shared,
decoder: JSONDecoder = .init()) {
self.urlSession = urlSession
self.decoder = decoder
}
func loadArticle(from url: URL) -> AnyPublisher<Article, Error> {
urlSession
.dataTaskPublisher(for: url)
.map(\.data)
.decode(type: Article.self, decoder: decoder)
.eraseToAnyPublisher()
}
}
Now let’s say that we’re expecting the above loadArticle
method to be called multiple times, in either parallel or quick succession, with the same URL — which currently would lead to duplicate network requests, as each call to our method produces a brand new publisher.
To address that, let’s store each of the publishers that we create within a dictionary (keyed by the URL that each publisher is for), and then when we receive a loadArticle
call, we’ll first check if that dictionary contains an existing publisher that can be reused — like this:
class ArticleLoader {
typealias Publisher = AnyPublisher<Article, Error>
private let urlSession: URLSession
private let decoder: JSONDecoder
private var publishers = [URL: Publisher]()
...
func loadArticle(from url: URL) -> Publisher {
if let publisher = publishers[url] {
return publisher
}
let publisher = urlSession
.dataTaskPublisher(for: url)
.map(\.data)
.decode(type: Article.self, decoder: decoder)
.handleEvents(receiveCompletion: { [weak self] _ in
self?.publishers[url] = nil
})
.eraseToAnyPublisher()
publishers[url] = publisher
return publisher
}
}
Note how we also remove each publisher from our dictionary once it completes, as to avoid keeping old publishers around in memory. We use receiveCompletion
, rather than receiveOutput
, to also be notified whenever an error was encountered.
Now, looking at the above code, it might initially seem like we’ve solved our problem. However, if we look at our network logs (or simply put a print("Done")
call within our handleEvents
closure), then it turns out that we’re actually still performing multiple, duplicate operations. How can that be?
It turns out that even if we are indeed reusing our publisher instances, that doesn’t guarantee that we’re actually reusing the work that those publishers are performing. In fact, by default, each publisher will run through our entire data pipeline for each subscriber that attaches to it. That might initially seem rather odd, so let’s examine that behavior from a slightly different angle.
As another quick example, here we’re creating a publisher that uses a Timer
to publish a new random number every second, and we’re then attaching two separate subscribers to that publisher, both of which simply print the numbers that they receive:
var cancellables = Set<AnyCancellable>()
let randomNumberGenerator = Timer
.publish(every: 1, on: .main, in: .common)
.autoconnect()
.map { _ in Int.random(in: 1...100) }
randomNumberGenerator
.sink { number in
print(number)
}
.store(in: &cancellables)
randomNumberGenerator
.sink { number in
print(number)
}
.store(in: &cancellables)
It would arguably be a bit strange if both of our subscribers were given the exact same number every second, given that we’re expecting each number to be completely random (and therefore somewhat “unique”). So, from that perspective, the fact that Combine publishers produce separate output for each subscriber does arguably make a lot of sense.
But, going back to our ArticleLoader
, how can we then modify that default behavior to prevent duplicate network calls from being performed?
The good news is that all that we have to do is to use the share
operator, which (like its name implies) modifies a given Combine pipeline so that the result of its work is automatically shared among all subscribers:
class ArticleLoader {
...
func loadArticle(from url: URL) -> Publisher {
if let publisher = publishers[url] {
return publisher
}
let publisher = urlSession
.dataTaskPublisher(for: url)
.map(\.data)
.decode(type: Article.self, decoder: decoder)
.handleEvents(receiveCompletion: { [weak self] _ in
self?.publishers[url] = nil
})
.share()
.eraseToAnyPublisher()
publishers[url] = publisher
return publisher
}
}
With just that tiny change in place, we’ve now completely solved our problem. Now, even if multiple loadArticle
calls happen in quick succession, only a single network call will be performed, and its result will be reported to each of those call sites.
Well, perhaps “complete solved” isn’t entirely true, because our implementation still has one potential issue — it currently doesn’t account for the fact that our ArticleLoader
is likely to be called on a different thread than what URLSession
returns its data task output on. While it’s likely that this will never end up causing any actual problems, how about we do a quick bonus round and make our implementation completely thread-safe while we’re at it?
To do that, let’s make a few tweaks to our loadArticle
implementation. First, we’ll base our Combine pipeline on our input URL, and we’ll then immediately jump over to an internal DispatchQueue
, which we’ll also use when receiving a completion event from one of our publishers. That way, we can guarantee that our publishers
dictionary will always be both read and written to on the exact same queue:
class ArticleLoader {
...
private let queue = DispatchQueue(label: "ArticleLoader")
func loadArticle(from url: URL) -> Publisher {
Just(url)
.receive(on: queue)
.flatMap { [weak self, urlSession, queue, decoder] url -> Publisher in
if let publisher = self?.publishers[url] {
return publisher
}
let publisher = urlSession
.dataTaskPublisher(for: url)
.map(\.data)
.decode(type: Article.self, decoder: decoder)
.receive(on: queue)
.handleEvents(receiveCompletion: { [weak self] _ in
self?.publishers[url] = nil
})
.share()
.eraseToAnyPublisher()
self?.publishers[url] = publisher
return publisher
}
.eraseToAnyPublisher()
}
}
With those tweaks in place, we now have a completely thread-safe implementation that successfully reuses publishers to avoid having to perform any duplicate work. A potential next step could be to add caching to the above implementation (we currently just rely on the default caching mechanism that URLSession
provides out of the box), if that’s something that we think would be useful.
So that’s how the share
operator can be used to avoid duplicate work within a Combine pipeline. I hope you found this article useful and interesting, and if you did, feel free to share it with a friend or on social media. That always helps me out a lot! And, if you’ve got any questions, comments, or feedback, then feel free to reach out via email.
Thanks for reading!