-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathChangeStream.swift
304 lines (275 loc) · 13.5 KB
/
ChangeStream.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
import CLibMongoC
import Foundation
import NIO
/// Thin wrapper around a `mongoc_change_stream_t` that forwards each cursor operation to the matching
/// `mongoc_change_stream_*` function.
private struct MongocChangeStream: MongocCursorWrapper {
    /// Always `false` for change streams.
    internal static var isLazy: Bool { false }

    /// Pointer to the underlying `mongoc_change_stream_t`.
    internal let pointer: OpaquePointer

    /// Wraps the provided change stream pointer. The pointer is later released via `destroy()`.
    fileprivate init(stealing ptr: OpaquePointer) {
        self.pointer = ptr
    }

    /// Advances the change stream via `mongoc_change_stream_next`, writing the next document (if any) to `outPtr`,
    /// and returns that call's result.
    internal func next(outPtr: UnsafeMutablePointer<BSONPointer?>) -> Bool {
        mongoc_change_stream_next(self.pointer, outPtr)
    }

    /// Always reports `true`.
    internal func more() -> Bool {
        true
    }

    /// Queries `mongoc_change_stream_error_document` for an error on this change stream, filling in `bsonError` and
    /// `replyPtr`, and returns that call's result.
    internal func errorDocument(bsonError: inout bson_error_t, replyPtr: UnsafeMutablePointer<BSONPointer?>) -> Bool {
        mongoc_change_stream_error_document(self.pointer, &bsonError, replyPtr)
    }

    /// Destroys the underlying `mongoc_change_stream_t`.
    internal func destroy() {
        mongoc_change_stream_destroy(self.pointer)
    }
}
/// A token that can be used to manually resume a change stream. Supply it as the `resumeAfter` field of
/// `ChangeStreamOptions` to resume or start a change stream from the point where an earlier one stopped.
/// - SeeAlso: https://docs.mongodb.com/manual/changeStreams/#resume-a-change-stream
public struct ResumeToken: Codable, Equatable {
    /// The raw document backing this token.
    private let resumeToken: BSONDocument

    /// Wraps the provided document as a resume token.
    internal init(_ resumeToken: BSONDocument) {
        self.resumeToken = resumeToken
    }

    /// Decodes a token from a single-value container holding the raw `BSONDocument`.
    public init(from decoder: Decoder) throws {
        self.resumeToken = try decoder.singleValueContainer().decode(BSONDocument.self)
    }

    /// Encodes the token as its raw `BSONDocument` in a single-value container.
    public func encode(to encoder: Encoder) throws {
        var singleValue = encoder.singleValueContainer()
        try singleValue.encode(self.resumeToken)
    }
}
// TODO: SWIFT-981: Remove this.
/// The key we use for storing a change stream's namespace in its decoder's `userInfo`. This allows types
/// using the decoder, e.g. `ChangeStreamEvent`, to access the namespace even if it is not present in the raw
/// document the server returns. Ok to force unwrap as initialization never fails.
// swiftlint:disable:next force_unwrapping
internal let changeStreamNamespaceKey = CodingUserInfoKey(rawValue: "namespace")!
// sourcery: skipSyncExport
/// A MongoDB change stream.
/// - SeeAlso: https://docs.mongodb.com/manual/changeStreams/
public class ChangeStream<T: Codable>: CursorProtocol {
    public typealias Element = T

    /// The `ResumeToken` associated with the most recent event seen by the change stream.
    public internal(set) var resumeToken: ResumeToken?

    /// The `EventLoop` this `ChangeStream` is bound to, if any.
    internal let eventLoop: EventLoop?

    /// The client this change stream was created from.
    private let client: MongoClient

    /// The decoder used to turn raw event documents into values of type `T`.
    private let decoder: BSONDecoder

    /// The underlying cursor wrapped by this change stream.
    private let wrappedCursor: Cursor<MongocChangeStream>

    internal init(
        stealing changeStreamPtr: OpaquePointer,
        connection: Connection,
        client: MongoClient,
        namespace: MongoNamespace,
        session: ClientSession?,
        decoder: BSONDecoder,
        eventLoop: EventLoop?,
        options: ChangeStreamOptions?
    ) throws {
        self.wrappedCursor = try Cursor(
            mongocCursor: MongocChangeStream(stealing: changeStreamPtr),
            connection: connection,
            session: session,
            type: .tailableAwait
        )
        self.client = client
        self.eventLoop = eventLoop

        // Work on a copy of the caller's decoder so that the namespace can be attached to its `userInfo`.
        let eventDecoder = BSONDecoder(copies: decoder, options: nil)
        eventDecoder.userInfo[changeStreamNamespaceKey] = namespace
        self.decoder = eventDecoder

        // startAfter takes precedence over resumeAfter; the token stays nil when neither is provided.
        self.resumeToken = options?.startAfter ?? options?.resumeAfter
    }

    /// Processes a possibly-absent event before it is handed to the user. A `nil` event is passed through as `nil`.
    private func processEvent(_ event: BSONDocument?) throws -> T? {
        if let document = event {
            // The explicit type annotation selects the non-optional overload.
            let decoded: T = try self.processEvent(document)
            return decoded
        }
        return nil
    }

    /// Processes an event before it is handed to the user: stores the event's `_id` as the latest resume token and
    /// then decodes the event into a `T`.
    private func processEvent(_ event: BSONDocument) throws -> T {
        guard let tokenDocument = event["_id"]?.documentValue else {
            throw MongoError.InternalError(message: "_id field is missing from the change stream document.")
        }
        // The token is recorded before decoding is attempted.
        self.resumeToken = ResumeToken(tokenDocument)
        return try self.decoder.decode(T.self, from: event)
    }

    /**
     * Reports whether this change stream may still return more data.
     *
     * Once `next` returns `nil` the change stream is dead. A `nil` result from `tryNext` does not imply that; the
     * change stream may still be alive afterwards.
     *
     * The change stream is also dead after `next` or `tryNext` fails with any error other than a `DecodingError`.
     * After a `DecodingError` it may still be alive.
     *
     * - Warning:
     *   On Swift versions and platforms where structured concurrency is not available, a change stream that is
     *   still alive when it goes out of scope will leak resources. To ensure it is dead before it leaves scope,
     *   invoke `ChangeStream.kill(...)` on it.
     */
    public func isAlive() -> EventLoopFuture<Bool> {
        self.client.operationExecutor.execute(on: self.eventLoop) {
            self.wrappedCursor.isAlive
        }
    }

    /**
     * Retrieve the next `T` from this change stream.
     *
     * Polling continues until the server returns an event, an error occurs, or the change stream is killed. Every
     * individual attempt waits for at most `maxAwaitTimeMS` (as set on the `ChangeStreamOptions` given to the method
     * that created this change stream) before another attempt is made.
     *
     * One thread of the driver's internal thread pool stays occupied until the returned future completes.
     * Performance may therefore degrade when the number of polling change streams approaches the size of the pool.
     * The pool size can be configured through `MongoClientOptions.threadPoolSize` when creating the client.
     *
     * Note: While the future returned from this method is unresolved you *must not* call any change stream methods
     * other than `kill` and `isAlive`. Doing so will result in undefined behavior.
     *
     * - Returns:
     *   An `EventLoopFuture<T?>` that evaluates to the next `T` in this change stream, to `nil` if the change stream
     *   is exhausted, or to an error if one occurred. The future resolves only once one of those conditions is met,
     *   which may take multiple requests to the server.
     *
     *   If the future evaluates to an error, it is likely one of the following:
     *     - `MongoError.CommandError` if an error occurs while fetching more results from the server.
     *     - `MongoError.LogicError` if this function is called after the change stream has died.
     *     - `MongoError.LogicError` if this function is called and the session associated with this change stream
     *       is inactive.
     *     - `DecodingError` if an error occurs decoding the server's response.
     */
    public func next() -> EventLoopFuture<T?> {
        self.client.operationExecutor.execute(on: self.eventLoop) {
            try self.processEvent(self.wrappedCursor.next())
        }
    }

    /**
     * Try to retrieve the next `T` from this change stream, yielding `nil` when no results are available.
     *
     * The change stream waits server-side for at most `maxAwaitTimeMS` (as set on the `ChangeStreamOptions` given to
     * the method that created this change stream) before `nil` is returned.
     *
     * As long as `isAlive` is true, this method may be called repeatedly to retrieve new data.
     *
     * Note: While the future returned from this method is unresolved you *must not* call any change stream methods
     * other than `kill` and `isAlive`. Doing so will result in undefined behavior.
     *
     * - Returns:
     *   An `EventLoopFuture<T?>` containing the next `T` in this change stream, `nil` if there was no data, or an
     *   error if one occurred.
     *
     *   If the future evaluates to an error, it is likely one of the following:
     *     - `MongoError.CommandError` if an error occurs while fetching more results from the server.
     *     - `MongoError.LogicError` if this function is called after the change stream has died.
     *     - `MongoError.LogicError` if this function is called and the session associated with this change stream
     *       is inactive.
     *     - `DecodingError` if an error occurs decoding the server's response.
     */
    public func tryNext() -> EventLoopFuture<T?> {
        self.client.operationExecutor.execute(on: self.eventLoop) {
            try self.processEvent(self.wrappedCursor.tryNext())
        }
    }

    /**
     * Collect the results currently available in this change stream into an array of type `T`.
     *
     * Only the currently available results are fetched, so calling `toArray` again while the change stream is still
     * alive may return more data.
     *
     * Note: While the future returned from this method is unresolved you *must not* call any change stream methods
     * other than `kill` and `isAlive`. Doing so will result in undefined behavior.
     *
     * - Returns:
     *   An `EventLoopFuture<[T]>` evaluating to the results currently available in this change stream, or an error.
     *
     *   If the future evaluates to an error, that error is likely one of the following:
     *     - `MongoError.CommandError` if an error occurs while fetching more results from the server.
     *     - `MongoError.LogicError` if this function is called after the change stream has died.
     *     - `MongoError.LogicError` if this function is called and the session associated with this change stream
     *       is inactive.
     *     - `DecodingError` if an error occurs decoding the server's responses.
     */
    public func toArray() -> EventLoopFuture<[T]> {
        self.client.operationExecutor.execute(on: self.eventLoop) {
            try self.wrappedCursor.toArray().map(self.processEvent)
        }
    }

    /**
     * Invokes the provided closure with every event in the change stream as it arrives.
     *
     * One thread of the driver's internal thread pool stays occupied until the returned future completes.
     * Performance may therefore degrade when the number of polling change streams approaches the size of the pool.
     * The pool size can be configured through `MongoClientOptions.threadPoolSize` when creating the client.
     *
     * Note: While the future returned from this method is unresolved you *must not* call any change stream methods
     * other than `kill` and `isAlive`. Doing so will result in undefined behavior.
     *
     * - Returns:
     *   An `EventLoopFuture<Void>` that completes once the change stream is closed or an error is encountered.
     *
     *   If the future evaluates to an error, that error is likely one of the following:
     *     - `MongoError.CommandError` if an error occurs while fetching more results from the server.
     *     - `MongoError.LogicError` if this function is called after the change stream has died.
     *     - `MongoError.LogicError` if this function is called and the session associated with this change stream
     *       is inactive.
     *     - `DecodingError` if an error occurs decoding the server's responses.
     */
    public func forEach(_ body: @escaping (T) throws -> Void) -> EventLoopFuture<Void> {
        self.client.operationExecutor.execute(on: self.eventLoop) {
            // The loop ends when `next` yields nil; errors from the cursor, decoding, or `body` propagate.
            while let event = try self.processEvent(self.wrappedCursor.next()) {
                try body(event)
            }
        }
    }

    /**
     * Kill this change stream.
     *
     * This method MAY be called while futures created by other `ChangeStream` methods are still unresolved.
     *
     * This method MAY be called on a change stream that is already dead, in which case it has no effect.
     *
     * - Warning:
     *   On Swift versions and platforms where structured concurrency is not available, a change stream that is
     *   alive when it goes out of scope will leak resources. On those Swift versions/platforms this method must be
     *   invoked to ensure resources are properly cleaned up. Where structured concurrency is available, calling this
     *   method is not necessary because resources are cleaned up automatically during deinitialization.
     *
     * - Returns:
     *   An `EventLoopFuture` that evaluates when the change stream has completed closing. This future should not
     *   fail.
     */
    public func kill() -> EventLoopFuture<Void> {
        self.client.operationExecutor.execute(on: self.eventLoop) {
            self.wrappedCursor.kill()
        }
    }

    #if compiler(>=5.5.2) && canImport(_Concurrency)
    /// When concurrency is available, we can ensure change streams are always cleaned up properly.
    deinit {
        // We can't do this with an @available check on the method; see https://bugs.swift.org/browse/SR-15537.
        if #available(macOS 10.15, *) {
            // Copy what the task needs into locals so that the task does not reference `self`.
            let owningClient = self.client
            let boundEventLoop = self.eventLoop
            let cursor = self.wrappedCursor
            Task {
                try await owningClient.operationExecutor.execute(on: boundEventLoop) {
                    cursor.kill()
                }
            }
        }
    }
    #endif
}