Commit a73a1324 by fangxingjun

新增补充 asin:取上个月流量选品(dwt_flow_asin)中月销大于 0 的 asin

parent 074ed033
...@@ -31,6 +31,8 @@ from utils.db_util import DbTypes, DBUtil ...@@ -31,6 +31,8 @@ from utils.db_util import DbTypes, DBUtil
from utils.common_util import CommonUtil from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine from utils.secure_db_client import get_remote_engine
from utils.spark_util import SparkUtil from utils.spark_util import SparkUtil
from datetime import datetime
from dateutil.relativedelta import relativedelta
class DwtAsinSync(Templates): class DwtAsinSync(Templates):
...@@ -44,6 +46,7 @@ class DwtAsinSync(Templates): ...@@ -44,6 +46,7 @@ class DwtAsinSync(Templates):
self.spark = self.create_spark_object( self.spark = self.create_spark_object(
app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}") app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
self.df_save = self.spark.sql(f"select 1+1;") self.df_save = self.spark.sql(f"select 1+1;")
self.df_asin_flow = self.spark.sql(f"select 1+1;")
self.df_asin_from_st = self.spark.sql(f"select 1+1;") self.df_asin_from_st = self.spark.sql(f"select 1+1;")
self.df_asin_from_adv = self.spark.sql(f"select 1+1;") self.df_asin_from_adv = self.spark.sql(f"select 1+1;")
self.df_asin_from_day = self.spark.sql(f"select 1+1;") self.df_asin_from_day = self.spark.sql(f"select 1+1;")
...@@ -61,7 +64,7 @@ class DwtAsinSync(Templates): ...@@ -61,7 +64,7 @@ class DwtAsinSync(Templates):
print(type(self.engine_mysql)) print(type(self.engine_mysql))
print(self.engine_mysql) print(self.engine_mysql)
self.get_date_info_tuple() # self.date_info_tuple self.get_date_info_tuple() # self.date_info_tuple
self.table_syn = f"{self.site_name}_all_syn_st_month_{self.date_info.replace('-', '_')}" if self.site_name == 'us' else f"{self.site_name}_all_syn_st_{self.date_info.replace('-', '_')}" self.table_syn = f"{self.site_name}_all_syn_st_month_{self.date_info.replace('-', '_')}" if self.date_type == 'month' else f"{self.site_name}_all_syn_st_{self.date_info.replace('-', '_')}"
self.partitions_by = ['site_name', 'date_type', 'date_info'] self.partitions_by = ['site_name', 'date_type', 'date_info']
self.reset_partitions(partitions_num=5) self.reset_partitions(partitions_num=5)
self.date_type = self.judge_date_type() self.date_type = self.judge_date_type()
...@@ -76,6 +79,18 @@ class DwtAsinSync(Templates): ...@@ -76,6 +79,18 @@ class DwtAsinSync(Templates):
else: else:
return False return False
@staticmethod
def get_prev_month(month_str: str) -> str:
    """Return the calendar month immediately before ``month_str``.

    Args:
        month_str: A month written as ``YYYY-MM``, e.g. ``"2026-01"``.

    Returns:
        The preceding month, always formatted as zero-padded ``YYYY-MM``.

    Raises:
        ValueError: If ``month_str`` does not parse with ``%Y-%m`` (for
            example a full ``YYYY-MM-DD`` date), or if it is ``0001-01``
            and therefore has no representable predecessor.

    Example:
        get_prev_month("2026-01") -> "2025-12"
    """
    # strptime validates the input and rejects anything that is not year-month.
    parsed = datetime.strptime(month_str, "%Y-%m")

    # Work on a single running month count so the January -> December
    # rollover falls out of divmod instead of needing a special case.
    # (year * 12 + month - 1) is the zero-based month index; subtract one more
    # to step back a month.
    prev_year, prev_month_index = divmod(parsed.year * 12 + parsed.month - 2, 12)
    if prev_year < 1:
        raise ValueError(f"no month precedes {month_str!r}")

    # Format explicitly: strftime("%Y") does not zero-pad years below 1000 on
    # every platform, which would break the YYYY-MM contract.
    return f"{prev_year:04d}-{prev_month_index + 1:02d}"
def judge_date_type(self): def judge_date_type(self):
day_flag = self.judge_today() day_flag = self.judge_today()
print(f"site_name: {self.site_name}, date_type: {self.date_type}, date_info: {self.date_info}") print(f"site_name: {self.site_name}, date_type: {self.date_type}, date_info: {self.date_info}")
...@@ -129,6 +144,11 @@ class DwtAsinSync(Templates): ...@@ -129,6 +144,11 @@ class DwtAsinSync(Templates):
else: else:
sql_asin_from_day = f"select asin, 3 as asin_flag from dwd_nsr_bsr_keepa_asin where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple} and asin_type!='1'" sql_asin_from_day = f"select asin, 3 as asin_flag from dwd_nsr_bsr_keepa_asin where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple} and asin_type!='1'"
self.df_asin_from_day = self.read_data_common(sql=sql_asin_from_day, content="1.3. 读取dwd_nsr_bsr_keepa_asin表的asin新品和榜单数据") self.df_asin_from_day = self.read_data_common(sql=sql_asin_from_day, content="1.3. 读取dwd_nsr_bsr_keepa_asin表的asin新品和榜单数据")
# 获取上一个月的流量选品月份数据, amazon月销>0的asin
prev_month = self.get_prev_month(self.date_info)
sql_asin_flow = f"select asin, 4 as asin_flag from dwt_flow_asin where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{prev_month}' and asin_bought_month>0"
self.df_asin_flow = self.read_data_common(sql=sql_asin_flow, content="1.4. 读取dwt_flow_asin表的asin数据")
# 读取asin属性值 # 读取asin属性值
sql_asin_variation = f"""select asin, 1 as asin_is_variation from dim_asin_variation_info where site_name="{self.site_name}";""" sql_asin_variation = f"""select asin, 1 as asin_is_variation from dim_asin_variation_info where site_name="{self.site_name}";"""
self.df_asin_variation = self.read_data_common(sql=sql_asin_variation, content="2.1 读取dim_asin_variation_info表的asin变体属性") self.df_asin_variation = self.read_data_common(sql=sql_asin_variation, content="2.1 读取dim_asin_variation_info表的asin变体属性")
...@@ -211,10 +231,11 @@ class DwtAsinSync(Templates): ...@@ -211,10 +231,11 @@ class DwtAsinSync(Templates):
.save() .save()
print(f"写入完毕") print(f"写入完毕")
def handle_data(self): def handle_data(self):
if self.date_type in ['month']: if self.date_type in ['month']:
self.df_save = self.df_asin_from_st.unionByName( self.df_save = self.df_asin_flow.unionByName(
self.df_asin_from_st, allowMissingColumns=True
).unionByName(
self.df_asin_from_adv, allowMissingColumns=True self.df_asin_from_adv, allowMissingColumns=True
).unionByName( ).unionByName(
self.df_asin_from_day, allowMissingColumns=True self.df_asin_from_day, allowMissingColumns=True
...@@ -223,7 +244,9 @@ class DwtAsinSync(Templates): ...@@ -223,7 +244,9 @@ class DwtAsinSync(Templates):
# self.df_asin_from_day, allowMissingColumns=True # self.df_asin_from_day, allowMissingColumns=True
# ) # )
if self.date_type == 'month_week': if self.date_type == 'month_week':
self.df_save = self.df_asin_from_st.unionByName( self.df_save = self.df_asin_flow.unionByName(
self.df_asin_from_st, allowMissingColumns=True
).unionByName(
self.df_asin_from_adv, allowMissingColumns=True self.df_asin_from_adv, allowMissingColumns=True
) )
if self.date_type == 'day': if self.date_type == 'day':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment