Hadoop Multi-Table Join
Input files
address.txt:
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
factory.txt:
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Back of Beijing 1
Output file
factory city
Beijing Red Star Beijing
Shenzhen Thunder Shenzhen
Guangzhou Honda Guangzhou
Beijing Rising Beijing
Guangzhou Development Bank Guangzhou
Tencent Shenzhen
Back of Beijing Beijing
Design approach
1: Create two lists: one holds the factory records, the other holds the city (address) records.
2: Use a nested for loop to compare the records pairwise, keeping the pairs whose address IDs are equal (see the sketch after this list).
3: Print the header row, then write the joined results.
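Step 2 is an in-memory nested-loop join. Here is a minimal, Hadoop-free sketch of that matching rule; the class name NestedLoopJoinDemo and the trimmed-down sample data are illustrative only:

import java.util.Arrays;
import java.util.List;

public class NestedLoopJoinDemo {
    public static void main(String[] args) {
        // Address records: "<id> <city>"; factory records: "<factory name> <id>".
        List<String> cities = Arrays.asList("1 Beijing", "3 Shenzhen");
        List<String> factories = Arrays.asList("Beijing Red Star 1", "Tencent 3");
        for (String city : cities) {
            for (String factory : factories) {
                String[] c = city.split(" ");
                String[] f = factory.split(" ");
                // Join condition: the address ID (first field of the city record)
                // equals the last field of the factory record.
                if (c[0].equals(f[f.length - 1])) {
                    String factoryName = String.join(" ",
                            Arrays.copyOfRange(f, 0, f.length - 1));
                    System.out.println(factoryName + " " + c[1]);
                }
            }
        }
    }
}

Running this prints "Beijing Red Star Beijing" and "Tencent Shenzhen", which is exactly the pairing the reducer below produces at scale.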
Code
mapper
package FindConnection;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Find2mapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.isEmpty()) {
            return; // skip blank lines
        }
        // Address lines ("1 Beijing") start with a digit; factory lines
        // ("Beijing Red Star 1") start with a letter. Tag each line so the
        // reducer can tell the two tables apart.
        if (Character.isDigit(line.charAt(0))) {
            context.write(new Text(line), new Text("1"));
        } else {
            context.write(new Text(line), new Text("2"));
        }
    }
}
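With the sample input, each map() call emits the entire line as the key and the table tag as the value, for example:

1 Beijing          -> ("1 Beijing", "1")
Beijing Red Star 1 -> ("Beijing Red Star 1", "2")

Because every input line is distinct, each reduce() call below receives exactly one tag value.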
reducer
package FindConnection;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Find2Reducer extends Reducer<Text, Text, Text, Text> {
    int x = 0;    // number of reduce() calls seen so far
    int i = 0;    // number of address records collected
    int y = 0;    // number of factory records collected
    int time = 0; // guard so the header is written only once
    List<String> list1 = new ArrayList<String>(); // address records: "<id> <city>"
    List<String> list2 = new ArrayList<String>(); // factory records: "<factory> <id>"

    @Override
    protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2)
            throws IOException, InterruptedException {
        if (time == 0) {
            arg2.write(new Text("factory---"), new Text("---city")); // emit the header row
            time++;
        }
        // Every key is a distinct input line, so the iterable holds exactly one
        // tag: "1" marks an address record, "2" marks a factory record.
        int flag = 0;
        for (Text flg : arg1) {
            flag = Integer.parseInt(flg.toString());
        }
        x++;
        String word = arg0.toString();
        if (flag == 1) {
            list1.add(word);
            i++;
        } else {
            list2.add(word);
            y++;
        }
        // Once all 11 records (4 addresses + 7 factories) are in the two lists,
        // run the nested-loop join. The count is hard-coded for this data set;
        // see the cleanup()-based sketch below for a more robust trigger.
        if (x == 11) {
            for (int p = 0; p < i; p++) {
                for (int m = 0; m < y; m++) {
                    String word1 = list1.get(p);
                    String word2 = list2.get(m);
                    String word11[] = word1.split(" ");
                    String word22[] = word2.split(" ");
                    int n = word22.length;
                    // Join condition: the address ID equals the factory's trailing ID.
                    if (word11[0].equals(word22[n - 1])) {
                        String city = "";
                        for (int u = 1; u < word11.length; u++) {
                            city = city + " " + word11[u];
                        }
                        String factory = "";
                        for (int o = 0; o < word22.length - 1; o++) {
                            factory = factory + " " + word22[o];
                        }
                        // Factory first, then city, matching the expected output.
                        arg2.write(new Text(factory.trim() + "---"),
                                new Text("---" + city.trim()));
                    }
                }
            }
        }
    }
}
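The x == 11 trigger ties the reducer to exactly this data set. A more general variant, shown here only as a sketch, collects records in reduce() as above but runs the join from the reducer's cleanup() hook, which Hadoop invokes once after the last reduce() call:

// Sketch: replaces the x == 11 block; assumes the same list1/list2 fields.
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
    for (String addr : list1) {          // "<id> <city>"
        for (String fact : list2) {      // "<factory> <id>"
            String[] a = addr.split(" ");
            String[] f = fact.split(" ");
            if (a[0].equals(f[f.length - 1])) {
                String factory = String.join(" ",
                        java.util.Arrays.copyOfRange(f, 0, f.length - 1));
                String city = String.join(" ",
                        java.util.Arrays.copyOfRange(a, 1, a.length));
                context.write(new Text(factory + "---"), new Text("---" + city));
            }
        }
    }
}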
main
package FindConnection;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Find2Main {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(Find2Main.class);
job.setMapperClass(Find2mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(Find2Reducer.class);
job.setNumReduceTasks(1); // the in-memory join needs every record in a single reducer
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1); // exit with the job's status
}
}
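Assuming the three classes are packed into a jar (the name FindConnection.jar below is arbitrary), the job takes the directory holding address.txt and factory.txt as its first argument and a not-yet-existing output directory as its second:

hadoop jar FindConnection.jar FindConnection.Find2Main /input /output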