Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >微信公众号信息抓取方法(二)——抓取文章点赞、阅读、评论、小程序信息

微信公众号信息抓取方法(二)——抓取文章点赞、阅读、评论、小程序信息

作者头像
aox.lei
发布于 2018-09-10 03:35:14
发布于 2018-09-10 03:35:14
6.1K11
代码可运行
举报
文章被收录于专栏:Aox LeiAox Lei
运行总次数:1
代码可运行

上一篇文章文章将cookie信息保存到redis中, 则这一节主要是取出cookie, 并且构造方法去获取文章的点赞、阅读、评论、小程序信息, 而且不会访问文章内容页, 防止被微信认为是刷阅读数而封号, cookie的有效期保险一些为2个小时。所以在2个小时内一定要处理完数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# crawl_like.py
# -*- coding:utf-8 -*-
''' 抓取文章点赞和评论'''

import json
import re
import time
import threading
from datetime import datetime
import requests
from bs4 import BeautifulSoup
from app.config import redis_key
from app import logger, _, __
from app.model.Article import Article
from app.model.ContentLikeRead import ContentLikeRead
from app.model.Comment import Comment
from app.model.WechatAccountMain import WechatAccountMain
from app.model.Weapp import Weapp
from app.lib.function import emjoyEncode, now
from hot_redis import List
from app.lib.function import parse_url

logger.name = __name__

class like(object):
    ''' 抓取文章点赞和评论 '''
    LIKE_URL = 'https://mp.weixin.qq.com/mp/getappmsgext?__biz=%s&appmsg_type=9&mid=%s&sn=%s&idx=%s&appmsg_token=%s&is_need_ad=0'
    COMMENT_URL = 'https://mp.weixin.qq.com/mp/appmsg_comment?action=getcomment&scene=0&__biz=%s&appmsgid=%s&idx=%d&comment_id=%s&offset=%d&limit=100'
    BODY_URL = 'https://mp.weixin.qq.com/mp/getverifyinfo?__biz=%s&type=reg_info#wechat_redirect'
    WEAPP_URL = 'https://mp.weixin.qq.com/mp/appmsg_weapp?action=batch_get_weapp&__biz=%s&mid=%s&idx=%d&weapp_num=1&weapp_appid_0=%s&weapp_sn_0=%s&appmsg_token=%s&x5=0&f=json'
    ARTICLE_URL = 'https://mp.weixin.qq.com/mp/profile_ext?action=getmsg&__biz=%s&f=json&offset=%d&count=10&is_ok=1&scene=124&appmsg_token=%s&uin=777&key=777&&x5=0&f=json'


    def run(self, uin):
        ''' 脚本执行入口 '''
        while True:
            # _task = List(key=redis_key.get('LIKE_HEADERS_PREFIX_KEY') + str(uin)).pop()
            _task = self.get_task() # 获取任务列表, 也就是获取cookie,biz等信息
            if not _task:
                time.sleep(10)
                continue

            _task = json.loads(_task, strict=False)

            _headers = _task.get('headers')
            _biz = _task.get('biz')
            _appmsg_token = _task.get('appmsg_token')

            wechatInfo = self.getBizInfo(_biz) # 获取公众号信息

            if wechatInfo is False:
                continue

            _threads = []

            contentList = Article().lists(wechatInfo.id) # 获取要抓取点赞的文章

            if contentList is False:
                continue

            for _content in contentList:
                _threads = []
                _content_body = self._get_content(_content.content_url)
                _comment_id = self._getCommentId(_content_body)
                _weapp_list = self.get_weapp_list(_content_body)

                _threads.append(threading.Thread(
                    target=self.crawl_like,
                    args=(_content.id, _headers, _biz, _content.mid,
                          _content.sn, _content.idx, _appmsg_token)))

                if int(_comment_id):
                    _threads.append(threading.Thread(
                        target=self.crawl_comment,
                        args=(_content.id, _headers, _biz, _content.mid,
                              _content.idx, _comment_id, 0)))

                if _weapp_list is not False:
                    for _weapp_value in _weapp_list:
                        _threads.append(threading.Thread(
                            target=self.crawl_weapp,
                            args=(wechatInfo.id, _content.id, {
                                'biz': _biz, 'mid': _content.mid,
                                'idx': _content.idx,
                                'appid': _weapp_value.get('appid'),
                                'sn': _weapp_value.get('sn'),
                                'appmsg_token': _appmsg_token
                            })))

                for _t in _threads:
                    _t.start()

                for _t in _threads:
                    _t.join()

                time.sleep(2)


            print('[%s] [账号:%s]%s --------------抓取成功' % (now(), str(uin), _biz))

    def crawl_weapp(self, wechat_account_id, content_id, url_info):
        '''
        抓取文章中的小程序
        '''
        _url = self.WEAPP_URL % (
            url_info.get('biz'), str(url_info.get('mid')),
            url_info.get('idx'), url_info.get('appid'),
            url_info.get('sn'), url_info.get('appmsg_token'))

        try:
            _requests = requests.get(_url, timeout=10)
            body = _requests.text
        except:
            logger.error(__('抓取小程序信息失败', {'url': _url}))
            return False

        data = json.loads(body, strict=False)

        if 'weapp_info' not in data:
            logger.warning(_('未获取到小程序信息数据', {'url': _url, 'body': body}))
            return False

        weapp_info = data.get('weapp_info')[0]
        if Weapp().info_by_appid(weapp_info.get('weapp_appid')) is False:
            Weapp().add({
                'wechat_account_id': wechat_account_id,
                'content_id': content_id,
                'name': weapp_info.get('nickname'),
                'homepage_url': weapp_info.get('homepage_url'),
                'logo_url': weapp_info.get('logo_url'),
                'weapp_appid': weapp_info.get('weapp_appid'),
                'weapp_username': weapp_info.get('weapp_username'),
                'app_version': weapp_info.get('app_version')
            })


    def crawl_like(self, content_id, headers, biz, mid, sn, idx, appmsg_token):
        ''' 抓取文章点赞数和阅读数 '''
        _url = self.LIKE_URL % (biz, mid, sn, str(idx), appmsg_token)
        _params = {'is_only_read': 1}
        body = ''
        try:
            body = requests.post(_url, headers=headers, data=_params, timeout=10)
            body = body.text
        except:
            logger.error(__('抓取点赞数失败', {'url': _url}))
            return False

        if body == '':
            return False

        body = json.loads(body, strict=False)

        if 'appmsgstat' not in body:
            logger.warning(_('未获取到点赞数据', {'url': _url, 'body': body}))
            return False

        _appmsgstat = body.get('appmsgstat')

        _like_num = _appmsgstat.get('like_num') if 'like_num' in _appmsgstat else 0
        _read_num = _appmsgstat.get('read_num') if 'read_num' in _appmsgstat else 0

        ContentLikeRead().add(
            content_id=content_id,
            like_num=_like_num,
            read_num=_read_num,
        )

    def crawl_comment(self, content_id, headers, biz, mid, idx, comment_id, offset=0):
        ''' 抓取文章评论 '''
        _url = self.COMMENT_URL % (biz, mid, idx, str(comment_id), offset)

        try:
            body = requests.get(_url, headers=headers, timeout=10)
            if re.compile(r'请在微信客户端打开链接', re.I).findall(body.text):
                return False
        except:
            logger.error(__('抓取评论失败:', {'url':_url, 'data':body.text}))
            return False

        body = json.loads(body.text, strict=False)
        _list = body.get('elected_comment')


        _data = []
        _reply_data = []
        for _value in _list:
            _tmp_data = {
                'content_id': content_id,
                'wx_content_id': _value.get('content_id'),
                'nick_name': emjoyEncode(_value.get('nick_name')),
                'content': emjoyEncode(_value.get('content')),
                'like_num': _value.get('like_num'),
                'is_top': _value.get('is_top'),
                'publish_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(_value.get('create_time')))
            }
            _data.append(_tmp_data)
            if 'reply' in _value and 'reply_list' in _value.get('reply') and len(_value.get('reply').get('reply_list')) > 0:
                _reply_list = _value.get('reply').get('reply_list')
                for _reply_value in _reply_list:
                    _tmp_data = {
                        'wx_content_id': _value.get('content_id'),
                        'uin': _reply_value.get('uin'),
                        'publish_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(_reply_value.get('create_time'))),
                        'to_uin': _reply_value.get('to_uin'),
                        'content': emjoyEncode(_reply_value.get('content')),
                        'like_num': _reply_value.get('like_num') if 'like_num' in _reply_value else 0
                    }
                    _reply_data.append(_tmp_data)

        _r = Comment().addAll(_data, _reply_data)
        return _r

    def _get_content(self, content_url):
        _headers = {
            'user_agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like Mac OS X) AppleWebKit/537.51.2 (KHTML, like Gecko) Mobile/11D257 MicroMessenger/6.0.1 NetType/WIFI'
        }
        try:
            body = requests.get(content_url, headers=_headers, timeout=10)
            return body.text
        except:
            logger.error(__('抓取comment_id失败', {'url': content_url}))
            return False

    @classmethod
    def _getCommentId(self, body):
        try:
            _match = re.compile(r'var\s+comment_id\s+=\s+"(\d+)"\s+\*\s+\d;', re.I).findall(body)
        except:
            return False

        if _match:
            return _match[0]
        return False

    def get_weapp_list(self, body):
        '''
        从文章中获取weapp需要的信息
        '''
        try:
            _match = re.compile(r'var\s+weapp_sn_arr_json\s+=\s+"(.*?)";', re.I).findall(body)
        except:
            return False

        if not _match:
            return False

        _match = _match[0].replace('" || "', '').replace('\\x22', '"')
        if _match == '':
            return False
        data = json.loads(_match, strict=False)

        if not isinstance(data, (dict)):
            return False
        weapp_card_list = data.get('weapp_card_list')

        return weapp_card_list

    def getBizInfo(self, biz):
        info = WechatAccountMain().getInfoByBiz(biz)
        if info is False:
            return False
        if info.status != 1:
            return False

        return info
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
1 条评论
热度
最新
app包内容可否提供?
app包内容可否提供?
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
Python爬取微信公众号文章和评论 (基于Fiddler抓包分析)
  感觉微信公众号算得是比较难爬的平台之一,不过一番折腾之后还是小有收获的。没有用Scrapy(估计爬太快也有反爬限制),但后面会开始整理写一些实战出来。简单介绍下本次的开发环境:
happyJared
2018/09/20
4.2K0
Python爬取微信公众号文章和评论 (基于Fiddler抓包分析)
基于PC端的爬取公众号历史文章
微信后台很多消息未回复:看到时已经回复不了。有问题可以添加我的微信:菜单 ->联系我
不断折腾
2019/09/23
2.6K0
基于PC端的爬取公众号历史文章
如何使用 Python 爬取微信公众号文章?
有时候我们遇到一个好的公众号,里面的每篇都是值得反复阅读的,这时就可以使用公众号爬虫将内容抓取保存下来慢慢赏析。
纯洁的微笑
2020/03/02
5K1
如何使用 Python 爬取微信公众号文章?
python之抓取微信公众号文章系列2
微信公众号历史的所有文章(来源???) 每篇文章的阅读量和点赞量(电脑上浏览文章只显示内容,没有阅读量、点赞量、评论……)
周小董
2019/03/25
4.5K1
python之抓取微信公众号文章系列2
微信公众号数据分析。
既然提到了公众号的数据分析,那必然少不了公众号的数据。 本次,以我一直关注的一个公众号「曹将」为例。 通过抓包软件Charles获取请求信息,得以获取公众号数据。 本次只获取公众号文章的部分信息。 对
张俊红
2019/05/14
1.4K0
微信公众号数据分析。
用Python获取公众号评论并生成词云图证明抖音无罪
近期为Python社会贡献了一份力量 Python社区越来越强大了,Python氛围越来越好了!希望每个人都能贡献自己的力量
龙哥
2018/10/22
9120
用Python获取公众号评论并生成词云图证明抖音无罪
使用Python快速获取公众号文章定制电子书(二)
接上篇文章使用Python快速获取公众号文章定制电子书(一)。我们现在已经成功的将公众号历史消息的前十条文章给爬取了出来,使用 content_url 这个关键字段,我们便可以轻易的获取文章具体内容,并将文章保存到本地文件中。实际上上面这些东西已经是我们实现爬取公号文章的核心功能了,剩下的就是如何通过某种方式将公众号的所有文章一次性爬取出来。
小之丶
2018/07/27
5730
使用Python快速获取公众号文章定制电子书(二)
python selenium 微信公众号历史文章随手一点就返回首页?郁闷之下只好将他们都下载下来。
参照资料:selenium webdriver 如何添加cookie: https://www.cnblogs.com/sundahua/p/10202494.html
forxtz
2020/10/10
7690
python selenium 微信公众号历史文章随手一点就返回首页?郁闷之下只好将他们都下载下来。
springboot批量爬取微信公众号信息及视频下载
1. 准备需要爬取的公众号链接(例如:https://mp.weixin.qq.com/s/GPz-w3_gS8jsgINJH9t6vw).下面的是整合了160多个公众号文章的地址.
程序员小藕
2021/08/10
1.3K1
50行Python代码,教你获取公众号全部文章
小詹说:我们平时阅读公众号的文章会遇到一个问题——阅读历史文章体验不好。的确如此,小詹自己也这么认为。所以今天分享的是好朋友 Python3X 的一篇干货分享,原文如下:
小小詹同学
2019/07/12
2.6K0
50行Python代码,教你获取公众号全部文章
微信公众号信息抓取方法(一)——抓取公众号历史消息列表数据
研究微信抓取之前, 看过知乎有大神写的比较完善的例子, 受到启发, 才完成了整个微信公众号的抓取。 微信公众号内容的批量采集与应用 微信抓取的难点: 1. 无法获取到微信公众号的信息(微信并没有提供列表) 2. 无法脱离客户端获取微信公众号历史消息页面 3. 可以获取到文章内容页但是脱离客户端后无法获取到点赞、阅读数据
aox.lei
2018/09/10
13.4K0
微信公众号爬虫 微信公众号爬虫
爬取步骤: 1. get_cookie.py用selenium登陆,获取cookie,其中你需要勾选“记住”选项,还需要微信扫描二维码,确定顺利登陆
尾尾部落
2018/09/04
8.8K0
python爬虫公众号所有信息,并批量下载公众号视频
本篇添加一个批量下载公众号内视频的功能,可以实现完全复制一个公众号,危险动作,请不要操作!谢谢
Python疯子
2019/12/16
2.9K0
python爬虫公众号所有信息,并批量下载公众号视频
Python 系列文章 —— WxCrawler
WxCrawler # coding:utf-8 import requests import re import html import demjson import json from bs4 import BeautifulSoup import urllib3 class WxCrawler(object): urllib3.disable_warnings() #Hearders,x-wechat-key 会过期,会出验证问题 headers = """Connect
玩转编程
2022/01/15
3180
如何快速抄写公众号文章
就是照搬被人文章到公众号上,一般格式是不能复制粘贴的,怎么办呢,爬源码 import requests import re import time from lxml import html from
用户1733462
2018/06/07
1.2K0
Python轻松抓取微信公众号文章
今天继续向 Python 头条添加数据信息,完成了微信公号的爬虫,接下来会继续通过搜狗的知乎搜索抓取知乎上与 Python 相关的文章、问答。微信公众号的文章链接有些是具有时效性的,过一段时间会变成参数错误而无法访问,但是我们发现从公众号后台点击过去得到的链接却是永久链接,其参数不会改变链接也不会失效,也就是说只要能够获得这些参数就可以得到永久链接。通过观察发现即使从搜狗搜索入口的有时效性的链接访问网页,其源码中也带有这些参数:
程序员皮克
2021/12/14
2K0
Python轻松抓取微信公众号文章
Python Fiddler抓包工具教学,获取公众号(pc客户端)数据
前言 今天来教大家如何使用Fiddler抓包工具,获取公众号(PC客户端)的数据。 Fiddler是位于客户端和服务器端的HTTP代理,是目前最常用的http抓包工具之一。 开发环境 python 3.8 运行代码 pycharm 2021.2 辅助敲代码 requests 第三方模块 Fiddler 汉化版 抓包的工具 微信PC端 如何抓包 配置Fiddler环境 先打开Fiddler,选择工具,再选选项 在选项窗口里点击HTTPS,把勾选框都勾选上 在选项窗口里点击链接,把勾选框都勾选上,然后点
松鼠爱吃饼干
2023/03/08
3.5K0
Python Fiddler抓包工具教学,获取公众号(pc客户端)数据
使用chatgpt编写公众号订推文的脚本
这个脚本首先打开Chrome浏览器,然后打开微信公众号平台登录页面。在登录后,它会跳转到发布文章页面,填写文章标题和正文,并点击发布按钮。最后,它会等待一段时间以确保文章发布成功,然后关闭浏览器。
Tom2Code
2023/08/31
5860
使用chatgpt编写公众号订推文的脚本
小程序访问公众号文章
随着小程序不断的发展,现在个人的小程序也开放了很多功能了,个人小程序直接打开公众号链接。在群里看到的一款小程序,点击可以直接阅读文章了,所以琢磨了一下,写了一些源码。
王小婷
2025/05/18
1210
小程序访问公众号文章
Python——如何优雅的爬取公众号信息
最近两个周业余时间在赶的一个项目,因为精力有限所以进展缓慢,索性就先把接近完善的这部分代码,先分享出来吧。
Ed_Frey
2019/09/16
3K0
Python——如何优雅的爬取公众号信息
推荐阅读
相关推荐
Python爬取微信公众号文章和评论 (基于Fiddler抓包分析)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验