How do I append the search keyword to Twitter JSON data?

Stack Overflow user
Asked 2019-04-12 13:18:25
1 answer · 126 views · 0 followers · 1 vote

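I am streaming Twitter data through Kafka. I have managed to stream the data and consume the Twitter JSON. How do I now create a PySpark DataFrame that contains both the Twitter data and the search keyword?

Here is how I wrote the Kafka producer.

I managed to build the DataFrame I want from the Twitter object, but I don't know how to get the search keyword.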

class StdOutListener(StreamListener):
    def __init__(self, producer):
        self.producer_obj = producer

    # on_data is called whenever raw data arrives from the stream
    def on_data(self, data):
        try:
            self.producer_obj.send("twitterstreamingdata", data.encode('utf-8'))
            print(data)
            return True
        except BaseException as e:
            print("Error on_data: %s" % str(e))
        return True

    # When an error occurs
    def on_error(self, status):
        print(status)
        return True

    # When the rate limit is reached
    def on_limit(self, track):
        # Print rate limiting error
        print("Rate limited, continuing")
        # Continue mining tweets
        return True

    # When timed out
    def on_timeout(self):
        # Print timeout message to stderr
        print('Timeout...', file=sys.stderr)
        # Wait 120 seconds
        time.sleep(120)
        return True  # To continue listening

    def on_disconnect(self, notice):
        # Called when Twitter sends a disconnect notice
        return


if __name__ == '__main__':

    spark = SparkSession \
        .builder \
        .appName("Kafka Producer Application") \
        .getOrCreate()

    # This is the initialization of the Kafka producer
    producer = KafkaProducer(bootstrap_servers='xx.xxx.xxx.xxx:9092')

    # This handles Twitter auth and the connection to the Twitter streaming API
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    stream = Stream(auth, StdOutListener(producer))

    print("Kafka Producer Application: ")

    WORDS = input("Enter any words: ")
    print("Is this what you just said?", WORDS)
    word = [u for u in WORDS.split(',')]
    # This line filters the Twitter stream to capture data by keywords
    stream.filter(track=word)

1 Answer

Stack Overflow user

Accepted answer

Answered 2019-04-13 02:00:33

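One way to solve this is to change the StdOutListener constructor to accept a "keyword" argument and, in the "on_data" method, add that "keyword" to the tweet JSON before sending it to Kafka.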

import json
import sys
import time

from kafka import KafkaProducer
from pyspark.sql import SparkSession
# StreamListener exists in Tweepy 3.x; it was removed in Tweepy 4.0
from tweepy import StreamListener, Stream, OAuthHandler


class StdOutListener(StreamListener):

    def __init__(self, producer: KafkaProducer = None, keyword=None):
        # Run StreamListener's own initializer before adding our fields
        super().__init__()
        self.producer = producer
        self.keyword = keyword

    # on_data is called whenever raw data arrives from the stream
    def on_data(self, data):
        try:
            # Parse the raw JSON, add the keyword, and serialize it again
            data = json.loads(data)
            data['keyword'] = self.keyword
            data = json.dumps(data)
            self.producer.send("twitterstreamingdata", data.encode('utf-8'))
            return True
        except BaseException as e:
            print("Error on_data: %s" % str(e))
        return True

    # When an error occurs
    def on_error(self, status):
        print(status)
        return True

    # When the rate limit is reached
    def on_limit(self, track):
        # Print rate limiting error
        print("Rate limited, continuing")
        # Continue mining tweets
        return True

    # When timed out
    def on_timeout(self):
        # Print timeout message to stderr
        print('Timeout...', file=sys.stderr)
        # Wait 120 seconds
        time.sleep(120)
        return True  # To continue listening

    def on_disconnect(self, notice):
        # Called when Twitter sends a disconnect notice
        return


if __name__ == '__main__':
    CONSUMER_KEY = 'YOUR CONSUMER KEY'
    CONSUMER_SECRET = 'YOUR CONSUMER SECRET'
    ACCESS_TOKEN = 'YOUR ACCESS TOKEN'
    ACCESS_SECRET = 'YOUR ACCESS SECRET'

    print("Kafka Producer Application: ")
    words = input("Enter any words: ")
    print("Is this what you just said?", words)
    # Comma-separated input becomes the list of tracked terms
    word = [u for u in words.split(',')]

    spark = SparkSession \
        .builder \
        .appName("Kafka Producer Application") \
        .getOrCreate()

    # This is the initialization of the Kafka producer
    kafka_producer = KafkaProducer(bootstrap_servers='35.240.157.219:9092')
    # This handles Twitter auth and the connection to the Twitter streaming API
    auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
    # The full list of tracked terms is attached to every tweet as 'keyword'
    stream = Stream(auth, StdOutListener(producer=kafka_producer, keyword=word))
    stream.filter(track=word)
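
The step that on_data performs can also be tried on its own, without Tweepy or Kafka. The following is a minimal sketch under a few assumptions: the helper name `attach_keyword` and the sample payload are made up for illustration, and real tweets carry many more fields.

```python
import json


def attach_keyword(raw_tweet: str, keyword) -> bytes:
    """Parse a raw tweet JSON string, add a 'keyword' field, and re-encode it."""
    tweet = json.loads(raw_tweet)   # raw string from the stream -> dict
    tweet["keyword"] = keyword      # extra top-level field, as in on_data
    # UTF-8 bytes, matching what on_data passes to producer.send
    return json.dumps(tweet).encode("utf-8")


# Hypothetical sample payload, not real Twitter output
sample = '{"id_str": "1", "text": "kafka and spark"}'
payload = attach_keyword(sample, ["kafka", "spark"])
decoded = json.loads(payload.decode("utf-8"))
print(decoded["keyword"])  # prints ['kafka', 'spark']
```

On the consuming side, "keyword" then arrives as an ordinary top-level JSON field next to the tweet's own fields, so it can be read the same way as the rest of the tweet.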

Hope this helps!
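
One thing to be aware of: because the listener is constructed with keyword=word, every tweet receives the whole list of tracked terms, not only the term that caused the match. If only the matching terms are wanted, one possible approach is to check the tweet text before attaching them. The sketch below assumes a hypothetical helper `matched_keywords` and a plain case-insensitive substring test on the "text" field; Twitter's own track matching may also consider other parts of a tweet, so treat this as an approximation.

```python
import json


def matched_keywords(tweet: dict, tracked: list) -> list:
    """Return the tracked terms that occur in the tweet text (case-insensitive)."""
    text = (tweet.get("text") or "").lower()
    # Strip whitespace left over from splitting the user's comma-separated input
    cleaned = (term.strip() for term in tracked)
    # Skip empty terms, which would otherwise match every text
    return [term for term in cleaned if term and term.lower() in text]


# Hypothetical sample data for illustration
tracked = ["kafka", " Spark", "flink"]
tweet = json.loads('{"text": "Streaming tweets with Kafka and Spark"}')
tweet["keyword"] = matched_keywords(tweet, tracked)
print(tweet["keyword"])  # prints ['kafka', 'Spark']
```

Inside on_data this would replace the line that assigns self.keyword directly, with self.keyword passed in as the tracked list.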

2 votes
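Original page content provided by Stack Overflow; translation support for the Chinese page provided by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/55644934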
