连续检查JSON URL是否使用Python中的数据更改

发布于 2025-02-04 20:14:00 字数 1083 浏览 2 评论 0原文

我有一个写的小脚本(见下文),从Web URL获取JSON数据。目的是打印JSON中的任何新数据。有什么办法可以每5秒连续检查URL并报告任何更改?我确定我做得不正确,但是我尝试的是从JSON对象项目创建第一个列表,等待5秒钟创建第二个列表,然后比较两个列表。这显然不是这样做的方法,因为我仍然必须每次都必须自己运行脚本。我只想在脚本上运行脚本,将其作为“听”或“民意调查” URL,然后退回数据中的任何更改。我的代码在下面,非常感谢您对我的脚本提出的任何其他优化。

import json, requests 
import time

urls=["https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=250&page=1&sparkline=false", "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=250&page=2&sparkline=false"]

def get_data(url):
    url = requests.get(url)
    text = url.text
    data = json.loads(text)
    coins = [coin['id'] for coin in data]
    return coins
 
def check_new_coins():
    first_list = get_data(url)
    time.sleep(5)
    second_list= get_data(url)
    new_coins = list(set(second_list).difference(first_list))

    if len(new_coins) > 0:
        for new_coin in new_coins:
            print(new_coin)
    else:
        print("No new coins")

for url in urls:
    check_new_coins()

I have a small script that I have written (see below) which fetches JSON data from a web url. The goal is to print out any new data in the JSON. Is there any way I can continuously check the URL every 5 seconds and report back any changes? I am sure I am not doing it right, but what i have tried is creating a first list from the JSON object items, waiting 5 seconds creating a second list and then comparing the two. This is obviously not the way to do it because I still have to run the script myself each time. I just want to run the script once have it kind of 'listen' or 'poll' the URL and throw back any changes in data. My code is below, any assistance is greatly appreciated and any other optimizations you would suggest for my script.

import json, requests 
import time

urls=["https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=250&page=1&sparkline=false", "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=250&page=2&sparkline=false"]

def get_data(url):
    url = requests.get(url)
    text = url.text
    data = json.loads(text)
    coins = [coin['id'] for coin in data]
    return coins
 
def check_new_coins():
    first_list = get_data(url)
    time.sleep(5)
    second_list= get_data(url)
    new_coins = list(set(second_list).difference(first_list))

    if len(new_coins) > 0:
        for new_coin in new_coins:
            print(new_coin)
    else:
        print("No new coins")

for url in urls:
    check_new_coins()

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

烟酉 2025-02-11 20:14:00

我认为这就是您正在寻找的

import json, requests 
import time

class Checker:
    def __init__(self, urls, wait_time):
        self.wait_time = wait_time
        self.urls = urls
        self.coins = self.get_coins()
        self.main_loop()
    
    @staticmethod
    def get_data(url):
        url = requests.get(url)
        text = url.text
        data = json.loads(text)
        coins = [coin['id'] for coin in data]
        return coins
    
    def get_coins(self):
        coins = set()
        for url in self.urls:
            coins.update(Checker.get_data(url))
        return coins

    def check_new_coins(self):
        new_coins = self.get_coins()
        
        coins_diff = list(new_coins.difference(self.coins))
        
        current_time = time.strftime("%H:%M:%S", time.localtime())
        
        if len(coins_diff) > 0:
            print(current_time, coins_diff)
        else:
            print(current_time, "No new coins")

        self.coins = new_coins
    
    def main_loop(self):
        while True:
            time.sleep(self.wait_time)
            self.check_new_coins()

if __name__ == '__main__':
    urls=[
        "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=250&page=1&sparkline=false",
        "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=250&page=2&sparkline=false"
    ]

    Checker(urls, 5)

样本输出:

18:57:20 No new coins
18:57:25 No new coins
18:57:30 No new coins
18:57:35 No new coins
18:57:41 No new coins
18:57:46 No new coins
18:57:51 No new coins
18:57:56 No new coins

I think that's what you're looking for

import json, requests 
import time

class Checker:
    def __init__(self, urls, wait_time):
        self.wait_time = wait_time
        self.urls = urls
        self.coins = self.get_coins()
        self.main_loop()
    
    @staticmethod
    def get_data(url):
        url = requests.get(url)
        text = url.text
        data = json.loads(text)
        coins = [coin['id'] for coin in data]
        return coins
    
    def get_coins(self):
        coins = set()
        for url in self.urls:
            coins.update(Checker.get_data(url))
        return coins

    def check_new_coins(self):
        new_coins = self.get_coins()
        
        coins_diff = list(new_coins.difference(self.coins))
        
        current_time = time.strftime("%H:%M:%S", time.localtime())
        
        if len(coins_diff) > 0:
            print(current_time, coins_diff)
        else:
            print(current_time, "No new coins")

        self.coins = new_coins
    
    def main_loop(self):
        while True:
            time.sleep(self.wait_time)
            self.check_new_coins()

if __name__ == '__main__':
    urls=[
        "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=250&page=1&sparkline=false",
        "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=250&page=2&sparkline=false"
    ]

    Checker(urls, 5)

sample output:

18:57:20 No new coins
18:57:25 No new coins
18:57:30 No new coins
18:57:35 No new coins
18:57:41 No new coins
18:57:46 No new coins
18:57:51 No new coins
18:57:56 No new coins
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文