Winston logger causing "Maximum call stack size exceeded" error

Published 2025-01-09 16:34:25


I'm getting this error in production after receiving many messages (the code works fine for the first 5k+ messages), but then it starts throwing the error below:

(node:36) UnhandledPromiseRejectionWarning: RangeError: Maximum call stack size exceeded
at Date.[Symbol.toPrimitive] (<anonymous>)
at Date.toJSON (<anonymous>)
at JSON.stringify (<anonymous>)
at Format.jsonFormatter [as transform] (/data/packages/nodes-base/src/Logging.ts:67:30)
at DerivedLogger._transform (/data/packages/nodes-base/node_modules/winston/lib/winston/logger.js:313:29)
at DerivedLogger.Transform._read (/data/packages/nodes-base/node_modules/readable-stream/lib/_stream_transform.js:177:10)

Once we start receiving this error, we get the same RangeError: Maximum call stack size exceeded on every other operation as well.
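The stack trace ends in `JSON.stringify` inside the project's own `jsonFormatter` (Logging.ts:67). `JSON.stringify` recurses once per nesting level, so one way to get exactly this RangeError is a very deeply nested object attached as log metadata. A minimal sketch (the `meta` object here is a hypothetical stand-in for whatever ends up attached to the log call, not the asker's actual data):

```typescript
// JSON.stringify recurses once per nesting level, so a deeply
// nested metadata object blows the call stack inside a json
// formatter, matching the Date.toJSON -> JSON.stringify frames.
let meta: any = { ts: new Date() };
for (let i = 0; i < 100000; i++) {
    meta = { child: meta }; // hypothetical runaway nesting
}

let caught = "";
try {
    JSON.stringify(meta); // what Format.jsonFormatter ultimately calls
} catch (err) {
    caught = (err as Error).name;
}
console.log(caught); // prints "RangeError"
```

Note that a *circular* object would instead make `JSON.stringify` throw a TypeError, so a RangeError points at depth (or stack pressure from elsewhere) rather than a cycle.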

My code makes no recursive calls, but I do use this logger inside an async lock, and once the stack-exceeded errors begin, the log line never appears again. This is the log; I'm only logging a string through it: logger.info("kafka message received", {"MsgTs": msgTs});

This is my code flow whenever I receive a new message:

await CheckMemoryConsumptionAndTriggerCallback.call(this, consumerGroup, payload, logger, callback, module)


export async function CheckMemoryConsumptionAndTriggerCallback(this: ITriggerFunctions, payload: EachMessagePayload, logger: winston.Logger, callback: Function, listenerType: string) {
    let intervalObj: NodeJS.Timeout;
    // isMemoryLimitReached reads memory usage from a system file
    const memoryLimitReached = await isMemoryLimitReached(logger);

    if (memoryLimitReached !== true) {
        const retVal = await callback(payload, logger);

        if (retVal === KAFKA_MSG.EMITTED) {
            // save the message timestamp in the db when the message is emitted
            await setKafkaDetailsDb(this, payload.message.timestamp, this.getWorkflow().executionUUID, logger);
        }
    }
}
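`isMemoryLimitReached` is only described as reading memory usage from a system file. For context, a minimal sketch of such a check under assumed cgroup v1 paths (the paths, the 90% threshold, and the name `isMemoryLimitReachedSketch` are illustrative assumptions, not the project's implementation):

```typescript
import { promises as fs } from "fs";

// Hedged sketch: compare cgroup memory usage against its limit.
// Paths and threshold are assumptions for illustration only.
async function isMemoryLimitReachedSketch(): Promise<boolean> {
    try {
        const usage = Number(await fs.readFile("/sys/fs/cgroup/memory/memory.usage_in_bytes", "utf8"));
        const limit = Number(await fs.readFile("/sys/fs/cgroup/memory/memory.limit_in_bytes", "utf8"));
        return usage / limit > 0.9;
    } catch {
        return false; // treat an unreadable file as "not limited"
    }
}
```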


async function setKafkaDetailsDb(caller: ITriggerFunctions, kafkaMsgTs: string, executionID: string | undefined, logger: winston.Logger) {
    const lock_key = "key_" + caller.getWorkflow().id!;

    GetAsyncLockInstance().acquire(lock_key, async function() {
        // Some code
        await SaveStaticDataInDb.call(caller, caller.getWorkflow().id!, staticData);
        // this log below doesn't come anymore once we start receiving the warning
        logger.info("kafka message received", {"MsgTs": kafkaMsgTs});
    }, {});
}
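Worth noting: the `acquire(...)` call above is fired and forgotten, so any rejection inside the locked task can only surface as an UnhandledPromiseRejectionWarning, which is exactly the wrapper around the RangeError in the trace. A minimal per-key lock sketch (a stand-in for whatever `GetAsyncLockInstance()` returns, which is not shown) whose returned promise the caller can await and catch:

```typescript
// Minimal per-key async lock sketch: tasks with the same key run
// one at a time; the returned promise exposes the task's result
// or rejection to the caller instead of leaving it unhandled.
const chains = new Map<string, Promise<unknown>>();

function acquire<T>(key: string, task: () => Promise<T>): Promise<T> {
    const prev = chains.get(key) ?? Promise.resolve();
    // Chain the new task after the previous one; swallow the
    // previous task's error so one failure can't poison the queue.
    const next = prev.catch(() => undefined).then(task);
    chains.set(key, next.catch(() => undefined));
    return next;
}
```

Awaiting the returned promise (or adding a `.catch`) would at least turn the silent warning into a handled error with a real stack.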

Later in the code, after the log above, we also use setInterval() to limit how many processes run concurrently. I have my doubts about this setInterval() logic as well.

const reCheckProcessCount = async () => {
    if (WorkflowHelpers.GetActiveProcessCount() <= concurrentProcessCount) {
        // remove the interval
        clearInterval(intervalObj);
        WorkflowHelpers.IncrActiveProcessCount();
        return this.runSubprocess(data, loadStaticData);
    }
    return "";
};

if (WorkflowHelpers.GetActiveProcessCount() >= concurrentProcessCount) {
    console.log("Total allowed concurrent process count limit reached.");
    intervalObj = setInterval(reCheckProcessCount, 30000);
} else {
    WorkflowHelpers.IncrActiveProcessCount();
    return this.runSubprocess(data, loadStaticData);
}
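One hazard in this polling gate: when the active count equals `concurrentProcessCount`, several pending intervals can all pass the `<=` check in the same sweep and each increment the counter past the limit. A hedged alternative (a sketch, not the project's WorkflowHelpers API) is a small counting semaphore that hands slots to queued waiters one at a time instead of polling every 30s:

```typescript
// Counting semaphore sketch: at most `limit` holders at once;
// release() hands the freed slot to exactly one queued waiter,
// so no two waiters can slip past the limit together.
class Semaphore {
    private readonly waiters: Array<() => void> = [];
    private active = 0;
    constructor(private readonly limit: number) {}

    async acquire(): Promise<void> {
        if (this.active < this.limit) {
            this.active++;
            return;
        }
        // Queue and wait; release() transfers the slot directly,
        // so the counter is not touched again here.
        await new Promise<void>(resolve => this.waiters.push(resolve));
    }

    release(): void {
        const next = this.waiters.shift();
        if (next) {
            next(); // hand the slot to exactly one waiter
        } else {
            this.active--;
        }
    }
}
```

A caller would `await sem.acquire()` before `runSubprocess` and call `sem.release()` when the subprocess finishes, with no interval bookkeeping.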

But none of the methods above are called recursively; they are called repeatedly, once per new message, but they are not part of any recursive loop.

This error appears in production only. I can't pinpoint its exact location or cause, and I can't reproduce it locally: I tried calling the flow above in a loop 11k times, but couldn't trigger the error.

Any help or guidance will be much appreciated, thanks.
