
SQL query in Spark/Scala: Size exceeds Integer.MAX_VALUE


I'm trying to run a simple SQL query over S3 events using Spark. I'm loading ~30GB of JSON files like this:

val d2 = spark.read.json("s3n://myData/2017/02/01/1234");
d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK);
d2.registerTempTable("d2");

Then I try to write my query result to a file:
val users_count = sql("select count(distinct data.user_id) from d2");
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv");

But Spark throws the following exception:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Note that the same query works on smaller amounts of data. What is the problem here?

1 Answer

No Spark shuffle block can be larger than 2GB (Integer.MAX_VALUE bytes), so you need more (and therefore smaller) partitions.

You should adjust spark.default.parallelism and spark.sql.shuffle.partitions (which defaults to 200) so that the number of partitions can hold your data without hitting the 2GB limit (you could aim for roughly 256MB per partition, so for 200GB of data that works out to about 800 partitions). Thousands of partitions are very common, so don't be afraid to repartition to 1000 as suggested.
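As a minimal sketch (assuming you build the SparkSession yourself; the 800-partition figure is just the rough sizing estimate above, not a tuned value), these settings could be applied like this:

import org.apache.spark.sql.SparkSession

// spark.default.parallelism must be set before the SparkContext starts,
// so configure it when building the session; spark.sql.shuffle.partitions
// can also be changed later at runtime via spark.conf.set.
val spark = SparkSession.builder()
  .appName("UsersCount")
  .config("spark.default.parallelism", "800")    // ~200GB / 256MB per partition
  .config("spark.sql.shuffle.partitions", "800")
  .getOrCreate()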

FYI, you can check the number of partitions of an RDD with something like rdd.getNumPartitions (e.g. d2.rdd.getNumPartitions).
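For example (a sketch reusing the d2 DataFrame from the question; the target of 800 partitions is only the estimate above):

// Check how many partitions the loaded data currently has.
println(d2.rdd.getNumPartitions)

// If the partitions are too large, repartition before persisting and querying.
val d2Small = d2.repartition(800)
d2Small.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)
d2Small.createOrReplaceTempView("d2")  // non-deprecated equivalent of registerTempTable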

There is a JIRA issue tracking the effort to remove the various 2GB limits (it has been open for a while now): https://issues.apache.org/jira/browse/SPARK-6235

See http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications/25 for more information on this error.

A similar question about SQL query size exceeding Integer.MAX_VALUE in Spark/Scala can be found on Stack Overflow: https://stackoverflow.com/questions/42247630/
