Scala Driver

Transform Your Data with Aggregation

On this page

  • Overview
  • Compare Aggregation and Find Operations
  • Limitations
  • Run Aggregation Operations
  • Filter, Group, and Count Documents
  • Explain an Aggregation
  • Run an Atlas Full-Text Search
  • Search Operator Helper Methods
  • Additional Information
  • MongoDB Server Manual
  • API Documentation

In this guide, you can learn how to use the Scala driver to perform aggregation operations.

Aggregation operations process data in your MongoDB collections and return computed results. The MongoDB Aggregation framework, which is part of the Query API, is modeled on the concept of data processing pipelines. Documents enter a pipeline that contains one or more stages, and this pipeline transforms the documents into an aggregated result.

An aggregation operation is similar to a car factory. A car factory has an assembly line, which contains assembly stations with specialized tools to do specific jobs, like drills and welders. Raw parts enter the factory, and then the assembly line transforms and assembles them into a finished product.

The aggregation pipeline is the assembly line, aggregation stages are the assembly stations, and operator expressions are the specialized tools.
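To make the analogy concrete, the following sketch models a pipeline with plain Scala collections. It does not use the driver at all: the Restaurant case class, the sample values, and the stage names are hypothetical and only illustrate how documents flow through stages in order.

```scala
// Plain Scala collections only; no MongoDB driver calls are used here.
final case class Restaurant(name: String, borough: String, cuisine: String)

object PipelineAnalogy {
  // Hypothetical in-memory data standing in for documents in a collection.
  val restaurants: Seq[Restaurant] = Seq(
    Restaurant("A", "Queens", "Bakery"),
    Restaurant("B", "Queens", "Bakery"),
    Restaurant("C", "Bronx", "Bakery"),
    Restaurant("D", "Bronx", "Pizza")
  )

  // A stage is a function from a sequence of documents to a new sequence.
  type Stage[A, B] = Seq[A] => Seq[B]

  // Plays the role of a $match stage: keep only bakeries.
  val matchBakeries: Stage[Restaurant, Restaurant] =
    _.filter(_.cuisine == "Bakery")

  // Plays the role of a $group stage: count documents per borough.
  val groupByBorough: Stage[Restaurant, (String, Int)] =
    docs => docs.groupBy(_.borough).map { case (b, rs) => b -> rs.size }.toSeq.sortBy(_._1)

  // The pipeline is the stages composed in order.
  val pipeline: Seq[Restaurant] => Seq[(String, Int)] =
    matchBakeries.andThen(groupByBorough)

  def main(args: Array[String]): Unit =
    pipeline(restaurants).foreach { case (borough, count) => println(s"$borough: $count") }
}
```

In a real aggregation the server runs the stages, so only the final result travels back to the application.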

The following table lists the different tasks that find operations can perform and compares them to what aggregation operations can perform. The aggregation framework provides expanded functionality that allows you to transform and manipulate your data.

Find Operations:

  • Select certain documents to return
  • Select which fields to return
  • Sort the results
  • Limit the results
  • Count the results

Aggregation Operations:

  • Select certain documents to return
  • Select which fields to return
  • Sort the results
  • Limit the results
  • Count the results
  • Rename fields
  • Compute new fields
  • Summarize data
  • Connect and merge data sets
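As a rough illustration of the comparison, the following sketch builds a pipeline that selects, sorts, and limits documents, and then renames a field, a task the comparison lists only for aggregation operations. It only constructs the stages and does not contact a server. The borough, name, and cuisine fields come from the sample data; the restaurantName output field and the object name are hypothetical.

```scala
import org.mongodb.scala.model.{Aggregates, Filters, Projections, Sorts}

object RenameFieldSketch {
  val pipeline = Seq(
    // Select certain documents to return.
    Aggregates.filter(Filters.equal("borough", "Queens")),
    // Sort and limit the results.
    Aggregates.sort(Sorts.ascending("name")),
    Aggregates.limit(5),
    // Return "name" under the new (hypothetical) key "restaurantName",
    // keep "cuisine", and drop "_id".
    Aggregates.project(
      Projections.fields(
        Projections.excludeId(),
        Projections.computed("restaurantName", "$name"),
        Projections.include("cuisine")
      )
    )
  )
}
```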

Consider the following limitations when performing aggregation operations:

  • Returned documents cannot violate the BSON document size limit of 16 megabytes.

  • Pipeline stages have a memory limit of 100 megabytes by default. You can exceed this limit by passing a value of true to the allowDiskUse() method and chaining the method to aggregate().

  • The $graphLookup operator has a strict memory limit of 100 megabytes and ignores the value passed to the allowDiskUse() method.
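The second limitation can be addressed as in the following sketch, which chains allowDiskUse(true) to aggregate(). The object and method names are hypothetical, and the collection is assumed to be supplied by the caller.

```scala
import org.mongodb.scala.{AggregateObservable, Document, MongoCollection}
import org.mongodb.scala.model.{Accumulators, Aggregates}

object DiskUseSketch {
  // Groups every document by cuisine. allowDiskUse(true) lets stages that
  // exceed the default memory limit write temporary data to disk.
  def countByCuisine(collection: MongoCollection[Document]): AggregateObservable[Document] =
    collection
      .aggregate(Seq(Aggregates.group("$cuisine", Accumulators.sum("count", 1))))
      .allowDiskUse(true)
}
```

The returned observable does nothing until a caller subscribes to it.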

Note

Sample Data

The examples in this guide use the restaurants collection in the sample_restaurants database from the Atlas sample datasets. To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the Get Started with Atlas guide.

To perform an aggregation, pass a list containing the pipeline stages to the aggregate() method. The Scala driver provides the Aggregates class, which includes helper methods for building pipeline stages.

To learn more about pipeline stages and their corresponding Aggregates helper methods, see the following resources:

This code example produces a count of the number of bakeries in each borough of New York. To do so, it calls the aggregate() method and passes an aggregation pipeline as a list of stages. The code builds these stages by using the following Aggregates helper methods:

  • filter(): Builds the $match stage to filter for documents that have a cuisine value of "Bakery"

  • group(): Builds the $group stage to group the matching documents by the borough field, accumulating a count of documents for each distinct value

// Stage 1 ($match) keeps only bakeries; stage 2 ($group) counts them per borough.
val pipeline = Seq(
  Aggregates.filter(Filters.equal("cuisine", "Bakery")),
  Aggregates.group("$borough", Accumulators.sum("count", 1))
)

// The operation runs when you subscribe to the returned Observable.
collection.aggregate(pipeline)
  .subscribe(
    (doc: Document) => println(doc.toJson()),
    (e: Throwable) => println(s"There was an error: $e")
  )

{"_id": "Brooklyn", "count": 173}
{"_id": "Queens", "count": 204}
{"_id": "Bronx", "count": 71}
{"_id": "Staten Island", "count": 20}
{"_id": "Missing", "count": 2}
{"_id": "Manhattan", "count": 221}
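Because subscribe() is asynchronous, a short script can exit before any results print. One possible alternative, sketched here, converts the observable to a Future and waits for it. The object name, method name, and 10-second timeout are arbitrary choices for illustration, and blocking is generally suitable only for scripts and tests.

```scala
import org.mongodb.scala.{Document, MongoCollection}
import org.mongodb.scala.model.{Accumulators, Aggregates, Filters}
import scala.concurrent.Await
import scala.concurrent.duration._

object BlockingAggregation {
  // Same two stages as the bakery example: $match, then $group.
  val pipeline = Seq(
    Aggregates.filter(Filters.equal("cuisine", "Bakery")),
    Aggregates.group("$borough", Accumulators.sum("count", 1))
  )

  // Runs the aggregation and blocks until all result documents arrive
  // or the (arbitrary) timeout expires.
  def bakeryCounts(collection: MongoCollection[Document]): Seq[Document] =
    Await.result(collection.aggregate(pipeline).toFuture(), 10.seconds)
}
```

A caller could then print each result with BlockingAggregation.bakeryCounts(collection).foreach(doc => println(doc.toJson())).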

To view information about how MongoDB executes your operation, you can instruct the MongoDB query planner to explain it. When MongoDB explains an operation, it returns execution plans and performance statistics. An execution plan is a potential way in which MongoDB can complete an operation. When you instruct MongoDB to explain an operation, it returns both the plan MongoDB executed and any rejected execution plans by default.

To explain an aggregation operation, chain the explain() method to the aggregate() method. You can pass a verbosity level to explain(), which modifies the type and amount of information that the method returns. For more information about verbosity, see Verbosity Modes in the MongoDB Server manual.

The following example instructs MongoDB to explain the aggregation operation from the preceding Filter, Group, and Count Documents example. The code passes a verbosity value of ExplainVerbosity.EXECUTION_STATS to the explain() method, which configures the method to return statistics describing the execution of the winning plan:

val pipelineToExplain = Seq(
  Aggregates.filter(Filters.equal("cuisine", "Bakery")),
  Aggregates.group("$borough", Accumulators.sum("count", 1))
)

collection.aggregate(pipelineToExplain)
  .explain(ExplainVerbosity.EXECUTION_STATS)
  .subscribe(
    (doc: Document) => println(doc.toJson()),
    (e: Throwable) => println(s"There was an error: $e")
  )

{"explainVersion": "2", "queryPlanner": {"namespace": "sample_restaurants.restaurants",
"indexFilterSet": false, "parsedQuery": {"cuisine": {"$eq": "Bakery"}}, "queryHash": "865F14C3",
"planCacheKey": "0FC225DA", "optimizedPipeline": true, "maxIndexedOrSolutionsReached": false,
"maxIndexedAndSolutionsReached": false, "maxScansToExplodeReached": false, "winningPlan":
{"queryPlan": {"stage": "GROUP", "planNodeId": 3, "inputStage": {"stage": "COLLSCAN",
"planNodeId": 1, "filter": {"cuisine": {"$eq": "Bakery"}}, "direction": "forward"}},
...}
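If you need only the plan information and not the execution statistics, a lower verbosity level might be enough. The following sketch passes ExplainVerbosity.QUERY_PLANNER instead. The object and method names are hypothetical, and the collection is assumed to be supplied by the caller.

```scala
import com.mongodb.ExplainVerbosity
import org.mongodb.scala.{Document, MongoCollection, SingleObservable}
import org.mongodb.scala.model.{Accumulators, Aggregates, Filters}

object ExplainSketch {
  val pipeline = Seq(
    Aggregates.filter(Filters.equal("cuisine", "Bakery")),
    Aggregates.group("$borough", Accumulators.sum("count", 1))
  )

  // Requests the explain document at the QUERY_PLANNER verbosity level.
  def planOnly(collection: MongoCollection[Document]): SingleObservable[Document] =
    collection.aggregate(pipeline).explain(ExplainVerbosity.QUERY_PLANNER)
}
```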

Tip

Only Available on Atlas for MongoDB v4.2 and later

This aggregation pipeline operator is only available for collections hosted on MongoDB Atlas clusters running v4.2 or later that are covered by an Atlas Search index.

To specify a full-text search of one or more fields, you can create a $search pipeline stage. The Scala driver provides the Aggregates.search() helper method to create this stage. The search() method requires the following arguments:

  • SearchOperator instance: Specifies the field and text to search for.

  • SearchOptions instance: Specifies options to customize the full-text search. You must set the index option to the name of the Atlas Search index to use.

This example creates pipeline stages to perform the following actions:

  • Search the name field for text that contains the word "Salt"

  • Project only the _id and name values of matching documents

val operator = SearchOperator.text(SearchPath.fieldPath("name"), "Salt")
val options = searchOptions().index("<search index name>")

val pipeline = Seq(
  Aggregates.search(operator, options),
  Aggregates.project(Projections.include("name"))
)

collection.aggregate(pipeline)
  .subscribe(
    (doc: Document) => println(doc.toJson()),
    (e: Throwable) => println(s"There was an error: $e")
  )

{"_id": {"$oid": "..."}, "name": "Fresh Salt"}
{"_id": {"$oid": "..."}, "name": "Salt & Pepper"}
{"_id": {"$oid": "..."}, "name": "Salt + Charcoal"}
{"_id": {"$oid": "..."}, "name": "A Salt & Battery"}
{"_id": {"$oid": "..."}, "name": "Salt And Fat"}
{"_id": {"$oid": "..."}, "name": "Salt And Pepper Diner"}

Important

To run the preceding example, you must create an Atlas Search index on the restaurants collection that covers the name field. Then, replace the "<search index name>" placeholder with the name of the index. To learn more about Atlas Search indexes, see the Atlas Search Indexes guide.

The Scala driver provides helper methods for the following operators:

  • autocomplete: Performs a search for a word or phrase that contains a sequence of characters from an incomplete input string.

  • compound: Combines two or more operators into a single query.

  • equals: Checks whether a field matches a value you specify. Maps to the equals() and equalsNull() methods.

  • exists: Tests if a path to a specified indexed field name exists in a document.

  • in: Performs a search for an array of BSON number, date, boolean, objectId, uuid, or string values at the given path and returns documents where the value of the field equals any value in the specified array.

  • moreLikeThis: Returns documents similar to input documents.

  • near: Supports querying and scoring numeric, date, and GeoJSON point values.

  • phrase: Performs a search for documents containing an ordered sequence of terms using the analyzer specified in the index configuration.

  • queryString: Supports querying a combination of indexed fields and values.

  • range: Supports querying and scoring numeric, date, and string values. Maps to the numberRange() and dateRange() methods.

  • regex: Interprets the query field as a regular expression.

  • text: Performs a full-text search using the analyzer that you specify in the index configuration.

  • wildcard: Enables queries that use special characters in the search string to match any character.

Note

Atlas Sample Dataset

This example uses the sample_mflix.movies collection from the Atlas sample datasets. To learn how to set up a free-tier Atlas cluster and load the sample dataset, see the Get Started with Atlas tutorial in the Atlas documentation.

Before you can run this example, you must create an Atlas Search index on the movies collection that has the following definition:

{
"mappings": {
"dynamic": true,
"fields": {
"title": {
"analyzer": "lucene.keyword",
"type": "string"
},
"genres": {
"normalizer": "lowercase",
"type": "token"
}
}
}
}

To learn more about creating Atlas Search indexes, see the Atlas Search Indexes guide.
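As one possible way to create this index from application code, the following sketch parses the definition shown earlier and passes it to createSearchIndex(). It assumes a driver version that provides MongoCollection.createSearchIndex(indexName, definition) and a cluster that supports Atlas Search. The object name, method name, and indexName parameter are hypothetical.

```scala
import org.mongodb.scala.{Document, MongoCollection, SingleObservable}

object SearchIndexSketch {
  // The index definition from this guide, parsed from JSON.
  val definition: Document = Document(
    """{
      |  "mappings": {
      |    "dynamic": true,
      |    "fields": {
      |      "title": { "analyzer": "lucene.keyword", "type": "string" },
      |      "genres": { "normalizer": "lowercase", "type": "token" }
      |    }
      |  }
      |}""".stripMargin
  )

  // Assumption: createSearchIndex(String, Bson) is available in your driver version.
  // The returned observable emits the name of the created index when subscribed.
  def create(collection: MongoCollection[Document], indexName: String): SingleObservable[String] =
    collection.createSearchIndex(indexName, definition)
}
```

Index creation is asynchronous on the server side, so the index might not be queryable immediately after the observable completes.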

The following code creates a $search stage that has the following specifications:

  • Checks that the genres array includes "Comedy"

  • Searches the fullplot field for the phrase "new york"

  • Matches year values greater than 1950 and less than 2000

  • Searches for title values that begin with the term "Love"

val searchStage = Aggregates.search(
  SearchOperator.compound()
    .must(
      Iterable(
        SearchOperator.in(fieldPath("genres"), List("Comedy")),
        SearchOperator.phrase(fieldPath("fullplot"), "new york"),
        SearchOperator.numberRange(fieldPath("year")).gtLt(1950, 2000),
        SearchOperator.wildcard("Love *", fieldPath("title")),
      ).asJava
    )
)

val projectStage = Aggregates.project(
  Projections.include("title", "year", "genres"))

val aggregatePipelineStages = Seq(searchStage, projectStage)

collection.aggregate(aggregatePipelineStages)
  .subscribe(
    (doc: Document) => println(doc.toJson()),
    (e: Throwable) => println(s"There was an error: $e")
  )

{"_id": ..., "genres": ["Comedy", "Romance"], "title": "Love at First Bite", "year": 1979}
{"_id": ..., "genres": ["Comedy", "Drama"], "title": "Love Affair", "year": 1994}

To learn more about the Atlas Search helper methods, see the SearchOperator interface reference in the Driver Core API documentation.

To learn more about the topics discussed in this guide, see the following pages in the MongoDB Server manual:

To learn more about the methods and types discussed in this guide, see the following API documentation:
