Commit 6d566455 by fangxingjun

no message

parent 6d8c9baa
......@@ -30,6 +30,7 @@ from pyspark.sql import functions as F
from utils.db_util import DbTypes, DBUtil
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
from utils.spark_util import SparkUtil
class DwtAsinSync(Templates):
......@@ -64,6 +65,8 @@ class DwtAsinSync(Templates):
self.partitions_by = ['site_name', 'date_type', 'date_info']
self.reset_partitions(partitions_num=5)
self.date_type = self.judge_date_type()
# 导出到利润表
self.pg_table = f"{site_name}_asin_profit_keepa_add"
@staticmethod
def judge_today():
......@@ -121,10 +124,10 @@ class DwtAsinSync(Templates):
).select('asin').drop_duplicates(['asin'])
self.df_asin_from_adv = self.df_asin_from_adv.withColumn('asin_flag', F.lit(2))
if self.date_info == 'day':
sql_asin_from_day = f"select asin, 3 as asin_flag from dwd_nsr_bsr_keepa_asin where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
if self.date_type == 'day':
sql_asin_from_day = f"select asin, 3 as asin_flag from dwd_nsr_bsr_keepa_asin where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and asin_type!='1'"
else:
sql_asin_from_day = f"select asin, 3 as asin_flag from dwd_nsr_bsr_keepa_asin where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple}"
sql_asin_from_day = f"select asin, 3 as asin_flag from dwd_nsr_bsr_keepa_asin where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple} and asin_type!='1'"
self.df_asin_from_day = self.read_data_common(sql=sql_asin_from_day, content="1.3. 读取dwd_nsr_bsr_keepa_asin表的asin新品和榜单数据")
# 读取asin属性值
sql_asin_variation = f"""select asin, 1 as asin_is_variation from dim_asin_variation_info where site_name="{self.site_name}";"""
......@@ -159,6 +162,56 @@ class DwtAsinSync(Templates):
)
continue
def sava_profit_data(self):
    """Export new-product ASINs to the PostgreSQL profit table.

    Takes the ASINs read from ``dwd_nsr_bsr_keepa_asin`` (``self.df_asin_from_day``),
    de-duplicates them, tags each row with the month (first 7 chars of
    ``self.date_info`` — assumes a ``YYYY-MM-...`` format, TODO confirm),
    excludes ASINs that already have valid keepa data in ``dim_keepa_asin_info``
    and ASINs already present in ``self.pg_table``, then appends the remaining
    ``(asin, month)`` rows to PostgreSQL via JDBC.

    NOTE(review): the method name keeps the original "sava" typo because
    callers (``handle_data``) invoke it under this exact name.
    """
    df = self.df_asin_from_day.select("asin").drop_duplicates(['asin'])
    df = df.withColumn("month", F.lit(self.date_info[:7]))

    # Exclude ASINs that already have valid keepa data in dim_keepa_asin_info.
    # Rows where any of package_length/width/height/weight is < 0 are treated
    # as bad data and deliberately NOT excluded (they must be re-crawled).
    print("8. 排除已有keepa数据的ASIN (dim_keepa_asin_info)")
    df_keepa = self.spark.sql(f"""
        select asin from dim_keepa_asin_info
        where site_name = '{self.site_name}'
          and package_length >= 0
          and package_width >= 0
          and package_height >= 0
          and weight >= 0
    """).repartition(40, 'asin')
    df = df.join(df_keepa, on='asin', how='left_anti').cache()
    print(f"排除keepa后数据量: {df.count()}")

    # Exclude ASINs already exported to the target PG table.
    # Fetch the connection info once and reuse it for both the read below
    # and the JDBC write at the end (the original fetched it twice).
    print(f"9. 排除已导出的ASIN ({self.pg_table})")
    con_info = DBUtil.get_connection_info("postgresql_cluster", self.site_name)
    df_exported = SparkUtil.read_jdbc_query(
        session=self.spark,
        url=con_info['url'],
        username=con_info['username'],
        pwd=con_info['pwd'],
        query=f"select asin from {self.pg_table}"
    ).repartition(40, 'asin')
    df = df.join(df_exported, on='asin', how='left_anti').cache()

    # df is cached, so count it once and reuse the value for both messages
    # (the original triggered two identical count jobs back-to-back).
    total = df.count()
    print(f"排除已导出后数据量: {total}")
    df.show(10, truncate=False)

    print(f"10. 写入 PostgreSQL 表 {self.pg_table},共 {total} 条")
    df.select(
        F.col("asin"),
        F.col("month"),
    ).write.format("jdbc") \
        .option("url", con_info["url"]) \
        .option("dbtable", self.pg_table) \
        .option("user", con_info["username"]) \
        .option("password", con_info["pwd"]) \
        .mode("append") \
        .save()
    print(f"写入完毕")
def handle_data(self):
if self.date_type in ['month']:
self.df_save = self.df_asin_from_st.unionByName(
......@@ -175,6 +228,8 @@ class DwtAsinSync(Templates):
)
if self.date_type == 'day':
self.df_save = self.df_asin_from_day
# 同步到利润表
self.sava_profit_data()
print(f"去重之前的asin_flag类型数量统计")
self.df_save.groupby(f"asin_flag").agg(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment