Commit a9b8d60d by fangxingjun

no message

parent da83d3ac
......@@ -143,16 +143,6 @@ class DwdAsinToPg(Templates):
self.date_last_5_day_tuple = tuple(df_last_5_day.date)
def read_data(self):
# 测试月流程用
# sql = f"select asin from us_all_syn_st_month_{2024} where date_info='{2023-12}' limit 100000"
# print("sql===:", sql)
# pdf_asin = pd.read_sql(sql, con=self.engine_pg14)
# schema = StructType([
# StructField('asin', StringType(), True),
# ])
# df_asin = self.spark.createDataFrame(pdf_asin, schema=schema)
# df_asin.show(10, truncate=False)
if self.date_type == 'day':
print("1.1 读取dim_st_asin_info表(当前日)")
sql = f"select asin, site_name from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
......
......@@ -33,11 +33,10 @@ from utils.secure_db_client import get_remote_engine
class DwtAsinSync(Templates):
def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
def __init__(self, site_name="us", date_type="month", date_info="2026-01"):
super().__init__()
self.site_name = site_name
self.date_type = date_type
self.date_type = self.judge_date_type()
self.date_info = date_info
self.db_save = f"dwt_asin_sync"
self.spark = self.create_spark_object(
......@@ -57,14 +56,20 @@ class DwtAsinSync(Templates):
site_name=self.site_name,
db_type='mysql'
)
self.date_info_tuple = self.get_date_info_tuple()
print(type(self.engine_mysql))
print(self.engine_mysql)
self.date_type = self.judge_date_type()
self.get_date_info_tuple() # self.date_info_tuple
self.table_syn = f"{self.site_name}_all_syn_st_month_{self.date_info.replace('-', '_')}" if self.site_name == 'us' else f"{self.site_name}_all_syn_st_{self.date_info.replace('-', '_')}"
self.partitions_by = ['site_name', 'date_type', 'date_info']
self.reset_partitions(partitions_num=5)
def judge_date_type(self):
print(f"site_name: {self.site_name}, date_type: {self.date_type}, date_info: {self.date_info}")
if self.date_type in ["month", "month_week"]:
year, month = self.date_info.split("-")
sql = f"select count(*) as st_count from {self.site_name}_brand_analytics_month_{year} where year={year} and month={int(month)} ;"
df = pd.read_sql(sql, con=self.engine_mysql)
df = self.engine_mysql.read_sql(sql)
print("sql:", sql, df.shape)
if list(df.st_count)[0] >= 100_0000:
self.date_type = "month" # 追加syn爬虫表
......@@ -72,26 +77,28 @@ class DwtAsinSync(Templates):
self.date_type = "month_week" # 清空syn爬虫表
return self.date_type
def read_data_common(self, sql, content):
df = self.spark.sql(sqlQuery=sql).drop_duplicates(["asin"]).cache()
def read_data_common(self, sql, content, col_dup='asin'):
df = self.spark.sql(sqlQuery=sql).drop_duplicates([col_dup]).cache()
df.show(10, truncate=False)
print(f"sql: {sql}, content: {content}, count: {df.count()}")
return df
def read_data(self):
# 读取asin数据源
sql_asin_from_st = f"select asin, asin_flag as 1 from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
sql_asin_from_st = f"select asin, 1 as asin_flag from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
self.df_asin_from_st = self.read_data_common(sql=sql_asin_from_st, content="1.1. 读取dim_st_asin_info表的st-asin数据")
sql_asin_from_adv = f"select related_asin from dwt_asin_related_traffic where site_name='{self.site_name}' and date_type='month_week' and date_info='{self.date_info}'"
self.df_asin_from_adv = self.read_data_common(sql=sql_asin_from_adv, content="1.2. 读取dwt_asin_related_traffic表的asin广告位数据")
self.df_asin_from_adv = self.read_data_common(sql=sql_asin_from_adv, content="1.2. 读取dwt_asin_related_traffic表的asin广告位数据", col_dup="related_asin")
self.df_asin_from_adv = self.df_asin_from_adv.withColumn(
'asin', F.explode(F.split(F.col('related_asin'), ','))
).select('asin').drop_duplicates(['asin'])
self.df_asin_from_adv = self.df_asin_from_adv.withColumn('asin_flag', F.lit(2))
if self.date_info == 'day':
sql_asin_from_day = f"select asin, asin_flag as 3 from dwd_nsr_bsr_keepa_asin where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
sql_asin_from_day = f"select asin, 3 as asin_flag from dwd_nsr_bsr_keepa_asin where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
else:
sql_asin_from_day = f"select asin, asin_flag as 3 from dwd_nsr_bsr_keepa_asin where site_name='{self.site_name}' and date_type='month_week' and date_info in {self.date_info_tuple}"
sql_asin_from_day = f"select asin, 3 as asin_flag from dwd_nsr_bsr_keepa_asin where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple}"
self.df_asin_from_day = self.read_data_common(sql=sql_asin_from_day, content="1.3. 读取dwd_nsr_bsr_keepa_asin表的asin新品和榜单数据")
# 读取asin属性值
sql_asin_variation = f"""select asin, 1 as asin_is_variation from dim_asin_variation_info where site_name="{self.site_name}";"""
......@@ -101,11 +108,13 @@ class DwtAsinSync(Templates):
# 读取syn爬虫表
year, month = self.date_info.split("-")
sql_asin_syn = f"select asin from us_all_syn_st_month_{year} where date_info='{self.date_info}'"
pdf_asin = pd.read_sql(sql_asin_syn, con=self.engine_pg14)
pdf_asin = self.engine_pg14.read_sql(sql_asin_syn)
schema = StructType([
StructField('asin', StringType(), True),
])
self.df_asin_syn = self.spark.createDataFrame(pdf_asin, schema=schema)
self.df_asin_syn = self.df_asin_syn.drop_duplicates(["asin"]).cache()
print(f"self.df_asin_syn: {self.df_asin_syn.count()}")
self.df_asin_syn.show(10, truncate=False)
def handle_data(self):
......@@ -122,28 +131,33 @@ class DwtAsinSync(Templates):
if self.date_type == 'day':
self.df_save = self.df_asin_from_day
print(f"去重之前的asin_flag类型数量统计")
self.df_save.groupby(f"asin_flag").agg(
F.count("asin").alias("asin_flag_count")
).show()
window = Window.partitionBy(['asin']).orderBy(F.desc(f"asin_flag"))
self.df_save = self.df_save.withColumn(f"asin_flag_row_number", F.row_number().over(window)) \
.filter(f'asin_flag_row_number = 1')
self.df_save.groupby(f"asin_flag_count").agg(
.filter(f'asin_flag_row_number = 1').drop(f"asin_flag_row_number")
print(f"去重之后的asin_flag类型数量统计")
self.df_save.groupby(f"asin_flag").agg(
F.count("asin").alias("asin_flag_count")
).show()
self.df_save = self.df_save.drop(f"asin_flag_row_number_percent_rank")
self.df_save = self.df_save.join(
self.df_asin_variation, on='asin', how='left'
).join(
self.df_asin_stable, on='asin', how='left'
)
# 处理同步逻辑
self.df_save = self.df_save.join(self.df_asin_syn, on=['asin'], how="left_anti")
self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))
self.df_save = self.df_save.withColumn('date_type', F.lit(self.date_type))
self.df_save = self.df_save.withColumn('date_info', F.lit(self.date_info))
self.df_save.show(10, truncate=False)
print(f"self.df_save: {self.df_save.count()}")
def save_data(self):
pass
# def save_data(self):
# pass
if __name__ == "__main__":
......
......@@ -19,8 +19,9 @@ def export_data(site_name, date_type, date_info):
cols_list = ['asin', 'rating_and_comments_info', 'date_info']
import_table = f'{site_name}_self_rating_180day_copy'
target_table = f'{site_name}_self_rating_180day'
sql_drop = f"drop table if exists {import_table};"
sql_create = f"create table {import_table} like {target_table};"
# sql_drop = f"drop table if exists {import_table};"
# sql_create = f"create table {import_table} like {target_table};"
sql_drop = f"drop table if exists {import_table};" # 删除依赖
sql_create = f"""CREATE TABLE {import_table} (LIKE {target_table} INCLUDING ALL);""" # pg
engine.execute(sql_drop)
engine.execute(sql_create)
......
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.secure_db_client import get_remote_engine
def export_data(site_name, date_type, date_info):
    """Export one dwt_asin_sync Hive partition into the site's PG syn table.

    Args:
        site_name: site code (e.g. ``us``) — used to build the target table name.
        date_type: partition granularity (week / 4_week / month / quarter / day).
        date_info: partition value, e.g. ``2026-01``.
    """
    # Remote engine against PostgreSQL 14.
    # NOTE(review): site_name is hard-coded to "us" here (selects the
    # "selection" database) — confirm non-us sites really share this cluster.
    # user / user_token are optional and fall back to server-side defaults.
    engine = get_remote_engine(
        site_name="us",
        db_type="postgresql_14",
    )

    # Hive partition keys selecting the data slice to export.
    partitions = dict(
        site_name=site_name,
        date_type=date_type,
        date_info=date_info,
    )

    # Columns exported from the Hive table into the PG import table.
    export_cols = ('asin', 'asin_is_variation', 'volume', 'weight_str', 'date_info')

    hive_table = 'dwt_asin_sync'
    # Target table is month-partitioned by name, e.g. us_all_syn_st_month_2026_01.
    import_table = f'{site_name}_all_syn_st_month_{date_info.replace("-", "_")}'
    print(f"import_table: {import_table}, hive_table: {hive_table}")
    print(f"partitions: {partitions}")

    # Single-mapper (m=1) sqoop export of the selected partition.
    engine.sqoop_raw_export(
        hive_table=hive_table,
        import_table=import_table,
        partitions=partitions,
        m=1,
        cols=','.join(export_cols),
    )
if __name__ == '__main__':
    # CLI usage: script.py <site_name> <date_type> <date_info>
    #   site_name: site code, e.g. us
    #   date_type: week / 4_week / month / quarter / day
    #   date_info: year-week / year-month / year-quarter / year-month-day, e.g. 2022-1
    argv = sys.argv
    site_name = argv[1]
    date_type = argv[2]
    date_info = argv[3]
    export_data(site_name, date_type, date_info)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment