Python爬虫总结 1.1 urllib库 示例代码如下:
1.1.1发送get请求 1 2 3 4 5 6 7 8 9 from urllib import request url = '' resp = request.urlopen(url) if resp.getcode() == 200 : with open ('a.mp4' , 'wb' ) as f: f.write(
1.1.2发送post请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from urllib import requestfrom urllib import parse url = "" data = { "keyword" : "王顺子" , "pageIndex" : "1" , "pageSize" : "20" , "searchType" : "0" } params_str = parse.urlencode(data) resp = request.urlopen(url, data=bytes (params_str, encoding='UTF-8' ))
1.1.4获取响应头信息 获取所有头部信息
1 resp.getheader('Content-Type' )
1.1.6url地址编解码 1 2 3 4 5 6 7 8 9 10 11 12 from urllib import parse params = { 'kw' : 'python教程' , 'searchType' : '1' }print (parse.urlencode(params)) print (parse.parse_qs(params_str)) print (parse.parse_qsl(params_str))
1.1.7构建请求头 示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from urllib import requestfrom urllib import parse url = "" headers = { "User-Agent" : "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" } req = request.Request(url, headers=headers) resp = request.urlopen(req)print ('UTF-8' ))
1.1.8处理不受信任的证书 示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from urllib import requestimport ssl url = "" headers = { "User-Agent" : "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" } req = request.Request(url, headers=headers) context = ssl.create_default_context(cafile="charles.pem" ) resp = request.urlopen(req, context=context)print ('UTF-8' ))
1.1.9设置代理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from urllib import request url = "" proxy = '' proxy_handler = request.ProxyHandler({'http' : proxy}) proxy_opener = request.build_opener(proxy_handler) resp = (url)print ('UTF-8' ))
1.1.10验证码处理 12306验证码案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from urllib import requestfrom urllib import parse url = "" resp = request.urlopen(url) headers_str = resp.getheader("Set-Cookie" ) headers = headers_str.split("," ) headers_result = []for header in headers: headers_result.append(header.split(";" )[0 ]) headers_result_str = ";" .join(headers_result)print (headers_result_str)with open ("yzm.jpg" , "wb" ) as f: f.write( answer = input ("请输入验证码答案: " ) check_url = "" data = { "answer" : answer, "login_site" : "E" , "rand" : "sjrand" } param = parse.urlencode(data) param_bytes = bytes (param, encoding="utf-8" ) check_headers = { "Cookie" : headers_result_str } req = request.Request(check_url, data=param_bytes, headers=check_headers) result = request.urlopen(req)print ("utf-8" ))
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 from urllib import requestfrom urllib import parsefrom http.cookiejar import CookieJar, MozillaCookieJar url = "" cookie_jar = MozillaCookieJar() cookie_handler = request.HTTPCookieProcessor(cookie_jar) cookie_opener = request.build_opener(cookie_handler) resp = (url)for c in cookie_jar: print (c)with open ("yzm.jpg" , "wb" ) as f: f.write( answer = input ("请输入验证码答案: " ) check_url = "" data = { "answer" : answer, "login_site" : "E" , "rand" : "sjrand" } param = parse.urlencode(data) param_bytes = bytes (param, encoding="utf-8" ) req = request.Request(check_url, data=param_bytes) result = (req)print ("utf-8" ))
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from urllib import request user = "itlike" pwd = "123456" url = "" pm = request.HTTPPasswordMgrWithDefaultRealm() pm.add_password(None , url, user, pwd) handler = request.HTTPBasicAuthHandler(pm) opener = request.build_opener(handler) resp = (url)print ("utf-8" ))
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 from urllib import requestfrom http.cookiejar import CookieJar proxy = "" url = "" proxy_handler = request.ProxyHandler({"http" : proxy}) cookie_jar = CookieJar() cookie_handler = request.HTTPCookieProcessor(cookie_jar) opener = request.build_opener(proxy_handler, cookie_handler) req = request.Request(url, headers={ "User-Agent" : "Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Mobile/15A372 Safari/604.1" }) resp = (req)for cookie in cookie_jar: print (cookie)
1.1.11弹窗验证授权处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from urllib import requestimport base64 user = "itlike" pwd = "123456" result_str = user + ":" + pwd result = "Basic " + base64.b64encode(bytes (result_str, encoding="utf-8" )).decode("utf-8" ) url = "" req = request.Request(url, headers={ "Authorization" : result }) resp = request.urlopen(req)print ("utf-8" ))
1.1.12下载进度监听 1 2 3 4 5 6 7 8 9 10 11 from urllib import request url = "" def download_msg (block_num, block_size, total_size ): progress = (block_num + 1 ) * block_size / total_size progress = 1 if progress > 1 else progress print (progress) request.urlretrieve(url, "url_test_video.mp4" , reporthook=download_msg)
1.1.12异常处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from urllib import requestfrom urllib import errorimport sockettry : url = "http://localhost/test8.mp4" resp = request.urlopen(url) print ("utf-8" ))except Exception as e: print (e)
1.2 requests库 1.2.1发送get请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import requests as rts url = "" pm = { "mid" : "2081377" } resp = rts.get(url, params=pm)
1.2.2发送post请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import requests as rts url = "" data_dic = { "keyword" : "python" , "pageIndex" : "1" , "pageSize" : "20" , "searchType" : "0" } cert = r'charles.pem' resp =, data=data_dic, verify=cert)
1.2.4获取响应头信息 1 2 3 print (resp.reason) print (resp.ok) print (resp.headers)
1.2.5获取响应体信息 1 2 3 4 print (resp.content) print (resp.text)
1.2.6获取响应编码 1 2 print(resp.encoding) # UTF-8 resp.encoding = 'UTF-8' # 指定编码
1.2.7构建请求头 1 2 3 4 5 6 7 8 9 import requests as rts url = "" headers = { "User-Agent" : "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" } resp = rts.get(url, verify=False , headers=headers)
1.2.8处理不受信任的证书 1 2 3 4 5 6 import requests as rts url = "" resp = rts.get(url, verify=False ) print (resp.text)
1.2.9设置代理 1 2 3 4 5 6 7 8 9 10 11 12 import requests as rts url = "" proxy = { "http" : "http://1924086038:xle4zavg@" } resp = rts.get(url, proxies=proxy)print (resp.text)
1.2.10验证码处理 手动处理cookies
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import requests as rts yzm_url = "" yzm_resp = rts.get(yzm_url) cookie = yzm_resp.cookieswith open ("yzm.jpg" , "wb" ) as f: f.write(yzm_resp.content) answer = input ("请输入验证码答案: " ) check_url = "" data = { "answer" : answer, "login_site" : "E" , "rand" : "sjrand" } check_resp =, data=data, cookies=cookie)print (check_resp.text)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import requests as rts session = rts.Session() yzm_url = "" yzm_resp = session.get(yzm_url)with open ("yzm.jpg" , "wb" ) as f: f.write(yzm_resp.content) answer = input ("请输入验证码答案: " ) check_url = "" data = { "answer" : answer, "login_site" : "E" , "rand" : "sjrand" } check_resp =, data=data)print (check_resp.text)
1.2.11弹窗验证授权处理 1 2 3 4 5 6 7 import requests as rts url = "" resp = rts.get(url, auth=("itlike" , "123456" ))print (resp.text)
1.2.12流式下载 按字节读取
1 2 3 4 5 6 7 8 9 10 11 12 13 import requests as rts url = "" resp = rts.get(url, stream=True , verify=False , headers={ "Accept-Encoding" : "" })print (len (resp.content))for chunk in resp.iter_content(1024 ): print (chunk)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import requests as rts url = "" pm = [ ("mid" , "2081377" ) ] resp = rts.get(url, params=pm, verify=False , stream=True )with open ("163.html" , "w" , encoding="utf-8" ) as f: for chunk in resp.iter_lines(): f.write(chunk.decode("utf-8" ))
1.2.14下载进度监听 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import requests as rts url = "" resp = rts.get(url, verify=False , stream=True )print (resp.headers) total_size = int (resp.headers["Content-Length" ]) current_size = 0 with open ("gaoxiao.mp4" , "wb" ) as f: for chunk in resp.iter_content(1024 *10 ): f.write(chunk) current_size += len (chunk) print (current_size / total_size)
1.2.15异常处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import requests as rts url = "" pm = [ ("mid" , "2081377" ) ]try : resp = rts.get(url, params=pm, timeout=(0.001 , 2 )) print (resp.ok)except rts.exceptions.ConnectTimeout as cte: print ("连接异常-超时" , cte)except rts.exceptions.ReadTimeout as rte: print ("读取异常-超时" , rte)
1.3 数据解析 1.3.1json字符串解析 1 2 3 4 5 6 7 8 9 import json json_str = '{"status":{"code":0,"message":""},"extraInfo":null}' result = json.loads(json_str)print (type (result)) print (result)