Download link: https://www.pan38.com/yun/share.php?code=JCnzE Extraction code: 1199
This Taobao shop scraping tool consists of three main modules: the main crawler, a proxy management module, and a data analysis module. The main program implements shop search, detail collection, and data storage, and uses a thread pool to speed up collection. The proxy module maintains an IP proxy pool, and the analysis module provides basic statistics and visualization. Before running it, install the dependencies: requests, beautifulsoup4, pandas, fake-useragent, matplotlib, and wordcloud (e.g. pip install requests beautifulsoup4 pandas fake-useragent matplotlib wordcloud).
import requests
import json
import time
import random
from bs4 import BeautifulSoup
import csv
from fake_useragent import UserAgent
import re
from urllib.parse import urlencode
import os
from concurrent.futures import ThreadPoolExecutor
class TaobaoShopCrawler:
    """Main crawler: shop search, detail scraping and CSV export."""

    def __init__(self):
        self.ua = UserAgent()
        self.session = requests.Session()
        self.headers = {
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Referer': 'https://www.taobao.com/'
        }
        self.proxies = None  # set to a proxy dict to route requests through a proxy
        self.max_retries = 3
        self.timeout = 10
        self.output_dir = 'taobao_shops'
        os.makedirs(self.output_dir, exist_ok=True)
    def get_random_delay(self):
        # Random 1-3 second pause between requests to avoid hammering the server
        return random.uniform(1, 3)

    def make_request(self, url, params=None):
        retries = 0
        while retries < self.max_retries:
            try:
                time.sleep(self.get_random_delay())
                response = self.session.get(
                    url,
                    headers=self.headers,
                    params=params,
                    timeout=self.timeout,
                    proxies=self.proxies
                )
                response.raise_for_status()
                return response
            except Exception as e:
                retries += 1
                print(f"Request failed: {e}, retry {retries}/{self.max_retries}")
                time.sleep(2)
        return None
    def parse_shop_info(self, html):
        soup = BeautifulSoup(html, 'html.parser')
        shop_info = {}
        # Basic shop information (guard against missing nodes)
        shop_name = soup.find('div', class_='shop-name')
        shop_info['name'] = shop_name.text.strip() if shop_name else ''
        # Shop ratings
        rating_items = soup.find_all('div', class_='shop-rate-item')
        for item in rating_items:
            label = item.find('span', class_='label')
            value = item.find('span', class_='value')
            if label and value:
                shop_info[f'rating_{label.text.strip()}'] = value.text.strip()
        # Shop location
        location = soup.find('div', class_='shop-location')
        shop_info['location'] = location.text.strip() if location else ''
        # Shop opening date
        open_time = soup.find('div', class_='shop-open-time')
        shop_info['open_time'] = open_time.text.strip() if open_time else ''
        return shop_info
    def search_shops(self, keyword, page=1):
        base_url = 'https://s.taobao.com/search'
        params = {
            'q': keyword,
            'tab': 'shop',
            'page': page,
            'app': 'shopsearch'
        }
        response = self.make_request(base_url, params)
        if not response:
            return []
        soup = BeautifulSoup(response.text, 'html.parser')
        shop_list = []
        shop_items = soup.find_all('div', class_='shop-item')
        for item in shop_items:
            link_tag = item.find('a', class_='shop-link')
            name_tag = item.find('div', class_='shop-name')
            if not link_tag or not name_tag:
                continue  # skip malformed entries
            shop_link = link_tag['href']
            match = re.search(r'shop_id=(\d+)', shop_link)
            if not match:
                continue  # no recognizable shop id in the link
            shop_list.append({
                'id': match.group(1),
                'name': name_tag.text.strip(),
                'url': f'https:{shop_link}'
            })
        return shop_list
    def get_shop_details(self, shop_id):
        url = f'https://shop{shop_id}.taobao.com'
        response = self.make_request(url)
        if not response:
            return None
        return self.parse_shop_info(response.text)
    def save_to_csv(self, data, filename):
        if not data:
            return  # nothing to write
        filepath = os.path.join(self.output_dir, filename)
        with open(filepath, 'w', newline='', encoding='utf-8-sig') as f:
            writer = csv.DictWriter(f, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)
    def crawl_shops_by_keyword(self, keyword, max_pages=5):
        all_shops = []
        for page in range(1, max_pages + 1):
            print(f"Crawling page {page}...")
            shops = self.search_shops(keyword, page)
            if not shops:
                break
            # Fetch shop details concurrently; tolerate failed detail lookups
            with ThreadPoolExecutor(max_workers=5) as executor:
                results = executor.map(
                    lambda shop: {**shop, **(self.get_shop_details(shop['id']) or {})},
                    shops
                )
            all_shops.extend(list(results))
        filename = f'taobao_shops_{keyword}_{time.strftime("%Y%m%d")}.csv'
        self.save_to_csv(all_shops, filename)
        return all_shops
if __name__ == '__main__':
    crawler = TaobaoShopCrawler()
    keyword = input("Enter the shop keyword to crawl: ")
    max_pages = int(input("Enter the number of pages to crawl (1-50): "))
    crawler.crawl_shops_by_keyword(keyword, max_pages)
import requests
import time
import random
class ProxyManager:
    """Maintains an IP proxy pool and hands out random proxies."""

    def __init__(self):
        self.proxy_list = []
        self.last_update = 0
        self.update_interval = 3600  # refresh the pool once an hour

    def fetch_proxies(self):
        try:
            response = requests.get(
                'https://api.proxy.com/v1/proxies',
                params={
                    'country': 'CN',
                    'protocol': 'http',
                    'anonymity': 'elite'
                },
                timeout=10
            )
            if response.status_code == 200:
                self.proxy_list = response.json()['data']
                self.last_update = time.time()
                return True
        except Exception as e:
            print(f"Failed to fetch proxies: {e}")
        return False

    def get_random_proxy(self):
        # Refresh the pool if it is empty or stale
        if not self.proxy_list or time.time() - self.last_update > self.update_interval:
            self.fetch_proxies()
        if not self.proxy_list:
            return None
        proxy = random.choice(self.proxy_list)
        return {
            'http': f"http://{proxy['ip']}:{proxy['port']}",
            'https': f"http://{proxy['ip']}:{proxy['port']}"
        }
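Note that the listings above never actually connect the two modules: the crawler's self.proxies stays None. Below is a minimal sketch of one way to wire them together, assuming both classes are available in the same script; the keyword, page range, and per-page proxy rotation are illustrative choices, not part of the original code.

# Hypothetical glue code: rotate to a fresh proxy before each search page.
proxy_manager = ProxyManager()
crawler = TaobaoShopCrawler()

for page in range(1, 6):
    # get_random_proxy() returns None when the pool is empty, in which case
    # the crawler simply falls back to a direct connection.
    crawler.proxies = proxy_manager.get_random_proxy()
    shops = crawler.search_shops('女装', page)
    if not shops:
        break
    print(f"page {page}: {len(shops)} shops")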
import pandas as pd
import matplotlib.pyplot as plt
from wordcloud import WordCloud
class DataAnalyzer:
    """Basic statistics and visualization over the exported shop CSV."""

    def __init__(self, csv_file):
        self.df = pd.read_csv(csv_file)

    def basic_stats(self):
        stats = {
            'total_shops': len(self.df),
            # ratings are scraped as text, so coerce to numbers before averaging
            'avg_rating': pd.to_numeric(self.df['rating_描述相符'], errors='coerce').mean(),
            'top_locations': self.df['location'].value_counts().head(5).to_dict()
        }
        return stats

    def generate_wordcloud(self, save_path='wordcloud.png'):
        # simhei.ttf is required so Chinese shop names render correctly
        text = ' '.join(self.df['name'].dropna())
        wordcloud = WordCloud(
            font_path='simhei.ttf',
            width=800,
            height=600,
            background_color='white'
        ).generate(text)
        plt.figure(figsize=(10, 8))
        plt.imshow(wordcloud, interpolation='bilinear')
        plt.axis('off')
        plt.savefig(save_path)
        plt.close()

    def plot_rating_distribution(self, save_path='ratings.png'):
        ratings = self.df[['rating_描述相符', 'rating_服务态度', 'rating_物流服务']]
        ratings = ratings.apply(pd.to_numeric, errors='coerce')
        plt.figure(figsize=(10, 6))
        ratings.plot(kind='box')
        plt.title('Shop rating distribution')
        plt.ylabel('Rating')
        plt.savefig(save_path)
        plt.close()
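The analyzer is never invoked in the listings above. A minimal usage sketch is shown below; the CSV path is only an example, assuming a crawl run today for the keyword 女装 with the directory and file-name pattern used by crawl_shops_by_keyword.

# Hypothetical usage: analyze the CSV written by the crawler above.
import time

csv_path = f'taobao_shops/taobao_shops_女装_{time.strftime("%Y%m%d")}.csv'  # example path
analyzer = DataAnalyzer(csv_path)
print(analyzer.basic_stats())                     # shop count, average rating, top locations
analyzer.generate_wordcloud('wordcloud.png')      # word cloud of shop names
analyzer.plot_rating_distribution('ratings.png')  # box plot of the three rating columns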
Originality statement: This article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
For infringement concerns, contact cloudcommunity@tencent.com to request removal.