Queue
Places messages received by the listener into a message queue. The listener function then takes messages from the queue and processes them one at a time, in order.
Usage
import { queue } from 'fastevent/pipes';

emitter.on(
    'task',
    async (msg) => {
        await processTask(msg.payload);
    },
    {
        pipes: [
            queue({
                size: 5, // The queue holds at most 5 messages
            }),
        ],
    },
);
Configuration Parameters
queue(options) supports the following parameters:
Parameter | Type | Default | Description |
---|---|---|---|
size | number | 10 | Queue size |
overflow | 'slide' \| 'drop' \| 'throw' \| 'expand' | 'slide' | Strategy when the queue is full |
expandOverflow | 'slide' \| 'drop' \| 'throw' | 'slide' | Strategy applied once the queue has expanded to maxExpandSize (used when overflow='expand') |
maxExpandSize | number | 100 | Maximum size the queue can expand to |
onPush | (newMsg, queuedMsgs) => void | | Callback invoked when a new message enters the queue |
onDrop | (msg) => void | | Callback invoked when a message is discarded |
lifetime | number | | Maximum time (in milliseconds) a message is kept in the queue, after which it is discarded |
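To make the lifetime parameter concrete, here is a minimal sketch of an expiry check. It is not the library's implementation: the TimedEntry shape, the enqueuedAt field, and the splitExpired helper are all hypothetical, and the source does not say at which point the queue actually performs this check.

```typescript
// Hypothetical entry shape: a message plus the time it entered the queue.
interface TimedEntry<T> {
    msg: T;
    enqueuedAt: number; // timestamp in milliseconds
}

// Hypothetical helper: separates entries that have outlived `lifetime`
// from those that are still within it.
function splitExpired<T>(entries: TimedEntry<T>[], now: number, lifetime: number) {
    const alive: TimedEntry<T>[] = [];
    const expired: TimedEntry<T>[] = [];
    for (const entry of entries) {
        // An entry expires once it has waited longer than `lifetime` ms.
        if (now - entry.enqueuedAt > lifetime) {
            expired.push(entry);
        } else {
            alive.push(entry);
        }
    }
    return { alive, expired };
}
```

In this sketch the expired entries are the ones that would be discarded rather than passed to the listener.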
Overflow Handling
The overflow parameter specifies the strategy used when the queue is full. It accepts the following values:
Value | Description |
---|---|
slide | Discard the oldest message |
drop | Discard the new message |
throw | Throw an exception |
expand | Expand the queue by size each time it fills, up to maxExpandSize |
The expand strategy deserves a closer look: each time the queue fills, its capacity grows by size, up to maxExpandSize.
Once the queue has reached maxExpandSize, the strategy specified by expandOverflow is used instead.
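The strategies above can be modelled with a small standalone function. This is only a sketch of the described behavior, not the library's code: QueueState, OverflowOptions, and pushWithOverflow are hypothetical names, and the assumption that capacity is clamped to maxExpandSize when a step of size would overshoot it is mine.

```typescript
type Overflow = 'slide' | 'drop' | 'throw' | 'expand';

// Hypothetical model of the queue: its items and its current capacity.
interface QueueState<T> {
    items: T[];
    capacity: number; // starts at `size`, may grow under 'expand'
}

interface OverflowOptions {
    size: number;
    overflow: Overflow;
    expandOverflow: Exclude<Overflow, 'expand'>;
    maxExpandSize: number;
}

// Hypothetical helper: pushes `msg` and returns the discarded message, if any.
function pushWithOverflow<T>(state: QueueState<T>, msg: T, opts: OverflowOptions): T | undefined {
    if (state.items.length < state.capacity) {
        state.items.push(msg);
        return undefined;
    }
    let strategy: Overflow = opts.overflow;
    if (strategy === 'expand') {
        if (state.capacity < opts.maxExpandSize) {
            // Grow by `size`, but never beyond maxExpandSize (assumed clamping).
            state.capacity = Math.min(state.capacity + opts.size, opts.maxExpandSize);
            state.items.push(msg);
            return undefined;
        }
        // Already at the cap: fall back to expandOverflow.
        strategy = opts.expandOverflow;
    }
    if (strategy === 'slide') {
        const oldest = state.items.shift(); // discard the oldest message
        state.items.push(msg);
        return oldest;
    }
    if (strategy === 'drop') {
        return msg; // discard the new message
    }
    throw new Error('Queue is full');
}
```

With size 2 and maxExpandSize 3, for example, the third push expands the capacity to 3, and the fourth push falls back to expandOverflow.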
Queue Entry Callback
The onPush parameter specifies a callback function that is invoked when a new message enters the queue.
onPush parameters:
Parameter | Type | Description |
---|---|---|
newMsg | any | New message |
queuedMsgs | any[] | Messages in the queue |
The onPush callback can rearrange the message queue whenever a new message enters.
Here is an example that processes messages by priority:
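import { queue } from 'fastevent/pipes';

const results: number[] = []
let first = true

emitter.on("test", async (msg) => {
    // The first message takes longer, so later messages accumulate in the queue
    await delay(first ? 500 : 10)
    first = false
    results.push(msg.payload)
}, {
    pipes: [queue({
        size: 5,
        onPush: (newMsg, queuedMsgs) => {
            // Sort by priority, higher priority (larger number) comes first.
            // Queue entries are tuples whose first element is the message.
            const insertIndex = queuedMsgs.findIndex(
                msg => (msg[0].meta.priority ?? 0) < (newMsg.meta.priority ?? 0)
            )
            if (insertIndex === -1) {
                // No lower-priority message is queued: append at the end
                queuedMsgs.push([newMsg, 0])
            } else {
                queuedMsgs.splice(insertIndex, 0, [newMsg, 0])
            }
        }
    })]
})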
// Send messages with different priorities
const promises = [
    ...emitter.emit("test", 1, { meta: { priority: 1 } }), // Processed immediately
    ...emitter.emit("test", 2, { meta: { priority: 1 } }), // Lowest priority in the queue
    ...emitter.emit("test", 3, { meta: { priority: 3 } }),
    ...emitter.emit("test", 4, { meta: { priority: 2 } }),
    ...emitter.emit("test", 5, { meta: { priority: 5 } }), // Highest priority in the queue
    ...emitter.emit("test", 6, { meta: { priority: 4 } }),
]
return new Promise<void>(resolve => {
    vi.runAllTimersAsync()
    Promise.all(promises).then(() => {
        // Verify messages are processed in priority order:
        // The 1st message is processed first because it hasn't entered the queue yet
        expect(results).toEqual([1, 5, 6, 3, 4, 2])
    }).finally(() => {
        resolve()
    })
})
Note
The onPush callback is generally used to reorganize the queue, for example by sorting or grouping messages.
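The ordering rule from the priority example can be isolated into a small function that is easy to check on its own. This is a sketch under assumptions: PrioritizedMessage and insertByPriority are hypothetical names, and for simplicity the queue here holds plain messages rather than the tuples used in the example above.

```typescript
// Hypothetical message shape, mirroring the `meta.priority` field used above.
interface PrioritizedMessage {
    payload: unknown;
    meta?: { priority?: number };
}

// Hypothetical helper: inserts `newMsg` before the first queued message with a
// strictly lower priority, or at the end when no such message exists.
// Messages with equal priority therefore keep their arrival order.
function insertByPriority(queued: PrioritizedMessage[], newMsg: PrioritizedMessage): void {
    const priority = newMsg.meta?.priority ?? 0;
    const index = queued.findIndex(m => (m.meta?.priority ?? 0) < priority);
    if (index === -1) {
        // findIndex returns -1 when nothing matches; splice(-1, ...) would
        // insert before the last element, so append explicitly instead.
        queued.push(newMsg);
    } else {
        queued.splice(index, 0, newMsg);
    }
}
```

Handling the -1 case explicitly matters once the queue is non-empty and the new message has the lowest priority. Without it, that message would land one position before the end.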
Discarding Messages
Provide an onDrop callback to be notified when a message is discarded.
emitter.on(
    'sendRequest',
    async () => {
        await apiRequest();
    },
    {
        pipes: [
            queue({
                size: 100,
                onDrop: (msg) => {
                    // Handle the discarded message here
                },
            }),
        ],
    },
);
Default Parameters
The default parameters for queue are:
Parameter | Value |
---|---|
size | 10 |
overflow | 'slide' |
expandOverflow | 'slide' |
maxExpandSize | 100 |
Shortcuts
The queue pipe provides the following shortcuts:
dropping
Drops new messages when the queue overflows.
export const dropping = (size: number = 10) => queue({ size, overflow: 'drop' })
sliding
Drops the oldest messages when the queue overflows.
export const sliding = (size: number = 10) => queue({ size, overflow: 'slide' })
expanding
Automatically expands the queue when it overflows.
export const expanding = (options?: Omit<QueueListenerPipeOptions, 'overflow'>) =>
    queue(Object.assign({}, options, { overflow: 'expand' as QueueListenerPipeOptions['overflow'] }))