Commit bf96b202 by chenyuanjie

Integrate profit rate data

parent 10eb734f
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from datetime import datetime, timedelta


class DimAsinProfitRateInfo(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        self.last_date_info = (datetime.strptime(date_info, "%Y-%m-%d").date() - timedelta(days=1)).strftime("%Y-%m-%d")
        self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}: {self.site_name} {self.date_info}")
        # Placeholder DataFrames; populated in read_data()/handle_data()/save_data()
        self.df_asin_profit = self.spark.sql("select 1+1;")
        self.df_asin_profit_history = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
    def run(self):
        self.read_data()
        self.handle_data()
        self.save_data()
    def read_data(self):
        # Clean and consolidate the profit rate table (all existing partitions for this site)
        sql = f"""
            select asin, price, category, ocean_profit, air_profit, package_length, package_width, package_height, weight, updated_time, date_info
            from dim_asin_profit_rate_info where site_name = '{self.site_name}';
        """
        self.df_asin_profit = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
        # Rows from partitions before the current date_info, used as the baseline for the row-count check in save_data()
        self.df_asin_profit_history = self.df_asin_profit.filter(f"date_info < '{self.date_info}'").cache()
    def handle_data(self):
        # Deduplicate: keep the most recently updated row per (asin, price); see the standalone sketch after this file
        window = Window.partitionBy(['asin', 'price']).orderBy(
            self.df_asin_profit.updated_time.desc_nulls_last()
        )
        self.df_asin_profit = self.df_asin_profit.withColumn(
            'rank', F.row_number().over(window=window)
        ).filter('rank = 1').drop('rank', 'date_info')
    def save_data(self):
        self.df_save = self.df_asin_profit.withColumn(
            "site_name", F.lit(self.site_name)
        ).withColumn(
            "date_info", F.lit(self.date_info)
        ).repartition(10).cache()
        new_count = self.df_save.count()
        old_count = self.df_asin_profit_history.count()
        print(f"historical row count: {old_count}")
        print(f"latest row count: {new_count}")
        # Only write when the deduplicated data is at least as large as the history, then drop the previous day's partition
        if new_count >= old_count:
            hive_tb = "dim_asin_profit_rate_info"
            partition_dict = {
                "site_name": self.site_name,
                "date_info": self.date_info
            }
            partition_by = list(partition_dict.keys())
            hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
            HdfsUtils.delete_hdfs_file(hdfs_path)
            print(f"writing data, target table: {hive_tb}, hdfs path: {hdfs_path}")
            self.df_save.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
            print("success!")
            print("dropping the previous day's partition")
            self.spark.sql(f"""
                ALTER TABLE {hive_tb} DROP IF EXISTS PARTITION (site_name='{self.site_name}', date_info='{self.last_date_info}')
            """)
            HdfsUtils.delete_hdfs_file(
                CommonUtil.build_hdfs_path(hive_tb, partition_dict={"site_name": self.site_name, "date_info": self.last_date_info})
            )
            print("success!")

if __name__ == "__main__":
    site_name = sys.argv[1]
    date_info = sys.argv[2]
    handle_obj = DimAsinProfitRateInfo(site_name, date_info)
    handle_obj.run()
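
The deduplication in handle_data keeps only the most recently updated row for each (asin, price) pair. Below is a minimal, self-contained sketch of that pattern; it is not part of the commit, and the ASINs, prices, and timestamps are made up for illustration.

# Illustrative sketch (not part of the commit): the (asin, price) dedup used in handle_data.
# The newest updated_time wins; null timestamps sort last, so an all-null group still keeps one row.
from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.master("local[1]").appName("dedup_sketch").getOrCreate()
df = spark.createDataFrame(
    [
        ("B0EXAMPLE1", 9.99, "2025-06-08 10:00:00"),
        ("B0EXAMPLE1", 9.99, "2025-06-09 10:00:00"),  # newer row is kept
        ("B0EXAMPLE2", 19.99, None),                  # null updated_time still yields one row
    ],
    ["asin", "price", "updated_time"],
)
w = Window.partitionBy("asin", "price").orderBy(F.col("updated_time").desc_nulls_last())
df.withColumn("rank", F.row_number().over(w)).filter("rank = 1").drop("rank").show()
spark.stop()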
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
from datetime import datetime, timedelta

if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    assert site_name is not None, "site_name must not be empty!"
    assert date_info is not None, "date_info must not be empty!"
    # Previous day; the query below pulls rows whose updated_time falls in [last_date_info, date_info) (see the sketch after this script)
    last_date_info = (datetime.strptime(date_info, "%Y-%m-%d").date() - timedelta(days=1)).strftime("%Y-%m-%d")
    db_type = 'postgresql_cluster'
    hive_table = "dim_asin_profit_rate_info"
    partition_dict = {
        "site_name": site_name,
        "date_info": date_info
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
    engine = get_remote_engine(
        site_name=site_name,
        db_type=db_type
    )
    query = f"""
        select asin, price, category, ocean_profit, air_profit, package_length, package_width, package_height, weight, updated_time
        from {site_name}_asin_profit_rate_calc_2025 where calc_flag = 1 and updated_time >= '{last_date_info}' and updated_time < '{date_info}' and \$CONDITIONS
    """
    engine.sqoop_raw_import(
        query=query,
        hive_table=hive_table,
        hdfs_path=hdfs_path,
        partitions=partition_dict,
        check_count=False
    )
    print('success!')
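
For context, the incremental window in the import query above spans exactly one day: rows whose updated_time is on or after the previous day and strictly before the run date. A small standalone sketch of that date arithmetic follows; the run date used is made up for illustration.

# Illustrative sketch (not part of the commit): the one-day incremental window used above.
from datetime import datetime, timedelta

date_info = "2025-06-10"  # made-up run date
last_date_info = (datetime.strptime(date_info, "%Y-%m-%d").date() - timedelta(days=1)).strftime("%Y-%m-%d")
print(last_date_info)  # 2025-06-09
print(f"updated_time >= '{last_date_info}' and updated_time < '{date_info}'")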