import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from utils.templates import Templates
from pyspark.sql import functions as F


class FlowAsinLast30days(Templates):
    """Join last-30-days ASIN metrics from Elasticsearch with parent-ASIN
    variation data and save the result to a Hive temp table."""

    def __init__(self):
        super().__init__()
        self.db_save = "tmp_flow_asin_last30days"
        self.spark = self.create_spark_object(app_name="FlowAsinLast30days")
        self.partitions_num = 20
        self.partition_dict = {}
        # Placeholder DataFrames; populated in read_data()/handle_data()
        self.df_es = self.spark.sql("select 1+1;")
        self.df_parent = self.spark.sql("select 1+1;")
        self.df_joined = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    def read_data(self):
        # Read the last-4-week ASIN detail index from Elasticsearch
        self.df_es = self.spark.read.format("org.elasticsearch.spark.sql") \
            .option("es.nodes", "192.168.10.217") \
            .option("es.port", "9200") \
            .option("es.net.http.auth.user", "elastic") \
            .option("es.net.http.auth.pass", "selection2021.+") \
            .option("es.resource", "us_st_detail_last_4_week") \
            .option("es.query", '{"query": {"match_all": {}}}') \
            .load()
        columns = ["asin", "first_category_rank", "asin_bought_month", "total_comments",
                   "variation_num", "site_name", "account_name"]
        self.df_es = self.df_es.select(columns).cache()
        self.df_es.show()
        # Read the asin -> parent_asin mapping from the variation ODS table
        sql = """
        select asin, parent_asin from ods_asin_variat;
        """
        self.df_parent = self.spark.sql(sqlQuery=sql).cache()

    def handle_data(self):
        # self.df_parent = self.df_parent.groupby(["parent_asin"]).agg(F.count("asin").alias("variation_num"))
        self.df_joined = self.df_es.join(self.df_parent, "asin", "left")
        # Flag whether each ASIN has a parent ASIN in the variation table
        self.df_joined = self.df_joined \
            .withColumn("parent_asin_is_null", F.when(F.col("parent_asin").isNull(), F.lit(1)).otherwise(F.lit(0))) \
            .withColumn("parent_asin_exist", F.when(F.col("parent_asin").isNotNull(), F.lit(1)).otherwise(F.lit(0)))

    def save_data(self):
        self.df_save = self.df_joined
        hdfs_path_asin_info = CommonUtil.build_hdfs_path(self.db_save, partition_dict=self.partition_dict)
        print(f"Clearing HDFS directory: {hdfs_path_asin_info}")
        HdfsUtils.delete_file_in_folder(hdfs_path_asin_info)
        print(f"Table being saved: {self.db_save}")
        self.df_save.write.saveAsTable(name=self.db_save, format='hive', mode='append')
        print("success")


if __name__ == '__main__':
    obj = FlowAsinLast30days()
    obj.run()
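
# Usage note (a sketch, not part of the original script): reading through the
# "org.elasticsearch.spark.sql" data source requires the ES-Hadoop Spark
# connector on the driver/executor classpath. If the Templates base class does
# not already provide it, one common way is to pass it at submit time; the
# artifact coordinates and version below are assumptions and must match the
# Spark and Elasticsearch versions actually in use:
#
#   spark-submit --packages org.elasticsearch:elasticsearch-spark-30_2.12:8.11.0 path/to/this_script.py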