如何在nodej中的事件发射器中解决数据丢失问题?微服务

发布于 2025-01-28 06:01:22 字数 4835 浏览 3 评论 0原文

下面的代码仅用于测试目的,如果它有效,那么我想用微服务在实际项目中实现相同的操作。我在一台服务器中有4个微服务。微服务之间的通信是通过兔子。

我的流量:

微服务1(通过RabbitMQ从此处发送数据)=====>微服务2(从RabbitMQ接收数据)将数据恢复到====>微服务1。

微服务1有两个文件:

  1. emitter.controller.js
  2. 接收authcontroller.js

microservice 2现在有一个文件:

  1. readecteTsettingsController.js

,这是Flow ->

emitter.controller.js(m1)将数据发送到rectectSettingsController.js(m2),然后再次接收settingscontroller.js(m2)将相同的数据发送回到已接收到authauthcontroller.js(m1)在emitter.controller.js(M1)中收听,此后我们将发送HTTP响应。

首先,我们将所有响应以“键”为单位,作为每个请求的mongodbobjectid,只是为了唯一性,“值”是每个请求的响应。 令emit_response_list_map = new Map();

最后,我们通过地图访问响应并将响应寄回。

我的问题是:

  1. 如果我通过 jmeter 达到100个请求,那么所有流程都有效,但是在Expive autheauthcontroller.js(m1)中,有时会发出重复值。在emitter.controller.js(M1)中的侦听器中发生2-3次,并且数据丢失发生。 注意:在侦听器中,所有100个发射数据都已收到,但是在100个objectid中,有100个正在重复,所有这些都不是唯一的。如何在事件发射器中解决此数据丢失问题

? i.sstatic.net/dmvkd.png“ alt =”在此处输入图像说明“>

我的代码下方

microservice 1: 文件:emitter.controller.js

var common = require('../config/event_emitter');
var commonEmitter = common.commonEmitter;
const { sendMessage } = require('../config/rabbitmq/emit');
const connection = require('../config/rabbitmq/connection');
var mongoose = require('mongoose');
const { parse, stringify } = require('flatted');



let emit_response_list_map = new Map();
exports.emitOnceTesting = async (req, res, next) => {
    let count = mongoose.Types.ObjectId();
    emit_response_list_map.set(count.toString(), res);

    const rabbit_conn = await connection.connect();
    const emit_channel = await rabbit_conn.createChannel();
    let method = "emit_r";

    let dataObj = { method, count };
    let dataPacket = { exchange: 'device_model', key: "emit.key", data: dataObj };
    emit_channel.assertExchange('device_model', 'topic', {
        durable: false
    });

    sendMessage(emit_channel, dataPacket);
    console.log(`============================== [[SEND] => [FROM] auth_ms : emit_controller: ${method} [TO] settings_ms : receivedDeviceController: emit_controller: ${method}] =======================================, ${dataPacket.data.count}`);

    commonEmitter.on('emitter_test', async (data) => {
        try {
            console.log("______________emitter_test_______________", data.index);

            commonEmitter.removeAllListeners('emitter_test');
            await emit_channel.close();
            const res = emit_response_list_map.get(data.index);

            res.status(200).json({ status: true });        
        } catch (err) {
            console.log(err);
        }
    });
}

微服务2: - >文件:readectsettingscontroller.js:

 let emitTesting_q1 = await device_channel_sett_ms.assertQueue('emit1test_r', { exclusive: true });
        device_channel_sett_ms.bindQueue(emitTesting_q1.queue, 'device_model', 'emit.key');
        device_channel_sett_ms.consume(emitTesting_q1.queue, async (msg) => {
            try {
                // @ts-ignore
                let data = parse(msg.content);
                console.log('============================== [[RECEIVED] => [HERE] settings_ms : receivedDeviceController: emitTesting [FROM] coin_zone_ms : receivedDeviceController: emitTesting] =======================================', data);            //res.json({ status: true, success: data });
                let dataPacketAck = { exchange: 'device_model', key: 'emit.key.ack', data };

                await sendMessage(device_channel_sett_ms, dataPacketAck);
                console.log('============================== [[SEND-ACK] => [FROM] settings_ms : receivedDeviceController: emitTesting [FROM] coin_zone_ms : receivedDeviceController:emitTesting ] =======================================', data);
            } catch (err) {
                console.log('exception in getGeoFenceSetting : getGeoFenceDetails -----------', err);
            }
        },
            {
                noAck: true
            }
        );


微服务1:文件:recdiveAuthController.js

   let emit_key_ack = await device_channel_auth_ms.assertQueue('', { exclusive: true });
    device_channel_auth_ms.bindQueue(emit_key_ack.queue, 'device_model', 'emit.key.ack');
    device_channel_auth_ms.consume(emit_key_ack.queue, async (msg) => {
        // @ts-ignore
        let data = parse(msg.content);
        console.log("___________________________________ack________________", data.countack);
        if (data.method = "emit_r") {
            commonEmitter.emit('emitter_test', data);
        } else if (data.method = "") {
        } else if (data.method = "") {
        }
    });

Below code is just for testing purpose , if it works then i want to implement the same in real project with microservices. I have 4 microservices in one server. And the communication between microservices is through rabbitMQ.

My flow :

Microservice 1 (Send data from here through rabbitMQ) =====> Microservice 2 (receive data here from rabbitMQ) resend the data back to the same to ====> Microservice 1.

Microservice 1 has two files:

  1. Emitter.controller.js
  2. recievedAuthController.js

Microservice 2 has one file:

  1. receivedSettingsController.js

Now,Here is the Flow ->

Emitter.controller.js (M1) sends data to receivedSettingsController.js (M2) and again receivedSettingsController.js (M2) sends the same data back to recievedAuthController.js (M1) which has event emitter (emitter_test) this emitter emits data which is listened in Emitter.controller.js (M1), After this we are sending the http response.

At first we are storing all the responses in map with “key” as mongoDBObjectId for each request just for uniqueness and the “value” is responses of each request.
let emit_response_list_map = new Map();

At last we are accessing the response through Map and sending back the response.

My Issue:

  1. If I hit 100 requests through Jmeter then , all the flow works but at event emitter in recievedAuthController.js (M1) emits duplicate values sometimes., i.e the ObjectID we are sending throughout the flow appears 2-3 times in the listener in Emitter.controller.js (M1) and data loss happens.
    Note: In listener all 100 emits data is received but in that 100 some of the ObjectID’s are repeating, all are not unique. How to solve this data loss issue in Event Emitters?

enter image description here

My Code Below

MicroService 1 :
File: Emitter.controller.js

var common = require('../config/event_emitter');
var commonEmitter = common.commonEmitter;
const { sendMessage } = require('../config/rabbitmq/emit');
const connection = require('../config/rabbitmq/connection');
var mongoose = require('mongoose');
const { parse, stringify } = require('flatted');



let emit_response_list_map = new Map();
exports.emitOnceTesting = async (req, res, next) => {
    let count = mongoose.Types.ObjectId();
    emit_response_list_map.set(count.toString(), res);

    const rabbit_conn = await connection.connect();
    const emit_channel = await rabbit_conn.createChannel();
    let method = "emit_r";

    let dataObj = { method, count };
    let dataPacket = { exchange: 'device_model', key: "emit.key", data: dataObj };
    emit_channel.assertExchange('device_model', 'topic', {
        durable: false
    });

    sendMessage(emit_channel, dataPacket);
    console.log(`============================== [[SEND] => [FROM] auth_ms : emit_controller: ${method} [TO] settings_ms : receivedDeviceController: emit_controller: ${method}] =======================================, ${dataPacket.data.count}`);

    commonEmitter.on('emitter_test', async (data) => {
        try {
            console.log("______________emitter_test_______________", data.index);

            commonEmitter.removeAllListeners('emitter_test');
            await emit_channel.close();
            const res = emit_response_list_map.get(data.index);

            res.status(200).json({ status: true });        
        } catch (err) {
            console.log(err);
        }
    });
}

MicroService 2:-> File: receivedSettingsController.js :

 let emitTesting_q1 = await device_channel_sett_ms.assertQueue('emit1test_r', { exclusive: true });
        device_channel_sett_ms.bindQueue(emitTesting_q1.queue, 'device_model', 'emit.key');
        device_channel_sett_ms.consume(emitTesting_q1.queue, async (msg) => {
            try {
                // @ts-ignore
                let data = parse(msg.content);
                console.log('============================== [[RECEIVED] => [HERE] settings_ms : receivedDeviceController: emitTesting [FROM] coin_zone_ms : receivedDeviceController: emitTesting] =======================================', data);            //res.json({ status: true, success: data });
                let dataPacketAck = { exchange: 'device_model', key: 'emit.key.ack', data };

                await sendMessage(device_channel_sett_ms, dataPacketAck);
                console.log('============================== [[SEND-ACK] => [FROM] settings_ms : receivedDeviceController: emitTesting [FROM] coin_zone_ms : receivedDeviceController:emitTesting ] =======================================', data);
            } catch (err) {
                console.log('exception in getGeoFenceSetting : getGeoFenceDetails -----------', err);
            }
        },
            {
                noAck: true
            }
        );


Microservice 1: File: recievedAuthController.js

   let emit_key_ack = await device_channel_auth_ms.assertQueue('', { exclusive: true });
    device_channel_auth_ms.bindQueue(emit_key_ack.queue, 'device_model', 'emit.key.ack');
    device_channel_auth_ms.consume(emit_key_ack.queue, async (msg) => {
        // @ts-ignore
        let data = parse(msg.content);
        console.log("___________________________________ack________________", data.countack);
        if (data.method = "emit_r") {
            commonEmitter.emit('emitter_test', data);
        } else if (data.method = "") {
        } else if (data.method = "") {
        }
    });

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文