
scala - Spark Scala: User defined aggregate function that calculates median


I am trying to find a way to calculate the median of a given dataframe.

val df = sc.parallelize(Seq(("a",1.0),("a",2.0),("a",3.0),("b",6.0), ("b", 8.0))).toDF("col1", "col2")

+----+----+
|col1|col2|
+----+----+
| a| 1.0|
| a| 2.0|
| a| 3.0|
| b| 6.0|
| b| 8.0|
+----+----+

Now I want to do something like this:
df.groupBy("col1").agg(calcmedian("col2"))

The result should look like this:

+----+------+
|col1|median|
+----+------+
| a| 2.0|
| b| 7.0|
+----+------+

So calcmedian() has to be a UDAF, but the problem is that the UDAF's "evaluate" method only receives a row, whereas I need the whole table to sort the values and return the median...

// Once all entries for a group are exhausted, spark will evaluate to get the final result  
def evaluate(buffer: Row) = {...}

Is this possible? Or is there another good workaround? I want to stress that I know how to calculate the median of "one group" of the dataset. But I don't want to run that algorithm in a "foreach" loop, because that would be inefficient!

Thanks!

---

Edit:

This is what I have tried so far:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object calcMedian extends UserDefinedAggregateFunction {
    // Schema you get as an input
    def inputSchema = new StructType().add("col2", DoubleType)
    // Schema of the row which is used for aggregation
    def bufferSchema = new StructType().add("col2", DoubleType)
    // Returned type
    def dataType = DoubleType
    // Self-explaining
    def deterministic = true
    // initialize - called once for each group
    def initialize(buffer: MutableAggregationBuffer) = {
        buffer(0) = 0.0
    }

    // called for each input record of that group
    def update(buffer: MutableAggregationBuffer, input: Row) = {
        buffer(0) = input.getDouble(0)
    }
    // if the function supports partial aggregates, spark might (as an optimization) compute partial results and combine them together
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
        buffer1(0) = buffer2.getDouble(0)
    }
    // Once all entries for a group are exhausted, spark will evaluate to get the final result
    def evaluate(buffer: Row) = {
        val tile = 50
        var median = 0.0

        // PROBLEM: buffer is a Row --> I need a DataFrame here???
        val rdd_sorted = buffer.sortBy(x => x)
        val c = rdd_sorted.count()
        if (c == 1) {
            median = rdd_sorted.first()
        } else {
            val index = rdd_sorted.zipWithIndex().map(_.swap)
            val last = c
            val n = (tile / 100d) * (c * 1d)
            val k = math.floor(n).toLong
            val d = n - k
            if (k <= 0) {
                median = rdd_sorted.first()
            } else {
                if (k <= c) {
                    median = index.lookup(last - 1).head
                } else {
                    if (k >= c) {
                        median = index.lookup(last - 1).head
                    } else {
                        median = index.lookup(k - 1).head + d * (index.lookup(k).head - index.lookup(k - 1).head)
                    }
                }
            }
        }
        median
    } // end of evaluate
}
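
One workaround for the problem marked above is to change the buffer schema so the buffer itself accumulates every value of the group in an ArrayType column; evaluate() then sees the complete group and can sort it locally. A minimal sketch of that idea (illustrative names, pre-Spark-3.0 UserDefinedAggregateFunction API; note that it materializes each whole group in executor memory):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object calcExactMedian extends UserDefinedAggregateFunction {
    def inputSchema = new StructType().add("col2", DoubleType)
    // buffer the whole group as an array so evaluate() can sort it
    def bufferSchema = new StructType().add("values", ArrayType(DoubleType))
    def dataType = DoubleType
    def deterministic = true

    def initialize(buffer: MutableAggregationBuffer) = {
        buffer(0) = Seq.empty[Double]
    }
    // append each incoming value to the buffered array
    def update(buffer: MutableAggregationBuffer, input: Row) = {
        if (!input.isNullAt(0))
            buffer(0) = buffer.getSeq[Double](0) :+ input.getDouble(0)
    }
    // combine partial aggregates by concatenating their arrays
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
        buffer1(0) = buffer1.getSeq[Double](0) ++ buffer2.getSeq[Double](0)
    }
    // sort the complete group and take (or interpolate) the middle element
    def evaluate(buffer: Row) = {
        val sorted = buffer.getSeq[Double](0).sorted
        val n = sorted.length
        if (n == 0) null
        else if (n % 2 == 1) sorted(n / 2)
        else (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0
    }
}

With that, df.groupBy("col1").agg(calcExactMedian(col("col2")).as("median")) returns the exact medians (2.0 for a, 7.0 for b), at the cost of collecting each group into memory.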

1 Answer

Try this:

import org.apache.spark.sql.functions._

val result = df.groupBy("col1").agg(callUDF("percentile_approx", col("col2"), lit(0.5)))
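
For reference, a complete run against the example dataframe, with the result column aliased. Note that percentile_approx is a Hive UDAF, so on Spark 1.x it requires a HiveContext, while later versions expose it through Spark SQL; and because it is approximate, it may return an actual member of the group (e.g. 6.0 rather than the interpolated 7.0 for group b), depending on the version:

import org.apache.spark.sql.functions._

val result = df.groupBy("col1")
    .agg(callUDF("percentile_approx", col("col2"), lit(0.5)).as("median"))

// the exact output for group b can vary with the Spark/Hive version
result.show()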

Regarding scala - Spark Scala: User defined aggregate function that calculates median, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37590230/
