Last active
June 15, 2020 11:36
-
-
Save Abizern/b7c44e62eeb85b2268881a64a2a7e542 to your computer and use it in GitHub Desktop.
Recursive publisher
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// I needed to fetch all the pages from a paged endpoint. | |
// In this specific case, the JSON results contained a `pagingStatus` section that provided extra information which I could use: | |
// Hiding that behind a protocol: | |
import Foundation | |
protocol PagedReturning { | |
var pagingStatus: PagingStatus { get } | |
} | |
/// Dictionary that is returned within the response for objects that uses a paginated request. | |
struct PagingStatus: Decodable { | |
let pageNumber: Int | |
let pageSize: Int | |
let pageElements: Int | |
let totalElements: Int | |
let totalPages: Int | |
} | |
extension PagingStatus { | |
var isLastPage: Bool { | |
pageNumber == totalPages - 1 // 0 indexed page numbers | |
} | |
var nextPage: Int? { | |
guard !isLastPage else { return nil } | |
return pageNumber + 1 | |
} | |
} | |
// I wrote a custom publisher that took an Endpoint (A simplified version of https://github.com/objcio/tiny-networking) | |
// All the work went into the Subscriber. On each page it checks the PagingResponse to see if there is a next page, if there is it sends another request, otherwise it emits a completion. | |
import Foundation | |
import Combine | |
struct PagedNetworkPublisher<Response, Output> where Response: Decodable & PagedReturning { | |
typealias Failure = Error | |
let endpoint: (Int) -> Endpoint<Response> | |
let output: (Response, Int, Int) -> Output | |
} | |
extension PagedNetworkPublisher { | |
class Subscription<S> where S: Subscriber, Failure == S.Failure, Output == S.Input { | |
let endpoint: (Int) -> Endpoint<Response> | |
let output: (Response, Int, Int) -> Output | |
let subscriber: S | |
var cancellable: AnyCancellable? | |
init(endpoint: @escaping (Int) -> Endpoint<Response>, output: @escaping (Response, Int, Int) -> Output, subscriber: S) { | |
self.endpoint = endpoint | |
self.output = output | |
self.subscriber = subscriber | |
} | |
} | |
} | |
extension PagedNetworkPublisher.Subscription: Combine.Subscription { | |
func request(_ demand: Subscribers.Demand) { | |
fetch(page: 0) | |
} | |
func cancel() { | |
cancellable?.cancel() | |
} | |
var combineIdentifier: CombineIdentifier { | |
CombineIdentifier() | |
} | |
private func fetch(page: Int) { | |
print("Page: \(page)") | |
cancellable = NetworkCommunicator() | |
.load(endpoint(page)) | |
.sink(receiveCompletion: { completion in | |
print(completion) | |
}, receiveValue: { response in | |
_ = self.subscriber.receive(self.output(response, page, response.pagingStatus.pageSize)) | |
guard let next = response.pagingStatus.nextPage else { | |
self.subscriber.receive(completion: .finished) | |
return | |
} | |
self.fetch(page: next) | |
}) | |
} | |
} | |
extension PagedNetworkPublisher: Publisher { | |
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input { | |
let subscription = Subscription(endpoint: endpoint, output: output, subscriber: subscriber) | |
subscriber.receive(subscription: subscription) | |
} | |
} | |
// USAGE! | |
struct ProductsService { | |
private static let networkCommunicator = NetworkCommunicator() | |
/// Load and store all products from a collection | |
/// - Parameter collectionId: The ID of the collection to fetch | |
@discardableResult | |
static func loadProducts(collectionId: String) -> AnyPublisher<(), Error> { | |
let context = DataEnvironment.shared.container.newBackgroundContext() | |
context.mergePolicy = NSMergeByPropertyObjectTrumpMergePolicy | |
return PagedNetworkPublisher(endpoint: { (page) in | |
PagedProductResponse.endpoint(collectionId: collectionId, page: page) | |
}) { (response, pageNumber, pageSize) in | |
let productResponses = response.products | |
context.performAndWait { | |
let collection = DesignerCollection.findOrFetch(in: context, matching: DesignerCollection.predicateFor(collectionId: collectionId)) | |
productResponses.enumerated().forEach { index, response in | |
let sortIndex = (pageNumber * pageSize) + index | |
if let product = Product.findOrFetch(in: context, matching: Product.predicateFor(productId: response.id)) { | |
let product = product.configure(with: response, regionId: -1, sortIndex: sortIndex) | |
product.designerCollection = collection | |
} else { | |
let product = Product.insert(into: context, response: response, regionId: -1, sortIndex: sortIndex) | |
product.designerCollection = collection | |
} | |
} | |
do { | |
try context.save() | |
} catch { | |
loggingPrint(error) | |
} | |
} | |
} | |
.eraseToAnyPublisher() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment