使用 Python CGI 限制对外部数据库的调用

发布于 2024-08-14 17:08:19 字数 212 浏览 2 评论 0原文

我有一个 Python CGI 脚本,可以从 GPS 服务中提取数据;我希望网页上的此信息大约每 10 秒更新一次(GPS 服务的 TOS 允许的最大值)。但可能有 100 个用户同时查看该网页,所有用户都调用该脚本。

我认为用户的脚本需要从缓冲区页面获取数据,该页面本身每十秒只更新一次。如果没有人直接查看内容(并且不访问 CGI),如何使该缓冲区页面自动更新?有更好的方法来实现这一目标吗?

I've got a Python CGI script that pulls data from a GPS service; I'd like this information to be updated on the webpage about once every 10s (the max allowed by the GPS service's TOS). But there could be, say, 100 users viewing the webpage at once, all calling the script.

I think the users' scripts need to grab data from a buffer page that itself only upates once every ten seconds. How can I make this buffer page auto-update if there's no one directly viewing the content (and not accessing the CGI)? Are there better ways to accomplish this?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

大海や 2024-08-21 17:08:20

将 GPS 数据查询的结果与日期时间一起缓存在文件或数据库 (sqlite) 中。

然后,您可以根据上次缓存的日期时间进行日期时间检查,以启动另一个 GPS 数据查询。

不过,您可能会遇到 cgi 和日期时间检查的并发问题...

为了解决并发问题,您可以使用 sqlite,并将写入放在 try/ except 中。
这是使用 sqlite 的示例缓存实现。

import datetime
import sqlite3 

class GpsCache(object):
    db_path = 'gps_cache.db'
    def __init__(self):
        self.con = sqlite3.connect(self.db_path)
        self.cur = self.con.cursor()

    def _get_period(self, dt=None):
        '''normalize time to 15 minute periods'''
        if dt.minute < 15:
           minute_period = 0
        elif 15 <= dt.minute < 30:
           minute_period = 15
        elif 30 <= dt_minute < 45: 
           minute_period = 30
        elif 45 <= dt_minute:
           minute_period = 25
        period_dt = datetime.datetime(year=dt.year, month=dt.month, day=dt.day, hour=dt.hour, minute=minute_period)
        return period_dt

    def get_cache(dt=None):
        period_dt = self._get_period(dt)
        select_sql = 'SELECT * FROM GPS_CACHE WHERE date_time = "%s";' % period_dt.strftime('%Y-%m-%d %H:%M')
        self.cur.execut(select_sql)
        result = self.cur.fetchone()[0]
        return result


    def put_cache(dt=None, data=None):
        period_dt = self._get_period(dt)
        insert_sql = 'INSERT ....'  # edit to your table structure
        try:
            self.cur.execute(insert_sql)
            self.con.commit()
        except sqlite3.OperationalError:
            # assume db is being updated by another process with the current resutls and ignore
            pass

所以我们现在有了缓存工具的实现端。

您需要首先检查缓存,然后如果它不是“新鲜”(不返回任何内容),请使用当前方法获取数据。然后缓存你​​抓取的数据。
您可能应该更好地组织它,但您应该在这里了解总体思路。

使用此示例,您只需将当前对“remote_get_gps_data”的调用替换为“get_gps_data”即可。

from gps_cacher import GpsCache

def remote_get_gps_data():
    # your function here
    return data

def get_gps_data():
    data = None
    gps_cache = GpsCache()
    current_dt = datetime.datetime.now()
    cached_data = gps_cache.get_cache(current_dt)    
    if cached_data:
        data = cached_data
    else:
        data = remote_get_gps_data()
        gps_cache.put_cache(current_dt, data)
    return data

Cache the results of your GPS data query in a file or database (sqlite) along with a datetime.

You can then do a datetime check against the last cached datetime to initiate another GPS data query.

You'll probably run into concurrency issues with cgi and the datetime check though...

To get around concurrency issues, you can use sqlite, and put the write in a try/except.
Here's a sample cache implementation using sqlite.

import datetime
import sqlite3 

class GpsCache(object):
    db_path = 'gps_cache.db'
    def __init__(self):
        self.con = sqlite3.connect(self.db_path)
        self.cur = self.con.cursor()

    def _get_period(self, dt=None):
        '''normalize time to 15 minute periods'''
        if dt.minute < 15:
           minute_period = 0
        elif 15 <= dt.minute < 30:
           minute_period = 15
        elif 30 <= dt_minute < 45: 
           minute_period = 30
        elif 45 <= dt_minute:
           minute_period = 25
        period_dt = datetime.datetime(year=dt.year, month=dt.month, day=dt.day, hour=dt.hour, minute=minute_period)
        return period_dt

    def get_cache(dt=None):
        period_dt = self._get_period(dt)
        select_sql = 'SELECT * FROM GPS_CACHE WHERE date_time = "%s";' % period_dt.strftime('%Y-%m-%d %H:%M')
        self.cur.execut(select_sql)
        result = self.cur.fetchone()[0]
        return result


    def put_cache(dt=None, data=None):
        period_dt = self._get_period(dt)
        insert_sql = 'INSERT ....'  # edit to your table structure
        try:
            self.cur.execute(insert_sql)
            self.con.commit()
        except sqlite3.OperationalError:
            # assume db is being updated by another process with the current resutls and ignore
            pass

So we have the cache tool now the implementation side.

You'll want to check the cache first then if it's not 'fresh' (doens't return anything), go grab the data using your current method. Then cache the data you grabbed.
you should probably organize this better, but you should get the general idea here.

Using this sample, you just replace your current calls to 'remote_get_gps_data' with 'get_gps_data'.

from gps_cacher import GpsCache

def remote_get_gps_data():
    # your function here
    return data

def get_gps_data():
    data = None
    gps_cache = GpsCache()
    current_dt = datetime.datetime.now()
    cached_data = gps_cache.get_cache(current_dt)    
    if cached_data:
        data = cached_data
    else:
        data = remote_get_gps_data()
        gps_cache.put_cache(current_dt, data)
    return data
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文