This PR contains all the work related to setting up this project as required to implement the [Assignment](https://repo.rock-n-code.com/rock-n-code/deep-linking-assignment/wiki/Assignment) on top, as intended. To summarise this work: - [x] created a new **Xcode** project; - [x] cloned the `Wikipedia` app and inserted it into the **Xcode** project; - [x] created the `Locations` app and also, its `Libraries` package; - [x] created the `Shared` package to share dependencies between the apps; - [x] added a `Makefile` file and implemented some **environment** and **help** commands. Co-authored-by: Javier Cicchelli <javier@rock-n-code.com> Reviewed-on: rock-n-code/deep-linking-assignment#1
731 lines
27 KiB
Swift
731 lines
27 KiB
Swift
/*
|
||
* Event Platform Client (EPC)
|
||
*
|
||
* DESCRIPTION
|
||
* Collects events in an input buffer, adds some metadata, places them in an
|
||
* output buffer where they are periodically sent in bursts to a remote endpoint via
|
||
* HTTP POST.
|
||
*
|
||
* Designed for use with Wikipedia iOS application producing events to a
|
||
* stream intake service.
|
||
*
|
||
* LICENSE NOTICE
|
||
* Copyright 2020 Wikimedia Foundation
|
||
*
|
||
* Redistribution and use in source and binary forms, with or without
|
||
* modification, are permitted provided that the following conditions are
|
||
* met:
|
||
*
|
||
* 1. Redistributions of source code must retain the above copyright notice,
|
||
* this list of conditions and the following disclaimer.
|
||
*
|
||
* 2. Redistributions in binary form must reproduce the above copyright
|
||
* notice, this list of conditions and the following disclaimer in the
|
||
* documentation and/or other materials provided with the distribution.
|
||
*
|
||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
|
||
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
|
||
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
||
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
|
||
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
||
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
||
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
|
||
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
||
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
|
||
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
||
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||
*/
|
||
|
||
import Foundation
|
||
import CocoaLumberjackSwift
|
||
|
||
/**
|
||
* Event Platform Client (EPC)
|
||
*
|
||
* Use `EventPlatformClient.shared.submit(stream:event:domain:)` to submit ("log") events to
|
||
* streams.
|
||
*
|
||
* iOS schemas will always include the following fields which are managed by EPC
|
||
* and which will be assigned automatically by the library:
|
||
* - `dt`: client-side timestamp of when event was originally submitted
|
||
* - `app_install_id`: app install ID as in legacy EventLoggingService
|
||
* - `app_session_id`: the ID of the session at the time of the event when it was
|
||
* originally submitted
|
||
*/
|
||
public class EventPlatformClient: NSObject, SamplingControllerDelegate {
|
||
// MARK: - Properties
|
||
|
||
public static let shared: EventPlatformClient = {
|
||
return EventPlatformClient()
|
||
}()
|
||
|
||
// SINGLETONTODO
|
||
/// Session for requesting data
|
||
let session = MWKDataStore.shared().session
|
||
let samplingController: SamplingController
|
||
let storageManager: StorageManager?
|
||
|
||
/**
|
||
* Store events until the library is finished initializing
|
||
*
|
||
* The EPC library makes an HTTP request to a remote stream configuration service for information
|
||
* about how to evaluate incoming event data. Until this initialization is complete, we store any incoming
|
||
* events in this buffer.
|
||
*
|
||
* Only read or modify (append events to, remove events from) on `queue`, via the thread-safe accessors
|
||
*/
|
||
private var inputBuffer: [(Data, Stream)] = []
|
||
|
||
/**
|
||
* Maximum number of events allowed in the input buffer
|
||
*/
|
||
private let inbutBufferLimit = 128
|
||
|
||
/**
 * Identifiers of the event streams that can be used with this client.
 *
 * Each raw value should correspond to the `$id` of a schema in
 * [this repository](https://gerrit.wikimedia.org/g/schemas/event/secondary/).
 */
public enum Stream: String, Codable {
    case editHistoryCompare = "ios.edit_history_compare",
         remoteNotificationsInteraction = "ios.notification_interaction",
         talkPagesInteraction = "ios.talk_page_interaction"
}
|
||
|
||
/**
 * Identifies the schema, and the specific version of it, that an event
 * conforms to.
 *
 * Analytics schemas live in the jsonschema directory of the
 * [secondary repo](https://gerrit.wikimedia.org/g/schemas/event/secondary/).
 * For example, client-side error logging could use
 * `/mediawiki/client/error/1.0.0`. Most values start with `/analytics`,
 * since that is where analytics-related schemas are collected.
 */
public enum Schema: String, Codable {
    case editHistoryCompare = "/analytics/mobile_apps/ios_edit_history_compare/2.1.0",
         remoteNotificationsInteraction = "/analytics/mobile_apps/ios_notification_interaction/2.1.0",
         talkPages = "/analytics/mobile_apps/ios_talk_page_interaction/1.0.0"
}
|
||
|
||
/**
|
||
* Serial dispatch queue that enables working with properties in a thread-safe
|
||
* way
|
||
*/
|
||
private let queue = DispatchQueue(label: "EventPlatformClient-" + UUID().uuidString)
|
||
|
||
/**
|
||
* Serial dispatch queue for encoding data on a background thread
|
||
*/
|
||
private let encodeQueue = DispatchQueue(label: "EventPlatformClientEncode-" + UUID().uuidString, qos: .background)
|
||
|
||
/**
|
||
* Where to send events to for intake
|
||
*
|
||
* See [wikitech:Event Platform/EventGate](https://wikitech.wikimedia.org/wiki/Event_Platform/EventGate)
|
||
* for more information. Specifically, the section on
|
||
* **eventgate-analytics-external**. This service uses the stream
|
||
* configurations from Meta wiki as its source of truth.
|
||
*/
|
||
private static let eventIntakeURI = URL(string: "https://intake-analytics.wikimedia.org/v1/events")!
|
||
|
||
/**
|
||
* MediaWiki API endpoint which returns stream configurations as JSON
|
||
*
|
||
* Streams are configured via [mediawiki-config/wmf-config/InitialiseSettings.php](https://gerrit.wikimedia.org/g/operations/mediawiki-config/+/master/wmf-config/InitialiseSettings.php)
|
||
*
|
||
* The config changes are deployed in [backport windows](https://wikitech.wikimedia.org/wiki/Backport_windows)
|
||
* by scheduling on the [Deployments](https://wikitech.wikimedia.org/wiki/Deployments)
|
||
* page. Stream configurations are made available for external consumption via
|
||
* MediaWiki API via [Extension:EventStreamConfig](https://gerrit.wikimedia.org/g/mediawiki/extensions/EventStreamConfig/)
|
||
*
|
||
* In production, we use [Meta wiki](https://meta.wikimedia.org/wiki/Main_Page)
|
||
* [streamconfigs endpoint](https://meta.wikimedia.org/w/api.php?action=help&modules=streamconfigs)
|
||
* with the constraint that the `destination_event_service` is configured to
|
||
* be "eventgate-analytics-external" (to filter out irrelevant streams from
|
||
* the returned list of stream configurations).
|
||
*/
|
||
private static let streamConfigsURI = URL(string: "https://meta.wikimedia.org/w/api.php?action=streamconfigs&format=json&constraints=destination_event_service=eventgate-analytics-external")!
|
||
|
||
/**
 * Configuration of one stream, as decoded from the stream configuration
 * response. Every field is optional in the payload.
 */
struct StreamConfiguration: Codable {
    /// Sampling settings attached to a stream.
    struct Sampling: Codable {
        let rate: Double?
        let identifier: String?
    }

    let sampling: Sampling?
}
|
||
|
||
/**
 * Configuration of every known stream, keyed by stream; `nil` until a value
 * has been assigned.
 *
 * Reads block on `queue`; writes are dispatched to `queue` asynchronously.
 */
private var streamConfigurations: [Stream: StreamConfiguration]? {
    get { queue.sync { _streamConfigurations } }
    set {
        queue.async { self._streamConfigurations = newValue }
    }
}
/// Backing storage for `streamConfigurations`; only touched on `queue`.
private var _streamConfigurations: [Stream: StreamConfiguration]?
|
||
|
||
/**
|
||
* Updated when app enters background, used for determining if the session has
|
||
* expired.
|
||
*/
|
||
private var lastTimestamp: Date = Date()
|
||
|
||
/**
 * The current session identifier, created on first access.
 * - Returns: session ID
 *
 * When no identifier is stored, a new one is produced by `generateID()` and
 * remembered. Lookup and creation both happen synchronously on `queue`.
 */
internal var sessionID: String {
    queue.sync {
        if let existingID = _sessionID {
            return existingID
        }

        let freshID = generateID()
        _sessionID = freshID
        return freshID
    }
}
/// Backing storage for `sessionID`; `nil` means no session has been started.
private var _sessionID: String?
|
||
|
||
|
||
// MARK: - Methods
|
||
|
||
/**
 * Wires up the storage manager and the sampling controller, then starts
 * downloading the stream configuration.
 *
 * If the shared storage manager is unavailable, an error is logged and the
 * stream configuration is never requested.
 */
public override init() {
    storageManager = StorageManager.shared
    samplingController = SamplingController()

    super.init()

    samplingController.delegate = self

    if storageManager == nil {
        DDLogError("EPC: Error initializing the storage manager. Event intake and submission will be disabled.")
        return
    }

    fetchStreamConfiguration(retries: 10, retryDelay: 30)
}
|
||
|
||
/**
 * Meant to be called by the application delegate from
 * `applicationWillResignActive()`.
 *
 * Records the current time as the last-active timestamp, which
 * `sessionTimedOut()` later compares against.
 */
public func appInBackground() {
    let now = Date()
    lastTimestamp = now
}
|
||
/**
 * Meant to be called by the application delegate from
 * `applicationDidBecomeActive()`.
 *
 * Starts a new session if more than 15 minutes have passed since the
 * last-active timestamp was recorded.
 */
public func appInForeground() {
    guard sessionTimedOut() else {
        return
    }
    resetSession()
}
|
||
/**
 * Meant to be called by the application delegate from
 * `applicationWillTerminate()`.
 *
 * Intentionally does nothing: the session ID is not persisted across app
 * launches, because a session is defined to end when the app is closed (by
 * the user or the OS) or after 15 minutes of inactivity.
 */
public func appWillClose() {
    // Hook reserved for future on-terminate logic.
}
|
||
|
||
/**
 * Generates a new random identifier.
 * - Returns: 20 lowercase, zero-padded hexadecimal digits, built from five
 *   independent 16-bit groups (80 random bits in total).
 *
 * According to the original documentation this mirrors the algorithm used by
 * the EPC libraries for web and Android.
 */
private func generateID() -> String {
    // The range is inclusive of 0xFFFF. The previous `arc4random_uniform(65535)`
    // excluded its upper bound, so no group could ever be "ffff".
    return (0..<5)
        .map { _ in String(format: "%04x", UInt32.random(in: 0...0xFFFF)) }
        .joined()
}
|
||
|
||
/**
 * Meant to be called when the user toggles logging permissions in Settings.
 *
 * Only the session is reset here. Per the original documentation, the
 * storage manager's device ID is expected to be reset separately by a
 * different owner (EventLoggingService's `reset()` method).
 */
public func reset() {
    // Starting a fresh session is all this client is responsible for.
    resetSession()
}
|
||
|
||
/**
 * Ends the current session.
 *
 * Clears the stored session ID (asynchronously, on `queue`) so the next read
 * of `sessionID` creates a new one, then clears the sampling controller's
 * sampling cache.
 */
private func resetSession() {
    queue.async { self._sessionID = nil }
    samplingController.removeAllSamplingCache()
}
|
||
|
||
/**
 * Reports whether the session has expired.
 * - Returns: `true` when more than 15 minutes have passed since
 *   `lastTimestamp` was recorded.
 */
private func sessionTimedOut() -> Bool {
    // TimeInterval values are expressed in seconds: 15 minutes = 900 s.
    let sessionTimeout: TimeInterval = 900
    let secondsSinceLastActive = Date().timeIntervalSince(lastTimestamp)
    return secondsSinceLastActive > sessionTimeout
}
|
||
|
||
/**
 * Fetch stream configuration from the stream configuration service.
 * - Parameters:
 *   - retries: number of retries remaining
 *   - retryDelay: seconds to wait before the next attempt; multiplied by 1.5
 *     after every failed attempt
 *
 * A response counts as successful only when it is an HTTP 200 carrying data;
 * the payload is then handed to `loadStreamConfiguration(_:)`. Otherwise a
 * retry is scheduled on the main queue while retries remain.
 */
private func fetchStreamConfiguration(retries: Int, retryDelay: TimeInterval) {
    httpGet(url: EventPlatformClient.streamConfigsURI, completion: { (data, response, _) in
        if let httpResponse = response as? HTTPURLResponse, let data = data, httpResponse.statusCode == 200 {
            self.loadStreamConfiguration(data)
            return
        }

        // Only announce a retry when one is actually going to happen;
        // previously "will try ... again" was logged even on the final failure.
        guard retries > 0 else {
            DDLogWarn("EPC: Ran out of retries when attempting to download stream configs")
            return
        }

        DDLogWarn("EPC: Server did not respond adequately, will try \(EventPlatformClient.streamConfigsURI.absoluteString) again")
        dispatchOnMainQueueAfterDelayInSeconds(retryDelay) {
            self.fetchStreamConfiguration(retries: retries - 1, retryDelay: retryDelay * 1.5)
        }
    })
}
|
||
|
||
/**
 * Processes a fetched stream configuration payload.
 * - Parameter data: JSON-serialized stream configuration
 *
 * Example of a retrieved config:
 * ``` js
 * {
 *   "streams": {
 *     "test.instrumentation.sampled": {
 *       "sampling": {
 *         "rate": 0.1
 *       }
 *     },
 *     "test.instrumentation": {}
 *   }
 * }
 * ```
 *
 * Entries whose name is not a known `Stream` are ignored. Once the
 * configurations are stored, the input buffer is drained: each buffered event
 * is pushed to the storage manager if its stream is configured and in sample,
 * and dropped otherwise.
 */
private func loadStreamConfiguration(_ data: Data) {
    #if DEBUG
    if let rawPayload = String(data: data, encoding: .utf8) {
        DDLogDebug("EPC: Downloaded stream configs (raw): \(rawPayload)")
    }
    #endif

    guard let storageManager = self.storageManager else {
        DDLogError("Storage manager not initialized; this shouldn't happen!")
        return
    }

    struct StreamConfigurationsJSON: Codable {
        let streams: [String: StreamConfiguration]
    }

    let decoded: StreamConfigurationsJSON
    do {
        decoded = try JSONDecoder().decode(StreamConfigurationsJSON.self, from: data)
    } catch {
        DDLogError("EPC: Problem processing JSON payload from response: \(error)")
        return
    }

    // Publish the configs before flushing the buffer, so newly logged events
    // can see them (the setter writes on the serial queue, asynchronously).
    var knownConfigs: [Stream: StreamConfiguration] = [:]
    for (name, config) in decoded.streams {
        if let stream = Stream(rawValue: name) {
            knownConfigs[stream] = config
        }
    }
    streamConfigurations = knownConfigs

    // NOTE: events submitted while the configs are still being set
    // (asynchronously) simply go back into the input buffer.
    while let (eventData, stream) = inputBufferPopFirst() {
        guard let config = streamConfigurations?[stream],
              samplingController.inSample(stream: stream, config: config) else {
            continue
        }
        storageManager.push(data: eventData, stream: stream)
    }
}
|
||
|
||
/**
 * Flush the queue of outgoing requests in a first-in-first-out,
 * fire-and-forget fashion.
 * - Parameter completion: invoked once every request has finished. It is
 *   called immediately on the calling thread when there is no storage
 *   manager or nothing to send, and on `queue` otherwise.
 *
 * Events are marked purgeable when the server accepts them and also when the
 * server rejects them. Events that failed inside the networking library are
 * left unmarked so they can be retried.
 */
func postAllScheduled(_ completion: (() -> Void)? = nil) {
    guard let storageManager = self.storageManager else {
        completion?()
        return
    }

    let pendingEvents = storageManager.popAll()
    guard !pendingEvents.isEmpty else {
        // DDLogDebug("EPC: Nothing to send.")
        completion?()
        return
    }

    DDLogDebug("EPC: Processing all scheduled requests")
    let inFlight = DispatchGroup()
    for event in pendingEvents {
        inFlight.enter()
        httpPost(url: EventPlatformClient.eventIntakeURI, body: event.data) { result in
            defer { inFlight.leave() }

            switch result {
            case .success:
                storageManager.markPurgeable(event: event)
            case .failure(.networkingLibraryError(_)):
                /// Leave unmarked to retry on networking library failure
                break
            case .failure(let error):
                /// Give up on events rejected by the server
                DDLogError("EPC: The analytics service failed to process an event. A response code of 400 could indicate that the event didn't conform to provided schema. Check the error for more information.: \(error)")
                storageManager.markPurgeable(event: event)
            }
        }
    }
    inFlight.notify(queue: queue) {
        completion?()
    }
}
|
||
|
||
/// EventBody is used to encode event data into the POST body of a request to the Modern Event Platform
struct EventBody<E>: Encodable where E: EventInterface {
    /// EventGate needs to know which version of the schema to validate against
    var meta: Meta

    struct Meta: Codable {
        let stream: Stream

        /**
         * meta.id is *optional* and should only be done in case the client is
         * known to send duplicates of events, otherwise we don't need to
         * make the payload any heavier than it already is
         */
        let id: UUID
        let domain: String?
    }

    let appInstallID: String

    /**
     * Generated events have the session ID attached to them before stream
     * config is available (in case they're generated offline) and before
     * they're cc'd to any other streams (once config is available).
     */
    let appSessionID: String

    /**
     * The top-level field `dt` is for recording the time the event
     * was generated. EventGate sets `meta.dt` during ingestion, so for
     * analytics events that field is used as "timestamp of reception" and
     * is used for partitioning the events in the database. See Phab:T240460
     * for more information.
     */
    let dt: Date

    /**
     * Event represents the client-provided event data.
     * The event is encoded at the top level of the resulting structure.
     * If any of the `CodingKeys` conflict with keys defined by `EventBody`,
     * the values from `event` will be used.
     */
    let event: E

    enum CodingKeys: String, CodingKey {
        case schema = "$schema"
        case meta
        case appInstallID = "app_install_id"
        case appSessionID = "app_session_id"
        case dt
        case event
    }

    /**
     * Encodes the EPC-managed fields and then the client event's own fields
     * into the same top-level container.
     * - Throws: any error raised while encoding. The error is logged and then
     *   rethrown, so callers never receive a silently truncated payload.
     */
    func encode(to encoder: Encoder) throws {
        var container = encoder.container(keyedBy: CodingKeys.self)
        do {
            try container.encode(meta, forKey: .meta)
            try container.encode(appInstallID, forKey: .appInstallID)
            try container.encode(appSessionID, forKey: .appSessionID)
            try container.encode(dt, forKey: .dt)
            try container.encode(E.schema, forKey: .schema)
            // Encoded last so the event's keys win over the ones written above.
            try event.encode(to: encoder)
        } catch {
            DDLogError("EPC: Error encoding event body: \(error)")
            // Swallowing the error here would let a partial body be stored and posted.
            throw error
        }
    }
}
|
||
|
||
/**
|
||
* Submit an event according to the given stream's configuration.
|
||
* - Parameters:
|
||
* - stream: The stream to submit the event to
|
||
* - event: The event data
|
||
* - domain: Optional domain to include for the event (without protocol)
|
||
*
|
||
* An example call:
|
||
* ```
|
||
* struct TestEvent: EventInterface {
|
||
* static let schema = "/analytics/mobile_apps/test/1.0.0"
|
||
* let test_string: String
|
||
* let test_map: SourceInfo
|
||
* struct SourceInfo: Codable {
|
||
* let file: String
|
||
* let method: String
|
||
* }
|
||
* }
|
||
*
|
||
* let sourceInfo = TestEvent.SourceInfo(file: "Features/Feed/ExploreViewController.swift", method: "refreshControlActivated")
|
||
* let event = TestEvent(test_string: "Explore Feed refreshed", test_map: sourceInfo)
|
||
*
|
||
* EventPlatformClient.shared.submit(
|
||
* stream: .test, // Defined in `EPC.Stream`
|
||
* event: event
|
||
* )
|
||
* ```
|
||
*
|
||
* Regarding `domain`: this is *optional* and should be used when event needs
|
||
* to be attributed to a particular wiki (Wikidata, Wikimedia Commons, a
|
||
* specific edition of Wikipedia, etc.). If the language is NOT relevant in
|
||
* the context, `domain` can be safely omitted. Using "domain" rather than
|
||
* "language" is consistent with the other platforms and allows for the
|
||
* possibility of setting a non-Wikipedia domain like "commons.wikimedia.org"
|
||
* and "wikidata.org" for multimedia/metadata-related in-app analytics.
|
||
* Instrumentation code should use the `host` property of a `URL` as the value
|
||
* for this parameter.
|
||
*
|
||
* Cases where instrumentation would set a `domain`:
|
||
* - reading or editing an article
|
||
* - managing watchlist
|
||
* - interacting with feed
|
||
* - searching
|
||
*
|
||
* Cases where it might not be necessary for the instrument to set a `domain`:
|
||
* - changing settings
|
||
* - managing reading lists
|
||
* - navigating map of nearby articles
|
||
* - multi-lingual features like Suggested Edits
|
||
* - marking session start/end; in which case schema and `data` should have a
|
||
* `languages` field where user's list of languages can be stored, although
|
||
* it might make sense to set it to the domain associated with the user's
|
||
* 1st preferred language – in which case use
|
||
* `MWKLanguageLinkController.sharedInstance().appLanguage.siteURL().host`
|
||
*/
|
||
public func submit<E: EventInterface>(stream: Stream, event: E, domain: String? = nil) {
    // Capture the timestamp on the caller's thread so that the hop to the
    // encoding queue does not skew the event's `dt`.
    let submittedAt = Date()
    encodeQueue.async {
        self._submit(stream: stream, event: event, date: submittedAt, domain: domain)
    }
}
|
||
|
||
/**
 * Private, synchronous version of `submit`.
 * - Parameters:
 *   - stream: The stream to submit the event to
 *   - event: The event data
 *   - date: When the event was originally submitted; encoded as `dt`
 *   - domain: Optional domain to include for the event
 *
 * Does nothing when there is no storage manager, when usage reports are
 * turned off, or when the app install ID is unset. Otherwise the event is
 * JSON-encoded and then either buffered (stream configurations not loaded
 * yet), dropped (stream not configured or not in sample), or pushed to the
 * storage manager.
 */
private func _submit<E: EventInterface>(stream: Stream, event: E, date: Date, domain: String? = nil) {
    guard let storageManager = self.storageManager else {
        return
    }

    let userDefaults = UserDefaults.standard
    guard userDefaults.wmf_sendUsageReports else {
        return
    }
    guard let appInstallID = userDefaults.wmf_appInstallId else {
        DDLogWarn("EPC: App install ID is unset. This shouldn't happen.")
        return
    }

    let meta = EventBody<E>.Meta(stream: stream, id: UUID(), domain: domain)
    let eventPayload = EventBody(meta: meta, appInstallID: appInstallID, appSessionID: sessionID, dt: date, event: event)

    let encoder = JSONEncoder()
    encoder.dateEncodingStrategy = .iso8601
    #if DEBUG
    encoder.outputFormatting = .prettyPrinted
    #endif

    // Encoding is the only step that can throw, so it is the only one guarded.
    let data: Data
    do {
        data = try encoder.encode(eventPayload)
    } catch {
        DDLogError("EPC: \(error.localizedDescription)")
        return
    }

    #if DEBUG
    // Non-failable decoding: no force-unwrap needed just to log the body.
    let jsonString = String(decoding: data, as: UTF8.self)
    DDLogDebug("EPC: Scheduling event to be sent to \(EventPlatformClient.eventIntakeURI) with POST body:\n\(jsonString)")
    #endif

    guard let streamConfigs = streamConfigurations else {
        // Configurations are not available yet: hold the event until they are.
        appendEventToInputBuffer(data: data, stream: stream)
        return
    }
    guard let config = streamConfigs[stream] else {
        DDLogDebug("EPC: Event submitted to '\(stream)' but only the following streams are configured: \(streamConfigs.keys.map(\.rawValue).joined(separator: ", "))")
        return
    }
    guard samplingController.inSample(stream: stream, config: config) else {
        DDLogDebug("EPC: Stream '\(stream.rawValue)' is not in sample")
        return
    }
    storageManager.push(data: data, stream: stream)
}
|
||
}
|
||
|
||
// MARK: Thread-safe accessors for collection properties
|
||
private extension EventPlatformClient {

    /**
     * Thread-safe synchronous retrieval of buffered events
     * - Returns: a snapshot of the input buffer, taken on `queue`
     */
    func getInputBuffer() -> [(Data, Stream)] {
        queue.sync { inputBuffer }
    }

    /**
     * Thread-safe synchronous buffering of an event
     * - Parameters:
     *   - data: encoded event to be buffered
     *   - stream: stream the event was submitted to
     *
     * When the buffer already holds `inbutBufferLimit` events, the oldest one
     * is discarded to make room. In practice there should not be over a
     * hundred events generated between app launch and the stream
     * configuration becoming available.
     */
    func appendEventToInputBuffer(data: Data, stream: Stream) {
        queue.sync {
            let bufferIsFull = inputBuffer.count == inbutBufferLimit
            if bufferIsFull {
                inputBuffer.removeFirst()
            }
            inputBuffer.append((data, stream))
        }
    }

    /**
     * Thread-safe synchronous removal of first buffered event
     * - Returns: the oldest buffered event, or `nil` when the buffer is empty
     */
    func inputBufferPopFirst() -> (Data, Stream)? {
        queue.sync {
            guard !inputBuffer.isEmpty else {
                return nil
            }
            return inputBuffer.removeFirst()
        }
    }
}
|
||
|
||
// MARK: NetworkIntegration
|
||
|
||
private extension EventPlatformClient {
    /// PostEventError describes the possible failure cases when POSTing an event
    enum PostEventError: Error {
        case networkingLibraryError(_ error: Error)
        case missingResponse
        case unexepectedResponse(_ httpCode: Int)
    }

    /**
     * HTTP POST
     * - Parameter url: Where to POST to
     * - Parameter body: Body of the POST request
     * - Parameter completion: callback invoked upon receiving the server response
     *
     * Only an HTTP 201 response counts as success. Every failure is logged
     * before being reported through `completion`.
     */
    private func httpPost(url: URL, body: Data? = nil, completion: @escaping ((Result<Void, PostEventError>) -> Void)) {
        DDLogDebug("EPC: Attempting to POST events")
        let request = session.request(with: url, method: .post, bodyData: body, bodyEncoding: .json)
        let task = session.dataTask(with: request, completionHandler: { (_, response, transportError) in
            // Classify the outcome first, then log and report it in one place.
            let outcome: Result<Void, PostEventError>
            if let transportError = transportError {
                outcome = .failure(.networkingLibraryError(transportError))
            } else if let httpResponse = response as? HTTPURLResponse {
                if httpResponse.statusCode == 201 {
                    outcome = .success(())
                } else {
                    outcome = .failure(.unexepectedResponse(httpResponse.statusCode))
                }
            } else {
                outcome = .failure(.missingResponse)
            }

            if case .failure(let postError) = outcome {
                DDLogDebug("EPC: An error occurred sending the request: \(postError)")
            }
            completion(outcome)
        })
        task?.resume()
    }

    /**
     * HTTP GET
     * - Parameter url: Where to GET data from
     * - Parameter completion: What to do with gotten data
     *
     * The request carries the app's versioned user agent in its `User-Agent`
     * header.
     */
    private func httpGet(url: URL, completion: @escaping (Data?, URLResponse?, Error?) -> Void) {
        DDLogDebug("EPC: Attempting to GET data from \(url.absoluteString)")
        var getRequest = URLRequest(url: url) // httpMethod = "GET" by default
        getRequest.setValue(WikipediaAppUtils.versionedUserAgent(), forHTTPHeaderField: "User-Agent")
        session.dataTask(with: getRequest, completionHandler: completion)?.resume()
    }
}
|
||
|
||
// MARK: EventInterface
|
||
|
||
/**
 * Protocol for event data.
 * Conforming types must be `Codable` and must declare the schema they
 * conform to.
 */
public protocol EventInterface: Codable {
    /**
     * The schema (and schema version) this event conforms to.
     * See the documentation of `EventPlatformClient.Schema` for more information.
     */
    static var schema: EventPlatformClient.Schema { get }
}
|