StanfordSpezi / SpeziHealthKit

HealthKit module of the Stanford Spezi framework
https://swiftpackageindex.com/StanfordSpezi/SpeziHealthKit/documentation/
MIT License

Better Support for Batch Uploads of Historical HealthKit Data #17

Open mjoerke opened 6 months ago

mjoerke commented 6 months ago

Problem

Our application relies on uploading 3 months of historical HealthKit data to our Firebase. In our application delegate, we have the following code:

class PrismaDelegate: SpeziAppDelegate {
    override var configuration: Configuration {
        Configuration(standard: PrismaStandard()) {
            ... // Firebase configuration
            if HKHealthStore.isHealthDataAvailable() {
                healthKit
            }
            ... // other module initialization
        }
    }
    ...
    private var healthKit: HealthKit {
        HealthKit {
            CollectSamples(
                [
                    HKQuantityType(.stepCount),
                    ... // other sample types
                ],
                predicate: HKQuery.predicateForSamples(
                    withStart: Calendar.current.date(byAdding: .month, value: -3, to: .now),
                    end: nil,
                    options: .strictEndDate
                ),
                deliverySetting: .anchorQuery(.afterAuthorizationAndApplicationWillLaunch)
            )
        }
    }
}

While this code largely works, we have encountered several issues related to batch historical data uploads:

  1. Firebase rate limiting

We occasionally encounter the following error,

[FirebaseFirestore][I-FST000001] WriteStream (11df9e618) Stream error: 'Resource exhausted: Write stream exhausted maximum allowed queued writes.'

which I believe is due to Firebase rate limiting:

"If you index a field that increases or decreases sequentially between documents in a collection, like a timestamp, then the maximum write rate to the collection is 500 writes per second."

This is triggered in our application because we index on a timestamp field.

  2. Missing entries (potentially due to rate limiting)

The data in our Firebase is occasionally missing entries, which I suspect is due to the WriteStream error. For example, below I compare the cumulative daily step count measured on an Apple Watch, taken directly from the Apple Health app, with the count computed from our Firestore entries (filtering out step count measurements from an iPhone). On several days the step count is slightly undercounted (Feb 11, 14, 15, and 18).

Date      Health App   Firebase
Feb 11    11317        10457
Feb 12    8497         8497
Feb 13    13140        13140
Feb 14    9160         8942
Feb 15    9679         9617
Feb 16    15084        15084
Feb 17    6209         6209
Feb 18    7432         7058

  3. Batch upload status is not communicated to the user

Three months of historical data is large (on the order of 1-3 GB when measured across several test users), and this upload can take several hours to complete. I am not sure whether it is possible to improve the upload speed in light of rate limiting. Regardless, the app stops uploading data when it enters the background, and this is not communicated in the UI. It would be highly desirable to have a progress bar indicating upload status and/or a local notification that triggers when the application enters the background, with instructions for the user. This is a design pattern employed by many other iOS applications (e.g., Spotify, Google Drive) for long-running network tasks.

Solution

I have briefly chatted with @PSchmiedmayer about possible solution strategies. We spoke about adding additional functionality for batch uploads to the SpeziHealthKit module by

@PSchmiedmayer feel free to chime in here with more implementation-specific details or if I missed something!

PSchmiedmayer commented 6 months ago

@mjoerke Thank you for the summary and overview!

I agree with the approach, and thank you for writing this down into the issue.

As an additional context: I would see this functionality live in two different places:

  1. SpeziHealthKit: Offers a convenient type that allows you to do batch uploads in a way that it would hand off the work to whatever cloud provider or custom logic.
  2. This logic (in our case SpeziFirebase) is responsible for the rate-limiting + checking method as this is specific to this cloud provider.

The sync between these two is probably an async method that allows the HealthKit module to wait for the upload process to complete, plus a mechanism passed into it that allows a cloud provider to pull a new sample, process it, and report success back to the HealthKit module, so that progress is always tracked even if, e.g., the app is terminated.

In addition to that, we should demonstrate some caching functionality in the Spezi Template app that catches errors in a more sophisticated way than just printing an error. This might be a bigger issue, but I could see a "Cache/Storage/Persistence" module that allows one to keep track of such a state and process it step by step to avoid any data loss along the way.

These are my first thoughts on this; I will do a deeper dive later this week 🚀

mjoerke commented 6 months ago

Thank you @PSchmiedmayer! Let me know if there's any place I can contribute

PSchmiedmayer commented 5 months ago

@mjoerke @bryant-jimenez Here are some additional thoughts when we tackle this issue:

I would suggest a BulkUpload type that conforms to HealthKitDataSourceDescription to be incorporated in the basic setup in SpeziHealthKit to ensure that we ask for authorization and can trigger the upload manually, e.g., using a button.

I think it makes a lot of sense to use an anchored query (we already have good extensions for this) in combination with the limit parameter. We should make this bulk size configurable. The anchor should only be advanced, and a new query executed, once the upload of the previous bulk has been successful.
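The anchor handling described above can be sketched as follows. This is only an illustration under stated assumptions: FakeSampleStore stands in for a real HealthKit anchored query, an integer index stands in for the query anchor, and the names runBulkUpload, bulkSize, and startAnchor are hypothetical rather than part of SpeziHealthKit. The point is that the committed anchor only moves forward after the upload closure returns without throwing.

```swift
/// Stand-in for an anchored query (hypothetical): returns up to `limit` items
/// after `anchor`, plus the anchor to use for the next query.
struct FakeSampleStore {
    let samples: [Int]

    func query(after anchor: Int, limit: Int) -> (batch: [Int], newAnchor: Int) {
        let end = min(anchor + limit, samples.count)
        return (Array(samples[anchor..<end]), end)
    }
}

struct UploadError: Error {}

/// Uploads the store's content in batches of `bulkSize`, starting at `startAnchor`.
/// Returns the last committed anchor. If an upload throws, the function stops and
/// the failed batch is fetched again when called with the returned anchor.
func runBulkUpload(
    store: FakeSampleStore,
    bulkSize: Int,
    startAnchor: Int,
    upload: ([Int]) async throws -> Void
) async -> Int {
    var committedAnchor = startAnchor
    while true {
        let (batch, newAnchor) = store.query(after: committedAnchor, limit: bulkSize)
        if batch.isEmpty {
            return committedAnchor      // nothing left to upload
        }
        do {
            try await upload(batch)
        } catch {
            return committedAnchor      // keep the old anchor so the batch is retried
        }
        committedAnchor = newAnchor     // commit progress only after a successful upload
    }
}
```

In a real data source the committed anchor would presumably be persisted after every successful batch, so that a terminated app can resume from the last uploaded bulk.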

Once you get the samples in the bulk upload, I would suggest creating a Task Group to parallelize the upload up to a limit (e.g., 10 concurrent tasks), but we can start with a sequential upload. The upload can then be handled in the Standard of an app, and any backoff mechanisms should be implemented there (e.g., waiting a specific amount of time after each upload). We can add a new Standard Constraint, as we already have for the normal sample upload, that would then be implemented in the app that uses this feature.
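A bounded Task Group as mentioned above could look roughly like this. It is a sketch, not SpeziHealthKit code: uploadConcurrently, maxConcurrent, and the upload closure are hypothetical names, and the sample type is left generic. The group starts at most maxConcurrent uploads and starts the next one each time a running upload finishes.

```swift
/// Runs `upload` for every sample while keeping at most `maxConcurrent`
/// uploads in flight. The first thrown error cancels the remaining work.
func uploadConcurrently<Sample: Sendable>(
    _ samples: [Sample],
    maxConcurrent: Int,
    upload: @escaping @Sendable (Sample) async throws -> Void
) async throws {
    precondition(maxConcurrent > 0, "maxConcurrent must be positive")

    try await withThrowingTaskGroup(of: Void.self) { group in
        var iterator = samples.makeIterator()

        // Fill the group with the first `maxConcurrent` uploads.
        for _ in 0..<maxConcurrent {
            guard let sample = iterator.next() else { break }
            group.addTask { try await upload(sample) }
        }

        // Whenever one upload finishes, start the next pending one.
        while let _ = try await group.next() {
            if let sample = iterator.next() {
                group.addTask { try await upload(sample) }
            }
        }
    }
}
```

Setting maxConcurrent to 1 gives the sequential behavior suggested as a starting point.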

mjoerke commented 5 months ago

@PSchmiedmayer This all sounds sensible to me. Thank you for the detailed reply! To make more specific where this code would live and how it would be called, are you imagining usage that looks something like this?

HealthKit {
    CollectSample(
        HKQuantityType(.stepCount),
        deliverySetting: .anchorQuery(.afterAuthorizationAndApplicationWillLaunch)
    )
    BulkUpload(
        HKQuantityType(.stepCount),
        predicate: HKQuery.predicateForSamples(
            withStart: Calendar.current.date(byAdding: .month, value: -6, to: .now),
            end: nil,
            options: .strictEndDate
        ),
        deliverySetting: .anchorQuery(.afterAuthorizationAndApplicationWillLaunch),
        limit: 100 // how many samples to include in each anchor query batch
    )
}

In this case, would you recommend creating Sources/SpeziHealthKit/BulkUpload/BulkUpload.swift and following the pattern employed in CollectSamples.swift and HealthKitSampleDataSource.swift?

I also have a separate question that came up while reading the SpeziHealthKit documentation – how exactly does setting HealthKitDeliverySetting to .background work? My assumption here is that Apple allows applications to read in HealthKit data in the background, but long-running network upload tasks will require the app to be in the foreground?

PSchmiedmayer commented 5 months ago

@mjoerke Thanks for sharing the first draft of the API surface.

I would suggest the following modifications:

  1. The delivery setting should probably just be .automatic or .manual. Bulk uploads will always use anchor queries, and background support is probably not going to work here, as it exists to wake up the application when new data comes in. So we can use .automatic (once the user has given permission to access the data) or .manual if the user calls the manual delivery methods.
  2. I would suggest renaming limit to bulkSize. It is more descriptive, and limit is specific to anchor queries, which we might not want to expose at this API level.

Yes, following these examples is the way to go 👍

Regarding background delivery: We rely on the HealthKit background upload functionality. This wakes up the app and allows you to perform some operations for a few seconds. This includes network and, e.g., Firebase interaction, and our Template app is set up so that it does this upload in the background without the user even having to open the app. We are fully at the mercy of Apple as to when they schedule the application wake-up, but in our experience this happens around once an hour, as documented in their docs: https://developer.apple.com/documentation/healthkit/hkhealthstore/1614175-enablebackgrounddelivery

mjoerke commented 5 months ago

Thank you for the suggestions and clarifications! @bryant-jimenez do you have any additional questions? (and I'm sure we will have more questions once we start building things)

Re background delivery: I think this makes sense to me – HealthKit background uploading is sufficient for periodic uploads as data comes in real-time, but insufficient for large batch uploads of historical data, which will require the user to keep the application in the foreground?

bryant-jimenez commented 5 months ago

This all makes sense to me so far! Questions will come up as we implement, but at a high level I agree with all of the suggestions written here.

PSchmiedmayer commented 5 months ago

@mjoerke Yes, the background upload functionality only informs the app about new data points in HealthKit and wakes up the app to respond to them. It is not designed for background tasks; Apple has different and limited APIs for that.

bryant-jimenez commented 5 months ago

@PSchmiedmayer Hi Paul, I would like to run through my plan for an initial implementation attempt at this:

Currently, my understanding is that the anchoredSingleObjectQuery() function used in HealthKitSampleDataSource.swift is what we will model our bulk upload functionality on, since it uses the HKAnchoredObjectQueryDescriptor type to build and execute the query while also saving the returned anchor. This query will then be run in batches of bulkSize, which we pass in according to Matt's initial code stubs in BulkUpload.swift.

I created a file BulkUploadSampleDataSource.swift (linked here), modeled on HealthKitSampleDataSource.swift, with similar functionality, except that I only want to run the bulk upload/batch querying function with the delivery settings .automatic and .manual. What I'm imagining is that the function will look like the following:

    private func anchoredBulkUploadQuery() async throws {

        try await healthStore.requestAuthorization(toShare: [], read: [sampleType])

        // create an anchor descriptor that reads a data batch of the defined bulkSize
        var anchorDescriptor = HKAnchoredObjectQueryDescriptor(
            predicates: [
                .sample(type: sampleType, predicate: predicate)
            ],
            anchor: anchor,
            limit: bulkSize
        )

        // run query at least once
        var result = try await anchorDescriptor.result(for: healthStore)

        // continue reading bulkSize batches of data until there's no new data
        // TODO: parallelize this
        repeat {
            for deletedObject in result.deletedObjects {
                await standard.remove(sample: deletedObject)
            }

            for addedSample in result.addedSamples {
                await standard.add(sample: addedSample)
            }

            // advance the anchor
            let newAnchor = result.newAnchor
            anchor = newAnchor

            anchorDescriptor = HKAnchoredObjectQueryDescriptor(
                predicates: [
                    .sample(type: sampleType, predicate: predicate)
                ],
                anchor: anchor,
                limit: bulkSize
            )
            result = try await anchorDescriptor.result(for: healthStore)

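            // keep looping while the latest batch still contains added or deleted
            // objects; requiring both (&&) would stop as soon as either list is empty
        } while !result.addedSamples.isEmpty || !result.deletedObjects.isEmpty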
    }

Please feel free to let me know your thoughts on this implementation plan - I also had a couple of questions below on modifications that we'll need to make for this to work and also questions for my own understanding:

@mjoerke Feel free to chime in as well with any thoughts, ideas, or suggestions!

mjoerke commented 5 months ago

@bryant-jimenez Thank you for putting this together! I'll take a first stab at answering these questions, but @PSchmiedmayer is definitely the expert here.

Delivery Settings: In my BulkUpload class spec, I used HealthKitDeliveryStartSetting (which is either .automatic or .manual) as input to the initializer rather than HealthKitDeliverySetting, since we will always be using an .anchorQuery type. I believe the switch statement should only focus on these two cases of anchored queries (see here for an example).

New Add/Remove Functions: My guess is that we will need another function for processing bulk uploads, since add/remove only take a single HKSample as input. The backoff timing depends on batches of data (e.g., Firebase rate limits at 500 uploads/sec). The alternative would be to add something like sleep(1/500) in the add function, but this seems less than ideal since it will affect regular/non-bulk uploads too and still might not properly rate limit if we process uploads in parallel.

I am unsure on:

PSchmiedmayer commented 5 months ago

@bryant-jimenez Happy to provide some input; @mjoerke correctly answered the delivery setting and the add/remove function, thank you!

As noted in my last comment, we will need to:

"[...] add a new Standard Constraint as we already have for the normal sample upload that would then be implemented in the app that will use this feature."

I would suggest that this function is async and that the Standard uses Task.sleep to back off or slow down the computation time. Swift concurrency will be smart enough to resume other work in the meantime.
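As a rough illustration of the Task.sleep idea, the sketch below writes a batch in chunks and pauses between chunks. Everything here is an assumption for illustration: addBulk, chunkSize, pauseNanoseconds, and the write closure are hypothetical names, and the defaults (500 writes, then a one-second pause) are taken from the write limit quoted earlier in this thread, not from any SpeziFirebase API.

```swift
/// Hypothetical Standard-side helper: writes `samples` in chunks of `chunkSize`
/// and pauses after each chunk, so that roughly no more than `chunkSize` writes
/// are issued per pause interval.
func addBulk<Sample>(
    _ samples: [Sample],
    chunkSize: Int = 500,
    pauseNanoseconds: UInt64 = 1_000_000_000,
    write: (Sample) async throws -> Void
) async throws {
    precondition(chunkSize > 0, "chunkSize must be positive")

    var start = 0
    while start < samples.count {
        let end = min(start + chunkSize, samples.count)
        for sample in samples[start..<end] {
            try await write(sample)
        }
        start = end

        if start < samples.count {
            // Suspends only this task; other tasks keep running in the meantime.
            try await Task.sleep(nanoseconds: pauseNanoseconds)
        }
    }
}
```

Because the pause lives in the bulk function, regular single-sample uploads through the existing add function are not slowed down.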

A few further comments about BulkUploadSampleDataSource.swift and your example:

  1. The overall approach looks good.
  2. We will need to have a different prefix for these anchors so they don't collide with the main HealthKit anchors
  3. Yes, we only need to focus on our core functionality; all elements from anchoredContinuousObjectQuery, and even the fact that we only use HealthKitDeliveryStartSetting and not HealthKitDeliverySetting since we have only one use case, would be the way to go.
  4. I would suggest that we change the signature of the add function for our new constraint to take a collection of samples (and maybe even the deleted samples in the same call as a second parameter, so they can be accounted for in the rate limiting) and then allow the function to determine how long it wants to take for the bulk upload (or even go ahead and upload everything in a single request if the server supports it).
  5. I think there is no need for the newAnchor property there?
  6. I don't think there is a need to parallelize this in the first implementation; as long as the bulks arrive in sequence and we store the anchor for a possible retry, we should be good.
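The constraint signature suggested in point 4 might be sketched like this. The Sample and DeletedObject structs are stand-ins for HKSample and HKDeletedObject so the example compiles without HealthKit, and the method name processBulk is hypothetical; only the idea of passing added and deleted objects in one async call comes from the list above.

```swift
/// Stand-ins for HKSample and HKDeletedObject (assumption, for illustration only).
struct Sample: Sendable { let id: Int }
struct DeletedObject: Sendable { let id: Int }

/// Hypothetical shape of the new constraint. Returning without throwing signals
/// that the whole batch was stored, so the caller may advance its anchor.
protocol BulkUploadConstraint {
    func processBulk(added: [Sample], deleted: [DeletedObject]) async throws
}

/// Example conformance that handles a batch in one go, as a server supporting
/// single-request uploads might allow.
actor InMemoryStandard: BulkUploadConstraint {
    private(set) var stored: [Int: Sample] = [:]

    func processBulk(added: [Sample], deleted: [DeletedObject]) async throws {
        for object in deleted {
            stored[object.id] = nil
        }
        for sample in added {
            stored[sample.id] = sample
        }
    }
}
```

A conforming Standard is free to decide how long a batch takes, for example by pausing between writes or by sending everything in a single request.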

Hope these insights already help @bryant-jimenez 🚀

bryant-jimenez commented 5 months ago

Thanks for the feedback @PSchmiedmayer ! The need for the new Standard constraint makes sense - I just had a couple of follow up questions:

Thank you!

mjoerke commented 5 months ago

@bryant-jimenez

mjoerke commented 5 months ago

@PSchmiedmayer We're running into an issue with protocol conformance (see here)

The problem is that the standard argument to BulkUpload.dataSources needs to conform to the BulkUploadConstraint protocol (which includes add_bulk), but when we specify the function signature as

public func dataSources(healthStore: HKHealthStore, standard: any BulkUploadConstraint)

BulkUpload no longer conforms to HealthKitDataSourceDescription, which expects

public func dataSources(healthStore: HKHealthStore, standard: any HealthKitConstraint)

even though BulkUploadConstraint inherits from HealthKitConstraint.

I have a fix here that checks whether the standard conforms to the BulkUploadConstraint at runtime, but I wanted to double check with you whether this solution is preferred.

An alternative would be to use more generic protocols (which could be verified at compile time), but this might require more changes to the SpeziHealthKit code.

PSchmiedmayer commented 5 months ago

@mjoerke @bryant-jimenez Yes, I think we won't really have a good solution here except for relying on one more runtime check. At this point the Spezi framework throws an error at startup if your standard does not conform to HealthKitConstraint. Instead of letting Spezi throw the error we will need to defer that to the two (or in the future more) HealthKitDataSourceDescription instances.

I would suggest to

  1. Change the Standard injection in HealthKit to @ObservationIgnored @StandardActor private var standard: any Standard
  2. Adjust the HealthKitDataSourceDescription protocol requirement to: func dataSources(healthStore: HKHealthStore, standard: any Standard) -> [HealthKitDataSource]
  3. Throw an appropriate error message in the HealthKitDataSourceDescriptions, e.g. like this:
    public func dataSources(
        healthStore: HKHealthStore,
        standard: any Standard
    ) -> [any HealthKitDataSource] {
        guard let healthKitConstraint = standard as? any HealthKitConstraint else {
            preconditionFailure(
                """
                ...
                """
            )
        }
        // ...
    }

This is the error that we currently throw within Spezi for use cases like this: https://github.com/StanfordSpezi/Spezi/blob/c43e4fa3d3938a847de2b677091a34ddaea5bc76/Sources/Spezi/Standard/StandardPropertyWrapper.swift#L41

This would also give us the advantage that an app that only uses bulk upload technically no longer has to conform to HealthKitConstraint 🚀
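The runtime check and its side effect can be illustrated with stand-in protocols. The protocols below only share their names with the Spezi types and are simplified assumptions; bulkConstraint(in:) is a hypothetical helper, and BulkUploadConstraint is declared independently of HealthKitConstraint here to show the bulk-only case.

```swift
// Simplified stand-ins (assumption): not the real Spezi protocols.
protocol Standard {}
protocol HealthKitConstraint: Standard {
    func add(sample: Int)
}
protocol BulkUploadConstraint: Standard {
    func addBulk(samples: [Int])
}

/// Returns the standard as a bulk upload handler, or nil if it does not conform.
func bulkConstraint(in standard: any Standard) -> (any BulkUploadConstraint)? {
    standard as? any BulkUploadConstraint
}

struct BulkUpload {
    /// Accepts any Standard and defers the conformance check to runtime.
    func dataSources(standard: any Standard) -> [String] {
        guard bulkConstraint(in: standard) != nil else {
            preconditionFailure(
                "BulkUpload requires a Standard that conforms to BulkUploadConstraint."
            )
        }
        return ["BulkUploadSampleDataSource"]
    }
}

/// A standard that supports bulk upload only and does not implement HealthKitConstraint.
struct BulkOnlyStandard: BulkUploadConstraint {
    func addBulk(samples: [Int]) {}
}
```

With this shape, BulkUpload().dataSources(standard: BulkOnlyStandard()) succeeds even though BulkOnlyStandard never implements add(sample:).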

mjoerke commented 5 months ago

Thank you @PSchmiedmayer! I've made your suggested changes here. If you think we're ready to start a PR, I'd be happy to create that.

PSchmiedmayer commented 5 months ago

@mjoerke Sure, a PR is always a great idea; I am happy to add some comments and additional hints in the git diff 🚀