运行效果:
搜狗微信搜索下拉框词采集核心代码参考:
#微信搜索下拉词采集
#首发:微信公众号:二爷记
#author:微信:huguo00289
# -*- coding: UTF-8 -*-
import requests
from fake_useragent import UserAgent
from urllib import parse
#采集微信搜索下拉词
def get_words(keyword):
word=parse.quote(keyword)
headers={
"User-Agent":UserAgent().random,
}
url="https://weixin.sogou.com/sugg/ajaj_json.jsp?key={}".format(word)
html=requests.get(url=url,headers=headers,timeout=5).text
content=html.replace("window.sogou.sug(",'').replace(')','').split(',[')[1]
words=content.replace("]",'').replace('"','').split(',')
print("采集 {} 微信搜索下拉词成功!".format(keyword))
print(words)
print("-"*100)
return words
写的比较渣,见谅,尤其是那段字符格式化处理,咯里吧嗦的!!
多线程采集源码参考:
#微信搜索下拉词采集
#首发:微信公众号:二爷记
#author:微信:huguo00289
# -*- coding: UTF-8 -*-
import requests
from fake_useragent import UserAgent
from urllib import parse
import threading
#采集微信搜索下拉词
def get_words(keyword):
word=parse.quote(keyword)
headers={
"User-Agent":UserAgent().random,
}
url="https://weixin.sogou.com/sugg/ajaj_json.jsp?key={}".format(word)
html=requests.get(url=url,headers=headers,timeout=5).text
content=html.replace("window.sogou.sug(",'').replace(')','').split(',[')[1]
words=content.replace("]",'').replace('"','').split(',')
print("采集 {} 微信搜索下拉词成功!".format(keyword))
print(words)
print("-"*100)
return words
#多线程采集微信搜索下拉词
def smt_words(words):
threadings=[]
for word in words:
t= threading.Thread(target=get_words,args=(word,))
threadings.append(t)
t.start()
for x in threadings:
x.join()
print("多线程采集微信搜索下拉词完成!")
if __name__=='__main__':
words=get_words("21考研")
smt_words(words)
异步采集源码参考
#微信搜索下拉词异步采集
#首发:微信公众号:二爷记
#author:微信:huguo00289
#https://www.cnblogs.com/shenh/p/9090586.html
#https://blog.csdn.net/a__int__/article/details/104600972
# -*- coding: UTF-8 -*-
from fake_useragent import UserAgent
from urllib import parse
import aiohttp
import asyncio
loop = asyncio.get_event_loop()
class Async_get_words:
def __init__(self):
super().__init__() #super().__init__(),就是继承父类的init方法,同样可以使用super()点 其他方法名,去继承其他方法
self.CONCURRENCY = 10 #并发量为10
self.session = None
self.semaphore = asyncio.Semaphore(self.CONCURRENCY) #限制并发量
self.headers={
"User-Agent":UserAgent().random,
}
#异步协议访问
async def request(self, url):
async with self.semaphore:
# try:
print('getting', url)
# 添加ssl=False 防止SSLCertVerificationError
async with self.session.get(url,ssl=False,headers=self.headers) as response:
await asyncio.sleep(1)
return await response.read()
# except client_exceptions.ServerDisconnectedError:
# print('ServerDisconnectedError occurred while scraping ',url)
#处理数据
async def get_words(self,keyword):
word = parse.quote(keyword)
url = "https://weixin.sogou.com/sugg/ajaj_json.jsp?key={}".format(word)
repsonse= await self.request(url)
#print(repsonse)
html =repsonse.decode('gbk') #用utf-8解析
print(html)
content = html.replace("window.sogou.sug(", '').replace(')', '').split(',[')[1]
words = content.replace("]", '').replace('"', '').split(',')
print("采集 {} 微信搜索下拉词成功!".format(keyword))
print(words)
print("-" * 100)
return words
async def main(self,keywords):
self.session = aiohttp.ClientSession()
# 添加任务一行写法
tasks = [asyncio.ensure_future(self.get_words(keyword)) for keyword in keywords]
# tasks = []
# for keyword in keywords:
# tasks.append(asyncio.ensure_future(self.get_words(keyword)))
await asyncio.gather(*tasks)
await self.session.close()
def run(self):
keywords=['21考研时间', '21考研政治答案', '21考研数学答案', '21考研英语答案', '21考研英语', '21考研政治', '21考研人数', '21考研成绩什么时候公布', '21考研准考证打印时间', '21考研答案']
loop.run_until_complete(self.main(keywords))
if __name__ == '__main__':
spider=Async_get_words()
spider.run()
本文分享自 Python与SEO学习 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!