The Twitter Streaming API lets developers receive tweet data in real time over a long-lived connection, instead of polling through the usual request-response cycle. This makes it well suited to scenarios that need to process large volumes of tweets as they arrive.
First, install the dependencies:

pip install tweepy pymongo  # MongoDB is assumed as the storage backend
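Before starting the stream, it can be worth confirming that MongoDB is actually reachable. A minimal sketch, assuming a local instance on the default port (adjust the URI to your setup):

from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError

# Fail fast if MongoDB is not reachable (assumed to run locally on the default port)
client = MongoClient("mongodb://localhost:27017/", serverSelectionTimeoutMS=3000)
try:
    client.admin.command("ping")
    print("MongoDB connection OK")
except ServerSelectionTimeoutError as e:
    print(f"MongoDB is not reachable: {e}")
finally:
    client.close()

With the dependencies installed and MongoDB running, a basic streaming script looks like this: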
import tweepy
from pymongo import MongoClient
import json

# Twitter API credentials
consumer_key = 'YOUR_CONSUMER_KEY'
consumer_secret = 'YOUR_CONSUMER_SECRET'
access_token = 'YOUR_ACCESS_TOKEN'
access_token_secret = 'YOUR_ACCESS_TOKEN_SECRET'

# MongoDB connection settings
mongo_uri = "mongodb://localhost:27017/"
db_name = "twitter_stream"
collection_name = "tweets"

# Create the MongoDB client
client = MongoClient(mongo_uri)
db = client[db_name]
collection = db[collection_name]

# Custom stream listener (tweepy v4 style: subclass tweepy.Stream directly)
class TweetStreamListener(tweepy.Stream):

    def on_data(self, raw_data):
        try:
            # Parse the raw JSON payload
            tweet = json.loads(raw_data)
            # Store it in MongoDB
            collection.insert_one(tweet)
            print(f"Tweet from @{tweet['user']['screen_name']} stored")
        except Exception as e:
            print(f"Error: {e}")

    def on_request_error(self, status_code):
        # Called by tweepy when the HTTP request fails (e.g. rate limiting)
        print(f"Error: {status_code}")

# Main program
if __name__ == "__main__":
    # Create the stream instance
    stream = TweetStreamListener(
        consumer_key, consumer_secret,
        access_token, access_token_secret
    )

    # Filtered stream (here tracking the keyword "python")
    try:
        print("Starting stream...")
        stream.filter(track=["python"])
    except KeyboardInterrupt:
        print("Stopping stream...")
    finally:
        client.close()
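One optional hardening step: if the connection drops and the stream is restarted, the same tweet can occasionally be delivered twice. A hedged sketch, assuming the v1.1 tweet payload includes an id_str field, that lets MongoDB reject such duplicates:

from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError

client = MongoClient("mongodb://localhost:27017/")
collection = client["twitter_stream"]["tweets"]

# A unique index on the tweet id makes repeated inserts of the same tweet fail fast
collection.create_index("id_str", unique=True)

def store_tweet(tweet):
    try:
        collection.insert_one(tweet)
    except DuplicateKeyError:
        # The tweet was already stored during an earlier connection; safe to ignore
        pass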
Common problems with this basic setup, and how the enhanced example below addresses them:

Cause: Twitter rate-limits streaming connections. Solution: back off exponentially (and cap the number of retries) before reconnecting.
Cause: network instability or problems on Twitter's side. Solution: catch stream errors and reconnect rather than letting the process die.
Cause: the tweet JSON structure is complex and varies. Solution: wrap parsing in try/except so a single malformed payload does not stop the stream.
Cause: high throughput creates database write latency. Solution: buffer tweets in a queue and insert them into MongoDB in batches from a worker thread.

The enhanced version below puts these fixes together:
import tweepy
from pymongo import MongoClient
import json
import time
from queue import Queue, Empty
from threading import Thread

# Configuration
config = {
    "twitter": {
        "consumer_key": "YOUR_KEY",
        "consumer_secret": "YOUR_SECRET",
        "access_token": "YOUR_TOKEN",
        "access_token_secret": "YOUR_TOKEN_SECRET"
    },
    "mongodb": {
        "uri": "mongodb://localhost:27017/",
        "db": "twitter_stream",
        "collection": "tweets"
    },
    "batch_size": 100,
    "track_keywords": ["python", "programming"]
}

# Batch processor: buffers tweets in a queue and writes them to MongoDB in batches
class BatchProcessor:

    def __init__(self, collection, batch_size=100):
        self.collection = collection
        self.batch_size = batch_size
        self.batch = []
        self.queue = Queue()
        self.running = True
        # Start the worker thread
        self.thread = Thread(target=self._process_queue)
        self.thread.start()

    def add_tweet(self, tweet):
        self.queue.put(tweet)

    def _process_queue(self):
        while self.running or not self.queue.empty():
            try:
                tweet = self.queue.get(timeout=1)
                self.batch.append(tweet)
                if len(self.batch) >= self.batch_size:
                    self._insert_batch()
            except Empty:
                continue
        # Flush whatever is left in the final partial batch
        if self.batch:
            self._insert_batch()

    def _insert_batch(self):
        try:
            self.collection.insert_many(self.batch)
            print(f"Inserted {len(self.batch)} tweets")
            self.batch = []
        except Exception as e:
            print(f"Batch insert failed: {e}")

    def stop(self):
        self.running = False
        self.thread.join()

# Custom stream listener with basic resilience
class ResilientStreamListener(tweepy.Stream):

    def __init__(self, processor, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.processor = processor
        self.retry_count = 0
        self.max_retries = 5

    def on_data(self, raw_data):
        try:
            tweet = json.loads(raw_data)
            self.processor.add_tweet(tweet)
            self.retry_count = 0  # reset the retry counter after a successful tweet
        except Exception as e:
            print(f"Error processing tweet: {e}")

    def on_request_error(self, status_code):
        print(f"Stream error: {status_code}")
        if status_code == 420:  # rate limited
            wait_time = min(2 ** self.retry_count, 60)
            print(f"Rate limited. Waiting {wait_time} seconds...")
            time.sleep(wait_time)
            self.retry_count += 1
            if self.retry_count > self.max_retries:
                print("Max retries reached. Stopping...")
                self.disconnect()

# Main program
def main():
    # Set up MongoDB
    client = MongoClient(config["mongodb"]["uri"])
    db = client[config["mongodb"]["db"]]
    collection = db[config["mongodb"]["collection"]]

    # Set up the batch processor
    processor = BatchProcessor(collection, config["batch_size"])

    try:
        # Create the stream listener
        stream = ResilientStreamListener(
            processor,
            config["twitter"]["consumer_key"],
            config["twitter"]["consumer_secret"],
            config["twitter"]["access_token"],
            config["twitter"]["access_token_secret"]
        )
        print("Starting filtered stream...")
        stream.filter(track=config["track_keywords"])
    except KeyboardInterrupt:
        print("\nStopping stream...")
    except Exception as e:
        print(f"Fatal error: {e}")
    finally:
        processor.stop()
        client.close()

if __name__ == "__main__":
    main()
This enhanced version adds error handling, batched inserts, and rate-limit backoff, which makes it a better fit for production use.
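Hard-coded credentials are the one part of these sketches that should not survive into production. A minimal alternative is to read them from the environment at startup; the variable names below are illustrative assumptions, not anything Twitter or tweepy prescribes:

import os

# Hypothetical environment variable names; pick whatever fits your deployment
_ENV_KEYS = {
    "consumer_key": "TWITTER_CONSUMER_KEY",
    "consumer_secret": "TWITTER_CONSUMER_SECRET",
    "access_token": "TWITTER_ACCESS_TOKEN",
    "access_token_secret": "TWITTER_ACCESS_TOKEN_SECRET",
}

def load_twitter_config():
    # Fail early with a clear message if any credential is missing
    missing = [env for env in _ENV_KEYS.values() if not os.environ.get(env)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: os.environ[env] for name, env in _ENV_KEYS.items()}

# Usage: replace the hard-coded block with config["twitter"] = load_twitter_config()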