Run an aggregation pipeline on a MongoDB collection.

```yaml
type: "io.kestra.plugin.mongodb.Aggregate"
```

Simple aggregation pipeline to group and sum data

```yaml
id: mongodb_aggregate
namespace: company.team

tasks:
  - id: aggregate
    type: io.kestra.plugin.mongodb.Aggregate
    connection:
      uri: "mongodb://root:example@localhost:27017/?authSource=admin"
    database: "my_database"
    collection: "sales"
    pipeline:
      - $match:
          status: "active"
      - $group:
          _id: "$category"
          total:
            $sum: "$amount"
          count:
            $sum: 1
      - $sort:
          total: -1
```
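
The sketch below traces the pipeline above by hand on a few hypothetical `sales` documents. The field values are invented for illustration, and the input documents' `_id` fields are omitted. `$match` drops the archived document, `$group` sums `amount` and counts documents per `category`, and `$sort` orders the groups by descending `total`.

```yaml
# Hypothetical documents in the "sales" collection
input:
  - { category: "books", status: "active", amount: 20 }
  - { category: "books", status: "active", amount: 15 }
  - { category: "games", status: "active", amount: 60 }
  - { category: "games", status: "archived", amount: 100 }
  - { category: "music", status: "active", amount: 10 }

# Documents the pipeline returns for that input
output:
  - { _id: "games", total: 60, count: 1 }  # the archived document was filtered out by $match
  - { _id: "books", total: 35, count: 2 }  # 20 + 15
  - { _id: "music", total: 10, count: 1 }
```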

Complex aggregation with lookup and data transformation

```yaml
id: mongodb_complex_aggregate
namespace: company.team

tasks:
  - id: aggregate_with_lookup
    type: io.kestra.plugin.mongodb.Aggregate
    connection:
      uri: "mongodb://root:example@localhost:27017/?authSource=admin"
    database: "my_database"
    collection: "users"
    pipeline:
      - $lookup:
          from: "orders"
          localField: "_id"
          foreignField: "userId"
          as: "userOrders"
      - $addFields:
          totalOrders:
            $size: "$userOrders"
          totalSpent:
            $sum: "$userOrders.amount"
      - $project:
          name: 1
          email: 1
          totalOrders: 1
          totalSpent: 1
      - $match:
          totalOrders:
            $gt: 0
    allowDiskUse: true
    maxTimeMs: 30000
```

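
The hand-traced sketch below shows what each stage of the lookup pipeline does. It uses two hypothetical users and two hypothetical orders, with integer `_id` values for readability; real documents usually carry `ObjectId`s. The stages work as follows:

- `$lookup` attaches the matching orders as the `userOrders` array.
- `$addFields` derives the order count and the sum of `userOrders.amount`.
- `$project` keeps the listed fields.
- `$match` drops users without orders.

```yaml
# Hypothetical "users" collection
users:
  - { _id: 1, name: "Ada", email: "ada@example.com" }
  - { _id: 2, name: "Bob", email: "bob@example.com" }

# Hypothetical "orders" collection
orders:
  - { _id: 101, userId: 1, amount: 30 }
  - { _id: 102, userId: 1, amount: 12 }

# Result: Bob gets totalOrders 0 (and totalSpent 0), so the final $match removes him
output:
  - { _id: 1, name: "Ada", email: "ada@example.com", totalOrders: 2, totalSpent: 42 }
```

`_id` survives the `$project` stage because MongoDB includes it by default unless the stage sets `_id: 0`.
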
Properties

`collection`: MongoDB collection.

`connection`: MongoDB connection properties.

`database`: MongoDB database.

`pipeline` (list with subtype object): MongoDB aggregation pipeline, given as a list of pipeline stages (a BSON array or a list of maps).
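
Each stage is a map with a single operator key. The stages can therefore also be written in YAML flow style, which reads much like the stage documents in MongoDB's own documentation. The fragment below is a sketch of the first example's `pipeline` written that way. A YAML parser produces the same list of maps from this form as from the block-style version.

```yaml
pipeline:
  - { $match: { status: "active" } }
  - { $group: { _id: "$category", total: { $sum: "$amount" }, count: { $sum: 1 } } }
  - { $sort: { total: -1 } }
```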

`allowDiskUse` (default `true`): Whether pipeline stages may use disk. When enabled, a stage that exceeds the 100-megabyte memory limit can write to temporary files.

Cursor batch size (default `1000`): The number of documents to return per batch.

`maxTimeMs` (default `60000`): Maximum execution time in milliseconds, enforced on the server for this operation.
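
The task below is a sketch of overriding these defaults. It turns off disk use and raises the server-side time limit to two minutes (120000 ms); the task `id` is hypothetical. This assumes the task forwards both values to the server unchanged. Under that assumption, MongoDB would report an error instead of spilling to temporary files when a stage exceeds the memory limit. It would also abort the operation once it runs longer than `maxTimeMs`.

```yaml
tasks:
  - id: aggregate_in_memory  # hypothetical task id
    type: io.kestra.plugin.mongodb.Aggregate
    connection:
      uri: "mongodb://root:example@localhost:27017/?authSource=admin"
    database: "my_database"
    collection: "sales"
    pipeline:
      - $group:
          _id: "$category"
          total:
            $sum: "$amount"
    allowDiskUse: false  # fail instead of writing temporary files
    maxTimeMs: 120000    # 120000 ms = 2 minutes
```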

Fetch mode (default `FETCH`; possible values: `STORE`, `FETCH`, `FETCH_ONE`, `NONE`): Controls whether the aggregation result is stored in an Ion-serialized data file rather than returned directly in the task output.

Outputs

A list containing the aggregation results. Only populated if the store parameter is set to false.

The number of documents returned by the aggregation.

URI of the file containing the aggregation results (a URI-formatted string). Only populated if the store parameter is set to true.

Metrics

Number of documents returned by the aggregation pipeline (unit: count).

`connection.uri`: Connection string to the MongoDB server, in a format like `mongodb://mongodb0.example.com:27017`.
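
The URI follows MongoDB's standard connection string format. Credentials, several hosts, and options such as `authSource` or `replicaSet` can therefore go in the same string. The hosts, credentials, and replica set name in this sketch are hypothetical.

```yaml
connection:
  # Format: mongodb://user:password@host1:port,host2:port/?option=value&option=value
  uri: "mongodb://appUser:s3cret@mongodb0.example.com:27017,mongodb1.example.com:27017/?authSource=admin&replicaSet=rs0"
```

If the username or password contains characters such as `@`, `:`, or `/`, percent-encode them before placing them in the URI.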