Commit a1cae846 by fangxingjun

no message

parent 139c5fd6
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from pyspark.sql.types import StringType
from pyspark.sql import functions as F
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.templates import Templates


class DwtStMtChristmasInfo(Templates):
    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwt_st_mt_christmas_info'
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        # Build and validate the HDFS path the table is written to
        self.hdfs_path = CommonUtil.build_hdfs_path(self.db_save, partition_dict=self.partition_dict)
        # Register the user-defined function (UDF)
        self.u_theme_pattern = F.udf(self.udf_theme_pattern, StringType())
        # Create the spark_session object
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        # Initialize the global DataFrames
        self.df_st = self.spark.sql("select 1+1;")
        self.df_st_quantity_being_sold = self.spark.sql("select 1+1;")
        self.df_mt_volume = self.spark.sql("select 1+1;")
        self.df_st_measure = self.spark.sql("select 1+1;")
        self.df_st_asin_measure = self.spark.sql("select 1+1;")
        self.df_st_asin = self.spark.sql("select 1+1;")
        self.df_asin_measure = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_self_asin = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        # Other variables
        self.theme_list_str = str()  # serialized theme list, used for regex matching
        self.reset_partitions(partitions_num=5)
        self.partitions_by = ['site_name', 'date_type', 'date_info']

    @staticmethod
    def udf_theme_pattern(pattern_text, theme_list_str):
        found_themes = [theme.strip() for theme in eval(theme_list_str) if theme in pattern_text]
        if found_themes:
            return ','.join(set(found_themes))
        else:
            return None
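
    # Sketch (not part of the original commit): eval() above will execute
    # arbitrary code if theme_list_str is ever attacker-controlled. A safer
    # variant could parse the serialized list with ast.literal_eval, which
    # only accepts Python literals; the method name here is illustrative.
    @staticmethod
    def udf_theme_pattern_safe(pattern_text, theme_list_str):
        import ast
        # literal_eval raises ValueError on anything that is not a plain literal
        themes = ast.literal_eval(theme_list_str)
        found = {theme.strip() for theme in themes if theme in pattern_text}
        return ','.join(found) if found else None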
    def read_data(self):
        print("1.1 Read ABA-filtered search terms from dim_st_detail. Filter: ABA terms from the last four weeks containing christmas, stocking stuffers, elf on shelf, garland, santa, nativity, grinch, ugly sweater, tree, jingle bells, xmas, christmass, grinchmas, christmad, christams, yule, ms claus (excluding ABA terms that contain fall, winter, halloween, thanksgiving)")
        sql = f"""
        select distinct a.search_term from
        (
            select
                search_term
            from dim_st_detail
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
              -- and lower(search_term) like '%christmas%' --
              -- must contain at least one Christmas-related keyword
              and lower(search_term) rlike
                  '(christmas|xmas|christmass|christmad|christams|grinchmas|yule|stocking\\s*stuffers|elf\\s*on\\s*shelf|garland|santa|ms\\s*claus|nativity|grinch|ugly\\s*sweater|tree|jingle\\s*bells)'
              -- exclude fall / winter / halloween / thanksgiving terms
              and not (lower(search_term) rlike
                  '(fall|winter|halloween|thanksgiving)')
            union
            select keyword as search_term from dwt_merchantwords_st_detail
            where site_name = 'us'
              and keyword like '%christmas%'
        ) as a
        """
        self.df_st = self.spark.sql(sqlQuery=sql)
        print(f"sql: {sql}, \nnumber of distinct search terms: {self.df_st.count()}")
        self.df_st.show(10, truncate=False)
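
        # Note (assumption, standard regex semantics; not in the commit): the
        # rlike above is an unanchored substring match, so generic keywords
        # such as 'tree' or 'garland' also match non-Christmas terms, e.g.:
        #
        #   import re
        #   pat = re.compile(r'(christmas|xmas|garland|tree|jingle\s*bells)')
        #   assert pat.search('palm tree decor') is not None  # matches via 'tree'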
        print("2.1 Search-term level: get each search term and its total number of searched products -- take the latest record in the partition")
        sql = f"""
        select
            search_term,
            cast(
                max_by(quantity_being_sold, created_time)
                as int
            ) as total_searched_products
        from ods_st_quantity_being_sold
        where site_name = '{self.site_name}'
          and date_type = '{self.date_type}'
          and date_info = '{self.date_info}'
        group by search_term;
        """
        self.df_st_quantity_being_sold = self.spark.sql(sql)
        self.df_st_quantity_being_sold.show(10, truncate=False)
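
        # Sketch (assumption, not part of the commit; df_raw is illustrative):
        # an equivalent dedup in the DataFrame API for engines without max_by,
        # keeping the row with the latest created_time per search_term:
        #
        #   from pyspark.sql import Window
        #   w = Window.partitionBy('search_term').orderBy(F.desc('created_time'))
        #   df = (df_raw.withColumn('rn', F.row_number().over(w))
        #               .filter(F.col('rn') == 1)
        #               .select('search_term',
        #                       F.col('quantity_being_sold').cast('int')
        #                        .alias('total_searched_products')))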
        print("2.2 Search-term level: read dwt_merchantwords_st_detail -- Merchantwords search volume per term -- deduplicate, keeping the latest batch")
        # Read the Merchantwords search volume for the christmas terms
        # sql = f"""
        # select
        #     keyword as search_term,
        #     volume as mt_volume,
        #     batch
        # from dwt_merchantwords_st_detail
        # where site_name = '{self.site_name}'
        # """
        sql = f"""
        select
            keyword as search_term,
            max_by(volume, batch) as mt_volume
            -- , max(batch) as batch
        from dwt_merchantwords_st_detail
        where site_name = '{self.site_name}'
        group by keyword
        """
        print(sql)
        self.df_mt_volume = self.spark.sql(sqlQuery=sql)
        self.df_mt_volume.show(10, truncate=False)
        print("2.3 Search-term level: read dwd_st_measure -- average price and AO value")
        # Get sales-related metrics for each search term
        sql = f"""
        select
            search_term,
            st_price_avg,
            st_4_20_ao_avg as st_ao_val,
            st_dd50_proportion as total_amazon_orders_per50,
            -- st_zr_orders as total_orders,
            st_bsr_orders as total_bsr_orders,
            st_total_asin_counts as total_page3_products
        from dwd_st_measure
        where site_name = '{self.site_name}'
          and date_type = '{self.date_type}'
          and date_info = '{self.date_info}'
        """
        print(sql)
        self.df_st_measure = self.spark.sql(sql)
        self.df_st_measure.show(10, truncate=False)
        print("3.1 Search-term/asin level: read dwd_st_asin_measure -- search-term-to-asin mapping")
        # Get the mapping between search terms and asins
        sql = f"""
        select
            search_term,
            asin
        from dwd_st_asin_measure
        where site_name = '{self.site_name}'
          and date_type = '{self.date_type}'
          and date_info = '{self.date_info}'
        """
        print(sql)
        self.df_st_asin_measure = self.spark.sql(sql)
        self.df_st_asin_measure.show(10, truncate=False)
        print("4.1 Asin level: read dim_asin_detail -- launch date and new-product flag")
        # Get the launch-date data from the asin details
        sql = f"""
        select
            asin,
            asin_launch_time,
            case when datediff(current_date(), to_date(asin_launch_time)) <= 90 then 1 end as is_new_flag
        from dim_asin_detail
        where site_name = '{self.site_name}'
          and date_type = '{self.date_type}'
          and date_info = '{self.date_info}'
        """
        print(sql)
        self.df_asin_detail = self.spark.sql(sql)
        self.df_asin_detail.show(10, truncate=False)
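
        # Sketch (assumption, not in the commit): the same new-product flag in
        # the DataFrame API; rows older than 90 days get NULL, exactly like the
        # CASE expression above with no ELSE branch:
        #
        #   df = df.withColumn(
        #       'is_new_flag',
        #       F.when(F.datediff(F.current_date(),
        #                         F.to_date('asin_launch_time')) <= 90, F.lit(1)))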
        print("4.2 Asin level: read ods_self_asin -- the company's own asins")
        # Read ods_self_asin to flag the company's internal asins
        sql = f"""
        select
            asin,
            1 as is_self_asin
        from ods_self_asin
        where site_name = '{self.site_name}'
        group by asin
        """
        print(sql)
        self.df_self_asin = self.spark.sql(sqlQuery=sql)
        self.df_self_asin.show(10, truncate=False)
        print("4.3 Asin level: read dwd_asin_measure -- asin sales metrics")
        # Get sales-related data for each asin
        sql = f"""
        select
            asin,
            asin_zr_orders_sum,
            asin_amazon_orders
        from dwd_asin_measure
        where site_name = '{self.site_name}'
          and date_type = '{self.date_type}'
          and date_info = '{self.date_info}'
        """
        print(sql)
        self.df_asin_measure = self.spark.sql(sql)
        self.df_asin_measure.show(10, truncate=False)
    def handle_data(self):
        self.read_data()
        self.handle_st_data()
        self.save_data()
    def handle_st_data(self):
        # Join st-level data -- searched-product counts -- Merchantwords search volume
        self.df_save = self.df_st.join(
            self.df_st_quantity_being_sold, on=['search_term'], how='left'
        ).join(
            self.df_mt_volume, on=['search_term'], how='left'
        ).join(
            self.df_st_measure, on=['search_term'], how='left'
        ).cache()
        # Join the st-to-asin mapping
        self.df_st_asin = self.df_save.join(
            self.df_st_asin_measure, on=['search_term'], how='left'
        ).cache()
        # Join asin-level data
        self.df_st_asin = self.df_st_asin.join(
            self.df_asin_detail, on=['asin'], how='left'
        ).join(
            self.df_self_asin, on=['asin'], how='left'
        ).join(
            self.df_asin_measure, on=['asin'], how='left'
        ).cache()
        # Counts of in-house asins and new products per search term
        df_st_products = self.df_st_asin.groupby(['search_term']).agg(
            F.sum('is_self_asin').alias('total_self_products'),
            F.sum('is_new_flag').alias('total_new_products'),
            F.sum('asin_zr_orders_sum').alias('total_orders'),
            F.sum('asin_amazon_orders').alias('total_amazon_orders'),
        )
        df_st_products.show(10, truncate=False)
        print(f"df_st_products.columns: {df_st_products.columns}")
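
        # Note (assumption, standard Spark semantics; not in the commit):
        # F.sum skips NULLs, so summing the 1/NULL flags counts matching
        # asins, and a search term with no self/new asins gets NULL rather
        # than 0. A zero default could be forced with:
        #
        #   df_st_products = df_st_products.fillna(
        #       0, subset=['total_self_products', 'total_new_products'])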
        self.df_save = self.df_save.join(
            df_st_products, on=['search_term'], how='left'
        )
        self.df_save = self.df_save.withColumn('self_product_rate', F.round(F.col('total_self_products') / F.col('total_page3_products'), 4))
        self.df_save = self.df_save.withColumn('new_product_rate', F.round(F.col('total_new_products') / F.col('total_page3_products'), 4))
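
        # Note (assumption, not in the commit): with default non-ANSI Spark
        # settings, dividing by NULL or 0 yields NULL, so the two rates are
        # NULL whenever total_page3_products is missing or zero; under ANSI
        # mode (spark.sql.ansi.enabled=true) division by zero raises instead,
        # and an explicit guard like this would keep the NULL behaviour:
        #
        #   safe_total = F.when(F.col('total_page3_products') > 0,
        #                       F.col('total_page3_products'))
        #   self.df_save = self.df_save.withColumn(
        #       'self_product_rate',
        #       F.round(F.col('total_self_products') / safe_total, 4))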
        # 'ss' is seconds-of-minute; the original pattern's 'SS' is fraction-of-second
        self.df_save = self.df_save.withColumn('created_time', F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss'))
        self.df_save = self.df_save.withColumn('updated_time', F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss'))
        self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))
        self.df_save = self.df_save.withColumn('date_type', F.lit(self.date_type))
        self.df_save = self.df_save.withColumn('date_info', F.lit(self.date_info))
        self.df_save.show(10, truncate=False)
        print(f"self.df_save.columns: {self.df_save.columns}")
    # def save_data(self):
    #     pass
    #     df_save = self.df_st_handle.select(
    #         F.col('search_term'),
    #         F.col('mt_volume'),
    #         F.col('total_searched_products'),
    #         F.col('total_page3_products'),
    #         F.col('total_self_products'),
    #         F.round(F.col('total_self_products') / F.col('total_page3_products'), 4).alias('self_product_rate'),
    #         F.col('total_bsr_orders'),
    #         F.col('total_orders'),
    #         F.col('total_new_products'),
    #         F.round(F.col('total_new_products') / F.col('total_page3_products'), 4).alias('new_product_rate'),
    #         F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
    #         F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
    #         F.lit(self.site_name).alias('site_name'),
    #         F.lit(self.date_type).alias('date_type'),
    #         F.lit(self.date_info).alias('date_info')
    #     )
    #     # CommonUtil.check_schema(self.spark, df_save, self.hive_tb)
    #
    #     print(f"Clearing the HDFS directory: {self.hdfs_path}")
    #     HdfsUtils.delete_file_in_folder(self.hdfs_path)
    #
    #     df_save = df_save.repartition(10)
    #     partition_by = ["site_name", "date_type", "date_info"]
    #     print(f"Saving to table {self.hive_tb}, partitioned by {partition_by}")
    #     df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
    #     print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)  # arg 3: year-week / year-month / year-quarter / year-month-day, e.g. 2022-1
    assert site_name is not None, "site_name must not be empty!"
    assert date_type is not None, "date_type must not be empty!"
    assert date_info is not None, "date_info must not be empty!"
    obj = DwtStMtChristmasInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    obj.handle_data()
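
# Usage sketch (assumption -- the script name and argument values are
# illustrative, following the argument comment above and the export script):
#
#   spark-submit dwt_st_mt_christmas_info.py us month_aba_me 2025-11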


# ---- export script (second file in this commit) ----
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.secure_db_client import get_remote_engine
engine = get_remote_engine(
    site_name="us",  # -> database "selection"
    db_type="postgresql_cluster",  # -> server-side alias "mysql"
    user="fangxingjun",
    user_token="5f1b2e9c3a4d7f60"  # optional; falls back to the default
)
site_name = 'us'
date_type = 'month_aba_me'
date_info = '2025-11'
partitions = {
    'site_name': site_name,
    'date_type': date_type,
    'date_info': date_info,
}
cols_list = [
    'search_term', 'total_searched_products', 'mt_volume', 'st_price_avg',
    'st_ao_val', 'total_amazon_orders_per50', 'total_bsr_orders',
    'total_page3_products', 'total_self_products', 'total_new_products',
    'total_orders', 'total_amazon_orders', 'self_product_rate',
    'new_product_rate', 'created_time', 'updated_time',
    'site_name', 'date_type', 'date_info',
]
engine.sqoop_raw_export(
    hive_table='dwt_st_mt_christmas_info',
    import_table=f'{site_name}_st_mt_christmas_report_2025',
    partitions=partitions,
    m=1,
    cols=','.join(cols_list)
)