Spider实战系列-爬取鬼吹灯小说(spider species)
 南窗  分类:IT技术  人气:154  回帖:0  发布于1年前 收藏

第一次发表实战类型的爬虫文章,如果有那里不明白或者出现bug欢迎大家在下面评论,可以给出我更好的建议,欢迎大家指正.

网站链接放在这里了​​鬼吹灯​​

主要是以协程为主来爬取小说得章节内容,协程爬取不懂得小伙伴可以先关注我一手,后续会整理理论的知识放在专栏里

整体思路

  1. 得到鬼吹灯页面的源码
  2. 解析源码得到每一个章节的url
  3. 得到书名,这个书名通过切片得到
  4. 通过url得到一个页面的内容
  5. 使用并发执行多个任务下载

代码实现

导包

import asyncio
import os
from aiohttp import ClientSession
import requests
import aiofiles
from bs4 import BeautifulSoup
复制代码

得到页面源码的方法

参数是传入的url

返回出页面的源码

def get_page_source(url):
    """
    获取页面源码的方法
    :param url: 传入的url
    :return: 返回的是页面的源码
    """
    headers={
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78'
    }
    res = requests.get(url,headers=headers)
    data = res.content.decode()
    return data
复制代码

解析页面得到url

在这里,我使用了集合来存储每一个章节的url,使用xpath来得到章节url,我个人是比较喜欢使用xpath,在这里给出另一种写法,使用的是的beautifulSoup

在页面F12查看,我们找到的是div下的ul下的li下的a标签的属性href

写法一:使用xpath

def parse_page_source(html):
    """
    对页面进行解析,得到我们每一个章节的url
    :param html: 传入的页面源码
    :return: 返回的是一个带有所有章节url的集合
    """
    book_list=[]
    tree = etree.HTML(html)
    mulu_list = tree.xpath('//div[@class="mulu-list quanji"]')
    for mulu in mulu_list:
        # 抓取整个页面下章节的url
        a = mulu.xpath('./ul/li/a/@href')
        # 注意这里一定要在for循环里添加集合
        book_list.append(a)
    return book_list
复制代码

写法二:bs4

def parse_page_source(html):
    """
    对页面进行解析,得到我们每一个章节的url
    :param html: 传入的页面源码
    :return: 返回的是一个带有所有章节url的集合
    """
    book_list = []
    soup = BeautifulSoup(html, 'html.parser')
    a_list = soup.find_all('div', attrs={'class': 'mulu-list quanji'})
    for a in a_list:
        a_list = a.find_all('a')
        for href in a_list:
            chapter_url = href['href']
            book_list.append(chapter_url)
    return book_list
复制代码

得到和章节名

通过传入的章节url,进行切片以下面一个链接为例

​​​​​https://www.51shucheng.net/daomu/guichuideng/jingjuegucheng/2464.html​​

我们按照/切分就有了['https:', '', 'www.51shucheng.net', 'daomu', 'guichuideng', 'jingjuegucheng', '2464.html'] 然后我们取倒数第二个元素就有了b_name

def get_book_name(chapter_url):
    """
    得到名称,为了后续下载好分辨
    :param chapter_url:
    :return:
    """
    book_chapter_name = chapter_url.split('/')[-2]
    return book_chapter_name
复制代码

下载一个章节的内容

使用xpath来写

async def aio_download_one_content(chapter_url, single):
    """
    下载一个章节内容
    :param chapter_url: 传入得到的章节url
    :param single: 使用async with single就是10个并发
    :return:
    """
    c_name = get_book_name(chapter_url)
    for i in range(10):
        try:
            async with single:
                async with ClientSession() as session:
                    async with session.get(chapter_url) as res:
                        # 得到章节内容的页面的源码
                        page_source = await res.content.read()
                        tree = etree.HTML(page_source)
                        # 章节名称
                        base_title = tree.xpath('//h1/text()')[0]
                        if(':'in base_title):
                            number = base_title.split(':')[0]
                            con = base_title.split(':')[1]
                            title = number+con
                        else:
                            title=base_title
                        # 章节内容
                        content = tree.xpath('//div[@class="neirong"]/p/text()')
                        chapter_content = '\n'.join(content)
                        if not os.path.exists(f'{book_name}/{c_name}'):
                            os.makedirs(f'{book_name}/{c_name}')
                            async with aiofiles.open(f'{book_name}/{c_name}/{title}.txt', mode="w",
                                                     encoding='utf-8') as f:
                                await f.write(chapter_content)
                            print(chapter_url, "下载完毕!")
                            return ""
        except Exception as e:
            print(e)
            print(chapter_url, "下载失败!, 重新下载. ")
    return chapter_url
复制代码

这段代码单独拿出来是因为有的章节名称是这样的<<第19章 : 考古队>>,这样的数据是不对的,放在文件里无法命名,这就导致了后续能写入文件的只有章节名没有:的内容,所以我对第一次筛选出的数据进行切片,如果遇到:就把前面和后面的数据切出来在组合,如果没遇到就让一个新的变量来接收base_title

# 章节名称
                    base_title = tree.xpath('//h1/text()')[0]
                    if(':'in base_title):
                        number = base_title.split(':')[0]
                        con = base_title.split(':')[1]
                        title = number+con
                    else:
                        title=base_title
复制代码

使用bs4来写

async def aio_download_one(chapter_url, signal):
    """
    下载一个章节内容
    :param chapter_url: 传入得到的章节url
    :param single: 使用async with single就是10个并发
    :return:
    """
    c_name = get_book_name(chapter_url)
    for c in range(10):
        try:
            async with signal:
                async with aiohttp.ClientSession() as session:
                    async with session.get(chapter_url) as resp:
                        # 得到章节内容的页面的源码
                        page_source = await resp.text()
                        soup = BeautifulSoup(page_source, 'html.parser')
                        # 章节名称
                        base_title = soup.find('h1').text
                        if (':' in base_title):
                            number = base_title.split(':')[0]
                            con = base_title.split(':')[1]
                            title = number + con
                        else:
                            title = base_title
                        # 章节内容
                        p_content = soup.find('div', attrs={'class': 'neirong'}).find_all('p')
                        content = [p.text + '\n' for p in p_content]
                        chapter_content = '\n'.join(content)
                        if not os.path.exists(f'{book_name}/{c_name}'):
                            os.makedirs(f'{book_name}/{c_name}')
                        async with aiofiles.open(f'{book_name}/{c_name}/{title}.txt', mode="w",
                                                 encoding='utf-8') as f:
                            await f.write(chapter_content)
                        print(chapter_url, "下载完毕!")
                        return ""
        except Exception as e:
            print(e)
            print(chapter_url, "下载失败!, 重新下载. ")
    return chapter_url
复制代码

协程下载

async def aio_download(url_list):
    # 创建一个任务列表
    tasks = []
    # 设置最多10个任务并行运作
    semaphore = asyncio.Semaphore(10)
    for h in url_list:
        tasks.append(asyncio.create_task(aio_download_one_content(h, semaphore)))
    await asyncio.wait(tasks)
复制代码

主函数运行

主函数运行就没什么可说的了,这里注意一点就是最后不要loop.close(),这样的话会导致你还没有爬取完数据,loop.close()就会关闭,情况如下,还剩一点就爬完了,结果报错了

if __name__ == '__main__':
    url = 'https://www.51shucheng.net/daomu/guichuideng'
    book_name = '鬼吹灯'
    if not os.path.exists(book_name):
        os.makedirs(book_name)
    source = get_page_source(url)
    href_list = parse_page_source(source)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(aio_download(href_list))
复制代码

完成效果图

我就不一一截图了

总结

为什么我在这里比对了xpath和bs4两种代码,小伙伴可以仔细看一下,在xpath中,我想拿到数据,找到它,大量的使用了//这种,这样的话就会从源码内全局检索,这就导致了我想爬取文章内容会很慢,有些时候还会超时导致报错.所以我们使用xpath的时候,想让他的速度提高,最好有一个指定的点 到了再//

可以理解为下面这种情况

def getList():
    url = 'https://www.xiurenba.cc/XiuRen/'
    response = requests.get(url, headers=headers)
    data = response.c
    ontent.decode()
    # print(data)
    tree = etree.HTML(data)
    li_list = tree.xpath('//ul[@class="update_area_lists cl"]/li')
    return li_list
复制代码
def get_single_url():

li_list = getList()

for li in li_list:

single_url = 'https://www.xiurenba.cc' + li.xpath('./a/@href')[0]
复制代码

还有就是遇到了特殊符号要把它干掉,或者替换掉,这样就可以正常爬取数据

如果有小伙伴想要直接拿取源码的话,可以顺着代码实现一步步粘贴过去

讨论这个帖子(0)垃圾回帖将一律封号处理……