Python实战:使用selenium及BeautifulSoup4进行BOOS直聘信息爬取与数据累积【附源码】
本文介绍了如何使用Python的Selenium和BeautifulSoup库,结合pandas进行高效的网页数据爬取、处理和累积存储。通过模拟浏览器操作,我们实现了自动登录、数据抓取和去重,最终将职位信息持续更新到CSV文件中。文章详细阐述了运行环境要求、设计思路、具体实践步骤以及遇到的挑战和解决方案,为读者提供了一个实用的数据爬取与处理的示例。
·
运行环境要求
- 操作系统:适用于Windows、macOS、Linux。
- Python版本:Python 3.6及以上。
- 依赖库:
- selenium:用于模拟浏览器操作。
- webdriver_manager:自动管理驱动程序。
- BeautifulSoup4:解析HTML页面。
- pandas:数据处理和CSV文件操作。
- logging:日志记录。
pip install selenium webdriver_manager beautifulsoup4 pandas
设计思路
本项目旨在通过Selenium模拟用户浏览器行为,获取特定网站(如Boss直聘)上的职位信息,并利用BeautifulSoup解析这些信息。为了实现数据的持续累积而不是每次运行都覆盖原有数据,采用pandas进行数据合并和去重,最终将更新后的数据保存到CSV文件中。
具体实践
- 初始化Selenium WebDriver:配置ChromeDriver,启动Chrome浏览器实例。
from selenium import webdriver from selenium.webdriver.chrome.service import Service from webdriver_manager.chrome import ChromeDriverManager options = webdriver.ChromeOptions() try: driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options) except Exception as e: logging.error(f"创建WebDriver时出错: {e}") raise
- 获取并保存Cookies:访问目标网站并手动登录,然后保存Cookies以便后续自动登录使用。
def 获取cookie(driver, url): logging.info("开始获取cookie") driver.get(url) time.sleep(30) # 等待足够的时间手动登录并保存cookies cookies = driver.get_cookies() with open('cookies.json', 'w') as f: json.dump(cookies, f) logging.info("Cookie获取完毕并已保存")
- 加载Cookies实现自动登录:在后续的会话中加载之前保存的Cookies,实现自动登录。
def 加载cookie(driver, cookie_file='cookies.json'): logging.info("开始加载cookie") with open(cookie_file, 'r') as f: cookies = json.load(f) for cookie in cookies: if 'expiry' in cookie: del cookie['expiry'] # 删除过期时间,避免格式错误 driver.add_cookie(cookie) logging.info("Cookie加载完毕")
- 爬取职位信息:访问职位列表页面,使用BeautifulSoup解析页面,提取职位相关信息。
from bs4 import BeautifulSoup from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.by import By def 获取职位信息(driver, base_url, pages=1): logging.info("开始获取职位信息") 职位信息_list = [] for i in range(1, pages + 1): url = f'{base_url}&page={i}' driver.get(url) WebDriverWait(driver, 10).until(EC.presence_of_all_elements_located((By.CLASS_NAME, "job-card-wrapper"))) soup = BeautifulSoup(driver.page_source, 'html.parser') jobs = soup.find_all('li', class_='job-card-wrapper') # 提取并添加职位信息到列表中... return pd.DataFrame(职位信息_list)
- 数据去重与累积:读取已有的数据文件(如果存在),将新抓取的数据与旧数据合并,去除重复项,然后保存更新后的数据。
import pandas as pd if os.path.exists(data_file): existing_data = pd.read_csv(data_file) else: existing_data = pd.DataFrame() 新职位信息 = 获取职位信息(driver, base_url, pages=5) 更新后的职位信息 = pd.concat([existing_data, 新职位信息], ignore_index=True).drop_duplicates(subset=['职位名称', '公司名称']) 更新后的职位信息.to_csv(data_file, index=False, encoding='utf-8-sig')
技术要点
- Selenium的高级应用:包括但不限于Cookies的处理、显式等待(WebDriverWait)等技巧,以确保页面加载完成并成功获取数据。
- BeautifulSoup的灵活运用:精确地定位和提取所需的HTML元素。
- Pandas的数据处理能力:有效地合并、去重和保存数据。
项目复盘
- 挑战:
- 页面结构变化导致的数据提取失败。
- 网站反爬虫机制的应对。
- 数据去重逻辑的设计。
- 解决方案:
- 定期检查目标网站的页面结构,及时更新选择器。
- 合理设置请求间隔,使用Cookies模拟登录状态,减少被封概率。
- 利用pandas强大的数据处理功能,根据特定字段进行去重。
完整源码
"""
bosswork.py: 用于爬取职位信息的脚本。
"""
import json
import logging
import os
import random
import sys
import threading
import time
import argparse
import concurrent.futures
import urllib.parse
import tkinter as tk
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
def setup_logging():
"""设置日志记录"""
logging.basicConfig(
level=logging.INFO,
format='{asctime} - {levelname} - {module}:{lineno} - {message}',
style='{',
datefmt='%Y-%m-%d %H:%M:%S'
)
DESKTOP_USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/58.0.3029.110 Safari/537.3",
"Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) Version/14.0.2 Safari/605.1.15",
# 更多桌面版User-Agent
]
def get_screen_size(default_width=1920, default_height=1080):
"""使用tkinter获取屏幕大小,如果在无GUI环境下运行,则返回默认值"""
try:
root = tk.Tk()
width = root.winfo_screenwidth()
height = root.winfo_screenheight()
root.quit()
except tk.TclError:
logging.warning("无法初始化图形用户界面,使用默认屏幕尺寸。")
return default_width, default_height
return width, height
def init_web_driver():
"""初始化webdriver"""
options = webdriver.ChromeOptions()
user_agent = random.choice(DESKTOP_USER_AGENTS)
options.add_argument(f'user-agent={user_agent}')
# options.add_argument('--headless')
try:
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
except Exception as e:
logging.error("WebDriver安装失败: %s", e)
raise SystemExit("请检查网络连接或权限设置。") from e
width, height = get_screen_size()
driver.set_window_size(width, height)
return driver
def random_wait(min_seconds=2, max_seconds=5):
"""随机等待"""
time.sleep(random.uniform(min_seconds, max_seconds))
def get_cookies(driver, url, account):
"""获取cookies"""
logging.info("开始获取cookie for %s", account)
driver.get(url)
exit_command = input("请在浏览器中完成登录后按回车继续...(输入'exit'并回车以退出)")
if exit_command.lower() == 'exit':
logging.info("用户选择退出程序 for %s", account)
driver.quit()
sys.exit(0)
cookies = driver.get_cookies()
cookie_file = f'cookies_{account}.json'
with open(cookie_file, 'w', encoding='utf-8') as f:
json.dump(cookies, f)
logging.info("Cookie获取完毕并已保存 for %s", account)
def load_cookies(driver, account):
"""加载cookies"""
logging.info("开始加载cookie for %s", account)
cookie_file = f'cookies_{account}.json'
if not os.path.exists(cookie_file):
logging.error(f"未找到{account}的cookie文件")
return False
driver.get("https://www.zhipin.com") # 确保是cookie对应的域名
random_wait(1, 3)
try:
with open(cookie_file, 'r', encoding='utf-8') as f:
cookies = json.load(f)
except IOError as e:
logging.error(f"读取cookie文件失败: {e}")
return False
for cookie in cookies:
if 'expiry' in cookie:
del cookie['expiry']
driver.add_cookie(cookie)
logging.info("Cookie加载完毕 for %s", account)
return True
def get_text_safely(job, selector, default='未知'):
"""尝试从job对象中获取指定选择器的文本"""
try:
return job.find(selector[0], class_=selector[1]).text.strip()
except AttributeError as e:
logging.error(f"获取文本时出错: {e}")
return default
def fetch_job_info(driver, base_url, page):
"""获取职位信息"""
职位信息_list = []
# 构造带有页码的URL
url = f'{base_url}&page={page}'
加载成功 = False
for 尝试次数 in range(3):
try:
driver.get(url)
WebDriverWait(driver, 10).until(EC.presence_of_all_elements_located((By.CLASS_NAME, "job-card-wrapper")))
加载成功 = True
break
except Exception as e:
logging.error("尝试 %d 次访问 %s 失败: %s", 尝试次数 + 1, url, e)
random_wait((尝试次数 + 1) * 3, (尝试次数 + 2) * 3)
if not 加载成功:
logging.error("无法加载页面: %s", url)
return pd.DataFrame() # 返回空DataFrame
soup = BeautifulSoup(driver.page_source, 'lxml')
jobs = soup.find_all('li', class_='job-card-wrapper')
for job in jobs:
# 获取每个职位的信息,并添加到列表中
try:
职位名称 = get_text_safely(job, ('span', 'job-name'))
工作地点 = get_text_safely(job, ('span', 'job-area'))
薪资 = get_text_safely(job, ('span', 'salary'), '面议')
标签列表 = [li.text for li in job.find('ul', class_='tag-list').find_all('li')]
经验要求 = 标签列表[0] if len(标签列表) > 0 else '未知'
教育要求 = 标签列表[1] if len(标签列表) > 1 else '未知'
联系人 = get_text_safely(job, ('div', 'info-public'))
公司名称 = get_text_safely(job, ('h3', 'company-name'))
公司标签列表 = [li.text for li in job.find('ul', class_='company-tag-list').find_all('li')]
公司类型 = 公司标签列表[0] if len(公司标签列表) > 0 else '未知'
公司规模 = 公司标签列表[1] if len(公司标签列表) > 1 else '未知'
详情 = ','.join([li.text for li in job.find('div', class_='job-card-footer').find('ul', class_='tag-list').find_all('li')])
职位详情链接 = "https://www.zhipin.com" + job.find('a')['href']
# 直接将字典添加到列表中
职位信息_list.append({
'职位名称': 职位名称,
'工作地点': 工作地点,
'薪资': 薪资,
'经验要求': 经验要求,
'教育要求': 教育要求,
'联系人': 联系人,
'公司名称': 公司名称,
'公司类型': 公司类型,
'公司规模': 公司规模,
'详情': 详情,
'职位详情链接': 职位详情链接
})
except Exception as e:
logging.error(f"解析职位信息时出错: {e}")
random_wait(1, 3)
# 循环结束后,使用收集到的职位信息列表创建DataFrame
职位信息 = pd.DataFrame(职位信息_list)
logging.info(f"第{page}页获取到 {len(职位信息)} 个职位信息")
return 职位信息
def 初始化已爬取职位集合(data_file):
已爬取职位 = set()
if os.path.exists(data_file):
df = pd.read_csv(data_file)
for _, row in df.iterrows():
已爬取职位.add((row['职位名称'], row['公司名称']))
return 已爬取职位
def 去重并保存(data_file, 新职位信息_list):
try:
if os.path.exists(data_file):
现有数据 = pd.read_csv(data_file)
所有数据 = pd.concat([现有数据] + 新职位信息_list, ignore_index=True)
else:
所有数据 = pd.concat(新职位信息_list, ignore_index=True)
更新后的职位信息 = 所有数据.drop_duplicates(subset=['职位名称', '公司名称'])
更新后的职位信息.to_csv(data_file, index=False, encoding='utf-8-sig')
except Exception as e:
logging.error(f"数据处理或保存时出错: {e}")
def main(总页数, 每次爬取页数, 账号列表, 岗位名称):
base_url = 'https://www.zhipin.com/web/geek/job?query={}&city=101240100'.format(
urllib.parse.quote(岗位名称) if 岗位名称 else ''
)
data_file = '职位信息.csv' # 数据文件路径
logging.info(f"开始爬取任务,目标总页数:{总页数},账号列表:{账号列表}")
已爬取职位 = 初始化已爬取职位集合(data_file)
已爬取总页数 = 0 # 初始化已爬取总页数
总职位信息数 = 0 # 初始化总职位信息数
for 账号 in 账号列表:
driver = init_web_driver() # 在循环外初始化webdriver
try:
if not load_cookies(driver, 账号):
logging.info(f"未找到{账号}的cookie文件,需要手动登录。")
get_cookies(driver, "https://www.zhipin.com", 账号)
实际页码 = 1 # 重置实际页码为1,以便于当前账号的爬取
while 实际页码 <= 每次爬取页数 and 已爬取总页数 < 总页数:
logging.info(f"{账号}开始爬取第{实际页码}页数据")
新职位信息 = fetch_job_info(driver, base_url, 实际页码)
新职位信息_list = []
for item in 新职位信息.to_dict('records'):
key = (item['职位名称'], item['公司名称'])
if key not in 已爬取职位:
新职位信息_list.append(item)
已爬取职位.add(key)
if 新职位信息_list:
是否首次写入 = not os.path.exists(data_file)
所有新职位信息 = pd.DataFrame(新职位信息_list)
所有新职位信息.to_csv(data_file, mode='a', index=False, header=是否首次写入, encoding='utf-8-sig')
logging.info(f"{账号}已成功爬取第{实际页码}页数据,新增{len(新职位信息_list)}个职位信息")
实际页码 += 1
已爬取总页数 += 1
总职位信息数 += len(新职位信息_list)
if 实际页码 > 10: # 达到每个账号的爬取限制,准备切换账号
break
except Exception as e:
logging.error(f"使用{账号}获取职位信息时出错: {e}")
finally:
driver.quit() # 关闭浏览器并准备重新初始化
logging.info(f"{账号}的WebDriver已关闭")
# 爬取结束后,再次去重以确保跨次运行的数据也能得到有效去重
if os.path.exists(data_file):
df = pd.read_csv(data_file)
去重后的数据 = df.drop_duplicates(subset=['职位名称', '公司名称'])
去重后的数据.to_csv(data_file, index=False, encoding='utf-8-sig')
logging.info("最终数据去重完成")
logging.info(f"所有账号爬取任务完成,总爬取页数:{已爬取总页数},总职位信息数:{总职位信息数}")
def main_parallel(总页数, 每次爬取页数, 账号列表, 岗位名称):
base_url = 'https://www.zhipin.com/web/geek/job?query={}&city=101240100'.format(
urllib.parse.quote(岗位名称) if 岗位名称 else ''
)
data_file = '职位信息.csv' # 数据文件路径
# 定义一个内部函数,用于每个线程/进程执行的任务
def task(账号):
random_wait()
driver = init_web_driver()
已爬取职位 = 初始化已爬取职位集合(data_file)
新职位信息_list = []
try:
if not load_cookies(driver, 账号):
logging.info(f"未找到{账号}的cookie文件,需要手动登录。")
get_cookies(driver, "https://www.zhipin.com", 账号)
for page in range(1, 每次爬取页数 + 1):
if page > 总页数: # 如果达到了用户指定的总页数,则停止
break
logging.info(f"线程{threading.get_ident()} - 账号{账号}开始爬取第{page}页数据")
新职位信息 = fetch_job_info(driver, base_url, page)
for item in 新职位信息.to_dict('records'):
key = (item['职位名称'], item['公司名称'])
if key not in 已爬取职位:
新职位信息_list.append(item)
已爬取职位.add(key)
except Exception as e:
logging.error(f"线程{threading.get_ident()} - 使用账号{账号}获取职位信息时出错: {e}")
finally:
driver.quit()
return 新职位信息_list
# 使用ThreadPoolExecutor或ProcessPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=len(账号列表)) as executor:
# 将任务分配给不同的线程/进程执行
futures = [executor.submit(task, 账号) for 账号 in 账号列表]
# 等待所有任务完成
for future in concurrent.futures.as_completed(futures):
try:
新职位信息_list = future.result() # 获取任务结果,如果任务抛出异常,这里会重新抛出
if 新职位信息_list:
是否首次写入 = not os.path.exists(data_file)
所有新职位信息 = pd.DataFrame(新职位信息_list)
所有新职位信息.to_csv(data_file, mode='a', index=False, header=是否首次写入, encoding='utf-8-sig')
logging.info(f"线程{threading.get_ident()}已成功爬取数据,新增{len(新职位信息_list)}个职位信息")
except Exception as e:
logging.error(f"任务执行过程中发生错误: {e}")
# 爬取结束后,再次去重以确保跨次运行的数据也能得到有效去重
if os.path.exists(data_file):
df = pd.read_csv(data_file)
去重后的数据 = df.drop_duplicates(subset=['职位名称', '公司名称'])
去重后的数据.to_csv(data_file, index=False, encoding='utf-8-sig')
logging.info("最终数据去重完成")
if __name__ == '__main__':
setup_logging()
parser = argparse.ArgumentParser(description="爬取职位信息")
parser.add_argument('--总页数', type=int, default=20, help="要爬取的总页数")
parser.add_argument('--每次爬取页数', type=int, default=10, help="每次爬取的页数")
parser.add_argument('--账号列表', nargs='+', default=['账号1', '账号2'], help="账号列表")
parser.add_argument('--岗位名称', type=str, default=None, help="要爬取的岗位名称(可选)")
args = parser.parse_args()
start_time = time.time()
main_parallel(args.总页数, args.每次爬取页数, args.账号列表, args.岗位名称)
# main(args.总页数, args.每次爬取页数, args.账号列表, args.岗位名称)
end_time = time.time()
logging.info(f"爬取任务全部完成,耗时:{end_time - start_time:.2f}秒")
更多推荐
已为社区贡献1条内容
所有评论(0)