Commit 17c72656 by chenyuanjie

店铺报表数据-新增字段-卖家公司信息

parent 3797377f
...@@ -65,33 +65,62 @@ class DwtFbBaseReport(object): ...@@ -65,33 +65,62 @@ class DwtFbBaseReport(object):
# 初始化UDF函数 # 初始化UDF函数
self.udf_new_asin_flag = F.udf(udf_new_asin_flag, IntegerType()) self.udf_new_asin_flag = F.udf(udf_new_asin_flag, IntegerType())
self.u_judge_package_quantity = F.udf(udf_get_package_quantity, IntegerType()) self.u_judge_package_quantity = F.udf(udf_get_package_quantity, IntegerType())
self.u_get_business_val = F.udf(self.get_business_val, StringType())
# 解析seller_address字段,获取卖家公司数据
@staticmethod
def get_business_val(seller_address, key):
    """Extract one seller-company field from a raw ``seller_address`` string.

    The text is split on the literal ``"|-|"`` separator and every segment is
    stripped. The first segment that starts with ``key`` is treated as the
    label, and the value is read from the segment(s) that follow it.

    Args:
        seller_address: Raw address text; may be ``None`` or empty.
        key: Label to search for, e.g. ``"Business Name"``.

    Returns:
        For the address labels (``"Business Address"`` and
        ``"Geschäftsadresse"``): every non-empty segment after the label,
        joined with single spaces. For any other label: the single segment
        right after the label. ``None`` when the input or key is empty, the
        label is not found, or no non-empty value follows the label.
    """
    # An empty key would match every segment via startswith(""), so reject it
    # together with empty/None input.
    if not seller_address or not key:
        return None

    segments = [segment.strip() for segment in seller_address.split("|-|")]
    for index, segment in enumerate(segments):
        if not segment.startswith(key):
            continue
        following = segments[index + 1:]
        if key in ("Business Address", "Geschäftsadresse"):
            # The address spans all remaining segments; skip blank ones so the
            # result has no doubled spaces, and report "nothing there" as None
            # rather than as an empty string.
            address = " ".join(part for part in following if part)
            return address or None
        # Any other label: its value is only the next segment.
        return (following[0] or None) if following else None
    return None
def read_data(self): def read_data(self):
# ods_seller_account_feedback 月度店铺报告表主表 # ods_seller_account_feedback 月度店铺报告表主表
print("获取 ods_seller_account_feedback") print("获取 ods_seller_account_feedback")
sql = f"""select cur_fd.seller_id, sql = f"""
select
cur_fd.seller_id,
cur_fd.fb_web_asin_num, cur_fd.fb_web_asin_num,
cur_fd.fb_country_name, cur_fd.fb_country_name,
cur_fd.count_30_day_num, cur_fd.count_30_day_num,
cur_fd.count_1_year_num, cur_fd.count_1_year_num,
cur_fd.count_lifetime_num, cur_fd.count_lifetime_num,
cur_fd.seller_address,
cur_fd.fb_crawl_date, cur_fd.fb_crawl_date,
round((count_30_day_num - last_30_day_num) / last_30_day_num, 4) as count_30_day_rate, round((count_30_day_num - last_30_day_num) / last_30_day_num, 4) as count_30_day_rate,
round((count_1_year_num - last_1_year_num) / last_1_year_num, 4) as count_1_year_rate, round((count_1_year_num - last_1_year_num) / last_1_year_num, 4) as count_1_year_rate,
round((count_lifetime_num - last_lifetime_num) / last_lifetime_num, 4) as count_life_time_rate round((count_lifetime_num - last_lifetime_num) / last_lifetime_num, 4) as count_life_time_rate
from (select seller_id, from
(
select
seller_id,
num as fb_web_asin_num, num as fb_web_asin_num,
count_30_day as count_30_day_num, count_30_day as count_30_day_num,
count_1_year as count_1_year_num, count_1_year as count_1_year_num,
count_lifetime as count_lifetime_num, count_lifetime as count_lifetime_num,
country_name as fb_country_name, country_name as fb_country_name,
seller_address,
date_format(updated_at, 'yyyy-MM-dd HH:mm:ss') as fb_crawl_date date_format(updated_at, 'yyyy-MM-dd HH:mm:ss') as fb_crawl_date
from ods_seller_account_feedback from ods_seller_account_feedback
where site_name = '{self.site_name}' where site_name = '{self.site_name}'
and date_type = '{self.date_type}' and date_type = '{self.date_type}'
and date_info = '{self.date_info}' and date_info = '{self.date_info}'
and length(seller_id) > 2 ) cur_fd and length(seller_id) > 2
left join (select seller_id, ) cur_fd
left join
(
select
seller_id,
count_30_day as last_30_day_num, count_30_day as last_30_day_num,
count_1_year as last_1_year_num, count_1_year as last_1_year_num,
count_lifetime as last_lifetime_num count_lifetime as last_lifetime_num
...@@ -99,8 +128,10 @@ from (select seller_id, ...@@ -99,8 +128,10 @@ from (select seller_id,
where site_name = '{self.site_name}' where site_name = '{self.site_name}'
and date_type = '{self.date_type}' and date_type = '{self.date_type}'
and date_info = '{self.last_month}' and date_info = '{self.last_month}'
and length(seller_id) > 2 ) last_fd and length(seller_id) > 2
on cur_fd.seller_id = last_fd.seller_id""" ) last_fd
on cur_fd.seller_id = last_fd.seller_id
"""
self.df_fb_feedback = self.spark.sql(sqlQuery=sql) self.df_fb_feedback = self.spark.sql(sqlQuery=sql)
self.df_fb_feedback = self.df_fb_feedback.drop_duplicates(['seller_id']).cache() self.df_fb_feedback = self.df_fb_feedback.drop_duplicates(['seller_id']).cache()
print(sql) print(sql)
...@@ -108,9 +139,8 @@ from (select seller_id, ...@@ -108,9 +139,8 @@ from (select seller_id,
# 获取我们内部的店铺与asin的数据库(从搜索词抓下来,店铺与asin的关系表) # 获取我们内部的店铺与asin的数据库(从搜索词抓下来,店铺与asin的关系表)
print("获取 ods_seller_asin_account") print("获取 ods_seller_asin_account")
sql = f""" sql = f"""
select seller_id,asin from ods_seller_asin_account select seller_id, asin from ods_seller_asin_account
where site_name='{self.site_name}' where site_name='{self.site_name}' and date_format(created_at,'yyyy-MM-dd') <= '{self.cal_date}'
and date_format(created_at,'yyyy-MM-dd') <= '{self.cal_date}'
""" """
self.df_fb_asin = self.spark.sql(sqlQuery=sql) self.df_fb_asin = self.spark.sql(sqlQuery=sql)
self.df_fb_asin = self.df_fb_asin.drop_duplicates(['seller_id', 'asin']) self.df_fb_asin = self.df_fb_asin.drop_duplicates(['seller_id', 'asin'])
...@@ -157,16 +187,17 @@ from (select seller_id, ...@@ -157,16 +187,17 @@ from (select seller_id,
# 获取ods_asin_variat提取parent_asin用于计算是多变体 # 获取ods_asin_variat提取parent_asin用于计算是多变体
print("获取 dim_asin_variation_info") print("获取 dim_asin_variation_info")
sql = f"select asin,parent_asin from dim_asin_variation_info " \ sql = f"""
f"where site_name='{self.site_name}'" \ select asin, parent_asin from dim_asin_variation_info where site_name='{self.site_name}' and asin != parent_asin
f" and asin != parent_asin " """
self.df_asin_parent = self.spark.sql(sqlQuery=sql) self.df_asin_parent = self.spark.sql(sqlQuery=sql)
print(sql) print(sql)
# 获取ods_seller_account_syn提取account_name # 获取ods_seller_account_syn提取account_name
print("获取 ods_seller_account_syn") print("获取 ods_seller_account_syn")
sql = f"select seller_id,account_name,id from ods_seller_account_syn " \ sql = f"""
f"where site_name='{self.site_name}'" select seller_id, account_name, id from ods_seller_account_syn where site_name='{self.site_name}'
"""
self.df_seller_account = self.spark.sql(sqlQuery=sql) self.df_seller_account = self.spark.sql(sqlQuery=sql)
# 进行去重 # 进行去重
self.df_seller_account = self.df_seller_account.orderBy(self.df_seller_account.id.desc()) self.df_seller_account = self.df_seller_account.orderBy(self.df_seller_account.id.desc())
...@@ -177,8 +208,7 @@ from (select seller_id, ...@@ -177,8 +208,7 @@ from (select seller_id,
# 获取mysql:selection.accounts ,用于排除公司内部店铺 # 获取mysql:selection.accounts ,用于排除公司内部店铺
print("获取 selection.accounts") print("获取 selection.accounts")
sql = f""" sql = f"""
select seller_id, 1 as is_self_fb from select seller_id, 1 as is_self_fb from (select distinct seller_id from selection.accounts) t1
(select distinct seller_id from selection.accounts) t1
""" """
conn_info = DBUtil.get_connection_info("mysql", "us") conn_info = DBUtil.get_connection_info("mysql", "us")
self.df_self_seller_id = SparkUtil.read_jdbc_query( self.df_self_seller_id = SparkUtil.read_jdbc_query(
...@@ -291,6 +321,22 @@ from (select seller_id, ...@@ -291,6 +321,22 @@ from (select seller_id,
# 没有关联上的赋值为0,则不是公司内部店铺 # 没有关联上的赋值为0,则不是公司内部店铺
self.df_fb_agg = self.df_fb_agg.na.fill({"is_self_fb": 0}) self.df_fb_agg = self.df_fb_agg.na.fill({"is_self_fb": 0})
# 解析卖家公司数据,不同语言区别处理
if self.site_name in ("us", "uk"):
self.df_fb_agg = self.df_fb_agg.withColumn(
"business_name", self.u_get_business_val(F.col("seller_address"), F.lit("Business Name"))
).withColumn(
"business_addr", self.u_get_business_val(F.col("seller_address"), F.lit("Business Address"))
)
elif self.site_name == "de":
self.df_fb_agg = self.df_fb_agg.withColumn(
"business_name", self.u_get_business_val(F.col("seller_address"), F.lit("Geschäftsname"))
).withColumn(
"business_addr", self.u_get_business_val(F.col("seller_address"), F.lit("Geschäftsadresse"))
)
else:
pass
# 输出数据集-report # 输出数据集-report
def save_data_report(self): def save_data_report(self):
# 关联ods_seller_account_syn,带回account_name-采用inner join过滤掉库中无店铺名称的数据 # 关联ods_seller_account_syn,带回account_name-采用inner join过滤掉库中无店铺名称的数据
...@@ -358,6 +404,8 @@ from (select seller_id, ...@@ -358,6 +404,8 @@ from (select seller_id,
F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS').alias('updated_time'), F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS').alias('updated_time'),
F.lit(None).alias('usr_mask_type'), F.lit(None).alias('usr_mask_type'),
F.lit(None).alias('usr_mask_progress'), F.lit(None).alias('usr_mask_progress'),
F.col('business_name'),
F.col('business_addr'),
F.lit(self.site_name).alias('site_name'), F.lit(self.site_name).alias('site_name'),
F.lit(self.date_type).alias('date_type'), F.lit(self.date_type).alias('date_type'),
F.lit(self.date_info).alias('date_info') F.lit(self.date_info).alias('date_info')
......
...@@ -5,7 +5,7 @@ sys.path.append(os.path.dirname(sys.path[0])) ...@@ -5,7 +5,7 @@ sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes from utils.common_util import CommonUtil
if __name__ == '__main__': if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None) site_name = CommonUtil.get_sys_arg(1, None)
...@@ -23,8 +23,6 @@ if __name__ == '__main__': ...@@ -23,8 +23,6 @@ if __name__ == '__main__':
db_type = 'postgresql_test' db_type = 'postgresql_test'
print("导出到测试库中") print("导出到测试库中")
else: else:
# db_type = "postgresql"
# print("导出到PG库中")
db_type = "postgresql_cluster" db_type = "postgresql_cluster"
print("导出到PG集群库库中") print("导出到PG集群库库中")
...@@ -38,7 +36,6 @@ if __name__ == '__main__': ...@@ -38,7 +36,6 @@ if __name__ == '__main__':
# 获取数据库连接 # 获取数据库连接
engine = DBUtil.get_db_engine(db_type, site_name) engine = DBUtil.get_db_engine(db_type, site_name)
# 保证幂等性,先删除原始表同周期的数据 # 保证幂等性,先删除原始表同周期的数据
sql = f""" sql = f"""
drop table if exists {export_tb}; drop table if exists {export_tb};
...@@ -89,7 +86,9 @@ if __name__ == '__main__': ...@@ -89,7 +86,9 @@ if __name__ == '__main__':
"fb_new_asin_num_type", "fb_new_asin_num_type",
"fb_new_asin_rate_type", "fb_new_asin_rate_type",
"usr_mask_type", "usr_mask_type",
"usr_mask_progress" "usr_mask_progress",
"business_name",
"business_addr"
], ],
partition_dict={ partition_dict={
"site_name": site_name, "site_name": site_name,
...@@ -101,7 +100,6 @@ if __name__ == '__main__': ...@@ -101,7 +100,6 @@ if __name__ == '__main__':
client = SSHUtil.get_ssh_client() client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh_report, ignore_err=False) SSHUtil.exec_command_async(client, sh_report, ignore_err=False)
client.close() client.close()
# 创建索引并交换分区 # 创建索引并交换分区
DBUtil.add_pg_part( DBUtil.add_pg_part(
...@@ -124,6 +122,3 @@ if __name__ == '__main__': ...@@ -124,6 +122,3 @@ if __name__ == '__main__':
# 关闭链接 # 关闭链接
engine.dispose() engine.dispose()
print("success") print("success")
print("success")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment