运行环境要求

  • 操作系统:适用于Windows、macOS、Linux。
  • Python版本:Python 3.6及以上。
  • 依赖库
    • selenium:用于模拟浏览器操作。
    • webdriver_manager:自动管理驱动程序。
    • BeautifulSoup4:解析HTML页面。
    • pandas:数据处理和CSV文件操作。
    • logging:日志记录。
      pip install selenium webdriver_manager beautifulsoup4 pandas

设计思路

本项目旨在通过Selenium模拟用户浏览器行为,获取特定网站(如Boss直聘)上的职位信息,并利用BeautifulSoup解析这些信息。为了实现数据的持续累积而不是每次运行都覆盖原有数据,采用pandas进行数据合并和去重,最终将更新后的数据保存到CSV文件中。

具体实践

  1. 初始化Selenium WebDriver:配置ChromeDriver,启动Chrome浏览器实例。
    from selenium import webdriver
    from selenium.webdriver.chrome.service import Service
    from webdriver_manager.chrome import ChromeDriverManager
    
    options = webdriver.ChromeOptions()
    try:
        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
    except Exception as e:
        logging.error(f"创建WebDriver时出错: {e}")
        raise
  2. 获取并保存Cookies:访问目标网站并手动登录,然后保存Cookies以便后续自动登录使用。
    def 获取cookie(driver, url):
        logging.info("开始获取cookie")
        driver.get(url)
        time.sleep(30)  # 等待足够的时间手动登录并保存cookies
        cookies = driver.get_cookies()
        with open('cookies.json', 'w') as f:
            json.dump(cookies, f)
        logging.info("Cookie获取完毕并已保存")
  3. 加载Cookies实现自动登录:在后续的会话中加载之前保存的Cookies,实现自动登录。
    def 加载cookie(driver, cookie_file='cookies.json'):
        logging.info("开始加载cookie")
        with open(cookie_file, 'r') as f:
            cookies = json.load(f)
            for cookie in cookies:
                if 'expiry' in cookie:
                    del cookie['expiry']  # 删除过期时间,避免格式错误
                driver.add_cookie(cookie)
        logging.info("Cookie加载完毕")
  4. 爬取职位信息:访问职位列表页面,使用BeautifulSoup解析页面,提取职位相关信息。
    from bs4 import BeautifulSoup
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.webdriver.common.by import By
    
    def 获取职位信息(driver, base_url, pages=1):
        logging.info("开始获取职位信息")
        职位信息_list = []  
        for i in range(1, pages + 1):
            url = f'{base_url}&page={i}'
            driver.get(url)
            WebDriverWait(driver, 10).until(EC.presence_of_all_elements_located((By.CLASS_NAME, "job-card-wrapper")))
            soup = BeautifulSoup(driver.page_source, 'html.parser')
            jobs = soup.find_all('li', class_='job-card-wrapper')
            # 提取并添加职位信息到列表中...
        return pd.DataFrame(职位信息_list)
  5. 数据去重与累积:读取已有的数据文件(如果存在),将新抓取的数据与旧数据合并,去除重复项,然后保存更新后的数据。
    import pandas as pd
    
    if os.path.exists(data_file):
        existing_data = pd.read_csv(data_file)
    else:
        existing_data = pd.DataFrame()
    
    新职位信息 = 获取职位信息(driver, base_url, pages=5)  
    更新后的职位信息 = pd.concat([existing_data, 新职位信息], ignore_index=True).drop_duplicates(subset=['职位名称', '公司名称'])
    更新后的职位信息.to_csv(data_file, index=False, encoding='utf-8-sig')

技术要点

  • Selenium的高级应用:包括但不限于Cookies的处理、显式等待(WebDriverWait)等技巧,以确保页面加载完成并成功获取数据。
  • BeautifulSoup的灵活运用:精确地定位和提取所需的HTML元素。
  • Pandas的数据处理能力:有效地合并、去重和保存数据。

项目复盘

  • 挑战
    • 页面结构变化导致的数据提取失败。
    • 网站反爬虫机制的应对。
    • 数据去重逻辑的设计。
  • 解决方案
    • 定期检查目标网站的页面结构,及时更新选择器。
    • 合理设置请求间隔,使用Cookies模拟登录状态,减少被封概率。
    • 利用pandas强大的数据处理功能,根据特定字段进行去重。

 完整源码

"""
bosswork.py: 用于爬取职位信息的脚本。
"""
import csv
import json
import logging
import os
import random
import sys
import threading
import time
import argparse
import concurrent.futures
import urllib.parse
import tkinter as tk
import pandas as pd
import numpy as np
import urllib.parse
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup

# 初始化锁对象
lock = threading.Lock()

def setup_logging():
    """
    设置日志记录
    """
    if not os.path.exists('log'):
        os.makedirs('log')

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    file_handler = logging.FileHandler('log/bosswork.log', mode='w')
    file_handler.setLevel(logging.DEBUG)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    formatter = logging.Formatter(
        fmt='{asctime} - {levelname} - {module}:{lineno} - {message}',
        style='{',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

DESKTOP_USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/58.0.3029.110 Safari/537.3",
    "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) Version/14.0.2 Safari/605.1.15",
]

def init_web_driver():
    """
    初始化webdriver

    返回:
        WebDriver: 初始化后的WebDriver对象。
    """
    options = webdriver.ChromeOptions()
    user_agent = random.choice(DESKTOP_USER_AGENTS)
    options.add_argument(f'user-agent={user_agent}')
    # options.add_argument('--headless')
    try:
        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
    except Exception as e:
        logging.error("WebDriver安装失败: %s", e)
        raise SystemExit("请检查网络连接或权限设置。") from e
    return driver

def random_wait(min_seconds=2, max_seconds=5):
    """
    随机等待

    参数:
        min_seconds (int): 最小等待时间。
        max_seconds (int): 最大等待时间。
    """
    wait_time = random.uniform(min_seconds, max_seconds)
    logging.info(f"随机等待 {wait_time:.2f} 秒")
    time.sleep(wait_time)

def get_cookies(driver, url, account):
    """
    获取cookies

    参数:
        driver (WebDriver): WebDriver对象。
        url (str): URL地址。
        account (str): 账号名称。
    """
    logging.info("开始获取cookie for %s", account)
    driver.get(url)
    exit_command = input("请在浏览器中完成登录后按回车继续...(输入'exit'并回车以退出)")
    if exit_command.lower() == 'exit':
        logging.info("用户选择退出程序 for %s", account)
        driver.quit()
        sys.exit(0)
    cookies = driver.get_cookies()
    cookie_file = f'cookies_{account}.json'
    with open(cookie_file, 'w', encoding='utf-8') as f:
        json.dump(cookies, f)
    logging.info("Cookie获取完毕并已保存 for %s", account)

def load_cookies(driver, account):
    """
    加载cookies

    参数:
        driver (WebDriver): WebDriver对象。
        account (str): 账号名称。

    返回:
        bool: 是否成功加载cookies。
    """
    logging.info("开始加载cookie for %s", account)
    cookie_file = f'cookies_{account}.json'
    if not os.path.exists(cookie_file):
        logging.error(f"未找到{account}的cookie文件")
        return False
    driver.get("https://www.zhipin.com")  # 确保是cookie对应的域名
    random_wait(1, 3)
    try:
        with open(cookie_file, 'r', encoding='utf-8') as f:
            cookies = json.load(f)
    except IOError as e:
        logging.error(f"读取cookie文件失败: {e}")
        return False
    for cookie in cookies:
        if 'expiry' in cookie:
            del cookie['expiry']
        driver.add_cookie(cookie)
    logging.info("Cookie加载完毕 for %s", account)
    return True

def get_text_safely(element, tag_tuple, default=''):
    """
    安全获取文本内容

    参数:
        element (BeautifulSoup): BeautifulSoup元素。
        tag_tuple (tuple): 包含标签名和类名的元组。
        default (str): 默认返回值。

    返回:
        str: 文本内容。
    """
    try:
        return element.find(tag_tuple[0], class_=tag_tuple[1]).text.strip()
    except AttributeError:
        return default

def flatten_array(arr):
    if isinstance(arr, np.ndarray):
        return arr.flatten().tolist()
    return arr

def handle_security_check(driver, max_wait_time=60, check_interval=5):
    """
    处理安全检查页面

    参数:
        driver (WebDriver): WebDriver对象。
        max_wait_time (int): 最大等待时间(秒)。
        check_interval (int): 检查间隔时间(秒)。
    """
    try:
        logging.info("检测到安全检查页面,正在处理...")
        elapsed_time = 0
        
        while elapsed_time < max_wait_time:
            time.sleep(check_interval)
            elapsed_time += check_interval
            logging.info(f"已等待 {elapsed_time} 秒,正在检查页面是否跳转完成...")
            
            if "security-check.html" not in driver.current_url:
                logging.info("安全检查页面处理完毕,已跳转回正常页面")
                return True
        
        raise Exception("超过最大等待时间,仍在安全检查页面,请手动完成安全检查")
    
    except Exception as e:
        logging.error(f"处理安全检查页面时出错: {e}")
        input("请手动完成安全检查,然后按Enter键继续...")
        return False

def fetch_job_info(driver, base_url, page):
    """
    获取职位信息

    参数:
        driver (WebDriver): WebDriver对象。
        base_url (str): 基础URL。
        page (int): 页码。

    返回:
        list: 包含职位信息的列表。
    """
    职位信息_list = []
    url = f'{base_url}&page={page}'
    加载成功 = False
    for 尝试次数 in range(3):
        try:
            logging.info(f"尝试访问页面: {url}")
            driver.get(url)
            if "security-check.html" in driver.current_url:
                if not handle_security_check(driver):
                    break
            WebDriverWait(driver, 10).until(EC.presence_of_all_elements_located((By.CLASS_NAME, "job-card-wrapper")))
            加载成功 = True
            break
        except Exception as e:
            logging.error("尝试 %d 次访问 %s 失败: %s", 尝试次数 + 1, url, e)
            random_wait((尝试次数 + 1) * 3, (尝试次数 + 2) * 3)
    if not 加载成功:
        logging.error("无法加载页面: %s", url)
        return []  # 返回空列表
    soup = BeautifulSoup(driver.page_source, 'lxml')
    jobs = soup.find_all('li', class_='job-card-wrapper')
    logging.info(f"找到 {len(jobs)} 个职位卡片")
    
    if len(jobs) == 0:
        logging.error("未找到任何职位卡片,可能页面结构有变动")
        input("请手动检查页面结构,然后按Enter键继续...")
    
    for job in jobs:
        try:
            logging.debug(f"正在解析职位卡片: {job.prettify()}")
            职位名称 = str(get_text_safely(job, ('span', 'job-name')))
            工作地点 = str(get_text_safely(job, ('span', 'job-area')))
            薪资 = str(get_text_safely(job, ('span', 'salary'), '面议'))
            标签列表 = [str(li.text) for li in job.find('ul', class_='tag-list').find_all('li')]
            经验要求 = str(标签列表[0]) if len(标签列表) > 0 else '未知'
            教育要求 = str(标签列表[1]) if len(标签列表) > 1 else '未知'
            联系人 = str(get_text_safely(job, ('div', 'info-public')))
            公司名称 = str(get_text_safely(job, ('h3', 'company-name')))
            公司标签列表 = [str(li.text) for li in job.find('ul', class_='company-tag-list').find_all('li')]
            公司类型 = str(公司标签列表[0]) if len(公司标签列表) > 0 else '未知'
            公司规模 = str(公司标签列表[1]) if len(公司标签列表) > 1 else '未知'
            详情 = ','.join([str(li.text) for li in job.find('div', class_='job-card-footer').find_all('li')])
            职位详情链接 = "https://www.zhipin.com" + str(job.find('a')['href'])
            
            job_info = {
                '职位名称': flatten_array(职位名称),
                '工作地点': flatten_array(工作地点),
                '薪资': flatten_array(薪资),
                '经验要求': flatten_array(经验要求),
                '教育要求': flatten_array(教育要求),
                '联系人': flatten_array(联系人),
                '公司名称': flatten_array(公司名称),
                '公司类型': flatten_array(公司类型),
                '公司规模': flatten_array(公司规模),
                '详情': flatten_array(详情),
                '职位详情链接': flatten_array(职位详情链接)
            }
            
            for key, value in job_info.items():
                logging.debug(f"{key}: {value} (type: {type(value)})")
            
            职位信息_list.append(job_info)
        except AttributeError as e:
            logging.error(f"解析职位信息时出错(可能是页面结构变动): {e}")
            logging.debug(f"职位卡片HTML内容: {job.prettify()}")
            continue  # 跳过当前职位,继续解析下一个职位
        except Exception as e:
            logging.error(f"解析职位信息时出错: {e}")
            logging.debug(f"职位卡片HTML内容: {job.prettify()}")
            input("请手动检查页面结构,然后按Enter键继续...")
    random_wait(1, 3)
    if not 职位信息_list:
        return []  # 返回空列表以避免后续操作出错
        
    for item in 职位信息_list:
        item.setdefault('职位名称', '')
        item.setdefault('工作地点', '')
        item.setdefault('薪资', '面议')
        item.setdefault('经验要求', '未知')
        item.setdefault('教育要求', '未知')
        item.setdefault('联系人', '')
        item.setdefault('公司名称', '')
        item.setdefault('公司类型', '未知')
        item.setdefault('公司规模', '未知')
        item.setdefault('详情', '')
        item.setdefault('职位详情链接', '')

    logging.info(f"第{page}页获取到 {len(职位信息_list)} 个职位信息")
    return 职位信息_list

def save_to_csv(data, filename):
    """
    将数据保存到CSV文件

    参数:
        data (list): 要保存的数据列表。
        filename (str): 保存的文件名。
    """
    keys = data[0].keys()
    with open(filename, 'w', newline='', encoding='utf-8') as output_file:
        dict_writer = csv.DictWriter(output_file, fieldnames=keys)
        dict_writer.writeheader()
        dict_writer.writerows(data)
    logging.info(f"数据已保存到 {filename}")

def 初始化已爬取职位集合(data_file):
    """
    初始化已爬取职位集合

    参数:
        data_file (str): 数据文件路径。

    返回:
        set: 已爬取职位集合。
    """
    已爬取职位 = set()
    if os.path.exists(data_file):
        with open(data_file, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            for row in reader:
                已爬取职位.add((row['职位名称'], row['公司名称']))
    return 已爬取职位

def main(总页数, 每次爬取页数, 账号列表, 岗位名称, 失败后是否停止=False):
    """
    串行爬取任务主函数

    参数:
        总页数 (int): 要爬取的总页数。
        每次爬取页数 (int): 每次爬取的页数。
        账号列表 (list): 账号列表。
        岗位名称 (str): 要爬取的岗位名称。
        失败后是否停止 (bool): 是否在抓取失败时继续运行。
    """
    base_url = 'https://www.zhipin.com/web/geek/job?query={}&city=101240100'.format(
        urllib.parse.quote(岗位名称) if 岗位名称 else ''
    )
    data_file = os.path.join('data', '职位信息.csv')  # 数据文件路径
    logging.info(f"开始爬取任务,目标总页数:{总页数},账号列表:{账号列表}")
    已爬取职位 = 初始化已爬取职位集合(data_file)
    已爬取总页数 = 0  # 初始化已爬取总页数
    总职位信息数 = 0  # 初始化总职位信息数
    for 账号 in 账号列表:
        driver = init_web_driver()  # 在循环外初始化webdriver
        try:
            if not load_cookies(driver, 账号):
                logging.info(f"未找到{账号}的cookie文件,需要手动登录。")
                get_cookies(driver, "https://www.zhipin.com", 账号)
            实际页码 = 1  # 重置实际页码为1,以便于当前账号的爬取
            while 实际页码 <= 每次爬取页数 and 已爬取总页数 < 总页数:
                logging.info(f"{账号}开始爬取第{实际页码}页数据")
                新职位信息 = fetch_job_info(driver, base_url, 实际页码)
                新职位信息_list = []
                for item in 新职位信息:
                    key = (item['职位名称'], item['公司名称'])
                    if key not in 已爬取职位:
                        新职位信息_list.append(item)
                        已爬取职位.add(key)
                if 新职位信息_list:
                    是否首次写入 = not os.path.exists(data_file)
                    with open(data_file, 'a', newline='', encoding='utf-8') as output_file:
                        dict_writer = csv.DictWriter(output_file, fieldnames=新职位信息_list[0].keys())
                        if 是否首次写入:
                            dict_writer.writeheader()
                        dict_writer.writerows(新职位信息_list)
                    logging.info(f"{账号}已成功爬取第{实际页码}页数据,新增{len(新职位信息_list)}个职位信息")
                实际页码 += 1
                已爬取总页数 += 1
                总职位信息数 += len(新职位信息_list)
                if 实际页码 > 10:  # 达到每个账号的爬取限制,准备切换账号
                    break
        except Exception as e:
            logging.error(f"使用{账号}获取职位信息时出错: {e}")
            if not 失败后是否停止:
                break
        finally:
            driver.quit()  # 关闭浏览器并准备重新初始化
            logging.info(f"{账号}的WebDriver已关闭")
    if os.path.exists(data_file):
        with open(data_file, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            df = list(reader)
        去重后的数据 = { (item['职位名称'], item['公司名称']): item for item in df }.values()
        save_to_csv(list(去重后的数据), data_file)
        logging.info("最终数据去重完成")
    logging.info(f"所有账号爬取任务完成,总爬取页数:{已爬取总页数},总职位信息数:{总职位信息数}")

if __name__ == '__main__':
    setup_logging()
    parser = argparse.ArgumentParser(description="爬取职位信息")
    parser.add_argument('--总页数', type=int, default=20, help="要爬取的总页数")
    parser.add_argument('--每次爬取页数', type=int, default=10, help="每次爬取的页数")
    parser.add_argument('--账号列表', nargs='+', default=['账号1', '账号2'], help="账号列表")
    parser.add_argument('--岗位名称', type=str, default=None, help="要爬取的岗位名称(可选)")
    parser.add_argument('--失败后是否停止', action='store_true', help="是否在抓取失败时继续运行")
    args = parser.parse_args()
    start_time = time.time()
    main(args.总页数, args.每次爬取页数, args.账号列表, args.岗位名称, args.失败后是否停止)
    end_time = time.time()
    logging.info(f"爬取任务全部完成,耗时:{end_time - start_time:.2f}秒")

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐