- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试找到一种方法来计算给定数据帧的中位数。
val df = sc.parallelize(Seq(("a",1.0),("a",2.0),("a",3.0),("b",6.0), ("b", 8.0))).toDF("col1", "col2")
+----+----+
|col1|col2|
+----+----+
| a| 1.0|
| a| 2.0|
| a| 3.0|
| b| 6.0|
| b| 8.0|
+----+----+
现在我想做那样的事情:df.groupBy("col1").agg(calcmedian("col2"))
结果应该是这样的:
+----+------+
|col1|median|
+----+------+
| a| 2.0|
| b| 7.0|
+----+------+`
因此 calcmedian() 必须是一个 UDAF,但问题是,UDAF 的“evaluate”方法只需要一行,但我需要整个表对值进行排序并返回中位数...
// Once all entries for a group are exhausted, spark will evaluate to get the final result
def evaluate(buffer: Row) = {...}
这有可能吗?或者还有其他好的解决方法吗?我想强调的是,我知道如何计算“一组”数据集的中位数。但我不想在“foreach”循环中使用这个算法,因为这是低效的!
Cảm ơn!
<小时>小时>biên tập:
这就是我到目前为止所尝试的:
object calcMedian extends UserDefinedAggregateFunction {
// Schema you get as an input
def inputSchema = new StructType().add("col2", DoubleType)
// Schema of the row which is used for aggregation
def bufferSchema = new StructType().add("col2", DoubleType)
// Returned type
def dataType = DoubleType
// Self-explaining
def deterministic = true
// initialize - called once for each group
def initialize(buffer: MutableAggregationBuffer) = {
buffer(0) = 0.0
}
// called for each input record of that group
def update(buffer: MutableAggregationBuffer, input: Row) = {
buffer(0) = input.getDouble(0)
}
// if function supports partial aggregates, spark might (as an optimization) comput partial results and combine them together
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1(0) = input.getDouble(0)
}
// Once all entries for a group are exhausted, spark will evaluate to get the final result
def evaluate(buffer: Row) = {
val tile = 50
var median = 0.0
//PROBLEM: buffer is a Row --> I need DataFrame here???
val rdd_sorted = buffer.sortBy(x => x)
val c = rdd_sorted.count()
if (c == 1){
median = rdd_sorted.first()
}khác{
val index = rdd_sorted.zipWithIndex().map(_.swap)
val last = c
val n = (tile/ 100d) * (c*1d)
val k = math.floor(n).toLong
val d = n - k
if( k <= 0) {
median = rdd_sorted.first()
}khác{
if (k <= c){
median = index.lookup(last - 1).head
}khác{
if(k >= c){
median = index.lookup(last - 1).head
}khác{
median = index.lookup(k-1).head + d* (index.lookup(k).head - index.lookup(k-1).head)
}
}
}
}
} //end of evaluate
1 Câu trả lời
Hãy thử cách này:
import org.apache.spark.functions._
val result = data.groupBy("col1").agg(callUDF("percentile_approx", col("col2"), lit(0.5)))
关于scala - Spark 斯卡拉: User defined aggregate function that calculates median,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37590230/
scala 中是否有任何等效于 C# 部分类的东西?我想用这样的对象离开我的功能: // file 1: object MainClass { def addValue(value: AnyR
你能定义一组变量供以后使用吗? 这里有一些伪代码突出了我的意图: def coordinates = x1, y1, x2, y2 log("Drawing from (%4.1f, %4.1f) t
在我的应用程序中,我有很多地方需要获取元组列表,按元组的第一个元素对其进行分组,然后将其从其余元素中删除。例如,我有元组 (1, "Joe", "Account"), (1, "Tom", "Empl
我在 Scala 中的类声明遇到了麻烦: class Class2[ A, B class Foo extends Class3[String, Foo] define
给定以下场景 val items = List("a", "b", "c", 1, 2, 3, false, true) def intItems = items.collect {case i :
我有一个我想忽略的日期列表: private val excludeDates = List( new DateTime("2015-07-17"),
我想创建一个方法,它将一个选项数组和一个默认值作为参数,并返回第一个非空选项,否则返回默认值: def customGetOrElse[T](options : Array[Option[T]], d
试图生成一个显示素因数多重性的元组列表......这个想法是将排序列表中的每个整数与元组中的第一个值相匹配,使用第二个值进行计数。使用 takeWhile 可能更容易做到这一点,但是嗯。不幸的是,我的
我是 Scala 新手,但有一些 Java 背景。 在编写 Scala 代码时,以这种方式处理 Option 参数很有用: val text = Option("Text") val length =
这个问题已经有答案了: Use of def, val, and var in scala (6 个回答) 已关闭 9 年前。 我正在寻找一种方法来解决 Scala 中的以下编译错误。我正在尝试更新变
我有一种情况,我想作为 future 并发执行多个任务,这样如果其中一个任务失败,其他任务仍然会执行。如果失败,我想记录它的错误。我希望我的父线程能够判断每个线程是否成功,然后根据它执行一些操作。例如
我被教导了 formal systems在大学时,但我很失望他们似乎并没有被真正使用。 我喜欢能够知道某些代码(对象、函数等)是否有效的想法,而不是通过测试,而是通过证明。 我相信我们都熟悉物理工程和
wiki 上的示例似乎工作得很好,但是我的问题更多是关于如何实现此结果以及如何使用 Eclipsify util 最终将项目(带有子项目)导入 Eclipse。 https://github.com/
我玩了一下占位符,发现了一个奇怪的情况: val integers = Seq(1, 2) val f = (x:Int) => x + 1 integers.map((_, f(_))) 返回 Se
使用 Slick,您可以执行以下操作以从表中生成结果流: val q = for (e println(s"Event: $s") } 这将打印 events 中的所有事件表并在最后一行之后终止。
我想在我的 Scala 摆动应用程序中使用一棵树,但该组件在 API 中不可用。 是否包装了 JTree存在吗? 如果没有,你对制作有什么建议吗? 谢谢 最佳答案 即使您可以在 Scala 程序中直接
我一直在试图了解莫纳德州。虽然使用起来并不总是那么容易,但是它的用法并不多。但是,我发现有关Monad州的每一次讨论都具有基本相同的信息,而且总会有一些我不理解的地方。 以this帖子为例。作者具有以
拜托,对不起我的英语:(让我们通过例子来解释我的问题。我们有一个数组a: var a = Array(1,1,1,1,2) 我们可以: 过滤a: a.filter( _ a.coun
为什么queue.get( ) 返回空列表? class MyQueue{ var queue=List[Int](3,5,7) def get(){ this.queue.head
Scala 2.10引入了value classes。它们对于编写类型安全代码非常有用。此外,还有一些限制,其中一些将被编译器检测到,而某些则需要在运行时分配。 我想使用case class语法创建值
Tôi là một lập trình viên xuất sắc, rất giỏi!