我想从流数据中计算数据,然后发送到网页。例如,:我将在流数据中计算TotalSales列的和。,但它在summary = dataStream.select('TotalSales').groupby().sum().toPandas()上出错,这是我的代码。
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import *
spark = SparkSession.builder.appName
我有来自卡夫卡主题的数据。在我的dataframe中,我有“平均”列,我希望通过对“平均”列执行一些计算来创建新列。
目前我写的是-
rdd_get_chills = df_avg_tmp.rdd.map(lambda line:get_wind_chills(line))
但这是个错误-
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
我想对我从一个卡夫卡集群中获得的消息流执行tweet情绪分析,该集群反过来从Twitter v2中获取这些消息。
当我尝试应用预先训练过的情感分析管道时,我会收到一条错误消息,上面写着:Exception: target must be either a spark DataFrame, a list of strings or a string,我想知道是否有办法解决这个问题。
我已经检查了文档,在流数据上找不到任何东西。
这是我使用的代码:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functi
到目前为止,Spark还没有为流数据创建DataFrame,但是当我进行异常检测时,使用DataFrame进行数据分析更方便、更快。我已经做了这一部分,但是当我尝试用流式数据进行实时异常检测时,问题就出现了。我尝试了几种方法,但仍然无法将DStream转换为DataFrame,也无法将DStream中的RDD转换为DataFrame。
下面是我最新版本的代码的一部分:
import sys
import re
from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql
每当我试图转换它时,这就是我得到的例外。
val df_col = df.select("ts.user.friends_count").collect.map(_.toSeq)
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
我所要做的就是在结构化流中复制以下sql.dataframe操作。
df.collect().foreach(row => droolsCaseClass(row.ge
我试着用spark连接到kafka的话题。它不会读取数据流中的任何数据,也不会产生任何错误。下面是我的jupyter代码:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
from pretty import pprint
from pyspark import SparkContext
from pyspark.streaming import Stream
当我从Kafka主题创建一个流并打印它的内容时
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka impo
我正在使用pandasUDF将标准的ML python库应用于pyspark DataFrame。在定义了模式并进行了预测之后,我得到了pyspark DF作为输出。现在,我想用这个预测数据帧做一些事情,例如,我尝试对列"weekly_forecast_1“中的所有值进行求和。当我应用.collect()或.toPandas()方法时,在.fit()中得到以下错误 IndexError: too many indices for array:array is 0-dimensional, but 1 were indexed 每当我尝试将.collect()或.toPandas()方
我有一个要求,把从火花放电脚本创建的日志推到kafka。我正在做POC,所以在windows机器上使用Kafka二进制文件。我的版本是- kafka - 2.4.0,火花- 3.0和python-3.8.1。我用的是吡喃编辑器。
import sys
import logging
from datetime import datetime
try:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka im
带有火花流的Kafka抛出了一个错误:
from pyspark.streaming.kafka import KafkaUtils ImportError: No module named kafka
我已经建立了一个卡夫卡经纪人和一个工作火花环境与一个主人和一个工人。
import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2.7'
import findspark
findspark.init('/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7
我对kafka非常陌生,我试图将数据写到一个主题中,并从同一个主题中读取数据(我们现在作为一个源团队来摄取数据)。因此,我们在对Kafk主题进行写操作,并从相同的主题中使用)。我在星火壳上编写了下面的代码,以便将数据写入一个Kafka主题。
pyspark --packages io.delta:delta-core_2.11:0.6.1,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,io.strimzi:kafka-oauth-client:0.5.0
from pyspark.sql.functions import col
from
我已经为pyspark dataframe中的两个特性应用了groupby和计算标准差。 from pyspark.sql import functions as f
val1 = [('a',20,100),('a',100,100),('a',50,100),('b',0,100),('b',0,100),('c',0,0),('c',0,50),('c',0,100),('c',0,20)]
cols = ['group',
我正在使用twitter流函数,它提供了一个流。我需要使用星火writeStream函数,如:
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap
我在DataFrame中有3列:- time:TimeStamp,col1:Double,col2:Double我想执行以下操作:
dataFrame.withWatermark("time", "10 seconds")
.groupBy(window(col("time"),"10 seconds","1 second"))
.agg(mean("col1") with window of 10 seconds,max("col") with