Commit 906830e1 by chenyuanjie

fix

parent a9b8d60d
...@@ -19,7 +19,6 @@ class DimAsinProfitRateInfo(object): ...@@ -19,7 +19,6 @@ class DimAsinProfitRateInfo(object):
self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}: {self.site_name} {self.date_info}") self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}: {self.site_name} {self.date_info}")
self.df_asin_profit = self.spark.sql(f"select 1+1;") self.df_asin_profit = self.spark.sql(f"select 1+1;")
self.df_asin_profit_history = self.spark.sql(f"select 1+1;")
self.df_keepa_asin = self.spark.sql(f"select 1+1;") self.df_keepa_asin = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;") self.df_save = self.spark.sql(f"select 1+1;")
...@@ -35,7 +34,6 @@ class DimAsinProfitRateInfo(object): ...@@ -35,7 +34,6 @@ class DimAsinProfitRateInfo(object):
from dim_asin_profit_rate_info where site_name = '{self.site_name}'; from dim_asin_profit_rate_info where site_name = '{self.site_name}';
""" """
self.df_asin_profit = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache() self.df_asin_profit = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
self.df_asin_profit_history = self.df_asin_profit.filter(f"date_info < '{self.date_info}'").cache()
# 读取keepa数据 # 读取keepa数据
sql = f""" sql = f"""
...@@ -65,30 +63,39 @@ class DimAsinProfitRateInfo(object): ...@@ -65,30 +63,39 @@ class DimAsinProfitRateInfo(object):
).repartition(10).cache() ).repartition(10).cache()
new_count = self.df_save.count() new_count = self.df_save.count()
old_count = self.df_asin_profit_history.count()
print(f"历史数据量:{old_count}")
print(f"最新数据量:{new_count}") print(f"最新数据量:{new_count}")
if new_count >= old_count:
hive_tb = "dim_asin_profit_rate_info" hive_tb = "dim_asin_profit_rate_info"
partition_dict = { partition_dict = {
"site_name": self.site_name, "site_name": self.site_name,
"date_info": self.date_info "date_info": self.date_info
} }
partition_by = list(partition_dict.keys()) partition_by = list(partition_dict.keys())
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict) hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
HdfsUtils.delete_hdfs_file(hdfs_path) HdfsUtils.delete_hdfs_file(hdfs_path)
print(f"正在进行数据存储,当前存储的表名为:{hive_tb},存储路径:{hdfs_path}") print(f"正在进行数据存储,当前存储的表名为:{hive_tb},存储路径:{hdfs_path}")
self.df_save.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by) self.df_save.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
print("success!")
# 验证实际写入数量,确保写入成功后再删除历史分区
print(f"正在删除历史分区数据") written_count = self.spark.sql(f"""
self.spark.sql(f""" select count(1) as cnt from {hive_tb}
ALTER TABLE {hive_tb} DROP IF EXISTS PARTITION (site_name='{self.site_name}', date_info='{self.last_date_info}') where site_name='{self.site_name}' and date_info='{self.date_info}'
""") """).collect()[0]['cnt']
HdfsUtils.delete_hdfs_file( print(f"实际写入数量:{written_count},预期:{new_count}")
CommonUtil.build_hdfs_path(hive_tb, partition_dict={"site_name": self.site_name, "date_info": self.last_date_info}) if written_count != new_count:
raise RuntimeError(
f"写入数量校验失败!实际写入 {written_count} != 预期 {new_count},终止删除历史分区,请人工检查。"
) )
print("success!") print("写入校验通过!")
print(f"正在删除历史分区数据")
self.spark.sql(f"""
ALTER TABLE {hive_tb} DROP IF EXISTS PARTITION (site_name='{self.site_name}', date_info='{self.last_date_info}')
""")
HdfsUtils.delete_hdfs_file(
CommonUtil.build_hdfs_path(hive_tb, partition_dict={"site_name": self.site_name, "date_info": self.last_date_info})
)
print("success!")
if __name__ == "__main__": if __name__ == "__main__":
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment