Создание сайтов в Горловке, ДНР. Структуры данных очереди: как построить очередь задач узла

 
 

В этом руководстве объясняются структуры данных очередей и демонстрируются системы очередей. Очереди часто используются для обработки длительных задач, таких как доставка новостной рассылки по электронной почте. Ниже вы создадите простую очередь задач Node.

Не всегда практично выполнять задачу в тот момент, когда она запрошена.

Рассмотрим систему администрирования новостной рассылки по электронной почте. После написания администратор должен нажать большую красную кнопку «ОТПРАВИТЬ СЕЙЧАС». Приложение может немедленно отправлять каждое электронное письмо и показывать «завершенный» ответ. Это сработает для дюжины сообщений, но сколько времени потребуется для 1000 подписчиков или более? Время запроса браузера истечет до завершения процесса.

Другой пример: пользователь может загрузить любое количество фотографий в приложение-галерею. Система изменяет размер и повышает резкость каждого изображения для альтернативных размеров. Этот процесс может выполняться при загрузке, но для каждого изображения будет задержка.

В таких ситуациях более эффективно разделять задачи. Пользователь получает мгновенный ответ, но обработка задачи происходит в фоновом режиме. Другие приложения или серверы обрабатывают задачи и планируют повторные попытки в случае сбоя. Пользователь может получать оповещения или просматривать журналы, чтобы определить ход выполнения.

Что такое структуры данных очереди?

Очередь — это структура данных, которая содержит набор элементов:

Любой процесс может отправить (или поставить в очередь) элемент в любое время — например, отправить информационный бюллетень X получателю Y.

Любой процесс может получить (или удалить из очереди) элемент в начале очереди — например, элемент, который находился в очереди дольше всего.

Структуры данных очереди представляют собой структуру «первым поступил — первым обслужен» (FIFO). Первый элемент, добавленный в очередь, будет первым удаленным.

Базовая структура данных очереди задач JavaScript

Вы можете создать очередь задач, используя массив JavaScript. Метод добавляет элемент в конец массива, а метод push () удаляет и возвращает элемент с начала: shift ()

const queue = [];

queue.push ('item 1') ;

queue.push ('item 2') ;

console.log (queue.shift ()); // item 1

console.log (queue.shift ()); // item 2

console.log (queue.shift ()); // undefined

Ваши структуры данных очереди могут содержать любые данные в отдельных элементах массива. Вы можете передавать строки, числа, логические значения, другие массивы или объекты.

Вы можете использовать класс ES6 для определения любого количества отдельных очередей:

class Queue {

constructor () { this.q = []; }

send (item) { this.q.push (item) ; }

receive () { return this.q.shift () ; }

}

// define two queues

const q1 = new Queue () ;

const q2 = new Queue () ;

q1.send ('item 1') ;

q2.send ('item 2') ;

console.log (q1.receive ()); // item 1

console.log (q1.receive ()); // undefined

console.log (q2.receive ()); // item 2

Эти простые структуры данных очереди могут быть полезны для менее важного кода на стороне клиента, например для постановки в очередь обновлений пользовательского интерфейса, чтобы обработка выполнялась в одном обновлении DOM. localStorage или IndexedDB могут предложить определенный уровень сохранения данных, если это необходимо.

Платформы очереди

Очереди в памяти менее практичны для сложных серверных приложений:

Два или более отдельных приложения не могут (легко) получить доступ к одной и той же очереди.

Данные очереди исчезают при завершении работы приложения.

Специально созданное программное обеспечение брокера сообщений обеспечивает более надежную организацию очередей. Платформы различаются, но предлагают такие функции, как:

сохранение данных в ряде баз данных с опциями репликации, сегментирования и кластеризации

ряд протоколов доступа, часто включая HTTP и веб-сокеты

любое количество отдельных очередей

отложенный обмен сообщениями, когда обработка сообщений может происходить позже

поддержка, подобная транзакции, когда сообщение повторно ставится в очередь, если обработка не подтверждена

Шаблоны публикации-подписки, когда приложения получают событие, когда новый элемент появляется в очереди.

Программное обеспечение брокера сообщений включает Redis, RabbitMQ, Apache ActiveMQ и Gearman. Облачные службы обмена сообщениями включают Amazon SQS, Azure Service Bus и Google Pub/Sub.

Это могут быть жизнеспособные варианты для приложений корпоративного уровня. Тем не менее, они могут быть излишними, если у вас более простые требования и вы уже используете базу данных.

Используйте MongoDB в качестве брокера сообщений нашей очереди задач узла

Можно разработать сложную систему очередей задач Node для управления структурами данных очередей в паре сотен строк кода.

Описанный queue-mongodbздесь модуль использует MongoDB для хранения данных, но те же принципы могут быть приняты любой базой данных SQL или NoSQL. Код доступен на GitHub и npm.

Node Task Queue Project: начало работы

Убедитесь, что у вас установлен Node.js 14 или более поздней версии, а затем создайте новую папку проекта, например queue-test. Добавьте новый package.jsonфайл:

{

«name»: «queue-test»,

«version»: «1.0.0»,

«description»: «Queue test»,

«type»: «module»,

«scripts»: {

«send»: «node. /send.js»,

«receive»: «node. /receive.js»

}

}

Примечание: «type»: «module»настраивает проект на использование модулей ES6. Будет «scripts»отправлять и получать элементы в очереди.

Установите модуль очереди-mongodb:

npm install @craigbuckler/queue-mongodb

Затем создайте.envфайл с учетными данными для подключения к базе данных MongoDB. Например:

QUEUE_DB_HOST=localhost

QUEUE_DB_PORT=27017

QUEUE_DB_USER=root

QUEUE_DB_PASS=mysecret

QUEUE_DB_NAME=qdb

QUEUE_DB_COLL=queue

Примечание: это создает queueколлекцию (QUEUE_DB_COLL) в qdbбазе данных (QUEUE_DB_NAME). Вы можете использовать существующую базу данных, но убедитесь, что коллекция не конфликтует с другой коллекцией.

Доступ к базе данных для чтения/записи должен быть предоставлен пользователю root (QUEUE_DB_USER) с паролем mysecret (QUEUE_DB_PASS). Установите оба значения пустыми, если аутентификация не требуется.

Запустите базу данных MongoDB, если она еще не запущена. Те, у кого есть Docker и Docker Compose, могут создать новый docker-compose.ymlфайл:

version: '3'

services:

queuedb:

environment:

— MONGO_INITDB_ROOT_USERNAME=${QUEUE_DB_USER}

— MONGO_INITDB_ROOT_PASSWORD=${QUEUE_DB_PASS}

image: mongo:4.4-bionic

container_name: queuedb

volumes:

— queuedata: /data/db

ports:

— «${QUEUE_DB_PORT}: ${QUEUE_DB_PORT}»

restart: always

volumes:

queuedata:

Затем выполните docker-compose upзагрузку и запустите MongoDB с постоянным томом данных.

Docker доступен для Linux, macOS и Windows 10. См. инструкции по установке Docker.

Создайте новый send.jsфайл, чтобы добавить случайно сгенерированные сообщения электронной почты в очередь с именем news:

// Queue module

import { Queue } from '@craigbuckler/queue-mongodb’;

// initialize queue named 'news’

const newsQ = new Queue ('news’) ;

// random name

const name = String.fromCharCode (65 + Math.random () * 26).repeat (1 + Math.random () * 10) ;

// add object to queue

const send = await newsQ.send ({

name: name,

email: `${ name.toLowerCase () }@test.com`,

date: new Date (),

message: `Hey there, ${ name }! `

}) ;

console.log ('send’, send) ;

// get number of items remaining in queue

console.log ('items queued:', await newsQ.count ());

// close connection and quit

await newsQ.close () ;

Выполните его с помощью npm run send, и вы увидите такой вывод:

send {

_id: 607d692563bd6d05bb459931,

sent: 2021-04-19T11:27:33.000Z,

data: {

name: 'AAA’,

email: 'aaa@test.com’,

date: 2021-04-19T11:27:33.426Z,

message: 'Hey there, AAA!'

}

}

items queued: 1

Метод.send () возвращает qItemобъект, содержащий:

документ MongoDB_id

дата/время, когда элемент изначально был поставлен в очередь, и

копия сообщенияdata

Запустите сценарий любое количество раз, чтобы добавить дополнительные элементы в очередь. Будет items queuedувеличиваться при каждом запуске.

Теперь создайте новый receive.jsфайл для получения сообщений из той же очереди задач Node:

// Queue module

import { Queue } from '@craigbuckler/queue-mongodb’;

// initialize queue named 'news’

const newsQ = new Queue ('news’) ;

let qItem;

do {

qItem = await newsQ.receive () ;

if (qItem) {

console.log ('\nreceive’, qItem) ;

//... process qItem.data...

//... to send email...

}

} while (qItem) ;

// number of items remaining in queue

console.log ('items queued:', await newsQ.count ());

await newsQ.close () ;

Запустите npm run receive, чтобы получить и обработать элементы в очереди:

receive {

_id: 607d692563bd6d05bb459931,

sent: 2021-04-19T11:27:33.000Z,

data: {

name: 'AAA’,

email: 'aaa@test.com’,

date: 2021-04-19T11:27:33.426Z,

message: 'Hey there, AAA!'

}

}

items queued: 0

В этом примере электронная почта не отправляется, но это можно реализовать с помощью Nodemailer или другого подходящего модуля.

Если обработка не удалась — возможно, из-за того, что почтовый сервер не работает — элемент можно повторно поставить в очередь следующим образом:

newsQ.send (qItem.data, 600) ;

Второй 600аргумент — это необязательное количество секунд или будущая дата. Эта команда повторно ставит элемент в очередь по истечении 600 секунд (десяти минут).

Это простой пример, но любое приложение может отправлять данные в любое количество очередей. Другой процесс, возможно запущенный как cronзадание, может получать и обрабатывать элементы, когда это необходимо.

Как queue-mongodbработает модуль

Строка type, переданная конструктору класса, определяет имя очереди. Метод.send () создает новый документ MongoDB при передаче данных для добавления в очередь. Документ MongoDB содержит:

MongoDB _id (дата/время создания закодированы в значении).

Очередь type.

Значение даты/времени обработки с именем proc. Можно установить будущее время, но по умолчанию используется текущее время.

Пункт data. Это может быть что угодно: логическое значение, число, строка, массив, объект и так далее.

Метод.receive () находит самый старый документ, который имеет совпадение typeи procдату/время в прошлом. Документ форматируется, возвращается вызывающему коду и удаляется из базы данных.

Следующие разделы описывают модуль более подробно.

queue-mongodbМодуль: Инициализация

При необходимости dotenvмодуль считывает.envпеременные окружения. Объект подключения к базе данных создается с помощью официального mongodbмодуля драйвера:

// modules

import dotenv from 'dotenv’;

import mongoDB from 'mongodb’;

// environment variables

if (! process.env.QUEUE_DB_HOST) {

dotenv.config () ;

}

// MongoDB database client

const

dbName = process.env.QUEUE_DB_NAME || 'qdb’,

qCollectionName = process.env.QUEUE_DB_COLL || 'queue’,

qAuth = process.env.QUEUE_DB_USER? `${ process.env.QUEUE_DB_USER }: ${ process.env.QUEUE_DB_PASS || '' }@`: '',

dbClient = new mongoDB.MongoClient (

`mongodb: //${ qAuth }${ process.env.QUEUE_DB_HOST || 'localhost’ }: ${ process.env.QUEUE_DB_PORT || '27017' }/`,

{ useNewUrlParser: true, useUnifiedTopology: true }

) ;

Переменная qCollectionсодержит ссылку на коллекцию очередей базы данных (определяется параметром QUEUE_DB_COLL). Он создается и возвращается dbConnect () функцией, которая также определяет схему коллекции и при необходимости индексирует ее. Все Queueметоды запускаются const q = await dbConnect () ;для получения ссылки на коллекцию:

let qCollection; // queue collection

// shared connection

async function dbConnect () {

// collection available

if (qCollection) return qCollection;

// connect to database

await dbClient.connect () ;

// collection defined?

const

db = dbClient.db (dbName),

colList = await db.listCollections ({ name: qCollectionName }, { nameOnly: true }).toArray () ;

if (! colList.length) {

// define collection schema

let $jsonSchema = {

bsonType: 'object’,

required: [ 'type’, 'proc’, 'data’ ],

properties: {

type: { bsonType: 'string’, minLength: 1 },

proc: { bsonType: 'date’ }

}

};

await db.createCollection (qCollectionName, { validator: { $jsonSchema } }) ;

// define indexes

await db.collection (qCollectionName).createIndexes ([

{ key: { type: 1 } },

{ key: { proc: 1 } }

]) ;

}

// return queue collection

qCollection = db.collection (qCollectionName) ;

return qCollection;

}

Функция dbClose () закрывает соединение с базой данных:

// close MongoDB database connection

async function dbClose () {

if (qCollection) {

await dbClient.close () ;

qCollection = null;

}

}

queue-mongodbМодуль: QueueКонструктор

Конструктор Queueзадает очередь typeили имя:

export class Queue {

constructor (type = 'DEFAULT’) {

this.type = type;

}

queue-mongodbМодуль: Queue.send () Метод

Метод.send () добавляет данные в очередь с соответствующим type. У него есть необязательный delayUntilпараметр, который добавляет элемент в очередь в будущем, указав количество секунд или Date ().

Метод вставляет новый документ в базу данных и возвращает qItemобъект ({ _id, sent, data}) или в nullслучае неудачи:

async send (data = null, delayUntil) {

try {

// calculate start date/time

let proc = new Date () ;

if (delayUntil instanceof Date) {

proc = delayUntil;

}

else if (! isNaN (delayUntil)) {

proc = new Date (+proc + delayUntil * 1000) ;

}

// add item to queue

const

q = await dbConnect (),

ins = await q.insertOne ({

type: this.type, proc, data

}) ;

// return qItem

return ins && ins.insertedCount && ins.insertedId? { _id: ins.insertedId, sent: ins.insertedId.getTimestamp (), data }: null;

}

catch (err) {

console.log (`Queue.send error: \n${ err }`) ;

return null;

}

}

queue-mongodbМодуль: Queue.receive () Метод

Метод.receive () извлекает и удаляет самый старый элемент очереди в базе данных с определенной typeдатой proc/временем в прошлом. Он возвращает qItemобъект ({ _id, sent, data}) или nullесли ничего не доступно или возникает ошибка:

async receive () {

try {

// find and delete next item on queue

const

now = new Date (),

q = await dbConnect (),

rec = await q.findOneAndDelete (

{

type: this.type,

proc: { $lt: now }

},

{

sort: { proc: 1 }

}

) ;

const v = rec && rec.value;

// return qItem

return v? { _id: v. _id, sent: v. _id.getTimestamp (), data: v.data }: null;

}

catch (err) {

console.log (`Queue.receive error: \n${ err }`) ;

return null;

}

}

queue-mongodbМодуль: Queue.remove () Метод

Метод.remove () удаляет элемент из очереди, идентифицированный qItemобъектом ({ _id, sent, data}), возвращенным.send () методом. Его можно использовать для удаления элемента из очереди независимо от его положения в очереди.

Метод возвращает количество удаленных документов (обычно 1) или nullпри возникновении ошибки:

async remove (qItem) {

// no item to remove

if (! qItem ||! qItem. _id) return null;

try {

const

q = await dbConnect (),

del = await q.deleteOne ({ _id: qItem. _id }) ;

return del.deletedCount;

}

catch (err) {

console.log (`Queue.remove error: \n${ err }`) ;

return null;

}

}

queue-mongodbМодуль: Queue.purge () Метод

Метод.purge () удаляет все одинаковые элементы в очереди typeи возвращает количество удалений:

async purge () {

try {

const

q = await dbConnect (),

del = await q.deleteMany ({ type: this.type }) ;

return del.deletedCount;

}

catch (err) {

console.log (`Queue.purge error: \n${ err }`) ;

return null;

}

}

queue-mongodbМодуль: Queue.count () Метод

Метод.count () возвращает количество одинаковых элементов в очереди type:

async count () {

try {

const q = await dbConnect () ;

return await q.countDocuments ({ type: this.type }) ;

}

catch (err) {

console.log (`Queue.count error: \n${ err }`) ;

return null;

}

}

queue-mongodbМодуль: Queue.close () Метод

Метод.close () запускает dbClose () функцию для завершения соединения с базой данных, чтобы цикл обработки событий Node.js мог завершиться:

async close () {

try {

await dbClose () ;

}

catch (err) {

console.log (`Queue.close error: \n${ err }`) ;

return null;

}

}

// end of class

}

Новая очередь

Очереди являются проблемой для любого веб-приложения с ресурсоемкими функциями, которые могут стать узким местом. Они могут повысить производительность и обслуживание за счет разделения приложений на более мелкие, быстрые и надежные процессы. Выделенное программное обеспечение брокера сообщений является опцией, но простые системы очередей, такие как очередь задач Node, которую мы создали сегодня, возможны в нескольких десятках строк кода.

3D-печать5GABC-анализAndroidAppleAppStoreAsusCall-центрChatGPTCRMDellDNSDrupalExcelFacebookFMCGGoogleHuaweiInstagramiPhoneLinkedInLinuxMagentoMicrosoftNvidiaOpenCartPlayStationPOS материалPPC-специалистRuTubeSamsungSEO-услугиSMMSnapchatSonyStarlinkTikTokTwitterUbuntuUp-saleViasatVPNWhatsAppWindowsWordPressXiaomiYouTubeZoomАвдеевкаАктивные продажиАкцияАлександровск ЛНРАлмазнаяАлчевскАмвросиевкаАнализ конкурентовАнализ продажАнтимерчандайзингАнтрацитАртемовскАртемовск ЛНРАссортиментная политикаБелгородБелицкоеБелозерскоеБердянскБизнес-идеи (стартапы)БрендБрянкаБукингВахрушевоВендорВидеоВикипедияВирусная рекламаВирусный маркетингВладивостокВнутренние продажиВнутренний маркетингВолгоградВолновахаВоронежГорловкаГорнякГорскоеДебальцевоДебиторкаДебиторская задолженностьДезинтермедитацияДзержинскДивизионная система управленияДизайнДимитровДирект-маркетингДисконтДистрибьюторДистрибьюцияДобропольеДокучаевскДоменДружковкаЕкатеринбургЕнакиевоЖдановкаЗапорожьеЗимогорьеЗолотоеЗоринскЗугрэсИжевскИловайскИрминоКазаньКалининградКировскКировскоеКомсомольскоеКонстантиновкаКонтент-маркетингКонтент-планКопирайтингКраматорскКрасноармейскКрасногоровкаКраснодарКраснодонКраснопартизанскКрасный ЛиманКрасный ЛучКременнаяКураховоКурскЛисичанскЛуганскЛутугиноМакеевкаМариупольМаркетингМаркетинговая информацияМаркетинговые исследованияМаркетинговый каналМаркетинг услугМаркетологМарьинкаМедиаМелекиноМелитопольМенеджментМерчандайзерМерчандайзингМиусинскМолодогвардейскМоскваМоспиноНижний НовгородНиколаевНиколаевкаНишевой маркетингНовоазовскНовогродовкаНоводружескНовосибирскНумерическая дистрибьюцияОдессаОмскОтдел маркетингаПартизанский маркетингПервомайскПеревальскПетровскоеПлата за кликПоисковая оптимизацияПопаснаяПравило ПаретоПривольеПрогнозирование продажПродвижение сайтов в ДонецкеПроизводство видеоПромоПромоушнПрямой маркетингРабота для маркетологаРабота для студентаРазработка приложенийРаспродажаРегиональные продажиРекламаРеклама на асфальтеРемаркетингРетро-бонусРибейтРитейлРовенькиРодинскоеРостов-на-ДонуРубежноеСамараСанкт-ПетербургСаратовСватовоСвердловскСветлодарскСвятогорскСевастопольСеверодонецкСеверскСедовоСейлз промоушнСелидовоСимферопольСинергияСколковоСлавянскСнежноеСоздание сайтов в ДонецкеСоледарСоциальные сетиСочиСтаробельскСтаробешевоСтахановСтимулирование сбытаСуходольскСчастьеТелемаркетингТельмановоТираспольТорговый представительТорезТрейд маркетингТрейд промоушнТюменьУглегорскУгледарУкраинскХабаровскХарцызскХерсонХостингЦелевая аудиторияЦифровой маркетингЧасов ЯрЧелябинскШахтерскЮжно-СахалинскЮнокоммунаровскЯндексЯсиноватая