
hadoop - Spark writing to HDFS not working with the saveAsNewAPIHadoopFile method


I'm using Spark 1.1.0 on CDH 5.2.0 and am trying to make sure I can read from and write to HDFS.

I quickly realized that .textFile and .saveAsTextFile call the old API and do not seem to be compatible with our HDFS version.

def testHDFSReadOld(sc: SparkContext, readFile: String) {
  // THIS WILL FAIL WITH
  // (TID 0, dl1rhd416.internal.edmunds.com): java.lang.IllegalStateException: unread block data
  // java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2420)

  sc.textFile(readFile).take(2).foreach(println)
}

def testHDFSWriteOld(sc: SparkContext, writeFile: String) {
  // THIS WILL FAIL WITH
  // (TID 0, dl1rhd416.internal.edmunds.com): java.lang.IllegalStateException: unread block data
  // java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2420)

  sc.parallelize(List("THIS", "ISCOOL")).saveAsTextFile(writeFile)
}

Moving to the new API methods fixed reading from HDFS!

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

def testHDFSReadNew(sc: SparkContext, readFile: String) {
  // THIS WORKS
  sc.newAPIHadoopFile(readFile, classOf[TextInputFormat], classOf[LongWritable],
    classOf[Text], sc.hadoopConfiguration).map {
      case (x: LongWritable, y: Text) => y.toString
    }.take(2).foreach(println)
}

So it looks like I am making progress. The write no longer dies with a hard error like the one above; instead it appears to work. The only problem is that the directory contains nothing but a lone SUCCESS flag file. Even more confusing, the logs show data being written to the _temporary directory; the file committer never seems to realize it needs to move the files from the _temporary directory to the output directory.

import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

def testHDFSWriteNew(sc: SparkContext, writeFile: String) {
  /* This will have an error message of:
  INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(dl1rhd400.internal.edmunds.com,35927)
  14/11/21 02:02:27 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@2281f1b2
  14/11/21 02:02:27 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@2281f1b2
  java.nio.channels.CancelledKeyException
    at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
    at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)

  However lately it hasn't even had errors; the symptom is no part files in the directory, but a success flag is there.
  */
  val conf = sc.hadoopConfiguration
  conf.set("mapreduce.task.files.preserve.failedtasks", "true")
  conf.set("mapred.output.dir", writeFile)
  sc.parallelize(List("THIS", "ISCOOL")).map(x => (NullWritable.get, new Text(x)))
    .saveAsNewAPIHadoopFile(writeFile, classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], conf)
}
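
One quick way to confirm the symptom above is to list the output directory recursively with the Hadoop FileSystem API: if the committer never promotes the task output, only the SUCCESS marker (and anything left under _temporary) shows up. This is just a sketch with a hypothetical listOutput helper, assuming writeFile is the same output path passed to testHDFSWriteNew:

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper, not part of the original driver.
def listOutput(sc: SparkContext, writeFile: String) {
  // Recursively list every file under the output path. If the commit step
  // never ran, expect to see only _SUCCESS and leftovers under _temporary.
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val files = fs.listFiles(new Path(writeFile), true) // true = recursive
  while (files.hasNext) println(files.next().getPath)
}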

When I run locally and specify an HDFS path, the files show up fine in HDFS. This only happens when I run on our Spark standalone cluster.

I submit the job as follows: spark-submit --deploy-mode client --master spark://sparkmaster --class driverclass driverjar

1 Answer

Could you try the code below?

import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
nums.saveAsNewAPIHadoopFile[TextOutputFormat[IntWritable, Text]]("/data/newAPIHadoopFile")

The following code also works for me.

val x = sc.parallelize(List("THIS","ISCOOL")).map(x => (NullWritable.get, new Text(x)))
x.saveAsNewAPIHadoopFile("/data/nullwritable", classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], sc.hadoopConfiguration)

[root@sparkmaster ~]# hadoop fs -cat /data/nullwritable/*

15/08/20 02:09:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
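
As an additional check (a sketch that is not part of the original answer, reusing the new-API read pattern from the question), the output under /data/nullwritable can be read back through newAPIHadoopFile; both THIS and ISCOOL should print if the part files were really committed:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Read the committed output back with the new-API TextInputFormat.
// TextOutputFormat drops NullWritable keys, so each line is just the value.
sc.newAPIHadoopFile("/data/nullwritable", classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text], sc.hadoopConfiguration)
  .map { case (_, line) => line.toString }
  .collect()
  .foreach(println)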

Regarding "hadoop - Spark writing to HDFS not working with the saveAsNewAPIHadoopFile method", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/27072911/
