爬虫
爬虫
前言
链接:
【爬虫1000集】目前B站最完整的爬虫教程,包含所有干货内容!这还没人看,我不更了!_哔哩哔哩_bilibili
index_img: img/article_img_1.jpg
一、Python标准库爬虫
1
2
3
4
5
6
7
8
9
10
11
import urllib.request
url = "https://www.baidu.com"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36'
}
request = urllib.request.Request(url=url, headers=headers) # 定制请求对象
html = urllib.request.urlopen(url)
print(type(html)) # <class 'http.client.HTTPResponse'>
print(html.getheaders()) # 获取headers
urllib.request.urlretrieve(url=url, filename="baidu.html") # 下载网页/图片汉字变成Unicode编码
1
2
3
4
5
import urllib.parse
name = urllib.parse.quote("张")
print(name)
# %E5%BC%A0
方式二:
1
2
3
4
5
6
7
8
9
import urllib.parse
data = {
"name": "张",
"sex": "女",
"age": 19
}
print(urllib.parse.urlencode(data))
# name=%E5%BC%A0&sex=%E5%A5%B3&age=19s
post爬虫实例-百度翻译
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import json
import urllib.parse
import urllib.request
url = "https://fanyi.baidu.com/sug"
data = {
"kw": "hello"
}
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36',
}
data = urllib.parse.urlencode(data).encode("utf-8")
request = urllib.request.Request(url=url, data=data, headers=headers)
html = urllib.request.urlopen(request)
print(json.loads(html.read().decode("utf-8")))
二、Requests
1
2
3
4
5
6
7
8
9
10
11
12
import requests
url = "https://fanyi.baidu.com/sug"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36'
}
data = {
"kw": "day"
}
html = requests.post(url=url, headers=headers, data=data)
print(html.json())
html.close()XHR一般都是Ajax请求
三、数据解析
3.1 Re解析
1
2
3
4
5
6
7
8
9
10
import re
s = "我的年龄是19,我的ID是1314"
obj = re.compile(r"\d+") # 预编译正则语句
print(obj.findall(s)) # 查找所有
iterator = obj.finditer(s) # 返回迭代器,效率较高
for i in iterator:
print(i.group())
print(obj.match(s)) # 从头开始匹配
print(obj.search(s)) # 只匹配一次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import re
s = """
<span class="video" id="1" src="url1">测试1</span>
<span class="video" id="2" src="url2">测试2</span>
<span class="video" id="3" src="url3">测试3</span>
"""
obj = re.compile(r'<span class="video" id="(?P<ID>\d+)" src=".*?">(?P<Test>.*?)</span>', re.S)
# re.S让.能匹配换行符
# ?P<分组名称>
iterator = obj.finditer(s)
for it in iterator:
print(it.group("ID"))
print(it.group("Test"))实战-豆瓣Top250
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import re
import csv
import requests
base_url = "https://movie.douban.com/top250?start=0&filter="
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.81 Safari/537.36 Edg/104.0.1293.54"
}
html = requests.get(url=base_url, headers=headers)
obj = re.compile(r'<span class="title">(?P<title>.*?)</span>.*?'
r'<span class="rating_num" property="v:average">(?P<score>.*?)</span>.*?'
r'<span>(?P<num>.*?)人评价</span>.*?'
r' <span class="inq">(?P<quote>.*?)</span>', re.S)
iterator = obj.finditer(html.text)
f = open("doubanTop250.csv", "w", encoding="utf-8")
csvWriter = csv.writer(f)
for it in iterator:
row = it.groupdict()
csvWriter.writerow(row.values())
html.close()
f.close()
3.2 Bs4解析
bs4解析主要是通过标签名和属性解析
1
2
3
4
5
6
7
html.find("table",class_="test") # class_即class,此处的下划线是为了避免冲突
# 或者
html.find("table",attrs={
"class": "test"
})
test.text # 拿到文本标签内容
test.get("class") # 拿到特定属性的值
3.3 Xpath解析
1
2
3
4
5
6
7
8
9
# //表示后代
# text()取文本
/book/author/*/nick[1]/text() *表示任意节点 [1]表示第一个(xpath是从第一个开始数的)
/book/author/*/nick[@class="test"]/text() 查找特定属性
若从子节点查找xpath:./xxxxxx
若查找属性xpath:/book/author/@href
from lxml import etree
tree = etree.parse("index.html")
tree.xpath("/")
四、Requests进阶
4.1 处理Cookie
1
2
3
4
5
6
7
session = requests.session()
session.post()
session.get()
# 或者采用如下方式
requests.post(url,headers = {
"Cookie": "xxx"
})
4.2 防盗链
审查元素和页面源代码是有偏差的,审查元素部分代码是js脚本生成的 防盗链需要加上Referer字段
4.3 代理
1
2
3
4
proxies = {
"https": "socks5://127.0.0.1:10808",
"http": "socks5://127.0.0.1:10808"
}
五、并发编程
并发编程分为两种应用场景
CPU密集型
指的是I/O在很短时间内就可以完成,主要是CPU进行大量的计算,特点是CPU占用率比较高
比如:解压缩,加解密,正则表达式搜索
IO密集型
指的是系统在运行过程中大部分时间在等待IO操作
比如:文件处理程序,爬虫,读写数据库
一个进程可以启动n个线程,一个线程可以启动n个协程
相比协程,线程的启动数目有限制,有线程切换开销
实现技术有三种
多进程
可以多核,真正并行
适用于CPU密集型计算
多线程
适用于IO密集型计算,同时运行的任务数目不多
多协程
支持的库有限制
适用于IO密集型计算,需要超多任务并行
5.1、多线程
进程是分配资源的最小单位
线程是程序执行的最小单位,多线程更加节省资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
进程是资源单位
线程是执行单位
from threading import Thread
def func():
for ii in range(10000):
print(f"func{ii}")
if __name__ == '__main__':
thread = Thread(target=func)
thread.start() # 多线程状态设置为可以开始状态,具体的执行时间由cpu决定
for i in range(10000):
print(f"main{i}")第二种写法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from threading import Thread
class MyThread(Thread):
def run(self):
for ii in range(10000):
print(f"func{ii}")
if __name__ == '__main__':
thread = MyThread()
thread.start() # 多线程状态设置为可以开始状态,具体的执行时间由cpu决定
for i in range(10000):
print(f"main{i}")
这是第二种写法多线程传递参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from threading import Thread
def func(name):
for ii in range(10000):
print(f"{name}{ii}")
if __name__ == '__main__':
thread1 = Thread(target=func, args=("张",)) 参数必须是元组
thread1.start()
thread2 = Thread(target=func, args=("陈",))
thread2.start()
多线程传递参数设置守护主线程
1
2
3
m = Thread(target=func(2, 4),daemon=True)
或者
m.setDaemon(True)线程之间的执行顺序是无序的
线程之间的通信
queue.Queue()
线程安全问题的解决:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import threading
import time
class Account:
def __init__(self, balance):
self.balance = balance
def draw(account: Account, amount):
with lock:
if account.balance >= amount:
time.sleep(1)
print("取钱成功")
account.balance -= amount
print("余额为:", account.balance)
else:
print("取钱失败")
if __name__ == '__main__':
account = Account(1000)
lock = threading.Lock()
ta = threading.Thread(target=draw, args=(account, 500))
tb = threading.Thread(target=draw, args=(account, 600))
ta.start()
tb.start()获取返回参数只能通过queue间接获取
5.2、多进程
进程是操作系统进行资源分配和调度运行的基本单位,一个程序至少有一个进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Process
def func():
for ii in range(1000):
print("多进程", ii)
if __name__ == '__main__':
pro = Process(target=func)
# target:执行的目标函数
# name:进程名,一般不设置
# group:进程组
pro.start()
for i in range(1000):
print("主进程", i)如何传递参数?
- args:以元组形式传递
- kwargs:以字典形式传递
1
2
m = Process(target=func, args=(2, 4))
n = Process(target=func(2, 4))如何获取进程编号?
os.getpid()
获取当前进程编号os.getppid()
获取当前父进程编号主进程会等待所有子进程执行结束再结束
如何设置守护主进程?也就是主进程关闭后所有进程直接销毁
1
2
3
m = Process(target=func(2, 4))
m.daemon = True
m.start()
thread.join()会阻塞当前线程(通常是主线程),直到被调用的线程执行完毕
5.3、线程池和进程池
原理:新建线程系统需要分配资源,终止线程系统需要回收资源,如果可以重用线程,则可以减去新建/终止的开销
一次性开辟一些线程,用户直接给线程池提交任务
1
2
3
4
5
6
7
8
9
10
11
12
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
def func(name):
for i in range(1000):
print(name, i)
if __name__ == '__main__':
with ThreadPoolExecutor(61) as t:
for i in range(500):
t.submit(func, name="张")
线程池实战-爬取cnblog
blog_spider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# https://www.cnblogs.com/#p2
import requests
from bs4 import BeautifulSoup
urls = [f'https://www.cnblogs.com/#p{i}' for i in range(1, 5)]
# 抓取网页源代码
def craw(url):
html = requests.get(url)
return html.text
# 利用bs4解析
def parse(html):
soup = BeautifulSoup(html, "html.parser")
contents = soup.find_all("a", class_='post-item-title')
result = []
for content in contents:
a = content.text
b = content.get('href')
result.append({a: b})
return result
if __name__ == '__main__':
parse(craw(urls[0]))thread_pool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import blog_spider
import concurrent.futures
# craw
with concurrent.futures.ThreadPoolExecutor() as pool:
# 遍历方式一:map的结果和和入参的顺序是对应的,也就是说htmls是有序的
htmls = pool.map(blog_spider.craw, blog_spider.urls)
htmls = list(zip(blog_spider.urls, htmls))
# parse
with concurrent.futures.ThreadPoolExecutor() as pool:
futures = {}
for url, html in htmls:
# 遍历方式二:这个是一个一个提交
future = pool.submit(blog_spider.parse, html)
futures[url] = future
for url, future in futures.items():
print(url, future.result())
进程池
GIL,全称是Global Interpreter Lock,是Python解释器的一个重要特性。它是一个互斥锁,用于确保在任何给定时间只有一个线程在解释器中运行。这意味着,即使在多核CPU上运行Python程序,由于GIL的存在,每个时刻只有一个CPU核心能够真正运行Python字节码,为了解决GIL带来的缺陷,引入了多进程。
在Flask中如何使用进程池进行CPU密集型计算?
将进程池的定义放在main函数中,且在
app.run()
之前
5.4、协程
5.4.1 Yield
介绍:
带有yield的函数不再是一个普通函数,而是一个生成器,可以用于迭代
类似return,迭代遇到一次yield就return,直到下一次调用,从上一次遇到yield的地方开始执行
若函数里面有yield,若无next(),则函数不会执行!!!
举例一:理解生成器
在下面的代码中,
func
函数并不会执行!!!因为有yield
,函数变成了生成器
1
2
3
4
5
6
7
8
9
10
def func():
print(1)
print(2)
print(3)
yield 4
yield 5
yield 6
func()举例二:理解有
yield
的函数的执行过程首先第一次调用
next()
,执行到res = yield 4
,生成器返回4,后面再次调用next(g)
的时候,res
并没有接收到任何值!!!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def func():
print("starting...")
while True:
res = yield 4
print("res:", res)
g = func()
print(next(g))
print("*" * 20)
print(next(g))
"""
starting...
4
********************
res: None
4
"""若想让
res
接收到参数,可以这样修改代码:g.send(666)
,然后res
就能接收到666
yield
参考资料:
5.4.2 Asyncio
协程:在单线程内实现并发
意义:当程序遇见io操作的时候,可以选择性的切换到其他任务上
async
:修饰函数时,代表这个函数可以暂停,是协程函数,也就是遇到堵塞的时候去执行其它程序; 执行协程函数的时候,必须把协程对象交给事件循环
处理
await
:后面是可以等待的对象,等待到对象的返回结果,才会继续执行后续代码
await with
:用于异步上下文管理器对象,进入代码块之前获取资源,代码块执行完毕后释放资源在
asyncio
中,运行协程函数的方式有三种:
await 协程函数()
:通常是在一个协程中调用另一个协程,注意,这个一定出现在协程函数中,普通函数是不行的
1
2
3
await download(1)
await download(2)
# 此时download()返回的是协程对象:<class 'coroutine'>上面这种情况会按顺序执行,因为虽然是协程函数,但是没有任何调度算法,也就是统一管理
asyncio.run()
:必须出现在非协程函数中,一般用于执行最顶层的入口函数,可以看做是一切的调度者
asyncio.create_task()
:将协程对象封装为Task
,一般用于实现异步并发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
async def download(url):
print("开始请求")
await asyncio.sleep(3) # 模拟网络请求
print("结束请求")
async def main():
urls = [
"https://www.baidu.com",
"https://bilibili.com",
"https://www.sougou.com"
]
tasks = []
for url in urls:
tmp = asyncio.create_task(download(url))
tasks.append(tmp)
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
Asynico
参考资料:
5.4.3 Aiohttp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
import aiohttp
i1 = "https://w.wallhaven.cc/full/x8/wallhaven-x8v5mv.jpg"
i2 = "https://w.wallhaven.cc/full/dp/wallhaven-dp1w5g.jpg"
i3 = "https://w.wallhaven.cc/full/72/wallhaven-7286w9.png"
urls = [i1, i2, i3]
async def download(url):
name = url.split("/")[-1]
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
with open(name, "wb") as f:
f.write(await resp.content.read())
async def main():
tasks = (asyncio.create_task(download(url)) for url in urls)
await asyncio.wait(list(tasks))
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
5.4.4 信号量
控制并发的最大数量
1
2
3
sem = asyncio.Semaphore(10)
async with sem:
# ...
六、Selenium
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from selenium.webdriver import Chrome
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time
# 创建浏览器对象
# 若点击链接创建了新窗口,selenium是不能切换的
web = Chrome()
web.get("https://www.baidu.com")
web.find_element(By.XPATH, '//*[@id="kw"]').send_keys("python", Keys.ENTER)
time.sleep(3)
web.find_element(By.XPATH, '//*[@id="2"]/div/div/h3/a').click()
time.sleep(3)
web.switch_to.window(web.window_handles[-1])
time.sleep(3)
# 关掉子窗口,但是selenium还停留在原始窗口,需要手动切换
web.close()
web.switch_to.window(web.window_handles[0])若遇到Iframe则不行,需要进行如下处理
1
2
web.switch_to.frame(web.find_element(By.XPATH,'xxx'))
web.switch_to.default_content() # 返回原窗口若要操作下拉列表,需要导入下面的模块
1
2
from selenium.webdriver.support.select import Select
sel = Select(sel_el)无头模式
1
2
3
4
5
6
7
8
9from selenium.webdriver import Chrome
>from selenium.webdriver.chrome.options import Options
>opt = Options()
opt.add_argument("--headless")
opt.add_argument("--disable-gpu")
web = Chrome(options=opt)
web.get("https://www.baidu.com")
print(web.title)如何拿到页面源代码?
1
print(web.page_source)
检测是否为selenium控制
1
2
3
4
5
6
7
8
9
window.navigator.webdriver
>若为true如何处理:
from selenium.webdriver import Chrome
>from selenium.webdriver.chrome.options import Options
opt = Options()
opt.add_argument("--disable-blink-features=AutomationControlled")
web = Chrome(options=opt)
web.get("https://www.baidu.com")