Commit f4da7619 by chenyuanjie

Consolidate the export of ASINs that require profit-rate calculation

parent bf96b202
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil


class DwtFlowKeepaAsin(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}: {self.site_name} {self.date_info}")
        # Placeholder DataFrames; each is populated in read_data()/handle_data()
        self.df_flow_asin = self.spark.sql("select 1+1")
        self.df_category_id = self.spark.sql("select 1+1")
        self.df_keepa_asin = self.spark.sql("select 1+1")
        self.df_calc_asin = self.spark.sql("select 1+1")
        self.df_export_asin = self.spark.sql("select 1+1")
        self.df_save = self.spark.sql("select 1+1")

    def run(self):
        self.read_data()
        self.handle_data()
        self.save_data()
    def read_data(self):
        # Monthly flow-selection ASINs with a valid price
        sql = f"""
            select asin, asin_price as price, category_first_id, date_info as source_month
            from dwt_flow_asin
            where site_name = '{self.site_name}'
              and date_type = 'month'
              and date_info >= '2025-05'
              and asin_price is not null
              and asin_price > 0
        """
        self.df_flow_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin')
        # Keep only the most recent source_month per (asin, price) pair
        window = Window.partitionBy(['asin', 'price']).orderBy(
            self.df_flow_asin.source_month.desc_nulls_last()
        )
        self.df_flow_asin = self.df_flow_asin.withColumn(
            'rank', F.row_number().over(window=window)
        ).filter('rank = 1').drop('rank').cache()

        # First-level category names
        sql = f"""
            select category_first_id, en_name as category
            from dim_bsr_category_tree
            where site_name = '{self.site_name}' and nodes_num = 2
        """
        self.df_category_id = self.spark.sql(sqlQuery=sql).cache()

        # Keepa package dimensions and weight; drop rows with non-positive values
        sql = f"""
            select asin, package_length, package_width, package_height, weight
            from dim_keepa_asin_info
            where site_name = '{self.site_name}' and date_info = '{self.date_info}'
        """
        self.df_keepa_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').filter(
            (F.col("package_length") > 0) & (F.col("package_width") > 0)
            & (F.col("package_height") > 0) & (F.col("weight") > 0)
        ).cache()

        # ASINs whose profit rate has already been calculated
        sql = f"""
            select asin, price
            from dim_asin_profit_rate_info
            where site_name = '{self.site_name}' and date_info = '{self.date_info}'
        """
        self.df_calc_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()

        # asin+price pairs already exported, to avoid recomputation
        sql = f"""
            select asin, price from dwt_flow_keepa_asin where site_name = '{self.site_name}'
        """
        self.df_export_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
    def handle_data(self):
        # Join flow ASINs with category names and Keepa dimensions, then
        # anti-join out the pairs already calculated or already exported
        self.df_save = self.df_flow_asin.join(
            self.df_category_id, on='category_first_id', how='left'
        ).join(
            self.df_keepa_asin, on='asin', how='inner'
        ).join(
            self.df_calc_asin, on=['asin', 'price'], how='left_anti'
        ).join(
            self.df_export_asin, on=['asin', 'price'], how='left_anti'
        ).cache()
        self.df_flow_asin.unpersist()
        self.df_category_id.unpersist()
        self.df_keepa_asin.unpersist()
        self.df_calc_asin.unpersist()
        self.df_export_asin.unpersist()

        # Randomly split rows into 50 near-equal buckets (part_key) for downstream workers
        start_key = 1
        self.df_save = self.df_save.withColumn(
            'part_key', F.ntile(50).over(Window.orderBy(F.rand())) + (start_key - 1)
        ).select(
            F.col('asin'),
            F.col('price'),
            F.col('category'),
            F.col('package_length'),
            F.col('package_width'),
            F.col('package_height'),
            F.col('weight'),
            F.col('part_key'),
            F.col('source_month'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_info).alias('date_info')
        ).repartition(10)
    def save_data(self):
        hive_tb = "dwt_flow_keepa_asin"
        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.date_info
        }
        partition_by = list(partition_dict.keys())
        hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
        # Clear the target partition first so reruns do not duplicate data
        HdfsUtils.delete_file_in_folder(hdfs_path)
        print(f"Saving data to table {hive_tb}, HDFS path: {hdfs_path}")
        self.df_save.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == "__main__":
    site_name = sys.argv[1]
    date_info = sys.argv[2]
    handle_obj = DwtFlowKeepaAsin(site_name, date_info)
    handle_obj.run()
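
For context, the dedup step in read_data() keeps exactly one row per (asin, price), preferring the most recent source_month. Below is a minimal standalone sketch of that window pattern; the local SparkSession and the toy rows (made-up ASINs) are assumptions for illustration only, not part of the job:

from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.master("local[2]").appName("dedup_sketch").getOrCreate()

rows = [
    ("B0XXXXXXX1", 19.99, "2025-05"),
    ("B0XXXXXXX1", 19.99, "2025-06"),  # same asin+price, newer month should win
    ("B0XXXXXXX1", 24.99, "2025-05"),  # different price survives as its own pair
]
df = spark.createDataFrame(rows, ["asin", "price", "source_month"])

# Same window trick as read_data(): rank months per (asin, price), keep rank 1
w = Window.partitionBy("asin", "price").orderBy(F.col("source_month").desc_nulls_last())
df.withColumn("rank", F.row_number().over(w)).filter("rank = 1").drop("rank").show()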
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.secure_db_client import get_remote_engine

if __name__ == '__main__':
    site_name = sys.argv[1]
    date_info = sys.argv[2]
    db_type = "postgresql_cluster"
    export_tb = f"{site_name}_asin_profit_rate_calc"

    engine = get_remote_engine(
        site_name=site_name,
        db_type=db_type
    )
    # Export the current partition of dwt_flow_keepa_asin to PostgreSQL with 10 mappers
    engine.sqoop_raw_export(
        hive_table="dwt_flow_keepa_asin",
        import_table=export_tb,
        partitions={
            "site_name": site_name,
            "date_info": date_info
        },
        m=10,
        cols="asin,price,package_length,package_width,package_height,weight,category,part_key,source_month"
    )
    print("success")
...
@@ -30,7 +30,7 @@ if __name__ == '__main__':
     query = f"""
     select asin, price, category, ocean_profit, air_profit, package_length, package_width, package_height, weight, updated_time
-    from {site_name}_asin_profit_rate_calc_2025 where calc_flag = 1 and updated_time >= '{last_date_info}' and updated_time < '{date_info}' and \$CONDITIONS
+    from {site_name}_asin_profit_rate_calc where calc_flag = 1 and updated_time >= '{last_date_info}' and updated_time < '{date_info}' and \$CONDITIONS
     """
     engine.sqoop_raw_import(
         query=query,
         ...