我使用以下代码计算数据帧所有行之间的余弦相似度:
from pyspark.ml.feature import Normalizer
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
normalizer = Normalizer(inputCol="features", outputCol="norm")
data = normalizer.transform(transformed_df)
data = index_df(data)
mat = IndexedRowMatrix(
data.select("id", "norm")\
.rdd.map(lambda row: IndexedRow(row.id, row.norm.toArray()))).toBlockMatrix()
dot = mat.multiply(mat.transpose())
indexed_dot = dot.toIndexedRowMatrix()
indexed_rdd = indexed_dot.rows
df = indexed_rdd.toDF()
当我使用数据帧的子集(100k 行)时,它可以工作,但当我尝试使用更多行(我的目标是 300k 行)时,我会收到下面的错误.
----> 1 df.write.mode('overwrite').parquet('some_path')
/usr/lib/spark/python/pyspark/sql/readwriter.py in parquet(self, path, mode, partitionBy, compression)
802 self.partitionBy(partitionBy)
803 self._set_opts(compression=compression)
--> 804 self._jwrite.parquet(path)
805
806 @since(1.6)
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o303.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:549)
tại sun.reflect.NativeMethodAccessorImpl.invoke0(Phương thức gốc)
tại sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
tại sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
tại java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
tại java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 62.0 failed 4 times, most recent failure: Lost task 3.3 in stage 62.0 (TID 1892, blabla-worker, executor 77): ExecutorLostFailure (executor 77 exited caused by one of the running tasks) Reason: Container marked as failed: container_1557859612139_0001_01_000086 on host: blabla-worker Exit status: 143. Diagnostics: [2019-05-14 19:19:23.665]Container killed on request. Exit code is 143
[2019-05-14 19:19:23.665]Container exited with a non-zero exit code 143.
[2019-05-14 19:19:23.665]Killed by external signal
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1661)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1649)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1648)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1648)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1882)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1820)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
tại org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
tại org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
... 31 thêm
Nhiệm vụ dường như bị kẹt ở một mức độ cụ thể và thất bại nhiều lần nên người quản lý đã loại bỏ nó.
Bạn có biết làm thế nào tôi có thể giải quyết vấn đề này?
sử dụngnhật ký sợi -applicationId -containerId
Sau khi điều tra nhật ký, vấn đề dường như xuất phát từ một tác vụ liên tục bị lỗi. Spark thực hiện khả năng chịu lỗi và các tác vụ bị lặp lại, dẫn đến không đủ dung lượng ổ đĩa cho nhân viên của tôi (hơn 90%). Nút trở nên không khỏe mạnh và công việc cuối cùng thất bại.
Lý do tại sao nhiệm vụ thất bại vẫn còn là một bí ẩn. Tôi sẽ cập nhật nếu tôi biết chuyện gì đang xảy ra ở đó.
Tôi là một lập trình viên xuất sắc, rất giỏi!