Skip to content

Queue

将监听器接收到的消息放入消息队列中,监听器函数从队列中依次提取消息进行排除处理。

使用方法

typescript
import { queue } from 'fastevent/pipes';

emitter.on(
    'task',
    async (msg) => {
        await processTask(msg.payload);
    },
    {
        pipes: [
            queue({
                size: 5 // 队列大小=5
            })], 
    },
);

配置参数

queue(optins)支持以下参数:

参数 类型 默认值 描述
size number 10 队列大小
overflow 'slide' | 'drop' | 'throw' | 'expand' 'slide' 队列满时的处理策略
expandOverflow 'slide' | 'drop' | 'throw' 'slide' 扩展策略(当 overflow=expand时使用)
maxExpandSize number 100 最大扩展大小
onPush (newMsg, queuedMsgs) => void 新消息入列时的回调函数
onDrop (msg) => void 当新消息被丢弃时的回调函数
lifetime number 指定消息在队列中保存的最大时长(毫秒),超过会丢弃。

溢出处理

overflow参数用于指定队列满时的处理策略,可选值如下:

描述
slide 丢弃最早的消息
drop 丢弃新消息
throw 抛出异常
expand 扩展队列,每次扩展size,最大maxExpandSize

重点说一下expand策略,当队列满时,会扩展队列,每次扩展size,最大maxExpandSize。 如果队列已经达到maxExpandSize,则由expandOverflow参数指定处理策略。

入列回调

onPush参数用于指定新消息入列时的回调函数,该函数会在新消息入列时被调用。

onPush参数:

参数 类型 描述
newMsg any 新消息
queuedMsgs any[] 队列中的消息

onPush回调可以在入列时对消息队列进行处理。

下例是根据按优先级处理消息的示例:

typescript
import { queue } from 'fastevent/pipes';
emitter.on("test", async (msg) => {
    await delay(first ? 500 : 10)  // 每个消息处理时间相同
    first = false
    results.push(msg.payload)
}, {
    pipes: [queue({
        size: 5,
        onPush: (newMsg, queuedMsgs) => {
            // 根据priority排序,高优先级(数字大)的排在前面
            const insertIndex = queuedMsgs.findIndex(
                msg => (msg[0].meta.priority ?? 0) < (newMsg.meta.priority ?? 0)
            )
            queuedMsgs.splice(insertIndex, 0, [newMsg, 0])
        }
    })]
})

// 发送不同优先级的消息
const promises = [
    ...emitter.emit("test", 1, { meta: { priority: 1 } }),   //  
    ...emitter.emit("test", 2, { meta: { priority: 1 } }),   //  低
    ...emitter.emit("test", 3, { meta: { priority: 3 } }),   //  
    ...emitter.emit("test", 4, { meta: { priority: 2 } }),   //  
    ...emitter.emit("test", 5, { meta: { priority: 5 } }),   //  
    ...emitter.emit("test", 6, { meta: { priority: 4 } }),   //  高
]

return new Promise<void>(resolve => {
    vi.runAllTimersAsync()
    Promise.all(promises).then(() => {
        // 验证消息按优先级顺序处理: 
        // 第1条消息因为还没有入列,所以先得到处理
        expect(results).toEqual([1, 5, 6, 3, 4, 2])
    }).finally(() => {
        resolve()
    })
})

提示

onPush回调一般用于对队列进行重新处理,如排序,分组等。

在上例中,当每次接收到一条消息时对消息队列进行重新排序,将高优先级的消息排在前面。 这样存在一个问题,当消息量比较大时,每次重新排序会消耗大量时间,影响性能。

因此,也可以支持在出列时进行排序时,同样的功能可以改为在出列时进行排序。示例如下:

ts
import { queue } from 'fastevent/pipes';
emitter.on("test", async (msg) => {
    await delay(first ? 500 : 10)  // 每个消息处理时间相同
    first = false
    results.push(msg.payload)
}, {
    pipes: [queue({
        size: 5,
        onPop: (queuedMsgs, hasNew) => {
            // 自上次pop之后是否有新消息 ,如果有则按优先级重新排序
            if (hasNew) {
                const sortedMsgs = queuedMsgs.sort(([msg1], [msg2]) => {
                    return (msg2.meta.priority ?? 0) - (msg1.meta.priority ?? 0)
                })
                queuedMsgs.splice(0, queuedMsgs.length, ...sortedMsgs)
            }
            return queuedMsgs.shift()
        }
    })
    ]
})

// 发送不同优先级的消息
const promises = [
    ...emitter.emit("test", 1, { meta: { priority: 1 } }),   //  
    ...emitter.emit("test", 2, { meta: { priority: 1 } }),   //  低
    ...emitter.emit("test", 3, { meta: { priority: 3 } }),   //  
    ...emitter.emit("test", 4, { meta: { priority: 2 } }),   //  
    ...emitter.emit("test", 5, { meta: { priority: 5 } }),   //  
    ...emitter.emit("test", 6, { meta: { priority: 4 } }),   //  高
]

return new Promise<void>(resolve => {
    vi.runAllTimersAsync()
    Promise.all(promises).then(() => {
        // 验证消息按优先级顺序处理: 
        // 第1条消息因为还没有入列,所以先得到处理
        expect(results).toEqual([1, 5, 6, 3, 4, 2])
    }).finally(() => {
        resolve()
    })
})
  • onPop在消息出列时调用,返回queuedMsgs中的消息,hasNew表示是否还有新消息,如果在处理消息期间没有新的消息入列,则不需要重复排序。

丢弃消息

提供drop回调参数,在消息被丢弃时调用。

typescript
// 自定义重试配置
emitter.on(
    'sendRequest',
    async () => {
        await apiRequest();
    },
    {
        pipes: [
            queue(100, { 
                drop: (msg, error) => {
                   //
                }, 
            }),
        ],
    },
);

默认参数

queue的默认参数如下:

参数
size 10
overflow 'slide'
expandOverflow 'slide'
maxExpandSize 100

快捷方式

Queue提供了以下几个快捷方式

dropping

队列溢出丢弃

ts
export const dropping = (size: number = 10) => queue({ size, overflow: 'drop' })

sliding

队列溢出时丢弃最早的消息

ts
export const sliding = (size: number = 10) => queue({ size, overflow: 'slide' })

expanding

队列溢出时自动扩展队列

ts
export const expanding = (options?: Omit<QueueListenerPipeOptions, 'overflow'>) => queue(Object.assign({}, options, { overflow: 'expand' as QueueListenerPipeOptions['overflow'] }))