Queue
Places messages received by the listener into a message queue, and the listener function extracts messages from the queue for sequential processing.
Usage
import { queue } from 'fastevent/pipes';
emitter.on(
'task',
async (msg) => {
await processTask(msg.payload);
},
{
pipes: [
queue({
size: 5 // Queue size=5
})],
},
);Configuration Parameters
queue(options) supports the following parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
size | number | 10 | Queue size |
overflow | 'slide' | 'drop' | 'throw' | 'expand' | 'slide' | Strategy when queue is full |
expandOverflow | 'slide' | 'drop' | 'throw' | 'slide' | Expansion strategy (used when overflow=expand) |
maxExpandSize | number | 100 | Maximum expansion size |
onPush | (newMsg, queuedMsgs) => void | Callback function when a new message enters the queue | |
onDrop | (msg) => void | Callback function when a new message is dropped | |
lifetime | number | Specifies the maximum time (in milliseconds) a message is kept in the queue, after which it will be discarded. |
Overflow Handling
The overflow parameter specifies the strategy when the queue is full, with the following options:
| Value | Description |
|---|---|
slide | Discard the oldest message |
drop | Discard the new message |
throw | Throw an exception |
expand | Expand the queue, each time by size, up to maxExpandSize |
Let's focus on the expand strategy: when the queue is full, it will expand by size each time, up to maxExpandSize.
If the queue has already reached maxExpandSize, the strategy specified by expandOverflow will be used.
Queue Entry Callback
The onPush parameter specifies a callback function when a new message enters the queue.
onPush parameters:
| Parameter | Type | Description |
|---|---|---|
newMsg | any | New message |
queuedMsgs | any[] | Messages in the queue |
The onPush callback can process the message queue when a new message enters.
Here's an example of processing messages by priority:
import { queue } from 'fastevent/pipes';
emitter.on("test", async (msg) => {
await delay(first ? 500 : 10) // Each message has the same processing time
first = false
results.push(msg.payload)
}, {
pipes: [queue({
size: 5,
onPush: (newMsg, queuedMsgs) => {
// Sort by priority, higher priority (larger number) comes first
const insertIndex = queuedMsgs.findIndex(
msg => (msg[0].meta.priority ?? 0) < (newMsg.meta.priority ?? 0)
)
queuedMsgs.splice(insertIndex, 0, [newMsg, 0])
}
})]
})
// Send messages with different priorities
const promises = [
...emitter.emit("test", 1, { meta: { priority: 1 } }), //
...emitter.emit("test", 2, { meta: { priority: 1 } }), // Low
...emitter.emit("test", 3, { meta: { priority: 3 } }), //
...emitter.emit("test", 4, { meta: { priority: 2 } }), //
...emitter.emit("test", 5, { meta: { priority: 5 } }), //
...emitter.emit("test", 6, { meta: { priority: 4 } }), // High
]
return new Promise<void>(resolve => {
vi.runAllTimersAsync()
Promise.all(promises).then(() => {
// Verify messages are processed in priority order:
// The 1st message is processed first because it hasn't entered the queue yet
expect(results).toEqual([1, 5, 6, 3, 4, 2])
}).finally(() => {
resolve()
})
})Note
The onPush callback is generally used to reprocess the queue, such as sorting, grouping, etc.
Discarding Messages
Provide a drop callback parameter that is called when a message is discarded.
// Custom retry configuration
emitter.on(
'sendRequest',
async () => {
await apiRequest();
},
{
pipes: [
queue(100, {
drop: (msg, error) => {
//
},
}),
],
},
);Default Parameters
The default parameters for queue are:
| Parameter | Value |
|---|---|
size | 10 |
overflow | 'slide' |
expandOverflow | 'slide' |
maxExpandSize | 100 |
Shortcuts
Queue provides the following shortcuts:
dropping
Drop new messages when queue overflows
export const dropping = (size: number = 10) => queue({ size, overflow: 'drop' })sliding
Drop oldest messages when queue overflows
export const sliding = (size: number = 10) => queue({ size, overflow: 'slide' })expanding
Automatically expand the queue when it overflows
export const expanding = (options?: Omit<QueueListenerPipeOptions, 'overflow'>) => queue(Object.assign({}, options, { overflow: 'expand' as QueueListenerPipeOptions['overflow'] }))