YouTube API - waiting for videos

Posted on 2025-01-10 09:09:25

I made a bot that waits for videos from several channels and sends the new ones to Telegram. Here is the code:

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import config
import time

import telebot
ChannelsAndVideos = []
Channels = config.channels_id
bot_token = config.bot_token
api_key = config.api_key
sleepTime = config.sleepTime
messageText = config.messageText
telegram_channels_id = config.telegram_channels_id
youtube = build('youtube', 'v3', developerKey=api_key)
bot = telebot.TeleBot(bot_token)
sended = []
# Record the current video count of every channel.
for ChannelId in Channels:
    try:
        request = youtube.channels().list(
            part='statistics',
            id=ChannelId
        )
        response = request.execute()
        if response['pageInfo']['totalResults'] != 1:
            print("Error! Failed to add " + ChannelId)
        else:
            # The API returns videoCount as a string, so convert it before comparing.
            videoCount = int(response['items'][0]['statistics']['videoCount'])
            print(ChannelId + " has been added (" + str(videoCount) + " videos)")
            ChannelsAndVideos.append((ChannelId, videoCount))
    except HttpError as e:
        print(e)
while True:
    time.sleep(sleepTime)
    for i in range(len(ChannelsAndVideos)):
        request = youtube.channels().list(
            part='statistics',
            id=ChannelsAndVideos[i][0]
        )
        response = request.execute()
        if response['pageInfo']['totalResults'] != 1:
            print("Error!")
        else:
            videoCount = int(response['items'][0]['statistics']['videoCount'])
            if videoCount > ChannelsAndVideos[i][1]:
                ChannelsAndVideos[i] = (ChannelsAndVideos[i][0], videoCount)
                # Look up the channel's uploads playlist, then fetch its newest item.
                request = youtube.channels().list(
                    part='contentDetails',
                    id=ChannelsAndVideos[i][0]
                )
                response = request.execute()
                request = youtube.playlistItems().list(
                    part=['contentDetails', 'snippet', 'status'],
                    playlistId=response['items'][0]['contentDetails']['relatedPlaylists']['uploads']
                )
                response = request.execute()
                videoId = response['items'][0]['snippet']['resourceId']['videoId']
                if videoId not in sended:
                    sended.append(videoId)
                    for chat_id in telegram_channels_id:
                        try:
                            bot.send_message(chat_id, messageText + "https://www.youtube.com/watch?v=" + videoId)
                        except Exception:
                            print("Failed to send message to " + str(chat_id))

There is a problem with this implementation: every 30 seconds it requests the latest videos of every channel, which consumes RAM and uses up the request quota (the YouTube API limits the number of requests). How can I optimize this system?

Comments (1)

执笏见 2025-01-17 09:09:25

Reducing memory usage

 sended = []
 ...
                if not (response['items'][0]['snippet']['resourceId']['videoId'] in sended):
                    sended.append(response['items'][0]['snippet']['resourceId']['videoId'])

Store the sent video IDs in a set instead of a list. The collection still gains one entry per new video, but the membership test ['videoId'] in sended drops from O(n) to O(1) on average.

sended = set()
...
                if not (response['items'][0]['snippet']['resourceId']['videoId'] in sended):
                    sended.add(response['items'][0]['snippet']['resourceId']['videoId'])
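
A set still gains one entry per new video, so if the bot is meant to run for a long time it may be worth capping how many IDs it remembers. The following is only a sketch under that assumption: the RecentIds class and its max_size parameter are hypothetical names, not part of the original bot, and it forgets the oldest ID once the cap is reached.

```python
from collections import deque


class RecentIds:
    """Remembers only the most recent `max_size` IDs (hypothetical helper)."""

    def __init__(self, max_size):
        self._order = deque()   # insertion order, oldest on the left
        self._ids = set()       # same IDs, for O(1) average membership tests
        self._max_size = max_size

    def __contains__(self, item):
        return item in self._ids

    def __len__(self):
        return len(self._ids)

    def add(self, item):
        if item in self._ids:
            return
        self._order.append(item)
        self._ids.add(item)
        # Drop the oldest ID once the cap is exceeded.
        if len(self._order) > self._max_size:
            oldest = self._order.popleft()
            self._ids.discard(oldest)


sended = RecentIds(max_size=3)
for video_id in ["a", "b", "c", "d"]:
    sended.add(video_id)
```

Because the class supports both in and add, it could stand in for the set in the code above without other changes. The trade-off is that an ID evicted from the cache could be sent again if it ever reappeared as the newest upload.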

Rate limiting

while(True):
    time.sleep(sleepTime)
    for i in range(len(ChannelsAndVideos)):
        request = youtube.channels().list(

Instead of sleeping for one long stretch before iterating over the channels, sleep before each request to the API. To make this easy to use, create a helper function that guarantees a delay of at least sleepTime between consecutive requests.

last_request_time = time.time()
def execute(request):
    global last_request_time
    # Wait until at least sleepTime has passed since the previous request.
    next_request_time = last_request_time + sleepTime
    time.sleep(max(next_request_time - time.time(), 0))
    last_request_time = time.time()
    return request.execute()

Now replace every request.execute() with execute(request). Adjust the sleepTime value based on the YouTube API usage limits, and remove all time.sleep(sleepTime) calls.
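
The same idea can be written without a global variable and in a form that can be tested without really sleeping. This is a minimal sketch, not the code from the answer: the Throttle class, its clock and sleep parameters, and the FakeClock and FakeRequest stand-ins are all hypothetical names introduced for illustration.

```python
import time


class Throttle:
    """Enforces a minimum interval between successive requests (hypothetical helper)."""

    def __init__(self, min_interval, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._last_call = None  # no request has been made yet

    def execute(self, request):
        if self._last_call is not None:
            remaining = self._last_call + self.min_interval - self._clock()
            if remaining > 0:
                self._sleep(remaining)
        self._last_call = self._clock()
        return request.execute()


class FakeClock:
    """Test double: advances its own time instead of really sleeping."""

    def __init__(self):
        self.now = 0.0
        self.sleeps = []

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.sleeps.append(seconds)
        self.now += seconds


class FakeRequest:
    """Stand-in for an API request object that exposes execute()."""

    def execute(self):
        return {"ok": True}


fake = FakeClock()
throttle = Throttle(min_interval=2.0, clock=fake.time, sleep=fake.sleep)
first = throttle.execute(FakeRequest())    # first request: no wait
fake.now += 0.5                            # pretend 0.5 s of other work passed
second = throttle.execute(FakeRequest())   # waits for the remaining 1.5 s
```

In the bot itself, the default clock and sleep arguments would be used, and throttle.execute(request) would replace each request.execute() call.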

Result

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import config
import time

import telebot
ChannelsAndVideos = []
Channels = config.channels_id
bot_token = config.bot_token
api_key = config.api_key
sleepTime = config.sleepTime
messageText = config.messageText
telegram_channels_id = config.telegram_channels_id
youtube = build('youtube', 'v3', developerKey=api_key)
bot = telebot.TeleBot(bot_token)

sended = set()


last_request_time = 0
def execute(request):
    global last_request_time
    # Wait until at least sleepTime has passed since the previous request.
    next_request_time = last_request_time + sleepTime
    time.sleep(max(next_request_time - time.time(), 0))
    last_request_time = time.time()
    return request.execute()


# Record the current video count of every channel.
for ChannelId in Channels:
    try:
        request = youtube.channels().list(
            part='statistics',
            id=ChannelId
        )
        response = execute(request)
        if response['pageInfo']['totalResults'] != 1:
            print("Error! Failed to add " + ChannelId)
        else:
            # The API returns videoCount as a string, so convert it before comparing.
            videoCount = int(response['items'][0]['statistics']['videoCount'])
            print(ChannelId + " has been added (" + str(videoCount) + " videos)")
            ChannelsAndVideos.append((ChannelId, videoCount))
    except HttpError as e:
        print(e)

while True:
    for i in range(len(ChannelsAndVideos)):
        request = youtube.channels().list(
            part='statistics',
            id=ChannelsAndVideos[i][0]
        )
        response = execute(request)
        if response['pageInfo']['totalResults'] != 1:
            print("Error!")
        else:
            videoCount = int(response['items'][0]['statistics']['videoCount'])
            if videoCount > ChannelsAndVideos[i][1]:
                ChannelsAndVideos[i] = (ChannelsAndVideos[i][0], videoCount)
                # Look up the channel's uploads playlist, then fetch its newest item.
                request = youtube.channels().list(
                    part='contentDetails',
                    id=ChannelsAndVideos[i][0]
                )
                response = execute(request)
                request = youtube.playlistItems().list(
                    part=['contentDetails', 'snippet', 'status'],
                    playlistId=response['items'][0]['contentDetails']['relatedPlaylists']['uploads']
                )
                response = execute(request)
                videoId = response['items'][0]['snippet']['resourceId']['videoId']
                if videoId not in sended:
                    sended.add(videoId)
                    for chat_id in telegram_channels_id:
                        try:
                            bot.send_message(chat_id, messageText + "https://www.youtube.com/watch?v=" + videoId)
                        except Exception:
                            print("Failed to send message to " + str(chat_id))
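
To choose a sleepTime value, it can help to work backwards from the daily quota. The sketch below is a rough estimate only. It assumes a quota of 10,000 units per day and a cost of 1 unit per channels.list or playlistItems.list request; both figures are assumptions here and should be checked against the quota page of your own project. The function names are hypothetical.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def min_delay_seconds(daily_quota_units, units_per_request=1):
    """Smallest delay between requests that keeps a steady stream within the daily quota."""
    requests_per_day = daily_quota_units // units_per_request
    if requests_per_day <= 0:
        raise ValueError("quota is too small for even one request per day")
    return SECONDS_PER_DAY / requests_per_day


def cycle_seconds(channel_count, delay):
    """Time for one pass over all channels at one statistics request per channel."""
    return channel_count * delay


delay = min_delay_seconds(10_000)   # assumed daily quota, see the note above
cycle = cycle_seconds(10, delay)    # e.g. 10 monitored channels
print(f"sleepTime >= {delay:.2f} s, one pass takes about {cycle:.1f} s")
```

Under these assumptions the delay comes out at 8.64 seconds, so ten channels are each checked roughly every 86 seconds. The two extra requests made when a new video is detected are not counted, so a slightly larger sleepTime leaves some headroom.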