文章目录
- 一、效果演示
- 二、爬取目标
- 三、使用技术
- 四、需求分析
- 五、功能划分
- 六、代码实现(附完整代码)
- 七、相关知识参考
- 八、程序运行说明
一、效果演示
1、获取 [我的博客] URL
2、程序运行效果
二、爬取目标
获取到下图的10组数据
- 10组数据如下:
[原创,周排名,总排名,访问,等级,积分,粉丝,获赞,评论,收藏]
三、使用技术
- python3
- urllib模块
- bs4模块
- pymysql模块
四、需求分析
1、查看网页源代码
进入[我的博客]页面,页面随便一处右键选择[查看网页源代码]
2、源代码拷贝到VSCode
- 将网页源代码Ctrl+A 全部拷贝到任意一款HTML编辑器中,然后将代码格式化。
- 我这里选择的是VSCode工具
3、全局搜索关键字,找到代码位置
Ctrl + F全局搜索上述提到的10组数据的中文名称,或者数值,找到代码的位置
4、观察代码规律
- 根据上述图中找到的代码,可以明显看出这10部分数据均由class='text-center’的<dl>标签包裹
- <dl>标签的title属性值就是要找的关键数据的目标数值
- <dl>标签中的子标签<\dd>的文本内容就是要找的关键数据的中文名称
五、功能划分
- 我这里没有实现文件的存储,直接将数据存入数据库中
六、代码实现(附完整代码)
1、导入要使用的模块
import urllib.request as request
import urllib.error as error
import time
from bs4 import BeautifulSoup
import pymysql
2、获取整个页面HTML代码
"""
爬取整个HTML页面
"""
def download(crawl_url, num_retries=3):
# 设置用户代理
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/72.0.3626.121 Safari/537.36',
}
# 设置请求的URL
req = request.Request(crawl_url, headers=headers)
try:
# 开始爬取整个HTML代码
crawl_html = request.urlopen(req).read().decode("utf-8")
# 此处异常用于处理,csdn服务器异常后导致爬取失败
except error.URLError as e:
print("download error:", e.reason)
crawl_html = None
if num_retries > 0:
if hasattr(e, "code") and 500 <= e.code <= 600:
time.sleep(5000) # 主线程睡眠5s后再重新访问,最多访问3次,也就是如果程序开始执行10s后,csdn服务器始终未正常启动,则此次爬取失败
return download(crawl_url, num_retries-1)
return crawl_html
3、解析HTML获取目标数据
"""
解析HTML获取页面上的10组数据
"""
def parse_page(page_html):
# 声明一个包含10组key的字典data_dict
data_dict = {'积分': '', '粉丝': '', '获赞': '', '评论': '', '收藏': ''
, '原创': '', '周排名': '', '总排名': '', '访问': '', '等级': ''
, '账号': '', '昵称': ''}
# 开始解析html
soup = BeautifulSoup(page_html, "html.parser")
# 解析class='text-center'的所有<dl>标签列表
# 方式一
# dl_list = soup.find_all(name='div', attrs={'class': re.compile('text-center')})
# 方式二
dl_list = soup.find_all('dl', class_='text-center')
print('总共{0}个<dl>标签'.format(len(dl_list)))
# 遍历dl标签列表,获取到每一个dl标签,将目标数值存入字典data_dict
for dl in dl_list:
# print(dl)
dd_name = dl.select('dd')[0].text # 读取该dl标签的子标签dd的文本内容
dd_title = dl.get('title') # 读取该dl标签的title属性值
for k in data_dict.keys(): # 遍历data_dict字典,将匹配的数值存入字典中
if dd_name == k:
data_dict[k] = dd_title
# 获取账号和昵称信息
self_info_list = soup.find_all('a', id='uid') # 根据id获取a标签元素
alias = self_info_list[0].get('title')
account = self_info_list[0].select('span')[0].get('username')
data_dict.update({"账号": account, "昵称": alias})
# data_dict添加爬取时间的信息
data_dict.update({"爬取时间": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
return data_dict
4、将目标数据存入数据库
"""
将字典数据存入数据库
"""
def save_to_mysql(data_obj):
db = input('请输入本地数据库名:')
username = input('请输入数据库用户名:')
password = input('请输入数据库密码:')
# 连接数据库
conn = pymysql.connect(
host="127.0.0.1",
port=3306, # 端口号
user=username, # 数据库用户
password=password, # 数据库密码
database=db # 要连接的数据库名称
)
# 建立游标,用于数据库插入
cursor = conn.cursor()
# # 校验数据库是否存在博客数据记录表
# table_count = table_exists(conn, 'CSDN_SELF_BLOG_DATA')
# # 如果数据库不存在该表,则创建
# if table_count == 0:
sql_create = """CREATE TABLE IF NOT EXISTS `csdn_self_blog_data` (
`id` bigint NOT NULL AUTO_INCREMENT,
`account` varchar(100) DEFAULT NULL,
`alias` varchar(100) DEFAULT NULL,
`grade` int DEFAULT NULL,
`count_fan` int DEFAULT NULL,
`count_thumb` bigint DEFAULT NULL,
`count_comment` bigint DEFAULT NULL,
`count_star` int DEFAULT NULL,
`count_original` int DEFAULT NULL,
`rank_week` bigint DEFAULT NULL,
`rank_all` bigint DEFAULT NULL,
`count_scan` bigint DEFAULT NULL,
`blog_level` varchar(100) DEFAULT NULL,
`crawl_time` datetime DEFAULT NULL,
`start_hour` int DEFAULT NULL,
`end_hour` int DEFAULT NULL,
`crawl_date` date DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
"""
cursor.execute(sql_create)
print('==========表创建完成=========')
curr_time = data_obj['爬取时间']
# 创建SQL语句并往数据库插入数据
sql_insert = """insert into csdn_self_blog_data(
account, alias, grade, count_fan, count_thumb
,count_comment, count_star, count_original, rank_week, rank_all
,count_scan, blog_level, crawl_time, start_hour, end_hour
,crawl_date)
values( %s, %s, %s, %s, %s
,%s, %s, %s, %s, %s
,%s, %s, %s, %s, %s
,%s)"""
values_list = [data_obj['账号'], data_obj['昵称'], data_obj['积分'], data_obj['粉丝'], data_obj['获赞']
, data_obj['评论'], data_obj['收藏'], data_obj['原创'], data_obj['周排名'], data_obj['总排名']
, data_obj['访问'], data_obj['等级'], curr_time, curr_time[11:13], int(curr_time[11:13]) + 1
, time.strftime("%Y-%m-%d", time.localtime())]
cursor.execute(sql_insert, tuple(values_list))
conn.commit() # 提交请求,不然不会插入数据
conn.close()
print("======================保存数据库成功!=======================")
5、完整代码
import urllib.request as request
import urllib.error as error
import time
from bs4 import BeautifulSoup
import pymysql
import re
"""
爬取整个HTML页面
"""
def download(crawl_url, num_retries=3):
# 设置用户代理
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/72.0.3626.121 Safari/537.36',
}
# 设置请求的URL
req = request.Request(crawl_url, headers=headers)
try:
# 开始爬取整个HTML代码
crawl_html = request.urlopen(req).read().decode("utf-8")
# 此处异常用于处理,csdn服务器异常后导致爬取失败
except error.URLError as e:
print("download error:", e.reason)
crawl_html = None
if num_retries > 0:
if hasattr(e, "code") and 500 <= e.code <= 600:
time.sleep(5000) # 主线程睡眠5s后再重新访问,最多访问3次,也就是如果程序开始执行10s后,csdn服务器始终未正常启动,则此次爬取失败
return download(crawl_url, num_retries-1)
return crawl_html
"""
解析HTML获取页面上的10组数据
"""
def parse_page(page_html):
# 声明一个包含10组key的字典data_dict
data_dict = {'积分': '', '粉丝': '', '获赞': '', '评论': '', '收藏': ''
, '原创': '', '周排名': '', '总排名': '', '访问': '', '等级': ''
, '账号': '', '昵称': ''}
# 开始解析html
soup = BeautifulSoup(page_html, "html.parser")
# 解析class='text-center'的所有<dl>标签列表
# 方式一
# dl_list = soup.find_all(name='div', attrs={'class': re.compile('text-center')})
# 方式二
dl_list = soup.find_all('dl', class_='text-center')
print('总共{0}个<dl>标签'.format(len(dl_list)))
# 遍历dl标签列表,获取到每一个dl标签,将目标数值存入字典data_dict
for dl in dl_list:
# print(dl)
dd_name = dl.select('dd')[0].text # 读取该dl标签的子标签dd的文本内容
dd_title = dl.get('title') # 读取该dl标签的title属性值
for k in data_dict.keys(): # 遍历data_dict字典,将匹配的数值存入字典中
if dd_name == k:
data_dict[k] = dd_title
# 获取账号和昵称信息
self_info_list = soup.find_all('a', id='uid') # 根据id获取a标签元素
alias = self_info_list[0].get('title')
account = self_info_list[0].select('span')[0].get('username')
data_dict.update({"账号": account, "昵称": alias})
# data_dict添加爬取时间的信息
data_dict.update({"爬取时间": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
return data_dict
"""
判断表是否存在
"""
def table_exists(con, table_name): # 这个函数用来判断表是否存在
sql = "show tables;"
con.execute(sql)
tables = [con.fetchall()]
table_list = re.findall('(\'.*?\')',str(tables))
table_list = [re.sub("'", '', each) for each in table_list]
if table_name in table_list:
return 1 # 存在返回1
else:
return 0 # 不存在返回0
"""
将字典数据存入数据库
"""
def save_to_mysql(data_obj):
db = input('请输入本地数据库名:')
username = input('请输入数据库用户名:')
password = input('请输入数据库密码:')
# 连接数据库
conn = pymysql.connect(
host="127.0.0.1",
port=3306, # 端口号
user=username, # 数据库用户
password=password, # 数据库密码
database=db # 要连接的数据库名称
)
# 建立游标,用于数据库插入
cursor = conn.cursor()
# # 校验数据库是否存在博客数据记录表
# table_count = table_exists(conn, 'CSDN_SELF_BLOG_DATA')
# # 如果数据库不存在该表,则创建
# if table_count == 0:
sql_create = """CREATE TABLE IF NOT EXISTS `csdn_self_blog_data` (
`id` bigint NOT NULL AUTO_INCREMENT,
`account` varchar(100) DEFAULT NULL,
`alias` varchar(100) DEFAULT NULL,
`grade` int DEFAULT NULL,
`count_fan` int DEFAULT NULL,
`count_thumb` bigint DEFAULT NULL,
`count_comment` bigint DEFAULT NULL,
`count_star` int DEFAULT NULL,
`count_original` int DEFAULT NULL,
`rank_week` bigint DEFAULT NULL,
`rank_all` bigint DEFAULT NULL,
`count_scan` bigint DEFAULT NULL,
`blog_level` varchar(100) DEFAULT NULL,
`crawl_time` datetime DEFAULT NULL,
`start_hour` int DEFAULT NULL,
`end_hour` int DEFAULT NULL,
`crawl_date` date DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
"""
cursor.execute(sql_create)
print('==========表创建完成=========')
curr_time = data_obj['爬取时间']
# 创建SQL语句并往数据库插入数据
sql_insert = """insert into csdn_self_blog_data(
account, alias, grade, count_fan, count_thumb
,count_comment, count_star, count_original, rank_week, rank_all
,count_scan, blog_level, crawl_time, start_hour, end_hour
,crawl_date)
values( %s, %s, %s, %s, %s
,%s, %s, %s, %s, %s
,%s, %s, %s, %s, %s
,%s)"""
values_list = [data_obj['账号'], data_obj['昵称'], data_obj['积分'], data_obj['粉丝'], data_obj['获赞']
, data_obj['评论'], data_obj['收藏'], data_obj['原创'], data_obj['周排名'], data_obj['总排名']
, data_obj['访问'], data_obj['等级'], curr_time, curr_time[11:13], int(curr_time[11:13]) + 1
, time.strftime("%Y-%m-%d", time.localtime())]
cursor.execute(sql_insert, tuple(values_list))
conn.commit() # 提交请求,不然不会插入数据
conn.close()
print("======================保存数据库成功!=======================")
"""
主程序入口
"""
if __name__ == '__main__':
url = input('请输入您的CSDN网站[我的博客]URL地址:')
# url = "" # 使用自己的CSDN[我的博客]的URL地址
# 1、获取整个HTML
html = download(url)
# 2、解析HTML获取目标数据,并存储到字典中
dict_obj = parse_page(html)
# 打印字典信息,查看效果使用
# for key, value in dict_obj.items():
# print(key + ' = ' + value)
# 3、将字典数据存入MySQL数据库
save_to_mysql(dict_obj)
七、相关知识参考
- python爬虫:BeautifulSoup 使用select方法详解
- Data truncated for column ‘date’ at row 1 的错误解决方案
- 解决python中TypeError: not enough arguments for format string
- beautifulsoup通过id获取指定元素内容
- Python MySQL 插入表
- python中如何将数据保存到mysql数据库
- python3 两种方法将数据存入mysql数据库
- Python3实现的字典遍历操作详解
- python判断mysql中某张表是否存在
八、程序运行说明
- 上述完整代码,可以直接运行使用。如果电脑第一次运行Python程序,有俩种方式可以运行,具体可参考该链接 全世界都公认运行Python的最简单方法
- 上述代码运行需要有外网,公司内网一般不行
- 上述代码只支持本地运行,控制台输入所需参数,自动将自己的博客数据存储到本地的MySQL数据库的博客数据记录表中(表名可以在代码中修改)
- 如果本地未安装MySQL数据库,可以参考MySQL基础知识学习安装,也可以换成Oracle数据库,但是切换为Oracle后,代码需要有所调整
需要输入的参数如下:
- URL
- 本地数据库名
- 本地数据库用户
- 本地数据库密码