Storing Map and List Data Types in Parquet Format

What this article covers

  1. Converting text content to Parquet format
  2. Converting List and Map data types to Parquet format
  3. How to inspect Parquet files
  4. Loading Parquet data files into a Hive table

Maven dependencies

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.3</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.3</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>2.7.3</version>
</dependency>

<!-- Parquet output support -->
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-common</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-encoding</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-column</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.8.1</version>
</dependency>

MapReduce - Java code

package com.test.parquest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;

public class ParquetMRTest {

    public static class MyMap extends
            Mapper<LongWritable, Text, Text, Text> {
        // Use the id before the first "|" as the key and pass the whole line to the reducer
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            int i = line.indexOf("|");
            String id = line.substring(0, i);
            context.write(new Text(id), new Text(line));
        }
    }

    public static class MyReduce extends
            Reducer<Text, Text, NullWritable, Group> {
        private SimpleGroupFactory factory;

        public void reduce(Text key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            // Parse one record: id|name|age|subject:grade|hobby1,hobby2,...
            String line = values.iterator().next().toString();
            final String[] fields = line.split("\\|");
            String name = fields[1];
            Integer age = Integer.parseInt(fields[2]);
            final String[] arr = fields[3].split(":");
            String sub = arr[0];
            int grade = Integer.parseInt(arr[1]);
            final String[] hobbys = fields[4].split(",");

            Group group = factory.newGroup()
                    .append("name", name)
                    .append("age", age);

            // LIST: repeated "hobby" values inside the "hobbyList" group
            Group hobbyGroup = group.addGroup("hobbyList");
            for (String hobby : hobbys) {
                hobbyGroup.append("hobby", hobby);
            }
            // MAP: each entry goes in a "key_value" group; this level is required (see notes below)
            Group map1 = group.addGroup("map1");
            Group key_value = map1.addGroup("key_value");
            key_value.append("key", sub);
            key_value.append("value", grade);

            context.write(null, group);
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            // Build the group factory from the schema stored in the job configuration
            MessageType schema = GroupWriteSupport.getSchema(context.getConfiguration());
            factory = new SimpleGroupFactory(schema);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String writeSchema = "message example {\n" +
                "required binary name;\n" +
                "required int32 age;\n" +
                "optional group hobbyList (LIST) {\n" +
                "repeated binary hobby (UTF8);\n" +
                "}\n" +
                "optional group map1 (MAP) {\n" +
                "repeated group key_value {\n" +
                "required binary key (UTF8);\n" +
                "optional int32 value;\n" +
                "}\n" +
                "}\n" +
                "}";
        //String writeSchema = "message example {\n" +
        //        "required binary name;\n" +
        //        "required int32 age;\n" +
        //        "optional group hobbyList (LIST) {\n" +
        //        "repeated binary hobby (UTF8);\n" +
        //        "}\n" +
        //        "optional group map1 (MAP) {\n" +
        //        "required binary key (UTF8);\n" +
        //        "optional int32 value;\n" +
        //        "}\n" +
        //        "}";

        // Notes on the writeSchema format:
        // 1. No semicolon is needed between groups
        // 2. Without the "key_value" group a Parquet file is still produced,
        //    but it cannot be loaded into a Hive table
        conf.set("parquet.example.schema", writeSchema);

        Job job = Job.getInstance(conf);
        job.setJarByClass(ParquetMRTest.class);
        job.setJobName("parquet");

        String in = "input/student.txt";
        String out = "output";

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputValueClass(Group.class);

        job.setMapperClass(MyMap.class);
        job.setReducerClass(MyReduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(ParquetOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(in));
        ParquetOutputFormat.setOutputPath(job, new Path(out));
        ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);

        boolean b = job.waitForCompletion(true);
        System.out.println(b);
    }
}

Notes

  1. "input/student.txt" is a file under the project directory, with contents:

    1|Tom|23|语文:100|打球,打游戏,游泳
    2|Deng|17|数学:99|唱歌,跳舞
    3|Bill|22|体育:88|睡觉
    4|Mtthew|18|英语:77|coding,play ball,睡觉,跑步

  2. The output file is output/part-r-00000.parquet

Inspecting the Parquet file

After running the program, part-r-00000.parquet is generated; its schema and contents can be inspected with parquet-tools.
Download the viewer tool

Place the downloaded tool in the same directory as the Parquet file.
View the schema:

java -jar parquet-tools-1.6.0rc3-SNAPSHOT.jar schema -d part-r-00000.parquet | head -n 30

View the contents:

java -jar parquet-tools-1.6.0rc3-SNAPSHOT.jar head -n 4 part-r-00000.parquet

Reference: Parquet format files, viewing the schema
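
As an alternative to parquet-tools, the file can also be inspected programmatically. Below is a minimal sketch using ParquetReader with GroupReadSupport (both come with the parquet-hadoop dependency above); the class name ParquetDump is just for illustration, and the path assumes the job output from this example.

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class ParquetDump {
    public static void main(String[] args) throws Exception {
        // Read the records back as example Groups, the same representation the reducer wrote
        ParquetReader<Group> reader = ParquetReader
                .builder(new GroupReadSupport(), new Path("output/part-r-00000.parquet"))
                .build();
        Group group;
        while ((group = reader.read()) != null) {
            // Group.toString() prints the nested hobbyList/map1 structure
            System.out.println(group);
        }
        reader.close();
    }
}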

Converting List and Map to Parquet format

Here the conversion is done only in the reducer. The schema is defined as follows:

String writeSchema = "message example {\n" +
        "required binary name;\n" +
        "required int32 age;\n" +
        "optional group hobbyList (LIST) {\n" +
        "repeated binary hobby (UTF8);\n" +
        "}\n" +
        "optional group map1 (MAP) {\n" +
        "repeated group key_value {\n" +
        "required binary key (UTF8);\n" +
        "optional int32 value;\n" +
        "}\n" +
        "}\n" +
        "}";

Schema writing rules

The schema must be written precisely; pay attention to these two points:

  1. Groups are not separated by semicolons.
  2. When mapping a Map to a group, the key_value level can be omitted and a Parquet file will still be generated, but that file cannot be loaded into a Hive table.
    For example, like this:

    "optional group map1 (MAP) {\n"+
    "required binary key (UTF8);\n"+
    "optional int32 value;\n"+
    "}\n"+

The generated map then has this structure:

map1:
.key = 语文
.value = 100

and loading it into a Hive table fails with:

Failed with exception java.io.IOException:java.lang.ClassCastException: required binary key (UTF8) is not a group

Written like this, it works:

"optional group map1 (MAP) {\n"+
"repeated group key_value {\n"+
"required binary key (UTF8);\n"+
"optional int32 value;\n"+
"}\n"+
"}\n"+

With the viewer you can see the map now has this structure:

map1:
.key_value:
..key = 语文
..value = 100
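
The reducer is incidental to the conversion itself: the same Group-building calls work in a plain Java program. Below is a minimal sketch, assuming the schema above, that writes one record with ParquetWriter and GroupWriteSupport; the class name StandaloneWrite and the output path student.parquet are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class StandaloneWrite {
    public static void main(String[] args) throws Exception {
        MessageType schema = MessageTypeParser.parseMessageType(
                "message example {\n" +
                "  required binary name;\n" +
                "  required int32 age;\n" +
                "  optional group hobbyList (LIST) { repeated binary hobby (UTF8); }\n" +
                "  optional group map1 (MAP) {\n" +
                "    repeated group key_value { required binary key (UTF8); optional int32 value; }\n" +
                "  }\n" +
                "}");

        Configuration conf = new Configuration();
        // GroupWriteSupport picks the schema up from the Configuration
        GroupWriteSupport.setSchema(schema, conf);
        ParquetWriter<Group> writer = new ParquetWriter<Group>(
                new Path("student.parquet"), conf, new GroupWriteSupport());

        Group group = new SimpleGroupFactory(schema).newGroup()
                .append("name", "Tom")
                .append("age", 23);
        // LIST: repeated "hobby" values inside the hobbyList group
        Group hobbyGroup = group.addGroup("hobbyList");
        hobbyGroup.append("hobby", "打球");
        hobbyGroup.append("hobby", "游泳");
        // MAP: one key_value group per map entry
        Group keyValue = group.addGroup("map1").addGroup("key_value");
        keyValue.append("key", "语文");
        keyValue.append("value", 100);

        writer.write(group);
        writer.close();
    }
}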

Loading the generated Parquet file into a Hive table

  1. Create the Hive table:

    hive> CREATE TABLE dengmx.parquet_table(name string, age int, map1 map<string,int>, hobbyList array<string>) STORED AS PARQUET;

The column names must match those in the Parquet schema.

Find the table's location on HDFS:

hive> show create table parquet_table;

The output includes:

LOCATION
'hdfs://localhost:9000/user/hive/warehouse/dengmx.db/parquet_table'

Upload the Parquet file into the table directory:

hdfs dfs -put /Users/apple/Documents/tmp/part-r-00000.parquet /user/hive/warehouse/dengmx.db/parquet_table

Query the table:

hive> select * from parquet_table;
OK
Tom 23 {"语文":100} ["打球","打游戏","游泳"]
Deng 17 {"数学":99} ["唱歌","跳舞"]
Bill 22 {"体育":88} ["睡觉"]
Mtthew 18 {"英语":77} ["coding","play ball","睡觉","跑步"]