PySpark basic functions
# -*- coding: utf-8 -*-
"""
Created on Sun Jul 3 15:38:33 2022
@author:
"""
import json
from pyspark import StorageLevel
from pyspark.sql import Window
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType, MapType
class AoiProfile(object):
def __init__(self, city_code):
self.city_code = city_code
self.storage_level = StorageLevel.MEMORY_AND_DISK
self.path_csv = "path/to/abc.csv"
self.spark = SparkSession.\
builder.\
appName("aoi_profile").\
config("spark.some.config.option", "some-value").\
enableHiveSupport().\
getOrCreate()
self.spark.sparkContext.setLogLevel("ERROR")
        self.udf_get_dict = F.udf(self.get_dict, MapType(StringType(), StringType()))
def read_csv(self):
"""读取csv"""
return self.spark.read.options(header=True, sep="|").csv(self.path_csv)
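        # Assumes a pipe-delimited file with a header row; adjust sep/header as needed.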
def read_table(self, sql):
"""读取sql"""
return self.spark.sql(sql)
def stop_spark(self):
"""停止操作"""
self.spark.stop()
    def print_count_show(self, df, df_name=""):
        """A handy debug helper: print schema, row count, and a sample"""
        print("Printing...{0}".format(df_name))
        df.printSchema()  # printSchema() prints directly and returns None
        print(df.count())
        df.show(5)        # show() also prints directly
    def filter_null(self, df, column):
        """Filter out NULLs, empty strings, and the literal string 'null'; use with care"""
        return df.\
            filter(F.col(column).isNotNull()).\
            filter(F.col(column) != "").\
            filter(F.col(column) != "null")
    def save_data(self, df, target_table):
        """Write to HDFS as parquet, then register the partition in Hive"""
        hdfs_path = "hdfs://path/to/path_save.db/dir/path/"
        count = df.count()
        partition_num = count // 500000
        partition_num = partition_num if partition_num != 0 else 1
        df.repartition(partition_num).write.parquet(hdfs_path, mode='overwrite')
        sql_alter = """alter table {0} add if not exists partition(city_code='{1}') location '{2}'"""\
            .format(target_table, self.city_code, hdfs_path)
        self.spark.sql(sql_alter)
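        # Sizing heuristic: aim for roughly 500k rows per output file; with fewer rows,
        # fall back to a single partition so repartition() is never called with 0.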
def save_as_table(self, df):
"""保存到parquet"""
df.write.mode("overwrite").format("parquet").saveAsTable("dbA.tableB")
    def csv_standard(self, df):
        """Parse JSON-encoded string columns into array<map<string,string>>"""
        F_strToList = F.udf(self.strToList, ArrayType(MapType(StringType(), StringType())))
        columns = df.columns
        for column in columns:
            df = df.withColumn(column, F_strToList(F.col(column)))
        return df
@staticmethod
def strToList(string):
try:
return list(json.loads(string))
except Exception:
return None
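        # Hypothetical example: '[{"k": "v"}]' -> [{"k": "v"}]; malformed JSON returns None.
        # Note: list(json.loads(...)) on a plain JSON object would return its keys, not the dict.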
def df_persist(self, df):
"""缓存,在dataframe需要被多次利用的场合"""
return df.persist(self.storage_level)
def df_rename(self, df):
"""修改列名"""
return df.withColumnRenamed("col_old", "col_new")
def df_concat(self, df):
"""多列concat"""
return df.withColumn("col_concat", F.concat_ws("_@_", df.col1, df.col2, df.col3))
def df_join(self, df1, df2):
"""多个dataframe进行join"""
return df1.join(df2, "col_join", "left")
    def group_by(self, df):
        """Assorted groupBy patterns"""
        # 1. count rows / count val / count distinct val per key
        gp_key = df.groupBy("key").agg(
            F.count("*").alias("count_key"),
            F.count("val").alias("count_val"),
            F.countDistinct("val").alias("count_distinct_val"))
        # 2. count distinct, sum, and collected values of val per (key1, key2)
        gp_keys = df.groupBy("key1", "key2").agg(
            F.countDistinct("val").alias("count_distinct_val"),
            F.sum("val").alias("sum_val"),
            F.collect_set("val").alias("set_val"),
            F.collect_list("val").alias("list_val"))
        # 3. make a distribution: pack columns into a struct, then collect per key
        gp_struct = df.selectExpr("key", "struct(val1, val2, val3) as key_struct")
        gp_struct = gp_struct.groupBy("key").agg(
            F.collect_list("key_struct").alias("key_distribution"))
        return gp_key, gp_keys, gp_struct
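        # Note: multiple aggregations must be passed to a single agg() call; chaining
        # .agg() on the result would re-aggregate the already-aggregated frame, and
        # .alias() belongs on each aggregate expression, not on the DataFrame.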
@staticmethod
def udf_function(string1, string2):
"""字符串相加"""
return string1 + string2
def df_udf_function(self, df):
"""通过的函数相互作用生成一个新列"""
return df.withColumn("col_new", F.udf(self.udf_function, StringType())(F.col("col_old1"), F.col("col_old2")))
def df_explode_split(self, df):
"""将一列按字符串split后explode形成一个新列"""
return df.withColumn("col_new", F.explode(F.split(df["col_old"], "[|]")))
    def df_window_partition(self, df):
        """Rank within (key1, key2) groups by another column, then keep the top row"""
        w = Window.partitionBy("key1", "key2").orderBy(F.col("rank_col").asc())
        df = df.withColumn("rank_number", F.row_number().over(w))
        return df.where(F.col("rank_number") == 1)
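        # This is the standard "keep the best row per group" dedup pattern; use rank()
        # instead of row_number() if tied rows should all survive the filter.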
    def df_filter_like(self, df):
        """Keep rows where col1 contains str1 AND col2 contains str2 (SQL LIKE)"""
        return df.filter((df["col1"].like("%str1%")) & (df["col2"].like("%str2%")))
    def df_filters(self, df):
        """Keep rows where col1 equals "1" OR col2 equals "2" """
        return df.filter((F.col("col1") == "1") | (F.col("col2") == "2"))
    def df_dropna(self, df):
        """Drop rows where col1 is null (note: dropna removes nulls, not duplicates)"""
        return df.dropna(subset=["col1"])
    def df_when_otherwise(self, df):
        """Conditional logic across multiple columns"""
        # make a new col: when col1 is not null, return col2 - col3, otherwise return None
        return df.withColumn("col_new", F.when(F.col("col1").isNotNull(), F.col("col2") - F.col("col3")).otherwise(None))
    @staticmethod
    def get_dict(val1, val2):
        """Pack two values into a dict; wrapped as a MapType UDF in __init__"""
        return {"key1": val1, "key2": val2}
    def df_get_dict(self, df):
        """Build a MapType column from two columns via the UDF"""
        return df.withColumn("col_new", self.udf_get_dict(F.col("col1"), F.col("col2")))