В этом руководстве объясняются структуры данных очередей и демонстрируются системы очередей. Очереди часто используются для обработки длительных задач, таких как доставка новостной рассылки по электронной почте. Ниже вы создадите простую очередь задач Node.
Не всегда практично выполнять задачу в тот момент, когда она запрошена.
Рассмотрим систему администрирования новостной рассылки по электронной почте. После написания администратор должен нажать большую красную кнопку «ОТПРАВИТЬ СЕЙЧАС». Приложение может немедленно отправлять каждое электронное письмо и показывать «завершенный» ответ. Это сработает для дюжины сообщений, но сколько времени потребуется для 1000 подписчиков или более? Время запроса браузера истечет до завершения процесса.
Другой пример: пользователь может загрузить любое количество фотографий в
В таких ситуациях более эффективно разделять задачи. Пользователь получает мгновенный ответ, но обработка задачи происходит в фоновом режиме. Другие приложения или серверы обрабатывают задачи и планируют повторные попытки в случае сбоя. Пользователь может получать оповещения или просматривать журналы, чтобы определить ход выполнения.
Что такое структуры данных очереди?
Очередь — это структура данных, которая содержит набор элементов:
Любой процесс может отправить (или поставить в очередь) элемент в любое время — например, отправить информационный бюллетень X получателю Y.
Любой процесс может получить (или удалить из очереди) элемент в начале очереди — например, элемент, который находился в очереди дольше всего.
Структуры данных очереди представляют собой структуру «первым поступил — первым обслужен» (FIFO). Первый элемент, добавленный в очередь, будет первым удаленным.
Базовая структура данных очереди задач JavaScript
Вы можете создать очередь задач, используя массив JavaScript. Метод добавляет элемент в конец массива, а метод push () удаляет и возвращает элемент с начала: shift ()
const queue = [];
queue.push ('item 1') ;
queue.push ('item 2') ;
console.log (queue.shift ()); // item 1
console.log (queue.shift ()); // item 2
console.log (queue.shift ()); // undefined
Ваши структуры данных очереди могут содержать любые данные в отдельных элементах массива. Вы можете передавать строки, числа, логические значения, другие массивы или объекты.
Вы можете использовать класс ES6 для определения любого количества отдельных очередей:
class Queue {
constructor () { this.q = []; }
send (item) { this.q.push (item) ; }
receive () { return this.q.shift () ; }
}
// define two queues
const q1 = new Queue () ;
const q2 = new Queue () ;
q1.send ('item 1') ;
q2.send ('item 2') ;
console.log (q1.receive ()); // item 1
console.log (q1.receive ()); // undefined
console.log (q2.receive ()); // item 2
Эти простые структуры данных очереди могут быть полезны для менее важного кода на стороне клиента, например для постановки в очередь обновлений пользовательского интерфейса, чтобы обработка выполнялась в одном обновлении DOM. localStorage или IndexedDB могут предложить определенный уровень сохранения данных, если это необходимо.
Платформы очереди
Очереди в памяти менее практичны для сложных серверных приложений:
Два или более отдельных приложения не могут (легко) получить доступ к одной и той же очереди.
Данные очереди исчезают при завершении работы приложения.
Специально созданное программное обеспечение брокера сообщений обеспечивает более надежную организацию очередей. Платформы различаются, но предлагают такие функции, как:
сохранение данных в ряде баз данных с опциями репликации, сегментирования и кластеризации
ряд протоколов доступа, часто включая HTTP и
любое количество отдельных очередей
отложенный обмен сообщениями, когда обработка сообщений может происходить позже
поддержка, подобная транзакции, когда сообщение повторно ставится в очередь, если обработка не подтверждена
Шаблоны
Программное обеспечение брокера сообщений включает Redis, RabbitMQ, Apache ActiveMQ и Gearman. Облачные службы обмена сообщениями включают Amazon SQS, Azure Service Bus и Google Pub/Sub.
Это могут быть жизнеспособные варианты для приложений корпоративного уровня. Тем не менее, они могут быть излишними, если у вас более простые требования и вы уже используете базу данных.
Используйте MongoDB в качестве брокера сообщений нашей очереди задач узла
Можно разработать сложную систему очередей задач Node для управления структурами данных очередей в паре сотен строк кода.
Описанный
Node Task Queue Project: начало работы
Убедитесь, что у вас установлен Node.js 14 или более поздней версии, а затем создайте новую папку проекта, например
{
«name»: «
«version»: «1.0.0»,
«description»: «Queue test»,
«type»: «module»,
«scripts»: {
«send»: «node. /send.js»,
«receive»: «node. /receive.js»
}
}
Примечание: «type»: «module»настраивает проект на использование модулей ES6. Будет «scripts»отправлять и получать элементы в очереди.
Установите модуль
npm install @craigbuckler/
Затем создайте.envфайл с учетными данными для подключения к базе данных MongoDB. Например:
QUEUE_DB_HOST=localhost
QUEUE_DB_PORT=27017
QUEUE_DB_USER=root
QUEUE_DB_PASS=mysecret
QUEUE_DB_NAME=qdb
QUEUE_DB_COLL=queue
Примечание: это создает queueколлекцию (QUEUE_DB_COLL) в qdbбазе данных (QUEUE_DB_NAME). Вы можете использовать существующую базу данных, но убедитесь, что коллекция не конфликтует с другой коллекцией.
Доступ к базе данных для чтения/записи должен быть предоставлен пользователю root (QUEUE_DB_USER) с паролем mysecret (QUEUE_DB_PASS). Установите оба значения пустыми, если аутентификация не требуется.
Запустите базу данных MongoDB, если она еще не запущена. Те, у кого есть Docker и Docker Compose, могут создать новый
version: '3'
services:
queuedb:
environment:
— MONGO_INITDB_ROOT_USERNAME=${QUEUE_DB_USER}
— MONGO_INITDB_ROOT_PASSWORD=${QUEUE_DB_PASS}
image: mongo:4.
container_name: queuedb
volumes:
— queuedata: /data/db
ports:
— «${QUEUE_DB_PORT}: ${QUEUE_DB_PORT}»
restart: always
volumes:
queuedata:
Затем выполните
Docker доступен для Linux, macOS и Windows 10. См. инструкции по установке Docker.
Создайте новый send.jsфайл, чтобы добавить случайно сгенерированные сообщения электронной почты в очередь с именем news:
// Queue module
import { Queue } from '@craigbuckler/
// initialize queue named 'news’
const newsQ = new Queue ('news’) ;
// random name
const name = String.fromCharCode (65 + Math.random () * 26).repeat (1 + Math.random () * 10) ;
// add object to queue
const send = await newsQ.send ({
name: name,
email: `${ name.toLowerCase () }@test.com`,
date: new Date (),
message: `Hey there, ${ name }! `
}) ;
console.log ('send’, send) ;
// get number of items remaining in queue
console.log ('items queued:', await newsQ.count ());
// close connection and quit
await newsQ.close () ;
Выполните его с помощью npm run send, и вы увидите такой вывод:
send {
_id: 607d692563bd6d05bb459931,
sent:
data: {
name: 'AAA’,
email: 'aaa@test.com’,
date:
message: 'Hey there, AAA!'
}
}
items queued: 1
Метод.send () возвращает qItemобъект, содержащий:
документ MongoDB_id
дата/время, когда элемент изначально был поставлен в очередь, и
копия сообщенияdata
Запустите сценарий любое количество раз, чтобы добавить дополнительные элементы в очередь. Будет items queuedувеличиваться при каждом запуске.
Теперь создайте новый receive.jsфайл для получения сообщений из той же очереди задач Node:
// Queue module
import { Queue } from '@craigbuckler/
// initialize queue named 'news’
const newsQ = new Queue ('news’) ;
let qItem;
do {
qItem = await newsQ.receive () ;
if (qItem) {
console.log ('\nreceive’, qItem) ;
//... process qItem.data...
//... to send email...
}
} while (qItem) ;
// number of items remaining in queue
console.log ('items queued:', await newsQ.count ());
await newsQ.close () ;
Запустите npm run receive, чтобы получить и обработать элементы в очереди:
receive {
_id: 607d692563bd6d05bb459931,
sent:
data: {
name: 'AAA’,
email: 'aaa@test.com’,
date:
message: 'Hey there, AAA!'
}
}
items queued: 0
В этом примере электронная почта не отправляется, но это можно реализовать с помощью Nodemailer или другого подходящего модуля.
Если обработка не удалась — возможно,
newsQ.send (qItem.data, 600) ;
Второй 600аргумент — это необязательное количество секунд или будущая дата. Эта команда повторно ставит элемент в очередь по истечении 600 секунд (десяти минут).
Это простой пример, но любое приложение может отправлять данные в любое количество очередей. Другой процесс, возможно запущенный как cronзадание, может получать и обрабатывать элементы, когда это необходимо.
Как
Строка type, переданная конструктору класса, определяет имя очереди. Метод.send () создает новый документ MongoDB при передаче данных для добавления в очередь. Документ MongoDB содержит:
MongoDB _id (дата/время создания закодированы в значении).
Очередь type.
Значение даты/времени обработки с именем proc. Можно установить будущее время, но по умолчанию используется текущее время.
Пункт data. Это может быть что угодно: логическое значение, число, строка, массив, объект и так далее.
Метод.receive () находит самый старый документ, который имеет совпадение typeи procдату/время в прошлом. Документ форматируется, возвращается вызывающему коду и удаляется из базы данных.
Следующие разделы описывают модуль более подробно.
При необходимости dotenvмодуль считывает.envпеременные окружения. Объект подключения к базе данных создается с помощью официального mongodbмодуля драйвера:
// modules
import dotenv from 'dotenv’;
import mongoDB from 'mongodb’;
// environment variables
if (! process.env.QUEUE_DB_HOST) {
dotenv.config () ;
}
// MongoDB database client
const
dbName = process.env.QUEUE_DB_NAME || 'qdb’,
qCollectionName = process.env.QUEUE_DB_COLL || 'queue’,
qAuth = process.env.QUEUE_DB_USER? `${ process.env.QUEUE_DB_USER }: ${ process.env.QUEUE_DB_PASS || '' }@`: '',
dbClient = new mongoDB.MongoClient (
`mongodb: //${ qAuth }${ process.env.QUEUE_DB_HOST || 'localhost’ }: ${ process.env.QUEUE_DB_PORT || '27017' }/`,
{ useNewUrlParser: true, useUnifiedTopology: true }
) ;
Переменная qCollectionсодержит ссылку на коллекцию очередей базы данных (определяется параметром QUEUE_DB_COLL). Он создается и возвращается dbConnect () функцией, которая также определяет схему коллекции и при необходимости индексирует ее. Все Queueметоды запускаются const q = await dbConnect () ;для получения ссылки на коллекцию:
let qCollection; // queue collection
// shared connection
async function dbConnect () {
// collection available
if (qCollection) return qCollection;
// connect to database
await dbClient.connect () ;
// collection defined?
const
db = dbClient.db (dbName),
colList = await db.listCollections ({ name: qCollectionName }, { nameOnly: true }).toArray () ;
if (! colList.length) {
// define collection schema
let $jsonSchema = {
bsonType: 'object’,
required: [ 'type’, 'proc’, 'data’ ],
properties: {
type: { bsonType: 'string’, minLength: 1 },
proc: { bsonType: 'date’ }
}
};
await db.createCollection (qCollectionName, { validator: { $jsonSchema } }) ;
// define indexes
await db.collection (qCollectionName).createIndexes ([
{ key: { type: 1 } },
{ key: { proc: 1 } }
]) ;
}
// return queue collection
qCollection = db.collection (qCollectionName) ;
return qCollection;
}
Функция dbClose () закрывает соединение с базой данных:
// close MongoDB database connection
async function dbClose () {
if (qCollection) {
await dbClient.close () ;
qCollection = null;
}
}
Конструктор Queueзадает очередь typeили имя:
export class Queue {
constructor (type = 'DEFAULT’) {
this.type = type;
}
Метод.send () добавляет данные в очередь с соответствующим type. У него есть необязательный delayUntilпараметр, который добавляет элемент в очередь в будущем, указав количество секунд или Date ().
Метод вставляет новый документ в базу данных и возвращает qItemобъект ({ _id, sent, data}) или в nullслучае неудачи:
async send (data = null, delayUntil) {
try {
// calculate start date/time
let proc = new Date () ;
if (delayUntil instanceof Date) {
proc = delayUntil;
}
else if (! isNaN (delayUntil)) {
proc = new Date (+proc + delayUntil * 1000) ;
}
// add item to queue
const
q = await dbConnect (),
ins = await q.insertOne ({
type: this.type, proc, data
}) ;
// return qItem
return ins && ins.insertedCount && ins.insertedId? { _id: ins.insertedId, sent: ins.insertedId.getTimestamp (), data }: null;
}
catch (err) {
console.log (`Queue.send error: \n${ err }`) ;
return null;
}
}
Метод.receive () извлекает и удаляет самый старый элемент очереди в базе данных с определенной typeдатой proc/временем в прошлом. Он возвращает qItemобъект ({ _id, sent, data}) или nullесли ничего не доступно или возникает ошибка:
async receive () {
try {
// find and delete next item on queue
const
now = new Date (),
q = await dbConnect (),
rec = await q.findOneAndDelete (
{
type: this.type,
proc: { $lt: now }
},
{
sort: { proc: 1 }
}
) ;
const v = rec && rec.value;
// return qItem
return v? { _id: v. _id, sent: v. _id.getTimestamp (), data: v.data }: null;
}
catch (err) {
console.log (`Queue.receive error: \n${ err }`) ;
return null;
}
}
Метод.remove () удаляет элемент из очереди, идентифицированный qItemобъектом ({ _id, sent, data}), возвращенным.send () методом. Его можно использовать для удаления элемента из очереди независимо от его положения в очереди.
Метод возвращает количество удаленных документов (обычно 1) или nullпри возникновении ошибки:
async remove (qItem) {
// no item to remove
if (! qItem ||! qItem. _id) return null;
try {
const
q = await dbConnect (),
del = await q.deleteOne ({ _id: qItem. _id }) ;
return del.deletedCount;
}
catch (err) {
console.log (`Queue.remove error: \n${ err }`) ;
return null;
}
}
Метод.purge () удаляет все одинаковые элементы в очереди typeи возвращает количество удалений:
async purge () {
try {
const
q = await dbConnect (),
del = await q.deleteMany ({ type: this.type }) ;
return del.deletedCount;
}
catch (err) {
console.log (`Queue.purge error: \n${ err }`) ;
return null;
}
}
Метод.count () возвращает количество одинаковых элементов в очереди type:
async count () {
try {
const q = await dbConnect () ;
return await q.countDocuments ({ type: this.type }) ;
}
catch (err) {
console.log (`Queue.count error: \n${ err }`) ;
return null;
}
}
Метод.close () запускает dbClose () функцию для завершения соединения с базой данных, чтобы цикл обработки событий Node.js мог завершиться:
async close () {
try {
await dbClose () ;
}
catch (err) {
console.log (`Queue.close error: \n${ err }`) ;
return null;
}
}
// end of class
}
Новая очередь
Очереди являются проблемой для любого