
hadoop - Running Hadoop with compressed files as input. Data input read by Hadoop is out of order. NumberFormatException


After changing the properties in mapred-site.xml, I gave it a tar.bz2 file, a .gz file, and a tar.gz file as input. None of these seemed to work. My assumption is that the records Hadoop reads as input are out of order: one column of the input is a string and the other is an integer, but because some data is read out of order from the compressed file, at some point Hadoop reads part of a string as an integer and throws a NumberFormatException. I'm just a newbie. I would like to know whether the problem is in my configuration or in my code.
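
As background on how Hadoop handles compressed input: TextInputFormat asks CompressionCodecFactory for a codec based purely on the file extension, and the codec only undoes the compression layer; whatever bytes were compressed are then fed to the mapper as text. A minimal standalone probe to see what a mapper would actually receive (the class CodecProbe is hypothetical, not part of the question's job):

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecProbe {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // e.g. MySort/input/data.tar.bz2
        FileSystem fs = path.getFileSystem(conf);
        // The codec is chosen by extension (.bz2 -> BZip2Codec, .gz -> GzipCodec).
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
        InputStream in = (codec == null)
                ? fs.open(path)                           // no codec: raw bytes
                : codec.createInputStream(fs.open(path)); // codec: decompressed bytes
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            // For a .tar.bz2 this prints tar header text (file name, mode, "ustar"
            // magic), not the first line of data.txt: the codec decompresses the
            // stream, it does not unpack the archive inside it.
            System.out.println(reader.readLine());
        }
    }
}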

The properties in core-site.xml are:


<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
  <description>A list of the compression codec classes that can be used for compression/decompression.</description>
</property>

The properties in mapred-site.xml are:


<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>

<property>
  <name>mapred.map.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.BZip2Codec</value>
</property>

<property>
  <name>mapred.output.compression.type</name>
  <value>BLOCK</value>
</property>
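
These are the pre-Hadoop-2 property names; the job log further down shows Hadoop translating them (e.g. mapred.output.compress -> mapreduce.output.fileoutputformat.compress). Under the Hadoop 2.x names, the same settings would look roughly like this:

<property>
  <name>mapreduce.map.output.compress</name>
  <value>true</value>
</property>

<property>
  <name>mapreduce.map.output.compress.codec</name>
  <value>org.apache.hadoop.io.compress.BZip2Codec</value>
</property>

<property>
  <name>mapreduce.output.fileoutputformat.compress.type</name>
  <value>BLOCK</value>
</property>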

Here is my code:

package org.myorg;

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.io.compress.BZip2Codec;

public class MySort {

    // Mapper: expects lines of "name mark" pairs, emits (name, mark).
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable Marks = new IntWritable();
        private Text name = new Text();
        String one, two;
        int num;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                one = tokenizer.nextToken();
                name.set(one);
                if (tokenizer.hasMoreTokens())
                    two = tokenizer.nextToken();
                num = Integer.parseInt(two);
                Marks.set(num);
                context.write(name, Marks);
            }
        }
    }

    // Reducer: sums the marks for each name.
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // conf.set("mapreduce.job.inputformat.class", "com.wizecommerce.utils.mapred.TextInputFormat");
        // conf.set("mapreduce.job.outputformat.class", "com.wizecommerce.utils.mapred.TextOutputFormat");
        // conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setBoolean("mapred.output.compress", true);
        // conf.setBoolean("mapreduce.output.fileoutputformat.compress", false);
        // conf.setBoolean("mapreduce.map.output.compress", true);
        conf.set("mapred.output.compression.type", "BLOCK");
        // conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
        // conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
        conf.setClass("mapred.map.output.compression.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = new Job(conf, "mysort");
        job.setJarByClass(org.myorg.MySort.class);
        job.setJobName("mysort");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // FileInputFormat.setCompressInput(job, true);
        FileOutputFormat.setCompressOutput(job, true);
        // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        // conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString());
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
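
One sharp edge in the mapper, independent of compression: the if (tokenizer.hasMoreTokens()) guard has no braces, so Integer.parseInt(two) also runs on an iteration where no second token was read, parsing a stale value of two; that alone can throw NumberFormatException on lines with an odd token count. A more defensive drop-in for the map method, purely as a sketch (it reuses the fields one, two, num, name, Marks declared above):

public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    StringTokenizer tokenizer = new StringTokenizer(value.toString());
    while (tokenizer.hasMoreTokens()) {
        one = tokenizer.nextToken();
        if (!tokenizer.hasMoreTokens())
            break; // odd token count: a name with no mark, nothing to parse
        two = tokenizer.nextToken();
        try {
            num = Integer.parseInt(two);
        } catch (NumberFormatException e) {
            continue; // non-numeric second field: skip it instead of failing the task
        }
        name.set(one);
        Marks.set(num);
        context.write(name, Marks);
    }
}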

These are all the commands, collected in a makefile:

run: all
	-sudo ./a.out
	sudo chmod 777 -R Data
	-sudo rm data.tar.bz2
	sudo tar -cvjf data.tar.bz2 Data/data.txt
	sudo javac -classpath /home/hduser/12115_Select_Query/hadoop-core-1.1.2.jar -d mysort MySort.java
	sudo jar -cvf mysort.jar -C mysort/ .
	-hadoop fs -rmr MySort/output
	-hadoop fs -rmr MySort/input
	hadoop fs -mkdir MySort/input
	hadoop fs -put data.tar.bz2 MySort/input
	hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/output
	-sudo rm /home/hduser/Out/sort.txt
	hadoop fs -copyToLocal MySort/output/part-r-00000 /home/hduser/Out/sort.txt
	sudo gedit /home/hduser/Out/sort.txt

all: rdata.c
	-sudo rm a.out
	-gcc rdata.c -o a.out

exec: run

.PHONY: exec run

The command:

hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/output

And here is the output:

Java HotSpot(TM) 64-Bit Server VM warning: You have loaded library /usr/local/hadoop/lib/native/libhadoop.so.1.0.0 which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
14/06/25 11:20:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/25 11:20:28 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
14/06/25 11:20:29 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/06/25 11:20:29 INFO input.FileInputFormat: Total input paths to process : 1
14/06/25 11:20:29 INFO mapreduce.JobSubmitter: number of splits:1
14/06/25 11:20:29 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
14/06/25 11:20:29 INFO Configuration.deprecation: mapred.map.output.compression.codec is deprecated. Instead, use mapreduce.map.output.compress.codec
14/06/25 11:20:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1403675322820_0001
14/06/25 11:20:30 INFO impl.YarnClientImpl: Submitted application application_1403675322820_0001
14/06/25 11:20:30 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1403675322820_0001/
14/06/25 11:20:30 INFO mapreduce.Job: Running job: job_1403675322820_0001
14/06/25 11:20:52 INFO mapreduce.Job: Job job_1403675322820_0001 running in uber mode : false
14/06/25 11:20:52 INFO mapreduce.Job: map 0% reduce 0%
14/06/25 11:21:10 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_0, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at org.myorg.MySort$Map.map(MySort.java:36)
at org.myorg.MySort$Map.map(MySort.java:23)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

14/06/25 11:21:29 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_1, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at org.myorg.MySort$Map.map(MySort.java:36)
at org.myorg.MySort$Map.map(MySort.java:23)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

14/06/25 11:21:49 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_2, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at org.myorg.MySort$Map.map(MySort.java:36)
at org.myorg.MySort$Map.map(MySort.java:23)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

14/06/25 11:22:10 INFO mapreduce.Job: map 100% reduce 100%
14/06/25 11:22:10 INFO mapreduce.Job: Job job_1403675322820_0001 failed with state FAILED due to: Task failed task_1403675322820_0001_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0

14/06/25 11:22:10 INFO mapreduce.Job: Counters: 9
	Job Counters
		Failed map tasks=4
		Launched map tasks=4
		Other local map tasks=3
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=69797
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=69797
		Total vcore-seconds taken by all map tasks=69797
		Total megabyte-seconds taken by all map tasks=71472128

I also tried using this:

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar -Dmapred.output.compress=true -Dmapred.compress.map.output=true -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec -Dmapred.reduce.tasks=0 -input MySort/input/data.txt -output MySort/zip1

Creating the compressed files succeeded:

hadoop fs -ls MySort/zip1

Found 3 items
-rw-r--r-- 1 hduser supergroup 0 2014-06-25 10:43 MySort/zip1/_SUCCESS
-rw-r--r-- 1 hduser supergroup 42488018 2014-06-25 10:43 MySort/zip1/part-00000.bz2
-rw-r--r-- 1 hduser supergroup 42504084 2014-06-25 10:43 MySort/zip1/part-00001.bz2

Then I ran this:

hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/zip1

It still doesn't work. Am I missing something here?

When I run it without the compressed bz2 file and pass the plain text file Data/data.txt to it directly, i.e. upload it to MySort/input in HDFS (hadoop fs -put Data/data.txt MySort/input), it works fine.
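
That contrast may be the strongest clue. The failing string "0ustar" matches the layout of a tar header: the magic string "ustar" sits at offset 257 of every header block, preceded here by the typeflag byte '0'. That suggests the BZip2 codec is decompressing the .bz2 layer correctly, but the mapper then sees the raw tar archive (headers plus file contents) rather than lines of data.txt; Hadoop's codecs undo compression only, they do not unpack archives. Assuming that is the case, one sketch of a fix is to compress the text file itself instead of tarring it first (paths as in the makefile above):

bzip2 -k Data/data.txt                          # produces Data/data.txt.bz2, keeps the original
hadoop fs -put Data/data.txt.bz2 MySort/input
hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/output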

Any help is appreciated.

1 Answer

I got this working. I used ToolRunner.

package org.myorg;

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ToolMapReduce extends Configured implements Tool {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable Marks = new IntWritable();
        private Text name = new Text();
        String one, two;
        int num;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                one = tokenizer.nextToken();
                name.set(one);
                if (tokenizer.hasMoreTokens())
                    two = tokenizer.nextToken();
                num = Integer.parseInt(two);
                Marks.set(num);
                context.write(name, Marks);
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ToolMapReduce(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        // Use the configuration that ToolRunner has already populated from the command line.
        Configuration conf = this.getConf();
        // Configuration conf = new Configuration();
        // conf.setOutputFormat(SequenceFileOutputFormat.class);
        // SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);
        // SequenceFileOutputFormat.setCompressOutput(conf, true);
        // conf.set("mapred.output.compress", "true");
        // conf.set("mapred.output.compression", "org.apache.hadoop.io.compress.SnappyCodec");
        // conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        // conf.set("mapreduce.job.inputformat.class", "com.wizecommerce.utils.mapred.TextInputFormat");
        // conf.set("mapreduce.job.outputformat.class", "com.wizecommerce.utils.mapred.TextOutputFormat");
        // conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setBoolean("mapred.output.compress", true);
        // conf.setBoolean("mapreduce.output.fileoutputformat.compress", false);
        // conf.setBoolean("mapreduce.map.output.compress", true);
        conf.set("mapred.output.compression.type", "BLOCK");
        // conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
        conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);

        Job job = new Job(conf, "mysort");
        job.setJarByClass(org.myorg.ToolMapReduce.class);
        // job.setJarByClass(org.myorg.MySort.class);
        job.setJobName("mysort");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // FileInputFormat.setCompressInput(job, true);
        FileOutputFormat.setCompressOutput(job, true);
        // conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString());
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
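
With Tool and ToolRunner wired in, GenericOptionsParser handles the generic options, so compression settings can also be passed as -D flags on the command line instead of being hard-coded (this is also what silences the "Hadoop command-line option parsing not performed" warning in the log above). A hypothetical invocation, using the Hadoop 2.x property names:

hadoop jar mysort.jar org.myorg.ToolMapReduce \
    -Dmapreduce.output.fileoutputformat.compress=true \
    -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec \
    MySort/input/ MySort/output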

Regarding "hadoop - Running Hadoop with compressed files as input. Data input read by Hadoop is out of order. NumberFormatException", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/24401674/
