Basic PySpark functions


# -*- coding: utf-8 -*-
"""
Created on Sun Jul  3 15:38:33 2022

@author: 
"""

import json
from pyspark import StorageLevel
from pyspark.sql import Window
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType, MapType
from pyspark.sql.column import Column, _to_seq, _to_java_column


class AoiProfile(object):
    
    def __init__(self, city_code):

        self.city_code = city_code
        self.storage_level = StorageLevel.MEMORY_AND_DISK
        self.path_csv = "path/to/abc.csv"

        self.spark = SparkSession.\
            builder.\
            appName("aoi_profile").\
            config("spark.some.config.option", "some-value").\
            enableHiveSupport().\
            getOrCreate()
        self.spark.sparkContext.setLogLevel("ERROR")
        self.udf_get_dict = F.udf(self.get_dict, MapType(StringType(), StringType()))


    def read_csv(self):
        """读取csv"""
        return self.spark.read.options(header=True, sep="|").csv(self.path_csv)

    def read_table(self, sql):
        """读取sql"""
        return self.spark.sql(sql)
    
    def stop_spark(self):
        """停止操作"""
        self.spark.stop()
        
    def print_count_show(self, df, df_name=""):
        """Handy debug helper: print schema, row count and a small sample"""
        print("Printing...{0}".format(df_name))
        df.printSchema()
        print(df.count())
        df.show(5)
    
    def filter_null(self, df, column):
        """过滤空值,Null值,取值为Null的值,慎用"""
        return df.\
            filter(F.col(column).isNotNull()).\
            filter(F.col(column) != "").\
            filter(F.col(column) != "null")

    def save_data(self, df, target_table):
        """保存到hdfs"""
        hdfs_path = "hdfs://path/to/path_save.db/dir/path/"
        count = df.count()
        partition_num = count // 500000
        partition_num = partition_num if partition_num != 0 else 1
        df.repartition(partition_num).write.parquet(hdfs_path, mode='overwrite')

        sql_alter = """alter table {0} add if not exists partition(city_code='{1}') location '{2}'"""\
            .format(target_table, self.city_code, hdfs_path)

        self.spark.sql(sql_alter)
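        # For illustration: the statement above registers the written HDFS directory as a
        # new city_code partition of target_table (an external-partition pattern), e.g.
        #   alter table dbA.tableB add if not exists partition(city_code='110000')
        #   location 'hdfs://path/to/path_save.db/dir/path/'
        # (the table name and city_code value here are hypothetical placeholders).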

    def save_as_table(self, df):
        """保存到parquet"""
        df.write.mode("overwrite").format("parquet").saveAsTable("dbA.tableB")

    def csv_standard(self, df):
        """Parse JSON-string columns into array<map<string, string>> columns"""
        F_strToList = F.udf(self.strToList, ArrayType(MapType(StringType(), StringType())))
        columns = df.columns
        for column in columns:
            df = df.withColumn(column, F_strToList(F.col(column)))
        return df

    @staticmethod
    def strToList(string):
        try:
            return list(json.loads(string))
        except Exception:
            return None
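    # For illustration: with the ArrayType(MapType(StringType(), StringType())) return
    # type declared in csv_standard, a cell such as '[{"k": "v1"}, {"k": "v2"}]' is
    # parsed into a list of string-to-string maps; values that fail json.loads become null.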

    def df_persist(self, df):
        """缓存,在dataframe需要被多次利用的场合"""
        return df.persist(self.storage_level)

    def df_rename(self, df):
        """修改列名"""
        return df.withColumnRenamed("col_old", "col_new")

    def df_concat(self, df):
        """多列concat"""
        return df.withColumn("col_concat", F.concat_ws("_@_", df.col1, df.col2, df.col3))

    def df_join(self, df1, df2):
        """多个dataframe进行join"""
        return df1.join(df2, "col_join", "left")

    def group_by(self, df):
        """Common groupBy operations"""
        # 1. count / count distinct of val by key
        gp_key = df.groupBy("key").agg(
            F.count("val").alias("count_val"),
            F.countDistinct("val").alias("count_distinct_val"))

        # 2. count distinct, sum, collect_set / collect_list of val by key1 and key2
        gp_keys = df.groupBy("key1", "key2").agg(
            F.countDistinct("val").alias("count_distinct_val"),
            F.sum("val").alias("sum_val"),
            F.collect_set("val").alias("set_val"),
            F.collect_list("val").alias("list_val"))

        # 3. build a distribution: pack columns into a struct, then collect per key
        gp_dist = df.selectExpr("key", "struct(val1, val2, val3) as key_struct")
        gp_dist = gp_dist.groupBy("key").agg(
            F.collect_list("key_struct").alias("key_distribution"))

        return gp_key, gp_keys, gp_dist

    @staticmethod
    def udf_function(string1, string2):
        """字符串相加"""
        return string1 + string2

    def df_udf_function(self, df):
        """通过的函数相互作用生成一个新列"""
        return df.withColumn("col_new", F.udf(self.udf_function, StringType())(F.col("col_old1"), F.col("col_old2")))

    def df_explode_split(self, df):
        """将一列按字符串split后explode形成一个新列"""
        return df.withColumn("col_new", F.explode(F.split(df["col_old"], "[|]")))
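    # For illustration: with col_old = "a|b|c", split("[|]") yields ["a", "b", "c"] and
    # explode turns the array into three rows, one element per row, in col_new.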

    def df_window_partition(self, df):
        """按key1,key2列分组后,按另一列排序"""
        w = Window.partitonBy("key1", "key2").orderBy(F.col("rank_col").asc())
        df = df.withColumn("rank_number", F.row_number().over(w))
        return df.where(F.col("rank_number")==1)
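    # For illustration: row_number() assigns 1 to the best-ranked row in each
    # (key1, key2) group, so the where() above keeps exactly one row per group,
    # a common deduplication pattern.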

    def df_filter_like(self, df):
        """过滤col1列中SQL-like(包含)str1,并且同时满足col2列中SQL-like(包含)str1的数据"""
        return df.filter((df["col1"].like("str1")) & (df["col2"].like("str2")))

    def df_filters(self, df):
        """过滤col1列取值为1,和col2列取值为2的数据"""
        return df.filter((F.col("col1")=="1") | (F.col("col2")=="2"))

    def df_dropna(self, df):
        """按列去重"""
        return df.dropna(subset=["col1"])

    def df_when_otherwise(self, df):
        """多列判断"""
        # make a new col, when col1!=None, return col2-col3 otherwise return None
        return df.withColumn("col_new", F.when(F.col("col1").isNotNull(), F.col("col2")-F.col("col3")).otherwise(None))

    @staticmethod
    def get_dict(val1, val2):
        """Pack two values into a dict; wrapped as a MapType UDF in __init__"""
        return {"key1": val1, "key2": val2}
        
    def df_get_dict(self, df):
        """udf函数对map类型进行拆分"""
        return df.withColumn("col_new", self.udf_get_dict(F.col("col1"), F.col("col2")))
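

# A minimal usage sketch, assuming the class above; the city code, SQL, table and
# column names below ("110000", dbA.tableC, "name", dbA.tableB) are hypothetical
# placeholders, not values defined by the original snippet.
if __name__ == "__main__":
    profile = AoiProfile(city_code="110000")

    # CSV path comes from self.path_csv; parse JSON-string cells, then inspect
    df_csv = profile.read_csv()
    df_csv = profile.csv_standard(df_csv)
    profile.print_count_show(df_csv, "df_csv")

    # read a Hive table, drop rows with an empty "name", cache for reuse
    df_table = profile.read_table("select * from dbA.tableC where city_code = '110000'")
    df_table = profile.filter_null(df_table, "name")
    df_table = profile.df_persist(df_table)

    # write out as parquet and register the partition, then shut down
    profile.save_data(df_table, "dbA.tableB")
    profile.stop_spark()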

        
    
