The following are common Python methods that data analysts use to load data from various sources. I have summarized them here for reference and easy copy-and-paste.
1. CSV
# Import the pandas library - the most popular data analysis library in Python
import pandas as pd

# Load CSV file into a DataFrame (table-like structure)
df = pd.read_csv('data.csv')

# Display the first 5 rows to verify the data was loaded correctly
print("First 5 rows of the dataset:")
print(df.head())

# Get basic information about the dataset (data types, non-null counts)
print("\nDataset information:")
df.info()

# Get a statistical summary of the numerical columns
print("\nStatistical summary:")
print(df.describe())
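If the file is wide or you only need a quick look, read_csv can also load just part of it. A minimal sketch; the column names below are placeholders:
# Load only selected columns and the first 1000 rows
df_subset = pd.read_csv('data.csv', usecols=['column1', 'column2'], nrows=1000)
print(df_subset.shape)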
2. Excel
# Import pandas for data manipulation
import pandas as pd

# Load Excel file - specify sheet_name to select a specific worksheet
df = pd.read_excel('data.xlsx', sheet_name='Sheet1')

# Alternative: load all sheets from the Excel file into a dictionary of DataFrames
all_sheets = pd.read_excel('data.xlsx', sheet_name=None)

# Access an individual sheet from the dictionary
sheet1_data = all_sheets['Sheet1']

print("Excel data successfully loaded!")
print(f"Number of rows: {len(df)}")
print(f"Number of columns: {len(df.columns)}")
print(f"Column names: {df.columns.tolist()}")
3. JSON file
# Import pandas and json libraries
import pandas as pd
import json

# Method 1: Using pandas to read JSON directly
df = pd.read_json('data.json')
print("JSON data loaded using pandas:")
print(df.head())

# Method 2: Using the json module for more complex JSON structures
with open('complex_data.json', 'r') as file:
    json_data = json.load(file)

# Convert nested JSON to a DataFrame (if needed)
# Example for nested data - adjust based on your JSON structure
if isinstance(json_data, list):
    df_nested = pd.json_normalize(json_data)
    print("\nNested JSON data converted to DataFrame:")
    print(df_nested.head())
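For JSON where each record contains a nested list, pd.json_normalize can flatten the list while keeping parent fields as extra columns. A minimal sketch with made-up data; the customer/orders structure is only an illustration:
# Hypothetical nested records: a customer dict plus a list of orders per record
nested_example = [
    {"customer": {"id": 1, "name": "Alice"}, "orders": [{"order_id": 10, "amount": 25.0}]},
    {"customer": {"id": 2, "name": "Bob"}, "orders": [{"order_id": 11, "amount": 40.0}]},
]
# Flatten the "orders" lists into rows and keep customer fields as extra columns
df_flat = pd.json_normalize(
    nested_example,
    record_path="orders",
    meta=[["customer", "id"], ["customer", "name"]],
)
print(df_flat.head())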
4. SQL
# Import required libraries
import time
import pandas as pd
import mysql.connector
import pymysql
from sqlalchemy import create_engine, text
# Method 1: Using SQLAlchemy (recommended method)
def load_data_from_mysql_sqlalchemy():
    """
    Load data from MySQL using SQLAlchemy.
    """
    try:
        # Create SQLAlchemy engine
        # Format: mysql+pymysql://username:password@host:port/database
        engine = create_engine(
            'mysql+pymysql://your_username:your_password@localhost:3306/your_database'
        )

        # Method 1.1: Load data using a SQL query
        query = """
            SELECT
                o.order_id,
                o.order_date,
                c.customer_name,
                p.product_name,
                od.quantity,
                od.unit_price
            FROM orders o
            JOIN customers c ON o.customer_id = c.customer_id
            JOIN order_details od ON o.order_id = od.order_id
            JOIN products p ON od.product_id = p.product_id
            WHERE o.order_date >= '2023-01-01'
        """
        df_query = pd.read_sql(query, engine)
        print("\nData loaded using SQL query:")
        print(df_query.head())

        # Method 1.2: Load an entire table
        df_table = pd.read_sql_table('customers', engine)
        print("\nEntire customers table loaded:")
        print(df_table.head())

        # Method 1.3: Load data in chunks (for large datasets)
        chunk_size = 1000
        chunks = pd.read_sql(query, engine, chunksize=chunk_size)

        # Process each chunk
        for chunk_number, chunk in enumerate(chunks):
            print(f"\nProcessing chunk {chunk_number + 1}:")
            print(f"Number of rows: {len(chunk)}")
            # Add your data processing logic here

        return df_query
    except Exception as e:
        print(f"Error: {e}")
        return None
# Method 2: Using mysql-connector-python (basic method)
def load_data_from_mysql_basic():
    """
    Load data from MySQL using mysql-connector-python.
    """
    try:
        # Establish a connection to the MySQL database
        connection = mysql.connector.connect(
            host='localhost',          # Database host
            user='your_username',      # Your MySQL username
            password='your_password',  # Your MySQL password
            database='your_database'   # Your database name
        )

        # Create a cursor object
        cursor = connection.cursor()

        # Execute SQL query
        query = """
            SELECT
                customer_id,
                customer_name,
                email,
                created_at
            FROM customers
            WHERE created_at >= '2023-01-01'
            LIMIT 1000
        """
        cursor.execute(query)

        # Fetch all results
        results = cursor.fetchall()

        # Convert to a DataFrame
        df = pd.DataFrame(results, columns=[
            'customer_id',
            'customer_name',
            'email',
            'created_at'
        ])
        print("Data loaded successfully using mysql-connector-python:")
        print(df.head())
        return df
    except mysql.connector.Error as err:
        print(f"Error: {err}")
        return None
    finally:
        # Close cursor and connection
        if 'cursor' in locals():
            cursor.close()
        if 'connection' in locals():
            connection.close()
# Method 3: Using PyMySQL (alternative method)
def load_data_from_mysql_pymysql():
    """
    Load data from MySQL using PyMySQL.
    """
    try:
        # Establish connection
        connection = pymysql.connect(
            host='localhost',
            user='your_username',
            password='your_password',
            database='your_database',
            charset='utf8mb4'
        )

        # Load data using pandas
        query = """
            SELECT
                product_id,
                product_name,
                category,
                price,
                stock_quantity
            FROM products
            WHERE category = 'Electronics'
        """
        df = pd.read_sql(query, connection)
        print("\nData loaded using PyMySQL:")
        print(df.head())
        return df
    except Exception as e:
        print(f"Error: {e}")
        return None
    finally:
        if 'connection' in locals():
            connection.close()
# Example with connection pooling and comprehensive error handling
def load_data_with_connection_pool():
    """
    Advanced example with connection pooling, a retry loop, and error handling.
    """
    from sqlalchemy.pool import QueuePool
    try:
        # Create an engine with connection pooling
        engine = create_engine(
            'mysql+pymysql://your_username:your_password@localhost:3306/your_database',
            poolclass=QueuePool,
            pool_size=5,        # Number of connections to keep open
            max_overflow=10,    # Maximum number of extra connections to create
            pool_timeout=30,    # Seconds to wait before giving up on getting a connection
            pool_recycle=1800   # Recycle connections after 30 minutes
        )

        # Test the connection
        with engine.connect() as connection:
            connection.execute(text("SELECT 1"))
            print("Database connection successful!")

        # Load data with retry logic
        max_retries = 3
        retry_count = 0
        while retry_count < max_retries:
            try:
                query = """
                    SELECT
                        t.transaction_id,
                        t.transaction_date,
                        c.customer_name,
                        SUM(od.quantity * od.unit_price) AS total_amount
                    FROM transactions t
                    JOIN customers c ON t.customer_id = c.customer_id
                    JOIN order_details od ON t.order_id = od.order_id
                    GROUP BY t.transaction_id, t.transaction_date, c.customer_name
                    ORDER BY t.transaction_date DESC
                    LIMIT 1000
                """
                df = pd.read_sql(query, engine)
                print("\nTransaction data loaded successfully:")
                print(df.head())
                return df
            except Exception as e:
                retry_count += 1
                if retry_count == max_retries:
                    print(f"Failed to load data after {max_retries} attempts: {e}")
                    return None
                print(f"Attempt {retry_count} failed, retrying...")
                time.sleep(1)  # Wait before retrying
    except Exception as e:
        print(f"Error setting up connection pool: {e}")
        return None
# Usage example
if __name__ == "__main__":
    # Load data using the different methods
    print("\nLoading data using SQLAlchemy...")
    df_sqlalchemy = load_data_from_mysql_sqlalchemy()

    print("Loading data using the basic method...")
    df_basic = load_data_from_mysql_basic()

    print("\nLoading data using PyMySQL...")
    df_pymysql = load_data_from_mysql_pymysql()

    print("\nLoading data with connection pooling...")
    df_pool = load_data_with_connection_pool()
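Whichever method you use, it is usually safer to pass filter values as bound parameters rather than formatting them into the SQL string. A minimal sketch, assuming the same engine and customers table used above:
from sqlalchemy import text

# Bound parameter (:start_date) instead of string formatting - avoids SQL injection
param_query = text("""
    SELECT customer_id, customer_name, created_at
    FROM customers
    WHERE created_at >= :start_date
""")
df_params = pd.read_sql(param_query, engine, params={"start_date": "2023-01-01"})
print(df_params.head())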
5. Web Scraping
# Import required libraries
import pandas as pd
import requests
from bs4 import BeautifulSoup

# Method 1: Using pandas to read HTML tables directly
url = 'https://en.wikipedia.org/wiki/List_of_countries_by_population'
tables = pd.read_html(url)

# Select the table of interest (there are usually multiple tables on a page)
population_table = tables[0]  # First table on the page
print("Table loaded from Wikipedia:")
print(population_table.head())

# Method 2: Using BeautifulSoup for more complex web scraping
response = requests.get('https://example.com/data')
soup = BeautifulSoup(response.text, 'html.parser')

# Example: Extract all links from a page
links = []
for anchor in soup.find_all('a'):
    links.append({
        'text': anchor.text,
        'href': anchor.get('href')
    })

links_df = pd.DataFrame(links)
print("\nLinks extracted from webpage:")
print(links_df.head())
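Some sites reject requests without a browser-like User-Agent, and a slow page can hang the script if no timeout is set. A minimal, slightly more defensive variant of the request above; the URL and header value are placeholders:
# Send a User-Agent header, set a timeout, and fail loudly on HTTP errors
headers = {'User-Agent': 'Mozilla/5.0 (compatible; data-loading-script/1.0)'}
response = requests.get('https://example.com/data', headers=headers, timeout=10)
response.raise_for_status()  # Raises an exception for 4xx/5xx responses
soup = BeautifulSoup(response.text, 'html.parser')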
6. API
# Import required libraries
import pandas as pd
import requests

# Make an API request to fetch data
api_url = 'https://api.example.com/data'
response = requests.get(api_url)

# Check whether the request was successful
if response.status_code == 200:
    # Parse the JSON response
    data = response.json()
    # Convert to a DataFrame
    df = pd.DataFrame(data)
    print("Data loaded from API:")
    print(df.head())
else:
    print(f"Error fetching data: {response.status_code}")
# Example with authentication and query parameters
def get_api_data_with_auth(api_url, api_key, parameters):
    """
    Fetch data from an API that requires authentication.

    Parameters:
    - api_url: The URL of the API endpoint
    - api_key: Your API key for authentication
    - parameters: Dictionary of query parameters

    Returns:
    - DataFrame containing the API response data, or None on error
    """
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json'
    }
    response = requests.get(api_url, headers=headers, params=parameters)
    if response.status_code == 200:
        return pd.DataFrame(response.json())
    else:
        print(f"Error: {response.status_code}")
        print(response.text)
        return None
# Example usage:
# df = get_api_data_with_auth(
#     'https://api.example.com/data',
#     'your_api_key_here',
#     {'limit': 100, 'category': 'sales'}
# )
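Many APIs return results page by page. The sketch below assumes a hypothetical API that accepts a page query parameter and returns a JSON list per page; adjust it to whatever pagination scheme your API documents:
def get_paginated_api_data(api_url, max_pages=5):
    """Fetch several pages from a (hypothetical) page-numbered API and combine them."""
    all_records = []
    for page in range(1, max_pages + 1):
        resp = requests.get(api_url, params={'page': page})
        if resp.status_code != 200:
            print(f"Stopping at page {page}: HTTP {resp.status_code}")
            break
        records = resp.json()
        if not records:  # An empty page means there is no more data
            break
        all_records.extend(records)
    return pd.DataFrame(all_records)

# Example usage:
# df_pages = get_paginated_api_data('https://api.example.com/data')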
7. Large CSV (processing in chunks)
# Import required libraries
import pandas as pd

# For large files, use the chunksize parameter to process the data in chunks
chunk_size = 10000  # Adjust based on your system's memory
chunks = pd.read_csv('large_data.csv', chunksize=chunk_size)

# Process each chunk separately
results = []
for chunk_number, chunk in enumerate(chunks):
    # Example: Calculate the average of a column in each chunk
    avg = chunk['value_column'].mean()
    results.append({
        'chunk_number': chunk_number,
        'avg_value': avg,
        'chunk_size': len(chunk)
    })
    # Optionally, print progress
    print(f"Processed chunk {chunk_number}, average value: {avg}")

# Combine the per-chunk results
results_df = pd.DataFrame(results)
print("\nSummary of chunk processing:")
print(results_df)
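If the goal is a single filtered DataFrame rather than per-chunk statistics, each chunk can be filtered and the pieces concatenated at the end, so only the matching rows are kept in memory. A sketch using the same placeholder 'value_column' as above:
# Keep only the rows that match a condition from each chunk, then combine them
filtered_chunks = []
for chunk in pd.read_csv('large_data.csv', chunksize=chunk_size):
    filtered_chunks.append(chunk[chunk['value_column'] > 0])

df_filtered = pd.concat(filtered_chunks, ignore_index=True)
print(f"Filtered rows kept: {len(df_filtered)}")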
8. Compressed files (ZIP, GZ, BZ2)
# Import pandas
import pandas as pd

# Read from compressed files directly (no need to decompress first)

# From a ZIP file (the archive must contain a single CSV file)
df_zip = pd.read_csv('data.zip')

# From a GZ file
df_gz = pd.read_csv('data.csv.gz')

# From a BZ2 file
df_bz2 = pd.read_csv('data.csv.bz2')

print("Data loaded from compressed files successfully!")
print(f"Number of rows: {len(df_zip)}")
9. CSV in different formats
# Import pandas
import pandas as pd

# Handle different CSV formats and common issues

# Custom delimiter (e.g., semicolon or tab)
df_semicolon = pd.read_csv('european_data.csv', sep=';')
df_tab = pd.read_csv('tab_data.tsv', sep='\t')

# Specify the encoding for special characters
df_encoded = pd.read_csv('international_data.csv', encoding='utf-8')

# For older Windows files
df_latin = pd.read_csv('old_windows_file.csv', encoding='latin-1')

# Handle files with no header row
df_no_header = pd.read_csv('no_header.csv', header=None,
                           names=['column1', 'column2', 'column3'])

# Skip rows (e.g., comments or metadata at the top of the file)
df_skip = pd.read_csv('data_with_notes.csv', skiprows=3)

# Treat custom placeholders as missing values
df_missing = pd.read_csv('data_with_blanks.csv', na_values=['N/A', 'missing'])

print("All CSV variations loaded successfully!")