国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Java和scala實現 Spark RDD轉換成DataFrame的兩種方法小結

Java和scala實現 Spark RDD轉換成DataFrame的兩種方法小結

2021-05-07 13:34黑白調92 Java教程

今天小編就為大家分享一篇Java和scala實現 Spark RDD轉換成DataFrame的兩種方法小結,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧

一:準備數據源

在項目下新建一個student.txt文件,里面的內容為:

?
1
2
3
4
1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18

二:實現

java版:

1.首先新建一個student的bean對象,實現序列化和tostring()方法,具體代碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.cxd.sql;
import java.io.serializable;
@suppresswarnings("serial")
public class student implements serializable {
 string sid;
 string sname;
 int sage;
 public string getsid() {
  return sid;
 }
 public void setsid(string sid) {
  this.sid = sid;
 }
 public string getsname() {
  return sname;
 }
 public void setsname(string sname) {
  this.sname = sname;
 }
 public int getsage() {
  return sage;
 }
 public void setsage(int sage) {
  this.sage = sage;
 }
 @override
 public string tostring() {
  return "student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
 }
 
}
       

2.轉換,具體代碼如下

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.cxd.sql;
import java.util.arraylist;
import org.apache.spark.sparkconf;
import org.apache.spark.api.java.javardd;
import org.apache.spark.sql.dataset;
import org.apache.spark.sql.row;
import org.apache.spark.sql.rowfactory;
import org.apache.spark.sql.savemode;
import org.apache.spark.sql.sparksession;
import org.apache.spark.sql.types.datatypes;
import org.apache.spark.sql.types.structfield;
import org.apache.spark.sql.types.structtype;
public class txttoparquetdemo {
 public static void main(string[] args) {
  
  sparkconf conf = new sparkconf().setappname("txttoparquet").setmaster("local");
  sparksession spark = sparksession.builder().config(conf).getorcreate();
  reflecttransform(spark);//java反射
  dynamictransform(spark);//動態轉換
 }
 
 /**
  * 通過java反射轉換
  * @param spark
  */
 private static void reflecttransform(sparksession spark)
 {
  javardd<string> source = spark.read().textfile("stuinfo.txt").javardd();
  
  javardd<student> rowrdd = source.map(line -> {
   string parts[] = line.split(",");
   student stu = new student();
   stu.setsid(parts[0]);
   stu.setsname(parts[1]);
   stu.setsage(integer.valueof(parts[2]));
   return stu;
  });
  
  dataset<row> df = spark.createdataframe(rowrdd, student.class);
  df.select("sid", "sname", "sage").
  coalesce(1).write().mode(savemode.append).parquet("parquet.res");
 }
 /**
  * 動態轉換
  * @param spark
  */
 private static void dynamictransform(sparksession spark)
 {
  javardd<string> source = spark.read().textfile("stuinfo.txt").javardd();
  
  javardd<row> rowrdd = source.map( line -> {
   string[] parts = line.split(",");
   string sid = parts[0];
   string sname = parts[1];
   int sage = integer.parseint(parts[2]);
   
   return rowfactory.create(
     sid,
     sname,
     sage
     );
  });
  
  arraylist<structfield> fields = new arraylist<structfield>();
  structfield field = null;
  field = datatypes.createstructfield("sid", datatypes.stringtype, true);
  fields.add(field);
  field = datatypes.createstructfield("sname", datatypes.stringtype, true);
  fields.add(field);
  field = datatypes.createstructfield("sage", datatypes.integertype, true);
  fields.add(field);
  
  structtype schema = datatypes.createstructtype(fields);
  
  dataset<row> df = spark.createdataframe(rowrdd, schema);
  df.coalesce(1).write().mode(savemode.append).parquet("parquet.res1");
  
  
 }
 
}

scala版本:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import org.apache.spark.sql.sparksession
import org.apache.spark.sql.types.stringtype
import org.apache.spark.sql.types.structfield
import org.apache.spark.sql.types.structtype
import org.apache.spark.sql.row
import org.apache.spark.sql.types.integertype
object rdd2dataset {
 
 case class student(id:int,name:string,age:int)
 def main(args:array[string])
 {
 
 val spark=sparksession.builder().master("local").appname("rdd2dataset").getorcreate()
 import spark.implicits._
 reflectcreate(spark)
 dynamiccreate(spark)
 }
 
 /**
     * 通過java反射轉換
     * @param spark
     */
 private def reflectcreate(spark:sparksession):unit={
 import spark.implicits._
 val sturdd=spark.sparkcontext.textfile("student2.txt")
 //todf()為隱式轉換
 val studf=sturdd.map(_.split(",")).map(parts⇒student(parts(0).trim.toint,parts(1),parts(2).trim.toint)).todf()
 //studf.select("id","name","age").write.text("result") //對寫入文件指定列名
 studf.printschema()
 studf.createorreplacetempview("student")
 val namedf=spark.sql("select name from student where age<20")
 //namedf.write.text("result") //將查詢結果寫入一個文件
 namedf.show()
 }
 
 /**
     * 動態轉換
     * @param spark
     */
 private def dynamiccreate(spark:sparksession):unit={
 val sturdd=spark.sparkcontext.textfile("student.txt")
 import spark.implicits._
 val schemastring="id,name,age"
 val fields=schemastring.split(",").map(fieldname => structfield(fieldname, stringtype, nullable = true))
 val schema=structtype(fields)
 val rowrdd=sturdd.map(_.split(",")).map(parts⇒row(parts(0),parts(1),parts(2)))
 val studf=spark.createdataframe(rowrdd, schema)
  studf.printschema()
 val tmpview=studf.createorreplacetempview("student")
 val namedf=spark.sql("select name from student where age<20")
 //namedf.write.text("result") //將查詢結果寫入一個文件
 namedf.show()
 }
}

注:

1.上面代碼全都已經測試通過,測試的環境為spark2.1.0,jdk1.8。

2.此代碼不適用于spark2.0以前的版本。

以上這篇java和scala實現 spark rdd轉換成dataframe的兩種方法小結就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持服務器之家。

原文鏈接:https://blog.csdn.net/u010592112/article/details/73730796

延伸 · 閱讀

精彩推薦
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25 Weibo Article 26 Weibo Article 27 Weibo Article 28 Weibo Article 29 Weibo Article 30 Weibo Article 31 Weibo Article 32 Weibo Article 33 Weibo Article 34 Weibo Article 35 Weibo Article 36 Weibo Article 37 Weibo Article 38 Weibo Article 39 Weibo Article 40
主站蜘蛛池模板: 久久精品亚洲精品 | 精品人成 | 日韩在线成人 | 欧美在线播放一区 | 日韩高清在线一区 | 欧美资源在线 | 免费一级片免费一级片 | 成人在线播放 | 国产a区 | 日韩国产一区二区 | 欧美一区二区在线观看 | 三级黄色片在线观看 | 欧美精品一区二区三区蜜桃视频 | 日韩三区 | 精品成人国产在线观看男人呻吟 | 另类国产ts人妖高潮系列视频 | 欧美日韩亚洲视频 | 在线视频 亚洲 | 久久久久久久久久久久久久av | 亚洲国产一区视频 | 久久久久久成人 | 精品国产不卡一区二区三区 | 久久伊人久久 | 中文字幕成人av | 中国大陆高清aⅴ毛片 | 久热中文在线 | 日韩在线观看一区 | 中文字幕综合 | 大香伊蕉在人线视频777 | 国产精品视屏 | 亚洲一区二区 | 成人h在线 | 亚洲精品在线免费看 | av手机在线电影 | 久久综合欧美 | 久热精品免费视频 | 一区二区三区免费 | 日韩国伦理久久一区 | 中文字幕在线资源 | 精品国产乱码一区二区三区 | 91视频一88av |