731 lines
27 KiB
Swift
731 lines
27 KiB
Swift
|
/*
|
|||
|
* Event Platform Client (EPC)
|
|||
|
*
|
|||
|
* DESCRIPTION
|
|||
|
* Collects events in an input buffer, adds some metadata, places them in an
* output buffer from which they are periodically sent in bursts to a remote
* endpoint via HTTP POST.
|
|||
|
*
|
|||
|
* Designed for use with Wikipedia iOS application producing events to a
|
|||
|
* stream intake service.
|
|||
|
*
|
|||
|
* LICENSE NOTICE
|
|||
|
* Copyright 2020 Wikimedia Foundation
|
|||
|
*
|
|||
|
* Redistribution and use in source and binary forms, with or without
|
|||
|
* modification, are permitted provided that the following conditions are
|
|||
|
* met:
|
|||
|
*
|
|||
|
* 1. Redistributions of source code must retain the above copyright notice,
|
|||
|
* this list of conditions and the following disclaimer.
|
|||
|
*
|
|||
|
* 2. Redistributions in binary form must reproduce the above copyright
|
|||
|
* notice, this list of conditions and the following disclaimer in the
|
|||
|
* documentation and/or other materials provided with the distribution.
|
|||
|
*
|
|||
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
|
|||
|
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
|
|||
|
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
|||
|
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
|
|||
|
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
|||
|
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
|||
|
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
|
|||
|
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
|||
|
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
|
|||
|
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
|||
|
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|||
|
*/
|
|||
|
|
|||
|
import Foundation
|
|||
|
import CocoaLumberjackSwift
|
|||
|
|
|||
|
/**
|
|||
|
* Event Platform Client (EPC)
|
|||
|
*
|
|||
|
* Use `EventPlatformClient.shared.submit(stream:event:domain:)` to submit
* ("log") events to streams.
|
|||
|
*
|
|||
|
* iOS schemas will always include the following fields which are managed by EPC
|
|||
|
* and which will be assigned automatically by the library:
|
|||
|
* - `dt`: client-side timestamp of when event was originally submitted
|
|||
|
* - `app_install_id`: app install ID as in legacy EventLoggingService
|
|||
|
* - `app_session_id`: the ID of the session at the time of the event when it was
|
|||
|
* originally submitted
|
|||
|
*/
|
|||
|
public class EventPlatformClient: NSObject, SamplingControllerDelegate {
    // MARK: - Properties

    /// App-wide client instance.
    public static let shared = EventPlatformClient()

    // SINGLETONTODO
    /// Session for requesting data
    let session = MWKDataStore.shared().session

    /// Used to decide whether an event's stream is in sample
    /// (see `inSample(stream:config:)`); its cache is cleared on session reset.
    let samplingController: SamplingController

    /// Event storage. `nil` if it failed to initialize, in which case event
    /// intake and submission are disabled.
    let storageManager: StorageManager?

    /**
     * Store events until the library is finished initializing
     *
     * The EPC library makes an HTTP request to a remote stream configuration service for information
     * about how to evaluate incoming event data. Until this initialization is complete, we store any incoming
     * events in this buffer.
     *
     * Only read or modify this property on `queue` (the input-buffer accessors
     * in this file do so with `queue.sync`).
     */
    private var inputBuffer: [(Data, Stream)] = []

    /**
     * Maximum number of events allowed in the input buffer
     *
     * (The identifier's spelling is kept because the input-buffer accessors refer to it.)
     */
    private let inbutBufferLimit = 128

    /**
     * Streams are the event stream identifiers that can be utilized with the EventPlatformClientLibrary. They should
     * correspond to the `$id` of a schema in
     * [this repository](https://gerrit.wikimedia.org/g/schemas/event/secondary/).
     */
    public enum Stream: String, Codable {
        case editHistoryCompare = "ios.edit_history_compare"
        case remoteNotificationsInteraction = "ios.notification_interaction"
        case talkPagesInteraction = "ios.talk_page_interaction"
    }

    /**
     * Schema specifies which schema (and specifically which version of that schema)
     * a given event conforms to. Analytics schemas can be found in the jsonschema directory of
     * [secondary repo](https://gerrit.wikimedia.org/g/schemas/event/secondary/).
     * As an example, if instrumenting client-side error logging, a possible
     * `$schema` would be `/mediawiki/client/error/1.0.0`. For the most part, the
     * `$schema` will start with `/analytics`, since that's where
     * analytics-related schemas are collected.
     */
    public enum Schema: String, Codable {
        case editHistoryCompare = "/analytics/mobile_apps/ios_edit_history_compare/2.1.0"
        case remoteNotificationsInteraction = "/analytics/mobile_apps/ios_notification_interaction/2.1.0"
        case talkPages = "/analytics/mobile_apps/ios_talk_page_interaction/1.0.0"
    }

    /**
     * Serial dispatch queue that enables working with properties in a thread-safe
     * way
     */
    private let queue = DispatchQueue(label: "EventPlatformClient-" + UUID().uuidString)

    /**
     * Serial dispatch queue for encoding data on a background thread
     */
    private let encodeQueue = DispatchQueue(label: "EventPlatformClientEncode-" + UUID().uuidString, qos: .background)

    /**
     * Where to send events to for intake
     *
     * See [wikitech:Event Platform/EventGate](https://wikitech.wikimedia.org/wiki/Event_Platform/EventGate)
     * for more information. Specifically, the section on
     * **eventgate-analytics-external**. This service uses the stream
     * configurations from Meta wiki as its source of truth.
     */
    private static let eventIntakeURI = URL(string: "https://intake-analytics.wikimedia.org/v1/events")!

    /**
     * MediaWiki API endpoint which returns stream configurations as JSON
     *
     * Streams are configured via [mediawiki-config/wmf-config/InitialiseSettings.php](https://gerrit.wikimedia.org/g/operations/mediawiki-config/+/master/wmf-config/InitialiseSettings.php)
     *
     * The config changes are deployed in [backport windows](https://wikitech.wikimedia.org/wiki/Backport_windows)
     * by scheduling on the [Deployments](https://wikitech.wikimedia.org/wiki/Deployments)
     * page. Stream configurations are made available for external consumption via
     * MediaWiki API via [Extension:EventStreamConfig](https://gerrit.wikimedia.org/g/mediawiki/extensions/EventStreamConfig/)
     *
     * In production, we use [Meta wiki](https://meta.wikimedia.org/wiki/Main_Page)
     * [streamconfigs endpoint](https://meta.wikimedia.org/w/api.php?action=help&modules=streamconfigs)
     * with the constraint that the `destination_event_service` is configured to
     * be "eventgate-analytics-external" (to filter out irrelevant streams from
     * the returned list of stream configurations).
     */
    private static let streamConfigsURI = URL(string: "https://meta.wikimedia.org/w/api.php?action=streamconfigs&format=json&constraints=destination_event_service=eventgate-analytics-external")!

    /**
     * Inactivity period (in seconds) after which a new session is started:
     * 15 minutes.
     */
    private static let sessionTimeout: TimeInterval = 15 * 60

    /**
     * An individual stream's configuration.
     */
    struct StreamConfiguration: Codable {
        let sampling: Sampling?
        struct Sampling: Codable {
            let rate: Double?
            let identifier: String?
        }
    }

    /**
     * Holds each stream's configuration; `nil` until the configuration has
     * been downloaded.
     *
     * Reads are synchronous and writes are asynchronous on the serial `queue`,
     * so a read issued after a write observes the written value.
     */
    private var streamConfigurations: [Stream: StreamConfiguration]? {
        get {
            queue.sync {
                return _streamConfigurations
            }
        }
        set {
            queue.async {
                self._streamConfigurations = newValue
            }
        }
    }
    private var _streamConfigurations: [Stream: StreamConfiguration]? = nil

    /**
     * Updated when app enters background, used for determining if the session has
     * expired.
     */
    private var lastTimestamp: Date = Date()

    /**
     * Return a session identifier, lazily generating one if none exists
     * - Returns: session ID
     *
     * The identifier is a string of 20 zero-padded hexadecimal digits
     * representing a uniformly random 80-bit integer.
     */
    internal var sessionID: String {
        queue.sync {
            if let existing = _sessionID {
                return existing
            }
            let fresh = generateID()
            _sessionID = fresh
            return fresh
        }
    }
    private var _sessionID: String?

    // MARK: - Methods

    public override init() {
        self.storageManager = StorageManager.shared
        self.samplingController = SamplingController()

        super.init()

        self.samplingController.delegate = self

        guard self.storageManager != nil else {
            DDLogError("EPC: Error initializing the storage manager. Event intake and submission will be disabled.")
            return
        }

        self.fetchStreamConfiguration(retries: 10, retryDelay: 30)
    }

    /**
     * This method is called by the application delegate in
     * `applicationWillResignActive()`.
     *
     * Records when the app left the foreground so that `appInForeground()` can
     * decide whether the session has timed out.
     */
    public func appInBackground() {
        lastTimestamp = Date()
    }

    /**
     * This method is called by the application delegate in
     * `applicationDidBecomeActive()`.
     *
     * If it has been more than 15 minutes since the app entered background state,
     * a new session is started.
     */
    public func appInForeground() {
        if sessionTimedOut() {
            resetSession()
        }
    }

    /**
     * This method is called by the application delegate in
     * `applicationWillTerminate()`
     *
     * We do not persist session ID on app close because we have decided that a
     * session ends when the user (or the OS) has closed the app or when 15
     * minutes of inactivity have passed.
     */
    public func appWillClose() {
        // Placeholder for any onTerminate logic
    }

    /**
     * Generates a new identifier using the same algorithm as EPC libraries for
     * web and Android: five 16-bit random chunks, each rendered as four
     * zero-padded hex digits (80 bits total).
     */
    private func generateID() -> String {
        // arc4random_uniform's upper bound is exclusive, so 0x10000 is required
        // for every chunk to cover the full range 0x0000...0xffff.
        return (0..<5)
            .map { _ in String(format: "%04x", arc4random_uniform(0x10000)) }
            .joined()
    }

    /**
     * Called when user toggles logging permissions in Settings
     *
     * This assumes storageManager's deviceID will be reset separately by a
     * different owner (EventLoggingService's `reset()` method)
     */
    public func reset() {
        resetSession()
    }

    /**
     * Unset the session (a new ID is generated on next access) and clear the
     * sampling controller's cache.
     */
    private func resetSession() {
        queue.async {
            self._sessionID = nil
        }
        samplingController.removeAllSamplingCache()
    }

    /**
     * Check if session expired, based on last active timestamp
     *
     * A new session ID is required if it has been more than 15 minutes since the
     * user was last active (e.g. when app entered background).
     */
    private func sessionTimedOut() -> Bool {
        // timeIntervalSinceNow is negative (in seconds) for dates in the past.
        return lastTimestamp.timeIntervalSinceNow < -EventPlatformClient.sessionTimeout
    }

    /**
     * Fetch stream configuration from stream configuration service
     * - Parameters:
     *   - retries: number of retries remaining
     *   - retryDelay: seconds between each attempt, increasing by 50% after
     *     every failed attempt
     */
    private func fetchStreamConfiguration(retries: Int, retryDelay: TimeInterval) {
        self.httpGet(url: EventPlatformClient.streamConfigsURI) { data, response, _ in
            guard let httpResponse = response as? HTTPURLResponse,
                  let data = data,
                  httpResponse.statusCode == 200 else {
                DDLogWarn("EPC: Server did not respond adequately, will try \(EventPlatformClient.streamConfigsURI.absoluteString) again")

                guard retries > 0 else {
                    DDLogWarn("EPC: Ran out of retries when attempting to download stream configs")
                    return
                }
                dispatchOnMainQueueAfterDelayInSeconds(retryDelay) {
                    self.fetchStreamConfiguration(retries: retries - 1, retryDelay: retryDelay * 1.5)
                }
                return
            }

            self.loadStreamConfiguration(data)
        }
    }

    /**
     * Processes fetched stream config
     * - Parameter data: JSON-serialized stream configuration
     *
     * Example of a retrieved config:
     * ``` js
     * {
     *   "streams": {
     *     "test.instrumentation.sampled": {
     *       "sampling": {
     *         "rate":0.1
     *       }
     *     },
     *     "test.instrumentation": {},
     *   }
     * }
     * ```
     *
     * Only streams known to `Stream` are kept. Once the configurations are
     * published, the input buffer is drained: each buffered event whose stream
     * is configured and in sample is pushed to the storage manager; the rest
     * are dropped.
     */
    private func loadStreamConfiguration(_ data: Data) {
        #if DEBUG
        if let raw = String(data: data, encoding: .utf8) {
            DDLogDebug("EPC: Downloaded stream configs (raw): \(raw)")
        }
        #endif
        guard let storageManager = self.storageManager else {
            DDLogError("Storage manager not initialized; this shouldn't happen!")
            return
        }
        struct StreamConfigurationsJSON: Codable {
            let streams: [String: StreamConfiguration]
        }
        do {
            let json = try JSONDecoder().decode(StreamConfigurationsJSON.self, from: data)

            // Keep only configurations for streams this client knows about.
            var configs: [Stream: StreamConfiguration] = [:]
            for (name, config) in json.streams {
                if let stream = Stream(rawValue: name) {
                    configs[stream] = config
                }
            }

            // Make them available to any newly logged events before flushing
            // the buffer (this is set on the serial queue, but asynchronously).
            streamConfigurations = configs

            // Drain the buffer against the local copy; no need to hop onto
            // `queue` again for every event.
            // NOTE(review): an event whose `_submit` observed `nil` configs
            // just before they were published may be appended after this drain
            // finishes and then remain in the input buffer.
            while let (eventData, stream) = inputBufferPopFirst() {
                guard let config = configs[stream],
                      samplingController.inSample(stream: stream, config: config) else {
                    continue
                }
                storageManager.push(data: eventData, stream: stream)
            }
        } catch {
            DDLogError("EPC: Problem processing JSON payload from response: \(error)")
        }
    }

    /**
     * Flush the queue of outgoing requests in a first-in-first-out,
     * fire-and-forget fashion
     *
     * - Parameter completion: invoked (on `queue`) once every POST has
     *   finished, or immediately if there is nothing to send.
     *
     * Events that fail with a networking-library error are left unmarked so they
     * are retried; events the server rejects are marked purgeable.
     */
    func postAllScheduled(_ completion: (() -> Void)? = nil) {
        guard let storageManager = self.storageManager else {
            completion?()
            return
        }

        let events = storageManager.popAll()
        guard !events.isEmpty else {
            completion?()
            return
        }

        DDLogDebug("EPC: Processing all scheduled requests")
        let group = DispatchGroup()
        for event in events {
            group.enter()
            httpPost(url: EventPlatformClient.eventIntakeURI, body: event.data) { result in
                // Balance the enter() above on every path.
                defer { group.leave() }
                switch result {
                case .success:
                    storageManager.markPurgeable(event: event)
                case .failure(.networkingLibraryError):
                    // Leave unmarked to retry on networking library failure
                    break
                case .failure(let error):
                    // Give up on events rejected by the server
                    DDLogError("EPC: The analytics service failed to process an event. A response code of 400 could indicate that the event didn't conform to provided schema. Check the error for more information.: \(error)")
                    storageManager.markPurgeable(event: event)
                }
            }
        }
        group.notify(queue: queue) {
            completion?()
        }
    }

    /// EventBody is used to encode event data into the POST body of a request to the Modern Event Platform
    struct EventBody<E>: Encodable where E: EventInterface {
        /// EventGate needs to know which version of the schema to validate against
        var meta: Meta

        struct Meta: Codable {
            let stream: Stream

            /**
             * meta.id is *optional* for EventGate and should only be done in case the client is
             * known to send duplicates of events, otherwise we don't need to
             * make the payload any heavier than it already is.
             * (This client currently always sets a fresh UUID in `_submit`.)
             */
            let id: UUID
            let domain: String?
        }

        let appInstallID: String

        /**
         * Generated events have the session ID attached to them before stream
         * config is available (in case they're generated offline) and before
         * they're cc'd to any other streams (once config is available).
         */
        let appSessionID: String

        /**
         * The top-level field `dt` is for recording the time the event
         * was generated. EventGate sets `meta.dt` during ingestion, so for
         * analytics events that field is used as "timestamp of reception" and
         * is used for partitioning the events in the database. See Phab:T240460
         * for more information.
         */
        let dt: Date

        /**
         * Event represents the client-provided event data.
         * The event is encoded at the top level of the resulting structure.
         * If any of the `CodingKeys` conflict with keys defined by `EventBody`,
         * the values from `event` will be used.
         */
        let event: E

        enum CodingKeys: String, CodingKey {
            case schema = "$schema"
            case meta
            case appInstallID = "app_install_id"
            case appSessionID = "app_session_id"
            case dt
            case event
        }

        /// Encodes the envelope fields, `$schema`, and then the event's own
        /// fields into the same top-level container.
        /// - Throws: any error raised while encoding. It is logged and then
        ///   rethrown so a partially encoded payload is never queued.
        func encode(to encoder: Encoder) throws {
            var container = encoder.container(keyedBy: CodingKeys.self)
            do {
                try container.encode(meta, forKey: .meta)
                try container.encode(appInstallID, forKey: .appInstallID)
                try container.encode(appSessionID, forKey: .appSessionID)
                try container.encode(dt, forKey: .dt)
                try container.encode(E.schema, forKey: .schema)
                try event.encode(to: encoder)
            } catch {
                DDLogError("EPC: Error encoding event body: \(error)")
                throw error
            }
        }
    }

    /**
     * Submit an event according to the given stream's configuration.
     * - Parameters:
     *   - stream: The stream to submit the event to
     *   - event: The event data
     *   - domain: Optional domain to include for the event (without protocol)
     *
     * An example call:
     * ```
     * struct TestEvent: EventInterface {
     *   static let schema: EventPlatformClient.Schema = .test // hypothetical case
     *   let test_string: String
     *   let test_map: SourceInfo
     *   struct SourceInfo: Codable {
     *     let file: String
     *     let method: String
     *   }
     * }
     *
     * let sourceInfo = TestEvent.SourceInfo(file: "Features/Feed/ExploreViewController.swift", method: "refreshControlActivated")
     * let event = TestEvent(test_string: "Explore Feed refreshed", test_map: sourceInfo)
     *
     * EventPlatformClient.shared.submit(
     *   stream: .test, // Hypothetical; real cases are defined in `EventPlatformClient.Stream`
     *   event: event
     * )
     * ```
     *
     * Regarding `domain`: this is *optional* and should be used when event needs
     * to be attributed to a particular wiki (Wikidata, Wikimedia Commons, a
     * specific edition of Wikipedia, etc.). If the language is NOT relevant in
     * the context, `domain` can be safely omitted. Using "domain" rather than
     * "language" is consistent with the other platforms and allows for the
     * possibility of setting a non-Wikipedia domain like "commons.wikimedia.org"
     * and "wikidata.org" for multimedia/metadata-related in-app analytics.
     * Instrumentation code should use the `host` property of a `URL` as the value
     * for this parameter.
     *
     * Cases where instrumentation would set a `domain`:
     * - reading or editing an article
     * - managing watchlist
     * - interacting with feed
     * - searching
     *
     * Cases where it might not be necessary for the instrument to set a `domain`:
     * - changing settings
     * - managing reading lists
     * - navigating map of nearby articles
     * - multi-lingual features like Suggested Edits
     * - marking session start/end; in which case schema and `data` should have a
     *   `languages` field where user's list of languages can be stored, although
     *   it might make sense to set it to the domain associated with the user's
     *   1st preferred language – in which case use
     *   `MWKLanguageLinkController.sharedInstance().appLanguage.siteURL().host`
     */
    public func submit<E: EventInterface>(stream: Stream, event: E, domain: String? = nil) {
        let date = Date() // Record the date synchronously so there's no delay
        encodeQueue.async {
            self._submit(stream: stream, event: event, date: date, domain: domain)
        }
    }

    /**
     * Private, synchronous version of `submit`.
     *
     * Encodes the event and then:
     * - buffers it if stream configurations haven't been downloaded yet,
     * - drops it if its stream isn't configured or isn't in sample,
     * - otherwise pushes it to the storage manager.
     *
     * Does nothing if there is no storage manager, if usage reports are
     * disabled, or if the app install ID is missing.
     */
    private func _submit<E: EventInterface>(stream: Stream, event: E, date: Date, domain: String? = nil) {
        guard let storageManager = self.storageManager else {
            return
        }

        let userDefaults = UserDefaults.standard
        guard userDefaults.wmf_sendUsageReports else {
            return
        }

        guard let appInstallID = userDefaults.wmf_appInstallId else {
            DDLogWarn("EPC: App install ID is unset. This shouldn't happen.")
            return
        }

        let meta = EventBody<E>.Meta(stream: stream, id: UUID(), domain: domain)
        let eventPayload = EventBody(meta: meta, appInstallID: appInstallID, appSessionID: sessionID, dt: date, event: event)

        do {
            let encoder = JSONEncoder()
            encoder.dateEncodingStrategy = .iso8601
            #if DEBUG
            encoder.outputFormatting = .prettyPrinted
            #endif

            let data = try encoder.encode(eventPayload)

            #if DEBUG
            if let jsonString = String(data: data, encoding: .utf8) {
                DDLogDebug("EPC: Scheduling event to be sent to \(EventPlatformClient.eventIntakeURI) with POST body:\n\(jsonString)")
            }
            #endif

            guard let streamConfigs = streamConfigurations else {
                // Configurations not downloaded yet; hold the event until they are.
                appendEventToInputBuffer(data: data, stream: stream)
                return
            }
            guard let config = streamConfigs[stream] else {
                DDLogDebug("EPC: Event submitted to '\(stream.rawValue)' but only the following streams are configured: \(streamConfigs.keys.map(\.rawValue).joined(separator: ", "))")
                return
            }
            guard samplingController.inSample(stream: stream, config: config) else {
                DDLogDebug("EPC: Stream '\(stream.rawValue)' is not in sample")
                return
            }
            storageManager.push(data: data, stream: stream)
        } catch {
            DDLogError("EPC: \(error.localizedDescription)")
        }
    }
}
|
|||
|
|
|||
|
// MARK: Thread-safe accessors for collection properties
|
|||
|
private extension EventPlatformClient {

    /**
     * Thread-safe synchronous snapshot of the buffered events
     * - Returns: a copy of the input buffer, oldest event first
     */
    func getInputBuffer() -> [(Data, Stream)] {
        queue.sync {
            return self.inputBuffer
        }
    }

    /**
     * Thread-safe synchronous buffering of an event
     * - Parameters:
     *   - data: the encoded event payload
     *   - stream: the stream the event was submitted to
     *
     * If the buffer is full (`inbutBufferLimit`), the oldest events are
     * discarded to make room for the new one.
     */
    func appendEventToInputBuffer(data: Data, stream: Stream) {
        queue.sync {
            /*
             * Practically speaking, there should not have been over a hundred
             * events generated when the user first launches the app and before
             * the stream configuration has been downloaded and becomes
             * available. In such a case we start clearing out the oldest events
             * to make room for new ones. `>=` (rather than `==`) keeps the cap
             * enforced even if the buffer ever ends up above the limit.
             */
            while !self.inputBuffer.isEmpty && self.inputBuffer.count >= self.inbutBufferLimit {
                self.inputBuffer.removeFirst()
            }
            self.inputBuffer.append((data, stream))
        }
    }

    /**
     * Thread-safe synchronous removal of first buffered event
     * - Returns: the oldest buffered event, or `nil` if the buffer is empty
     */
    func inputBufferPopFirst() -> (Data, Stream)? {
        queue.sync {
            guard !self.inputBuffer.isEmpty else {
                return nil
            }
            return self.inputBuffer.removeFirst()
        }
    }
}
|
|||
|
|
|||
|
// MARK: NetworkIntegration
|
|||
|
|
|||
|
private extension EventPlatformClient {
    /// PostEventError describes the possible failure cases when POSTing an event
    enum PostEventError: Error {
        /// The networking layer reported an error, or no task could be created.
        case networkingLibraryError(_ error: Error)
        /// No response was received, or it was not an HTTP response.
        case missingResponse
        /// The server answered with a status code other than 201.
        case unexpectedResponse(_ httpCode: Int)
    }

    /**
     * HTTP POST
     * - Parameter url: Where to POST the data
     * - Parameter body: Body of the POST request
     * - Parameter completion: callback invoked exactly once, with the outcome of
     *   the request. If the session cannot create a task, it is invoked
     *   immediately with `.networkingLibraryError`, so callers that wait on it
     *   (e.g. with a `DispatchGroup`) are never left hanging.
     */
    private func httpPost(url: URL, body: Data? = nil, completion: @escaping ((Result<Void, PostEventError>) -> Void)) {
        DDLogDebug("EPC: Attempting to POST events")
        let request = session.request(with: url, method: .post, bodyData: body, bodyEncoding: .json)

        let fail: (PostEventError) -> Void = { error in
            DDLogDebug("EPC: An error occurred sending the request: \(error)")
            completion(.failure(error))
        }

        let handler: (Data?, URLResponse?, Error?) -> Void = { _, response, error in
            if let error = error {
                fail(.networkingLibraryError(error))
                return
            }
            guard let httpResponse = response as? HTTPURLResponse else {
                fail(.missingResponse)
                return
            }
            guard httpResponse.statusCode == 201 else {
                fail(.unexpectedResponse(httpResponse.statusCode))
                return
            }
            completion(.success(()))
        }

        guard let task = session.dataTask(with: request, completionHandler: handler) else {
            // Report as a networking failure so the caller keeps the event for a retry.
            fail(.networkingLibraryError(URLError(.unknown)))
            return
        }
        task.resume()
    }

    /**
     * HTTP GET
     * - Parameter url: Where to GET data from
     * - Parameter completion: What to do with gotten data. If the session cannot
     *   create a task, it is invoked immediately with `(nil, nil, URLError)`.
     */
    private func httpGet(url: URL, completion: @escaping (Data?, URLResponse?, Error?) -> Void) {
        DDLogDebug("EPC: Attempting to GET data from \(url.absoluteString)")
        var request = URLRequest(url: url) // httpMethod = "GET" by default
        request.setValue(WikipediaAppUtils.versionedUserAgent(), forHTTPHeaderField: "User-Agent")
        guard let task = session.dataTask(with: request, completionHandler: completion) else {
            // Without this the caller would never hear back (and e.g. never retry).
            completion(nil, nil, URLError(.unknown))
            return
        }
        task.resume()
    }
}
|
|||
|
|
|||
|
// MARK: EventInterface
|
|||
|
|
|||
|
/**
 * Protocol for event data submitted through `EventPlatformClient`.
 *
 * Conforming types must be `Codable` and must declare the schema their payload
 * conforms to.
 */
public protocol EventInterface: Codable {
    /**
     * Defines which schema (name and version) this event conforms to.
     * `EventPlatformClient.EventBody` encodes it as the top-level `$schema`
     * field. Check the documentation for `EventPlatformClient.Schema` for more
     * information.
     */
    static var schema: EventPlatformClient.Schema { get }
}
|