Iterating over web socket messages with async / await in Swift – Donny Wals


Published on: January 24, 2023

In iOS 13, we gained the ability to easily send and receive data using web sockets through URLSession. With async/await, we gained the ability to fetch data from servers using the await keyword, and we can iterate over asynchronous sequences using async for loops.

We can even read data from a URL one line at a time by calling the lines property on URL:

let url = URL(string: "https://donnywals.com")!

for try await line in url.lines {
    // use line
}

While this is really cool and allows us to build apps that ingest data in real time if the server supports streaming bodies, we cannot use the lines property to set up a web socket connection, listen for incoming messages, and potentially send messages over the same connection too.

In this post, you’ll learn everything you need to know about building your own mechanism to conveniently iterate over messages from a web socket asynchronously. We’ll leverage existing functionality from URLSessionWebSocketTask and AsyncThrowingStream to build our own AsyncSequence that conveniently wraps our URLSessionWebSocketTask.

Note that the resulting code has only had relatively limited testing, so I cannot guarantee that the provided solution will be 100% correct for everything you throw at it. If you find any issues with the final code, feel free to contact me. Bonus points if you’re able to provide some ideas for a potential fix.

Using a web socket without async / await

Before we get started, let’s quickly review how to use a web socket without async/await. The code details are outlined in this post. Be sure to read it if you want to learn more about using web sockets in your apps.


let url = URL(string: "ws://127.0.0.1:8080")!
let socketConnection = URLSession.shared.webSocketTask(with: url)
socketConnection.resume()

func setReceiveHandler() {
    socketConnection.receive { result in
        // receive only delivers one message, so re-arm the handler every time
        defer { self.setReceiveHandler() }

        do {
            let message = try result.get()
            switch message {
            case let .string(string):
                print(string)
            case let .data(data):
                print(data)
            @unknown default:
                print("unknown message received")
            }
        } catch {
            // handle the error
            print(error)
        }
    }
}

setReceiveHandler()

Notice how, to receive messages from the socket, I must call receive with a completion handler. This method only allows me to receive a single incoming message, so I must re-set my handler after receiving a message to automatically begin listening for the next message.

This is a great example of a situation where an async for loop such as for try await message in socketConnection would make a lot of sense. Unfortunately, this isn’t possible out of the box. However, URLSessionWebSocketTask provides some form of support for async / await so we’re not entirely out of luck.

A basic implementation of web sockets with async / await

While URLSessionWebSocketTask doesn’t expose an AsyncSequence that emits incoming messages out of the box, it does come with an async version of the receive method you saw earlier.

This allows us to rewrite the example above as an async method as follows:

func setReceiveHandler() async {
    do {
        let message = try await socketConnection.receive()

        switch message {
        case let .string(string):
            print(string)
        case let .data(data):
            print(data)
        @unknown default:
            print("unknown message received")
        }
    } catch {
        print(error)
    }

    await setReceiveHandler()
}

This code works just fine, except we don’t really have a way to stop the recursion here. The code you saw earlier actually has the exact same issue; there’s no condition to stop listening for web socket messages even if the web socket connection has already been closed.

We could improve our code by only recursing if:

  1. We didn’t encounter any errors
  2. The socket connection is still active

This could look a bit as follows:

func setReceiveHandler() async {
    // stop as soon as the socket reports a real close code
    guard socketConnection.closeCode == .invalid else {
        return
    }

    do {
        let message = try await socketConnection.receive()

        switch message {
        case let .string(string):
            print(string)
        case let .data(data):
            print(data)
        @unknown default:
            print("unknown message received")
        }

        // only recurse when no error was thrown
        await setReceiveHandler()
    } catch {
        print(error)
    }
}

An open web socket’s close code is always set to invalid to signal that the connection has not (yet) been closed. We can leverage this to check that our connection is still active before waiting for the next message to be received.

This is much better already because we handle closed sockets and failures much more gracefully now, but we could improve the readability of this code a tiny bit by leveraging a while loop instead of recursively calling the setReceiveHandler function:

func setReceiveHandler() async {
    var isActive = true

    while isActive && socketConnection.closeCode == .invalid {
        do {
            let message = try await socketConnection.receive()

            switch message {
            case let .string(string):
                print(string)
            case let .data(data):
                print(data)
            @unknown default:
                print("unknown message received")
            }
        } catch {
            print(error)
            isActive = false
        }
    }
}

To me, this version of the code is slightly easier to read, but that might not be the case for you. It’s functionally equivalent so you can choose to use whichever option suits you best.

While this code works, I’m not quite happy with where we’ve landed right now. There’s a lot of logic in this function and I would prefer to separate handling the incoming values from the calls to socketConnection.receive() somehow. Ideally, I should be able to write the following:

do {
    for try await message in socketConnection {
        switch message {
        case let .string(string):
            print(string)
        case let .data(data):
            print(data)
        @unknown default:
            print("unknown message received")
        }
    }
} catch {
    // handle error
}

This is much, much nicer from a call-site perspective and it would allow us to put the ugly bits elsewhere.

To do this, we can leverage the power of AsyncStream, which allows us to build a custom async sequence of values.

Using AsyncStream to emit web socket messages

Given our end goal, there are a few ways for us to get where we want to be. The easiest way would be to write a computed property in an extension on URLSessionWebSocketTask that encapsulates the while loop you saw earlier. This implementation would look as follows:

typealias WebSocketStream = AsyncThrowingStream<URLSessionWebSocketTask.Message, Error>

extension URLSessionWebSocketTask {
    var stream: WebSocketStream {
        return WebSocketStream { continuation in
            Task {
                var isAlive = true

                // the Task closure escapes, so self must be referenced explicitly
                while isAlive && self.closeCode == .invalid {
                    do {
                        let value = try await self.receive()
                        continuation.yield(value)
                    } catch {
                        continuation.finish(throwing: error)
                        isAlive = false
                    }
                }
            }
        }
    }
}

To make the code a little bit easier to read, I’ve defined a typealias for my AsyncThrowingStream so we don’t have to look at the same long type signature everywhere.

The code above creates an instance of AsyncThrowingStream that asynchronously awaits new values from the web socket as long as the web socket is considered active and hasn’t been closed. To emit incoming messages and potential errors, the continuation’s yield and finish methods are used. These methods will either emit a new value (yield) or end the stream of values with an error (finish).

This code works great in many situations, but there’s one issue. If we decide to close the web socket connection from the app’s side by calling cancel(with:reason:) on our socketConnection, our WebSocketStream doesn’t end. Instead, it will be stuck waiting for messages, and the call site will be stuck too.
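To see yield and finish in isolation, here is a minimal sketch that doesn't need a server. It emits integers instead of web socket messages; DemoError, makeNumberStream and collectNumbers are hypothetical names made up for this illustration, not part of the socket code.

```swift
// Hypothetical error type that stands in for a socket failure.
struct DemoError: Error {}

// Yields 1, 2 and 3, then ends the stream with an error.
func makeNumberStream() -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream<Int, Error> { continuation in
        for number in 1...3 {
            // yield emits a value to whoever iterates the stream
            continuation.yield(number)
        }
        // finish(throwing:) ends the stream; the consumer's loop throws
        continuation.finish(throwing: DemoError())
    }
}

// Iterates the stream and reports what was received and whether it failed.
func collectNumbers() async -> (values: [Int], failed: Bool) {
    var values: [Int] = []
    do {
        for try await number in makeNumberStream() {
            values.append(number)
        }
        return (values, false)
    } catch {
        return (values, true)
    }
}
```

Values that were yielded before the call to finish(throwing:) are buffered, so the consumer still receives them before the error surfaces in its catch block.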

Task {
    try await Task.sleep(for: .seconds(5))
    // cancel(with:reason:) is synchronous and doesn't throw
    socketConnection.cancel(with: .goingAway, reason: nil)
}

Task {
    do {
        for try await message in socketConnection.stream {
            // handle incoming messages
        }
    } catch {
        // handle error
    }

    print("this would never be printed")
}

If everything works as expected, our web socket connection will close after five seconds. At that point, our for loop should end and our print statement should execute, since the asynchronous stream is no longer active. Unfortunately, this isn’t the case, so we need to find a better way to model our stream.

URLSessionWebSocketTask doesn’t provide a way for us to detect cancellation. So, I’ve found that it’s best to use an object that wraps the URLSessionWebSocketTask, and to cancel the task through that object. This allows us to both end the async stream we’re providing to callers and close the web socket connection with one method call.

Here’s what that object looks like:

class SocketStream: AsyncSequence {
    typealias AsyncIterator = WebSocketStream.Iterator
    typealias Element = URLSessionWebSocketTask.Message

    private var continuation: WebSocketStream.Continuation?
    private let task: URLSessionWebSocketTask

    private lazy var stream: WebSocketStream = {
        return WebSocketStream { continuation in
            self.continuation = continuation

            Task {
                var isAlive = true

                // the Task closure escapes, so self must be referenced explicitly
                while isAlive && self.task.closeCode == .invalid {
                    do {
                        let value = try await self.task.receive()
                        continuation.yield(value)
                    } catch {
                        continuation.finish(throwing: error)
                        isAlive = false
                    }
                }
            }
        }
    }()

    init(task: URLSessionWebSocketTask) {
        self.task = task
        task.resume()
    }

    deinit {
        continuation?.finish()
    }

    func makeAsyncIterator() -> AsyncIterator {
        return stream.makeAsyncIterator()
    }

    func cancel() async throws {
        // close the socket and end the stream in one call
        task.cancel(with: .goingAway, reason: nil)
        continuation?.finish()
    }
}

There’s a bunch of code here, but it’s not too bad. The first few lines are all about setting up some type aliases and properties for convenience. The lazy var stream is essentially the exact same code that you’ve already seen in the URLSessionWebSocketTask extension from before.

When our SocketStream’s deinit is called we make sure that we end our stream. There’s also a cancel method that closes the socket connection as well as the stream. Because SocketStream conforms to AsyncSequence we must provide an Iterator object that’s used when we try to iterate over our SocketStream. We simply ask our internal stream object to make an iterator and use that as our return value.

Using the code above looks as follows:
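The same wrapper idea can be tried without a socket. The sketch below is a simplified, hypothetical stand-in (DemoStream, send and demoCancellation are names invented for this example) that forwards iteration to an inner AsyncThrowingStream and ends that stream from a cancel method. It assumes values are pushed in by hand rather than received from a URLSessionWebSocketTask.

```swift
// Hypothetical stand-in for SocketStream that emits strings pushed in by hand.
final class DemoStream: AsyncSequence {
    typealias Element = String
    typealias AsyncIterator = AsyncThrowingStream<String, Error>.Iterator

    private let stream: AsyncThrowingStream<String, Error>
    private let continuation: AsyncThrowingStream<String, Error>.Continuation

    init() {
        // The build closure runs synchronously, so the continuation
        // can be captured and stored before init returns.
        var captured: AsyncThrowingStream<String, Error>.Continuation?
        stream = AsyncThrowingStream<String, Error> { continuation in
            captured = continuation
        }
        continuation = captured!
    }

    // Plays the role of an incoming socket message.
    func send(_ message: String) {
        continuation.yield(message)
    }

    // Ends the stream, which ends any for loop that iterates it.
    func cancel() {
        continuation.finish()
    }

    // Conforming to AsyncSequence only requires forwarding the iterator.
    func makeAsyncIterator() -> AsyncIterator {
        return stream.makeAsyncIterator()
    }
}

func demoCancellation() async throws -> [String] {
    let demo = DemoStream()
    demo.send("hello")
    demo.send("world")
    demo.cancel()

    var received: [String] = []
    for try await message in demo {
        received.append(message)
    }
    // This line is reached because cancel() finished the stream.
    return received
}
```

Messages that were sent before cancel() are still delivered; the loop only ends once the buffered values have been consumed.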

let url = URL(string: "ws://127.0.0.1:8080")!
let socketConnection = URLSession.shared.webSocketTask(with: url)
let stream = SocketStream(task: socketConnection)

Task {
    do {
        for try await message in stream {
            // handle incoming messages
        }
    } catch {
        // handle error
    }

    print("this will be printed once the stream ends")
}

To cancel our stream after five seconds just like before, you can run the following task in parallel with our iterating task:

Task {
    try await Task.sleep(for: .seconds(5))
    try await stream.cancel()
}

Task {
    // iterate...
}

While this is pretty cool, we do have a bit of an issue here on older iOS versions because of the following bit of code. By older I mean pre-iOS 17.0.

If you’re targeting iOS 17 or newer you can ignore this next part.

private lazy var stream: WebSocketStream = {
    return WebSocketStream { continuation in
        self.continuation = continuation

        Task {
            var isAlive = true

            while isAlive && self.task.closeCode == .invalid {
                do {
                    let value = try await self.task.receive()
                    continuation.yield(value)
                } catch {
                    continuation.finish(throwing: error)
                    isAlive = false
                }
            }
        }
    }
}()

The task that we run our while loop in won’t end unless we end our stream from within our catch block. If we manually close the web socket connection using the cancel method we wrote earlier, the call to receive() will never receive an error or a value, which means that it will be stuck forever. This was fixed in iOS 17 but is still a problem in older iOS versions.

The most reliable way to fix this is to go back to the callback-based version of receive to drive your async stream:

private lazy var stream: WebSocketStream = {
    return WebSocketStream { continuation in
        self.continuation = continuation
        waitForNextValue()
    }
}()

private func waitForNextValue() {
    guard task.closeCode == .invalid else {
        continuation?.finish()
        return
    }

    task.receive(completionHandler: { [weak self] result in
        guard let continuation = self?.continuation else {
            return
        }

        do {
            let message = try result.get()
            continuation.yield(message)
            // re-arm the handler for the next message
            self?.waitForNextValue()
        } catch {
            continuation.finish(throwing: error)
        }
    })
}

With this approach we don’t have any lingering tasks, and our call site is as clean and concise as ever; we’ve only changed some of our internal logic.

In Summary

Swift Concurrency provides many useful features for writing better code, and Apple quickly adopted async / await for existing APIs. However, some APIs that would be useful are missing, such as iterating over web socket messages.

In this post, you learned how to use async streams to create an async sequence that emits web socket messages. You first saw a fully async / await version that was neat, but had memory and task lifecycle issues. Then, you saw a version that combines a callback-based approach with the async stream.

The result is an easy way to iterate over incoming web socket messages with async / await. If you have any questions, comments, or improvements for this post, please don't hesitate to reach out to me on Twitter.
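If you want to experiment with this callback-driven pattern without a running server, the sketch below applies it to a fake source. FakeSocket, SocketClosed, messages(from:) and collectMessages(from:) are hypothetical names invented for this example; FakeSocket only mimics the "one result per call" shape of receive(completionHandler:), calls its handler synchronously, and is not how URLSessionWebSocketTask is implemented.

```swift
// Hypothetical error reported once the fake socket has nothing left to deliver.
struct SocketClosed: Error {}

// Hypothetical callback-based source: each call to receive delivers one result.
final class FakeSocket {
    private var queued: [String]

    init(queued: [String]) {
        self.queued = queued
    }

    func receive(completionHandler: (Result<String, Error>) -> Void) {
        if queued.isEmpty {
            completionHandler(.failure(SocketClosed()))
        } else {
            completionHandler(.success(queued.removeFirst()))
        }
    }
}

// Drives an AsyncThrowingStream by re-arming the callback after every message.
func messages(from socket: FakeSocket) -> AsyncThrowingStream<String, Error> {
    AsyncThrowingStream<String, Error> { continuation in
        func waitForNextValue() {
            socket.receive { result in
                do {
                    let message = try result.get()
                    continuation.yield(message)
                    waitForNextValue()
                } catch {
                    // an error ends the stream and stops the re-arming
                    continuation.finish(throwing: error)
                }
            }
        }

        waitForNextValue()
    }
}

// Iterates the stream and reports what arrived and whether it ended with an error.
func collectMessages(from socket: FakeSocket) async -> (messages: [String], endedWithError: Bool) {
    var collected: [String] = []
    do {
        for try await message in messages(from: socket) {
            collected.append(message)
        }
        return (collected, false)
    } catch {
        return (collected, true)
    }
}
```

Because no Task is created, nothing is left suspended on an await once the source stops calling back, which is the property the callback-based SocketStream relies on.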
