网络刮板在数字海洋VPS上冻结

发布于 2025-02-04 04:28:53 字数 7884 浏览 2 评论 0原文

我使用asyncio和httpx的刮板构建,并且它触发了邮政请求,其中用户将关键字列表上传为CSV文件。当地的所有功能都很好,但是当我尝试在数字海洋服务器上执行50多个关键字时,它会挂断电话。

我的API代码片段:

import fastapi as _fastapi
from fastapi.responses import HTMLResponse, FileResponse, Response, RedirectResponse, StreamingResponse
from starlette.requests import Request
from starlette.templating import Jinja2Templates
import shutil
import os
import time

from rq import Queue
from rq.job import Job

from redis import Redis

from scraper import run_scraper
from utils import clean_file, csv_writer

app = _fastapi.FastAPI()

r = Redis(
    host="localhost",
    port=6379,
    db=0,
)
q = Queue(connection=r, default_timeout=-1)

templates = Jinja2Templates("templates")




@app.get("/")
def index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/api/v1/scraped_csv")
async def extract_ads(csv_file: _fastapi.UploadFile = _fastapi.File(...)):
    temp_file = _save_file_to_disk(csv_file, path="temp", save_as="temp")
    task = q.enqueue(run_scraper, temp_file)

    return RedirectResponse(f'/progress/{task.id}', status_code=303)
    


@app.get("/progress/{job_id}")
def progress(request: Request, job_id):
    job = Job.fetch(job_id, connection=r)
    if job.is_finished:
        csv_path = os.path.abspath(clean_file)
        return FileResponse(path=csv_path, media_type="text/csv", filename=clean_file)
    return templates.TemplateResponse("log_stream.html", {"request": request})


@app.get("/log_stream/")
def stream():
    def iterfile():
        with open("scraper.log") as log_info:
            while True:
                where = log_info.tell()
                line = log_info.readline()
                if not line:
                    time.sleep(0.5)
                    log_info.seek(where)
                else:
                    yield line

   
    return StreamingResponse(iterfile(), media_type="text/plain")


def _save_file_to_disk(uploaded_file, path=".", save_as="default"):
    extension = os.path.splitext(uploaded_file.filename)[-1]
    temp_file = os.path.join(path, save_as + extension)
    with open(temp_file, "wb") as buffer:
        shutil.copyfileobj(uploaded_file.file, buffer)
    return temp_file

由于请求超时问题,我正在使用Redis队列并在后台进行刮擦。

我的刮板代码片段:

import httpx
import asyncio
from bs4 import BeautifulSoup
from decouple import config
from urllib.parse import urlencode
from urllib.parse import urlparse
import os
import logging

from utils import csv_reader, csv_writer

SCRAPERAPI_KEY = config("API_KEY")
NUM_RETRIES = 3

logging.basicConfig(filename="scraper.log",
                    format='%(asctime)s %(message)s',
                    filemode='w')

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


base_url = "https://www.google.com/search?"

headers = {
    "authority": "www.google.com",
    "sec-ch-dpr": "1",
    "sec-ch-viewport-width": "1366",
    "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="98", "Yandex";v="22"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-full-version": '"22.3.3.886"',
    "sec-ch-ua-arch": '"x86"',
    "sec-ch-ua-platform": '"Linux"',
    "sec-ch-ua-platform-version": '"5.4.0"',
    "sec-ch-ua-model": '""',
    "sec-ch-ua-bitness": '"64"',
    "sec-ch-ua-full-version-list": '" Not A;Brand";v="99.0.0.0", "Chromium";v="98.0.4758.886", "Yandex";v="22.3.3.886"',
    "upgrade-insecure-requests": "1",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.141 Safari/537.36",
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    "sec-fetch-site": "same-origin",
    "sec-fetch-mode": "navigate",
    "sec-fetch-user": "?1",
    "sec-fetch-dest": "document",
    "referer": "https://www.google.com/",
    "accept-language": "en,ru;q=0.9",
}

pagination_params = {
    "q": "lift installation",
    "source": "lnms",
    "tbm": "shop",
    "ei": "aW-HYv74MrCOseMP_8OumAY",
    "start": "",
    "sa": "X",
    "ved": "0ahUKEwikobSm8e33AhUjUGwGHQVfDr84PBDy0wMIpw0",
    "biw": "480",
    "bih": "665",
    "dpr": "1",
}

initial_params = {
    "q": "",
    "source": "lnms",
    "tbm": "shop",
    "sa": "X",
    "ved": "0ahUKEwikobSm8e33AhUjUGwGHQVfDr84PBDy0wMIpw0",
    "biw": "480",
    "bih": "665",
    "dpr": "1",
}

results = []


def get_scraperapi_url(url):
    payload = {
        "api_key": SCRAPERAPI_KEY,
        "url": url,
        "country_code": "au",
        "keep_headers": "true",
    }
    proxy_url = "http://api.scraperapi.com/?" + urlencode(payload)
    return proxy_url


async def log_request(request):
    logger.debug(f"Request: {request.method} {request.url}")


async def log_response(response):
    request = response.request
    logger.debug(f"Response: {request.method} {request.url} - Status: {response.status_code}")


async def fetch_pages(keyword, page_no):
    initial_params["q"] = keyword
    if not page_no:
        params = initial_params
        url = base_url + urlencode(params)
    else:
        params = pagination_params

        params["start"] = str(page_no * 10)

        params["q"] = keyword
        url = base_url + urlencode(params)

    async with httpx.AsyncClient(
        event_hooks={"request": [log_request], "response": [log_response]}
    ) as client:
        response = await client.get(
            get_scraperapi_url(url), headers=headers, timeout=None
        )

        return response


async def parse_page(html):

    ad_urls = []
    content = BeautifulSoup(html, "lxml")

    for ad in content.find_all("a", {"class": "sh-np__click-target"}):
        try:
            async with httpx.AsyncClient() as client:
                r = await client.get(
                    "https://www.google.com" + ad["href"], headers=headers, timeout=None
                )
                url = str(r.url)
                ad_urls.append(urlparse(url).netloc)
                logger.debug(urlparse(url).netloc)
        except:
            pass

    for idx in range(len(ad_urls)):

        results.append({"Ad_Url": ad_urls[idx]})

    return results


async def run_scraper(file_path):
    tasks = []
    kw = await csv_reader(file_path)
    for k in kw:
        for page in range(0, 4):
            for _ in range(NUM_RETRIES):
                try:
                    response = await fetch_pages(k, page)
                    if response.status_code in [200, 404]:
                        break
                except httpx.ConnectError:
                    response = ""
            if response.status_code == 200:
                tasks.append(asyncio.create_task(parse_page(response.content)))

    ad_data = await asyncio.gather(*tasks)

    logger.info('Done!')
    await csv_writer(ad_data[0])
    logger.info('csv created.. Please refresh the page to download the csv.')
    

    return ad_data[0]

Gunicorn.Py

import multiprocessing
import os
from dotenv import load_dotenv
load_dotenv()

name = "gunicorn config for FastAPI - TutLinks.com"
accesslog = "/root/fastapi_google_ads_scraper/google_ad_scraper/gunicorn-access.log"
errorlog = "/root/fastapi_google_ads_scraper/google_ad_scraper/gunicorn-error.log"

bind = "0.0.0.0:8000"

worker_class = "uvicorn.workers.UvicornWorker"
workers = multiprocessing.cpu_count () * 2 + 1
worker_connections = 1024
backlog = 2048
max_requests = 5120
timeout = 0
keepalive = 2

debug = os.environ.get("debug", "false") == "true"
reload = debug
preload_app = False
daemon = False

Digital Ocean VP:8GB RAM,4 AMD VCPU,25GB磁盘,Ubuntu 20.04 20.04

Scraper只需挂断任何错误即可挂断,我已声明default_timeout = -1 indis queue in Redis queue in Redis queue in因为某些请求确实比平时多。我试图在VPS下独立运行Scraper,以查看它是刮板问题还是API,但刮板本身在最后一个关键字上挂了数小时。关于如何解决此问题的任何想法?我已经在墙上撞了数周了数周,但不知道。谢谢

I have this scraper build with asyncio and httpx and it triggers on POST request where a user uploads the list of keywords as a csv file. Everything working fine locally but it hangs up when I try to do 50+ keywords on digital ocean server.

my api code fragment:

import fastapi as _fastapi
from fastapi.responses import HTMLResponse, FileResponse, Response, RedirectResponse, StreamingResponse
from starlette.requests import Request
from starlette.templating import Jinja2Templates
import shutil
import os
import time

from rq import Queue
from rq.job import Job

from redis import Redis

from scraper import run_scraper
from utils import clean_file, csv_writer

app = _fastapi.FastAPI()

r = Redis(
    host="localhost",
    port=6379,
    db=0,
)
q = Queue(connection=r, default_timeout=-1)

templates = Jinja2Templates("templates")




@app.get("/")
def index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/api/v1/scraped_csv")
async def extract_ads(csv_file: _fastapi.UploadFile = _fastapi.File(...)):
    temp_file = _save_file_to_disk(csv_file, path="temp", save_as="temp")
    task = q.enqueue(run_scraper, temp_file)

    return RedirectResponse(f'/progress/{task.id}', status_code=303)
    


@app.get("/progress/{job_id}")
def progress(request: Request, job_id):
    job = Job.fetch(job_id, connection=r)
    if job.is_finished:
        csv_path = os.path.abspath(clean_file)
        return FileResponse(path=csv_path, media_type="text/csv", filename=clean_file)
    return templates.TemplateResponse("log_stream.html", {"request": request})


@app.get("/log_stream/")
def stream():
    def iterfile():
        with open("scraper.log") as log_info:
            while True:
                where = log_info.tell()
                line = log_info.readline()
                if not line:
                    time.sleep(0.5)
                    log_info.seek(where)
                else:
                    yield line

   
    return StreamingResponse(iterfile(), media_type="text/plain")


def _save_file_to_disk(uploaded_file, path=".", save_as="default"):
    extension = os.path.splitext(uploaded_file.filename)[-1]
    temp_file = os.path.join(path, save_as + extension)
    with open(temp_file, "wb") as buffer:
        shutil.copyfileobj(uploaded_file.file, buffer)
    return temp_file

I am using redis queue and make scraping in the background because of request timeout issue.

my scraper code fragment:

import httpx
import asyncio
from bs4 import BeautifulSoup
from decouple import config
from urllib.parse import urlencode
from urllib.parse import urlparse
import os
import logging

from utils import csv_reader, csv_writer

SCRAPERAPI_KEY = config("API_KEY")
NUM_RETRIES = 3

logging.basicConfig(filename="scraper.log",
                    format='%(asctime)s %(message)s',
                    filemode='w')

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


base_url = "https://www.google.com/search?"

headers = {
    "authority": "www.google.com",
    "sec-ch-dpr": "1",
    "sec-ch-viewport-width": "1366",
    "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="98", "Yandex";v="22"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-full-version": '"22.3.3.886"',
    "sec-ch-ua-arch": '"x86"',
    "sec-ch-ua-platform": '"Linux"',
    "sec-ch-ua-platform-version": '"5.4.0"',
    "sec-ch-ua-model": '""',
    "sec-ch-ua-bitness": '"64"',
    "sec-ch-ua-full-version-list": '" Not A;Brand";v="99.0.0.0", "Chromium";v="98.0.4758.886", "Yandex";v="22.3.3.886"',
    "upgrade-insecure-requests": "1",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.141 Safari/537.36",
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    "sec-fetch-site": "same-origin",
    "sec-fetch-mode": "navigate",
    "sec-fetch-user": "?1",
    "sec-fetch-dest": "document",
    "referer": "https://www.google.com/",
    "accept-language": "en,ru;q=0.9",
}

pagination_params = {
    "q": "lift installation",
    "source": "lnms",
    "tbm": "shop",
    "ei": "aW-HYv74MrCOseMP_8OumAY",
    "start": "",
    "sa": "X",
    "ved": "0ahUKEwikobSm8e33AhUjUGwGHQVfDr84PBDy0wMIpw0",
    "biw": "480",
    "bih": "665",
    "dpr": "1",
}

initial_params = {
    "q": "",
    "source": "lnms",
    "tbm": "shop",
    "sa": "X",
    "ved": "0ahUKEwikobSm8e33AhUjUGwGHQVfDr84PBDy0wMIpw0",
    "biw": "480",
    "bih": "665",
    "dpr": "1",
}

results = []


def get_scraperapi_url(url):
    payload = {
        "api_key": SCRAPERAPI_KEY,
        "url": url,
        "country_code": "au",
        "keep_headers": "true",
    }
    proxy_url = "http://api.scraperapi.com/?" + urlencode(payload)
    return proxy_url


async def log_request(request):
    logger.debug(f"Request: {request.method} {request.url}")


async def log_response(response):
    request = response.request
    logger.debug(f"Response: {request.method} {request.url} - Status: {response.status_code}")


async def fetch_pages(keyword, page_no):
    initial_params["q"] = keyword
    if not page_no:
        params = initial_params
        url = base_url + urlencode(params)
    else:
        params = pagination_params

        params["start"] = str(page_no * 10)

        params["q"] = keyword
        url = base_url + urlencode(params)

    async with httpx.AsyncClient(
        event_hooks={"request": [log_request], "response": [log_response]}
    ) as client:
        response = await client.get(
            get_scraperapi_url(url), headers=headers, timeout=None
        )

        return response


async def parse_page(html):

    ad_urls = []
    content = BeautifulSoup(html, "lxml")

    for ad in content.find_all("a", {"class": "sh-np__click-target"}):
        try:
            async with httpx.AsyncClient() as client:
                r = await client.get(
                    "https://www.google.com" + ad["href"], headers=headers, timeout=None
                )
                url = str(r.url)
                ad_urls.append(urlparse(url).netloc)
                logger.debug(urlparse(url).netloc)
        except:
            pass

    for idx in range(len(ad_urls)):

        results.append({"Ad_Url": ad_urls[idx]})

    return results


async def run_scraper(file_path):
    tasks = []
    kw = await csv_reader(file_path)
    for k in kw:
        for page in range(0, 4):
            for _ in range(NUM_RETRIES):
                try:
                    response = await fetch_pages(k, page)
                    if response.status_code in [200, 404]:
                        break
                except httpx.ConnectError:
                    response = ""
            if response.status_code == 200:
                tasks.append(asyncio.create_task(parse_page(response.content)))

    ad_data = await asyncio.gather(*tasks)

    logger.info('Done!')
    await csv_writer(ad_data[0])
    logger.info('csv created.. Please refresh the page to download the csv.')
    

    return ad_data[0]

gunicorn.py

import multiprocessing
import os
from dotenv import load_dotenv
load_dotenv()

name = "gunicorn config for FastAPI - TutLinks.com"
accesslog = "/root/fastapi_google_ads_scraper/google_ad_scraper/gunicorn-access.log"
errorlog = "/root/fastapi_google_ads_scraper/google_ad_scraper/gunicorn-error.log"

bind = "0.0.0.0:8000"

worker_class = "uvicorn.workers.UvicornWorker"
workers = multiprocessing.cpu_count () * 2 + 1
worker_connections = 1024
backlog = 2048
max_requests = 5120
timeout = 0
keepalive = 2

debug = os.environ.get("debug", "false") == "true"
reload = debug
preload_app = False
daemon = False

Digital Ocean VPS: 8gb ram, 4 AMD vcpus, 25gb disk, Ubuntu 20.04

Scraper just hangs up without any errors and I have declared the default_timeout=-1 in redis queue because some of the requests do take time more than usual. I have tried to run the scraper independently under vps to see whether it is scraper issue or api but the scraper itself hang up for hours at the last keyword. Any ideas on how to solve this issue? I have been hitting my head against the wall for weeks now but couldn't figure out. Thank you

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文