Commit 76fe262e by fangxingjun

no message

parent a063aaef
"""
数据源:
1. 搜索词对应的市场asin
2. asin详情页对应的广告位关联asin
3. 当前月对应所有天的新品和榜单和keep的asin
流程:
month: 新增未抓取的asin, 123
month_week: 清空爬虫的syn表, 12
day: 3
"""
import os
import random
import sys
import time
import traceback
import pandas as pd
from pyspark.storagelevel import StorageLevel
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.db_util import DbTypes, DBUtil
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
class DwtAsinSync(Templates):
    """Collect the set of ASINs that still need to be synced/crawled.

    Data sources (asin_flag marks provenance):
      1. market ASINs matched by search terms   (dim_st_asin_info,        flag=1)
      2. ad-slot related ASINs from detail pages (dwt_asin_related_traffic, flag=2)
      3. new-release / best-seller / keepa ASINs (dwd_nsr_bsr_keepa_asin,  flag=3)

    Which sources are merged depends on date_type:
      month      -> 1 + 2 + 3
      month_week -> 1 + 2
      day        -> 3
    """

    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dwt_asin_sync"
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # Placeholder DataFrames; the real content is assigned in read_data()/handle_data().
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_from_st = self.spark.sql("select 1+1;")
        self.df_asin_from_adv = self.spark.sql("select 1+1;")
        self.df_asin_from_day = self.spark.sql("select 1+1;")
        self.engine_pg = get_remote_engine(
            site_name=self.site_name,
            db_type='postgresql_14'
        )
        # Tuple of date_info values covering the current period (provided by Templates).
        self.date_info_tuple = self.get_date_info_tuple()

    def judge_date_type(self):
        # NOTE(review): the original body only assigned an unused local
        # (`sql_read = 1`); kept as an intentional no-op hook.
        pass

    def read_data_common(self, sql, content, dedup_col="asin"):
        """Run `sql`, dedupe on `dedup_col`, cache and preview the result.

        :param sql: Spark SQL statement to execute.
        :param content: human-readable description, printed for the job log.
        :param dedup_col: column used for drop_duplicates; defaults to "asin".
            Pass "related_asin" for queries that do not yet expose an "asin"
            column (the original hard-coded "asin" and would fail on those).
        :return: cached, de-duplicated DataFrame.
        """
        print(f"sql: {sql}, content: {content}")
        df = self.spark.sql(sqlQuery=sql).drop_duplicates([dedup_col]).cache()
        df.show(10, truncate=False)
        return df

    def read_data(self):
        """Load the three ASIN sources into df_asin_from_{st,adv,day}."""
        # 1. search-term ASINs.
        #    Fixed: `1 as asin_flag` — the original `asin_flag as 1` is invalid SQL.
        sql_asin_from_st = (
            f"select asin, 1 as asin_flag from dim_st_asin_info "
            f"where site_name='{self.site_name}' and date_type='{self.date_type}' "
            f"and date_info='{self.date_info}'"
        )
        self.df_asin_from_st = self.read_data_common(sql=sql_asin_from_st, content="1. 读取dim_st_asin_info表的st-asin数据")

        # 2. ad-slot related ASINs; dedupe on related_asin because the asin
        #    column only exists after the explode below.
        sql_asin_from_adv = (
            f"select related_asin from dwt_asin_related_traffic "
            f"where site_name='{self.site_name}' and date_type='month_week' "
            f"and date_info='{self.date_info}'"
        )
        self.df_asin_from_adv = self.read_data_common(
            sql=sql_asin_from_adv,
            content="2. 读取dwt_asin_related_traffic表的asin广告位数据",
            dedup_col="related_asin",
        )
        # related_asin is a comma-separated list -> one row per asin.
        self.df_asin_from_adv = self.df_asin_from_adv.withColumn(
            'asin', F.explode(F.split(F.col('related_asin'), ','))
        ).select('asin').drop_duplicates(['asin'])
        self.df_asin_from_adv = self.df_asin_from_adv.withColumn('asin_flag', F.lit(2))

        # 3. nsr/bsr/keepa ASINs.
        #    Fixed: branch on date_type (the original tested date_info == 'day',
        #    which never matches a date value, leaving the first branch dead),
        #    and `3 as asin_flag` instead of the invalid `asin_flag as 3`.
        if self.date_type == 'day':
            sql_asin_from_day = (
                f"select asin, 3 as asin_flag from dwd_nsr_bsr_keepa_asin "
                f"where site_name='{self.site_name}' and date_type='{self.date_type}' "
                f"and date_info='{self.date_info}'"
            )
        else:
            sql_asin_from_day = (
                f"select asin, 3 as asin_flag from dwd_nsr_bsr_keepa_asin "
                f"where site_name='{self.site_name}' and date_type='month_week' "
                f"and date_info in {self.date_info_tuple}"
            )
        self.df_asin_from_day = self.read_data_common(sql=sql_asin_from_day, content="3. 读取dwd_nsr_bsr_keepa_asin表的asin新品和榜单数据")

    def handle_data(self):
        """Union the loaded sources into df_save according to date_type."""
        if self.date_type in ['month']:
            # month: all three sources (schemas differ -> allowMissingColumns).
            self.df_save = self.df_asin_from_st.unionByName(
                self.df_asin_from_adv, allowMissingColumns=True
            ).unionByName(
                self.df_asin_from_day, allowMissingColumns=True
            )
        if self.date_type == 'month_week':
            # month_week: search-term + ad-slot sources only.
            self.df_save = self.df_asin_from_st.unionByName(
                self.df_asin_from_adv, allowMissingColumns=True
            )
        if self.date_type == 'day':
            # day: only the nsr/bsr/keepa source.
            self.df_save = self.df_asin_from_day
        # Stamp partition columns onto the output.
        self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))
        self.df_save = self.df_save.withColumn('date_type', F.lit(self.date_type))
        self.df_save = self.df_save.withColumn('date_info', F.lit(self.date_info))

    def save_data(self):
        # Persistence intentionally not implemented yet.
        pass
if __name__ == "__main__":
    # CLI entry point: spark-submit script.py <site_name> <date_type> <date_info>
    site_name = sys.argv[1]  # arg 1: site, e.g. "us"
    date_type = sys.argv[2]  # arg 2: period type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week / year-month / year-quarter, e.g. "2022-1"
    handle_obj = DwtAsinSync(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
@@ -7,7 +7,7 @@ from utils.secure_db_client import get_remote_engine
 def export_data(site_name, date_type, date_info):
     engine = get_remote_engine(
         site_name="us",                 # -> database "selection"
-        db_type="mysql",                # -> server-side alias "mysql"
+        db_type="postgresql_cluster",   # -> server-side alias
         # user="fangxingjun",
         # user_token="5f1b2e9c3a4d7f60" # optional; defaults apply when omitted
     )
...
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment