-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathMongoClient.swift
386 lines (360 loc) · 17 KB
/
MongoClient.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
import MongoSwift
import NIO
/// A MongoDB Client providing a synchronous API.
public class MongoClient {
/// Encoder whose options are inherited by databases derived from this client.
public var encoder: BSONEncoder { self.asyncClient.encoder }
/// Decoder whose options are inherited by databases derived from this client.
public var decoder: BSONDecoder { self.asyncClient.decoder }
/// The read concern set on this client, or nil if one is not set.
public var readConcern: ReadConcern? { self.asyncClient.readConcern }
/// The `ReadPreference` set on this client.
public var readPreference: ReadPreference { self.asyncClient.readPreference }
/// The write concern set on this client, or nil if one is not set.
public var writeConcern: WriteConcern? { self.asyncClient.writeConcern }
/// The `EventLoopGroup` being used by the underlying async client.
private let eventLoopGroup: MultiThreadedEventLoopGroup
/// The underlying async client.
internal let asyncClient: MongoSwift.MongoClient
/**
* Create a new client connection to a MongoDB server. For options that are included in both the
* `MongoConnectionString` and the `MongoClientOptions` struct, the final value is set in descending order of
* priority: the value specified in `MongoClientOptions` (if non-nil), the value specified in the
* `MongoConnectionString`, or the default value if both are unset.
*
* - Parameters:
* - connectionString: the connection string to connect to.
* - options: optional `MongoClientOptions` to use for this client.
*
* - Throws:
* - A `MongoError.InvalidArgumentError` if the connection string passed in is improperly formatted.
* - A `MongoError.InvalidArgumentError` if the connection string specifies the use of TLS but libmongoc was not
* built with TLS support.
*/
public init(_ connectionString: MongoConnectionString, options: MongoClientOptions? = nil) throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 5)
do {
self.asyncClient = try MongoSwift.MongoClient(connectionString, using: eventLoopGroup, options: options)
self.eventLoopGroup = eventLoopGroup
} catch {
try eventLoopGroup.syncShutdownGracefully()
throw error
}
}
/**
* Create a new client connection to a MongoDB server. For options that are included in both the connection string
* URI and the `MongoClientOptions` struct, the final value is set in descending order of priority: the value
* specified in `MongoClientOptions` (if non-nil), the value specified in the URI, or the default value if both are
* unset.
*
* - Parameters:
* - connectionString: the connection string to connect to.
* - options: optional `MongoClientOptions` to use for this client.
*
* - SeeAlso: https://docs.mongodb.com/manual/reference/connection-string/
*
* - Throws:
* - A `MongoError.InvalidArgumentError` if the connection string passed in is improperly formatted.
* - A `MongoError.InvalidArgumentError` if the connection string specifies the use of TLS but libmongoc was not
* built with TLS support.
*/
public convenience init(
_ connectionString: String = "mongodb://localhost:27017",
options: MongoClientOptions? = nil
) throws {
let connString = try MongoConnectionString(string: connectionString)
try self.init(connString, options: options)
}
deinit {
do {
try self.asyncClient.syncClose()
} catch {
assertionFailure("Error closing async client: \(error)")
}
do {
try eventLoopGroup.syncShutdownGracefully()
} catch {
assertionFailure("Error shutting down event loop group: \(error)")
}
}
/**
* Starts a new `ClientSession` with the provided options.
*
* This session must be explicitly as an argument to each command that should be executed as part of the session.
*
* `ClientSession`s are _not_ thread safe so you must ensure the returned session is not used concurrently for
* multiple operations.
*/
public func startSession(options: ClientSessionOptions? = nil) -> ClientSession {
ClientSession(client: self, options: options)
}
/**
* Starts a new `ClientSession` with the provided options and passes it to the provided closure. The session must
* be explicitly passed as an argument to each command within the closure that should be executed as part of the
* session.
*
* The session is only valid within the body of the closure and will be ended after the body completes.
*
* `ClientSession`s are _not_ thread safe so you must ensure the session is not used concurrently for multiple
* operations.
*
* - Parameters:
* - options: Options to use when creating the session.
* - sessionBody: A closure which takes in a `ClientSession` and returns a `T`.
*
* - Throws:
* - `RuntimeError.CompatibilityError` if the deployment does not support sessions.
*/
public func withSession<T>(
options: ClientSessionOptions? = nil,
_ sessionBody: (ClientSession) throws -> T
) rethrows -> T {
let session = self.startSession(options: options)
defer { session.end() }
return try sessionBody(session)
}
/**
* Run the `listDatabases` command.
*
* - Parameters:
* - filter: Optional `Document` specifying a filter that the listed databases must pass. This filter can be based
* on the "name", "sizeOnDisk", "empty", or "shards" fields of the output.
* - options: Optional `ListDatabasesOptions` specifying options for listing databases.
* - session: Optional `ClientSession` to use when executing this command.
*
* - Returns: A `[DatabaseSpecification]` containing the databases matching provided criteria.
*
* - Throws:
* - `MongoError.LogicError` if the provided session is inactive.
* - `EncodingError` if an error is encountered while encoding the options to BSON.
* - `MongoError.CommandError` if options.authorizedDatabases is false and the user does not have listDatabases
* permissions.
*
* - SeeAlso: https://docs.mongodb.com/manual/reference/command/listDatabases/
*/
public func listDatabases(
_ filter: BSONDocument? = nil,
options: ListDatabasesOptions? = nil,
session: ClientSession? = nil
) throws -> [DatabaseSpecification] {
try self.asyncClient.listDatabases(filter, options: options, session: session?.asyncSession).wait()
}
/**
* Get a list of `MongoDatabase`s.
*
* - Parameters:
* - filter: Optional `Document` specifying a filter on the names of the returned databases.
* - options: Optional `ListDatabasesOptions` specifying options for listing databases.
* - session: Optional `ClientSession` to use when executing this command
*
* - Returns: An Array of `MongoDatabase`s that match the provided filter.
*
* - Throws:
* - `MongoError.LogicError` if the provided session is inactive.
* - `MongoError.CommandError` if options.authorizedDatabases is false and the user does not have listDatabases
* permissions.
*/
public func listMongoDatabases(
_ filter: BSONDocument? = nil,
options: ListDatabasesOptions? = nil,
session: ClientSession? = nil
) throws -> [MongoDatabase] {
try self.listDatabaseNames(filter, options: options, session: session).map { self.db($0) }
}
/**
* Get a list of names of databases.
*
* - Parameters:
* - filter: Optional `Document` specifying a filter on the names of the returned databases.
* - options: Optional `ListDatabasesOptions` specifying options for listing databases.
* - session: Optional `ClientSession` to use when executing this command
*
* - Returns: A `[String]` containing names of databases that match the provided filter.
*
* - Throws:
* - `MongoError.LogicError` if the provided session is inactive.
* - `MongoError.CommandError` if options.authorizedDatabases is false and the user does not have listDatabases
* permissions.
*/
public func listDatabaseNames(
_ filter: BSONDocument? = nil,
options: ListDatabasesOptions? = nil,
session: ClientSession? = nil
) throws -> [String] {
try self.asyncClient.listDatabaseNames(filter, options: options, session: session?.asyncSession).wait()
}
/**
* Gets a `MongoDatabase` instance for the given database name. If an option is not specified in the optional
* `MongoDatabaseOptions` param, the database will inherit the value from the parent client or the default if
* the client’s option is not set. To override an option inherited from the client (e.g. a read concern) with the
* default value, it must be explicitly specified in the options param (e.g. ReadConcern.serverDefault, not nil).
*
* - Parameters:
* - name: the name of the database to retrieve
* - options: Optional `MongoDatabaseOptions` to use for the retrieved database
*
* - Returns: a `MongoDatabase` corresponding to the provided database name
*/
public func db(_ name: String, options: MongoDatabaseOptions? = nil) -> MongoDatabase {
MongoDatabase(client: self, asyncDB: self.asyncClient.db(name, options: options))
}
/**
* Starts a `ChangeStream` on a `MongoClient`. Allows the client to observe all changes in a cluster -
* excluding system collections and the "config", "local", and "admin" databases.
*
* - Parameters:
* - pipeline: An array of aggregation pipeline stages to apply to the events returned by the change stream.
* - options: An optional `ChangeStreamOptions` to use when constructing the change stream.
* - session: An optional `ClientSession` to use with this change stream.
*
* - Returns: a `ChangeStream` on all collections in all databases in a cluster.
*
* - Throws:
* - `MongoError.CommandError` if an error occurs on the server while creating the change stream.
* - `MongoError.InvalidArgumentError` if the options passed formed an invalid combination.
* - `MongoError.InvalidArgumentError` if the `_id` field is projected out of the change stream documents by the
* pipeline.
*
* - SeeAlso:
* - https://docs.mongodb.com/manual/changeStreams/
* - https://docs.mongodb.com/manual/meta/aggregation-quick-reference/
* - https://docs.mongodb.com/manual/reference/system-collections/
*
* - Note: Supported in MongoDB version 4.0+ only.
*/
public func watch(
_ pipeline: [BSONDocument] = [],
options: ChangeStreamOptions? = nil,
session: ClientSession? = nil
) throws -> ChangeStream<ChangeStreamEvent<BSONDocument>> {
try self.watch(
pipeline,
options: options,
session: session,
withEventType: ChangeStreamEvent<BSONDocument>.self
)
}
/**
* Starts a `ChangeStream` on a `MongoClient`. Allows the client to observe all changes in a cluster -
* excluding system collections and the "config", "local", and "admin" databases. Associates the specified
* `Codable` type `T` with the `fullDocument` field in the `ChangeStreamEvent`s emitted by the returned
* `ChangeStream`.
*
* - Parameters:
* - pipeline: An array of aggregation pipeline stages to apply to the events returned by the change stream.
* - options: An optional `ChangeStreamOptions` to use when constructing the change stream.
* - session: An optional `ClientSession` to use with this change stream.
* - withFullDocumentType: The type that the `fullDocument` field of the emitted `ChangeStreamEvent`s will be
* decoded to.
*
* - Returns: A `ChangeStream` on all collections in all databases in a cluster.
*
* - Throws:
* - `MongoError.CommandError` if an error occurs on the server while creating the change stream.
* - `MongoError.InvalidArgumentError` if the options passed formed an invalid combination.
* - `MongoError.InvalidArgumentError` if the `_id` field is projected out of the change stream documents by the
* pipeline.
*
* - SeeAlso:
* - https://docs.mongodb.com/manual/changeStreams/
* - https://docs.mongodb.com/manual/meta/aggregation-quick-reference/
* - https://docs.mongodb.com/manual/reference/system-collections/
*
* - Note: Supported in MongoDB version 4.0+ only.
*/
public func watch<FullDocType: Codable>(
_ pipeline: [BSONDocument] = [],
options: ChangeStreamOptions? = nil,
session: ClientSession? = nil,
withFullDocumentType _: FullDocType.Type
) throws -> ChangeStream<ChangeStreamEvent<FullDocType>> {
try self.watch(
pipeline,
options: options,
session: session,
withEventType: ChangeStreamEvent<FullDocType>.self
)
}
/**
* Starts a `ChangeStream` on a `MongoClient`. Allows the client to observe all changes in a cluster -
* excluding system collections and the "config", "local", and "admin" databases. Associates the specified
* `Codable` type `T` with the returned `ChangeStream`.
*
* - Parameters:
* - pipeline: An array of aggregation pipeline stages to apply to the events returned by the change stream.
* - options: An optional `ChangeStreamOptions` to use when constructing the change stream.
* - session: An optional `ClientSession` to use with this change stream.
* - withEventType: The type that the entire change stream response will be decoded to and that will be returned
* when iterating through the change stream.
*
* - Returns: A `ChangeStream` on all collections in all databases in a cluster.
*
* - Throws:
* - `MongoError.CommandError` if an error occurs on the server while creating the change stream.
* - `MongoError.InvalidArgumentError` if the options passed formed an invalid combination.
* - `MongoError.InvalidArgumentError` if the `_id` field is projected out of the change stream documents by the
* pipeline.
*
* - SeeAlso:
* - https://docs.mongodb.com/manual/changeStreams/
* - https://docs.mongodb.com/manual/meta/aggregation-quick-reference/
* - https://docs.mongodb.com/manual/reference/system-collections/
*
* - Note: Supported in MongoDB version 4.0+ only.
*/
public func watch<EventType: Codable>(
_ pipeline: [BSONDocument] = [],
options: ChangeStreamOptions? = nil,
session: ClientSession? = nil,
withEventType _: EventType.Type
) throws -> ChangeStream<EventType> {
let asyncStream = try self.asyncClient.watch(
pipeline,
options: options,
session: session?.asyncSession,
withEventType: EventType.self
).wait()
return ChangeStream(wrapping: asyncStream, client: self)
}
/**
* Attach a `CommandEventHandler` that will receive `CommandEvent`s emitted by this client.
*
* Note: the client stores a weak reference to this handler, so it must be kept alive separately in order for it
* to continue to receive events.
*/
public func addCommandEventHandler<T: CommandEventHandler>(_ handler: T) {
self.asyncClient.addCommandEventHandler(handler)
}
/**
* Attach a callback that will receive `CommandEvent`s emitted by this client.
*
* Note: if the provided callback captures this client, it must do so weakly. Otherwise, it will constitute a
* strong reference cycle and potentially result in memory leaks.
*/
public func addCommandEventHandler(_ handlerFunc: @escaping (CommandEvent) -> Void) {
self.asyncClient.addCommandEventHandler(handlerFunc)
}
/**
* Attach an `SDAMEventHandler` that will receive `SDAMEvent`s emitted by this client.
*
* Note: the client stores a weak reference to this handler, so it must be kept alive separately in order for it
* to continue to receive events.
*/
public func addSDAMEventHandler<T: SDAMEventHandler>(_ handler: T) {
self.asyncClient.addSDAMEventHandler(handler)
}
/**
* Attach a callback that will receive `SDAMEvent`s emitted by this client.
*
* Note: if the provided callback captures this client, it must do so weakly. Otherwise, it will constitute a
* strong reference cycle and potentially result in memory leaks.
*/
public func addSDAMEventHandler(_ handlerFunc: @escaping (SDAMEvent) -> Void) {
self.asyncClient.addSDAMEventHandler(handlerFunc)
}
}
extension MongoClient: Equatable {
public static func == (lhs: MongoClient, rhs: MongoClient) -> Bool {
lhs.asyncClient == rhs.asyncClient
}
}