Synchronising Combine's Publishers for easy testing

I've been working full-time with RxSwift for the past three years, and it's been a blast! I just love everything about it: the concept, the flexibility of the "reactive functional programming" paradigm, and the community behind it.

Luckily, Apple decided to join in and released Combine, a framework built on the same concept. It's very similar to RxSwift, as you can see in this cheat sheet, which encouraged me to toy with it and SwiftUI every now and then.

One of the most common testing helpers in the RxSwift community is the RxBlocking framework, which provides a simple way of synchronising observable events and waiting for them to complete before evaluating the following lines. This is very handy when all you need is to wait for some values in a stream. For example:

let value = try Observable.just(42).toBlocking().first()
XCTAssertEqual(value, 42)

This creates an observable that emits the value 42 and completes. toBlocking() turns it into a blocking sequence, and first() blocks the current thread until the first element arrives and then returns it. Easy!

It's time to Combine!

Wouldn't it be great if we had the same for Combine? Well, we can easily create an extension for that!

In our unit tests target, let's create a new helper file and add this extension:

import Combine
import XCTest

extension Publisher {
    func waitForCompletion(timeout: TimeInterval = 1.0) throws -> [Output] {
        let expectation = XCTestExpectation(description: "wait for completion")
        var completion: Subscribers.Completion<Failure>?
        var output = [Output]()

        let subscription = self.collect()
            .sink(receiveCompletion: { receiveCompletion in
                completion = receiveCompletion
                expectation.fulfill()
            }, receiveValue: { value in
                output = value
            })

        XCTWaiter().wait(for: [expectation], timeout: timeout)
        subscription.cancel()

        switch try XCTUnwrap(completion) {
        case let .failure(error):
            throw error
        case .finished:
            return output
        }
    }
}

We create an expectation which will only fulfil when the publisher completes. We wait for this expectation to fulfil up to the given timeout. If the timeout is triggered, completion will still be nil and XCTUnwrap in the switch will throw an error.

If it does complete, the helper returns the collected output when the publisher finished, or rethrows the failure if the publisher failed.
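The three possible outcomes can also be seen in a Combine-only sketch that needs no XCTest. The names `outcome(of:)` and `SampleError` are hypothetical and exist only for this illustration. Since nothing waits here, it only makes sense for publishers that emit synchronously on subscription.

```swift
import Combine

// Hypothetical error type, used only for this illustration.
struct SampleError: Error, Equatable {}

// Hypothetical helper: subscribes and reports what had happened by the
// time `sink` returned. Only meaningful for synchronous publishers.
func outcome<P: Publisher>(of publisher: P) -> Result<[P.Output], P.Failure>? {
    var result: Result<[P.Output], P.Failure>?
    var output = [P.Output]()

    let subscription = publisher.collect().sink(
        receiveCompletion: { completion in
            switch completion {
            case .finished:
                result = .success(output)
            case let .failure(error):
                result = .failure(error)
            }
        },
        receiveValue: { output = $0 }
    )
    subscription.cancel()

    return result
}

// Finishes normally: the collected values come back.
let finished = outcome(of: [1, 2, 3].publisher)
// Fails: the publisher's failure is what we get.
let failed = outcome(of: Fail<Int, SampleError>(error: SampleError()))
// Never completes: there is no completion to inspect, so the result is nil.
let pending = outcome(of: PassthroughSubject<Int, Never>())
```

The `nil` case is the one the XCTest helper turns into a thrown error through XCTUnwrap.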

Note that it's important to keep a reference to the AnyCancellable returned by sink: once it's deallocated, the subscription is cancelled and no further events are received. Calling cancel() after the wait silences Xcode's unused-variable warning and keeps the reference in use until the wait is over. The cancellation itself isn't strictly necessary, since the AnyCancellable cancels automatically when it's deallocated.
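Here is a small sketch of that behaviour using only Combine. The function name `receivedValues()` is made up for the example; it shows what a sink receives when its AnyCancellable is discarded, kept, and then cancelled.

```swift
import Combine

// Hypothetical function, named only for this illustration.
func receivedValues() -> [Int] {
    let subject = PassthroughSubject<Int, Never>()
    var received = [Int]()

    // The AnyCancellable is discarded immediately, so the subscription
    // is cancelled before any value is sent.
    _ = subject.sink { received.append($0) }
    subject.send(1)

    // Holding on to the AnyCancellable keeps the subscription alive.
    let subscription = subject.sink { received.append($0) }
    subject.send(2)

    // After an explicit cancel, further values are ignored.
    subscription.cancel()
    subject.send(3)

    return received
}
```

Only the value sent while a subscription was alive and not yet cancelled ends up in the returned array.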

This helper's usage is as simple as the example below:

import XCTest
import Combine

class testTests: XCTestCase {
    func testArray() throws {
        let array = [1, 2, 3]
        // array.publisher emits each element in order and then completes
        let result = try array.publisher.waitForCompletion()
        XCTAssertEqual(result, [1, 2, 3])
    }
}

You need to be careful when using this helper with publishers that don't complete, as in the example below:

func testCurrentValueSubject() throws {
    let current = CurrentValueSubject<Int, Never>(0)
    current.value = 2

    let result = try current.waitForCompletion()
    XCTAssertEqual(result, [0, 2])
}

This test will fail because CurrentValueSubject never completes, so the wait times out and the XCTUnwrap call we saw before throws an error. To test a scenario like that, you either need to ensure the Publisher completes, or apply an operator that takes just a few elements from it. That's possible using the prefix operator. As its documentation states, it republishes elements up to a certain length and then completes.

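There's one catch, though: a CurrentValueSubject only hands its latest value to a new subscriber. By the time we subscribe, the initial 0 has already been replaced by 2, so 2 is the only element we'll ever receive. Updating the code as shown below, only that first element is republished before the publisher completes, and our test passes with the adjusted expectation.

let result = try current.prefix(1).waitForCompletion()
XCTAssertEqual(result, [2])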
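To check how many elements a late subscriber can actually take from a CurrentValueSubject, here is a Combine-only sketch. The function name `valuesSeenByLateSubscriber()` is hypothetical; it subscribes after the value has changed and records what arrives.

```swift
import Combine

// Hypothetical function, named only for this illustration.
func valuesSeenByLateSubscriber() -> (values: [Int], completed: Bool) {
    let current = CurrentValueSubject<Int, Never>(0)
    current.value = 2 // Replaces 0 before anyone has subscribed.

    var values = [Int]()
    var completed = false

    // A new subscriber gets the current value right away; prefix(1)
    // republishes it and then completes.
    let subscription = current
        .prefix(1)
        .sink(receiveCompletion: { _ in completed = true },
              receiveValue: { values.append($0) })
    subscription.cancel()

    return (values, completed)
}
```

The subscriber sees the single value 2 and a completion. Asking for more elements than the subject will deliver keeps the publisher open, so the count passed to prefix has to match what is really emitted after subscribing.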

One small tweak

If you ran the failing test before our fix, you may have noticed that the failure shown in Xcode was pointing to our helper method. This isn't very helpful when running hundreds of tests, since we can easily lose track of which test actually failed.

Thankfully, XCTest functions take the file and line being evaluated as arguments. By default, they're filled in with the #file and #line literals at the call site.
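The mechanism is plain Swift and can be tried without XCTest. In the sketch below, all three function names are invented for the example: a default argument of #line is evaluated where the function is called, so a helper only reports its caller's line if it forwards the value.

```swift
// Hypothetical functions, named only for this illustration.

// Stands in for an XCTest assertion: it reports the line it was given.
func reportedLine(line: UInt = #line) -> UInt {
    return line
}

// Forwards its own caller's line, like the helper we want to write.
func forwardingHelper(line: UInt = #line) -> UInt {
    return reportedLine(line: line)
}

// Doesn't forward anything, so #line is evaluated inside this function
// and every caller gets the same line number back.
func nonForwardingHelper() -> UInt {
    return reportedLine()
}
```

Calling `forwardingHelper()` from two different lines yields two different numbers, while `nonForwardingHelper()` always yields the line inside the helper, which is exactly the unhelpful behaviour described above.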

Look at XCTest's XCTUnwrap method definition:

/// - Parameters:
///   - expression: An expression of type `T?` to compare against `nil`. Its type will determine the type of the returned value.
///   - message: An optional description of the failure.
///   - file: The file in which failure occurred. Defaults to the file name of the test case in which this function was called.
///   - line: The line number on which failure occurred. Defaults to the line number on which this function was called.
/// - Returns: A value of type `T`, the result of evaluating and unwrapping the given `expression`.
/// - Throws: An error when `expression == nil`. It will also rethrow any error thrown while evaluating the given expression.
public func XCTUnwrap<T>(_ expression: @autoclosure () throws -> T?, _ message: @autoclosure () -> String = "", file: StaticString = #file, line: UInt = #line) throws -> T

To ensure the failures point to the correct failing tests, all we need to do is to forward the file and line arguments to our helper. It looks like this:

extension Publisher {
    func waitForCompletion(timeout: TimeInterval = 1.0, file: StaticString = #file, line: UInt = #line) throws -> [Output] {
        let expectation = XCTestExpectation(description: "wait for completion")
        var completion: Subscribers.Completion<Failure>?
        var output = [Output]()

        let subscription = self.collect()
            .sink(receiveCompletion: { receiveCompletion in
                completion = receiveCompletion
                expectation.fulfill()
            }, receiveValue: { value in
                output = value
            })

        XCTWaiter().wait(for: [expectation], timeout: timeout)
        subscription.cancel()

        // We're also including a more meaningful error here!
        switch try XCTUnwrap(completion, "Publisher never completed", file: file, line: line) {
        case let .failure(error):
            throw error
        case .finished:
            return output
        }
    }
}

What's great about this is that, even though the method signature is more verbose, it doesn't change the usage at all, since the values are set by default. Now Xcode will point exactly to the failing test for us!

More extensions

Now you can go on and add more helpers to make testing more pleasant and ergonomic for you. An easy one is an equivalent to RxSwift's toBlocking().first() helper:

extension Publisher {
    func waitForFirstOutput(timeout: TimeInterval = 1.0, file: StaticString = #file, line: UInt = #line) throws -> Output {
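        // Forward the timeout as well, so a custom value isn't silently ignored.
        return try XCTUnwrap(prefix(1).waitForCompletion(timeout: timeout, file: file, line: line).first, "Publisher completed without any output", file: file, line: line)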
    }
}

In the test below it'll return 2, since that's the subject's current value when the helper subscribes:

func testCurrentValueSubject() throws {
    let current = CurrentValueSubject<Int, Never>(0)
    current.value = 2

    let result = try current.waitForFirstOutput()
    XCTAssertEqual(result, 2)
}

Conclusion

I hope you liked this post! The code is available in this gist on my GitHub.

Thanks for reading and let me know what you think. Feel free to message me on Twitter too!