Web scraper freezes on DigitalOcean VPS
I have this scraper built with asyncio and httpx, and it is triggered by a POST request in which a user uploads a list of keywords as a CSV file. Everything works fine locally, but it hangs when I try to run 50+ keywords on a DigitalOcean server.
My API code fragment:
import fastapi as _fastapi
from fastapi.responses import HTMLResponse, FileResponse, Response, RedirectResponse, StreamingResponse
from starlette.requests import Request
from starlette.templating import Jinja2Templates
import shutil
import os
import time
from rq import Queue
from rq.job import Job
from redis import Redis
from scraper import run_scraper
from utils import clean_file, csv_writer

app = _fastapi.FastAPI()
r = Redis(
    host="localhost",
    port=6379,
    db=0,
)
q = Queue(connection=r, default_timeout=-1)
templates = Jinja2Templates("templates")


@app.get("/")
def index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/api/v1/scraped_csv")
async def extract_ads(csv_file: _fastapi.UploadFile = _fastapi.File(...)):
    temp_file = _save_file_to_disk(csv_file, path="temp", save_as="temp")
    task = q.enqueue(run_scraper, temp_file)
    return RedirectResponse(f'/progress/{task.id}', status_code=303)


@app.get("/progress/{job_id}")
def progress(request: Request, job_id):
    job = Job.fetch(job_id, connection=r)
    if job.is_finished:
        csv_path = os.path.abspath(clean_file)
        return FileResponse(path=csv_path, media_type="text/csv", filename=clean_file)
    return templates.TemplateResponse("log_stream.html", {"request": request})


@app.get("/log_stream/")
def stream():
    def iterfile():
        with open("scraper.log") as log_info:
            while True:
                where = log_info.tell()
                line = log_info.readline()
                if not line:
                    time.sleep(0.5)
                    log_info.seek(where)
                else:
                    yield line
    return StreamingResponse(iterfile(), media_type="text/plain")


def _save_file_to_disk(uploaded_file, path=".", save_as="default"):
    extension = os.path.splitext(uploaded_file.filename)[-1]
    temp_file = os.path.join(path, save_as + extension)
    with open(temp_file, "wb") as buffer:
        shutil.copyfileobj(uploaded_file.file, buffer)
    return temp_file
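For reference, the upload flow above can be exercised with a minimal client call along these lines (a sketch, not part of the original code; keywords.csv is a placeholder filename):

# Sketch of a client upload against the endpoint above; "keywords.csv" is a placeholder.
import httpx

with open("keywords.csv", "rb") as f:
    resp = httpx.post(
        "http://localhost:8000/api/v1/scraped_csv",
        files={"csv_file": ("keywords.csv", f, "text/csv")},
        follow_redirects=False,  # the endpoint answers with a 303 redirect to /progress/<job_id>
    )
print(resp.status_code, resp.headers.get("location"))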
I am using a Redis queue (RQ) and doing the scraping in the background because of request timeout issues.
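The worker process that actually executes run_scraper is not shown in the post; a minimal sketch of a typical RQ worker entry point for this setup (assumed here, since it is not included) would be:

# Hypothetical RQ worker entry point (not shown in the original post).
# It listens on the default queue that q.enqueue(run_scraper, ...) pushes jobs onto.
from redis import Redis
from rq import Queue, Worker

redis_conn = Redis(host="localhost", port=6379, db=0)
Worker([Queue(connection=redis_conn)], connection=redis_conn).work()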
My scraper code fragment:
import httpx
import asyncio
from bs4 import BeautifulSoup
from decouple import config
from urllib.parse import urlencode
from urllib.parse import urlparse
import os
import logging
from utils import csv_reader, csv_writer

SCRAPERAPI_KEY = config("API_KEY")
NUM_RETRIES = 3

logging.basicConfig(filename="scraper.log",
                    format='%(asctime)s %(message)s',
                    filemode='w')
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

base_url = "https://www.google.com/search?"

headers = {
    "authority": "www.google.com",
    "sec-ch-dpr": "1",
    "sec-ch-viewport-width": "1366",
    "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="98", "Yandex";v="22"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-full-version": '"22.3.3.886"',
    "sec-ch-ua-arch": '"x86"',
    "sec-ch-ua-platform": '"Linux"',
    "sec-ch-ua-platform-version": '"5.4.0"',
    "sec-ch-ua-model": '""',
    "sec-ch-ua-bitness": '"64"',
    "sec-ch-ua-full-version-list": '" Not A;Brand";v="99.0.0.0", "Chromium";v="98.0.4758.886", "Yandex";v="22.3.3.886"',
    "upgrade-insecure-requests": "1",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.141 Safari/537.36",
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    "sec-fetch-site": "same-origin",
    "sec-fetch-mode": "navigate",
    "sec-fetch-user": "?1",
    "sec-fetch-dest": "document",
    "referer": "https://www.google.com/",
    "accept-language": "en,ru;q=0.9",
}

pagination_params = {
    "q": "lift installation",
    "source": "lnms",
    "tbm": "shop",
    "ei": "aW-HYv74MrCOseMP_8OumAY",
    "start": "",
    "sa": "X",
    "ved": "0ahUKEwikobSm8e33AhUjUGwGHQVfDr84PBDy0wMIpw0",
    "biw": "480",
    "bih": "665",
    "dpr": "1",
}

initial_params = {
    "q": "",
    "source": "lnms",
    "tbm": "shop",
    "sa": "X",
    "ved": "0ahUKEwikobSm8e33AhUjUGwGHQVfDr84PBDy0wMIpw0",
    "biw": "480",
    "bih": "665",
    "dpr": "1",
}

results = []


def get_scraperapi_url(url):
    payload = {
        "api_key": SCRAPERAPI_KEY,
        "url": url,
        "country_code": "au",
        "keep_headers": "true",
    }
    proxy_url = "http://api.scraperapi.com/?" + urlencode(payload)
    return proxy_url


async def log_request(request):
    logger.debug(f"Request: {request.method} {request.url}")


async def log_response(response):
    request = response.request
    logger.debug(f"Response: {request.method} {request.url} - Status: {response.status_code}")


async def fetch_pages(keyword, page_no):
    initial_params["q"] = keyword
    if not page_no:
        params = initial_params
        url = base_url + urlencode(params)
    else:
        params = pagination_params
        params["start"] = str(page_no * 10)
        params["q"] = keyword
        url = base_url + urlencode(params)
    async with httpx.AsyncClient(
        event_hooks={"request": [log_request], "response": [log_response]}
    ) as client:
        response = await client.get(
            get_scraperapi_url(url), headers=headers, timeout=None
        )
    return response


async def parse_page(html):
    ad_urls = []
    content = BeautifulSoup(html, "lxml")
    for ad in content.find_all("a", {"class": "sh-np__click-target"}):
        try:
            async with httpx.AsyncClient() as client:
                r = await client.get(
                    "https://www.google.com" + ad["href"], headers=headers, timeout=None
                )
                url = str(r.url)
                ad_urls.append(urlparse(url).netloc)
                logger.debug(urlparse(url).netloc)
        except:
            pass
    for idx in range(len(ad_urls)):
        results.append({"Ad_Url": ad_urls[idx]})
    return results


async def run_scraper(file_path):
    tasks = []
    kw = await csv_reader(file_path)
    for k in kw:
        for page in range(0, 4):
            for _ in range(NUM_RETRIES):
                try:
                    response = await fetch_pages(k, page)
                    if response.status_code in [200, 404]:
                        break
                except httpx.ConnectError:
                    response = ""
            if response.status_code == 200:
                tasks.append(asyncio.create_task(parse_page(response.content)))
    ad_data = await asyncio.gather(*tasks)
    logger.info('Done!')
    await csv_writer(ad_data[0])
    logger.info('csv created.. Please refresh the page to download the csv.')
    return ad_data[0]
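For reference, the "run the scraper independently" test mentioned at the end of this post boils down to driving this coroutine directly, roughly like so (a sketch; temp/temp.csv is assumed to be the file that _save_file_to_disk writes for a .csv upload):

# Rough sketch of running the scraper module on its own, outside FastAPI/RQ.
# "temp/temp.csv" assumes the file written by _save_file_to_disk for a .csv upload.
import asyncio

from scraper import run_scraper

if __name__ == "__main__":
    asyncio.run(run_scraper("temp/temp.csv"))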
gunicorn.py
import multiprocessing
import os
from dotenv import load_dotenv
load_dotenv()
name = "gunicorn config for FastAPI - TutLinks.com"
accesslog = "/root/fastapi_google_ads_scraper/google_ad_scraper/gunicorn-access.log"
errorlog = "/root/fastapi_google_ads_scraper/google_ad_scraper/gunicorn-error.log"
bind = "0.0.0.0:8000"
worker_class = "uvicorn.workers.UvicornWorker"
workers = multiprocessing.cpu_count() * 2 + 1
worker_connections = 1024
backlog = 2048
max_requests = 5120
timeout = 0
keepalive = 2
debug = os.environ.get("debug", "false") == "true"
reload = debug
preload_app = False
daemon = False
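For context, a config module like this is normally passed to Gunicorn on the command line, e.g. gunicorn -c gunicorn.py main:app, where main:app is a placeholder for the actual module and FastAPI instance (the launch command is not shown in the post).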
DigitalOcean VPS: 8 GB RAM, 4 AMD vCPUs, 25 GB disk, Ubuntu 20.04
The scraper just hangs without any errors, and I have set default_timeout=-1 on the Redis queue because some of the requests take longer than usual. I have tried running the scraper independently on the VPS to see whether it is a scraper issue or an API issue, but the scraper itself hangs for hours at the last keyword. Any ideas on how to solve this? I have been hitting my head against the wall for weeks now but couldn't figure it out. Thank you.