Spark SQL支持两种不一样的方法将现有的RDDs转换为数据集。java
第一个方法:使用反射来推断包含特定对象类型的RDD的模式。这种基于反射的方法使代码更加简洁,而且当您在编写Spark应用程序时已经了解了模式时,它能够很好地工做。sql
第一种方法代码实例java版本实现:apache
数据准备studentDatatxt
api
1001,20,zhangsan1002,17,lisi1003,24,wangwu1004,16,zhaogang
本地模式代码实现:ide
package com.unicom.ljs.spark220.study;
import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SQLContext;
/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-01-20 08:58 * @version: v1.0 * @description: com.unicom.ljs.spark220.study */public class RDD2DataFrameReflect { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD2DataFrameReflect"); JavaSparkContext sc = new JavaSparkContext(sparkConf); SQLContext sqlContext=new SQLContext(sc);
JavaRDD<String> lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\studentData.txt"); JavaRDD<Student2> studentRDD = lines.map(new Function<String, Student2>() { @Override public Student2 call(String line) throws Exception { String[] split = line.split(","); Student2 student=new Student2(); student.setId(Integer.valueOf(split[0])); student.setAge(Integer.valueOf(split[1])); student.setName(split[2]); return student; } }); //使用反射方式将RDD转换成dataFrame //将Student.calss传递进去,其实就是利用反射的方式来建立DataFrame Dataset<Row> dataFrame = sqlContext.createDataFrame(studentRDD, Student2.class); //拿到DataFrame以后将其注册为临时表,而后针对其中的数据执行SQL语句 dataFrame.registerTempTable("studentTable");
//针对student临时表,执行sql语句查询年龄小于18岁的学生, /*DataFrame rowDF */ Dataset<Row> dataset = sqlContext.sql("select * from studentTable where age < 18"); JavaRDD<Row> rowJavaRDD = dataset.toJavaRDD(); JavaRDD<Student2> ageRDD = rowJavaRDD.map(new Function<Row, Student2>() { @Override public Student2 call(Row row) throws Exception { Student2 student = new Student2(); student.setId(row.getInt(0)); student.setAge(row.getInt(1)); student.setName(row.getString(2));
return student; } }); ageRDD.foreach(new VoidFunction<Student2>() { @Override public void call(Student2 student) throws Exception { System.out.println(student.toString()); } }); }}
Student2类:this
package com.unicom.ljs.spark220.study;
import java.io.Serializable;
/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-01-20 08:57 * @version: v1.0 * @description: com.unicom.ljs.spark220.study */public class Student2 implements Serializable { int id; int age; String name;
public int getId() { return id; }
public void setId(int id) { this.id = id; }
public int getAge() { return age; }
public void setAge(int age) { this.age = age; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
@Override public String toString() { return "Student2{" + "id=" + id + ", age=" + age + ", name='" + name + '\'' + '}'; }}
pom.xml关键依赖:
spa
2.2.02.11.8
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version></dependency>