Skip to content

orlandos-nl/MongoQueue

Repository files navigation

MongoQueue

A MongoKitten based JobQueue for MongoDB.

Join our Discord for any questions and friendly banter.

Read the Docs for more info.

Quick Start

Connect to MongoDB with MongoKitten regularly:

let db = try await MongoDatabase.connect(to: "mongodb://localhost/my_database")

Select a collection for your job queue:

let queue = MongoQueue(collection: db["tasks"])

Define your jobs by conforming to ScheduledTask (and implicitly Codable):

struct RegistrationEmailTask: ScheduledTask {
    // This type has no context
    typealias ExecutionContext = Void

    // Computed property, required by ScheduledTask
    // This executed the task ASAP
    var taskExecutionDate: Date { Date() }
    
    // Stored properties represent the metadata needed to execute the task
    let recipientEmail: String
    let userId: ObjectId
    let fullName: String
    
    func execute(withContext context: ExecutionContext) async throws {
        // TODO: Send the email
        // Throwing an error triggers `onExecutionFailure`
    }
    
    func onExecutionFailure(failureContext: QueuedTaskFailure<ExecutionContext>) async throws -> TaskExecutionFailureAction {
        // Only attempt the job once. Failing to send the email cancels the job
        return .dequeue()
    }
}

Register the task to MongoQueue, before it starts.

// Context is `Void`, so we pass in a void here
queue.registerTask(RegistrationEmailTask.self, context: ())

Start the queue in the background - this is helpful in use inside HTTP applications - or when creating separate workers.

try queue.runInBackground()

Alternatively, run the queue in the foreground and block until the queue is stopped. Only use this if your queue worker is only running as a worker. I.E., it isn't serving users on the side.

try await queue.run()

Queue the task in MongoDB:

let task = RegistrationEmailTask(
  recipientEmail: "[email protected]",
  userId: ...,
  fullName: "Joannis Orlandos"
)
try await queue.queueTask(task)

Tada! Just wait for it to be executed.

Testing

You can run all currently active jobs in the queue one-by-one using:

try await queue.runUntilEmpty()

This will not run any jobs scheduled for the future, and will exit once there are no current jobs available.

Parallelisation

You can set the parallelisation amount per job queue instance using the following code:

queue.setMaxParallelJobs(to: 6)

If you have two containers running an instance of MongoQueue, it will therefore be able to run 2 * 6 = 12 jobs simultaneously.

Integrate with Vapor

To access the queue from your Vapor Request, add the following snippet:

import Vapor
import MongoKitten
import MongoQueue

extension Request {
  public var queue: MongoQueue {
    return application.queue
  }
}

private struct MongoQueueStorageKey: StorageKey {
  typealias Value = MongoQueue
}

extension Application {
  public var queue: MongoQueue {
    get {
      storage[MongoQueueStorageKey.self]!
    }
    set {
      storage[MongoQueueStorageKey.self] = newValue
    }
  }

  public func initializeMongoQueue(withCollection collection: MongoCollection) {
    self.queue = MongoQueue(collection: collection)
  }
}

From here, you can add tasks as such:

app.post("tasks") { req in
  try await req.queue.queueTask(MyCustomTask())
  return HTTPStatus.created
}

Advanced Use

Before diving into more (detailed) APIs, here's an overview of how this works:

When you queue a task, it is used to derive the basic information for queueing the job. Parts of these requirements are in the protocol, but have a default value provided by MongoQueue.

Dequeing Process

Each task has a category, a unique string identifying this task's type in the database. When you register your task with MongoQueue, the category is used to know how to decode & execute the task once it is acquired by a worker.

MongoQueue regularly checks, on a timer (and if possible with Change Streams for better responsiveness) whether a new task is ready to grab. When it pulls a task from MongoDB, it takes the highest priority task that is scheduled for execution at this date.

The priority is .normal by default, but urgency can be increased or decreased in a tasks var priority: TaskPriority { get }.

When the task is taken out of the queue, its status is set to executing. This means that other jobs can't execute this task right now. While doing so, the task model's maxTaskDuration is used as an indication of the expected duration of a task. The expected deadline is set on the model in MongoDB by adding maxTaskDuration to the current date.

If the deadline is reached, other workers can (and will) dequeue the task and put it back into scheduled. This assumes the worker has crashed. However, in cases where the task is taking an abnormal amount of time, the worker will update the deadline accordingly.

Due to this system, it is adviced to set urgent and short-lived tasks to a shorter maxTaskDuration. But take network connectivity into consideration, as setting it very low (like 5 seconds) may cause the deadline to be reached before it can be prolonged.

If the task is dequeued, your task model gets a notification in func onDequeueTask(withId taskId: ObjectId, withContext context: ExecutionContext, inQueue queue: MongoQueue) async throws.

Likewise, on execution failure you get a call on func onExecutionFailure(failureContext: QueuedTaskFailure<ExecutionContext>) async throws -> TaskExecutionFailureAction where you can decide whether to requeue, and whether to apply a maximum amount of attempts.