CFSDN nhấn mạnh vào giá trị tạo ra nguồn mở và chúng tôi cam kết xây dựng nền tảng chia sẻ tài nguyên để mọi nhân viên CNTT có thể tìm thấy thế giới tuyệt vời của bạn tại đây.
Bài viết trên blog CFSDN này có một số phương pháp tạo DataFrame bằng pyspark được tác giả sưu tầm và biên soạn. Nếu bạn quan tâm đến bài viết này thì nhớ like nhé.
pyspark tạo DataFrame
Để thuận tiện cho việc vận hành, khi sử dụng pyspark, chúng ta thường chuyển đổi dữ liệu sang dạng DataFrame để hoàn thành các thao tác dọn dẹp và phân tích.
RDD và DataFrame
Trong bài viết trước về các thao tác cơ bản của pyspark đã đề cập rằng RDD cũng là một đối tượng dữ liệu phân tán cho các thao tác trong spark.
Dưới đây là cái nhìn ngắn gọn về các loại RDD và DataFrame.
?
1
2
|
in
(
kiểu
(ngày)
in
(
kiểu
(trích dẫn))
|
Sau khi xem qua các định nghĩa về mã nguồn, chúng ta có thể thấy giữa chúng không có mối quan hệ kế thừa nào.
?
1
2
3
4
5
6
7
|
lớp học
Khung dữ liệu(
sự vật
):
|
RDD là tập dữ liệu phân tán linh hoạt, tính trừu tượng cơ bản trong Spark. Đại diện cho một bộ sưu tập lưu trữ được phân vùng, bất biến, có thể hoạt động song song. DataFrame là một bộ sưu tập phân tán nhóm dữ liệu theo các cột tương đương với một bảng quan hệ trong Spark SQL. Điều tương tự là chúng đều được thiết kế để hỗ trợ tính toán phân tán.
Nhưng RDD chỉ là một tập hợp các phần tử, còn DataFrame được nhóm theo các cột, tương tự như các bảng của MySQL hoặc DataFrame trong gấu trúc.

Trong công việc thực tế, chúng tôi vẫn sử dụng DataFrame nhiều hơn.
Tạo DataFrame bằng cách sử dụng bộ dữ liệu
Hãy thử trường hợp đầu tiên và thấy rằng nếu bạn chỉ chuyển vào bộ dữ liệu, kết quả sẽ không có tên cột. Vì vậy, chúng tôi thử tùy chọn thứ hai, chuyển cả bộ dữ liệu và tên cột.
?
1
2
3
4
5
6
7
8
|
Một
=
[(
'Alice'
,
1
)]
đầu ra
=
spark.createDataFrame(a).collect()
in
(đầu ra)
đầu ra
=
spark.createDataFrame(a, [
'tên'
,
'tuổi'
]).sưu tầm()
in
(đầu ra)
|
Ở đây coll() hiển thị từng hàng của bảng dữ liệu hoặc bạn có thể sử dụng show() để hiển thị bảng dữ liệu.
?
1
2
3
4
5
6
7
8
9
10
11
12
13
|
spark.createDataFrame(a).show()
spark.createDataFrame(a, [
'tên'
,
'tuổi'
]).trình diễn()
|
Tạo DataFrame bằng cặp khóa-giá trị
?
1
2
3
4
5
|
ngày
=
[{
'tên'
:
'Alice'
,
'tuổi'
:
1
}]
đầu ra
=
spark.createDataFrame(d).collect()
in
(đầu ra)
|
Tạo DataFrame bằng rdd
?
1
2
3
4
5
6
7
8
9
|
Một
=
[(
'Alice'
,
1
)]
rdd
=
sc.song song hóa(a)
đầu ra
=
spark.createDataFrame(rdd).collect()
in
(đầu ra)
đầu ra
=
spark.createDataFrame(rdd, [
"tên"
,
"tuổi"
]).sưu tầm()
in
(đầu ra)
|
Tạo DataFrame dựa trên rdd và ROW
?
1
2
3
4
5
6
7
8
9
10
11
|
từ
pyspark.sql
nhập khẩu
Hàng ngang
Một
=
[(
'Alice'
,
1
)]
rdd
=
sc.song song hóa(a)
Người
=
Hàng ngang(
"tên"
,
"tuổi"
)
người
=
ngày.
bản đồ
(
lambda
r: Người(
*
r))
đầu ra
=
spark.createDataFrame(người).collect()
in
(đầu ra)
|
Tạo DataFrame dựa trên rdd và StructType
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
từ
pyspark.sql.types
nhập khẩu
*
Một
=
[(
'Alice'
,
1
)]
rdd
=
sc.song song hóa(a)
sơ đồ
=
Kiểu cấu trúc(
[
Trường cấu trúc(
"tên"
, Kiểu chuỗi(),
ĐÚNG VẬY
),
Trường cấu trúc(
"tuổi"
, Kiểu số nguyên(),
ĐÚNG VẬY
)
]
)
đầu ra
=
spark.createDataFrame(rdd, lược đồ).collect()
in
(đầu ra)
|
Tạo DataFrame pyspark dựa trên DataFrame của gấu trúc
df.toPandas() có thể chuyển đổi DataFrame pyspark thành DataFrame của gấu trúc.
?
1
2
3
4
5
6
7
8
9
10
|
df
=
spark.createDataFrame(rdd, [
'tên'
,
'tuổi'
])
in
(trích dẫn)
in
(
kiểu
(df.toPandas()))
đầu ra
=
spark.createDataFrame(df.toPandas()).collect()
in
(đầu ra)
|
Tạo một DataFrame có thứ tự
?
1
2
3
4
5
6
7
|
đầu ra
=
tia lửa.
phạm vi
(
1
,
7
,
2
).sưu tầm()
in
(đầu ra)
đầu ra
=
tia lửa.
phạm vi
(
3
).sưu tầm()
in
(đầu ra)
|
Nhận DataFrame thông qua bảng tạm thời.
?
1
2
3
4
5
|
spark.registerDataFrameAsTable(df,
"bảng1"
)
df2
=
spark.bảng(
"bảng1"
)
b
=
df.thu thập()
=
=
df2.thu thập()
in
(b)
|
Định cấu hình DataFrame và bảng tạm thời
Chỉ định loại cột khi tạo DataFrame
Các loại cột có thể được chỉ định trong createDataFrame và chỉ những cột đáp ứng loại dữ liệu đó mới được giữ lại. Nếu không có cột nào thỏa mãn loại dữ liệu đó thì sẽ xảy ra lỗi.
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
Một
=
[(
'Alice'
,
1
)]
rdd
=
sc.song song hóa(a)
đầu ra
=
spark.createDataFrame(rdd,
"a: chuỗi, b: số nguyên"
).sưu tầm()
in
(đầu ra)
rdd
=
ngày.
bản đồ
(
lambda
hàng: hàng[
1
])
in
(ngày)
đầu ra
=
spark.createDataFrame(rdd,
"số nguyên"
).sưu tầm()
in
(đầu ra)
đầu ra
=
spark.createDataFrame(rdd,
"boolean"
).sưu tầm()
|
Đăng ký DataFrame làm bảng tạm thời
?
1
2
|
spark.registerDataFrameAsTable(df,
"bảng1"
)
spark.dropTempTable(
"bảng1"
)
|
Nhận và sửa đổi cấu hình
?
1
2
3
4
|
in
(spark.getConf(
"spark.sql.shuffle.partitions"
))
in
(spark.getConf(
"spark.sql.shuffle.partitions"
, TRONG
"10"
))
in
(spark. setConf(
"spark.sql.shuffle.partitions"
, TRONG
"50"
))
in
(spark.getConf(
"spark.sql.shuffle.partitions"
, TRONG
"10"
))
|
Đăng ký một chức năng tùy chỉnh
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
spark.registerFunction(
"chuỗiChuỗiChiều Dài"
,
lambda
x:
chỉ một
(x))
đầu ra
=
spark.sql(
"CHỌN stringLengthString('test')"
).sưu tầm()
in
(đầu ra)
spark.registerFunction(
"chuỗiChuỗiChiều Dài"
,
lambda
x:
chỉ một
(x), Kiểu số nguyên())
đầu ra
=
spark.sql(
"CHỌN stringLengthString('test')"
).sưu tầm()
in
(đầu ra)
spark.udf.register(
"chuỗiLengthInt"
,
lambda
x:
chỉ một
(x), Kiểu số nguyên())
đầu ra
=
spark.sql(
"CHỌN stringLengthInt('kiểm tra')"
).sưu tầm()
in
(đầu ra)
|
Xem danh sách bảng tạm thời
Tất cả các tên bảng và đối tượng tạm thời có thể được xem.
?
1
2
3
4
5
6
7
8
9
10
|
spark.registerDataFrameAsTable(df,
"bảng1"
)
in
(spark.tableNames())
in
(spark.tables())
in
(
"bảng1"
TRONG
spark. tên bảng())
in
(
"bảng1"
TRONG
spark. tên bảng(
"mặc định"
))
spark.registerDataFrameAsTable(df,
"bảng1"
)
df2
=
spark.tables()
df2.
lọc
(
"tên bảng = 'bảng1'"
).Đầu tiên()
in
(df2)
|
Tạo DataFrame từ các nguồn dữ liệu khác
MySQL
Tiền đề là bạn cần tải xuống gói jar. Trình kết nối Mysql-java.jar.
?
1
2
3
4
5
6
7
8
9
10
11
12
13
|
từ
pyspark
nhập khẩu
SparkContext
từ
pyspark.sql
nhập khẩu
Ngữ cảnh SQL
nhập khẩu
pyspark.sql.functions như F
sc
=
SparkContext(
"địa phương"
, tên ứng dụng
=
"kiểm tra mysql"
)
sqlBối cảnh
=
Ngữ cảnh SQL(sc)
df
=
sqlContext. đọc.
định dạng
(
"jdbc"
).tùy chọn(
địa chỉ
=
"jdbc:mysql://localhost:3306/mydata?user=root&password=mysql&"
"useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&"
"useLegacyDatetimeCode=false&serverTimezone=UTC "
, bảng cơ sở dữ liệu
=
"dữ liệu chi tiết"
).trọng tải()
df.hiển thị(n
=
5
)
sc.dừng()
|
Thẩm quyền giải quyết.
Sự khác biệt giữa RDD và DataFrame tạo ra bản dịch tài liệu chính thức pyspark.sql.SQLContext.
Đến đây là kết thúc bài viết về một số phương pháp tạo DataFrame bằng pyspark. Để biết thêm nội dung liên quan đến tạo DataFrame bằng pyspark, vui lòng tìm kiếm các bài viết trước của tôi hoặc tiếp tục duyệt qua các bài viết liên quan bên dưới. Mong các bạn sẽ ủng hộ tôi trong thời gian tới! .
Liên kết gốc: https://blog.csdn.net/weixin_39198406/article/details/104916715.
Cuối cùng, bài viết này về một số phương pháp tạo DataFrame bằng pyspark kết thúc tại đây. Nếu bạn muốn biết thêm về một số phương pháp tạo DataFrame bằng pyspark, vui lòng tìm kiếm bài viết CFSDN hoặc tiếp tục duyệt qua các bài viết liên quan. tương lai blog của tôi! .
Tôi là một lập trình viên xuất sắc, rất giỏi!