编写一个程序,使用Pandas处理DataFrame中的缺失值。
输入:
一个包含缺失值的DataFrame
输出:
处理后的DataFrame,其中缺失值已被适当处理
要求:
1. 使用Pandas的isnull()和sum()方法识别并统计缺失值
2. 根据数据类型选择合适的填充方法:数值型数据使用均值或中位数,分类型数据使用众数
3. 使用fillna()方法填充缺失值
import pandas as pd
import numpy as np
# Build a small demo frame with gaps in two numeric columns and one
# categorical column.
data = {
    'A': [1, 2, np.nan, 4, 5],
    'B': [np.nan, 2, 3, 4, np.nan],
    'C': ['x', 'y', 'z', np.nan, 'x'],
}
df = pd.DataFrame(data)

# Report the number of missing values per column.
missing_counts = df.isnull().sum()
print("缺失值统计:")
print(missing_counts)

# Pick one fill value per column: mean for A, median for B, and the most
# frequent value for the categorical column C.
fill_values = {
    'A': df['A'].mean(),
    'B': df['B'].median(),
    'C': df['C'].mode()[0],
}
for column, value in fill_values.items():
    df[column] = df[column].fillna(value)

print("\n处理后的数据:")
print(df)
当你在练习中答错题目时,它们会自动添加到这里
在题库中点击题目旁边的星标图标来收藏题目
数据采集(Data Collection)是数据分析生命周期的第一步,也是最为关键的环节之一。它是指从各种数据源系统地收集、测量和分析数据的过程。数据采集的质量直接决定了后续分析的价值和可靠性,正所谓"垃圾进,垃圾出"(Garbage In, Garbage Out)。高质量的数据采集需要遵循系统性、规范性、时效性和合法性的原则。
数据采集是从一个或多个数据源获取原始数据,并将其转换为可用于分析和处理的格式的过程。这个过程包括数据识别、数据获取、数据传输和数据存储四个主要阶段。
准确性:数据是否真实反映现实情况;完整性:数据是否完整无缺失;一致性:数据在不同系统间是否一致;时效性:数据是否及时更新;唯一性:数据是否存在重复记录。
合法性原则:遵守相关法律法规;必要性原则:只采集业务必需的数据;最小化原则:在满足业务目的的前提下,尽量减少采集的数据量;安全性原则:保障数据传输和存储安全。
import pandas as pd
import numpy as np
from datetime import datetime
class DataQualityChecker:
    """Collect simple data-quality metrics for a pandas DataFrame.

    Each ``check_*`` method stores its result in ``self.report`` under the
    dimension name and also returns it. ``generate_report`` prints whatever
    has been collected so far.
    """

    def __init__(self, df):
        self.df = df
        self.report = {}

    def check_completeness(self):
        """Measure the share of non-missing cells.

        Returns:
            dict with ``total_cells``, ``missing_cells`` and a formatted
            ``completeness_rate``. A frame with zero cells is reported as
            100% complete instead of raising ZeroDivisionError.
        """
        total_cells = int(self.df.shape[0] * self.df.shape[1])
        missing_cells = int(self.df.isnull().sum().sum())
        if total_cells:
            completeness = (total_cells - missing_cells) / total_cells * 100
        else:
            completeness = 100.0
        self.report['completeness'] = {
            'total_cells': total_cells,
            'missing_cells': missing_cells,
            'completeness_rate': f"{completeness:.2f}%"
        }
        return self.report['completeness']

    def check_accuracy(self, rules):
        """Count the values that fail a per-column validation rule.

        Args:
            rules: mapping of column name to a callable that receives one
                cell value and returns something truthy when it is valid.
                Columns that are not in the frame are ignored.

        Returns:
            dict mapping each checked column to its number of invalid rows.
        """
        errors = {}
        for col, rule in rules.items():
            if col not in self.df.columns:
                continue
            # Coerce to bool so that ``~`` is a logical NOT even when apply()
            # yields an object/float Series (e.g. for an empty column).
            passed = self.df[col].apply(rule).astype(bool)
            errors[col] = int((~passed).sum())
        self.report['accuracy'] = errors
        return errors

    def check_uniqueness(self, subset=None):
        """Measure the share of non-duplicated rows.

        Args:
            subset: optional column list passed to ``DataFrame.duplicated``.

        Returns:
            dict with ``total_records``, ``duplicates`` and a formatted
            ``uniqueness_rate``. An empty frame is reported as 100% unique.
        """
        total_records = len(self.df)
        if total_records:
            duplicates = int(self.df.duplicated(subset=subset).sum())
            uniqueness = (total_records - duplicates) / total_records * 100
        else:
            duplicates = 0
            uniqueness = 100.0
        self.report['uniqueness'] = {
            'total_records': total_records,
            'duplicates': duplicates,
            'uniqueness_rate': f"{uniqueness:.2f}%"
        }
        return self.report['uniqueness']

    def check_timeliness(self, date_col, days_threshold=30):
        """Check how old the newest value in ``date_col`` is.

        Args:
            date_col: name of a column that ``pd.to_datetime`` can parse.
            days_threshold: maximum age in days that still counts as timely.

        Returns:
            The timeliness entry of the report, or ``{}`` when the column
            does not exist and no earlier timeliness result was stored.
        """
        if date_col in self.df.columns:
            latest = pd.to_datetime(self.df[date_col]).max()
            # Take "now" in the column's own time zone: a naive now() cannot
            # be subtracted from tz-aware timestamps.
            now = datetime.now(tz=getattr(latest, 'tzinfo', None))
            days_old = (now - latest).days
            self.report['timeliness'] = {
                'latest_date': str(latest),
                'days_old': days_old,
                'is_timely': bool(days_old <= days_threshold)
            }
        return self.report.get('timeliness', {})

    def generate_report(self):
        """Print every dimension collected so far, one key per line."""
        print("=" * 60)
        print("📊 数据质量检查报告")
        print("=" * 60)
        for dimension, result in self.report.items():
            print(f"\n【{dimension.upper()}】")
            for key, value in result.items():
                print(f" • {key}: {value}")
# Usage example: run every check on a small frame that contains one missing
# name, one negative age and one malformed e-mail address.
sample_rows = {
    'id': [1, 2, 3, 4, 5],
    'name': ['张三', '李四', None, '王五', '赵六'],
    'age': [25, 30, 35, -5, 40],  # -5 is a deliberately invalid value
    'email': ['a@test.com', 'invalid', 'c@test.com', 'd@test.com', 'e@test.com'],
    'created_at': pd.date_range(start='2024-01-01', periods=5, freq='D'),
}
df = pd.DataFrame(sample_rows)

# An age is valid when it lies strictly between 0 and 120.
age_rules = {'age': lambda x: x > 0 and x < 120}

checker = DataQualityChecker(df)
checker.check_completeness()
checker.check_accuracy(age_rules)
checker.check_uniqueness(subset=['id'])
checker.check_timeliness('created_at')
checker.generate_report()
数据源是指数据的来源和存储形式。根据数据的结构化程度,数据源可以分为结构化数据、半结构化数据和非结构化数据三大类。了解不同类型数据源的特点,有助于选择合适的采集技术和存储方案。随着大数据技术的发展,实时数据流也成为重要的数据源类型。
结构化数据是指具有明确结构定义、可以存储在关系数据库表格中的数据。这类数据具有固定的字段和数据类型,便于存储、查询和分析。
半结构化数据具有一定的结构特征,但没有固定的模式定义。这类数据通常包含标签或标记来标识数据元素,灵活性较高。
非结构化数据没有预定义的数据模型,是最常见的数据类型。这类数据需要通过特征提取、自然语言处理等技术才能转化为可分析的格式。
实时数据流是持续产生、需要实时处理的数据。这类数据通常来自物联网设备、传感器、用户行为追踪等场景。
import pandas as pd
import json
from datetime import datetime
# ========== 1. Structured data ==========
print("=" * 50)
print("📊 结构化数据处理")
print("=" * 50)

# Stand-in for a frame that would normally be read from a CSV file.
df_csv = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['张三', '李四', '王五'],
    'score': [85, 90, 78],
    'department': ['技术部', '市场部', '财务部']
})
print("CSV格式数据:")
print(df_csv)

# Simulated database query: keep the rows with a score above 80.
print("\n数据库查询结果:")
print(df_csv[df_csv['score'] > 80])

# ========== 2. Semi-structured data ==========
print("\n" + "=" * 50)
print("📝 半结构化数据处理")
print("=" * 50)

# Parse a JSON document with nested lists and objects.
json_data = '''
{
"company": "科技公司",
"employees": [
{"id": 1, "name": "张三", "skills": ["Python", "SQL"], "experience": 5},
{"id": 2, "name": "李四", "skills": ["Java", "Spark"], "experience": 3}
],
"metadata": {
"last_updated": "2024-01-15",
"total_count": 2
}
}
'''
data = json.loads(json_data)
print("JSON解析结果:")
print(f"公司: {data['company']}")
print(f"员工数: {data['metadata']['total_count']}")
print("\n员工列表:")
for emp in data['employees']:
    print(f" - {emp['name']}: {', '.join(emp['skills'])}")

# Flatten the employee records into a DataFrame.
df_json = pd.json_normalize(data['employees'])
print("\n转换为DataFrame:")
print(df_json)

# ========== 3. Unstructured data ==========
print("\n" + "=" * 50)
print("🖼️ 非结构化数据处理(文本示例)")
print("=" * 50)
text_data = """
数据采集是数据分析的基础。
高质量的数据是分析成功的关键。
Python是数据采集的常用工具。
"""

# Naive whitespace tokenisation followed by a frequency count. The sample
# sentences contain no spaces, so each line ends up as a single token.
words = text_data.replace('\n', ' ').split()
word_freq = {}
for word in words:
    word_freq[word] = word_freq.get(word, 0) + 1
print("词频统计:")
for word, freq in sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:5]:
    print(f" {word}: {freq}次")

# ========== 4. Real-time stream simulation ==========
print("\n" + "=" * 50)
print("🌐 实时数据流模拟")
print("=" * 50)
import time
import random
def simulate_data_stream(num_records=5, delay=0.0):
    """Print a series of randomly generated sensor readings.

    Args:
        num_records: number of readings to emit.
        delay: seconds to pause after each reading. The default of 0 keeps
            the demo instantaneous; a real stream consumer would use a
            positive value.

    Returns:
        The fixed completion message.
    """
    for _ in range(num_records):
        record = {
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'sensor_id': f'SENSOR_{random.randint(1, 3)}',
            'temperature': round(random.uniform(20, 30), 2),
            'humidity': round(random.uniform(40, 60), 2)
        }
        print(f"接收数据: {record}")
        if delay > 0:
            time.sleep(delay)
    return "数据流模拟完成"


print(simulate_data_stream())
在数据采集过程中,遵守法律法规和伦理准则是不可逾越的底线。随着数据隐私保护意识的增强和相关法规的完善,数据采集必须在合法合规的框架内进行。违反规定不仅可能导致法律责任,还会损害企业声誉和用户信任。
import requests
from urllib.robotparser import RobotFileParser
import hashlib
import re
from datetime import datetime
class EthicalDataCollector:
    """Helper for fetching pages while respecting robots.txt and rate limits.

    Also offers simple regex-based PII detection and hash-based masking.
    """

    def __init__(self, user_agent='DataCollector/1.0'):
        self.user_agent = user_agent
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': user_agent})
        self.request_delay = 1  # minimum gap between requests, in seconds
        self.last_request_time = 0

    def check_robots_txt(self, base_url):
        """Download and parse ``<base_url>/robots.txt``.

        Returns:
            A loaded ``RobotFileParser``, or ``None`` when reading failed.
        """
        rp = RobotFileParser()
        robots_url = f"{base_url.rstrip('/')}/robots.txt"
        try:
            rp.set_url(robots_url)
            rp.read()
            return rp
        except Exception as e:
            print(f"无法读取robots.txt: {e}")
            return None

    def can_fetch(self, rp, url):
        """Return whether ``url`` may be fetched according to ``rp``.

        When ``rp`` is ``None`` a warning is printed and ``True`` is returned,
        i.e. the caller is trusted to proceed carefully.
        """
        if rp is None:
            print("警告:无法验证robots.txt,请谨慎爬取")
            return True
        return rp.can_fetch(self.user_agent, url)

    def rate_limit(self):
        """Sleep until at least ``request_delay`` seconds passed since the last call."""
        import time
        elapsed = time.time() - self.last_request_time
        if elapsed < self.request_delay:
            time.sleep(self.request_delay - elapsed)
        self.last_request_time = time.time()

    def anonymize_pii(self, data, fields):
        """Replace string values of ``fields`` with a truncated SHA-256 digest.

        ``data`` is modified in place and also returned; pass a copy to keep
        the original. Missing keys and non-string values are left untouched.

        NOTE(review): an unsalted, truncated hash of a low-entropy value such
        as a phone number can be brute-forced; treat this as masking for
        display, not as strong anonymisation.
        """
        for field in fields:
            value = data.get(field) if field in data else None
            if isinstance(value, str):
                data[field] = hashlib.sha256(value.encode()).hexdigest()[:16]
        return data

    def detect_pii(self, text):
        """Find phone numbers, e-mails, ID-card and bank-card numbers in ``text``.

        The numeric patterns are anchored with digit lookarounds so that they
        only match whole digit runs (a phone number is no longer "found"
        inside an 18-digit ID number). The e-mail pattern uses ASCII classes
        so adjacent CJK text is not swallowed into the match. A 16-19 digit
        run is still reported as ``bank_card`` even if it is also an ID number.

        Returns:
            dict mapping each detected PII type to the list of matches.
        """
        patterns = {
            'phone': r'(?<!\d)1[3-9]\d{9}(?!\d)',  # mainland China mobile number
            'email': r'[A-Za-z0-9_.+-]+@[A-Za-z0-9-]+(?:\.[A-Za-z0-9-]+)+',
            'id_card': r'(?<!\d)\d{17}[\dXx](?!\d)',  # 18-character resident ID
            'bank_card': r'(?<!\d)\d{16,19}(?!\d)',
        }
        detected = {}
        for pii_type, pattern in patterns.items():
            matches = re.findall(pattern, str(text))
            if matches:
                detected[pii_type] = matches
        return detected

    def fetch_with_compliance(self, url, rp=None):
        """Fetch ``url`` after the robots.txt check and rate limiting.

        Returns:
            The response body for HTTP 200, otherwise ``None`` (disallowed by
            robots.txt, non-200 status, or request error).
        """
        if rp and not self.can_fetch(rp, url):
            print(f"❌ robots.txt禁止爬取: {url}")
            return None

        self.rate_limit()
        try:
            response = self.session.get(url, timeout=10)
            if response.status_code == 200:
                # Warn about sensitive data but still return the page.
                pii = self.detect_pii(response.text)
                if pii:
                    print(f"⚠️ 检测到敏感信息: {list(pii.keys())}")
                return response.text
            print(f"请求失败,状态码: {response.status_code}")
            return None
        except Exception as e:
            print(f"请求异常: {e}")
            return None
# Usage example: mask a record, then scan free text for PII.
collector = EthicalDataCollector()
separator = "=" * 50

# Sample record containing three sensitive fields.
test_data = {
    'name': '张三',
    'phone': '13800138000',
    'email': 'zhangsan@example.com',
    'id_card': '110101199001011234'
}
print(separator)
print("🔒 数据脱敏示例")
print(separator)
print("原始数据:", test_data)

# Mask a copy so the original record stays printable above.
sensitive_fields = ['phone', 'email', 'id_card']
anonymized = collector.anonymize_pii(test_data.copy(), sensitive_fields)
print("脱敏后数据:", anonymized)

# Detect PII inside a sentence.
print("\n" + separator)
print("🔍 PII检测示例")
print(separator)
sample_text = "联系方式:手机13800138000,邮箱test@example.com"
detected = collector.detect_pii(sample_text)
print(f"检测到的敏感信息: {detected}")
数据采集工具的选择取决于数据源类型、数据规模、采集频率和技术要求。Python生态系统提供了丰富的数据采集工具库,从简单的HTTP请求到复杂的分布式爬虫框架,能够满足各种数据采集场景的需求。
Requests:简洁的HTTP库,适合简单请求;BeautifulSoup:HTML/XML解析库,易于使用;Scrapy:功能强大的爬虫框架,支持分布式;Selenium:浏览器自动化,适合动态网页。
requests:标准HTTP客户端;httpx:支持异步的现代HTTP客户端;aiohttp:异步HTTP客户端/服务器框架。
SQLAlchemy:Python SQL工具包和ORM;psycopg2:PostgreSQL适配器;pymysql:纯Python MySQL客户端;pymongo:MongoDB驱动。
pandas:数据处理分析库;openpyxl:Excel文件读写;csv:CSV文件处理标准库;json:JSON处理标准库。
import pandas as pd
import sqlite3
import json
from datetime import datetime
class MultiSourceDataCollector:
    """Collect DataFrames from files and databases and keep a collection log.

    Every successful ``collect_from_*`` call appends ``(source_label, df)``
    to ``collected_data``; every attempt appends an entry to
    ``collection_log``. Failures are logged and reported as ``None`` rather
    than raised.
    """

    def __init__(self):
        self.collected_data = []
        self.collection_log = []

    def log_collection(self, source, status, record_count=0, message=''):
        """Append a timestamped log entry and echo it to stdout."""
        log_entry = {
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'source': source,
            'status': status,
            'record_count': record_count,
            'message': message
        }
        self.collection_log.append(log_entry)
        print(f"[{log_entry['timestamp']}] {source}: {status} - {message}")

    # ========== File sources ==========
    def collect_from_csv(self, filepath, **kwargs):
        """Read a CSV file; ``kwargs`` are forwarded to ``pd.read_csv``."""
        try:
            df = pd.read_csv(filepath, **kwargs)
            self.collected_data.append(('csv', df))
            self.log_collection('CSV文件', '成功', len(df), f'读取{filepath}')
            return df
        except Exception as e:
            self.log_collection('CSV文件', '失败', 0, str(e))
            return None

    def collect_from_json(self, filepath, record_path=None):
        """Read a UTF-8 JSON file and flatten it with ``pd.json_normalize``.

        Args:
            filepath: path of the JSON file.
            record_path: optional path to the list of records inside the
                document, forwarded to ``json_normalize``.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if record_path:
                df = pd.json_normalize(data, record_path=record_path)
            else:
                df = pd.json_normalize(data)
            self.collected_data.append(('json', df))
            self.log_collection('JSON文件', '成功', len(df), f'读取{filepath}')
            return df
        except Exception as e:
            self.log_collection('JSON文件', '失败', 0, str(e))
            return None

    # ========== Database sources ==========
    def collect_from_sqlite(self, db_path, query):
        """Run ``query`` against the SQLite database at ``db_path``."""
        try:
            conn = sqlite3.connect(db_path)
            try:
                df = pd.read_sql(query, conn)
            finally:
                # Close even when the query fails so the handle is not leaked.
                conn.close()
            self.collected_data.append(('sqlite', df))
            self.log_collection('SQLite数据库', '成功', len(df), '执行查询')
            return df
        except Exception as e:
            self.log_collection('SQLite数据库', '失败', 0, str(e))
            return None

    def collect_from_database(self, connection_string, query, db_type='mysql'):
        """Run ``query`` through a SQLAlchemy engine.

        Args:
            connection_string: SQLAlchemy URL of the database.
            query: SQL text to execute.
            db_type: label used for logging and for tagging the result.
        """
        try:
            # Imported lazily so the class works without SQLAlchemy installed
            # as long as this method is not used.
            from sqlalchemy import create_engine
            engine = create_engine(connection_string)
            try:
                df = pd.read_sql(query, engine)
            finally:
                # Release pooled connections held by this one-off engine.
                engine.dispose()
            self.collected_data.append((db_type, df))
            self.log_collection(f'{db_type}数据库', '成功', len(df))
            return df
        except Exception as e:
            self.log_collection(f'{db_type}数据库', '失败', 0, str(e))
            return None

    # ========== In-memory database example ==========
    def create_sample_database(self):
        """Create an in-memory SQLite database with five sample employees.

        Returns:
            The open connection; the caller is responsible for closing it.
        """
        conn = sqlite3.connect(':memory:')
        conn.execute('''
CREATE TABLE employees (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
department TEXT,
salary REAL,
hire_date TEXT
)
''')
        sample_data = [
            (1, '张三', '技术部', 15000, '2022-01-15'),
            (2, '李四', '市场部', 12000, '2022-03-20'),
            (3, '王五', '财务部', 13000, '2022-06-10'),
            (4, '赵六', '技术部', 16000, '2023-01-05'),
            (5, '钱七', '人事部', 11000, '2023-04-15')
        ]
        conn.executemany(
            'INSERT INTO employees VALUES (?, ?, ?, ?, ?)',
            sample_data
        )
        conn.commit()
        return conn

    # ========== Merge and export ==========
    def merge_collected_data(self):
        """Concatenate all collected frames row-wise.

        Returns:
            The combined DataFrame, or ``None`` when nothing was collected.
        """
        all_dfs = [df for _, df in self.collected_data if df is not None]
        if not all_dfs:
            return None
        return pd.concat(all_dfs, ignore_index=True)

    def export_to_csv(self, filepath):
        """Write the merged data to ``filepath``; does nothing when empty."""
        merged = self.merge_collected_data()
        if merged is not None:
            merged.to_csv(filepath, index=False, encoding='utf-8')
            print(f"数据已导出到: {filepath}")

    def get_collection_summary(self):
        """Print record totals, per-source details and the collection log."""
        print("\n" + "=" * 50)
        print("📊 数据采集摘要")
        print("=" * 50)
        total_records = sum(
            len(df) for _, df in self.collected_data if df is not None
        )
        print(f"总采集记录数: {total_records}")
        print(f"数据源数量: {len(self.collected_data)}")
        print("\n采集详情:")
        for source, df in self.collected_data:
            if df is not None:
                print(f" • {source}: {len(df)}条记录, {len(df.columns)}个字段")
        print("\n采集日志:")
        for log in self.collection_log:
            status_icon = '✅' if log['status'] == '成功' else '❌'
            print(f" {status_icon} [{log['source']}] {log['message']}")
# Usage example
collector = MultiSourceDataCollector()

# Build the in-memory sample database and query the higher-paid employees.
# The connection is closed once the frame is loaded so it is not leaked.
conn = collector.create_sample_database()
try:
    df_db = pd.read_sql('SELECT * FROM employees WHERE salary > 12000', conn)
finally:
    conn.close()
collector.collected_data.append(('sqlite_memory', df_db))
collector.log_collection('内存数据库', '成功', len(df_db), '查询高薪员工')

# Stand-in for product data that would normally come from a CSV file.
df_csv = pd.DataFrame({
    'product_id': [101, 102, 103],
    'product_name': ['产品A', '产品B', '产品C'],
    'price': [99.9, 199.9, 299.9]
})
collector.collected_data.append(('csv', df_csv))
collector.log_collection('CSV数据', '成功', len(df_csv), '产品数据')

# Print the collection summary.
collector.get_collection_summary()
一个完整的数据采集流程需要经过需求分析、方案设计、技术实现、质量控制和持续维护五个阶段。良好的流程设计能够确保数据采集的效率、质量和可持续性。在实际项目中,需要根据具体业务场景和数据特点灵活调整采集策略。
明确采集目标、数据范围、质量要求、时间约束和预算限制。与业务方充分沟通,理解数据用途和分析需求。
评估数据源可获取性,选择采集技术和工具,设计数据模型和存储方案,制定采集计划和应急预案。
开发采集脚本或程序,实现数据解析和清洗逻辑,建立错误处理和重试机制,进行单元测试和集成测试。
建立数据质量检查机制,监控采集过程和结果,记录异常情况和处理措施,定期生成质量报告。
监控数据源变化,及时更新采集规则,优化采集性能,处理历史数据迁移,维护采集文档。
采用模块化设计便于维护扩展;实现增量采集减少资源消耗;建立完善的日志和监控体系;定期备份采集配置和数据。
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime
import json
class CollectionStatus(Enum):
    """Lifecycle states of a collection task; values are display labels."""

    PENDING = "待执行"
    RUNNING = "执行中"
    COMPLETED = "已完成"
    FAILED = "失败"
    CANCELLED = "已取消"
@dataclass
class DataSource:
    """Configuration of one data source."""

    name: str
    source_type: str  # api, crawler, database, file
    connection_info: Dict
    fields: List[str]
    schedule: Optional[str] = None
@dataclass
class CollectionTask:
    """One run of a collection against a single data source."""

    task_id: str
    data_source: DataSource
    status: CollectionStatus
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    records_collected: int = 0
    error_message: Optional[str] = None
class DataCollectionPipeline:
    """Register data sources, run collection tasks and report on them."""

    def __init__(self, project_name: str):
        self.project_name = project_name
        self.data_sources: List[DataSource] = []
        self.tasks: List[CollectionTask] = []
        self.config = {
            'retry_count': 3,
            'retry_delay': 5,
            'timeout': 30,
            'batch_size': 1000
        }

    def add_data_source(self, source: DataSource):
        """Register ``source`` so tasks can be created for it by name."""
        self.data_sources.append(source)
        print(f"✅ 已添加数据源: {source.name}")

    def create_task(self, source_name: str) -> CollectionTask:
        """Create a pending task for the registered source ``source_name``.

        Raises:
            ValueError: if no source with that name has been added.
        """
        source = next((s for s in self.data_sources if s.name == source_name), None)
        if not source:
            raise ValueError(f"数据源不存在: {source_name}")
        # The timestamp only has one-second resolution, so a per-pipeline
        # sequence number is appended to keep ids unique for tasks created
        # within the same second.
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        task = CollectionTask(
            task_id=f"TASK_{timestamp}_{len(self.tasks) + 1:03d}",
            data_source=source,
            status=CollectionStatus.PENDING
        )
        self.tasks.append(task)
        return task

    def execute_task(self, task: CollectionTask):
        """Run ``task`` and record its status, timing and record count.

        The collection itself is simulated (a short sleep and a fixed record
        count); a real implementation would call the source-specific logic
        inside the ``try`` block.
        """
        task.status = CollectionStatus.RUNNING
        task.start_time = datetime.now()
        print(f"\n🚀 开始执行任务: {task.task_id}")
        print(f" 数据源: {task.data_source.name}")
        print(f" 类型: {task.data_source.source_type}")
        try:
            import time
            time.sleep(0.5)  # simulated collection time
            task.records_collected = 100  # simulated record count
            task.status = CollectionStatus.COMPLETED
            print(f" ✅ 采集完成,共 {task.records_collected} 条记录")
        except Exception as e:
            task.status = CollectionStatus.FAILED
            task.error_message = str(e)
            print(f" ❌ 采集失败: {e}")
        finally:
            task.end_time = datetime.now()

    def generate_report(self) -> str:
        """Return a JSON summary of all sources and tasks.

        ``duration`` is ``None`` for tasks that lack a start or end time.
        """
        def _duration(task):
            if task.start_time and task.end_time:
                return str(task.end_time - task.start_time)
            return None

        report = {
            'project': self.project_name,
            'generated_at': datetime.now().isoformat(),
            'summary': {
                'total_sources': len(self.data_sources),
                'total_tasks': len(self.tasks),
                'completed': sum(1 for t in self.tasks if t.status == CollectionStatus.COMPLETED),
                'failed': sum(1 for t in self.tasks if t.status == CollectionStatus.FAILED),
                'total_records': sum(t.records_collected for t in self.tasks)
            },
            'tasks': [
                {
                    'task_id': t.task_id,
                    'source': t.data_source.name,
                    'status': t.status.value,
                    'records': t.records_collected,
                    'duration': _duration(t)
                }
                for t in self.tasks
            ]
        }
        return json.dumps(report, ensure_ascii=False, indent=2)
# Usage example
pipeline = DataCollectionPipeline("电商数据采集项目")

# Describe the two sources: a product API polled daily and an order database.
api_connection = {"base_url": "https://api.example.com", "auth": "token"}
api_source = DataSource(
    name="商品API",
    source_type="api",
    connection_info=api_connection,
    fields=["id", "name", "price", "stock"],
    schedule="0 8 * * *",  # cron expression: every day at 08:00
)
db_connection = {"host": "localhost", "port": 3306, "database": "orders"}
db_source = DataSource(
    name="订单数据库",
    source_type="database",
    connection_info=db_connection,
    fields=["order_id", "user_id", "amount", "status"],
)
for configured_source in (api_source, db_source):
    pipeline.add_data_source(configured_source)

# Create and run one task per source, in registration order.
task1 = pipeline.create_task("商品API")
pipeline.execute_task(task1)
task2 = pipeline.create_task("订单数据库")
pipeline.execute_task(task2)

# Print the JSON report.
report_rule = "=" * 50
print("\n" + report_rule)
print("📊 采集报告")
print(report_rule)
print(pipeline.generate_report())
网络爬虫(Web Crawler),又称网络蜘蛛(Web Spider),是一种按照特定规则自动抓取万维网信息的程序。爬虫通过模拟浏览器向Web服务器发送请求,获取网页内容,然后从中提取需要的数据和新的URL,不断循环这个过程直到满足停止条件。
import requests
from urllib.parse import urljoin, urlparse, urldefrag
from collections import deque
import time
import random
class WebCrawler:
    """Breadth-first, same-domain web crawler.

    Override ``parse_page`` in a subclass to extract site-specific data; the
    returned dict's ``links`` entry feeds the crawl queue.
    """

    def __init__(self, seed_urls, max_pages=100, delay=(1, 3)):
        """
        Args:
            seed_urls: one URL or a list of URLs to start from.
            max_pages: stop after this many URLs have been visited.
            delay: ``(min, max)`` seconds for the random pause between pages.
        """
        self.seed_urls = seed_urls if isinstance(seed_urls, list) else [seed_urls]
        self.max_pages = max_pages
        self.delay = delay
        self.visited = set()   # URLs already requested
        self.queue = deque()   # URLs waiting to be requested
        self.results = []      # one parsed dict per fetched page
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def normalize_url(self, url, base_url):
        """Resolve ``url`` against ``base_url`` and drop any ``#fragment``."""
        full_url = urljoin(base_url, url)
        full_url, _ = urldefrag(full_url)
        return full_url

    def is_valid_url(self, url, base_domain):
        """Return True for http(s) URLs whose host equals ``base_domain``."""
        parsed = urlparse(url)
        return parsed.netloc == base_domain and parsed.scheme in ['http', 'https']

    def fetch_page(self, url):
        """Return the body of ``url`` for HTTP 200, otherwise ``None``."""
        try:
            response = self.session.get(url, timeout=10)
            if response.status_code == 200:
                return response.text
            print(f"状态码 {response.status_code}: {url}")
        except Exception as e:
            print(f"请求失败: {url} - {e}")
        return None

    def parse_page(self, html, url):
        """Extract the title and same-domain links (override in subclasses).

        Returns:
            dict with ``url``, ``title`` and ``links``.
        """
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(html, 'html.parser')
        data = {
            'url': url,
            'title': soup.title.string if soup.title else None,
            'links': []
        }
        base_domain = urlparse(url).netloc
        for a in soup.find_all('a', href=True):
            link = self.normalize_url(a['href'], url)
            if self.is_valid_url(link, base_domain):
                data['links'].append(link)
        return data

    def crawl(self):
        """Crawl from the seed URLs until the queue is empty or the limit hit.

        Returns:
            ``self.results``, the list of dicts produced by ``parse_page``.
        """
        # Every URL that has ever been queued. Gives O(1) duplicate checks
        # where scanning the deque would be linear per discovered link.
        enqueued = set(self.queue)
        for url in self.seed_urls:
            self.queue.append(url)
            enqueued.add(url)
        print(f"开始爬取,种子URL: {len(self.seed_urls)}个,最大页面: {self.max_pages}")

        while self.queue and len(self.visited) < self.max_pages:
            url = self.queue.popleft()
            if url in self.visited:
                continue
            self.visited.add(url)
            print(f"[{len(self.visited)}/{self.max_pages}] 爬取: {url}")

            html = self.fetch_page(url)
            if not html:
                continue

            data = self.parse_page(html, url)
            self.results.append(data)

            for link in data.get('links', []):
                if link not in self.visited and link not in enqueued:
                    enqueued.add(link)
                    self.queue.append(link)

            # Pause only when another request will actually follow.
            if self.queue and len(self.visited) < self.max_pages:
                time.sleep(random.uniform(*self.delay))

        print(f"\n爬取完成!共访问 {len(self.visited)} 个页面")
        return self.results
# Usage example: only runs when the file is executed as a script.
if __name__ == '__main__':
    crawler = WebCrawler(
        seed_urls=['https://example.com'],
        max_pages=10,
        delay=(1, 2)
    )
    results = crawler.crawl()
    print(f"\n爬取结果: {len(results)} 条记录")
Requests是Python中最流行的HTTP库,简洁优雅的API设计使其成为发送HTTP请求的首选。BeautifulSoup是一个强大的HTML/XML解析库,能够快速从网页中提取数据。两者结合是Python爬虫开发的经典组合。
支持GET/POST/PUT/DELETE等HTTP方法;自动处理Cookie和Session;支持文件上传和下载;支持代理和超时设置;支持SSL证书验证。
find():查找第一个匹配元素;find_all():查找所有匹配元素;select():CSS选择器查找;get_text():获取文本内容;attrs:获取属性字典。
import requests
from bs4 import BeautifulSoup
import pandas as pd
from datetime import datetime
class WebScraper:
    """Thin wrapper around a requests session plus BeautifulSoup extractors."""

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        })

    def get(self, url, params=None, **kwargs):
        """Send a GET request and return the response.

        ``kwargs`` are forwarded to ``Session.get``. The timeout defaults to
        10 seconds but may be overridden by the caller.

        Raises:
            requests.HTTPError: for 4xx/5xx responses (``raise_for_status``).
        """
        # setdefault instead of a fixed keyword: passing timeout=... next to
        # a hard-coded timeout would raise "multiple values for keyword".
        kwargs.setdefault('timeout', 10)
        response = self.session.get(url, params=params, **kwargs)
        response.raise_for_status()
        # Use the encoding detected from the body rather than the header.
        response.encoding = response.apparent_encoding
        return response

    def post(self, url, data=None, json=None, **kwargs):
        """Send a POST request and return the response.

        ``kwargs`` are forwarded to ``Session.post``; the timeout defaults to
        10 seconds.

        Raises:
            requests.HTTPError: for 4xx/5xx responses.
        """
        kwargs.setdefault('timeout', 10)
        response = self.session.post(url, data=data, json=json, **kwargs)
        response.raise_for_status()
        return response

    def parse_html(self, html, parser='html.parser'):
        """Parse ``html`` into a BeautifulSoup tree."""
        return BeautifulSoup(html, parser)

    def extract_table(self, soup, table_selector='table'):
        """Extract every table matching ``table_selector``.

        Returns:
            A list of tables; each table is a list of rows and each row a
            list of stripped cell texts (``th`` and ``td``). Empty rows and
            empty tables are skipped.
        """
        tables = []
        for table in soup.select(table_selector):
            rows = []
            for tr in table.find_all('tr'):
                cells = [cell.get_text(strip=True) for cell in tr.find_all(['th', 'td'])]
                if cells:
                    rows.append(cells)
            if rows:
                tables.append(rows)
        return tables

    def extract_links(self, soup, base_url=None, selector='a[href]'):
        """Return ``{'text', 'href'}`` dicts for elements matching ``selector``.

        When ``base_url`` is given, hrefs are resolved against it.
        """
        from urllib.parse import urljoin
        links = []
        for a in soup.select(selector):
            href = a.get('href', '')
            if base_url:
                href = urljoin(base_url, href)
            links.append({'text': a.get_text(strip=True), 'href': href})
        return links

    def extract_images(self, soup, base_url=None, selector='img[src]'):
        """Return ``{'alt', 'src'}`` dicts for elements matching ``selector``.

        When ``base_url`` is given, sources are resolved against it.
        """
        from urllib.parse import urljoin
        images = []
        for img in soup.select(selector):
            src = img.get('src', '')
            if base_url:
                src = urljoin(base_url, src)
            images.append({'alt': img.get('alt', ''), 'src': src})
        return images
# Usage example
scraper = WebScraper()

# Sample HTML: a product table followed by pagination links.
sample_html = '''
<!DOCTYPE html>
<html>
<head><title>商品列表</title></head>
<body>
<div class="container">
<h1>热门商品</h1>
<table id="products">
<thead>
<tr><th>名称</th><th>价格</th><th>库存</th></tr>
</thead>
<tbody>
<tr><td>商品A</td><td>¥99</td><td>100</td></tr>
<tr><td>商品B</td><td>¥199</td><td>50</td></tr>
<tr><td>商品C</td><td>¥299</td><td>30</td></tr>
</tbody>
</table>
<div class="pagination">
<a href="/page/1" class="active">1</a>
<a href="/page/2">2</a>
<a href="/page/3">3</a>
</div>
</div>
</body>
</html>
'''
soup = scraper.parse_html(sample_html)

# Page title
print("=" * 50)
print("📄 页面标题")
print("=" * 50)
print(soup.title.string)

# Table rows
print("\n" + "=" * 50)
print("📊 表格数据")
print("=" * 50)
tables = scraper.extract_table(soup, '#products')
for table in tables:
    for row in table:
        print(row)

# Pagination links, resolved against the site root
print("\n" + "=" * 50)
print("🔗 页面链接")
print("=" * 50)
links = scraper.extract_links(soup, 'https://example.com', '.pagination a')
for link in links:
    print(f" {link['text']}: {link['href']}")

# First table as a DataFrame: the header row becomes the column names.
df = pd.DataFrame(tables[0][1:], columns=tables[0][0])
print("\n" + "=" * 50)
print("📈 DataFrame格式")
print("=" * 50)
print(df)
Scrapy是一个功能强大的Python爬虫框架,专为大规模数据抓取而设计。它提供了完整的爬虫开发架构,包括请求调度、并发控制、数据管道、中间件等功能,是构建生产级爬虫的首选框架。
异步非阻塞IO,高性能并发;内置数据去重和断点续爬;支持分布式爬虫扩展;丰富的中间件生态;完善的日志和统计功能。
# Scrapy爬虫示例代码结构
# 文件: myproject/spiders/example_spider.py
import scrapy
from scrapy.item import Field, Item
from itemloaders.processors import MapCompose, TakeFirst
# 定义数据项
class ProductItem(Item):
    """Fields scraped for one product."""

    name = Field()        # product name
    price = Field()       # price text as scraped
    stock = Field()       # stock text as scraped
    url = Field()         # product page URL
    crawled_at = Field()  # ISO timestamp of the crawl
# 定义爬虫
class ProductSpider(scrapy.Spider):
    """Spider that walks the product list pages and scrapes each product."""

    name = 'products'
    allowed_domains = ['example.com']
    start_urls = ['https://example.com/products']

    # Per-spider settings.
    # NOTE(review): FEED_FORMAT / FEED_URI are the legacy feed-export
    # settings; newer Scrapy releases prefer FEEDS. Confirm against the
    # Scrapy version in use before changing them.
    custom_settings = {
        'CONCURRENT_REQUESTS': 16,     # concurrent requests
        'DOWNLOAD_DELAY': 1,           # seconds between downloads
        'COOKIES_ENABLED': False,      # do not keep cookies
        'FEED_FORMAT': 'json',         # output format
        'FEED_URI': 'products.json',   # output file
    }

    def parse(self, response):
        """Follow each product link on a list page, then the next page."""
        for product in response.css('.product-item'):
            url = product.css('a::attr(href)').get()
            # Items without a link would make response.follow() fail.
            if url:
                yield response.follow(url, callback=self.parse_product)

        next_page = response.css('.next-page::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)

    def parse_product(self, response):
        """Build a ``ProductItem`` from a product detail page."""
        # Local import: this snippet's own import block does not bring in
        # datetime.
        from datetime import datetime

        item = ProductItem()
        item['name'] = response.css('h1.title::text').get()
        item['price'] = response.css('.price::text').get()
        item['stock'] = response.css('.stock::text').get()
        item['url'] = response.url
        item['crawled_at'] = datetime.now().isoformat()
        yield item
# 数据管道示例 (pipelines.py)
class DataCleanPipeline:
    """Item pipeline that strips currency and unit markers from text fields."""

    def process_item(self, item, spider):
        """Remove the ``¥`` sign from ``price`` and ``件`` from ``stock``.

        Missing or empty fields are left untouched. The values stay strings;
        no numeric conversion is done here.
        """
        if item.get('price'):
            item['price'] = item['price'].replace('¥', '').strip()
        if item.get('stock'):
            item['stock'] = item['stock'].replace('件', '').strip()
        return item
class ValidationPipeline:
    """Item pipeline that drops items lacking a required field."""

    def process_item(self, item, spider):
        """Return ``item`` unchanged when name, price and url are all set.

        Raises:
            scrapy.exceptions.DropItem: when a required field is missing or
                empty.
        """
        required_fields = ['name', 'price', 'url']
        for field in required_fields:
            if not item.get(field):
                # DropItem is not imported at file level; without this
                # import the raise itself fails with NameError.
                from scrapy.exceptions import DropItem
                raise DropItem(f"缺少必填字段: {field}")
        return item
class DatabasePipeline:
    """Item pipeline that stores products in a local SQLite file."""

    def open_spider(self, spider):
        """Open ``products.db`` and create the ``products`` table if needed."""
        import sqlite3
        self.conn = sqlite3.connect('products.db')
        self.conn.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT, price REAL, stock INTEGER,
url TEXT UNIQUE, crawled_at TEXT
)
''')

    def close_spider(self, spider):
        """Close the database connection."""
        self.conn.close()

    def process_item(self, item, spider):
        """Insert ``item``, replacing any existing row with the same URL.

        ``url`` is declared UNIQUE, so INSERT OR REPLACE overwrites the row
        from an earlier crawl. Each item is committed individually.
        """
        self.conn.execute(
            'INSERT OR REPLACE INTO products VALUES (NULL, ?, ?, ?, ?, ?)',
            (item['name'], item['price'], item.get('stock'),
             item['url'], item['crawled_at'])
        )
        self.conn.commit()
        return item
# 运行命令: scrapy crawl products -o products.json
现代网站大量使用JavaScript动态渲染内容,传统的HTTP请求无法获取完整数据。动态网页爬取需要使用浏览器自动化工具(如Selenium、Playwright)或逆向分析API接口。选择合适的方法取决于网站复杂度和性能要求。
内容通过JavaScript异步加载;数据可能来自API接口;页面结构可能动态变化;需要等待元素渲染完成;可能存在反爬检测。
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException, NoSuchElementException
import time
class DynamicWebScraper:
    """Selenium/Chrome helper for pages that render content with JavaScript.

    Call ``start()`` before any page operation and ``stop()`` when done.
    """

    def __init__(self, headless=True):
        self.options = Options()
        if headless:
            self.options.add_argument('--headless')
        self.options.add_argument('--disable-gpu')
        self.options.add_argument('--no-sandbox')
        self.options.add_argument('--disable-dev-shm-usage')
        self.options.add_argument('--window-size=1920,1080')
        self.options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64)')
        self.driver = None
        self.wait = None  # created in start()

    def start(self):
        """Launch Chrome and create the default 10-second explicit wait."""
        self.driver = webdriver.Chrome(options=self.options)
        self.wait = WebDriverWait(self.driver, 10)

    def stop(self):
        """Quit the browser if one is running."""
        if self.driver:
            self.driver.quit()
            self.driver = None
            self.wait = None

    def get_page(self, url, wait_selector=None, timeout=10):
        """Open ``url`` and optionally wait for an element to appear.

        Args:
            url: page to load.
            wait_selector: CSS selector that must be present before returning.
            timeout: seconds to wait for ``wait_selector``.

        Returns:
            The page source. A wait timeout is printed, not raised.
        """
        self.driver.get(url)
        if wait_selector:
            try:
                # Honour the caller's timeout instead of the fixed default wait.
                WebDriverWait(self.driver, timeout).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, wait_selector))
                )
            except TimeoutException:
                print(f"等待超时: {wait_selector}")
        return self.driver.page_source

    def scroll_to_bottom(self, pause_time=1, max_scrolls=None):
        """Scroll down repeatedly to trigger lazy loading.

        Stops when the page height no longer grows, or after ``max_scrolls``
        scrolls when given (``None`` means no limit, which never returns on
        an endlessly growing page).
        """
        height_script = "return document.body.scrollHeight"
        last_height = self.driver.execute_script(height_script)
        scrolls = 0
        while max_scrolls is None or scrolls < max_scrolls:
            self.driver.execute_script(
                "window.scrollTo(0, document.body.scrollHeight);"
            )
            scrolls += 1
            time.sleep(pause_time)
            new_height = self.driver.execute_script(height_script)
            if new_height == last_height:
                break
            last_height = new_height

    def click_element(self, selector, wait_after=1):
        """Wait until ``selector`` is clickable, click it, then pause."""
        element = self.wait.until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, selector))
        )
        element.click()
        time.sleep(wait_after)

    def input_text(self, selector, text, clear_first=True):
        """Type ``text`` into the element, optionally clearing it first."""
        element = self.wait.until(
            EC.presence_of_element_located((By.CSS_SELECTOR, selector))
        )
        if clear_first:
            element.clear()
        element.send_keys(text)

    def get_element_text(self, selector):
        """Return the text of the first match, or ``None`` if there is none."""
        try:
            return self.driver.find_element(By.CSS_SELECTOR, selector).text
        except NoSuchElementException:
            return None

    def get_elements_text(self, selector):
        """Return the texts of all elements matching ``selector``."""
        elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
        return [e.text for e in elements]

    def execute_script(self, script, *args):
        """Run JavaScript in the page and return its result."""
        return self.driver.execute_script(script, *args)

    def switch_to_frame(self, frame_selector):
        """Wait for the iframe and switch into it.

        Returns whatever the wait condition yields.
        """
        return self.wait.until(
            EC.frame_to_be_available_and_switch_to_it((By.CSS_SELECTOR, frame_selector))
        )

    def switch_to_default(self):
        """Switch back to the top-level document."""
        self.driver.switch_to.default_content()
# Usage example
scraper = DynamicWebScraper(headless=True)
scraper.start()
try:
    # Open the page and wait for the product list to render.
    html = scraper.get_page(
        'https://example.com/products',
        wait_selector='.product-list'
    )

    # Scroll down to trigger lazily loaded content.
    scraper.scroll_to_bottom()

    # Extract titles and prices.
    titles = scraper.get_elements_text('.product-title')
    prices = scraper.get_elements_text('.product-price')
    print("商品列表:")
    for title, price in zip(titles, prices):
        print(f" {title}: {price}")

    # Try the "load more" button; a failure here is reported, not fatal.
    try:
        scraper.click_element('.load-more')
        new_titles = scraper.get_elements_text('.product-title')
        print(f"\n加载更多后: {len(new_titles)} 个商品")
    except Exception as e:
        print(f"加载更多失败: {e}")
finally:
    # Always release the browser, even when scraping fails.
    scraper.stop()
网站为了保护数据和服务,会采用各种反爬机制。了解这些机制并采取合理的应对策略,是爬虫开发的重要技能。但请注意,所有应对措施都应在法律和道德框架内使用,尊重网站的robots.txt协议和服务条款。
import requests
import time
import random
import hashlib
from urllib.robotparser import RobotFileParser
from fake_useragent import UserAgent
class PoliteCrawler:
    """Crawler that honours robots.txt, rate-limits itself and backs off."""

    def __init__(self, base_url):
        """
        Args:
            base_url: site root; ``<base_url>/robots.txt`` is read on creation.
        """
        self.base_url = base_url
        self.session = requests.Session()
        self.ua = UserAgent()
        self.last_request_time = 0
        self.min_delay = 1  # minimum gap between requests, in seconds

        # Load robots.txt. rstrip avoids a double slash when base_url already
        # ends with "/".
        self.rp = RobotFileParser()
        self.rp.set_url(f"{base_url.rstrip('/')}/robots.txt")
        try:
            self.rp.read()
            print("✅ 已读取robots.txt")
        except Exception:
            # Not a bare except: Ctrl-C and SystemExit must still propagate.
            print("⚠️ 无法读取robots.txt")
            self.rp = None

    def get_headers(self):
        """Build browser-like request headers with a random User-Agent."""
        return {
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    def can_fetch(self, url):
        """Return whether robots.txt allows ``url`` for the ``*`` agent.

        Returns True when robots.txt could not be read.
        """
        if self.rp is None:
            return True
        return self.rp.can_fetch('*', url)

    def rate_limit(self):
        """Sleep so that requests are at least ``min_delay`` seconds apart.

        Up to 0.5 seconds of random jitter is added to each sleep.
        """
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_delay:
            delay = self.min_delay - elapsed + random.uniform(0, 0.5)
            time.sleep(delay)
        self.last_request_time = time.time()

    def fetch(self, url, max_retries=3):
        """Fetch ``url`` with retries and exponential backoff.

        Returns:
            The page text for HTTP 200, otherwise ``None`` (disallowed by
            robots.txt, HTTP 403, or all attempts failed).
        """
        if not self.can_fetch(url):
            print(f"❌ robots.txt禁止爬取: {url}")
            return None

        for attempt in range(max_retries):
            is_last_attempt = attempt >= max_retries - 1
            # Pause before the next attempt. A 429 replaces it with a
            # jittered value, so the two waits are no longer stacked.
            retry_wait = 2 ** attempt
            try:
                self.rate_limit()
                response = self.session.get(
                    url,
                    headers=self.get_headers(),
                    timeout=15
                )
                status = response.status_code
                if status == 200:
                    response.encoding = response.apparent_encoding
                    return response.text
                if status == 429 and not is_last_attempt:
                    retry_wait = (2 ** attempt) + random.uniform(0, 1)
                    print(f"⏳ 请求频率限制,等待 {retry_wait:.1f} 秒")
                elif status == 403:
                    # Retrying a refusal is pointless.
                    print(f"🚫 访问被拒绝: {url}")
                    break
                else:
                    print(f"❓ 状态码 {status}: {url}")
            except requests.exceptions.Timeout:
                print(f"⏰ 请求超时 (尝试 {attempt + 1}/{max_retries})")
            except requests.exceptions.ConnectionError:
                print(f"🔌 连接错误 (尝试 {attempt + 1}/{max_retries})")
            except Exception as e:
                print(f"❗ 未知错误: {e}")

            if not is_last_attempt:
                time.sleep(retry_wait)
        return None

    def crawl_with_backoff(self, urls):
        """Fetch each URL in order.

        Returns:
            One ``{'url', 'html', 'status'}`` dict per URL; ``status`` is
            ``'success'`` or ``'failed'``.
        """
        results = []
        for i, url in enumerate(urls):
            print(f"[{i+1}/{len(urls)}] 爬取: {url}")
            html = self.fetch(url)
            if html:
                results.append({'url': url, 'html': html, 'status': 'success'})
            else:
                results.append({'url': url, 'html': None, 'status': 'failed'})
        return results
# Usage example
crawler = PoliteCrawler('https://example.com')

# Crawl the first three listing pages.
urls = [f'https://example.com/page/{page_no}' for page_no in range(1, 4)]
results = crawler.crawl_with_backoff(urls)

# Summarise how many pages were fetched successfully.
success = len([entry for entry in results if entry['status'] == 'success'])
print(f"\n爬取完成: {success}/{len(results)} 成功")
API(Application Programming Interface,应用程序编程接口)是软件系统之间进行通信的桥梁。在数据采集领域,API提供了一种结构化、标准化的数据获取方式,相比网络爬虫更加稳定、高效且合规。现代互联网服务普遍提供开放API,允许开发者按需获取数据。
import requests
import json
from datetime import datetime
class APIClient:
    """Generic JSON API client built on a shared ``requests.Session``.

    The session is created with JSON ``Content-Type``/``Accept`` headers and
    a fixed ``User-Agent``.  All calls go through :meth:`request`, which logs
    the outcome, stores the latest response in ``last_response`` and catches
    every ``Exception``: failures are printed and reported as ``None``.
    """

    # Fixed console messages for client-error statuses without extra detail.
    _STATUS_MESSAGES = {
        401: "❌ 未授权,请检查认证信息",
        403: "❌ 禁止访问,权限不足",
        404: "❌ 资源不存在",
        429: "❌ 请求过于频繁,请稍后重试",
    }

    def __init__(self, base_url, timeout=30):
        """Create the session.

        Args:
            base_url: Root URL of the API; a trailing slash is removed.
            timeout: Default timeout in seconds applied to each request.
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'User-Agent': 'DataCollectionBot/1.0'
        })
        self.last_response = None

    def set_headers(self, headers):
        """Merge ``headers`` into the session's default headers."""
        self.session.headers.update(headers)

    def request(self, method, endpoint, **kwargs):
        """Send a request and return the parsed result.

        Args:
            method: HTTP method name.
            endpoint: Path relative to ``base_url``; a leading slash is optional.
            **kwargs: Passed to ``Session.request``.  A ``timeout`` given here
                overrides the client default.

        Returns:
            Whatever :meth:`_handle_response` returns, or ``None`` when the
            request raised.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        # Let callers override the default timeout instead of colliding with it.
        kwargs.setdefault('timeout', self.timeout)
        try:
            response = self.session.request(method, url, **kwargs)
            self.last_response = response
            # Request log line
            print(f"[{datetime.now().strftime('%H:%M:%S')}] {method} {url} -> {response.status_code}")
            return self._handle_response(response)
        except requests.exceptions.Timeout:
            print(f"❌ 请求超时: {url}")
            return None
        except requests.exceptions.ConnectionError:
            print(f"❌ 连接错误: {url}")
            return None
        except Exception as e:
            print(f"❌ 请求异常: {e}")
            return None

    @staticmethod
    def _error_detail(response):
        """Return the ``error`` field of a JSON error body, else 'Bad Request'."""
        try:
            payload = response.json()
        except ValueError:
            # The error body is not JSON (e.g. an HTML error page).
            return 'Bad Request'
        if isinstance(payload, dict):
            return payload.get('error', 'Bad Request')
        return 'Bad Request'

    def _handle_response(self, response):
        """Translate a response into a return value.

        Returns:
            Parsed JSON (or ``{}`` for an empty body) on 200/201, ``True`` on
            204, and ``None`` for every other status after printing a message
            for the statuses this client recognises.
        """
        status = response.status_code
        if status in (200, 201):
            return response.json() if response.text else {}
        if status == 204:
            return True
        if status == 400:
            print(f"❌ 参数错误: {self._error_detail(response)}")
        elif status in self._STATUS_MESSAGES:
            print(self._STATUS_MESSAGES[status])
        elif status >= 500:
            print(f"❌ 服务器错误: {status}")
        return None

    # Convenience wrappers
    def get(self, endpoint, params=None):
        """Send a GET request with optional query parameters."""
        return self.request('GET', endpoint, params=params)

    def post(self, endpoint, data=None):
        """Send a POST request with ``data`` as the JSON body."""
        return self.request('POST', endpoint, json=data)

    def put(self, endpoint, data=None):
        """Send a PUT request with ``data`` as the JSON body."""
        return self.request('PUT', endpoint, json=data)

    def delete(self, endpoint):
        """Send a DELETE request."""
        return self.request('DELETE', endpoint)
# Usage example
client = APIClient('https://jsonplaceholder.typicode.com')

# GET: fetch data.  The client returns None when a request fails, so fall
# back to an empty list before calling len().
posts = client.get('/posts', params={'_limit': 3})
print(f"获取到 {len(posts or [])} 条帖子")

# POST: create a record.  Guard against a None result for the same reason.
new_post = client.post('/posts', data={
    'title': '数据采集教程',
    'body': '学习API调用的最佳实践',
    'userId': 1
})
print(f"创建帖子ID: {(new_post or {}).get('id')}")

# PUT: update a record
updated = client.put('/posts/1', data={
    'title': '更新后的标题',
    'body': '更新后的内容',
    'userId': 1
})

# DELETE: remove a record
deleted = client.delete('/posts/1')
print(f"删除结果: {'成功' if deleted else '失败'}")
REST(Representational State Transfer)是一种软件架构风格,RESTful API是遵循REST原则设计的Web API。它使用标准的HTTP方法对资源进行操作,具有无状态、统一接口、可缓存等特点,是现代Web API设计的主流标准。
API版本控制常见有三种方式:URL路径版本(/api/v1/users)、请求头版本(Accept: application/vnd.api.v1+json)和查询参数版本(/api/users?version=1)。推荐使用URL路径版本,清晰直观。
import requests
import pandas as pd
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
@dataclass
class APIConfig:
    """Connection settings consumed by :class:`RESTfulClient`."""
    base_url: str                    # root address of the service
    api_key: Optional[str] = None    # sent as a Bearer token when set
    version: str = 'v1'              # inserted into URLs as ``api/<version>``
    timeout: int = 30                # timeout passed to every request


class RESTfulClient:
    """Client exposing CRUD helpers over ``<base_url>/api/<version>/<resource>``."""

    def __init__(self, config: APIConfig):
        """Store ``config`` and prepare a session with default headers."""
        self.config = config
        self.session = requests.Session()
        self._setup_session()

    def _setup_session(self):
        """Install JSON headers and, when an API key is set, a Bearer token."""
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }
        if self.config.api_key:
            headers['Authorization'] = f'Bearer {self.config.api_key}'
        self.session.headers.update(headers)

    def _build_url(self, resource: str, resource_id: Optional[int] = None,
                   sub_resource: Optional[str] = None) -> str:
        """Build a RESTful URL.

        Args:
            resource: Collection name, e.g. ``'users'``.
            resource_id: Optional identifier appended after the collection.
                ``0`` is a valid identifier; only ``None`` omits it.
            sub_resource: Optional nested collection appended last.

        Returns:
            The URL segments joined with single slashes.
        """
        parts = [
            # Avoid a double slash when base_url ends with '/'.
            self.config.base_url.rstrip('/'),
            f'api/{self.config.version}',
            resource
        ]
        if resource_id is not None:
            parts.append(str(resource_id))
        if sub_resource:
            parts.append(sub_resource)
        return '/'.join(parts)

    # CRUD operations
    def list(self, resource: str, params: Optional[Dict] = None) -> List[Dict]:
        """List resources: GET /resource."""
        url = self._build_url(resource)
        response = self.session.get(url, params=params, timeout=self.config.timeout)
        return self._parse_response(response)

    def get(self, resource: str, resource_id: int) -> Dict:
        """Fetch one resource: GET /resource/{id}."""
        url = self._build_url(resource, resource_id)
        response = self.session.get(url, timeout=self.config.timeout)
        return self._parse_response(response)

    def create(self, resource: str, data: Dict) -> Dict:
        """Create a resource: POST /resource."""
        url = self._build_url(resource)
        response = self.session.post(url, json=data, timeout=self.config.timeout)
        return self._parse_response(response)

    def update(self, resource: str, resource_id: int, data: Dict,
               partial: bool = False) -> Dict:
        """Update a resource: PATCH when ``partial`` is true, otherwise PUT."""
        url = self._build_url(resource, resource_id)
        method = 'patch' if partial else 'put'
        response = getattr(self.session, method)(url, json=data, timeout=self.config.timeout)
        return self._parse_response(response)

    def delete(self, resource: str, resource_id: int) -> bool:
        """Delete a resource; return ``True`` when the status is 200 or 204."""
        url = self._build_url(resource, resource_id)
        response = self.session.delete(url, timeout=self.config.timeout)
        return response.status_code in [200, 204]

    def get_sub_resources(self, resource: str, resource_id: int,
                          sub_resource: str, params: Optional[Dict] = None) -> List[Dict]:
        """Fetch related resources: GET /resource/{id}/sub_resource."""
        url = self._build_url(resource, resource_id, sub_resource)
        response = self.session.get(url, params=params, timeout=self.config.timeout)
        return self._parse_response(response)

    def _parse_response(self, response) -> Any:
        """Convert a response into a Python value.

        Returns:
            Parsed JSON (``{}`` for an empty body) on 200/201, ``True`` on
            204, otherwise a dict with ``error`` (status code) and
            ``message`` (raw body text).
        """
        if response.status_code in [200, 201]:
            return response.json() if response.text else {}
        elif response.status_code == 204:
            return True
        else:
            return {'error': response.status_code, 'message': response.text}
# Usage example
config = APIConfig(
    base_url='https://jsonplaceholder.typicode.com',
    version='v1'  # simulated version
)
client = RESTfulClient(config)

# RESTful walkthrough banner
print("=" * 50, "📚 RESTful API操作示例", "=" * 50, sep="\n")

# 1. List resources
users = client.list('users', params={'_limit': 3})
print(f"\n1. 获取用户列表: {len(users)} 条")

# 2. Fetch a single resource
user = client.get('users', 1)
print(f"2. 获取用户详情: {user.get('name')}")

# 3. Create a resource
new_user = client.create('users', {'name': '张三', 'email': 'zhangsan@example.com'})
print(f"3. 创建用户: ID={new_user.get('id')}")

# 4. Partially update a resource
updated = client.update('users', 1, {'name': '李四'}, partial=True)
print(f"4. 更新用户: {updated.get('name')}")

# 5. Fetch related resources
posts = client.get_sub_resources('users', 1, 'posts')
print(f"5. 用户1的帖子: {len(posts)} 篇")

# 6. Delete a resource
deleted = client.delete('users', 1)
print(f"6. 删除用户: {'成功' if deleted else '失败'}")
API认证是确保数据安全的重要机制。不同的API服务采用不同的认证方式,了解各种认证机制的原理和实现方法,是进行API数据采集的关键技能。选择合适的认证方式需要考虑安全性、易用性和API提供商的要求。
import requests
import base64
import time
import hmac
import hashlib
import urllib.parse
from datetime import datetime
# ========== 1. API Key认证 ==========
class APIKeyAuth:
    """Client that authenticates by sending an API key in a request header."""

    def __init__(self, base_url, api_key, key_name='X-API-Key'):
        """Remember the base URL, the key, and the header name that carries it."""
        self.base_url = base_url
        self.api_key = api_key
        self.key_name = key_name

    def _auth_headers(self, headers=None):
        """Return a new header dict containing ``headers`` plus the API key.

        A copy is made so the caller's dict is never modified; ``None`` is
        treated as "no extra headers".
        """
        merged = dict(headers or {})
        merged[self.key_name] = self.api_key
        return merged

    def request(self, method, endpoint, **kwargs):
        """Send a request to ``<base_url>/<endpoint>`` with the key attached.

        Args:
            method: HTTP method name.
            endpoint: Path appended to ``base_url`` after a slash.
            **kwargs: Passed to ``requests.request``; ``headers`` is merged
                with the API-key header.

        Returns:
            Parsed JSON when ``response.ok`` is true, otherwise ``None``.
        """
        headers = self._auth_headers(kwargs.pop('headers', None))
        response = requests.request(
            method, f"{self.base_url}/{endpoint}",
            headers=headers, **kwargs
        )
        return response.json() if response.ok else None
# ========== 2. Basic认证 ==========
class BasicAuth:
    """Client that authenticates with an HTTP Basic ``Authorization`` header."""

    def __init__(self, base_url, username, password):
        """Store the base URL and pre-compute the Base64 ``user:password`` token."""
        self.base_url = base_url
        self.credentials = base64.b64encode(
            f"{username}:{password}".encode()
        ).decode()

    def _auth_headers(self, headers=None):
        """Return a new header dict containing ``headers`` plus ``Authorization``.

        A copy is made so the caller's dict is never modified; ``None`` is
        treated as "no extra headers".
        """
        merged = dict(headers or {})
        merged['Authorization'] = f'Basic {self.credentials}'
        return merged

    def request(self, method, endpoint, **kwargs):
        """Send a request to ``<base_url>/<endpoint>`` with Basic credentials.

        Args:
            method: HTTP method name.
            endpoint: Path appended to ``base_url`` after a slash.
            **kwargs: Passed to ``requests.request``; ``headers`` is merged
                with the ``Authorization`` header.

        Returns:
            Parsed JSON when ``response.ok`` is true, otherwise ``None``.
        """
        headers = self._auth_headers(kwargs.pop('headers', None))
        response = requests.request(
            method, f"{self.base_url}/{endpoint}",
            headers=headers, **kwargs
        )
        return response.json() if response.ok else None
# ========== 3. Bearer Token认证 ==========
class BearerTokenAuth:
    """Client holding a session whose headers carry a Bearer token."""

    def __init__(self, base_url, token):
        """Create a session pre-loaded with the Bearer and JSON content headers."""
        self.base_url = base_url
        self.token = token
        default_headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json',
        }
        self.session = requests.Session()
        self.session.headers.update(default_headers)

    def refresh_token(self, new_token):
        """Replace the stored token and the session's ``Authorization`` header."""
        self.token = new_token
        bearer_value = f'Bearer {new_token}'
        self.session.headers['Authorization'] = bearer_value
# ========== 4. OAuth 2.0客户端 ==========
class OAuth2Client:
    """OAuth 2.0 client supporting the authorization-code, client-credentials
    and refresh-token grants, with renewal of expiring tokens."""

    def __init__(self, client_id, client_secret, token_url,
                 auth_url=None, redirect_uri=None):
        """Store the endpoints and credentials; no token is requested yet."""
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self.auth_url = auth_url
        self.redirect_uri = redirect_uri
        self.access_token = None
        self.refresh_token = None
        self.expires_at = None
        # Grant used for the last successful non-refresh token request.
        self._last_grant_type = None

    def get_authorization_url(self, scope=None):
        """Return the authorization URL for the authorization-code flow.

        Args:
            scope: Optional iterable of scope names, joined with spaces.
        """
        params = {
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'response_type': 'code',
        }
        if scope:
            params['scope'] = ' '.join(scope)
        return f"{self.auth_url}?{urllib.parse.urlencode(params)}"

    def get_token(self, code=None, grant_type='client_credentials'):
        """Request an access token from ``token_url``.

        Args:
            code: Authorization code; used with ``'authorization_code'``.
            grant_type: ``'client_credentials'``, ``'authorization_code'`` or
                ``'refresh_token'``.

        Returns:
            The token response dict on success, otherwise ``None``.
        """
        data = {'grant_type': grant_type}
        if grant_type == 'authorization_code':
            data['code'] = code
            data['redirect_uri'] = self.redirect_uri
        elif grant_type == 'refresh_token':
            data['refresh_token'] = self.refresh_token
        # Client authentication via HTTP Basic
        auth = (self.client_id, self.client_secret)
        response = requests.post(self.token_url, data=data, auth=auth)
        if not response.ok:
            return None
        token_data = response.json()
        self._store_token(token_data, grant_type)
        return token_data

    def _store_token(self, token_data, grant_type=None):
        """Record the token fields of a successful token response."""
        self.access_token = token_data['access_token']
        # A response may omit the refresh token; keep the one we already
        # hold instead of overwriting it with None.
        self.refresh_token = token_data.get('refresh_token') or self.refresh_token
        expires_in = token_data.get('expires_in', 3600)
        self.expires_at = time.time() + expires_in
        if grant_type != 'refresh_token':
            self._last_grant_type = grant_type

    def is_token_expired(self):
        """Return ``True`` when no expiry is known or it is under 5 minutes away."""
        if not self.expires_at:
            return True
        # Refresh five minutes early
        return time.time() > (self.expires_at - 300)

    def ensure_valid_token(self):
        """Renew the access token when it is expired or about to expire.

        A stored refresh token is used when available.  Tokens obtained via
        the client-credentials grant are renewed by repeating that grant,
        which needs only the client id and secret this object already holds.
        """
        if not self.is_token_expired():
            return
        if self.refresh_token:
            self.get_token(grant_type='refresh_token')
        elif getattr(self, '_last_grant_type', None) == 'client_credentials':
            self.get_token(grant_type='client_credentials')

    def make_request(self, method, url, **kwargs):
        """Send a request carrying the current access token as a Bearer header."""
        self.ensure_valid_token()
        # Copy so the caller's header dict is not modified.
        headers = dict(kwargs.pop('headers', None) or {})
        headers['Authorization'] = f'Bearer {self.access_token}'
        return requests.request(method, url, headers=headers, **kwargs)
# ========== 5. 签名认证(阿里云风格) ==========
class SignatureAuth:
    """Builds HMAC-SHA1 signed request parameters (Alibaba Cloud style)."""

    def __init__(self, access_key_id, access_key_secret):
        """Store the key pair used to identify and sign requests."""
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret

    def sign(self, method, endpoint, params):
        """Compute the Base64 HMAC-SHA1 signature for ``params``.

        Args:
            method: HTTP method placed first in the string to sign.
            endpoint: Accepted for interface symmetry; the string to sign
                always uses the percent-encoded path ``/``.
            params: Mapping of request parameters to sign.

        Returns:
            The signature as a Base64 string.
        """
        # 1. Canonicalize: sort by key and percent-encode keys and values.
        canonicalized_query = '&'.join(
            f"{urllib.parse.quote(k, safe='')}={urllib.parse.quote(str(v), safe='')}"
            for k, v in sorted(params.items())
        )
        # 2. Build the string to sign: METHOD & encoded('/') & encoded(query).
        string_to_sign = (
            f"{method}&{urllib.parse.quote('/', safe='')}"
            f"&{urllib.parse.quote(canonicalized_query, safe='')}"
        )
        # 3. Sign with the secret followed by '&' as the HMAC key.
        key = (self.access_key_secret + '&').encode('utf-8')
        digest = hmac.new(key, string_to_sign.encode('utf-8'), hashlib.sha1).digest()
        return base64.b64encode(digest).decode('utf-8')

    def build_request(self, method, endpoint, params):
        """Return ``params`` merged with the common parameters and a signature.

        Caller-supplied ``params`` override common parameters of the same
        name.  The signature is computed over the merged set and stored under
        ``'Signature'``.
        """
        # Imported locally so this block does not depend on file-level imports.
        import uuid
        from datetime import timezone

        # Common parameters
        public_params = {
            'Format': 'JSON',
            'Version': '2024-01-01',
            'AccessKeyId': self.access_key_id,
            'SignatureMethod': 'HMAC-SHA1',
            # Timezone-aware replacement for the deprecated datetime.utcnow().
            'Timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
            'SignatureVersion': '1.0',
            # A random UUID, unlike a clock reading, does not repeat when two
            # requests are built in quick succession.
            'SignatureNonce': str(uuid.uuid4())
        }
        # Merge: caller parameters win
        all_params = {**public_params, **params}
        # Sign the merged set
        all_params['Signature'] = self.sign(method, endpoint, all_params)
        return all_params
# Usage example
print("=" * 50, "🔐 API认证方式示例", "=" * 50, sep="\n")

# API key
api_key_client = APIKeyAuth('https://api.example.com', 'your-api-key')
print("\n1. API Key认证: 在请求头中添加 X-API-Key")

# HTTP Basic
basic_client = BasicAuth('https://api.example.com', 'username', 'password')
print("2. Basic认证: 用户名密码Base64编码")

# Bearer token
bearer_client = BearerTokenAuth('https://api.example.com', 'your-access-token')
print("3. Bearer Token: 在Authorization头中携带令牌")

# OAuth 2.0
oauth_client = OAuth2Client(
    client_id='your-client-id',
    client_secret='your-client-secret',
    token_url='https://auth.example.com/oauth/token',
)
print("4. OAuth 2.0: 完整的授权流程,支持令牌刷新")

# Request signing
sig_client = SignatureAuth('access-key-id', 'access-key-secret')
print("5. 签名认证: 请求参数签名验证")
API返回的数据通常为JSON格式,需要进行解析、转换、清洗后才能用于分析。对于大量数据,API通常采用分页机制返回,需要实现自动化的分页获取策略。合理的数据处理流程能够提高数据采集效率和数据质量。
import requests
import pandas as pd
import json
from typing import List, Dict, Optional, Callable
from datetime import datetime
import time
class APIPaginator:
    """Collects all records from a paginated JSON endpoint.

    Supports page-number, offset/limit and cursor pagination.  Each method
    keeps requesting ``<base_url>/<endpoint>`` until a stop condition is met
    and pauses 0.1 s between consecutive requests.
    """

    def __init__(self, base_url: str, session: "requests.Session" = None):
        """Store the base URL and use ``session`` or create a new one."""
        self.base_url = base_url
        self.session = session if session else requests.Session()

    def fetch_page_based(self, endpoint: str,
                         page_param: str = 'page',
                         per_page: int = 100,
                         max_pages: int = None,
                         data_path: str = 'data',
                         total_path: str = 'total',
                         stop_condition: Callable = None) -> List[Dict]:
        """Fetch records using page-number pagination.

        Args:
            endpoint: Path appended to ``base_url``.
            page_param: Name of the page-number query parameter.
            per_page: Page size sent as ``per_page``.
            max_pages: Optional cap on the number of pages requested.
            data_path: Dotted path to the record list in each response.
            total_path: Dotted path to the total record count, if reported.
            stop_condition: Optional callable receiving the decoded response;
                a truthy result ends the loop.

        Returns:
            All records gathered before a stop condition was met.
        """
        collected = []
        page = 1
        while True:
            query = {page_param: page, 'per_page': per_page}
            response = self.session.get(f"{self.base_url}/{endpoint}", params=query)
            if not response.ok:
                print(f"请求失败: {response.status_code}")
                break
            payload = response.json()
            # Pull the records out of the response
            rows = self._extract_nested(payload, data_path)
            if not rows:
                break
            collected.extend(rows)
            print(f"已获取第 {page} 页,累计 {len(collected)} 条")
            # Caller-defined stop, then the page cap
            finished = bool(stop_condition and stop_condition(payload))
            finished = finished or bool(max_pages and page >= max_pages)
            if not finished:
                # Stop once the reported total is reached or a short page arrives
                total = self._extract_nested(payload, total_path)
                finished = bool(total and len(collected) >= total) or len(rows) < per_page
            if finished:
                break
            page += 1
            time.sleep(0.1)  # avoid hammering the server
        return collected

    def fetch_offset_based(self, endpoint: str,
                           limit: int = 100,
                           max_records: int = None,
                           data_path: str = 'items') -> List[Dict]:
        """Fetch records using ``offset``/``limit`` pagination.

        Stops on a failed response, an empty page, a page shorter than
        ``limit``, or once ``max_records`` records are held (the result is
        then truncated to exactly that many).
        """
        collected = []
        offset = 0
        while True:
            query = {'offset': offset, 'limit': limit}
            response = self.session.get(f"{self.base_url}/{endpoint}", params=query)
            if not response.ok:
                break
            rows = self._extract_nested(response.json(), data_path)
            if not rows:
                break
            collected.extend(rows)
            print(f"已获取 offset={offset},累计 {len(collected)} 条")
            if max_records and len(collected) >= max_records:
                del collected[max_records:]
                break
            if len(rows) < limit:
                break
            offset += limit
            time.sleep(0.1)
        return collected

    def fetch_cursor_based(self, endpoint: str,
                           cursor_param: str = 'cursor',
                           cursor_path: str = 'next_cursor',
                           data_path: str = 'data',
                           max_requests: int = None) -> List[Dict]:
        """Fetch records using cursor pagination.

        The first request carries no cursor.  Later requests send the cursor
        found at ``cursor_path`` in the previous response.  Stops on a failed
        response, after ``max_requests`` requests, or when the next cursor is
        missing or unchanged.
        """
        collected = []
        cursor = None
        request_count = 0
        while True:
            query = {cursor_param: cursor} if cursor else {}
            response = self.session.get(f"{self.base_url}/{endpoint}", params=query)
            if not response.ok:
                break
            payload = response.json()
            rows = self._extract_nested(payload, data_path)
            if rows:
                collected.extend(rows)
            request_count += 1
            print(f"请求 {request_count} 次,累计 {len(collected)} 条")
            if max_requests and request_count >= max_requests:
                break
            # Advance to the next page's cursor
            next_cursor = self._extract_nested(payload, cursor_path)
            if not next_cursor or next_cursor == cursor:
                break
            cursor = next_cursor
            time.sleep(0.1)
        return collected

    def _extract_nested(self, data: Dict, path: str):
        """Follow a dotted ``path`` through nested dicts.

        Returns ``data`` itself for an empty path, ``None`` when a non-dict
        is met before the path is exhausted, otherwise the value found
        (``None`` for a missing key).
        """
        if not path:
            return data
        current = data
        for key in path.split('.'):
            if not isinstance(current, dict):
                return None
            current = current.get(key)
        return current
# ========== JSON数据处理工具 ==========
class JSONDataProcessor:
    """Static helpers for reshaping JSON-like data."""

    @staticmethod
    def flatten(data: Dict, separator: str = '_', prefix: str = '') -> Dict:
        """Flatten nested dicts into a single-level dict.

        Nested keys are joined with ``separator``.  List values are stored
        as their ``json.dumps`` text; all other values are copied as-is.

        Args:
            data: Dict to flatten.
            separator: String placed between parent and child keys.
            prefix: Key prefix for this level (used by the recursion).
        """
        flat = {}
        for key, value in data.items():
            full_key = key if not prefix else f"{prefix}{separator}{key}"
            if isinstance(value, dict):
                flat.update(JSONDataProcessor.flatten(value, separator, full_key))
                continue
            flat[full_key] = json.dumps(value) if isinstance(value, list) else value
        return flat

    @staticmethod
    def to_dataframe(data: List[Dict], flatten_nested: bool = True) -> pd.DataFrame:
        """Build a DataFrame from records, flattening each one by default."""
        rows = [JSONDataProcessor.flatten(item) for item in data] if flatten_nested else data
        return pd.DataFrame(rows)

    @staticmethod
    def extract_array(data: Dict, path: str) -> List[Dict]:
        """Return the list found at dotted ``path``, or ``[]`` when absent.

        An empty list is also returned when a non-dict is met along the way
        or when the value found is not a list.
        """
        node = data
        for key in path.split('.'):
            if not isinstance(node, dict):
                return []
            node = node.get(key, [])
        return node if isinstance(node, list) else []
# Usage example
print("=" * 50, "📊 API数据处理示例", "=" * 50, sep="\n")

# Simulated API response
mock_api_response = {
    'data': {
        'items': [
            {'id': 1, 'name': '项目1', 'user': {'name': '张三', 'id': 101}},
            {'id': 2, 'name': '项目2', 'user': {'name': '李四', 'id': 102}},
            {'id': 3, 'name': '项目3', 'user': {'name': '王五', 'id': 103}},
        ],
        'pagination': {'page': 1, 'total': 3, 'next_cursor': None},
    }
}

# Pull out the record list
items = JSONDataProcessor.extract_array(mock_api_response, 'data.items')
print(f"\n提取到 {len(items)} 条数据")

# Flatten every record
flattened = [JSONDataProcessor.flatten(item) for item in items]
print("\n扁平化后的数据:")
for item in flattened:
    print(f" {item}")

# Convert to a DataFrame
df = JSONDataProcessor.to_dataframe(items)
print(f"\nDataFrame:\n{df}")
健壮的API客户端需要完善的错误处理和重试机制。网络请求可能因各种原因失败,包括网络超时、服务器错误、请求限流等。合理的重试策略能够在保证数据采集完整性的同时,避免对API服务器造成过大压力。
import requests
import time
import random
from functools import wraps
from typing import Callable, Optional
from enum import Enum
class RetryStrategy(Enum):
    """How the delay between retries grows."""
    FIXED = 'fixed'              # constant interval
    LINEAR = 'linear'            # grows linearly with the attempt number
    EXPONENTIAL = 'exponential'  # doubles with every attempt


class RobustAPIClient:
    """API client with client-side rate limiting and configurable retries.

    :meth:`request` returns the decoded JSON body on success and a dict
    containing an ``'error'`` key on failure.
    """

    def __init__(self, base_url: str,
                 max_retries: int = 3,
                 retry_strategy: RetryStrategy = RetryStrategy.EXPONENTIAL,
                 base_delay: float = 1.0,
                 max_delay: float = 60.0,
                 rate_limit: int = 10):  # requests per second
        """Configure the client.

        Args:
            base_url: Root URL that endpoints are appended to.
            max_retries: Number of retries after the first attempt.
            retry_strategy: Growth pattern of the retry delay.
            base_delay: Base retry delay in seconds.
            max_delay: Upper bound for the retry delay before jitter.
            rate_limit: Maximum requests per second; a value of zero or less
                disables the client-side limit.
        """
        self.base_url = base_url
        self.max_retries = max_retries
        self.retry_strategy = retry_strategy
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.rate_limit = rate_limit
        self.session = requests.Session()
        self.last_request_time = 0
        # Guard against division by zero when no limit is requested.
        self.request_interval = 1.0 / rate_limit if rate_limit > 0 else 0.0

    def _calculate_delay(self, attempt: int) -> float:
        """Return the delay in seconds before the retry after ``attempt``.

        ``attempt`` is zero-based.  The delay is capped at ``max_delay`` and
        then increased by up to 10 % random jitter.
        """
        if self.retry_strategy == RetryStrategy.FIXED:
            delay = self.base_delay
        elif self.retry_strategy == RetryStrategy.LINEAR:
            # attempt starts at 0, so add 1 to avoid a zero first delay.
            delay = self.base_delay * (attempt + 1)
        else:  # EXPONENTIAL
            delay = self.base_delay * (2 ** attempt)
        delay = min(delay, self.max_delay)
        jitter = random.uniform(0, 0.1 * delay)
        return delay + jitter

    def _rate_limit_wait(self):
        """Sleep as needed to keep ``request_interval`` between requests."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.request_interval:
            time.sleep(self.request_interval - elapsed)
        self.last_request_time = time.time()

    @staticmethod
    def _retry_after_seconds(response, default: int = 60) -> int:
        """Read ``Retry-After`` as whole seconds, using ``default`` if unusable."""
        try:
            return max(0, int(response.headers.get('Retry-After', default)))
        except (TypeError, ValueError):
            # The header is not a plain integer (e.g. an HTTP date).
            return default

    def request(self, method: str, endpoint: str,
                params: dict = None, data: dict = None,
                headers: dict = None) -> Optional[dict]:
        """Send a request, retrying transient failures.

        Retries happen on HTTP 429, HTTP 5xx, timeouts and connection
        errors, up to ``max_retries`` times.

        Returns:
            The decoded JSON body for any 2xx status (``{}`` when the body is
            empty), otherwise a dict describing the failure under ``'error'``.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        for attempt in range(self.max_retries + 1):
            can_retry = attempt < self.max_retries
            try:
                # Client-side rate limiting
                self._rate_limit_wait()
                response = self.session.request(
                    method, url,
                    params=params, json=data, headers=headers,
                    timeout=30
                )
                status = response.status_code
                if 200 <= status < 300:
                    # Covers 201 Created and 204 No Content as well as 200.
                    return response.json() if response.text else {}
                elif status == 429:
                    if not can_retry:
                        # Waiting is pointless when no attempt will follow.
                        break
                    retry_after = self._retry_after_seconds(response)
                    print(f"⚠️ 请求限流,等待 {retry_after} 秒")
                    time.sleep(retry_after)
                    continue
                elif status >= 500:
                    # Server error: retry while attempts remain
                    if can_retry:
                        delay = self._calculate_delay(attempt)
                        print(f"⚠️ 服务器错误 {status},{delay:.1f}秒后重试")
                        time.sleep(delay)
                        continue
                    return {'error': 'server_error', 'status': status}
                elif status == 401:
                    return {'error': 'unauthorized', 'message': '认证失败'}
                elif status == 404:
                    return {'error': 'not_found', 'message': '资源不存在'}
                else:
                    return {'error': 'unknown', 'status': status}
            except requests.exceptions.Timeout:
                if can_retry:
                    delay = self._calculate_delay(attempt)
                    print(f"⚠️ 请求超时,{delay:.1f}秒后重试")
                    time.sleep(delay)
                    continue
                return {'error': 'timeout'}
            except requests.exceptions.ConnectionError:
                if can_retry:
                    delay = self._calculate_delay(attempt)
                    print(f"⚠️ 连接错误,{delay:.1f}秒后重试")
                    time.sleep(delay)
                    continue
                return {'error': 'connection_error'}
            except Exception as e:
                return {'error': 'unknown', 'message': str(e)}
        return {'error': 'max_retries_exceeded'}

    # Convenience wrappers
    def get(self, endpoint, params=None, headers=None):
        """Send a GET request."""
        return self.request('GET', endpoint, params=params, headers=headers)

    def post(self, endpoint, data=None, headers=None):
        """Send a POST request with ``data`` as the JSON body."""
        return self.request('POST', endpoint, data=data, headers=headers)
# 装饰器方式实现重试
def retry_on_failure(max_retries=3, delay=1, exceptions=(Exception,)):
    """Decorator factory that retries a function with doubling delays.

    Args:
        max_retries: Number of retries after the first call.
        delay: Base delay in seconds; the wait before retry ``n`` (zero-based)
            is ``delay * 2 ** n``.
        exceptions: Exception type(s) that trigger a retry.

    The wrapped function's last exception is re-raised once the retries are
    used up.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    out_of_retries = attempt == max_retries
                    if out_of_retries:
                        raise
                    wait_time = delay * (2 ** attempt)
                    print(f"⚠️ {func.__name__} 失败: {e},{wait_time}秒后重试")
                    time.sleep(wait_time)
        return wrapper
    return decorator
# Usage example
print("=" * 50, "🔧 健壮API客户端示例", "=" * 50, sep="\n")

client = RobustAPIClient(
    base_url='https://jsonplaceholder.typicode.com',
    max_retries=3,
    retry_strategy=RetryStrategy.EXPONENTIAL,
    rate_limit=5,
)

# Fetch a single record
result = client.get('/posts/1')
if 'error' in result:
    print(f"\n获取失败: {result}")
else:
    print(f"\n获取成功: {result.get('title')}")

# Fetch several records in a row
print("\n批量获取数据:")
for i in range(1, 4):
    post = client.get(f'/posts/{i}')
    if 'error' in post:
        continue
    print(f" - 帖子{i}: {post.get('title')[:30]}...")
数据质量评估是数据预处理的第一步,也是最重要的环节之一。高质量的数据是数据分析成功的基础。数据质量评估需要从多个维度进行全面检查,包括完整性、准确性、一致性、时效性、唯一性和有效性。通过系统性的评估,可以发现问题并制定相应的处理策略。
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Dict, List, Any
class DataQualityAssessor:
    """Runs a set of data-quality checks on a DataFrame.

    Results are stored in ``report``; human-readable findings are collected
    in ``issues``.
    """

    def __init__(self, df: pd.DataFrame):
        """Keep a reference to ``df`` (not a copy) and start empty results."""
        self.df = df
        self.report = {}
        self.issues = []

    def full_assessment(self) -> Dict:
        """Run every check and return the combined report dict."""
        # Start from a fresh list so repeated assessments (for example two
        # generate_report() calls) do not accumulate duplicate issues.
        self.issues = []
        self.report = {
            'basic_info': self._basic_info(),
            'completeness': self._check_completeness(),
            'uniqueness': self._check_uniqueness(),
            'validity': self._check_validity(),
            'consistency': self._check_consistency(),
            'accuracy': self._check_accuracy(),
            'issues': self.issues
        }
        return self.report

    def _basic_info(self) -> Dict:
        """Return row/column counts, deep memory usage in MB and column dtypes."""
        return {
            'rows': len(self.df),
            'columns': len(self.df.columns),
            'memory_usage': f"{self.df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB",
            'dtypes': self.df.dtypes.astype(str).to_dict()
        }

    def _check_completeness(self) -> Dict:
        """Count missing cells and flag columns with more than 10 % missing."""
        missing = self.df.isnull().sum()
        total_cells = self.df.shape[0] * self.df.shape[1]
        result = {
            'total_cells': total_cells,
            'missing_cells': int(missing.sum()),
            'completeness_rate': f"{(1 - missing.sum() / total_cells) * 100:.2f}%",
            'missing_by_column': missing.to_dict(),
            'columns_with_missing': missing[missing > 0].to_dict()
        }
        # Record issues
        for col, count in missing[missing > 0].items():
            pct = count / len(self.df) * 100
            if pct > 50:
                self.issues.append(f"⚠️ 列 '{col}' 缺失率高达 {pct:.1f}%")
            elif pct > 10:
                self.issues.append(f"⚡ 列 '{col}' 缺失率 {pct:.1f}%")
        return result

    def _check_uniqueness(self) -> Dict:
        """Count fully duplicated rows and distinct values per column."""
        duplicates = self.df.duplicated()
        result = {
            'total_rows': len(self.df),
            'duplicate_rows': int(duplicates.sum()),
            'uniqueness_rate': f"{(1 - duplicates.sum() / len(self.df)) * 100:.2f}%",
            'unique_values_per_column': self.df.nunique().to_dict()
        }
        if duplicates.sum() > 0:
            self.issues.append(f"🔄 发现 {duplicates.sum()} 条重复记录")
        return result

    def _check_validity(self) -> Dict:
        """Flag negative values and IQR outliers in numeric columns, and
        future dates in datetime columns."""
        result = {'validations': {}}
        for col in self.df.columns:
            col_issues = []
            # Numeric columns
            if pd.api.types.is_numeric_dtype(self.df[col]):
                # Negative values; columns whose name contains "id" are skipped.
                # str() keeps this working for non-string column labels.
                if self.df[col].min() < 0 and 'id' not in str(col).lower():
                    neg_count = (self.df[col] < 0).sum()
                    if neg_count > 0:
                        col_issues.append(f"包含 {neg_count} 个负值")
                # Extreme values outside 1.5 * IQR from the quartiles
                q1, q3 = self.df[col].quantile([0.25, 0.75])
                iqr = q3 - q1
                outliers = ((self.df[col] < q1 - 1.5*iqr) | (self.df[col] > q3 + 1.5*iqr)).sum()
                if outliers > len(self.df) * 0.05:  # more than 5 % of rows
                    col_issues.append(f"可能存在 {outliers} 个异常值")
            # Datetime columns
            elif pd.api.types.is_datetime64_any_dtype(self.df[col]):
                future_dates = (self.df[col] > datetime.now()).sum()
                if future_dates > 0:
                    col_issues.append(f"包含 {future_dates} 个未来日期")
            result['validations'][col] = col_issues
            if col_issues:
                self.issues.append(f"❌ 列 '{col}': {', '.join(col_issues)}")
        return result

    def _check_consistency(self) -> Dict:
        """Report columns whose values have more than one Python type."""
        result = {'checks': []}
        for col in self.df.columns:
            types = self.df[col].apply(type).unique()
            # None alongside one real type does not count as a mix.
            if len(types) > 1 and type(None) in types:
                types = [t for t in types if t != type(None)]
            if len(types) > 1:
                result['checks'].append(f"列 '{col}' 包含多种数据类型: {types}")
        return result

    def _check_accuracy(self) -> Dict:
        """Apply example business rules to the ``age`` and ``email`` columns."""
        result = {'rules': {}}
        # Example: age must lie within 0..120
        if 'age' in self.df.columns:
            invalid_age = ((self.df['age'] < 0) | (self.df['age'] > 120)).sum()
            result['rules']['age_range'] = f"{invalid_age} 条记录年龄不在合理范围"
        # Example: e-mail must match a simple pattern; missing counts as invalid
        if 'email' in self.df.columns:
            email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
            invalid_email = (~self.df['email'].fillna('').str.match(email_pattern)).sum()
            result['rules']['email_format'] = f"{invalid_email} 条记录邮箱格式无效"
        return result

    def generate_report(self) -> str:
        """Run the full assessment and return a readable multi-line report."""
        self.full_assessment()
        report_lines = [
            "=" * 60,
            "📊 数据质量评估报告",
            "=" * 60,
            f"\n【基本信息】",
            f" 行数: {self.report['basic_info']['rows']:,}",
            f" 列数: {self.report['basic_info']['columns']}",
            f" 内存: {self.report['basic_info']['memory_usage']}",
            f"\n【完整性】",
            f" 完整率: {self.report['completeness']['completeness_rate']}",
            f" 缺失单元格: {self.report['completeness']['missing_cells']:,}",
            f"\n【唯一性】",
            f" 唯一率: {self.report['uniqueness']['uniqueness_rate']}",
            f" 重复行: {self.report['uniqueness']['duplicate_rows']}",
            f"\n【发现的问题】",
        ]
        for issue in self.issues:
            report_lines.append(f" {issue}")
        if not self.issues:
            report_lines.append(" ✅ 未发现明显数据质量问题")
        return "\n".join(report_lines)
# Usage example: a small frame with a duplicate row, missing values,
# out-of-range ages and an invalid e-mail address
df = pd.DataFrame({
    'id': [1, 2, 3, 4, 5, 5, 6],
    'name': ['张三', '李四', None, '王五', '赵六', '赵六', '钱七'],
    'age': [25, 30, -5, 35, 150, 150, 40],
    'score': [85, 90, 78, np.nan, 92, 92, 88],
    'email': ['a@test.com', 'invalid', 'c@test.com', 'd@test.com',
              'e@test.com', 'e@test.com', None],
    'created_at': pd.date_range('2024-01-01', periods=7, freq='D'),
})
assessor = DataQualityAssessor(df)
print(assessor.generate_report())
缺失值处理是数据预处理中最常见的任务。处理策略的选择取决于缺失值的类型(完全随机缺失、随机缺失、非随机缺失)、缺失比例和业务场景。不当的处理可能引入偏差,因此需要根据具体情况选择合适的方法。
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from typing import Dict, List, Optional
class MissingValueHandler:
    """Applies missing-value strategies to a private copy of a DataFrame.

    Every operation updates ``self.df`` and appends a description of what
    was done to ``imputation_log``.
    """

    def __init__(self, df: pd.DataFrame):
        """Copy ``df`` so the caller's frame is never modified."""
        self.df = df.copy()
        self.imputation_log = []

    def analyze_missing(self) -> Dict:
        """Return counts and percentages of missing values."""
        missing_info = {
            'total_missing': self.df.isnull().sum().sum(),
            'missing_by_column': self.df.isnull().sum().to_dict(),
            'missing_percentage': (self.df.isnull().sum() / len(self.df) * 100).to_dict(),
            'rows_with_missing': self.df.isnull().any(axis=1).sum(),
            'missing_pattern': self._analyze_pattern()
        }
        return missing_info

    def _analyze_pattern(self) -> str:
        """Describe which columns contain missing values."""
        missing_cols = self.df.columns[self.df.isnull().any()].tolist()
        if len(missing_cols) == 0:
            return "无缺失值"
        return f"缺失列: {', '.join(missing_cols)}"

    def drop_missing(self, axis=0, thresh: Optional[int] = None,
                     subset: Optional[List[str]] = None) -> pd.DataFrame:
        """Drop rows (or columns) containing missing values.

        Args:
            axis: Passed to ``DataFrame.dropna``.
            thresh: Minimum number of non-missing values required to keep an
                entry; ``None`` drops entries with any missing value.
            subset: Labels to consider, passed to ``dropna``.
        """
        before = len(self.df)
        # Compare with None so that thresh=0 is honoured rather than ignored.
        if thresh is not None:
            self.df = self.df.dropna(axis=axis, thresh=thresh, subset=subset)
        else:
            self.df = self.df.dropna(axis=axis, subset=subset)
        after = len(self.df)
        self.imputation_log.append(f"删除缺失值: {before} -> {after} 行")
        return self.df

    def fill_constant(self, value: Dict[str, object]) -> pd.DataFrame:
        """Fill missing values with constants via ``DataFrame.fillna(value)``."""
        self.df = self.df.fillna(value)
        self.imputation_log.append(f"固定值填充: {value}")
        return self.df

    def fill_statistics(self, strategy: str = 'mean',
                        columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Fill missing values with a per-column statistic.

        Args:
            strategy: ``'mean'``, ``'median'`` or ``'mode'``; any other value
                leaves the data unchanged.
            columns: Columns to fill; defaults to all numeric columns.
        """
        cols = columns or self.df.select_dtypes(include=[np.number]).columns.tolist()
        for col in cols:
            series = self.df[col]
            if not series.isnull().any():
                continue
            if strategy == 'mean':
                fill_value = series.mean()
            elif strategy == 'median':
                fill_value = series.median()
            elif strategy == 'mode':
                modes = series.mode()
                if modes.empty:
                    # Every value is missing, so there is no mode to fill with.
                    continue
                fill_value = modes.iloc[0]
            else:
                continue
            # Assign the result back: an in-place fillna on self.df[col] acts
            # on a temporary object and may leave the frame unchanged.
            self.df[col] = series.fillna(fill_value)
        self.imputation_log.append(f"统计量填充({strategy}): {cols}")
        return self.df

    def fill_forward_backward(self, method: str = 'ffill',
                              columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Fill forward (``'ffill'``) or backward (``'bfill'``); defaults to all columns."""
        cols = columns or self.df.columns.tolist()
        for col in cols:
            if method == 'ffill':
                self.df[col] = self.df[col].ffill()
            elif method == 'bfill':
                self.df[col] = self.df[col].bfill()
        self.imputation_log.append(f"前后向填充({method}): {cols}")
        return self.df

    def fill_interpolate(self, method: str = 'linear',
                         columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Fill by interpolation; defaults to all numeric columns."""
        cols = columns or self.df.select_dtypes(include=[np.number]).columns.tolist()
        for col in cols:
            self.df[col] = self.df[col].interpolate(method=method)
        self.imputation_log.append(f"插值填充({method}): {cols}")
        return self.df

    def fill_knn(self, n_neighbors: int = 5,
                 columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Fill using ``KNNImputer``; defaults to all numeric columns."""
        cols = columns or self.df.select_dtypes(include=[np.number]).columns.tolist()
        imputer = KNNImputer(n_neighbors=n_neighbors)
        self.df[cols] = imputer.fit_transform(self.df[cols])
        self.imputation_log.append(f"KNN填充(k={n_neighbors}): {cols}")
        return self.df

    def fill_mice(self, columns: Optional[List[str]] = None,
                  max_iter: int = 10) -> pd.DataFrame:
        """Fill using ``IterativeImputer`` (seed 42); defaults to all numeric columns."""
        cols = columns or self.df.select_dtypes(include=[np.number]).columns.tolist()
        imputer = IterativeImputer(max_iter=max_iter, random_state=42)
        self.df[cols] = imputer.fit_transform(self.df[cols])
        self.imputation_log.append(f"MICE插补: {cols}")
        return self.df

    def get_log(self) -> List[str]:
        """Return the list of operations applied so far."""
        return self.imputation_log
# Usage example
print("=" * 50, "🔧 缺失值处理示例", "=" * 50, sep="\n")

df = pd.DataFrame({
    'A': [1, 2, np.nan, 4, 5, np.nan, 7],
    'B': [10, np.nan, 30, 40, np.nan, 60, 70],
    'C': ['x', 'y', np.nan, 'y', 'x', np.nan, 'z'],
    'D': [1.1, 2.2, 3.3, np.nan, 5.5, 6.6, np.nan],
})
print("\n原始数据:")
print(df)

handler = MissingValueHandler(df)

# Inspect what is missing
missing_info = handler.analyze_missing()
print(f"\n缺失值统计: {missing_info['missing_by_column']}")

# Apply a different strategy per column group
df_filled = handler.fill_statistics(strategy='mean', columns=['A', 'B'])
df_filled = handler.fill_statistics(strategy='mode', columns=['C'])
df_filled = handler.fill_interpolate(method='linear', columns=['D'])
print("\n填充后数据:")
print(df_filled)
print(f"\n处理日志: {handler.get_log()}")
异常值是指明显偏离其他观测值的数据点。异常值可能是数据采集错误,也可能是真实的极端情况。正确识别和处理异常值对数据分析结果有重要影响。处理异常值前,需要先判断其产生原因,避免错误地删除有价值的极端数据。
import pandas as pd
import numpy as np
from scipy import stats
from sklearn.ensemble import IsolationForest
from sklearn.cluster import DBSCAN
from typing import Tuple, List, Optional
class OutlierDetector:
    """Detects outliers with several methods and records each result.

    Every ``detect_*`` call stores a summary dict in ``self.outliers`` and
    returns the rows it flagged.
    """

    def __init__(self, df: pd.DataFrame):
        """Keep a reference to ``df`` and start with no recorded detections."""
        self.df = df
        self.outliers = {}

    def detect_iqr(self, column: str, multiplier: float = 1.5) -> Tuple[pd.DataFrame, float, float]:
        """Flag rows whose ``column`` value lies outside the IQR fences.

        Returns:
            ``(outlier_rows, lower_bound, upper_bound)`` where the bounds are
            ``Q1 - multiplier * IQR`` and ``Q3 + multiplier * IQR``.
        """
        values = self.df[column]
        first_quartile = values.quantile(0.25)
        third_quartile = values.quantile(0.75)
        spread = third_quartile - first_quartile
        lower_bound = first_quartile - multiplier * spread
        upper_bound = third_quartile + multiplier * spread
        flagged = self.df[(self.df[column] < lower_bound) | (self.df[column] > upper_bound)]
        self.outliers[f'{column}_iqr'] = {
            'method': 'IQR',
            'count': len(flagged),
            'bounds': (lower_bound, upper_bound),
            'indices': flagged.index.tolist(),
        }
        return flagged, lower_bound, upper_bound

    def detect_zscore(self, column: str, threshold: float = 3) -> pd.DataFrame:
        """Flag rows whose absolute z-score in ``column`` exceeds ``threshold``.

        Missing values are excluded before the z-scores are computed.
        """
        present = self.df[column].dropna()
        z_scores = np.abs(stats.zscore(present))
        flagged = self.df.loc[present.index[z_scores > threshold]]
        self.outliers[f'{column}_zscore'] = {
            'method': 'Z-score',
            'count': len(flagged),
            'threshold': threshold,
            'indices': flagged.index.tolist(),
        }
        return flagged

    def detect_isolation_forest(self, columns: List[str],
                                contamination: float = 0.1) -> pd.DataFrame:
        """Flag rows that ``IsolationForest`` (seed 42) predicts as ``-1``.

        Rows with a missing value in ``columns`` are dropped first.
        """
        data = self.df[columns].dropna()
        model = IsolationForest(contamination=contamination, random_state=42)
        labels = model.fit_predict(data)
        flagged = data[labels == -1]
        self.outliers['isolation_forest'] = {
            'method': 'Isolation Forest',
            'count': len(flagged),
            'contamination': contamination,
            'indices': flagged.index.tolist(),
        }
        return flagged

    def detect_dbscan(self, columns: List[str],
                      eps: float = 0.5, min_samples: int = 5) -> pd.DataFrame:
        """Flag rows that DBSCAN labels ``-1`` (noise).

        Rows with a missing value in ``columns`` are dropped first.
        """
        data = self.df[columns].dropna()
        labels = DBSCAN(eps=eps, min_samples=min_samples).fit_predict(data)
        # Label -1 marks noise points, treated here as outliers
        flagged = data[labels == -1]
        self.outliers['dbscan'] = {
            'method': 'DBSCAN',
            'count': len(flagged),
            'indices': flagged.index.tolist(),
        }
        return flagged

    def get_summary(self) -> pd.DataFrame:
        """Return one row per recorded detection with its count and share of rows."""
        row_count = len(self.df)
        summary = [
            {
                'detection': key,
                'method': info['method'],
                'outlier_count': info['count'],
                'percentage': f"{info['count'] / row_count * 100:.2f}%",
            }
            for key, info in self.outliers.items()
        ]
        return pd.DataFrame(summary)
class OutlierHandler:
    """Apply outlier treatments to a private copy of a DataFrame.

    Every method updates ``self.df``, appends a line to
    ``self.processing_log`` and returns the updated frame.
    """

    def __init__(self, df: pd.DataFrame):
        self.processing_log = []
        self.df = df.copy()

    def remove_outliers(self, column: str,
                        lower: float, upper: float) -> pd.DataFrame:
        """Keep only the rows whose ``column`` value lies in ``[lower, upper]``."""
        rows_before = len(self.df)
        values = self.df[column]
        not_too_small = values >= lower
        not_too_large = values <= upper
        self.df = self.df[not_too_small & not_too_large]
        rows_after = len(self.df)
        self.processing_log.append(f"删除异常值({column}): {rows_before} -> {rows_after} 行")
        return self.df

    def clip_outliers(self, column: str,
                      lower: float, upper: float) -> pd.DataFrame:
        """Clamp values of ``column`` to the closed range ``[lower, upper]``."""
        clipped = self.df[column].clip(lower, upper)
        self.df[column] = clipped
        self.processing_log.append(f"截断异常值({column}): [{lower}, {upper}]")
        return self.df

    def replace_with_median(self, column: str,
                            lower: float, upper: float) -> pd.DataFrame:
        """Overwrite values outside ``[lower, upper]`` with the column median.

        The median is computed from the column as it is before replacement.
        """
        values = self.df[column]
        replacement = values.median()
        out_of_range = (values < lower) | (values > upper)
        self.df.loc[out_of_range, column] = replacement
        self.processing_log.append(f"中位数替换异常值({column}): {out_of_range.sum()} 个")
        return self.df

    def winsorize(self, column: str, limits: Tuple[float, float] = (0.05, 0.05)) -> pd.DataFrame:
        """Winsorize ``column`` using ``scipy.stats.mstats.winsorize``.

        ``limits`` is passed through unchanged as the fractions to cut on the
        lower and upper tail.
        """
        from scipy.stats import mstats
        winsorized = mstats.winsorize(self.df[column], limits=limits)
        self.df[column] = winsorized
        self.processing_log.append(f"缩尾处理({column}): limits={limits}")
        return self.df

    def transform_log(self, column: str) -> pd.DataFrame:
        """Add a ``<column>_log`` column holding ``log(1 + x)`` of the original."""
        target = f'{column}_log'
        self.df[target] = np.log1p(self.df[column])
        self.processing_log.append(f"对数转换: {column}")
        return self.df
# Usage example: detect outliers with three methods, then clip them to the IQR bounds.
print("=" * 50)
print("🎯 异常值检测与处理示例")
print("=" * 50)
# Fixed seed so the generated sample is the same on every run.
np.random.seed(42)
df = pd.DataFrame({
'value': np.concatenate([
np.random.normal(100, 10, 100),
[200, 250, 300, 50, -30] # hand-picked extreme values appended to the sample
])
})
detector = OutlierDetector(df)
# IQR method (also yields the bounds reused for clipping below)
outliers_iqr, lower, upper = detector.detect_iqr('value')
print(f"\nIQR方法检测到 {len(outliers_iqr)} 个异常值")
print(f"正常范围: [{lower:.2f}, {upper:.2f}]")
# Z-score method
outliers_z = detector.detect_zscore('value')
print(f"Z-score方法检测到 {len(outliers_z)} 个异常值")
# Isolation Forest
outliers_iso = detector.detect_isolation_forest(['value'])
print(f"孤立森林检测到 {len(outliers_iso)} 个异常值")
# Treat the outliers: clip the column to the IQR bounds computed above
handler = OutlierHandler(df)
df_clipped = handler.clip_outliers('value', lower, upper)
print(f"\n处理后的数据范围: [{df_clipped['value'].min():.2f}, {df_clipped['value'].max():.2f}]")
# Print a summary of every detection run
print("\n异常值检测摘要:")
print(detector.get_summary())
数据清洗和转换是将原始数据转化为适合分析格式的过程,包括数据类型转换、格式标准化、特征编码、特征缩放、特征衍生等操作。良好的数据转换能够提高后续分析和建模的效果。
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder, OneHotEncoder
from sklearn.preprocessing import RobustScaler, PowerTransformer
from typing import Dict, List, Optional
import re
class DataTransformer:
    """Toolkit for cleaning, converting, encoding, scaling and deriving features.

    Works on a private copy of the DataFrame passed in. Every public method
    updates ``self.df`` and returns it. Fitted scikit-learn objects are kept in
    ``self.transformers`` and a readable history in ``self.transform_log``.
    """

    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        # "<column>_<kind>" -> fitted encoder/scaler, for later reuse
        self.transformers = {}
        self.transform_log = []

    # ========== String cleaning ==========
    def clean_strings(self, columns: Optional[List[str]] = None,
                      strip: bool = True, lower: bool = False) -> pd.DataFrame:
        """Strip surrounding whitespace and/or lower-case text columns.

        Args:
            columns: Columns to clean; defaults to every ``object`` column.
            strip: Convert values to ``str`` and strip whitespace.
            lower: Lower-case the values.

        Missing values stay missing rather than becoming the text ``"nan"``.
        """
        cols = columns or self.df.select_dtypes(include=['object']).columns.tolist()
        for col in cols:
            series = self.df[col]
            if strip:
                stripped = series.astype(str).str.strip()
                # Restore the original missing markers that astype(str) stringified.
                series = stripped.where(series.notna(), series)
            if lower:
                series = series.str.lower()
            self.df[col] = series
        self.transform_log.append(f"字符串清洗: {cols}")
        return self.df

    def remove_special_chars(self, column: str,
                             pattern: str = r'[^a-zA-Z0-9\u4e00-\u9fa5]') -> pd.DataFrame:
        """Delete every substring of ``column`` that matches the regex ``pattern``.

        The default pattern removes everything except ASCII letters, digits
        and characters in the range U+4E00..U+9FA5. Missing values are left
        untouched.
        """
        series = self.df[column]
        cleaned = series.astype(str).str.replace(pattern, '', regex=True)
        self.df[column] = cleaned.where(series.notna(), series)
        self.transform_log.append(f"移除特殊字符: {column}")
        return self.df

    # ========== Type conversion ==========
    def to_numeric(self, column: str, errors: str = 'coerce') -> pd.DataFrame:
        """Convert ``column`` with ``pd.to_numeric``; ``errors`` is passed through."""
        self.df[column] = pd.to_numeric(self.df[column], errors=errors)
        self.transform_log.append(f"数值转换: {column}")
        return self.df

    def to_datetime(self, column: str, format: Optional[str] = None) -> pd.DataFrame:
        """Convert ``column`` with ``pd.to_datetime`` using ``errors='coerce'``."""
        self.df[column] = pd.to_datetime(self.df[column], format=format, errors='coerce')
        self.transform_log.append(f"日期转换: {column}")
        return self.df

    def to_category(self, column: str) -> pd.DataFrame:
        """Convert ``column`` to the pandas ``category`` dtype."""
        self.df[column] = self.df[column].astype('category')
        self.transform_log.append(f"分类转换: {column}")
        return self.df

    # ========== Feature encoding ==========
    def label_encode(self, column: str) -> pd.DataFrame:
        """Add ``<column>_encoded`` holding integer labels from ``LabelEncoder``.

        Values are converted to ``str`` before fitting. The fitted encoder is
        stored under ``"<column>_label_encoder"``.
        """
        le = LabelEncoder()
        self.df[f'{column}_encoded'] = le.fit_transform(self.df[column].astype(str))
        self.transformers[f'{column}_label_encoder'] = le
        self.transform_log.append(f"标签编码: {column}")
        return self.df

    def one_hot_encode(self, columns: List[str],
                       drop_first: bool = False) -> pd.DataFrame:
        """Replace ``columns`` with indicator columns via ``pd.get_dummies``."""
        self.df = pd.get_dummies(self.df, columns=columns, drop_first=drop_first)
        self.transform_log.append(f"独热编码: {columns}")
        return self.df

    # ========== Feature scaling ==========
    def standard_scale(self, column: str) -> pd.DataFrame:
        """Add ``<column>_scaled`` produced by ``StandardScaler``."""
        scaler = StandardScaler()
        self.df[f'{column}_scaled'] = scaler.fit_transform(self.df[[column]])
        self.transformers[f'{column}_standard_scaler'] = scaler
        self.transform_log.append(f"标准化: {column}")
        return self.df

    def minmax_scale(self, column: str,
                     feature_range: tuple = (0, 1)) -> pd.DataFrame:
        """Add ``<column>_normalized`` produced by ``MinMaxScaler``."""
        scaler = MinMaxScaler(feature_range=feature_range)
        self.df[f'{column}_normalized'] = scaler.fit_transform(self.df[[column]])
        self.transformers[f'{column}_minmax_scaler'] = scaler
        self.transform_log.append(f"归一化: {column}")
        return self.df

    def robust_scale(self, column: str) -> pd.DataFrame:
        """Add ``<column>_robust`` produced by ``RobustScaler``.

        The fitted scaler is stored under ``"<column>_robust_scaler"``, like
        the other scaling methods do.
        """
        scaler = RobustScaler()
        self.df[f'{column}_robust'] = scaler.fit_transform(self.df[[column]])
        self.transformers[f'{column}_robust_scaler'] = scaler
        self.transform_log.append(f"稳健缩放: {column}")
        return self.df

    # ========== Feature derivation ==========
    def extract_datetime_features(self, column: str) -> pd.DataFrame:
        """Add year/month/day/dayofweek/quarter/is_weekend columns for ``column``.

        ``is_weekend`` is 1 when ``dayofweek`` is 5 or 6, else 0.
        """
        dt = pd.to_datetime(self.df[column])
        self.df[f'{column}_year'] = dt.dt.year
        self.df[f'{column}_month'] = dt.dt.month
        self.df[f'{column}_day'] = dt.dt.day
        self.df[f'{column}_dayofweek'] = dt.dt.dayofweek
        self.df[f'{column}_quarter'] = dt.dt.quarter
        self.df[f'{column}_is_weekend'] = dt.dt.dayofweek.isin([5, 6]).astype(int)
        self.transform_log.append(f"日期特征提取: {column}")
        return self.df

    def create_bins(self, column: str, bins: int = 5,
                    labels: Optional[List[str]] = None) -> pd.DataFrame:
        """Add ``<column>_binned`` by cutting ``column`` with ``pd.cut``."""
        self.df[f'{column}_binned'] = pd.cut(self.df[column], bins=bins, labels=labels)
        self.transform_log.append(f"数据分箱: {column}")
        return self.df

    def create_interaction(self, col1: str, col2: str,
                           operation: str = 'multiply') -> pd.DataFrame:
        """Add a column combining ``col1`` and ``col2``.

        Args:
            operation: ``'multiply'`` (``<col1>_x_<col2>``), ``'add'``
                (``<col1>_plus_<col2>``) or ``'divide'`` (``<col1>_div_<col2>``).

        Raises:
            ValueError: If ``operation`` is not one of the three above. Nothing
                is added or logged in that case.
        """
        if operation == 'multiply':
            self.df[f'{col1}_x_{col2}'] = self.df[col1] * self.df[col2]
        elif operation == 'add':
            self.df[f'{col1}_plus_{col2}'] = self.df[col1] + self.df[col2]
        elif operation == 'divide':
            self.df[f'{col1}_div_{col2}'] = self.df[col1] / self.df[col2]
        else:
            raise ValueError(f"不支持的操作: {operation}")
        self.transform_log.append(f"交互特征: {col1} {operation} {col2}")
        return self.df

    def get_transform_log(self) -> List[str]:
        """Return the list of log lines recorded so far."""
        return self.transform_log
# Usage example: run a small frame through cleaning, conversion, encoding, scaling and derivation.
print("=" * 50)
print("🔄 数据转换示例")
print("=" * 50)
df = pd.DataFrame({
'name': [' 张三 ', '李四', '王五 ', '赵六'],
'age': ['25', '30', '35', '40'],
'income': [5000, 6000, 8000, 10000],
'category': ['A', 'B', 'A', 'C'],
'date': ['2024-01-01', '2024-02-15', '2024-03-20', '2024-04-10']
})
print("\n原始数据:")
print(df)
transformer = DataTransformer(df)
# String cleaning
transformer.clean_strings(['name'])
# Type conversion ('age' is stored as text, 'date' as ISO strings)
transformer.to_numeric('age')
transformer.to_datetime('date')
# Feature encoding
transformer.label_encode('category')
# Feature scaling
transformer.standard_scale('income')
transformer.minmax_scale('income')
# Feature derivation
transformer.extract_datetime_features('date')
transformer.create_bins('age', bins=3, labels=['青年', '中年', '壮年'])
print("\n转换后数据:")
print(transformer.df)
print(f"\n转换操作: {transformer.get_transform_log()}")
数据验证是确保数据符合预期格式和业务规则的重要步骤。通过建立验证规则,可以在数据进入分析流程前发现并处理问题。良好的数据验证机制能够提高数据质量,减少后续分析中的错误。
import pandas as pd
import numpy as np
import re
from typing import Dict, List, Callable, Any
from dataclasses import dataclass
@dataclass
class ValidationResult:
    """Outcome of running one rule against one column."""
    is_valid: bool             # True when no row failed the rule
    column: str                # column the rule was applied to
    rule: str                  # rule name, e.g. "not_null"
    failed_count: int          # number of failing rows (0 when the rule itself errored)
    failed_indices: List[int]  # index labels of the failing rows
    message: str               # rule message, "验证通过", or the error text


class DataValidator:
    """Collect per-column validation rules and run them against a DataFrame.

    A rule's validator receives the column as a Series and returns a
    row-aligned mask that is truthy where the row passes. The ``validate_*``
    helpers return ``self`` so that rules can be chained.
    """

    def __init__(self, df: pd.DataFrame):
        self.df = df
        # column name -> list of {'name', 'validator', 'message'} dicts
        self.rules = {}
        self.results = []

    def add_rule(self, column: str, rule_name: str,
                 validator: Callable, message: str = ""):
        """Register ``validator`` under ``rule_name`` for ``column``."""
        self.rules.setdefault(column, []).append({
            'name': rule_name,
            'validator': validator,
            'message': message
        })

    def validate_not_null(self, column: str) -> 'DataValidator':
        """Require every value of ``column`` to be non-missing."""
        self.add_rule(column, 'not_null',
                      lambda x: x.notna(),
                      f"列 '{column}' 不允许为空")
        return self

    def validate_unique(self, column: str) -> 'DataValidator':
        """Require unique values; second and later occurrences are the failures."""
        self.add_rule(column, 'unique',
                      lambda x: ~x.duplicated(),
                      f"列 '{column}' 值必须唯一")
        return self

    def validate_range(self, column: str, min_val: Any, max_val: Any) -> 'DataValidator':
        """Require ``min_val <= value <= max_val``."""
        self.add_rule(column, 'range',
                      lambda x: (x >= min_val) & (x <= max_val),
                      f"列 '{column}' 值必须在 [{min_val}, {max_val}] 范围内")
        return self

    def validate_regex(self, column: str, pattern: str) -> 'DataValidator':
        """Require the string form of each value to match ``pattern`` from its start."""
        self.add_rule(column, 'regex',
                      lambda x: x.astype(str).str.match(pattern),
                      f"列 '{column}' 格式不正确")
        return self

    def validate_in_list(self, column: str, allowed: List[Any]) -> 'DataValidator':
        """Require every value to be one of ``allowed``."""
        self.add_rule(column, 'in_list',
                      lambda x: x.isin(allowed),
                      f"列 '{column}' 值必须在 {allowed} 中")
        return self

    def validate_custom(self, column: str, validator: Callable,
                        rule_name: str, message: str) -> 'DataValidator':
        """Register a caller-supplied validator."""
        self.add_rule(column, rule_name, validator, message)
        return self

    def _to_pass_mask(self, raw) -> np.ndarray:
        """Coerce a validator's output to a positional boolean array.

        Anything that is not equal to ``True`` (including missing values in an
        object-dtype mask) counts as a failure, so ``~`` is never applied to a
        non-boolean mask.
        """
        if not isinstance(raw, pd.Series):
            raw = pd.Series(raw, index=self.df.index)
        return raw.eq(True).fillna(False).to_numpy(dtype=bool)

    def run_validation(self) -> List[ValidationResult]:
        """Run every registered rule and return (and store) the results.

        A rule whose validator raises is recorded as failed with
        ``failed_count=0`` and the error text as its message; the remaining
        rules still run.
        """
        self.results = []
        for column, rules in self.rules.items():
            for rule in rules:
                try:
                    passed = self._to_pass_mask(rule['validator'](self.df[column]))
                    failed_indices = self.df.index[~passed].tolist()
                    result = ValidationResult(
                        is_valid=len(failed_indices) == 0,
                        column=column,
                        rule=rule['name'],
                        failed_count=len(failed_indices),
                        failed_indices=failed_indices,
                        message=rule['message'] if failed_indices else "验证通过"
                    )
                except Exception as e:
                    # Deliberately broad: one broken rule must not stop the others.
                    result = ValidationResult(
                        is_valid=False,
                        column=column,
                        rule=rule['name'],
                        failed_count=0,
                        failed_indices=[],
                        message=f"验证异常: {str(e)}"
                    )
                self.results.append(result)
        return self.results

    def get_report(self) -> str:
        """Render the stored results as text.

        Failing rules are listed with their message and failure count; the
        failing index labels are shown only when there are at most five.
        """
        lines = [
            "=" * 60,
            "📋 数据验证报告",
            "=" * 60
        ]
        passed = sum(1 for r in self.results if r.is_valid)
        failed = len(self.results) - passed
        lines.append(f"\n验证规则: {len(self.results)} 条")
        lines.append(f"通过: {passed} 条")
        lines.append(f"失败: {failed} 条")
        if failed > 0:
            lines.append("\n【失败的验证】")
            for r in self.results:
                if r.is_valid:
                    continue
                lines.append(f" ❌ {r.column}.{r.rule}: {r.message}")
                lines.append(f" 失败记录: {r.failed_count} 条")
                if r.failed_indices and len(r.failed_indices) <= 5:
                    lines.append(f" 索引: {r.failed_indices}")
        return "\n".join(lines)
# Usage example: validate a frame that deliberately contains one problem per rule.
print("=" * 50)
print("📋 数据验证示例")
print("=" * 50)
df = pd.DataFrame({
'id': [1, 2, 3, 4, 5, 5], # contains a duplicate
'name': ['张三', '李四', None, '王五', '赵六', '赵六'], # contains a missing value
'age': [25, 30, -5, 35, 150, 40], # contains out-of-range values
'email': ['a@test.com', 'invalid', 'c@test.com', 'd@test.com', None, 'f@test.com'],
'status': ['active', 'inactive', 'active', 'pending', 'active', 'unknown'] # contains a value outside the allowed list
})
validator = DataValidator(df)
# Register the validation rules (each helper returns the validator, so calls chain)
validator.validate_not_null('name') \
.validate_unique('id') \
.validate_range('age', 0, 120) \
.validate_regex('email', r'^[\w\.-]+@[\w\.-]+\.\w+$') \
.validate_in_list('status', ['active', 'inactive', 'pending'])
# Run all rules
results = validator.run_validation()
# Print the report
print(validator.get_report())
文件存储是最基础、最常用的数据存储方式。不同的文件格式各有特点,选择合适的格式需要考虑数据类型、数据量、读写性能、跨平台兼容性等因素。了解各种格式的特点和适用场景,能够帮助我们做出正确的技术选择。
import pandas as pd
import json
import os
from typing import Dict, List, Optional
from datetime import datetime
class FileStorageManager:
    """Save and load DataFrames (and plain JSON data) under one base directory.

    Every ``filename`` argument is interpreted relative to ``base_dir``, which
    is created on construction if it does not exist.
    """

    def __init__(self, base_dir: str = 'data'):
        self.base_dir = base_dir
        os.makedirs(base_dir, exist_ok=True)

    # ========== CSV ==========
    def save_csv(self, df: pd.DataFrame, filename: str,
                 mode: str = 'w', **kwargs) -> str:
        """Write ``df`` as CSV and return the file path.

        Defaults to ``index=False`` and ``encoding='utf-8-sig'``; ``kwargs``
        override these and are forwarded to ``DataFrame.to_csv``. When
        appending (``mode='a'``) to an existing file the header is suppressed,
        and a ``utf-8-sig`` encoding is downgraded to ``utf-8`` so that no
        second byte-order mark is written into the middle of the file.
        """
        filepath = os.path.join(self.base_dir, filename)
        options = {
            'index': False,
            'encoding': 'utf-8-sig'  # BOM lets spreadsheet tools detect UTF-8
        }
        options.update(kwargs)
        if mode == 'a' and os.path.exists(filepath):
            options['header'] = False
            encoding = str(options.get('encoding') or '')
            if encoding.lower().replace('_', '-') == 'utf-8-sig':
                options['encoding'] = 'utf-8'
        df.to_csv(filepath, mode=mode, **options)
        return filepath

    def load_csv(self, filename: str, **kwargs) -> pd.DataFrame:
        """Read a CSV file; ``kwargs`` are forwarded to ``pd.read_csv``."""
        filepath = os.path.join(self.base_dir, filename)
        return pd.read_csv(filepath, **kwargs)

    # ========== JSON ==========
    def save_json(self, data, filename: str,
                  orient: str = 'records', indent: int = 2) -> str:
        """Write a DataFrame or any JSON-serialisable object; return the path.

        ``orient`` only applies to DataFrames. Non-ASCII text is written
        unescaped in both cases.
        """
        filepath = os.path.join(self.base_dir, filename)
        if isinstance(data, pd.DataFrame):
            data.to_json(filepath, orient=orient,
                         force_ascii=False, indent=indent)
        else:
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=indent)
        return filepath

    def load_json(self, filename: str) -> object:
        """Parse a UTF-8 JSON file and return the decoded Python object."""
        filepath = os.path.join(self.base_dir, filename)
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)

    # ========== Excel ==========
    def save_excel(self, df: pd.DataFrame, filename: str,
                   sheet_name: str = 'Sheet1') -> str:
        """Write ``df`` to a single-sheet Excel file without the index."""
        filepath = os.path.join(self.base_dir, filename)
        df.to_excel(filepath, index=False, sheet_name=sheet_name)
        return filepath

    def save_multi_sheet(self, data_dict: Dict[str, pd.DataFrame],
                         filename: str) -> str:
        """Write one sheet per ``{sheet_name: DataFrame}`` entry (openpyxl engine)."""
        filepath = os.path.join(self.base_dir, filename)
        with pd.ExcelWriter(filepath, engine='openpyxl') as writer:
            for sheet_name, df in data_dict.items():
                df.to_excel(writer, sheet_name=sheet_name, index=False)
        return filepath

    def load_excel(self, filename: str,
                   sheet_name: str = None) -> pd.DataFrame:
        """Read one sheet; a falsy ``sheet_name`` uses ``pd.read_excel``'s default."""
        filepath = os.path.join(self.base_dir, filename)
        if sheet_name:
            return pd.read_excel(filepath, sheet_name=sheet_name)
        return pd.read_excel(filepath)

    def load_all_sheets(self, filename: str) -> Dict[str, pd.DataFrame]:
        """Read every sheet into a ``{sheet_name: DataFrame}`` dict."""
        filepath = os.path.join(self.base_dir, filename)
        return pd.read_excel(filepath, sheet_name=None)

    # ========== Parquet ==========
    def save_parquet(self, df: pd.DataFrame,
                     filename: str, compression: str = 'snappy') -> str:
        """Write ``df`` as Parquet without the index and return the path."""
        filepath = os.path.join(self.base_dir, filename)
        df.to_parquet(filepath, compression=compression, index=False)
        return filepath

    def load_parquet(self, filename: str,
                     columns: List[str] = None) -> pd.DataFrame:
        """Read a Parquet file, optionally restricted to ``columns``."""
        filepath = os.path.join(self.base_dir, filename)
        return pd.read_parquet(filepath, columns=columns)

    # ========== File information ==========
    def get_file_info(self, filename: str) -> Dict:
        """Return size, modification time and path, or ``{'exists': False}``."""
        filepath = os.path.join(self.base_dir, filename)
        if not os.path.exists(filepath):
            return {'exists': False}
        stat = os.stat(filepath)
        return {
            'exists': True,
            'size_bytes': stat.st_size,
            'size_mb': round(stat.st_size / 1024 / 1024, 2),
            'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
            'path': filepath
        }

    def list_files(self, pattern: str = '*') -> List[Dict]:
        """Return ``get_file_info`` dicts for every glob match under ``base_dir``.

        Each dict additionally carries ``'filename'`` (the base name). Matches
        in sub-directories are looked up by their path relative to
        ``base_dir`` so that they are not reported as missing.
        """
        import glob
        files = []
        for filepath in glob.glob(os.path.join(self.base_dir, pattern)):
            info = self.get_file_info(os.path.relpath(filepath, self.base_dir))
            info['filename'] = os.path.basename(filepath)
            files.append(info)
        return files
# Usage example: save one frame in three formats and compare the file sizes.
print("=" * 50)
print("📁 文件存储示例")
print("=" * 50)
manager = FileStorageManager('data_output')
# Sample data ('tags' holds a list per row)
df = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['张三', '李四', '王五'],
    'score': [85, 90, 78],
    'tags': [['python', 'data'], ['java', 'web'], ['sql', 'db']]
})
# Save in different formats
csv_path = manager.save_csv(df, 'students.csv')
json_path = manager.save_json(df, 'students.json')
excel_path = manager.save_excel(df, 'students.xlsx')
print(f"\n文件已保存:")
print(f" CSV: {csv_path}")
print(f" JSON: {json_path}")
print(f" Excel: {excel_path}")
# Report the size of every file that was written
print(f"\n文件大小对比:")
for f in manager.list_files('students.*'):
    print(f" {f['filename']}: {f['size_mb']} MB")
数据库存储适合大规模数据的持久化存储和高效查询。关系型数据库(如SQLite、MySQL、PostgreSQL)适合结构化数据,NoSQL数据库(如MongoDB、Redis)适合半结构化和非结构化数据。选择合适的数据库需要考虑数据量、查询模式、并发需求等因素。
import sqlite3
import pandas as pd
from typing import List, Dict, Optional, Any
from contextlib import contextmanager
from datetime import datetime
class SQLiteDatabase:
    """Small convenience wrapper around :mod:`sqlite3`.

    For a file path, every operation opens its own connection, commits on
    success, rolls back on error and closes the connection. For
    ``':memory:'`` a single connection is opened lazily and reused, because
    each new connection to ``':memory:'`` would otherwise see a separate,
    empty database; call :meth:`close` to release it.
    """

    _MEMORY_PATH = ':memory:'

    def __init__(self, db_path: str = 'data.db'):
        self.db_path = db_path
        # Shared connection, only used when db_path is ':memory:'.
        self._shared_conn = None

    def _connect(self) -> sqlite3.Connection:
        """Open a connection whose rows support access by column name."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    @contextmanager
    def get_connection(self):
        """Yield a connection; commit on success, roll back and re-raise on error."""
        in_memory = self.db_path == self._MEMORY_PATH
        if in_memory:
            if self._shared_conn is None:
                self._shared_conn = self._connect()
            conn = self._shared_conn
        else:
            conn = self._connect()
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            # The shared in-memory connection must outlive this operation.
            if not in_memory:
                conn.close()

    def close(self) -> None:
        """Close the shared in-memory connection, discarding its data."""
        if self._shared_conn is not None:
            self._shared_conn.close()
            self._shared_conn = None

    def execute(self, sql: str, params: tuple = None) -> int:
        """Execute one statement and return the cursor's ``rowcount``."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            if params:
                cursor.execute(sql, params)
            else:
                cursor.execute(sql)
            return cursor.rowcount

    def executemany(self, sql: str, params_list: List[tuple]) -> int:
        """Execute ``sql`` once per parameter tuple; return the cursor's ``rowcount``."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.executemany(sql, params_list)
            return cursor.rowcount

    def query(self, sql: str, params: tuple = None) -> List[Dict]:
        """Run a query and return every row as a plain ``dict``."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            if params:
                cursor.execute(sql, params)
            else:
                cursor.execute(sql)
            return [dict(row) for row in cursor.fetchall()]

    def query_df(self, sql: str, params: tuple = None) -> pd.DataFrame:
        """Run a query and return the result as a DataFrame."""
        with self.get_connection() as conn:
            return pd.read_sql_query(sql, conn, params=params)

    def insert_df(self, df: pd.DataFrame, table: str,
                  if_exists: str = 'append') -> int:
        """Write ``df`` into ``table`` via ``DataFrame.to_sql``; return ``len(df)``."""
        with self.get_connection() as conn:
            df.to_sql(table, conn, if_exists=if_exists, index=False)
            return len(df)

    def table_exists(self, table: str) -> bool:
        """Return True if ``sqlite_master`` lists a table with this name."""
        sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
        return len(self.query(sql, (table,))) > 0

    def get_table_schema(self, table: str) -> List[Dict]:
        """Return the rows of ``PRAGMA table_info`` for ``table``.

        NOTE(review): ``table`` is interpolated into the SQL text, so it must
        come from trusted code, not from user input.
        """
        return self.query(f"PRAGMA table_info({table})")

    def create_table(self, table: str, columns: Dict[str, str],
                     primary_key: str = None) -> bool:
        """Create ``table`` if it does not exist.

        Args:
            table: Table name (interpolated into the SQL text — trusted input only).
            columns: ``{column_name: type_and_constraints}``.
            primary_key: Column that additionally gets ``PRIMARY KEY``.

        Returns:
            Always ``True``; failures surface as ``sqlite3`` exceptions.
        """
        col_defs = []
        for col, dtype in columns.items():
            col_def = f"{col} {dtype}"
            if col == primary_key:
                col_def += " PRIMARY KEY"
            col_defs.append(col_def)
        sql = f"CREATE TABLE IF NOT EXISTS {table} ({', '.join(col_defs)})"
        self.execute(sql)
        return True

    def get_tables(self) -> List[str]:
        """Return the names of all tables listed in ``sqlite_master``."""
        result = self.query("SELECT name FROM sqlite_master WHERE type='table'")
        return [row['name'] for row in result]
# Usage example: create a table, insert a DataFrame, then query, update and aggregate.
print("=" * 50)
print("🗄️ SQLite数据库示例")
print("=" * 50)
# NOTE(review): every sqlite3.connect(':memory:') call creates a brand-new empty
# database, so this demo only works if SQLiteDatabase keeps one connection open
# for in-memory paths — confirm against the class.
db = SQLiteDatabase(':memory:') # in-memory database
# Create the table
db.create_table('users', {
'id': 'INTEGER',
'name': 'TEXT NOT NULL',
'email': 'TEXT UNIQUE',
'score': 'REAL',
'created_at': 'TEXT'
}, primary_key='id')
# Insert data (no 'id' column is supplied in the frame)
df = pd.DataFrame({
'name': ['张三', '李四', '王五'],
'email': ['zhang@test.com', 'li@test.com', 'wang@test.com'],
'score': [85.5, 90.0, 78.5],
'created_at': [datetime.now().isoformat()] * 3
})
db.insert_df(df, 'users')
print(f"\n插入 {len(df)} 条记录")
# Query data
users = db.query_df("SELECT * FROM users WHERE score > 80")
print(f"\n查询结果(score > 80):")
print(users)
# Update data (parameterised statement)
db.execute("UPDATE users SET score = ? WHERE name = ?", (95.0, '张三'))
print("\n更新张三的分数为95.0")
# Aggregate query
stats = db.query_df("""
SELECT
COUNT(*) as total,
AVG(score) as avg_score,
MAX(score) as max_score,
MIN(score) as min_score
FROM users
""")
print(f"\n统计信息:")
print(stats)
NoSQL数据库适合存储半结构化和非结构化数据,具有灵活的数据模型、水平扩展能力和高性能的特点。MongoDB是最流行的文档数据库,Redis是高性能的键值存储,它们在数据采集场景中应用广泛。
# MongoDB示例(需要安装pymongo)
# pip install pymongo
from typing import Dict, List, Optional, Any
from datetime import datetime
import json
# 模拟MongoDB操作(实际使用需要连接真实数据库)
class MockMongoCollection:
    """In-memory stand-in for a MongoDB collection (demonstration only).

    Documents are plain dicts kept in ``self.data``. Queries support exact
    equality on top-level fields only. Read methods hand out shallow copies,
    so replacing a field on a result does not alter the stored document.
    """

    def __init__(self, name: str):
        self.name = name
        self.data = []
        self._id_counter = 0

    @staticmethod
    def _matches(doc: Dict, query: Dict) -> bool:
        """Return True if ``doc.get(key) == value`` for every query item."""
        return all(doc.get(k) == v for k, v in query.items())

    @staticmethod
    def _project(doc: Dict, projection: Optional[Dict]) -> Dict:
        """Return a shallow copy of ``doc`` restricted by ``projection``.

        Any truthy value switches to inclusion mode: only the flagged fields
        are kept, plus ``_id`` unless it is explicitly set to a falsy value.
        Otherwise the falsy-flagged fields are removed.
        """
        if not projection:
            return doc.copy()
        if any(projection.values()):
            kept = {k: doc[k] for k, flag in projection.items()
                    if flag and k != '_id' and k in doc}
            if projection.get('_id', 1) and '_id' in doc:
                kept['_id'] = doc['_id']
            return kept
        hidden = {k for k, flag in projection.items() if not flag}
        return {k: v for k, v in doc.items() if k not in hidden}

    def insert_one(self, document: Dict) -> str:
        """Store ``document`` and return its generated id.

        The passed dict itself is stored and gains ``_id`` (``doc_<n>``) and
        ``created_at`` (ISO timestamp) keys.
        """
        self._id_counter += 1
        document['_id'] = f"doc_{self._id_counter}"
        document['created_at'] = datetime.now().isoformat()
        self.data.append(document)
        return document['_id']

    def insert_many(self, documents: List[Dict]) -> List[str]:
        """Insert each document in order and return the generated ids."""
        return [self.insert_one(doc) for doc in documents]

    def find(self, query: Dict = None, projection: Dict = None) -> List[Dict]:
        """Return copies of the documents matching ``query``.

        Args:
            query: ``{field: value}`` equality filter; a queried field must be
                present in the document. ``None`` matches everything.
            projection: Optional ``{field: 0/1}`` field selection.
        """
        if query is None:
            return [self._project(doc, projection) for doc in self.data]
        results = []
        for doc in self.data:
            if all(key in doc and doc[key] == value for key, value in query.items()):
                results.append(self._project(doc, projection))
        return results

    def update_one(self, query: Dict, update: Dict) -> bool:
        """Apply ``update['$set']`` to the first matching document.

        Returns True if a document matched (even when ``update`` has no
        ``$set`` key), False otherwise.
        """
        for doc in self.data:
            if self._matches(doc, query):
                if '$set' in update:
                    doc.update(update['$set'])
                return True
        return False

    def delete_one(self, query: Dict) -> bool:
        """Remove the first matching document; return whether one was removed."""
        for i, doc in enumerate(self.data):
            if self._matches(doc, query):
                self.data.pop(i)
                return True
        return False

    def aggregate(self, pipeline: List[Dict]) -> List[Dict]:
        """Run a simplified pipeline and return copies of the surviving documents.

        Only ``$match`` (equality) is applied. ``$group`` is recognised but is
        a no-op in this simplified mock, and other stages are ignored.
        """
        results = [doc.copy() for doc in self.data]
        for stage in pipeline:
            if '$match' in stage:
                results = [d for d in results if self._matches(d, stage['$match'])]
            elif '$group' in stage:
                # Grouping is intentionally not implemented in this mock.
                continue
        return results


class MongoDBManager:
    """Registry that creates :class:`MockMongoCollection` objects on first use."""

    def __init__(self):
        self.collections: Dict[str, MockMongoCollection] = {}

    def get_collection(self, name: str) -> MockMongoCollection:
        """Return the collection called ``name``, creating it if necessary."""
        if name not in self.collections:
            self.collections[name] = MockMongoCollection(name)
        return self.collections[name]

    def list_collections(self) -> List[str]:
        """Return the names of all collections created so far."""
        return list(self.collections.keys())
# Usage example: insert, query and update documents in the mock collection.
print("=" * 50)
print("🍃 MongoDB示例")
print("=" * 50)
mongo = MongoDBManager()
products = mongo.get_collection('products')
# Insert a single document
doc_id = products.insert_one({
    'name': '商品A',
    'price': 99.9,
    'category': '电子产品',
    'tags': ['热销', '新品'],
    'stock': 100
})
print(f"\n插入文档ID: {doc_id}")
# Bulk insert
products.insert_many([
    {'name': '商品B', 'price': 199.9, 'category': '电子产品', 'stock': 50},
    {'name': '商品C', 'price': 29.9, 'category': '日用品', 'stock': 200},
])
# Query by category
results = products.find({'category': '电子产品'})
print(f"\n电子产品类商品:")
for doc in results:
    print(f" - {doc['name']}: ¥{doc['price']}")
# Update one document
products.update_one(
    {'name': '商品A'},
    {'$set': {'price': 89.9, 'discount': True}}
)
print("\n更新商品A价格")
# Redis example (mocked) — banner for the section that follows
print("\n" + "=" * 50)
print("🔴 Redis示例(模拟)")
print("=" * 50)
class MockRedis:
    """In-memory stand-in for a Redis client (demonstration only).

    Values live in ``self.data`` and absolute expiry timestamps (seconds
    since the epoch) in ``self.expiry``. Expired keys are removed lazily, the
    next time they are touched.
    """

    def __init__(self):
        self.data = {}
        self.expiry = {}

    def _purge_if_expired(self, key: str) -> None:
        """Drop ``key`` and its expiry entry if its deadline has passed."""
        deadline = self.expiry.get(key)
        if deadline is not None and datetime.now().timestamp() > deadline:
            self.data.pop(key, None)
            self.expiry.pop(key, None)

    def set(self, key: str, value: Any, ex: int = None) -> bool:
        """Store ``value`` under ``key``; always returns True.

        Args:
            ex: Lifetime in seconds. When omitted (or 0) the key does not
                expire, and any expiry left over from an earlier ``set`` is
                cleared.
        """
        self.data[key] = value
        if ex:
            self.expiry[key] = datetime.now().timestamp() + ex
        else:
            self.expiry.pop(key, None)
        return True

    def get(self, key: str) -> Optional[Any]:
        """Return the value for ``key``, or None if it is missing or expired."""
        self._purge_if_expired(key)
        return self.data.get(key)

    def delete(self, key: str) -> bool:
        """Remove ``key`` and its expiry entry; return whether the key existed."""
        self.expiry.pop(key, None)
        if key in self.data:
            del self.data[key]
            return True
        return False

    def incr(self, key: str) -> int:
        """Increment the integer at ``key`` (treated as 0 if absent) and return it."""
        self._purge_if_expired(key)
        self.data[key] = int(self.data.get(key, 0)) + 1
        return self.data[key]

    def lpush(self, key: str, value: Any) -> int:
        """Insert ``value`` at the head of the list at ``key``; return the new length."""
        self._purge_if_expired(key)
        if key not in self.data:
            self.data[key] = []
        self.data[key].insert(0, value)
        return len(self.data[key])

    def lrange(self, key: str, start: int, end: int) -> List:
        """Return list items from ``start`` to ``end`` inclusive.

        ``end == -1`` means "through the last element"; a missing key yields
        an empty list.
        """
        self._purge_if_expired(key)
        if key not in self.data:
            return []
        # Inclusive end index -> exclusive slice bound.
        stop = None if end == -1 else end + 1
        return self.data[key][start:stop]
redis = MockRedis()
# Cache example: store a JSON string with a one-hour lifetime, then read it back
redis.set('user:1001', json.dumps({'name': '张三', 'score': 95}), ex=3600)
user_data = json.loads(redis.get('user:1001'))
print(f"\n缓存用户数据: {user_data}")
# Counter
redis.set('page_views', 0)
for _ in range(5):
    redis.incr('page_views')
print(f"页面访问量: {redis.get('page_views')}")
# Message queue (newest task ends up first)
redis.lpush('task_queue', 'task1')
redis.lpush('task_queue', 'task2')
print(f"任务队列: {redis.lrange('task_queue', 0, -1)}")
数据备份是保障数据安全的重要措施。合理的备份策略能够在数据丢失时快速恢复,减少损失。备份策略需要考虑备份频率、备份类型(全量/增量)、存储位置、恢复时间目标等因素。
import os
import shutil
import sqlite3
import pandas as pd
import json
import zipfile
from datetime import datetime
from typing import List, Dict, Optional
import hashlib
class BackupManager:
    """Create, list, restore and prune backups inside one backup directory.

    Backup file names embed a second-resolution timestamp. Every successful
    backup is recorded in ``self.backup_log``.
    """

    # Formats that backup_dataframe knows how to write.
    _SUPPORTED_FORMATS = ('csv', 'json', 'pkl', 'parquet')

    def __init__(self, backup_dir: str = 'backups'):
        self.backup_dir = backup_dir
        os.makedirs(backup_dir, exist_ok=True)
        self.backup_log = []

    def _get_timestamp(self) -> str:
        """Return the current local time as ``YYYYMMDD_HHMMSS``."""
        return datetime.now().strftime('%Y%m%d_%H%M%S')

    def _log_backup(self, backup_type: str, source: str,
                    target: str, size_bytes: int):
        """Append one entry (time, type, source, target, size in MB) to the log."""
        self.backup_log.append({
            'timestamp': datetime.now().isoformat(),
            'type': backup_type,
            'source': source,
            'target': target,
            'size_mb': round(size_bytes / 1024 / 1024, 2)
        })

    # ========== File backup ==========
    def backup_file(self, filepath: str) -> str:
        """Copy ``filepath`` into the backup directory as ``<name>_<timestamp><ext>``.

        Raises:
            FileNotFoundError: If ``filepath`` does not exist.
        """
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"文件不存在: {filepath}")
        timestamp = self._get_timestamp()
        name, ext = os.path.splitext(os.path.basename(filepath))
        backup_path = os.path.join(
            self.backup_dir, f"{name}_{timestamp}{ext}"
        )
        shutil.copy2(filepath, backup_path)
        size = os.path.getsize(backup_path)
        self._log_backup('file', filepath, backup_path, size)
        return backup_path

    # ========== Database backup ==========
    def backup_sqlite(self, db_path: str) -> str:
        """Copy a SQLite database with the ``Connection.backup`` API.

        Raises:
            FileNotFoundError: If ``db_path`` does not exist. Without this
                check ``sqlite3.connect`` would create an empty source file.
        """
        if not os.path.exists(db_path):
            raise FileNotFoundError(f"文件不存在: {db_path}")
        timestamp = self._get_timestamp()
        backup_path = os.path.join(
            self.backup_dir, f"db_backup_{timestamp}.db"
        )
        source = sqlite3.connect(db_path)
        try:
            dest = sqlite3.connect(backup_path)
            try:
                source.backup(dest)
            finally:
                dest.close()
        finally:
            # Both connections are closed even if the copy fails.
            source.close()
        size = os.path.getsize(backup_path)
        self._log_backup('sqlite', db_path, backup_path, size)
        return backup_path

    # ========== DataFrame backup ==========
    def backup_dataframe(self, df: pd.DataFrame,
                         name: str, formats: List[str] = None) -> Dict[str, str]:
        """Write ``df`` once per requested format and return ``{format: path}``.

        Args:
            name: Base name of the backup files.
            formats: Any of ``csv``, ``json``, ``pkl``, ``parquet``; defaults
                to ``['csv', 'json', 'pkl']``.

        Raises:
            ValueError: If a requested format is not supported. Nothing is
                written in that case.
        """
        formats = formats or ['csv', 'json', 'pkl']
        unsupported = [fmt for fmt in formats if fmt not in self._SUPPORTED_FORMATS]
        if unsupported:
            raise ValueError(f"不支持的格式: {unsupported}")
        timestamp = self._get_timestamp()
        backups = {}
        for fmt in formats:
            filepath = os.path.join(self.backup_dir, f"{name}_{timestamp}.{fmt}")
            if fmt == 'csv':
                df.to_csv(filepath, index=False, encoding='utf-8')
            elif fmt == 'json':
                df.to_json(filepath, orient='records',
                           force_ascii=False, indent=2)
            elif fmt == 'pkl':
                df.to_pickle(filepath)
            else:  # 'parquet'
                df.to_parquet(filepath, index=False)
            backups[fmt] = filepath
            size = os.path.getsize(filepath)
            self._log_backup('dataframe', f'df:{name}', filepath, size)
        return backups

    # ========== Compressed archive ==========
    def create_archive(self, files: List[str],
                       archive_name: str = None) -> str:
        """Zip the existing ``files`` into ``<archive_name>.zip`` and return its path.

        Files are stored under their base name only; paths that do not exist
        are skipped.
        """
        timestamp = self._get_timestamp()
        archive_name = archive_name or f"archive_{timestamp}"
        archive_path = os.path.join(self.backup_dir, f"{archive_name}.zip")
        with zipfile.ZipFile(archive_path, 'w',
                             zipfile.ZIP_DEFLATED) as zf:
            for file in files:
                if os.path.exists(file):
                    zf.write(file, os.path.basename(file))
        size = os.path.getsize(archive_path)
        self._log_backup('archive', 'multiple', archive_path, size)
        return archive_path

    # ========== Incremental backup ==========
    def incremental_backup(self, source_dir: str,
                           last_backup_time: datetime = None) -> str:
        """Zip the files under ``source_dir`` modified after ``last_backup_time``.

        With ``last_backup_time=None`` every file is included. Entries are
        stored relative to ``source_dir``. The archive being written is never
        added to itself, which matters when the backup directory lies inside
        ``source_dir``.
        """
        timestamp = self._get_timestamp()
        backup_path = os.path.join(
            self.backup_dir, f"incremental_{timestamp}.zip"
        )
        archive_abs = os.path.abspath(backup_path)
        with zipfile.ZipFile(backup_path, 'w',
                             zipfile.ZIP_DEFLATED) as zf:
            for root, dirs, files in os.walk(source_dir):
                for file in files:
                    filepath = os.path.join(root, file)
                    if os.path.abspath(filepath) == archive_abs:
                        continue
                    mtime = datetime.fromtimestamp(
                        os.path.getmtime(filepath)
                    )
                    if last_backup_time is None or mtime > last_backup_time:
                        zf.write(filepath, os.path.relpath(filepath, source_dir))
        size = os.path.getsize(backup_path)
        self._log_backup('incremental', source_dir, backup_path, size)
        return backup_path

    # ========== Restore ==========
    def restore_file(self, backup_path: str,
                     restore_path: str) -> bool:
        """Restore a backup; always returns True.

        A ``.zip`` backup is extracted into the directory ``restore_path``;
        any other file is copied to ``restore_path``.
        """
        if backup_path.endswith('.zip'):
            with zipfile.ZipFile(backup_path, 'r') as zf:
                zf.extractall(restore_path)
        else:
            shutil.copy2(backup_path, restore_path)
        return True

    def restore_dataframe(self, backup_path: str) -> pd.DataFrame:
        """Load a DataFrame from a ``.csv``, ``.json``, ``.pkl`` or ``.parquet`` backup.

        Raises:
            ValueError: For any other file extension.
        """
        ext = os.path.splitext(backup_path)[1]
        if ext == '.csv':
            return pd.read_csv(backup_path)
        elif ext == '.json':
            return pd.read_json(backup_path)
        elif ext == '.pkl':
            # SECURITY: unpickling can execute arbitrary code; only restore
            # pickle backups that this application wrote itself.
            return pd.read_pickle(backup_path)
        elif ext == '.parquet':
            return pd.read_parquet(backup_path)
        else:
            raise ValueError(f"不支持的格式: {ext}")

    # ========== Backup management ==========
    def list_backups(self) -> List[Dict]:
        """Return info on every file in the backup directory, newest first.

        ``created`` is taken from ``st_ctime`` and ``modified`` from
        ``st_mtime``; sorting uses ``modified``.
        """
        backups = []
        for filename in os.listdir(self.backup_dir):
            filepath = os.path.join(self.backup_dir, filename)
            if os.path.isfile(filepath):
                stat = os.stat(filepath)
                backups.append({
                    'filename': filename,
                    'size_mb': round(stat.st_size / 1024 / 1024, 2),
                    'created': datetime.fromtimestamp(
                        stat.st_ctime
                    ).isoformat(),
                    'modified': datetime.fromtimestamp(
                        stat.st_mtime
                    ).isoformat()
                })
        return sorted(backups, key=lambda x: x['modified'], reverse=True)

    def cleanup_old_backups(self, keep_days: int = 30) -> int:
        """Delete backup files last modified more than ``keep_days`` days ago.

        Sub-directories are left alone. Returns the number of files deleted.
        """
        deleted = 0
        cutoff = datetime.now().timestamp() - (keep_days * 86400)
        for filename in os.listdir(self.backup_dir):
            filepath = os.path.join(self.backup_dir, filename)
            if not os.path.isfile(filepath):
                continue
            if os.path.getmtime(filepath) < cutoff:
                os.remove(filepath)
                deleted += 1
        return deleted

    def get_backup_report(self) -> str:
        """Render the backup directory, the backup count and the last five log entries."""
        lines = [
            "=" * 50,
            "📦 备份报告",
            "=" * 50,
            f"\n备份目录: {self.backup_dir}",
            f"备份次数: {len(self.backup_log)}",
            f"\n最近备份:"
        ]
        for log in self.backup_log[-5:]:
            lines.append(
                f" [{log['timestamp']}] {log['type']}: {log['size_mb']} MB"
            )
        return "\n".join(lines)
# Usage example: back up a frame in three formats, list the backups and print the report.
print("=" * 50)
print("📦 数据备份示例")
print("=" * 50)
backup = BackupManager('backups')
# Back up a DataFrame
df = pd.DataFrame({
    'id': range(1, 101),
    'value': [i * 10 for i in range(1, 101)],
    'category': ['A', 'B', 'C'] * 33 + ['A']
})
backups = backup.backup_dataframe(df, 'sample_data', ['csv', 'json', 'parquet'])
print(f"\nDataFrame已备份到:")
for fmt, path in backups.items():
    print(f" {fmt}: {path}")
# List the three most recently modified backups
print(f"\n备份列表:")
for b in backup.list_backups()[:3]:
    print(f" {b['filename']}: {b['size_mb']} MB")
print(backup.get_backup_report())
良好的项目规划是数据采集项目成功的关键。项目规划需要从业务需求出发,明确数据采集的目标、范围和预期成果,制定可行的技术方案和时间计划。合理的规划能够有效控制项目风险,确保项目按时高质量交付。
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import json
@dataclass
class DataSource:
    """Configuration of one data source in a collection project."""
    name: str
    source_type: str  # api, crawler, database, file
    url: str
    fields: List[str]
    access_method: str = "public"  # public, auth_required, private
    rate_limit: int = 100  # requests per minute
    priority: int = 1  # priority


@dataclass
class ProjectPlan:
    """A data-collection project plan."""
    name: str
    objective: str
    data_sources: List[DataSource]
    milestones: Dict[str, str]
    tech_stack: List[str]
    quality_metrics: Dict[str, float]
    risks: List[Dict[str, str]] = field(default_factory=list)

    def to_dict(self):
        """Return the plan as plain dicts and lists.

        Each data source is reduced to its ``name``, ``type`` (taken from
        ``source_type``), ``url`` and ``fields``.
        """
        return {
            'name': self.name,
            'objective': self.objective,
            'data_sources': [
                {
                    'name': ds.name, 'type': ds.source_type,
                    'url': ds.url, 'fields': ds.fields
                } for ds in self.data_sources
            ],
            'milestones': self.milestones,
            'tech_stack': self.tech_stack,
            'quality_metrics': self.quality_metrics,
            'risks': self.risks
        }


class ProjectPlanner:
    """Build, extend, estimate and report on :class:`ProjectPlan` objects."""

    def __init__(self):
        # Every plan created through create_plan, in creation order.
        self.plans = []

    def create_plan(self, name: str, objective: str) -> ProjectPlan:
        """Create a plan with default tech stack and quality targets.

        The new plan is also appended to ``self.plans``.
        """
        plan = ProjectPlan(
            name=name,
            objective=objective,
            data_sources=[],
            milestones={},
            tech_stack=['Python 3.x', 'Pandas', 'Requests'],
            quality_metrics={
                'completeness': 0.95,
                'accuracy': 0.98,
                'timeliness': 0.90
            },
            risks=[]
        )
        self.plans.append(plan)
        return plan

    def add_data_source(self, plan: ProjectPlan, source: DataSource):
        """Append ``source`` to the plan's data sources."""
        plan.data_sources.append(source)

    def add_milestone(self, plan: ProjectPlan, week: str, task: str):
        """Set the task for ``week``, replacing any earlier entry for it."""
        plan.milestones[week] = task

    def add_risk(self, plan: ProjectPlan, risk: str, mitigation: str,
                 level: str = 'medium'):
        """Append a risk with its mitigation and severity ``level``."""
        plan.risks.append({
            'risk': risk,
            'mitigation': mitigation,
            'level': level
        })

    def estimate_effort(self, plan: ProjectPlan) -> Dict:
        """Estimate effort in days per phase plus ``total_days``.

        Development grows by half a day per data source; the other phases
        are fixed.
        """
        base_effort = {
            'planning': 2,
            'development': 5,
            'testing': 2,
            'deployment': 1
        }
        source_factor = len(plan.data_sources) * 0.5
        return {
            'planning': base_effort['planning'],
            'development': base_effort['development'] + source_factor,
            'testing': base_effort['testing'],
            'deployment': base_effort['deployment'],
            'total_days': sum(base_effort.values()) + source_factor
        }

    def generate_report(self, plan: ProjectPlan) -> str:
        """Render the plan as a text report.

        Quality targets are printed as percentages with one decimal place so
        that binary floating-point noise does not leak into the output.
        """
        effort = self.estimate_effort(plan)
        rule = '=' * 60
        parts = [
            f"\n{rule}\n",
            f"📋 项目计划书: {plan.name}\n",
            f"{rule}\n",
            "【项目目标】\n",
            f"{plan.objective}\n",
            f"【数据源】({len(plan.data_sources)}个)\n",
        ]
        parts.extend(
            f" • {ds.name} ({ds.source_type}): {ds.url}\n"
            for ds in plan.data_sources
        )
        parts.append("\n【里程碑】\n")
        parts.extend(
            f" {week}: {task}\n" for week, task in plan.milestones.items()
        )
        parts.append(f"\n【技术栈】\n{', '.join(plan.tech_stack)}\n【质量标准】\n")
        parts.extend(
            f" • {metric}: {value * 100:.1f}%\n"
            for metric, value in plan.quality_metrics.items()
        )
        parts.append("\n【风险与应对】\n")
        parts.extend(
            f" ⚠️ {r['risk']}\n 应对: {r['mitigation']}\n" for r in plan.risks
        )
        parts.append(
            f"\n【工作量估算】\n总计: {effort['total_days']:.1f} 工作日\n"
        )
        return "".join(parts)
# Usage example: build a plan for a price-monitoring project and print its report.
print("=" * 50)
print("📋 项目规划示例")
print("=" * 50)
planner = ProjectPlanner()
# Create the project plan
plan = planner.create_plan(
    name="电商商品价格监控系统",
    objective="实时监控电商平台商品价格变化,提供价格趋势分析和预警功能"
)
# Register the data sources (one API, one crawler)
planner.add_data_source(plan, DataSource(
    name="商品列表API",
    source_type="api",
    url="https://api.example.com/products",
    fields=["id", "name", "price", "stock", "rating"],
    rate_limit=60
))
planner.add_data_source(plan, DataSource(
    name="商品详情页",
    source_type="crawler",
    url="https://www.example.com/product/{id}",
    fields=["description", "reviews", "images"],
    rate_limit=30
))
# Add one milestone per week
planner.add_milestone(plan, "第1周", "需求分析与架构设计")
planner.add_milestone(plan, "第2周", "数据采集模块开发")
planner.add_milestone(plan, "第3周", "数据处理与存储实现")
planner.add_milestone(plan, "第4周", "测试与部署上线")
# Record risks with their mitigations
planner.add_risk(plan, "API接口变更", "建立接口监控和告警机制")
planner.add_risk(plan, "反爬机制升级", "准备多种采集策略备选")
# Extend the default tech stack in place
plan.tech_stack.extend(['SQLite', 'Schedule', 'Matplotlib'])
print(planner.generate_report(plan))
完整的数据采集流程包括需求分析、数据源评估、采集开发、数据处理、质量验证和数据存储等环节。每个环节都需要精心设计和实现,确保整个流程的稳定性和可维护性。模块化设计是构建可扩展数据采集系统的关键。
import requests
from bs4 import BeautifulSoup
import pandas as pd
import sqlite3
import json
import time
import random
import logging
from datetime import datetime
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from abc import ABC, abstractmethod
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ========== 数据采集器基类 ==========
class BaseCollector(ABC):
    """Abstract base for collectors that share one ``requests`` session.

    Attributes:
        name: Label supplied by the caller.
        session: ``requests.Session`` carrying a browser-like User-Agent.
        stats: Counters with the keys ``total``, ``success`` and ``failed``.
    """

    def __init__(self, name: str):
        self.name = name
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.stats = {'total': 0, 'success': 0, 'failed': 0}

    @abstractmethod
    def collect(self, *args, **kwargs) -> List[Dict]:
        """Collect records; concrete collectors implement this."""
        pass

    def _request(self, url: str, method: str = 'GET',
                 retries: int = 3, **kwargs) -> Optional[requests.Response]:
        """Send a request, retrying with exponential back-off.

        Args:
            url: Target URL.
            method: HTTP method name.
            retries: Maximum number of attempts.
            **kwargs: Forwarded to ``Session.request``. ``timeout`` defaults
                to 15 seconds but may be overridden by the caller.

        Returns:
            The first response whose ``ok`` flag is true, or ``None`` when
            every attempt failed.
        """
        # setdefault (rather than a literal timeout=15 next to **kwargs)
        # lets callers pass their own timeout without a TypeError.
        kwargs.setdefault('timeout', 15)
        for attempt in range(retries):
            try:
                response = self.session.request(method, url, **kwargs)
                if response.ok:
                    return response
                logger.warning(f"请求失败 {response.status_code}: {url}")
            except requests.RequestException as e:
                logger.error(f"请求异常 (尝试 {attempt+1}/{retries}): {e}")
            # Back off only when another attempt will follow.
            if attempt < retries - 1:
                time.sleep(2 ** attempt)
        return None
# ========== API采集器 ==========
class APICollector(BaseCollector):
    """Collector for JSON HTTP APIs, with optional page-based pagination."""

    def __init__(self, name: str, base_url: str, api_key: str = None):
        """Store the base URL and, if given, send ``api_key`` as a Bearer token."""
        super().__init__(name)
        self.base_url = base_url
        if api_key:
            self.session.headers['Authorization'] = f'Bearer {api_key}'

    def collect(self, endpoint: str, params: Dict = None,
                paginate: bool = False, page_size: int = 100) -> List[Dict]:
        """Fetch records from ``endpoint``.

        Args:
            endpoint: Path appended to ``base_url``.
            params: Query parameters; the caller's dict is not modified.
            paginate: When true, ``page`` and ``page_size`` are added to the
                query and pages are requested until a short page arrives.
            page_size: Page size sent to the API and used to detect the
                last page.

        Returns:
            All records collected before the first failed request (possibly
            an empty list).
        """
        # Work on a copy so pagination keys never leak into the caller's dict.
        query = dict(params or {})
        url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
        all_data = []
        page = 1
        while True:
            if paginate:
                query.update({'page': page, 'page_size': page_size})
            # _request takes (url, method); the arguments used to be swapped.
            response = self._request(url, 'GET', params=query)
            if not response:
                self.stats['failed'] += 1
                break
            try:
                data = response.json()
            except ValueError as e:
                logger.error(f"响应不是有效的JSON: {url} ({e})")
                self.stats['failed'] += 1
                break
            # Pull the record list out of the payload (adjust per API).
            if isinstance(data, dict):
                items = data.get('data', data.get('items', [data]))
            else:
                items = data
            if not isinstance(items, list):
                # A scalar or single object: treat it as one record.
                items = [items]
            all_data.extend(items)
            self.stats['total'] += len(items)
            self.stats['success'] += len(items)
            logger.info(f"采集 {url}: {len(items)} 条记录")
            # Stop unless paginating and the page came back full.
            if not paginate or len(items) < page_size:
                break
            page += 1
            time.sleep(random.uniform(0.5, 1.5))
        return all_data
# ========== 网页爬虫采集器 ==========
class WebCrawlerCollector(BaseCollector):
    """Collector that scrapes HTML pages with CSS selectors."""

    def __init__(self, name: str, base_url: str):
        super().__init__(name)
        self.base_url = base_url

    def collect(self, paths: List[str], selectors: Dict[str, str],
                delay: tuple = (1, 2)) -> List[Dict]:
        """Crawl each path under ``base_url`` and extract items.

        Args:
            paths: Paths appended to ``base_url``.
            selectors: Extraction config read by :meth:`_extract_items`
                (keys ``container``, ``fields``, ``attributes``).
            delay: ``(min, max)`` seconds to sleep after each fetched page.

        Returns:
            The extracted items from all pages that could be fetched.
        """
        all_data = []
        for path in paths:
            url = f"{self.base_url}/{path}"
            logger.info(f"爬取: {url}")
            # _request takes (url, method); the arguments used to be swapped.
            response = self._request(url, 'GET')
            if not response:
                self.stats['failed'] += 1
                continue
            soup = BeautifulSoup(response.text, 'html.parser')
            items = self._extract_items(soup, selectors)
            all_data.extend(items)
            self.stats['total'] += len(items)
            self.stats['success'] += len(items)
            time.sleep(random.uniform(*delay))
        return all_data

    def _extract_items(self, soup: BeautifulSoup, selectors: Dict) -> List[Dict]:
        """Extract one dict per element matching the container selector.

        For every configured field the element's stripped text is taken; if
        ``selectors['attributes']`` names an attribute for that field, the
        attribute value is used instead, falling back to the text when the
        attribute is absent. Containers that yield no fields are skipped.
        """
        container = selectors.get('container', 'body')
        field_selectors = selectors.get('fields', {})
        attributes = selectors.get('attributes', {})
        items = []
        for element in soup.select(container):
            item = {}
            for field_name, selector in field_selectors.items():
                elem = element.select_one(selector)
                if elem is None:
                    continue
                text = elem.get_text(strip=True)
                attr = attributes.get(field_name)
                item[field_name] = elem.get(attr, text) if attr else text
            if item:
                items.append(item)
        return items
# ========== 数据处理管道 ==========
class DataPipeline:
    """Ordered chain of DataFrame -> DataFrame processing steps."""

    def __init__(self):
        self.processors = []

    def add_processor(self, processor: callable):
        """Register ``processor`` as the last step of the chain."""
        self.processors.extend((processor,))

    def process(self, data: List[Dict]) -> pd.DataFrame:
        """Build a DataFrame from ``data`` and run every step over it.

        Each step receives the previous step's result; the final result is
        returned.
        """
        frame = pd.DataFrame(data)
        steps = iter(self.processors)
        for step in steps:
            frame = step(frame)
        return frame
# ========== 数据存储器 ==========
class DataStorage:
    """Persists DataFrames to a SQLite database or to a file."""

    def __init__(self, db_path: str = 'data.db'):
        self.db_path = db_path

    def save_to_db(self, df: pd.DataFrame, table: str,
                   if_exists: str = 'append'):
        """Write ``df`` to ``table`` in the SQLite file at ``db_path``.

        Args:
            df: Data to store (written without its index).
            table: Target table name.
            if_exists: Passed through to ``DataFrame.to_sql``.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            df.to_sql(table, conn, if_exists=if_exists, index=False)
        finally:
            # Release the handle even when to_sql raises.
            conn.close()
        logger.info(f"保存 {len(df)} 条记录到 {table}")

    def save_to_file(self, df: pd.DataFrame, filepath: str):
        """Write ``df`` to ``filepath``; the format follows the extension.

        Supported extensions (case-insensitive): ``csv``, ``json``,
        ``parquet``.

        Raises:
            ValueError: If the extension is not supported. Previously
                nothing was written and success was still logged.
        """
        ext = filepath.rsplit('.', 1)[-1].lower()
        if ext == 'csv':
            df.to_csv(filepath, index=False, encoding='utf-8-sig')
        elif ext == 'json':
            df.to_json(filepath, orient='records',
                       force_ascii=False, indent=2)
        elif ext == 'parquet':
            df.to_parquet(filepath, index=False)
        else:
            raise ValueError(f"不支持的文件格式: {filepath}")
        logger.info(f"保存数据到 {filepath}")
# ========== 完整采集流程 ==========
class DataCollectionWorkflow:
    """Runs collect -> process -> store for one configured collector."""

    def __init__(self, name: str):
        self.name = name
        self.collector = None
        self.pipeline = DataPipeline()
        self.storage = DataStorage()
        self.start_time = None

    def set_collector(self, collector: BaseCollector):
        """Choose the collector that :meth:`run` will use."""
        self.collector = collector

    def add_processor(self, processor: callable):
        """Append a processing step to the internal pipeline."""
        self.pipeline.add_processor(processor)

    def run(self, output_table: str = 'collected_data', **collect_kwargs):
        """Execute the workflow once.

        Args:
            output_table: Table the processed rows are stored in.
            **collect_kwargs: Forwarded to the collector's ``collect``.

        Returns:
            The processed DataFrame (with ``collected_at`` and ``source``
            columns added), or ``None`` when nothing was collected.
        """
        self.start_time = datetime.now()
        logger.info(f"开始采集: {self.name}")

        raw_data = self.collector.collect(**collect_kwargs)
        if raw_data:
            frame = self.pipeline.process(raw_data)

            # Stamp every row with collection metadata.
            frame['collected_at'] = datetime.now().isoformat()
            frame['source'] = self.collector.name

            self.storage.save_to_db(frame, output_table)

            elapsed = (datetime.now() - self.start_time).total_seconds()
            rate = (self.collector.stats['success']
                    / max(self.collector.stats['total'], 1) * 100)
            stats = {
                'records': len(frame),
                'duration': f"{elapsed:.2f}s",
                'success_rate': f"{rate:.1f}%",
            }
            logger.info(f"采集完成: {stats}")
            return frame

        logger.warning("未采集到数据")
        return None
# Usage example: run the workflow against a public test API.
print("=" * 50)
print("🔄 数据采集流程示例")
print("=" * 50)
# Create the collector (the target is a public placeholder API)
collector = APICollector(
    name="商品API",
    base_url="https://jsonplaceholder.typicode.com"
)
# Create the workflow
workflow = DataCollectionWorkflow("商品数据采集")
# Add processing steps: drop duplicate rows, then rows without an id
workflow.add_processor(lambda df: df.drop_duplicates())
workflow.add_processor(lambda df: df.dropna(subset=['id']))
# Attach the collector
workflow.set_collector(collector)
# Run the collection (performs a real HTTP request)
df = workflow.run(
    output_table='posts',
    endpoint='posts',
    params={'_limit': 10}
)
if df is not None:
    print(f"\n采集结果预览:")
    print(df.head())
数据预处理是确保数据质量的关键步骤。采集的原始数据通常存在各种问题,需要通过系统化的处理流程进行清洗、转换和验证。良好的数据预处理能够显著提高后续分析和应用的效果。
import pandas as pd
import numpy as np
import sqlite3
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import logging
logger = logging.getLogger(__name__)
class DataPreprocessor:
    """Chainable cleaning helper that works on a private copy of a DataFrame.

    Most methods return ``self`` so calls can be chained. Processing steps
    and detected issues are collected in ``self.report`` and rendered by
    :meth:`get_report`.

    Attributes:
        df: Working copy of the input frame.
        original_shape: Shape of the input frame at construction time.
        report: Dict with the lists ``steps`` and ``issues``.
    """

    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.original_shape = df.shape
        self.report = {'steps': [], 'issues': []}

    def analyze(self) -> Dict:
        """Return a summary of shape, dtypes, missing values and duplicates."""
        missing_counts = self.df.isnull().sum()
        return {
            'shape': self.df.shape,
            'columns': list(self.df.columns),
            'dtypes': self.df.dtypes.astype(str).to_dict(),
            'missing': {
                'count': missing_counts.to_dict(),
                'percentage': (missing_counts / len(self.df) * 100).to_dict()
            },
            'duplicates': self.df.duplicated().sum(),
            'unique_values': self.df.nunique().to_dict(),
            'sample': self.df.head(3).to_dict()
        }

    def clean_strings(self, columns: List[str] = None) -> 'DataPreprocessor':
        """Strip surrounding whitespace from string columns.

        Missing values are left missing. A plain ``astype(str)`` would turn
        them into the literal strings ``'None'``/``'nan'`` and hide them
        from :meth:`handle_missing`.

        Args:
            columns: Columns to clean; defaults to all object-dtype columns.
        """
        cols = columns or self.df.select_dtypes(include=['object']).columns
        for col in cols:
            if col in self.df.columns:
                series = self.df[col]
                stripped = series.astype(str).str.strip()
                # Keep the cleaned text only where a value was present.
                self.df[col] = stripped.where(series.notna(), series)
        self.report['steps'].append(f"清洗字符串列: {list(cols)}")
        return self

    def handle_missing(self, strategy: Dict[str, str] = None) -> 'DataPreprocessor':
        """Fill missing values column by column.

        Numeric columns use ``strategy['numeric']`` (``'mean'`` or
        ``'median'``; any other value leaves them untouched, and a missing
        key falls back to ``'mean'``). All other columns are filled with
        their most frequent value when one exists.

        Args:
            strategy: Optional strategy dict; defaults to
                ``{'numeric': 'mean', 'categorical': 'mode'}``.
        """
        default_strategy = {'numeric': 'mean', 'categorical': 'mode'}
        strategy = strategy or default_strategy
        numeric_strategy = strategy.get('numeric', 'mean')
        for col in self.df.columns:
            series = self.df[col]
            if not series.isnull().any():
                continue
            if pd.api.types.is_numeric_dtype(series):
                if numeric_strategy == 'mean':
                    fill_value = series.mean()
                elif numeric_strategy == 'median':
                    fill_value = series.median()
                else:
                    continue
            else:
                modes = series.mode()
                if modes.empty:
                    continue
                fill_value = modes.iloc[0]
            # Assign back instead of fillna(inplace=True) on a column
            # selection, which is chained assignment and does not update
            # the frame under copy-on-write.
            self.df[col] = series.fillna(fill_value)
        self.report['steps'].append(f"处理缺失值: {strategy}")
        return self

    def remove_duplicates(self, subset: List[str] = None) -> 'DataPreprocessor':
        """Drop duplicate rows and note how many were removed."""
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset)
        removed = before - len(self.df)
        if removed > 0:
            self.report['issues'].append(f"移除 {removed} 条重复记录")
        return self

    def detect_outliers(self, column: str, method: str = 'iqr') -> Tuple[pd.DataFrame, Tuple]:
        """Find outliers in ``column``.

        Only the ``'iqr'`` method is implemented: values outside
        ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]`` count as outliers.

        Returns:
            ``(outlier_rows, (lower, upper))``; for any other method an
            empty frame and ``(None, None)``.
        """
        if method == 'iqr':
            q1 = self.df[column].quantile(0.25)
            q3 = self.df[column].quantile(0.75)
            iqr = q3 - q1
            lower = q1 - 1.5 * iqr
            upper = q3 + 1.5 * iqr
            outliers = self.df[(self.df[column] < lower) | (self.df[column] > upper)]
            return outliers, (lower, upper)
        return pd.DataFrame(), (None, None)

    def handle_outliers(self, column: str, method: str = 'clip') -> 'DataPreprocessor':
        """Clip (``'clip'``) or drop (``'remove'``) IQR outliers in ``column``."""
        outliers, (lower, upper) = self.detect_outliers(column)
        if len(outliers) > 0:
            if method == 'clip':
                self.df[column] = self.df[column].clip(lower, upper)
            elif method == 'remove':
                self.df = self.df[~self.df.index.isin(outliers.index)]
            self.report['issues'].append(f"处理 {len(outliers)} 个异常值 ({column})")
        return self

    def convert_types(self, type_map: Dict[str, str]) -> 'DataPreprocessor':
        """Convert columns according to ``type_map``.

        ``'datetime'`` and ``'numeric'`` coerce unparseable values to
        missing; any other value is passed to ``Series.astype``.
        """
        for col, dtype in type_map.items():
            if col in self.df.columns:
                if dtype == 'datetime':
                    self.df[col] = pd.to_datetime(self.df[col], errors='coerce')
                elif dtype == 'numeric':
                    self.df[col] = pd.to_numeric(self.df[col], errors='coerce')
                else:
                    self.df[col] = self.df[col].astype(dtype)
        self.report['steps'].append(f"类型转换: {type_map}")
        return self

    def derive_features(self, transformations: Dict[str, callable]) -> 'DataPreprocessor':
        """Add one column per entry; each callable receives the whole frame."""
        for col_name, func in transformations.items():
            self.df[col_name] = func(self.df)
        self.report['steps'].append(f"衍生特征: {list(transformations.keys())}")
        return self

    def validate(self, rules: Dict[str, callable]) -> bool:
        """Apply a per-value predicate to each listed column.

        Returns:
            ``True`` when every value of every checked column satisfies its
            rule; failures are also recorded in the report's issues.
        """
        valid = True
        for col, rule in rules.items():
            if col in self.df.columns:
                invalid = self.df[~self.df[col].apply(rule)]
                if len(invalid) > 0:
                    self.report['issues'].append(f"验证失败 ({col}): {len(invalid)} 条")
                    valid = False
        return valid

    def get_processed_data(self) -> pd.DataFrame:
        """Return the working DataFrame (not a copy)."""
        return self.df

    def get_report(self) -> str:
        """Render the recorded steps and issues as text."""
        parts = [f"""
{'=' * 50}
📊 数据预处理报告
{'=' * 50}
原始数据: {self.original_shape[0]} 行 x {self.original_shape[1]} 列
处理后数据: {self.df.shape[0]} 行 x {self.df.shape[1]} 列
【处理步骤】
"""]
        parts.extend(f" • {step}\n" for step in self.report['steps'])
        if self.report['issues']:
            parts.append("\n【发现的问题】\n")
            parts.extend(f" ⚠️ {issue}\n" for issue in self.report['issues'])
        return ''.join(parts)
# ========== 数据存储管理 ==========
class DataStorageManager:
    """Thin wrapper around one lazily opened SQLite connection.

    Every operation opens the connection on first use. After
    :meth:`close` the manager can be used again and reconnects
    transparently.
    """

    def __init__(self, db_path: str = 'project_data.db'):
        self.db_path = db_path
        self.conn = None

    def connect(self):
        """Open a connection to ``db_path``, store it and return it."""
        self.conn = sqlite3.connect(self.db_path)
        return self.conn

    def close(self):
        """Close the connection, if any, and forget it.

        Resetting ``self.conn`` matters: otherwise the ``is None`` checks in
        the other methods would keep using the closed handle and fail.
        """
        if self.conn:
            self.conn.close()
            self.conn = None

    def save_dataframe(self, df: pd.DataFrame, table: str,
                       if_exists: str = 'replace', index: bool = False):
        """Write ``df`` to ``table`` via ``DataFrame.to_sql``."""
        if self.conn is None:
            self.connect()
        df.to_sql(table, self.conn, if_exists=if_exists, index=index)
        logger.info(f"保存 {len(df)} 条记录到表 {table}")

    def load_dataframe(self, table: str, query: str = None) -> pd.DataFrame:
        """Load a table, or the result of ``query`` when one is given.

        NOTE(review): ``table`` is interpolated into the SQL text, so it
        must come from trusted code, never from user input.
        """
        if self.conn is None:
            self.connect()
        if query:
            return pd.read_sql_query(query, self.conn)
        return pd.read_sql(f"SELECT * FROM {table}", self.conn)

    def execute(self, sql: str):
        """Execute one SQL statement and commit."""
        if self.conn is None:
            self.connect()
        self.conn.execute(sql)
        self.conn.commit()

    def get_tables(self) -> List[str]:
        """Return the names of all tables in the database."""
        if self.conn is None:
            self.connect()
        result = self.conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table'"
        ).fetchall()
        return [r[0] for r in result]
# Usage example: clean a small frame with padding, a missing name,
# a duplicated id and one extreme price.
print("=" * 50)
print("📊 数据预处理示例")
print("=" * 50)
# Build the sample data
df = pd.DataFrame({
    'id': [1, 2, 3, 4, 5, 5],
    'name': [' 张三 ', '李四', None, '王五', '赵六', '赵六'],
    'price': [100, 200, 150, 5000, 180, 180],
    'category': ['A', 'B', 'A', 'B', 'A', 'A'],
    'date': ['2024-01-01', '2024-01-02', '2024-01-03', '2024-01-04', '2024-01-05', '2024-01-05']
})
print("\n原始数据:")
print(df)
# Run the preprocessing chain
preprocessor = DataPreprocessor(df)
preprocessor.clean_strings() \
    .handle_missing() \
    .remove_duplicates(subset=['id']) \
    .convert_types({'date': 'datetime', 'price': 'numeric'}) \
    .handle_outliers('price', method='clip') \
    .derive_features({
        'year': lambda df: df['date'].dt.year,
        'month': lambda df: df['date'].dt.month
    })
print(preprocessor.get_report())
processed_df = preprocessor.get_processed_data()
print("\n处理后数据:")
print(processed_df)
项目展示是呈现项目成果和技术能力的重要环节。一个好的项目展示需要清晰地说明项目背景、技术方案、实现过程和最终成果;同时,还需要提前准备答辩问题的应对策略,以体现对项目的深入理解。
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Dict, List
import json
class ProjectReporter:
    """Collects sections, metrics and chart data and renders project reports."""

    def __init__(self, project_name: str):
        self.project_name = project_name
        self.sections = {}
        self.metrics = {}
        self.charts_data = []

    def add_section(self, title: str, content: str):
        """Store ``content`` under ``title`` (an existing title is replaced)."""
        self.sections.update({title: content})

    def add_metric(self, name: str, value: any, description: str = ""):
        """Store a metric value together with an optional description."""
        self.metrics[name] = dict(value=value, description=description)

    def add_chart_data(self, chart_type: str, data: Dict, title: str):
        """Queue the data for one chart."""
        entry = {'type': chart_type, 'data': data, 'title': title}
        self.charts_data.append(entry)

    def generate_summary(self) -> str:
        """Render the header, every section and every metric as text."""
        pieces = [f"""
{'=' * 60}
📄 {self.project_name} - 项目报告
{'=' * 60}
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
【项目概述】
"""]
        for heading in self.sections:
            pieces.append(f"\n{heading}\n{self.sections[heading]}\n")
        pieces.append("\n【核心指标】\n")
        for metric_name, details in self.metrics.items():
            line = f" • {metric_name}: {details['value']}"
            # The description is appended only when it is non-empty.
            suffix = f" ({details['description']})" if details['description'] else ""
            pieces.append(line + suffix + "\n")
        return "".join(pieces)

    def generate_technical_report(self) -> Dict:
        """Return the collected data as a dict suitable for JSON output."""
        payload = {}
        payload['project_name'] = self.project_name
        payload['generated_at'] = datetime.now().isoformat()
        payload['sections'] = self.sections
        payload['metrics'] = self.metrics
        payload['charts'] = self.charts_data
        return payload

    def generate_presentation_outline(self) -> str:
        """Return the fixed presentation outline followed by sample Q&A prompts."""
        header = f"""
{'=' * 50}
📽️ 项目演示大纲
{'=' * 50}
1. 开场(2分钟)
- 项目背景介绍
- 核心问题陈述
2. 技术方案(5分钟)
- 系统架构设计
- 关键技术选型
- 核心代码演示
3. 实施过程(5分钟)
- 开发流程介绍
- 遇到的挑战
- 解决方案展示
4. 成果展示(5分钟)
- 数据统计结果
- 可视化图表
- 性能指标
5. 总结与展望(3分钟)
- 项目收获
- 不足与改进
- 未来方向
【常见答辩问题】
"""
        prompts = (
            "为什么选择这个技术方案?",
            "数据采集过程中遇到的最大挑战是什么?",
            "如何保证数据质量?",
            "系统如何扩展到更大规模?",
            "项目的实际应用价值是什么?",
        )
        numbered = [f" Q{number}: {text}\n"
                    for number, text in enumerate(prompts, 1)]
        return header + "".join(numbered)
# ========== 可视化示例 ==========
def create_project_visualizations():
    """Build a 2x2 matplotlib figure of illustrative project charts.

    The panels are: record counts per pipeline stage (bar), data-quality
    shares (pie), a 30-day collection trend drawn from random numbers
    (line), and tech-stack usage shares (horizontal bar). Apart from the
    random trend, all figures are hard-coded sample values.

    Returns:
        The matplotlib ``Figure``.
    """
    import matplotlib.pyplot as plt

    title_style = {'fontsize': 14, 'fontweight': 'bold'}
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    (stage_ax, quality_ax), (trend_ax, tech_ax) = axes

    # 1. Record counts per pipeline stage
    stage_names = ['采集总数', '有效数据', '清洗后', '最终存储']
    stage_counts = [10000, 9500, 9000, 8800]
    stage_bars = stage_ax.bar(
        stage_names, stage_counts,
        color=['#3498db', '#2ecc71', '#f39c12', '#9b59b6'])
    stage_ax.set_title('数据采集流程统计', **title_style)
    stage_ax.set_ylabel('记录数')
    for rect, count in zip(stage_bars, stage_counts):
        # Label sits 100 units above the bar top, centred on the bar.
        stage_ax.text(rect.get_x() + rect.get_width()/2,
                      rect.get_height() + 100,
                      f'{count:,}', ha='center', va='bottom')

    # 2. Data-quality shares
    quality_ax.pie(
        [85, 8, 4, 3],
        labels=['完整数据', '部分缺失', '异常数据', '重复数据'],
        autopct='%1.1f%%',
        colors=['#2ecc71', '#f1c40f', '#e74c3c', '#95a5a6'])
    quality_ax.set_title('数据质量分布', **title_style)

    # 3. Daily collection trend (random sample data)
    days = pd.date_range('2024-01-01', periods=30, freq='D')
    per_day = np.random.randint(300, 500, 30)
    trend_ax.plot(days, per_day, marker='o', linewidth=2, color='#3498db')
    trend_ax.fill_between(days, per_day, alpha=0.3)
    trend_ax.set_title('每日数据采集量趋势', **title_style)
    trend_ax.set_xlabel('日期')
    trend_ax.set_ylabel('采集记录数')
    trend_ax.tick_params(axis='x', rotation=45)

    # 4. Tech-stack usage shares
    tools = ['Requests', 'BeautifulSoup', 'Pandas', 'SQLite', '其他']
    shares = [25, 20, 30, 15, 10]
    share_bars = tech_ax.barh(tools, shares, color='#9b59b6')
    tech_ax.set_title('技术栈使用占比 (%)', **title_style)
    tech_ax.set_xlabel('占比 (%)')
    for rect, share in zip(share_bars, shares):
        tech_ax.text(rect.get_width() + 0.5,
                     rect.get_y() + rect.get_height()/2,
                     f'{share}%', ha='left', va='center')

    plt.tight_layout()
    return fig
# Usage example: assemble a report for the price-monitoring project.
print("=" * 50)
print("📄 项目报告生成示例")
print("=" * 50)
reporter = ProjectReporter("电商商品价格监控系统")
# Add sections (adjacent string literals are concatenated by Python)
reporter.add_section("项目背景",
                     "随着电商行业的快速发展,商品价格波动频繁。"
                     "本项目旨在建立一个自动化的商品价格监控系统,"
                     "帮助消费者和商家及时了解价格变化趋势。")
reporter.add_section("技术方案",
                     "采用Python开发,使用Requests进行数据采集,"
                     "BeautifulSoup解析HTML,Pandas处理数据,"
                     "SQLite存储数据,Matplotlib可视化展示。")
# Add headline metrics
reporter.add_metric("数据采集量", "10,000+ 条", "累计采集商品数据")
reporter.add_metric("数据有效率", "95%", "有效数据占比")
reporter.add_metric("系统稳定性", "99.5%", "采集成功率")
reporter.add_metric("处理速度", "500条/分钟", "数据处理效率")
# Print the summary and the presentation outline
print(reporter.generate_summary())
print(reporter.generate_presentation_outline())
下面是一个完整的新闻数据采集系统示例,整合了本课程所学的所有知识点,包括数据采集、预处理、存储和可视化展示。这个项目可以作为课程综合实践的参考模板。
"""
新闻数据采集系统 - 完整项目示例
功能:采集新闻网站数据,进行数据分析和可视化展示
"""
import requests
from bs4 import BeautifulSoup
import pandas as pd
import sqlite3
import json
import time
import random
import logging
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
from collections import Counter
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('news_collector.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger('NewsCollector')
@dataclass
class NewsArticle:
    """Record holding one collected news article; every field is a string."""
    title: str
    content: str
    author: str
    publish_time: str
    source: str
    category: str
    url: str
    collected_at: str
class NewsCollector:
    """Collects news articles from list pages of the configured sources.

    Attributes:
        sources: Mapping of source name to base (list page) URL.
        session: Shared ``requests.Session`` with a browser-like User-Agent.
        articles: ``NewsArticle`` objects collected so far.
        stats: Counters with the keys ``total``, ``success`` and ``failed``.
    """

    # Keyword lists used by _guess_category; the first category with a
    # matching keyword wins, so the order is significant.
    _CATEGORY_KEYWORDS = {
        '科技': ['科技', '互联网', 'AI', '人工智能', '手机', '电脑'],
        '财经': ['股市', '经济', '金融', '投资', '股票'],
        '体育': ['足球', '篮球', '比赛', '奥运', '体育'],
        '娱乐': ['明星', '电影', '音乐', '综艺', '娱乐'],
        '国际': ['国际', '美国', '中国', '世界', '外交']
    }

    def __init__(self, sources: Dict[str, str]):
        """Initialise the collector.

        Args:
            sources: Source configuration ``{name: base URL}``.
        """
        self.sources = sources
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.articles = []
        self.stats = {'total': 0, 'success': 0, 'failed': 0}

    def fetch_page(self, url: str, retries: int = 3) -> Optional[str]:
        """Download ``url`` and return its text, retrying with back-off.

        Returns:
            The decoded page text, or ``None`` when every attempt failed.
        """
        for attempt in range(retries):
            try:
                response = self.session.get(url, timeout=15)
                if response.ok:
                    # Use the detected encoding to avoid garbled text.
                    response.encoding = response.apparent_encoding
                    return response.text
                logger.warning(f"请求失败 {response.status_code}: {url}")
            except requests.RequestException as e:
                logger.warning(f"请求失败 (尝试 {attempt+1}/{retries}): {e}")
            # Back off only when another attempt will follow.
            if attempt < retries - 1:
                time.sleep(2 ** attempt)
        return None

    @staticmethod
    def _select_first(soup, selectors: List[str]):
        """Return the first element matched by any selector, else ``None``."""
        for selector in selectors:
            elem = soup.select_one(selector)
            if elem is not None:
                return elem
        return None

    def parse_article(self, html: str, url: str, source: str) -> Optional[NewsArticle]:
        """Parse one article page.

        Several selectors are tried in order for each of title, content,
        author and publish time. The content is truncated to 500
        characters.

        Returns:
            The parsed article, or ``None`` when no title was found or
            parsing raised.
        """
        soup = BeautifulSoup(html, 'html.parser')
        try:
            title_elem = self._select_first(
                soup, ['h1.title', 'h1', '.article-title', 'title'])
            title = title_elem.get_text(strip=True) if title_elem is not None else None
            if not title:
                return None

            content = ''
            body_elem = self._select_first(
                soup, ['.article-content', '.content', 'article', '.post-content'])
            if body_elem is not None:
                content = '\n'.join(
                    p.get_text(strip=True) for p in body_elem.find_all('p'))

            author = '未知'
            author_elem = self._select_first(
                soup, ['.author', '.article-author', '[rel="author"]'])
            if author_elem is not None:
                author = author_elem.get_text(strip=True)

            # Fall back to today's date when the page shows no usable time.
            publish_time = datetime.now().strftime('%Y-%m-%d')
            time_elem = self._select_first(soup, ['.publish-time', 'time', '.date'])
            if time_elem is not None:
                time_text = time_elem.get_text(strip=True)
                if time_text:
                    publish_time = time_text

            return NewsArticle(
                title=title,
                content=content[:500] if content else '',  # cap content length
                author=author,
                publish_time=publish_time,
                source=source,
                category=self._guess_category(title),
                url=url,
                collected_at=datetime.now().isoformat()
            )
        except Exception as e:
            # Best effort: one malformed page must not abort the whole run.
            logger.error(f"解析文章失败: {e}")
            return None

    def _guess_category(self, title: str) -> str:
        """Guess a category from keywords in ``title``; default is '综合'."""
        for category, keywords in self._CATEGORY_KEYWORDS.items():
            if any(kw in title for kw in keywords):
                return category
        return '综合'

    def collect_from_source(self, source_name: str, base_url: str,
                            max_articles: int = 10):
        """Collect up to ``max_articles`` articles linked from ``base_url``.

        Links are resolved against ``base_url`` with ``urljoin`` so relative
        and protocol-relative hrefs resolve correctly, then de-duplicated
        in page order so repeated runs pick the same articles.
        """
        from urllib.parse import urljoin

        logger.info(f"开始采集: {source_name}")
        list_html = self.fetch_page(base_url)
        if not list_html:
            logger.error(f"无法获取列表页: {base_url}")
            return
        soup = BeautifulSoup(list_html, 'html.parser')

        # A dict keeps first-seen order while removing duplicates.
        unique_urls = {}
        for a in soup.find_all('a', href=True):
            href = a['href'].strip()
            if not href or href.startswith('#'):
                continue  # in-page anchors just point back at the list page
            absolute = urljoin(base_url, href)
            if absolute.startswith(('http://', 'https://')):
                unique_urls.setdefault(absolute, None)
        article_urls = list(unique_urls)[:max_articles]

        for url in article_urls:
            self.stats['total'] += 1
            html = self.fetch_page(url)
            article = self.parse_article(html, url, source_name) if html else None
            if article:
                self.articles.append(article)
                self.stats['success'] += 1
                logger.info(f"采集成功: {article.title[:30]}...")
            else:
                self.stats['failed'] += 1
            # Random delay between article requests.
            time.sleep(random.uniform(0.5, 1.5))

    def collect_all(self, max_per_source: int = 10):
        """Collect from every configured source and log the total duration."""
        start_time = datetime.now()
        for source_name, base_url in self.sources.items():
            self.collect_from_source(source_name, base_url, max_per_source)
        duration = (datetime.now() - start_time).total_seconds()
        logger.info(f"采集完成: 共 {len(self.articles)} 篇文章, 耗时 {duration:.1f}秒")

    def get_dataframe(self) -> pd.DataFrame:
        """Return the collected articles as a DataFrame, one row per article."""
        return pd.DataFrame([vars(a) for a in self.articles])
class NewsDatabase:
    """SQLite storage for collected articles.

    Each operation opens its own short-lived connection and always closes
    it, even when the operation raises.
    """

    # Columns written by save_articles, in INSERT order.
    _COLUMNS = ('title', 'content', 'author', 'publish_time',
                'source', 'category', 'url', 'collected_at')

    def __init__(self, db_path: str = 'news_data.db'):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create the ``articles`` table and its indexes if they are missing."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT,
author TEXT,
publish_time TEXT,
source TEXT,
category TEXT,
url TEXT UNIQUE,
collected_at TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_category ON articles(category)')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_source ON articles(source)')
            conn.commit()
        finally:
            conn.close()

    def save_articles(self, df: pd.DataFrame):
        """Insert or replace one row per DataFrame record.

        Rows are keyed by the UNIQUE ``url`` column, so re-saving an article
        replaces the stored row. A row that cannot be written is logged and
        skipped; the final log line reports the number actually saved.
        """
        saved = 0
        conn = sqlite3.connect(self.db_path)
        try:
            for record in df.to_dict('records'):
                try:
                    conn.execute('''
INSERT OR REPLACE INTO articles
(title, content, author, publish_time, source, category, url, collected_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', tuple(record[col] for col in self._COLUMNS))
                    saved += 1
                except (sqlite3.Error, KeyError) as e:
                    logger.error(f"保存文章失败: {e}")
            conn.commit()
        finally:
            conn.close()
        logger.info(f"保存 {saved} 篇文章到数据库")

    def get_statistics(self) -> Dict:
        """Return totals, per-category and per-source counts, and the 5 newest rows.

        Returns:
            Dict with ``total`` (int), ``by_category`` and ``by_source``
            (name -> count) and ``latest`` (list of
            ``(title, source, collected_at)`` tuples, newest first).
        """
        conn = sqlite3.connect(self.db_path)
        try:
            return {
                'total': conn.execute('SELECT COUNT(*) FROM articles').fetchone()[0],
                'by_category': dict(conn.execute(
                    'SELECT category, COUNT(*) FROM articles GROUP BY category'
                ).fetchall()),
                'by_source': dict(conn.execute(
                    'SELECT source, COUNT(*) FROM articles GROUP BY source'
                ).fetchall()),
                'latest': conn.execute(
                    'SELECT title, source, collected_at FROM articles ORDER BY id DESC LIMIT 5'
                ).fetchall()
            }
        finally:
            conn.close()
class NewsAnalyzer:
    """Descriptive statistics over a DataFrame of collected articles.

    The frame is expected to have the columns ``title``, ``category``,
    ``source`` and ``collected_at``. It is only read, never modified.
    """

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def word_frequency(self, column: str = 'title', top_n: int = 20) -> Dict[str, int]:
        """Count the most frequent words in ``column``.

        Text is segmented with ``jieba`` (imported lazily); words of a
        single character are ignored, and so are non-string cells such as
        NaN.

        Returns:
            The ``top_n`` words mapped to their counts, most frequent first.
        """
        import jieba

        counts = Counter()
        for text in self.df[column]:
            # NaN is truthy, so check the type explicitly.
            if isinstance(text, str) and text:
                counts.update(w for w in jieba.cut(text) if len(w) > 1)
        return dict(counts.most_common(top_n))

    def category_distribution(self) -> Dict[str, int]:
        """Return the number of articles per category."""
        return self.df['category'].value_counts().to_dict()

    def source_distribution(self) -> Dict[str, int]:
        """Return the number of articles per source."""
        return self.df['source'].value_counts().to_dict()

    def daily_trend(self) -> Dict[str, int]:
        """Return the number of articles collected per calendar day.

        The dates are computed on the side instead of being written into
        ``self.df`` as a new column, so the caller's frame stays unchanged.
        """
        dates = pd.to_datetime(self.df['collected_at']).dt.date
        return dates.groupby(dates).size().to_dict()

    def generate_report(self) -> str:
        """Render overview, category/source distribution and top words as text."""
        parts = [f"""
{'=' * 60}
📊 新闻数据分析报告
{'=' * 60}
【数据概览】
总文章数: {len(self.df)}
数据源数: {self.df['source'].nunique()}
分类数: {self.df['category'].nunique()}
【分类分布】
"""]
        parts.extend(
            f" • {cat}: {count} 篇\n"
            for cat, count in self.category_distribution().items()
        )
        parts.append("\n【来源分布】\n")
        parts.extend(
            f" • {source}: {count} 篇\n"
            for source, count in self.source_distribution().items()
        )
        parts.append("\n【高频词汇】\n")
        parts.extend(
            f" • {word}: {count} 次\n"
            for word, count in self.word_frequency().items()
        )
        return ''.join(parts)
# ========== 主程序 ==========
def main():
    """Demo entry point: load mock articles, store them and print an analysis.

    Real collection is left disabled; two hand-written articles stand in
    for collected data.
    """
    rule = "=" * 60
    print(rule)
    print("📰 新闻数据采集系统")
    print(rule)

    # Source configuration (placeholder site; replace with real news sites).
    sources = {}
    sources['示例新闻'] = 'https://news.example.com'

    collector = NewsCollector(sources)

    # To collect for real, call: collector.collect_all(max_per_source=10)

    # Mock data for the demonstration:
    # (title, content, author, source, category, url)
    samples = [
        ('人工智能技术在医疗领域取得重大突破',
         '近日,AI技术在医疗诊断领域取得重要进展...',
         '张记者', '科技日报', '科技', 'https://example.com/1'),
        ('股市今日大涨,科技板块领涨',
         '今日A股市场表现强劲...',
         '李记者', '财经日报', '财经', 'https://example.com/2'),
    ]
    collector.articles = [
        NewsArticle(
            title=title,
            content=content,
            author=author,
            publish_time='2024-01-15',
            source=source,
            category=category,
            url=url,
            collected_at=datetime.now().isoformat(),
        )
        for title, content, author, source, category, url in samples
    ]

    # Tabular view of the articles
    df = collector.get_dataframe()
    print(f"\n采集数据预览:")
    preview_columns = ['title', 'category', 'source']
    print(df[preview_columns].head())

    # Persist to the database
    NewsDatabase('news_data.db').save_articles(df)

    # Analysis report
    print(NewsAnalyzer(df).generate_report())

    # Collection counters
    print(f"\n采集统计: {collector.stats}")
if __name__ == '__main__':
main()