Published on: March 25, 2024
A while ago I published a post that explains how you can use AsyncStream to build your own asynchronous sequences in Swift Concurrency. Since writing that post, a new way of creating AsyncStream objects has been introduced to allow for more convenient stream building.

In this post, I'll expand on what we've already covered in the previous post so that we don't have to go over everything from scratch.

By the end of this post you'll understand the new and more convenient makeStream method that was added to AsyncStream. You'll learn how and when it makes sense to build your own async streams, and I'll reiterate some of their gotchas to help you avoid mistakes that I've made in the past.
Reviewing the old situation
While I won't explain the old approach in detail, I think it makes sense to briefly go over it in order to refresh your mind. Or, if you weren't familiar with the old approach, it will help put the improvements in Swift 5.9 into perspective a bit more.

Pre-Swift 5.9, we could create our AsyncStream objects as follows:
let stream = AsyncStream(unfolding: {
    return Int.random(in: 0..<Int.max)
})
The approach shown here is the simplest way to build an async stream, but it's also the least flexible.

In short, the closure that we pass to unfolding will be called every time we're expected to asynchronously produce a new value for our stream. Once the value is produced, you return it so that the for loop that's iterating over this sequence can use the value. To terminate your async stream, you return nil from your closure to indicate that there are no further values to be produced.
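To make that concrete, here's a minimal sketch of a finite unfolding-based stream, along with a loop that consumes it (the countdown counter is made up for illustration):

var remaining = 3

// The unfolding closure runs once per requested value; returning nil ends the stream.
let countdown = AsyncStream<Int>(unfolding: {
    guard remaining > 0 else { return nil }
    remaining -= 1
    return remaining
})

Task {
    for await value in countdown {
        print("received", value) // prints 2, 1, 0 and then the loop ends
    }
}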
This approach lacks some flexibility and isn't a great fit for bridging things like delegate-based code into Swift Concurrency.

A more useful and flexible way to build an AsyncStream that can bridge a callback-based API like CLLocationManagerDelegate looks as follows:
import CoreLocation

class AsyncLocationStream: NSObject, CLLocationManagerDelegate {
    lazy var stream: AsyncStream<CLLocation> = {
        AsyncStream { (continuation: AsyncStream<CLLocation>.Continuation) -> Void in
            self.continuation = continuation
        }
    }()

    var continuation: AsyncStream<CLLocation>.Continuation?

    func locationManager(_ manager: CLLocationManager, didUpdateLocations locations: [CLLocation]) {
        for location in locations {
            continuation?.yield(location)
        }
    }
}
This code does a little bit more than build an async stream, so let's go over it in a bit more detail.

First, there's a lazy var that's used to create an instance of AsyncStream. When we create the async stream, we pass the AsyncStream initializer a closure. This closure receives a continuation object that we can use to push values onto our AsyncStream. Because we're bridging a callback-based API, we need access to the continuation from outside of the initial closure, so we assign the continuation to a var on the AsyncLocationStream object.

Next, we have the didUpdateLocations delegate method. From that method, we call yield on the continuation to push every received location onto our AsyncStream, which allows anybody that's writing a for loop over the stream property to receive locations. Here's what that would look like in a simplified example:
let locationStream = AsyncLocationStream()

for await value in locationStream.stream {
    print("location received", value)
}
While this all works perfectly fine, there's this optional continuation that we're dealing with. Luckily, the new makeStream method takes care of that.
Creating a stream with makeStream
In essence, a makeStream-based AsyncStream works identically to the one you saw earlier.

We still work with a continuation that's used to yield values to whoever is iterating our stream. To end the stream we call finish on the continuation, and to handle someone cancelling their Task or breaking out of the for loop you can still use onTermination on the continuation to perform cleanup. We'll take a look at onTermination in the next section.

For now, let's focus on seeing how makeStream allows us to rewrite the example you just saw to be a bit cleaner.
import CoreLocation

class AsyncLocationStream: NSObject, CLLocationManagerDelegate {
    let stream: AsyncStream<CLLocation>
    private let continuation: AsyncStream<CLLocation>.Continuation

    override init() {
        let (stream, continuation) = AsyncStream.makeStream(of: CLLocation.self)
        self.stream = stream
        self.continuation = continuation
        super.init()
    }

    func locationManager(_ manager: CLLocationManager, didUpdateLocations locations: [CLLocation]) {
        for location in locations {
            continuation.yield(location)
        }
    }
}
We've written a little bit more code than we had before, but the code we have now is slightly cleaner and more readable.

Instead of a lazy var we can now define two let properties, which fits much better with what we're trying to do. Additionally, we create our AsyncStream and its continuation on a single line of code instead of needing a closure to carry the continuation out of a closure and onto our class.

Everything else stays pretty much the same. We still call yield to push values onto our stream, and we still use finish to end our continuation (we're not calling that in the snippet above).
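If you did want to end the stream explicitly, you would call finish on the stored continuation. A hypothetical stopUpdates method (not part of the snippet above) could look something like this:

func stopUpdates() {
    // Finishing the continuation completes any for-await loop over `stream`.
    continuation.finish()
}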
While this is all very convenient, AsyncStream.makeStream comes with the same memory and lifecycle related issues as its older counterparts. Let's take a brief look at these issues and how to fix them in the next section.
Avoiding memory leaks and infinite loops
When we're iterating an async sequence from within a task, it's reasonable to expect that at some point the object we're iterating goes out of scope and that our iteration stops.

For example, if we're leveraging the AsyncLocationStream you saw before from within a ViewModel, we'd want the location updates to stop automatically whenever the screen, its ViewModel, and the AsyncLocationStream go out of scope.

In reality, these objects will go out of scope, but any task that's iterating the AsyncLocationStream's stream won't end until the stream's continuation is explicitly finished. I've explored this phenomenon more in depth in this post where I dig into lifecycle management for async sequences.

Let's look at an example that demonstrates this effect. We'll look at a dummy LocationProvider first.
import Combine
import Foundation

class LocationProvider {
    let locations: AsyncStream<UUID>
    private let continuation: AsyncStream<UUID>.Continuation
    private var cancellable: AnyCancellable?

    init() {
        let stream = AsyncStream.makeStream(of: UUID.self)
        locations = stream.stream
        continuation = stream.continuation
    }

    deinit {
        print("location provider is gone")
    }

    func startUpdates() {
        cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
            .autoconnect()
            .sink(receiveValue: { [weak self] _ in
                print("will send")
                self?.continuation.yield(UUID())
            })
    }
}
The object above creates an AsyncStream just like you saw before. When we call startUpdates we start simulating receiving location updates. Every second, we send a new unique UUID onto our stream.

To make the test realistic, I've added a MyViewModel object that would normally serve as the interface between the location provider and the view:
class MyViewModel {
    let locationProvider = LocationProvider()

    var locations: AsyncStream<UUID> {
        locationProvider.locations
    }

    deinit {
        print("view model is gone")
    }

    init() {
        locationProvider.startUpdates()
    }
}
We're not doing anything special in this code, so let's move on to creating the test scenario itself:
var viewModel: MyViewModel? = MyViewModel()

let sampleTask = Task {
    guard let locations = viewModel?.locations else { return }

    print("before for loop")
    for await location in locations {
        print(location)
    }
    print("after for loop")
}

Task {
    try await Task.sleep(for: .seconds(2))
    viewModel = nil
}
In our test, we set up two tasks. One that we use to iterate over our AsyncStream, printing some strings before and after the loop.

We have a second task that runs in parallel. This task waits for two seconds and then sets the viewModel property to nil. This simulates a screen going away and the view model being deallocated because of it.

Let's look at the printed output for this code:
before for loop
will send
B9BED2DE-B929-47A6-B47D-C28AD723FCB1
will send
FCE7DAD1-D47C-4D03-81FD-42B0BA38F976
view model is gone
location provider is gone
Notice how we're not seeing after for loop printed here.

That means that while the view model and location provider both get deallocated as expected, we're not seeing the for loop end like we'd want to.

To fix this, we need to make sure that we finish our continuation when the location provider is deallocated:
class LocationProvider {
    // ...

    deinit {
        print("location provider is gone")
        continuation.finish()
    }

    // ...
}
In the deinit for LocationProvider we can call continuation.finish(), which will fix the leak that we just saw. If we run the code again, we'll see the following output:
before for loop
will send
B3DE2994-E0E1-4397-B04E-448047315133
will send
D790D3FA-FE40-4182-9F58-1FEC93335F18
view model is gone
location provider is gone
after for loop
That fixed our for loop sitting and waiting for a value that would never come (and our Task being stuck forever as a result). However, we're not out of the woods yet. Let's change the test setup a little bit. Instead of deallocating the view model, let's try cancelling the Task that we created to iterate the AsyncStream:
var viewModel: MyViewModel? = MyViewModel()

let sampleTask = Task {
    guard let locations = viewModel?.locations else { return }

    print("before for loop")
    for await location in locations {
        print(location)
    }
    print("after for loop")
}

Task {
    try await Task.sleep(for: .seconds(2))
    sampleTask.cancel()
}
Running the code now results in the following output:
before for loop
will send
0B6E962F-F2ED-4C33-8155-140DB94F3AE0
will send
1E195613-2CE1-4763-80C4-590083E4353E
after for loop
will send
will send
will send
will send
So while our loop ended, the location updates don't stop. We can add an onTermination closure to our continuation to be notified of an ended for loop (which happens when you cancel a Task that's iterating an async sequence):
class LocationProvider {
    // ...

    func startUpdates() {
        cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
            .autoconnect()
            .sink(receiveValue: { [weak self] _ in
                print("will send")
                self?.continuation.yield(UUID())
            })

        continuation.onTermination = { [weak self] _ in
            self?.cancellable = nil
        }
    }
}
With this code in place, we can now handle both a task getting cancelled as well as our LocationProvider being deallocated.

Whenever you're writing your own async streams, it's important that you test what happens when the owner of your continuation is deallocated (you'll usually want to finish your continuation) and what happens when the for loop that iterates your stream ends (you'll want to perform some cleanup as needed).

It's quite easy to make mistakes here, so be sure to keep an eye out!
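To recap, here's a sketch of a LocationProvider that combines both fixes from above, finishing its continuation on deinit and cleaning up in onTermination:

import Combine
import Foundation

class LocationProvider {
    let locations: AsyncStream<UUID>
    private let continuation: AsyncStream<UUID>.Continuation
    private var cancellable: AnyCancellable?

    init() {
        let stream = AsyncStream.makeStream(of: UUID.self)
        locations = stream.stream
        continuation = stream.continuation
    }

    deinit {
        // Ends any for-await loop over `locations` when this object is deallocated.
        continuation.finish()
    }

    func startUpdates() {
        cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
            .autoconnect()
            .sink(receiveValue: { [weak self] _ in
                self?.continuation.yield(UUID())
            })

        // Stops producing values when the iterating task is cancelled or breaks out of its loop.
        continuation.onTermination = { [weak self] _ in
            self?.cancellable = nil
        }
    }
}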
In Summary

In this post, you saw the new and more convenient AsyncStream.makeStream method in action. You learned that this method replaces a less convenient AsyncStream initializer that forced us to manually store a continuation outside of the closure, which usually led to having a lazy var for the stream and an optional for the continuation.

After showing you how you can use AsyncStream.makeStream, you learned about some of the gotchas that come with async streams in general. I showed you how you can test for these gotchas, and how you can fix them to make sure that your streams end and clean up as and when you expect.