Notification Service

Наверх

Для начала, опишем необходимые интерфейсы typescript для сервиса:

import type { ServiceSchema, ServiceSettingSchema, Service, Context } from 'moleculer';

export interface NotificationParams {
	message: string,
}

interface NotificationLocalVars {
	
}

interface NotificationMethods {
	
}

type NotificationThis = Service<ServiceSettingSchema> & NotificationMethods & NotificationLocalVars;

NotificationParams - это параметры запроса, который мы будем отправлять со стороны клиента. Для примера, запрос будет содержать одну строчку с сообщением.

NotificationLocalVars - описание локальных переменных сервиса. Пока что их нет.

NotificationMethods - описание служебных методов сервиса. Пока что их тоже нет.

type Meta = {
	'$socketId': string,
	user: string,
	'$rooms': string[]
}

Тип Meta нужен для описания того, что содержится в контексте запроса. В частности, нам нужен $socketId - айдишник клиента, обращающегося к сервису.

Создаём сам сервис:

const NotificationService: ServiceSchema<ServiceSettingSchema> & { methods: NotificationMethods } = {
	name: "notification",
	actions: {},
	methods: {},
};

export default NotificationService;

Для начала, попробуем сделать так, чтобы пользователь мог отправить сообщение через сокеты, и тут же получить его обратным ответом от сервера.

const NotificationService: ServiceSchema<ServiceSettingSchema> & { methods: NotificationMethods } = {
	name: "notification",
	actions: {
		notify: {
			params: {
				message: { type: "string" }
			},
			handler(ctx: Context<NotificationParams, Meta>) {
				this.logger.info(ctx.meta.$socketId);
				this.broker.call('io.broadcast', {
					namespace: '/',
					event: 'event',
					args: [ctx.params.message],
					local: true,
					rooms: [ctx.meta.$socketId]
				});
				return ctx.params.message;
			}
		}
	},
	...
}

Со стороны фронта это будет выглядеть так:

useEffect(() => {
	socket.on('connect', onConnect);
	socket.on('disconnect', onDisconnect);
	socket.on('event', onEvent);

	return () => {
		socket.off('connect', onConnect);
		socket.off('disconnect', onDisconnect);
		socket.off('event', onEvent);
	};
}, []);

const onConnect = () => {
	
}

const onDisconnect = () => {
	
}

const onEvent = (data: any) => {
	console.log(data);
}

const onButtonClick = () => {
	socket.emit('call', 'notification.notify', { message: "my message" });
}

Теперь попробуем сделать оповещения со стороны сервиса. Для примера, организуем оповещения каждую секунду.
Можно воспользоваться пакетом cron, чтобы не заморачиваться с SetTimeout:

npm install cron
import { CronJob } from 'cron';
const NotificationService: ServiceSchema<ServiceSettingSchema> & { methods: NotificationMethods } = {
	...
	async started(this) {
		const job = CronJob.from({
			cronTime: '* * * * * *',
			onTick: () => {
				this.notifyAll(`Current time: ${new Date().toString()}`)
			},
			start: true,
			timeZone: 'Asia/Tokyo'
		});
	}
}

Нам понадобится локальная переменная, в которой мы будем хранить всех подписчиков:

interface NotificationLocalVars {
	subscribers: string[],
}

Переменную нужно инициализировать при старте сервиса, и по желанию - почистить при остановке:

const NotificationService: ServiceSchema<ServiceSettingSchema> & { methods: NotificationMethods } = {
	...
	created() {
		this.subscribers = [];
	},
	stopped() {
		this.subscribers = [];
	},
	...
}

Действия в сервисе для подписки и отписки:

const NotificationService: ServiceSchema<ServiceSettingSchema> & { methods: NotificationMethods } = {
	...
	actions: {
		...
		subscribe: {
			handler(ctx: Context<null, Meta>) {
			this.logger.info(`${ctx.meta.$socketId} subscribed!`);
			this.subscribers.push(ctx.meta.$socketId);
		},
		unsubscribe: {
			handler(ctx: Context<null, Meta>) {
			this.logger.info(`${ctx.meta.$socketId} unsubscribed!`);
			this.subscribers = this.subscribers.filter((id: string) => id !== ctx.meta.$socketId);
		}
	}
}

И сам метод для оповещения:

interface NotificationMethods {
	notifyAll: (message: string) => void;
}
const NotificationService: ServiceSchema<ServiceSettingSchema> & { methods: NotificationMethods } = {
	...
	methods: {
		notifyAll(message: string) {
			this.broker.call('io.broadcast', {
				namespace: '/',
				event: 'event',
				args: [message],
				local: true,
				rooms: this.subscribers
			});
		}
	},
	...
}

Со стороны фронта добавим подписку и отписку к сервису оповещений:

const onConnect = () => {
	socket.emit('call', 'notification.subscribe');
}

const onDisconnect = () => {
	socket.emit('call', 'notification.unsubscribe');
}

Теперь каждую секунду по ws мы будем получать точное время с сервера.

Финальный вариант сервиса:

import type { ServiceSchema, ServiceSettingSchema, Service, Context } from 'moleculer';
import { CronJob } from 'cron';

export interface NotificationParams {
    message: string,
}

interface NotificationLocalVars {
    subscribers: string[],
}

interface NotificationMethods {
    notifyAll: (message: string) => void;
}

type Meta = {
    '$socketId': string,
    user: string,
    '$rooms': string[]
}

type NotificationThis = Service<ServiceSettingSchema> & NotificationMethods & NotificationLocalVars;
const NotificationService: ServiceSchema<ServiceSettingSchema> & { methods: NotificationMethods } = {
    name: "notification",
    actions: {
        notify: {
            params: {
                message: { type: "string" }
            },
            handler(ctx: Context<NotificationParams, Meta>) {
                this.logger.info(ctx.meta.$socketId);
                this.broker.call('io.broadcast', {
                    namespace: '/',
                    event: 'event',
                    args: [ctx.params.message],
                    local: true,
                    rooms: [ctx.meta.$socketId]
                });
                return ctx.params.message;
            }
        },
        subscribe: {
            handler(ctx: Context<null, Meta>) {
                this.logger.info(`${ctx.meta.$socketId} subscribed!`);
                this.subscribers.push(ctx.meta.$socketId);
            }
        },
        unsubscribe: {
            handler(ctx: Context<null, Meta>) {
                this.logger.info(`${ctx.meta.$socketId} unsubscribed!`);
                this.subscribers = this.subscribers.filter((id: string) => id !== ctx.meta.$socketId);
            }
        }
    },
    methods: {
        notifyAll(message: string) {
            this.broker.call('io.broadcast', {
                namespace: '/',
                event: 'event',
                args: [message],
                local: true,
                rooms: this.subscribers
            });
        }
    },
    created() {
        this.subscribers = [];
    },
    stopped() {
        this.subscribers = [];
    },
    async started(this) {
        const job = CronJob.from({
            cronTime: '* * * * * *',
            onTick: () => {
                this.notifyAll(`Current time: ${new Date().toString()}`)
            },
            start: true,
            timeZone: 'Asia/Tokyo'
        });
    }
};

export default NotificationService;

Наверх