Commit 10eb734f by chenyuanjie

no message

parent 80aaf611
...@@ -106,7 +106,9 @@ class DimKeepaAsinInfo(object): ...@@ -106,7 +106,9 @@ class DimKeepaAsinInfo(object):
"site_name", F.lit(self.site_name) "site_name", F.lit(self.site_name)
).withColumn( ).withColumn(
"date_info", F.lit(self.date_info) "date_info", F.lit(self.date_info)
).repartition(50) ).repartition(50).cache()
new_count = self.df_save.count()
old_count = self.df_keepa_asin_history.count()
hive_tb = "dim_keepa_asin_info" hive_tb = "dim_keepa_asin_info"
partition_dict = { partition_dict = {
"site_name": self.site_name, "site_name": self.site_name,
...@@ -118,7 +120,7 @@ class DimKeepaAsinInfo(object): ...@@ -118,7 +120,7 @@ class DimKeepaAsinInfo(object):
self.df_save.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by) self.df_save.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
print("success!") print("success!")
if self.df_save.count() >= self.df_keepa_asin_history.count(): if new_count >= old_count:
print(f"正在删除历史分区数据") print(f"正在删除历史分区数据")
self.spark.sql(f""" self.spark.sql(f"""
ALTER TABLE {hive_tb} DROP IF EXISTS PARTITION (site_name='{self.site_name}', date_info='{self.last_date_info}') ALTER TABLE {hive_tb} DROP IF EXISTS PARTITION (site_name='{self.site_name}', date_info='{self.last_date_info}')
......
...@@ -40,7 +40,8 @@ if __name__ == '__main__': ...@@ -40,7 +40,8 @@ if __name__ == '__main__':
query=query, query=query,
hive_table=hive_table, hive_table=hive_table,
hdfs_path=hdfs_path, hdfs_path=hdfs_path,
partitions=partition_dict partitions=partition_dict,
check_count=False
) )
pass pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment