前言

发布订阅模式使用上也非常广泛,比如vue的响应式数据就用到了发布订阅模式,前端的原生事件也是发布订阅模式,这种模式主要解决的就是异步的问题,用于代替回调函数,且使得两个对象是松耦合的联系再一起。

发布—订阅模式又叫观察者模式,它定义对象间的一种一对多的依赖关系,当一个对象的状 态发生改变时,所有依赖于它的对象都将得到通知。在 JavaScript 开发中,我们一般用事件模型 来替代传统的发布—订阅模式。

发布订阅实现

var event = {
    eventMap: new Map(),
    on(eventName, callback) {
        let eventList = event.eventMap.get(eventName);
        if (!eventList) {
            eventList = [];
            event.eventMap.set(eventName, eventList);
        }

        eventList.push(callback);
    },
    once(eventName, callback) {
        let eventList = event.eventMap.get(eventName);
        if (!eventList) {
            eventList = [];
            event.eventMap.set(eventName, eventList);
        }

        const onceCallback = function(data) {
            callback(data);
            event.remove(eventName, onceCallback);
        };
        eventList.push(onceCallback);
    },
    emit(eventName, data) {
        const eventList = event.eventMap.get(eventName);
        if (!eventList) return;

        eventList.forEach((callback) => callback(data));
    },
    remove(eventName, callback) {
        const eventList = event.eventMap.get(eventName);
        if (!eventList) return;

        const findIndex = eventList.findIndex((fn) => fn === callback);
        if (findIndex === -1) return;

        eventList.splice(findIndex, 1);
    },
};

event.on("test", (msg) => {
    console.log("获取到消息:", msg);
});

setTimeout(() => {
    event.emit("test", "失败了");
}, 2000);

这是一个比较常见的实现,功能上有订阅,发布,一次性订阅,移除订阅。

命名冲突

当我们全局只用一个event来实现消息订阅通知的时候,随着业务量的增大,eventName总是会有冲突的时候,解决这种冲突有两种比较好的解决办法,一种是使用es6的Symbol定义一个永远不重复的事件名,一种就是使用命名空间,既然你事件名和之前的冲突了,我再创一个全新event来订阅不就行了。

var eventName = Symbol("foo");

使用Symbol后,如果其他地方也要订阅,你就得给对方提供这个Symbol,所以常见的做法就是创建一个变量保存,然后export导出,如果其他地方需要,就引入它。

var Event = (function() {
    const defaultNamespaceName = "_default";
    const eventCache = {};

    function createEvent(namespaceName) {
        if (!namespaceName) namespaceName = defaultNamespaceName;

        if (!eventCache[namespaceName]) {
            eventCache[namespaceName] = {
                eventMap: new Map(),
                on(eventName, callback) {
                    let eventList = this.eventMap.get(eventName);
                    if (!eventList) {
                        eventList = [];
                        this.eventMap.set(eventName, eventList);
                    }

                    eventList.push(callback);
                },
                once(eventName, callback) {
                    let eventList = this.eventMap.get(eventName);
                    if (!eventList) {
                        eventList = [];
                        this.eventMap.set(eventName, eventList);
                    }

                    const onceCallback = (data) => {
                        callback(data);
                        this.remove(eventName, onceCallback);
                    };

                    eventList.push(onceCallback);
                },
                emit(eventName, data) {
                    const eventList = this.eventMap.get(eventName);
                    if (!eventList) return;

                    eventList.forEach((callback) => callback(data));
                },
                remove(eventName, callback) {
                    const eventList = this.eventMap.get(eventName);
                    if (!eventList) return;

                    const findIndex = eventList.findIndex((fn) => fn === callback);
                    if (findIndex === -1) return;

                    eventList.splice(findIndex, 1);
                },
            };
        }

        return eventCache[namespaceName];
    }

    const defaultEvent = createEvent();

    return {
        create: createEvent,
        ...defaultEvent,
    };
})();

Event.on("test", (msg) => {
    console.log("获取到消息:", msg);
});

const namespaceEvent1 = Event.create("namespace1");
const namespaceEvent2 = Event.create("namespace2");

namespaceEvent1.on("test", (msg) => {
    console.log("namespace1获取到消息:", msg);
});

namespaceEvent2.on("test", (msg) => {
    console.log("namespace2获取到消息:", msg);
});

setTimeout(() => {
    Event.emit("test", "失败了");

    namespaceEvent1.emit("test", "成功啦");

    namespaceEvent2.emit("test", "崩溃啦");
}, 2000);

命名空间的方式相对复杂一点,但是也不是很复杂,我们可以通过抛出的create创建命名空间,然后保存这个创建的event对象,然后订阅和通知都使用该对象即可。

必须先订阅再发布吗?

我们目前的代码实现中,都必须是先订阅,再进行发布通知,否则消息通知就会丢失,事实上我们不一定非要订阅后才能发布。

在以下消息队列的设计中,有些消息可能因为异步的原因,它的订阅时机是晚于发布时机的,如果采用上面的方式,就会导致消息的丢失,为了解决这个问题,我们可以实现不必先订阅再发布。

实现这种功能的做法就是消息缓存,我们将发布的消息缓存到内存或者持久化,当用户来订阅的时候,判断缓存中是否存在消息,如果存在就在订阅后,触发一次通知,将缓存的数据传递过去。

至于这个消息你要不要缓存多个,以及缓存多久,是否需要过期,就得配合实际的项目业务来动态调整。

在RxJS的 Replaysubject 就可以实现缓存消息,并在订阅时通知。

假设我们只需要保留最新的一份消息,我们调一下上面代码也可以做到:

var Event = (function() {
    const defaultNamespaceName = "_default";
    const eventCache = {};

    function createEvent(namespaceName) {
        if (!namespaceName) namespaceName = defaultNamespaceName;

        if (!eventCache[namespaceName]) {
            eventCache[namespaceName] = {
                eventMap: new Map(),
                cache: {},
                on(eventName, callback) {
                    let eventList = this.eventMap.get(eventName);
                    if (!eventList) {
                        eventList = [];
                        this.eventMap.set(eventName, eventList);
                    }

                    eventList.push(callback);

                    if (this.cache[eventName]) {
                        callback(this.cache[eventName]);
                    }
                },
                once(eventName, callback) {
                    let eventList = this.eventMap.get(eventName);
                    if (!eventList) {
                        eventList = [];
                        this.eventMap.set(eventName, eventList);
                    }

                    if (this.cache[eventName]) {
                        callback(this.cache[eventName]);
                        return;
                    }

                    const onceCallback = (data) => {
                        callback(data);
                        this.remove(eventName, onceCallback);
                    };
                    eventList.push(onceCallback);
                },
                emit(eventName, data) {
                    const eventList = this.eventMap.get(eventName);
                    if (!eventList) return;

                    eventList.forEach((callback) => callback(data));

                    this.cache[eventName] = data;
                },
                remove(eventName, callback) {
                    const eventList = this.eventMap.get(eventName);
                    if (!eventList) return;

                    const findIndex = eventList.findIndex((fn) => fn === callback);
                    if (findIndex === -1) return;

                    eventList.splice(findIndex, 1);
                },
            };
        }

        return eventCache[namespaceName];
    }

    const defaultEvent = createEvent();

    return {
        create: createEvent,
        ...defaultEvent,
    };
})();
分类: JavaScript设计模式与开发实践 标签: JavaScript模式发布订阅模式

评论

暂无评论数据

暂无评论数据

目录