How to read URLs from a file and scrape them with multithreading

I am implementing a web scraping script in Python that reads a JSON file and gets a list of URLs to scrape.
This file contains over 60K rows, of which around 50K are unique (so I first remove duplicates).

To do this, I currently have the following:

import contextlib
from bs4 import BeautifulSoup
import feedparser
import pandas
import requests
import time

BASE_URL = 'https://www.iso.org'

def create_iso_details_json(p_merged_iso_df):
    merged_iso_details_df = p_merged_iso_df.drop_duplicates(subset=['Link']).drop(columns=['TC', 'ICS'], axis=1)
    iso_details_dfs = [parse_iso_details(iso, stage, link) 
                       for iso, stage, link in zip(merged_iso_details_df['Standard and/or project'], merged_iso_details_df['Stage'], merged_iso_details_df['Link']) 
                       if link != '']
    merged_iso_details_df = pandas.concat(iso_details_dfs)
    print('Total rows retrieved: ', len(merged_iso_details_df.index))
    merged_iso_details_df.to_json('iso_details.json', orient="records")
    
def parse_iso_details(p_iso, p_stage, p_url):
    print('URL: ', p_url)
    soup = BeautifulSoup(requests.get(p_url).text, 'html.parser')
    try:
        feed_details_url = BASE_URL + soup.find('section', {'id': 'product-details'}).find('a', {'class': 'ss-icon ss-social-circle text-warning text-sm'})['href']
    except AttributeError:
        print('Could not find feed data for URL: ', p_url)
        return None  # without this, the print below raises NameError when no feed link exists
    print(feed_details_url)
    iso_details_dfs = []
    if feed_details_url is not None:
        iso_details_dfs.append(read_iso_details(feed_details_url, p_iso, p_stage))
    with contextlib.suppress(ValueError):
        return pandas.concat(iso_details_dfs)
    
def read_iso_details(p_feed_details_url, p_iso, p_stage):
    data = {'Standard and/or project': p_iso, 'Stage': p_stage}
    df = pandas.DataFrame(data, index=[0])
    feed = feedparser.parse(p_feed_details_url)
    df['Publication date'] = [entry.published for entry in feed.entries]
    return df

def main():
    start_time = time.time()
    merged_iso_df = pandas.read_json('input_file.json', dtype={"Stage": str})
    create_iso_details_json(merged_iso_df)
    print(f"--- {time.time() - start_time} seconds ---")

if __name__ == "__main__":
    main()

I am merging the results into a pandas DataFrame to write it to another JSON file later.

Right now this takes a very long time, since the process makes one request per input URL and each request takes between 0.5 and 1 second (for roughly 50K unique URLs that is on the order of 7 to 14 hours if done sequentially).

I would like to implement this process with multithreading (not multiprocessing) so that the processing time decreases significantly.

What is the best approach to achieve this? Should I split the input JSON file into as many parts as the number of threads I create? How do I merge the results of each thread into one to write the output JSON file?

Thank you in advance.

Answers (3)

玻璃人 2025-02-20 17:39:24

This website explains multithreading pretty well. What you could do is split the URLs into equal parts and run them simultaneously. The problem with that is that you basically just divide the time it would take by the number of threads you use, but to my knowledge this is the best thing you can do without overcomplicating it.
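
A minimal sketch of that idea, using concurrent.futures.ThreadPoolExecutor from the standard library: scrape_one below is only a stand-in for the per-URL logic in the question (requests + BeautifulSoup + feedparser), the worker count of 16 is an arbitrary assumption, and the URL list is a placeholder. With a pool, the split into "equal parts" happens implicitly (each idle thread just takes the next URL), and merging back into one DataFrame is a single pandas.concat over the returned results.

from concurrent.futures import ThreadPoolExecutor

import pandas
import requests

def scrape_one(url):
    # stand-in for parse_iso_details(): one request per URL, one small DataFrame back
    status = requests.get(url, timeout=10).status_code
    return pandas.DataFrame({'Link': [url], 'Status': [status]})

def scrape_all(urls, max_workers=16):
    # executor.map runs scrape_one in worker threads and yields results in input order
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        partial_dfs = list(executor.map(scrape_one, urls))
    return pandas.concat(partial_dfs, ignore_index=True)

if __name__ == "__main__":
    urls = ['https://www.iso.org'] * 3  # placeholder list; the real ones come from the input JSON
    print(scrape_all(urls))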

指尖微凉心微凉 2025-02-20 17:39:24

I finally managed to implement the process with multithreading, as @Yui posted in their answer.

The real problem was merging the results of each thread into one, so I decided to have each thread write its result to a CSV file in append mode. Then, when all threads have finished, I read the CSV and write the results to the required JSON file.

import contextlib
import os
import time
from queue import Queue
from threading import Lock, Thread

from bs4 import BeautifulSoup
import feedparser
import numpy
import pandas
import requests

BASE_URL = 'https://www.iso.org'
NUM_THREADS = 4
q = Queue()
csv_lock = Lock()  # serializes appends to the shared CSV file
INPUT_FILE = 'iso_tc_ics.json'
ISO_DETAILS_CSV_FILE = 'iso_details.csv'
OUTPUT_FILE = 'iso_details.json'

def create_iso_details_json(p_queue):
    iso_details_df = p_queue.get()
    iso_details_dfs = [parse_iso_details(iso, stage, link) 
                       for iso, stage, link in zip(iso_details_df['Standard and/or project'], iso_details_df['Stage'], iso_details_df['Link']) 
                       if link != '']
    iso_details_df = pandas.concat(iso_details_dfs)
    print('Rows retrieved: ', len(iso_details_df.index))
    return iso_details_df
    
def parse_iso_details(p_iso, p_stage, p_url):
    print('URL: ', p_url)
    soup = BeautifulSoup(requests.get(p_url).text, 'html.parser')
    try:
        feed_details_url = BASE_URL + soup.find('section', {'id': 'product-details'}).find('a', {'class': 'ss-icon ss-social-circle text-warning text-sm'})['href']
    except AttributeError:
        print('Could not find feed data for URL: ', p_url)
        return None  # without this, the print below raises NameError when no feed link exists
    print(feed_details_url)
    iso_details_dfs = []
    if feed_details_url is not None:
        iso_details_dfs.append(read_iso_details(feed_details_url, p_iso, p_stage))
    with contextlib.suppress(ValueError):
        return pandas.concat(iso_details_dfs)
    
def read_iso_details(p_feed_details_url, p_iso, p_stage):
    data = {'Standard and/or project': p_iso, 'Stage': p_stage}
    df = pandas.DataFrame(data, index=[0])
    feed = feedparser.parse(p_feed_details_url)
    df['Publication date'] = [entry.published for entry in feed.entries]
    return df

def main():
    global q
    result_df = create_iso_details_json(q)
    # serialize writes: otherwise two threads can both see an empty file and both
    # write the CSV header, or interleave their rows
    with csv_lock, open(ISO_DETAILS_CSV_FILE, 'a') as f:
        result_df.to_csv(f, mode='a', index=False, header=not f.tell(), encoding='ISO-8859-1')
    q.task_done()

def init():
    merged_iso_df = pandas.read_json(INPUT_FILE, dtype={"Stage": str})
    merged_iso_details_df = merged_iso_df.drop_duplicates(subset=['Link']).drop(columns=['TC', 'ICS'], axis=1)
    iso_details_df_chunks = numpy.array_split(merged_iso_details_df, NUM_THREADS)

    for iso_details_df in iso_details_df_chunks:
        q.put(iso_details_df)

    for _ in range(NUM_THREADS):
        worker = Thread(target=main)
        worker.daemon = True
        worker.start()

def end():
    q.join()
    result_iso_details_df = pandas.read_csv(ISO_DETAILS_CSV_FILE, dtype={"Stage": str}, encoding='ISO-8859-1')
    print('Total rows retrieved: ', len(result_iso_details_df.index))
    result_iso_details_df.to_json(OUTPUT_FILE, orient="records")
    with contextlib.suppress(OSError):
        os.remove(ISO_DETAILS_CSV_FILE)

if __name__ == "__main__":
    start_time = time.time()

    init()

    end()
    
    print(f"--- {time.time() - start_time} seconds ---")

土豪我们做朋友吧 2025-02-20 17:39:24

I would go with asyncio and aiohttp. Here is a complete example of how to make multiple requests concurrently and collect the results at the end:

import aiohttp
import asyncio

async def geturl(url, session):
    async with session.get(url) as resp:
        if resp.status == 200:
            return (await resp.json())['name']
        else:
            return "ERROR"

async def main():
    urls = [f'https://pokeapi.co/api/v2/pokemon/{i}' for i in range(1,10)]
    async with aiohttp.ClientSession() as session:
        tasks = [geturl(url, session) for url in urls]
        # asyncio.gather will run all the tasks concurrently
        # and return their results once all tasks have returned
        all_results = await asyncio.gather(*tasks)
        print(all_results)

asyncio.run(main())

By the way, this prints the names of the first nine Pokémon (IDs 1 through 9); you can tweak it for your needs.
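
For the original problem (around 50K URLs), launching every request at once would likely overwhelm both the client and the server, so the gather call is usually combined with a concurrency cap. Below is a minimal sketch of that adaptation; the limit of 20, the 30-second timeout, and scrape_one are assumptions for illustration, not part of the answer above.

import asyncio

import aiohttp

async def scrape_one(url, session, semaphore):
    # the semaphore caps how many requests are in flight at any moment
    async with semaphore:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
            return url, resp.status, await resp.text()

async def scrape_all(urls, limit=20):
    semaphore = asyncio.Semaphore(limit)
    async with aiohttp.ClientSession() as session:
        tasks = [scrape_one(url, session, semaphore) for url in urls]
        # return_exceptions=True keeps one failed URL from cancelling the rest
        return await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    urls = [f'https://pokeapi.co/api/v2/pokemon/{i}' for i in range(1, 10)]  # stand-in list
    results = asyncio.run(scrape_all(urls))
    print(len(results), 'results')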
