首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >抖音评论采集提取工具,uid个人主页链接,python抖音评论采集软件

抖音评论采集提取工具,uid个人主页链接,python抖音评论采集软件

原创
作者头像
用户11744395
发布2025-07-17 09:48:13
发布2025-07-17 09:48:13
1.4K0
举报

下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1133

这个抖音评论采集工具可以根据视频链接或用户UID采集评论,支持获取用户基本信息,并将整理后的评论保存为CSV文件。使用前需要在请求头中配置有效的Cookie,并注意遵守平台规则。

代码语言:python
复制

import requests
import json
import time
import re
from typing import List, Dict, Optional
from urllib.parse import urlparse, parse_qs
import csv
import os
from datetime import datetime

class DouyinCommentCrawler:
    """Collect comments of Douyin videos and export them to CSV files.

    The crawler accepts either a video link or a user identifier (see
    ``run``).  All HTTP traffic goes through one ``requests.Session`` that
    carries the headers built in ``__init__``.
    """

    # Seconds to wait for a single HTTP response.  Without a timeout a
    # stalled connection would block the crawler forever.
    REQUEST_TIMEOUT = 10

    def __init__(self, cookie: str = ''):
        """Create the HTTP session.

        Args:
            cookie: Value sent in the ``Cookie`` header.  Defaults to an
                empty string, which keeps the previous behaviour.
        """
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': 'https://www.douyin.com/',
            'Cookie': cookie  # must be set to a valid cookie
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)
        self.base_url = "https://www.douyin.com/"
        self.api_url = "https://www.douyin.com/aweme/v1/web/comment/list/"
        self.max_retries = 3
        # Running total of comments collected by every get_comments() call
        # made on this instance; it is never reset.
        self.comment_count = 0

    def get_aweme_id(self, url: str) -> Optional[str]:
        """Extract the aweme_id from a video link.

        The id is taken from the path segment that follows ``/video/`` or,
        failing that, from the ``aweme_id`` query parameter.

        Args:
            url: Video link.  A link without a scheme
                (``www.douyin.com/video/...``) is accepted as well.

        Returns:
            The id as a string, or ``None`` when the host does not contain
            ``douyin.com`` or no id can be found.
        """
        try:
            candidate = url.strip()
            # urlparse() only fills ``netloc`` when a scheme (or a leading
            # "//") is present, so pasted links such as
            # "www.douyin.com/video/1" need one added.
            if '://' not in candidate:
                candidate = 'https://' + candidate.lstrip('/')

            parsed = urlparse(candidate)
            if 'douyin.com' not in parsed.netloc:
                return None

            if '/video/' in parsed.path:
                video_id = parsed.path.split('/video/', 1)[1].split('/', 1)[0]
                if video_id:
                    return video_id

            ids = parse_qs(parsed.query).get('aweme_id')
            return ids[0] if ids else None
        except (AttributeError, TypeError, ValueError) as e:
            print(f"提取aweme_id失败: {e}")
            return None

    def get_user_info(self, uid: str) -> Dict:
        """Fetch the basic profile of a user.

        Args:
            uid: Identifier sent to the endpoint as ``sec_user_id``.

        Returns:
            The decoded JSON body of the first HTTP 200 response, or an
            empty dict once ``max_retries`` attempts have failed.
        """
        url = "https://www.douyin.com/aweme/v1/web/user/profile/other/"
        params = {
            'sec_user_id': uid,
            'device_platform': 'webapp',
            'aid': '6383',
            'channel': 'channel_pc_web'
        }

        for _ in range(self.max_retries):
            # Deliberately broad: network errors and undecodable bodies are
            # both treated as one failed attempt.
            try:
                response = self.session.get(
                    url, params=params, timeout=self.REQUEST_TIMEOUT
                )
                if response.status_code == 200:
                    return response.json()
                time.sleep(1)
            except Exception as e:
                print(f"获取用户信息失败: {e}")
                time.sleep(2)
        return {}

    def get_video_list(self, uid: str, count: int = 20) -> List[Dict]:
        """Fetch the first page of videos published by a user.

        Only one request is made (``max_cursor`` is always 0), so at most
        one page of results is returned.

        Args:
            uid: Identifier sent to the endpoint as ``sec_user_id``.
            count: Page size requested from the endpoint.

        Returns:
            The ``aweme_list`` of the response; an empty list on any
            failure or when the field is missing or null.
        """
        url = "https://www.douyin.com/aweme/v1/web/aweme/post/"
        params = {
            'sec_user_id': uid,
            'count': count,
            'max_cursor': 0,
            'device_platform': 'webapp',
            'aid': '6383'
        }

        video_list = []
        try:
            response = self.session.get(
                url, params=params, timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code == 200:
                data = response.json()
                # ``or []`` guards against an explicit null in the payload,
                # which would otherwise break callers that iterate.
                video_list = data.get('aweme_list') or []
        except Exception as e:
            print(f"获取视频列表失败: {e}")
        return video_list

    def get_comments(self, aweme_id: str, count: int = 20, max_comments: int = 1000) -> List[Dict]:
        """Fetch the comments of one video, page by page.

        Paging stops when ``max_comments`` comments have been collected, a
        page is empty, the response reports no more data, or
        ``max_retries`` consecutive requests have failed.

        Args:
            aweme_id: Id of the video.
            count: Page size requested from the endpoint.
            max_comments: Upper bound on the number of returned comments.

        Returns:
            The raw comment dicts as delivered by the endpoint, never more
            than ``max_comments`` of them.
        """
        comments = []
        cursor = 0
        retry_count = 0

        while len(comments) < max_comments:
            params = {
                'aweme_id': aweme_id,
                'cursor': cursor,
                'count': count,
                'aid': '1128',
                'version_code': '170400',
                'device_platform': 'android'
            }

            data = None
            # Deliberately broad: any transport or decoding problem counts
            # as one failed attempt instead of aborting the crawl.
            try:
                response = self.session.get(
                    self.api_url, params=params, timeout=self.REQUEST_TIMEOUT
                )
                if response.status_code == 200:
                    data = response.json()
                    if not isinstance(data, dict):
                        raise ValueError("unexpected response payload")
            except Exception as e:
                print(f"获取评论失败: {e}")
                data = None

            if data is None:
                retry_count += 1
                if retry_count >= self.max_retries:
                    break
                time.sleep(2)
                continue
            retry_count = 0

            new_comments = data.get('comments') or []
            if not new_comments:
                break

            # Keep only what still fits so the result honours max_comments.
            kept = new_comments[:max_comments - len(comments)]
            comments.extend(kept)

            # Count before any early exit so the final page is included.
            self.comment_count += len(kept)
            print(f"已获取 {self.comment_count} 条评论")

            cursor = data.get('cursor', cursor + count)
            if not data.get('has_more', 0) or len(comments) >= max_comments:
                break

            # Throttle between pages to avoid requesting too fast.
            time.sleep(0.5)

        return comments

    def save_to_csv(self, data: List[Dict], filename: str):
        """Write rows to ``<filename>_<YYYYmmdd_HHMMSS>.csv``.

        The file is encoded as UTF-8 with BOM.  Columns are the union of
        the keys of all rows, in first-seen order; a row lacking a column
        gets an empty cell.  Nothing is written when ``data`` is empty.

        Args:
            data: Rows to write, one dict per row.
            filename: Path prefix; timestamp and extension are appended.
        """
        if not data:
            return

        # Using only the first row's keys would make DictWriter reject any
        # later row that carries an additional key.
        fieldnames = list(dict.fromkeys(key for row in data for key in row))
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{filename}_{timestamp}.csv"

        try:
            with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(data)
            print(f"数据已保存到 {filename}")
        except (OSError, csv.Error) as e:
            print(f"保存文件失败: {e}")

    def process_comment_data(self, comments: List[Dict]) -> List[Dict]:
        """Flatten raw comment dicts into uniform rows for CSV export.

        Missing or null fields fall back to ``''`` (text) or ``0``
        (numbers).  A comment that is not dict-shaped is reported and
        skipped.

        Args:
            comments: Raw comment dicts as returned by ``get_comments``.

        Returns:
            One dict per usable comment with the keys ``comment_id``,
            ``content``, ``create_time``, ``digg_count``, ``reply_count``,
            ``user_id``, ``user_nickname``, ``user_signature`` and
            ``user_avatar``.
        """
        processed = []
        for comment in comments:
            try:
                # ``or`` fallbacks cover keys that are present but null or
                # empty, which previously discarded the whole comment.
                user = comment.get('user') or {}
                avatar = user.get('avatar_thumb') or {}
                avatar_urls = avatar.get('url_list') or ['']
                processed.append({
                    'comment_id': comment.get('cid', ''),
                    'content': comment.get('text', ''),
                    'create_time': comment.get('create_time', 0),
                    'digg_count': comment.get('digg_count', 0),
                    'reply_count': comment.get('reply_count', 0),
                    'user_id': user.get('uid', ''),
                    'user_nickname': user.get('nickname', ''),
                    'user_signature': user.get('signature', ''),
                    'user_avatar': avatar_urls[0]
                })
            except (AttributeError, TypeError, KeyError, IndexError) as e:
                print(f"处理评论数据失败: {e}")
        return processed

    def run(self, input_data: str, max_comments: int = 500):
        """Crawl comments for a video link or for every listed video of a user.

        Input containing ``douyin.com`` is treated as a video link; any
        other input is treated as a user identifier.  One CSV file is
        written per video.

        Args:
            input_data: Video link or user identifier.
            max_comments: Upper bound of comments collected per video.
        """
        input_data = input_data.strip()

        # Decide whether the input is a video link or a user identifier.
        if 'douyin.com' in input_data:
            aweme_id = self.get_aweme_id(input_data)
            if not aweme_id:
                print("无效的视频链接")
                return

            comments = self.get_comments(aweme_id, max_comments=max_comments)
            processed = self.process_comment_data(comments)
            self.save_to_csv(processed, f"douyin_comments_{aweme_id}")
            return

        # Anything else is assumed to be a user identifier.
        user_info = self.get_user_info(input_data)
        if not user_info:
            print("获取用户信息失败")
            return

        # NOTE(review): the profile payload presumably nests its fields
        # under "user" - confirm against a live response.  The top level
        # is used as a fallback.
        profile = user_info.get('user') if isinstance(user_info, dict) else None
        if not isinstance(profile, dict):
            profile = user_info
        print(f"开始采集用户 {profile.get('nickname', '')} 的视频评论")

        videos = self.get_video_list(input_data)
        for video in videos:
            aweme_id = video.get('aweme_id', '')
            if not aweme_id:
                # Without an id there is nothing to request comments for.
                print("跳过缺少aweme_id的视频")
                continue

            desc = video.get('desc', '无标题')
            print(f"\n开始采集视频: {desc}")

            comments = self.get_comments(aweme_id, max_comments=max_comments)
            processed = self.process_comment_data(comments)
            self.save_to_csv(processed, f"douyin_comments_{aweme_id}")

            # Pause between videos.
            time.sleep(2)

if __name__ == "__main__":
    # Interactive entry point: ask for the crawl mode, then for the video
    # link or user identifier, and hand it to the crawler.
    crawler = DouyinCommentCrawler()

    print("抖音评论采集工具")
    print("1. 通过视频链接采集")
    print("2. 通过用户UID采集")
    # Strip every answer: stray whitespace from typing or pasting would
    # otherwise turn "1 " into an invalid choice or end up in the request.
    choice = input("请选择采集方式(1/2): ").strip()

    if choice == '1':
        url = input("请输入抖音视频链接: ").strip()
        crawler.run(url)
    elif choice == '2':
        uid = input("请输入用户UID: ").strip()
        crawler.run(uid)
    else:
        print("无效选择")

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档