Commit 6f57e4be by fangxingjun

no message

parent 81ba7508
...@@ -36,7 +36,7 @@ class DwdNsrBsrKeepaAsin(Templates): ...@@ -36,7 +36,7 @@ class DwdNsrBsrKeepaAsin(Templates):
self.df_asin_keepa = self.spark.sql(f"select 1+1;") self.df_asin_keepa = self.spark.sql(f"select 1+1;")
self.partitions_by = ['site_name', 'date_type', 'date_info'] self.partitions_by = ['site_name', 'date_type', 'date_info']
self.reset_partitions(partitions_num=5) self.reset_partitions(partitions_num=5)
self.get_date_info_tuple()
@staticmethod @staticmethod
def get_date_30_days_ago(date_str: str, date_format: str = "%Y-%m-%d") -> str: def get_date_30_days_ago(date_str: str, date_format: str = "%Y-%m-%d") -> str:
...@@ -65,19 +65,28 @@ class DwdNsrBsrKeepaAsin(Templates): ...@@ -65,19 +65,28 @@ class DwdNsrBsrKeepaAsin(Templates):
# print(is_saturday("2026-02-06")) # False # print(is_saturday("2026-02-06")) # False
def read_data(self): def read_data(self):
thirty_days_ago = self.get_date_30_days_ago(date_str=self.date_info) if self.date_type == 'day':
thirty_days_ago = self.get_date_30_days_ago(date_str=self.date_info)
else:
thirty_days_ago = ''
print(f"1.1 读取最近30天的bsr的asin") print(f"1.1 读取最近30天的bsr的asin")
sql_bsr = f"select asin, date_info, 1 as asin_cate_flag from dim_bsr_asin_rank_history where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'" sql_bsr = f"select asin, date_info, 1 as asin_cate_flag from dim_bsr_asin_rank_history where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'"
if self.date_type == 'month':
sql_bsr = f"select asin, date_info, 1 as asin_cate_flag from dim_bsr_asin_rank_history where site_name='{self.site_name}' and date_info in {self.date_info_tuple}"
print("sql_bsr:", sql_bsr) print("sql_bsr:", sql_bsr)
self.df_asin_bsr = self.spark.sql(sqlQuery=sql_bsr).cache() self.df_asin_bsr = self.spark.sql(sqlQuery=sql_bsr).cache()
self.df_asin_bsr.show(10, truncate=False) self.df_asin_bsr.show(10, truncate=False)
print(f"1.2 读取最近30天的nsr的asin") print(f"1.2 读取最近30天的nsr的asin")
sql_nsr = f"select asin, date_info, 2 as asin_cate_flag from dim_nsr_asin_rank_history where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'" sql_nsr = f"select asin, date_info, 2 as asin_cate_flag from dim_nsr_asin_rank_history where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'"
if self.date_type == 'month':
sql_nsr = f"select asin, date_info, 2 as asin_cate_flag from dim_nsr_asin_rank_history where site_name='{self.site_name}' and date_info in {self.date_info_tuple}"
print("sql_nsr:", sql_nsr) print("sql_nsr:", sql_nsr)
self.df_asin_nsr = self.spark.sql(sqlQuery=sql_nsr).cache() self.df_asin_nsr = self.spark.sql(sqlQuery=sql_nsr).cache()
self.df_asin_nsr.show(10, truncate=False) self.df_asin_nsr.show(10, truncate=False)
print(f"1.3 读取最近30天的keepa的asin") print(f"1.3 读取最近30天的keepa的asin")
sql_keepa = f"select distinct(asin), date_info, 3 as asin_cate_flag from ods_keepa_finder_asin where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'" sql_keepa = f"select distinct(asin), date_info, 3 as asin_cate_flag from ods_keepa_finder_asin where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'"
if self.date_type == 'month':
sql_keepa = f"select asin, date_info, 3 as asin_cate_flag from ods_keepa_finder_asin where site_name='{self.site_name}' and date_info in {self.date_info_tuple}"
print("sql_keepa:", sql_keepa) print("sql_keepa:", sql_keepa)
self.df_asin_keepa = self.spark.sql(sqlQuery=sql_keepa).cache() self.df_asin_keepa = self.spark.sql(sqlQuery=sql_keepa).cache()
self.df_asin_keepa.show(10, truncate=False) self.df_asin_keepa.show(10, truncate=False)
...@@ -134,7 +143,7 @@ class DwdNsrBsrKeepaAsin(Templates): ...@@ -134,7 +143,7 @@ class DwdNsrBsrKeepaAsin(Templates):
# self.df_save_asin_cate = self.df_save_asin_cate.withColumn("date_info", F.lit(self.date_info)) # self.df_save_asin_cate = self.df_save_asin_cate.withColumn("date_info", F.lit(self.date_info))
# df_asin_today = self.df_save.filter(f"date_info='{self.date_info}'") # df_asin_today = self.df_save.filter(f"date_info='{self.date_info}'")
is_saturday_flag = self.is_saturday(self.date_info) is_saturday_flag = True if self.date_type =='month' else self.is_saturday(self.date_info)
self.df_save = self.df_asin_bsr.unionByName(self.df_asin_nsr, allowMissingColumns=True).unionByName(self.df_asin_keepa, allowMissingColumns=True) self.df_save = self.df_asin_bsr.unionByName(self.df_asin_nsr, allowMissingColumns=True).unionByName(self.df_asin_keepa, allowMissingColumns=True)
window = Window.partitionBy(['asin']).orderBy(F.asc("date_info")) window = Window.partitionBy(['asin']).orderBy(F.asc("date_info"))
self.df_save = self.df_save.withColumn( self.df_save = self.df_save.withColumn(
...@@ -162,33 +171,34 @@ class DwdNsrBsrKeepaAsin(Templates): ...@@ -162,33 +171,34 @@ class DwdNsrBsrKeepaAsin(Templates):
content = f"整合asin完成--等待导出到pg提供爬虫使用--数量: {self.df_save.count()}" content = f"整合asin完成--等待导出到pg提供爬虫使用--数量: {self.df_save.count()}"
CommonUtil().send_wx_msg(users=users, title=title, content=content) CommonUtil().send_wx_msg(users=users, title=title, content=content)
# 更新30day分区 if self.date_type == 'day':
hdfs_day = CommonUtil.build_hdfs_path(self.db_save_cate, {"site_name": self.site_name, "date_type": self.date_type, "date_info": self.date_info}) # 更新30day分区
hdfs_30day = CommonUtil.build_hdfs_path(self.db_save_cate, {"site_name": self.site_name, "date_type": "30day", "date_info": "1970-01"}) hdfs_day = CommonUtil.build_hdfs_path(self.db_save_cate, {"site_name": self.site_name, "date_type": self.date_type, "date_info": self.date_info})
hdfs_30day_copy = CommonUtil.build_hdfs_path(self.db_save_cate, {"site_name": self.site_name, "date_type": "30day_copy", "date_info": "1970-01"}) hdfs_30day = CommonUtil.build_hdfs_path(self.db_save_cate, {"site_name": self.site_name, "date_type": "30day", "date_info": "1970-01"})
if not HdfsUtils.path_exist(hdfs_day): hdfs_30day_copy = CommonUtil.build_hdfs_path(self.db_save_cate, {"site_name": self.site_name, "date_type": "30day_copy", "date_info": "1970-01"})
print(f"源目录不存在: {hdfs_day}") if not HdfsUtils.path_exist(hdfs_day):
wx_users = ['fangxingjun', 'chenyuanjie'] print(f"源目录不存在: {hdfs_day}")
wx_msg = f"{hdfs_day} 目录数据不存在,请检查!" wx_users = ['fangxingjun', 'chenyuanjie']
CommonUtil.send_wx_msg(wx_users, "复制数据至30day分区", wx_msg) wx_msg = f"{hdfs_day} 目录数据不存在,请检查!"
sys.exit(1) CommonUtil.send_wx_msg(wx_users, "复制数据至30day分区", wx_msg)
if HdfsUtils.path_exist(hdfs_30day_copy): sys.exit(1)
print(f"中间目录已存在,先清空: {hdfs_30day_copy}") if HdfsUtils.path_exist(hdfs_30day_copy):
HdfsUtils.delete_file_in_folder(hdfs_30day_copy) print(f"中间目录已存在,先清空: {hdfs_30day_copy}")
else: HdfsUtils.delete_file_in_folder(hdfs_30day_copy)
print(f"中间目录不存在,创建: {hdfs_30day_copy}") else:
HdfsUtils.create_if_not_exist(hdfs_30day_copy) print(f"中间目录不存在,创建: {hdfs_30day_copy}")
os.system(f"hdfs dfs -cp {hdfs_day}/* {hdfs_30day_copy}/") HdfsUtils.create_if_not_exist(hdfs_30day_copy)
files = HdfsUtils.read_list(hdfs_30day_copy) os.system(f"hdfs dfs -cp {hdfs_day}/* {hdfs_30day_copy}/")
print(f"中间目录文件数: {len(files) if files else 0}") files = HdfsUtils.read_list(hdfs_30day_copy)
if not HdfsUtils.path_exist(hdfs_30day): print(f"中间目录文件数: {len(files) if files else 0}")
print(f"目标目录不存在,创建: {hdfs_30day}") if not HdfsUtils.path_exist(hdfs_30day):
HdfsUtils.create_if_not_exist(hdfs_30day) print(f"目标目录不存在,创建: {hdfs_30day}")
HdfsUtils.exchange_path(hdfs_30day_copy, hdfs_30day) HdfsUtils.create_if_not_exist(hdfs_30day)
print(f"交换完成! 30day与30day_copy已互换") HdfsUtils.exchange_path(hdfs_30day_copy, hdfs_30day)
print(f"交换完成! 30day与30day_copy已互换")
# 4. 修复hive元数据
CommonUtil.hive_cmd_exec(f"set hive.msck.path.validation=ignore; msck repair table big_data_selection.{self.db_save_cate};") # 4. 修复hive元数据
CommonUtil.hive_cmd_exec(f"set hive.msck.path.validation=ignore; msck repair table big_data_selection.{self.db_save_cate};")
# def save_data(self): # def save_data(self):
# pass # pass
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment