Commit 7445726f by wangjing

周趋势的计算和同步已移到周增长任务中;月趋势的计算和同步不再处理周数据。

parent f9e42a8c
...@@ -106,45 +106,45 @@ class DwtSTBaseReport(object): ...@@ -106,45 +106,45 @@ class DwtSTBaseReport(object):
df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by) df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
print("success") print("success")
# 计算周维度的趋势图数据 # # 计算周维度的趋势图数据
# 读日期字典表,获取date_info对应的week_list # # 读日期字典表,获取date_info对应的week_list
sql = f""" # sql = f"""
select year_week from dim_date_20_to_30 where week_day = 1 and year_month = '{self.date_info}'; # select year_week from dim_date_20_to_30 where week_day = 1 and year_month = '{self.date_info}';
""" # """
df_week = self.spark.sql(sql) # df_week = self.spark.sql(sql)
week_list = sorted([row['year_week'] for row in df_week.collect()]) # week_list = sorted([row['year_week'] for row in df_week.collect()])
for year_week in week_list: # for year_week in week_list:
sql = f""" # sql = f"""
select # select
search_term, # search_term,
rank as st_rank # rank as st_rank
from ods_brand_analytics # from ods_brand_analytics
where site_name = '{self.site_name}' # where site_name = '{self.site_name}'
and date_type = 'week' # and date_type = 'week'
and date_info = '{year_week}' # and date_info = '{year_week}'
and rank <= 1500000; # and rank <= 1500000;
""" # """
df_st_rank_week = self.spark.sql(sql).repartition(40, 'search_term').cache() # df_st_rank_week = self.spark.sql(sql).repartition(40, 'search_term').cache()
print(f"搜索词+排名对应关系,{year_week}周:") # print(f"搜索词+排名对应关系,{year_week}周:")
df_st_rank_week.show(10, False) # df_st_rank_week.show(10, False)
df_save = df_st_base\ # df_save = df_st_base\
.join(df_st_rank_week, 'search_term', 'inner')\ # .join(df_st_rank_week, 'search_term', 'inner')\
.repartition(40, 'st_rank')\ # .repartition(40, 'st_rank')\
.join(df_rank_sv, 'st_rank', 'left')\ # .join(df_rank_sv, 'st_rank', 'left')\
.withColumn('created_time', F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS'))\ # .withColumn('created_time', F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS'))\
.withColumn('updated_time', F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS'))\ # .withColumn('updated_time', F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS'))\
.withColumn('years', F.lit(int(year_week.split("-")[0])))\ # .withColumn('years', F.lit(int(year_week.split("-")[0])))\
.withColumn('site_name', F.lit(self.site_name))\ # .withColumn('site_name', F.lit(self.site_name))\
.withColumn('date_type', F.lit('week'))\ # .withColumn('date_type', F.lit('week'))\
.withColumn('date_info', F.lit(year_week)) # .withColumn('date_info', F.lit(year_week))
hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.hive_tb}/site_name={self.site_name}/date_type=week/date_info={year_week}" # hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.hive_tb}/site_name={self.site_name}/date_type=week/date_info={year_week}"
print(f"清除hdfs目录中数据:{hdfs_path}") # print(f"清除hdfs目录中数据:{hdfs_path}")
HdfsUtils.delete_hdfs_file(hdfs_path) # HdfsUtils.delete_hdfs_file(hdfs_path)
df_save = df_save.repartition(self.partitions_num) # df_save = df_save.repartition(self.partitions_num)
partition_by = ["site_name", "date_type", "date_info"] # partition_by = ["site_name", "date_type", "date_info"]
print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", ) # print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by) # df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
print("success") # print("success")
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -51,43 +51,43 @@ if __name__ == '__main__': ...@@ -51,43 +51,43 @@ if __name__ == '__main__':
client.close() client.close()
print("导出月趋势图完成!") print("导出月趋势图完成!")
# 导出周数据 # # 导出周数据
# date_info对应的week_list # # date_info对应的week_list
spark = SparkUtil.get_spark_session(f"export_dwt_st_base_report:{site_name} {date_type} {date_info}") # spark = SparkUtil.get_spark_session(f"export_dwt_st_base_report:{site_name} {date_type} {date_info}")
sql = f""" # sql = f"""
select year_week from dim_date_20_to_30 where week_day = 1 and year_month = '{date_info}'; # select year_week from dim_date_20_to_30 where week_day = 1 and year_month = '{date_info}';
""" # """
df_week = spark.sql(sql) # df_week = spark.sql(sql)
week_list = sorted([row['year_week'] for row in df_week.collect()]) # week_list = sorted([row['year_week'] for row in df_week.collect()])
for year_week in week_list: # for year_week in week_list:
print(f"导出周趋势图到PG集群:{year_week}周") # print(f"导出周趋势图到PG集群:{year_week}周")
db_type = "postgresql_cluster" # db_type = "postgresql_cluster"
year_str = CommonUtil.safeIndex(year_week.split("-"), 0, None) # year_str = CommonUtil.safeIndex(year_week.split("-"), 0, None)
year_next = str(int(year_str) + 1) # year_next = str(int(year_str) + 1)
export_master_tb = f"{site_name}_aba_last_total_week" # export_master_tb = f"{site_name}_aba_last_total_week"
export_table = f"{export_master_tb}_{year_str}" # export_table = f"{export_master_tb}_{year_str}"
engine = DBUtil.get_db_engine(db_type, site_name) # engine = DBUtil.get_db_engine(db_type, site_name)
sql = f""" # sql = f"""
create table if not exists {export_table} partition of {export_master_tb} for values from ('{year_str}') to ('{year_next}'); # create table if not exists {export_table} partition of {export_master_tb} for values from ('{year_str}') to ('{year_next}');
delete from {export_table} where date_info = '{year_week}'; # delete from {export_table} where date_info = '{year_week}';
""" # """
DBUtil.engine_exec_sql(engine, sql) # DBUtil.engine_exec_sql(engine, sql)
# sqoop导出的sh脚本编写 # # sqoop导出的sh脚本编写
sh = CommonUtil.build_export_sh( # sh = CommonUtil.build_export_sh(
site_name=site_name, # site_name=site_name,
db_type=db_type, # db_type=db_type,
hive_tb="dwt_st_base_report", # hive_tb="dwt_st_base_report",
export_tb=export_table, # export_tb=export_table,
col=columns, # col=columns,
partition_dict={ # partition_dict={
"site_name": site_name, # "site_name": site_name,
"date_type": 'week', # "date_type": 'week',
"date_info": year_week # "date_info": year_week
} # }
) # )
client = SSHUtil.get_ssh_client() # client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False) # SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close() # client.close()
print(f"导出周趋势图{year_week}周完成!") # print(f"导出周趋势图{year_week}周完成!")
pass pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment