记录每天数以百万计的请求以及需要采取哪些措施

发布于 2025-01-10 22:42:15 字数 5916 浏览 8 评论 0

HackerEarth 的 web 服务器每天处理数以百万计的请求。这些请求日志可以用来分析挖掘一些对关键业务非常有用的见解和指标,例如,每天的浏览量,每个子产品的浏览量,最受欢迎的用户导航流程等等。

最初的想法

HackerEarth 使用 Django 作为其主要 WEB 开发框架,并保存一些为性能和可扩展性定制的部件。在正常运行期间,我们的服务器平均每秒处理 80-90 个请求,而当多个竞争在一个时间差上重叠时,这会激增到每秒 200-250 个请求。我们需要一个系统,它能够很容易的扩展以满足高峰期流量每秒 500 个请求。另外,该系统应该添加最小处理开销到 web 服务器上,而收集到的数据应该存储以用于运算和离线处理。

架构

上图显示了我们的请求日志收集系统的高层体系结构。实线代表不同组件之间的数据流,而虚线代表不同组件之间的通信。整个架构是基于消息和无状态,因此各个组件可以很容易地被移除/替换,而无需停机。

下面是一个关于在数据流顺序中的每个元件的更详细解释。

Web 服务器

在 web 服务器上,我们部署了一个 Django 中间件 ,它为给定的请求异步检索所需数据,然后将它转发给 Transporter 集群服务器。这是通过使用一个线程完成的,而该中间件为 Django 请求/响应循环添加了 2 毫秒的开销。


    class RequestLoggerMiddleware(object):
        """
        Logs data from requests
        """
        def process_request(self, request):
            if settings.LOCAL or settings.DEBUG:
                return None

            if request.is_ajax():
                is_ajax = True
            request.META['IS_AJAX'] = is_ajax

            before = datetime.datetime.now()

            DISALLOWED_USER_AGENTS = ["ELB-HealthChecker/1.0"]


            http_user_agent = request.environ.get('HTTP_USER_AGENT','')

            if http_user_agent in DISALLOWED_USER_AGENTS:
                return None

            # this creates a thread which collects required data and forwards
            # it to the transporter cluster
            run_async(log_request_async, request)
            after = datetime.datetime.now()

            log("TotalTimeTakenByMiddleware %s"%((after-before).total_seconds()))
            return None

Transporter 集群

transporter 集群是非阻塞 Thrift 服务器阵列,用于从 web 服务器接收数据,然后将其路由到任何其他诸如 MongoDB, RabbitMQ, Kafka 这样的组件这一唯一目的。一条给定的消息应该被路由到哪里是来自于 web 服务器的消息自身指定的。从 web 服务器到 transporter 服务器的通信只有唯一一种方式,而这节省了花费在确认由 thrift 服务器接收的信息上的一些时间资源。由于这个原因,我们可能会失去一些请求日志,但是这样值得。当前,请求日志被路由到 Kafka 集群。web 服务器和 transporter 服务器之间的通信平均花费 1-2 毫秒,并且可以水平缩放以处理负载的增加。

以下是 thrift 配置文件的一部分。该文件定义了一个 DataTransporter 服务,支持作为一个修改器的带 oneway 的方法,这基本上意味着 RPC 调用将会立即返回。


    service DataTransporter {
        oneway void transport(1:map<string, string> message)
    }

Kafka 集群

Kafka 是一个支持发布/订阅消息模式的高通量分布式消息系统。这种消息传递基础结构使我们能够建立一个依赖于该流请求日志的其他管线。我们的 Kafka 集群保存 15 天有意义的日志,所以我们可以让我们实现的任何新的消费者开始处理数据 15 天内的任何数据。

创建一个 kafka 集群的有用参考。

Pipeline 管理服务器

这个服务器管理来自 Kafka topics 的请求日志消息的消费,将其存储在 MongoDB 中,然后将它们移动到 Amazon S3 以及 Amazon Redshift 。MongoDB 仅作为从 Kafka topics 消费的数据的分段区域,而该数据每隔一小时被转移到 S3。每个保存到 S3 的文件被加载到 Amazon Redshift 这一可以扩展到数据 PB 级的数据仓库解决方案。我们使用 Amazon Redshift 来分析/度量来自请求日志数据的计算。该服务器与一个 RabbitMQ 集群协同工作,这个集群被用来对任务完成和启动进行通信。

下面是从 S3 加载数据到 Redshift 的脚本。该脚本首先通过移除任何重复行,然后插入新数据来处理重复数据的插入。


    import os
    import sys
    import subprocess

    from django.conf import settings


    def load_s3_delta_into_redshift(s3_delta_file_path):
        """s3_delta_file_path is path after the bucket
        name.
        """
        bigdata_bucket = settings.BIGDATA_S3_BUCKET

        attrs = {
            'bigdata_bucket': bigdata_bucket,
            's3_delta_file_path': s3_delta_file_path,
        }

        complete_delta_file_path = "s3://{bigdata_bucket}/{s3_delta_file_path}".format(**attrs)

        schema_file_path = "s3://{bigdata_bucket}/request_log/s3_col_schema.json".format(**attrs)

        data = {
                'AWS_ACCESS_KEY_ID': settings.AWS_ACCESS_KEY_ID,
                'AWS_SECRET_ACCESS_KEY': settings.AWS_SECRET_ACCESS_KEY,
                'LOG_FILE':  complete_delta_file_path,
                'schema_file_path': schema_file_path
              }

        S3_REDSHIFT_COPY_COMMAND = " ".join([
            "copy requestlog_staging from '{LOG_FILE}' ",
            "CREDENTIALS 'aws_access_key_id={AWS_ACCESS_KEY_ID};aws_secret_access_key={AWS_SECRET_ACCESS_KEY}'",
            "json '{schema_file_path}';"
        ]).format(**data)


        LOADDATA_COMMAND = " ".join([
            "begin transaction;",
            "create temp table if not exists requestlog_staging(like requestlog);",
            S3_REDSHIFT_COPY_COMMAND,
            'delete from requestlog using requestlog_staging where requestlog.row_id=requestlog_staging.row_id;',
            'insert into requestlog select * from requestlog_staging;',
            "drop table requestlog_staging;",
            'end transaction;'
            #'vacuum;' #sorts new data added
        ])

        redshift_conn_args = {
            'host': settings.REDSHIFT_HOST,
            'port': settings.REDSHIFT_PORT,
            'username': settings.REDSHIFT_DB_USERNAME
        }

        REDSHIFT_CONNECT_CMD = 'psql -U {username} -h {host} -p {port}'.format(**redshift_conn_args)

        PSQL_LOADDATA_CMD = '%s -c "%s"'%(REDSHIFT_CONNECT_CMD, LOADDATA_COMMAND)

        returncode = subprocess.call(PSQL_LOADDATA_CMD, shell=True)
        if returncode !=0:
            raise Exception("Unable to load s3 delta file into redshift ",
                    s3_delta_file_path)

下一步

对于任何 Web 应用程序,数据就像金子。若以正确的方式处理,那么它可以提供的洞察以及它可以驱动的增长是惊人的。使用请求日志,可以构建几十个的功能和见解,包括推荐引擎,更好的内容交付,并提高产品的整体构建。所有这一切都有助于使 HackerEarth 更好的服务于我们的用户。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

冷血

暂无简介

文章
评论
27 人气
更多

推荐作者

梦途

文章 0 评论 0

蓝眼睛不忧郁

文章 0 评论 0

134fengkuang

文章 0 评论 0

yang18

文章 0 评论 0

属性

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文