第二个是表生成函数(UDTF):json_tuple(string jsonstr,p1,p2,…,pn) 本函数可以接受多个标签名称,对输入的JSON字符串进行处理,这个和get_json_object这个UDF类似,不过更高效,其通过一次调用就可以获得多个键值,例:select b.* from test_json a lateral view json_tuple(a.id,'id','name') b as f1,f2;通过lateral view行转列。
最理想的方式就是能有一种JSON SERDE,只要我们LOAD完数据,就直接可以select * from test,而不是select get_json_object这种方式来获取,N个字段就要解析N次,效率太低了。
[java]
view plain
copy
1. package com.besttone.hive.serde;
2.
3. import java.util.ArrayList;
4. import java.util.Arrays;
5. import java.util.HashMap;
6. import java.util.List;
7. import java.util.Map;
8. import java.util.Properties;
9.
10. import org.apache.hadoop.conf.Configuration;
11. import org.apache.hadoop.hive.serde.serdeConstants;
12. import org.apache.hadoop.hive.serde2.SerDe;
13. import org.apache.hadoop.hive.serde2.SerDeException;
14. import org.apache.hadoop.hive.serde2.SerDeStats;
15. import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
16. import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
17. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
18. import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
19. import org.apache.hadoop.hive.serde2.objectinspector.StructField;
20. import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
21. import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
22. import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
23. import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
24. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
25. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
26. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
27. import org.apache.hadoop.io.Text;
28. import org.apache.hadoop.io.Writable;
29. import org.codehaus.jackson.map.ObjectMapper;
30.
31. /**
32. * This SerDe can be used for processing JSON data in Hive. It supports
33. * arbitrary JSON data, and can handle all Hive types except for UNION. However,
34. * the JSON data is expected to be a series of discrete records, rather than a
35. * JSON array of objects.
36. *
37. * The Hive table is expected to contain columns with names corresponding to
38. * fields in the JSON data, but it is not necessary for every JSON field to have
39. * a corresponding Hive column. Those JSON fields will be ignored during
40. * queries.
41. *
42. * Example:
43. *
44. * { "a": 1, "b": [ "str1", "str2" ], "c": { "field1": "val1" } }
45. *
46. * Could correspond to a table:
47. *
48. * CREATE TABLE foo (a INT, b ARRAY, c STRUCT);
49. *
50. * JSON objects can also interpreted as a Hive MAP type, so long as the keys and
51. * values in the JSON object are all of the appropriate types. For example, in
52. * the JSON above, another valid table declaraction would be:
53. *
54. * CREATE TABLE foo (a INT, b ARRAY, c MAP);
55. *
56. * Only STRING keys are supported for Hive MAPs.
57. */
58. public class JSONSerDe implements SerDe {
59.
60. private StructTypeInfo rowTypeInfo;
61. private ObjectInspector rowOI;
62. private List colNames;
63. private List
我使用了一个STRUCT类型来保存requestParams的值,row format我们用的是自定义的json serde:com.besttone.hive.serde.JSONSerDe,SERDEPROPERTIES中,除了设置JSON对象的映射关系外,我还设置了一个自定义的参数:"input.invalid.ignore"="true",忽略掉所有非JSON格式的输入行。这里不是真正意义的忽略,只是非法行的每个输出字段都为NULL了,要在结果集上忽略,必须这样写:select * from test where requestUrl is not null;
OK表建好了,现在就差数据了,我们启动flumedemo的WriteLog,往hive表test目录下面输出一些日志数据,然后再进入hive客户端,select * from test;所有字段都正确地解析,大功告成。
第二个是表生成函数(UDTF):json_tuple(string jsonstr,p1,p2,…,pn) 本函数可以接受多个标签名称,对输入的JSON字符串进行处理,这个和get_json_object这个UDF类似,不过更高效,其通过一次调用就可以获得多个键值,例:select b.* from test_json a lateral view json_tuple(a.id,'id','name') b as f1,f2;通过lateral view行转列。
最理想的方式就是能有一种JSON SERDE,只要我们LOAD完数据,就直接可以select * from test,而不是select get_json_object这种方式来获取,N个字段就要解析N次,效率太低了。
[java]
view plain
copy
1. package com.besttone.hive.serde;
2.
3. import java.util.ArrayList;
4. import java.util.Arrays;
5. import java.util.HashMap;
6. import java.util.List;
7. import java.util.Map;
8. import java.util.Properties;
9.
10. import org.apache.hadoop.conf.Configuration;
11. import org.apache.hadoop.hive.serde.serdeConstants;
12. import org.apache.hadoop.hive.serde2.SerDe;
13. import org.apache.hadoop.hive.serde2.SerDeException;
14. import org.apache.hadoop.hive.serde2.SerDeStats;
15. import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
16. import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
17. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
18. import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
19. import org.apache.hadoop.hive.serde2.objectinspector.StructField;
20. import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
21. import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
22. import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
23. import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
24. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
25. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
26. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
27. import org.apache.hadoop.io.Text;
28. import org.apache.hadoop.io.Writable;
29. import org.codehaus.jackson.map.ObjectMapper;
30.
31. /**
32. * This SerDe can be used for processing JSON data in Hive. It supports
33. * arbitrary JSON data, and can handle all Hive types except for UNION. However,
34. * the JSON data is expected to be a series of discrete records, rather than a
35. * JSON array of objects.
36. *
37. * The Hive table is expected to contain columns with names corresponding to
38. * fields in the JSON data, but it is not necessary for every JSON field to have
39. * a corresponding Hive column. Those JSON fields will be ignored during
40. * queries.
41. *
42. * Example:
43. *
44. * { "a": 1, "b": [ "str1", "str2" ], "c": { "field1": "val1" } }
45. *
46. * Could correspond to a table:
47. *
48. * CREATE TABLE foo (a INT, b ARRAY, c STRUCT);
49. *
50. * JSON objects can also interpreted as a Hive MAP type, so long as the keys and
51. * values in the JSON object are all of the appropriate types. For example, in
52. * the JSON above, another valid table declaraction would be:
53. *
54. * CREATE TABLE foo (a INT, b ARRAY, c MAP);
55. *
56. * Only STRING keys are supported for Hive MAPs.
57. */
58. public class JSONSerDe implements SerDe {
59.
60. private StructTypeInfo rowTypeInfo;
61. private ObjectInspector rowOI;
62. private List colNames;
63. private List row = new ArrayList();
64.
65. //遇到非JSON格式输入的时候的处理。
66. private boolean ignoreInvalidInput;
67.
68. /**
69. * An initialization function used to gather information about the table.
70. * Typically, a SerDe implementation will be interested in the list of
71. * column names and their types. That information will be used to help
72. * perform actual serialization and deserialization of data.
73. */
74. @Override
75. public void initialize(Configuration conf, Properties tbl)
76. throws SerDeException {
77. // 遇到无法转换成JSON对象的字符串时,是否忽略,默认不忽略,抛出异常,设置为true将跳过异常。
78. ignoreInvalidInput = Boolean.valueOf(tbl.getProperty(
79. "input.invalid.ignore", "false"));
80.
81. // Get a list of the table's column names.
82.
83. String colNamesStr = tbl.getProperty(serdeConstants.LIST_COLUMNS);
84. ","));
85.
86. // Get a list of TypeInfos for the columns. This list lines up with
87. // the list of column names.
88. String colTypesStr = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
89. List colTypes = TypeInfoUtils
90. .getTypeInfosFromTypeString(colTypesStr);
91.
92. rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(
93. colNames, colTypes);
94. rowOI = TypeInfoUtils
95. .getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
96. }
97.
98. /**
99. * This method does the work of deserializing a record into Java objects
100. * that Hive can work with via the ObjectInspector interface. For this
101. * SerDe, the blob that is passed in is a JSON string, and the Jackson JSON
102. * parser is being used to translate the string into Java objects.
103. *
104. * The JSON deserialization works by taking the column names in the Hive
105. * table, and looking up those fields in the parsed JSON object. If the
106. * value of the field is not a primitive, the object is parsed further.
107. */
108. @Override
109. public Object deserialize(Writable blob) throws SerDeException {
110. null;
111. row.clear();
112. try {
113. new ObjectMapper();
114. // This is really a Map. For more information about
115. // how
116. // Jackson parses JSON in this example, see
117. // http://wiki.fasterxml.com/JacksonDataBinding
118. class);
119. catch (Exception e) {
120. // 如果为true,不抛出异常,忽略该行数据
121. if (!ignoreInvalidInput)
122. throw new SerDeException(e);
123. else {
124. return null;
125. }
126.
127. }
128.
129. // Lowercase the keys as expected by hive
130. new HashMap();
131. for (Map.Entry entry : root.entrySet()) {
132. lowerRoot.put(((String) entry.getKey()).toLowerCase(),
133. entry.getValue());
134. }
135. root = lowerRoot;
136.
137. null;
138. for (String fieldName : rowTypeInfo.getAllStructFieldNames()) {
139. try {
140. TypeInfo fieldTypeInfo = rowTypeInfo
141. .getStructFieldTypeInfo(fieldName);
142. value = parseField(root.get(fieldName), fieldTypeInfo);
143. catch (Exception e) {
144. null;
145. }
146. row.add(value);
147. }
148. return row;
149. }
150.
151. /**
152. * Parses a JSON object according to the Hive column's type.
153. *
154. * @param field
155. * - The JSON object to parse
156. * @param fieldTypeInfo
157. * - Metadata about the Hive column
158. * @return - The parsed value of the field
159. */
160. private Object parseField(Object field, TypeInfo fieldTypeInfo) {
161. switch (fieldTypeInfo.getCategory()) {
162. case PRIMITIVE:
163. // Jackson will return the right thing in this case, so just return
164. // the object
165. if (field instanceof String) {
166. "n", "\\n");
167. }
168. return field;
169. case LIST:
170. return parseList(field, (ListTypeInfo) fieldTypeInfo);
171. case MAP:
172. return parseMap(field, (MapTypeInfo) fieldTypeInfo);
173. case STRUCT:
174. return parseStruct(field, (StructTypeInfo) fieldTypeInfo);
175. case UNION:
176. // Unsupported by JSON
177. default:
178. return null;
179. }
180. }
181.
182. /**
183. * Parses a JSON object and its fields. The Hive metadata is used to
184. * determine how to parse the object fields.
185. *
186. * @param field
187. * - The JSON object to parse
188. * @param fieldTypeInfo
189. * - Metadata about the Hive column
190. * @return - A map representing the object and its fields
191. */
192. private Object parseStruct(Object field, StructTypeInfo fieldTypeInfo) {
193. Map map = (Map) field;
194. ArrayList structTypes = fieldTypeInfo
195. .getAllStructFieldTypeInfos();
196. ArrayList structNames = fieldTypeInfo.getAllStructFieldNames();
197.
198. new ArrayList(structTypes.size());
199. for (int i = 0; i list = (ArrayList) field;
218. TypeInfo elemTypeInfo = fieldTypeInfo.getListElementTypeInfo();
219.
220. for (int i = 0; i map = (Map) field;
240. TypeInfo valueTypeInfo = fieldTypeInfo.getMapValueTypeInfo();
241.
242. for (Map.Entry entry : map.entrySet()) {
243. map.put(entry.getKey(), parseField(entry.getValue(), valueTypeInfo));
244. }
245. return map;
246. }
247.
248. /**
249. * Return an ObjectInspector for the row of data
250. */
251. @Override
252. public ObjectInspector getObjectInspector() throws SerDeException {
253. return rowOI;
254. }
255.
256. /**
257. * Unimplemented
258. */
259. @Override
260. public SerDeStats getSerDeStats() {
261. return null;
262. }
263.
264. /**
265. * JSON is just a textual representation, so our serialized class is just
266. * Text.
267. */
268. @Override
269. public Class extends Writable> getSerializedClass() {
270. return Text.class;
271. }
272.
273. /**
274. * This method takes an object representing a row of data from Hive, and
275. * uses the ObjectInspector to get the data for each column and serialize
276. * it. This implementation deparses the row into an object that Jackson can
277. * easily serialize into a JSON blob.
278. */
279. @Override
280. public Writable serialize(Object obj, ObjectInspector oi)
281. throws SerDeException {
282. Object deparsedObj = deparseRow(obj, oi);
283. new ObjectMapper();
284. try {
285. // Let Jackson do the work of serializing the object
286. return new Text(mapper.writeValueAsString(deparsedObj));
287. catch (Exception e) {
288. throw new SerDeException(e);
289. }
290. }
291.
292. /**
293. * Deparse a Hive object into a Jackson-serializable object. This uses the
294. * ObjectInspector to extract the column data.
295. *
296. * @param obj
297. * - Hive object to deparse
298. * @param oi
299. * - ObjectInspector for the object
300. * @return - A deparsed object
301. */
302. private Object deparseObject(Object obj, ObjectInspector oi) {
303. switch (oi.getCategory()) {
304. case LIST:
305. return deparseList(obj, (ListObjectInspector) oi);
306. case MAP:
307. return deparseMap(obj, (MapObjectInspector) oi);
308. case PRIMITIVE:
309. return deparsePrimitive(obj, (PrimitiveObjectInspector) oi);
310. case STRUCT:
311. return deparseStruct(obj, (StructObjectInspector) oi, false);
312. case UNION:
313. // Unsupported by JSON
314. default:
315. return null;
316. }
317. }
318.
319. /**
320. * Deparses a row of data. We have to treat this one differently from other
321. * structs, because the field names for the root object do not match the
322. * column names for the Hive table.
323. *
324. * @param obj
325. * - Object representing the top-level row
326. * @param structOI
327. * - ObjectInspector for the row
328. * @return - A deparsed row of data
329. */
330. private Object deparseRow(Object obj, ObjectInspector structOI) {
331. return deparseStruct(obj, (StructObjectInspector) structOI, true);
332. }
333.
334. /**
335. * Deparses struct data into a serializable JSON object.
336. *
337. * @param obj
338. * - Hive struct data
339. * @param structOI
340. * - ObjectInspector for the struct
341. * @param isRow
342. * - Whether or not this struct represents a top-level row
343. * @return - A deparsed struct
344. */
345. private Object deparseStruct(Object obj, StructObjectInspector structOI,
346. boolean isRow) {
347. new HashMap();
348. extends StructField> fields = structOI.getAllStructFieldRefs();
349. for (int i = 0; i ();
381. ObjectInspector mapValOI = mapOI.getMapValueObjectInspector();
382. Map, ?> fields = mapOI.getMap(obj);
383. for (Map.Entry, ?> field : fields.entrySet()) {
384. Object fieldName = field.getKey();
385. Object fieldObj = field.getValue();
386. map.put(fieldName, deparseObject(fieldObj, mapValOI));
387. }
388. return map;
389. }
390.
391. /**
392. * Deparses a list and its elements.
393. *
394. * @param obj
395. * - Hive object to deparse
396. * @param oi
397. * - ObjectInspector for the object
398. * @return - A deparsed object
399. */
400. private Object deparseList(Object obj, ListObjectInspector listOI) {
401. new ArrayList();
402. List> field = listOI.getList(obj);
403. ObjectInspector elemOI = listOI.getListElementObjectInspector();
404. for (Object elem : field) {
405. list.add(deparseObject(elem, elemOI));
406. }
407. return list;
408. }
409. }
我使用了一个STRUCT类型来保存requestParams的值,row format我们用的是自定义的json serde:com.besttone.hive.serde.JSONSerDe,SERDEPROPERTIES中,除了设置JSON对象的映射关系外,我还设置了一个自定义的参数:"input.invalid.ignore"="true",忽略掉所有非JSON格式的输入行。这里不是真正意义的忽略,只是非法行的每个输出字段都为NULL了,要在结果集上忽略,必须这样写:select * from test where requestUrl is not null;
OK表建好了,现在就差数据了,我们启动flumedemo的WriteLog,往hive表test目录下面输出一些日志数据,然后再进入hive客户端,select * from test;所有字段都正确地解析,大功告成。