Commit c023bddc by fangxingjun

no message

parent d22775a0
import os
import random
import sys
import time
import traceback
import pandas as pd
from pyspark.storagelevel import StorageLevel

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# window functions for grouped ranking
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.db_util import DbTypes, DBUtil
from utils.common_util import CommonUtil
from datetime import datetime, timedelta


class DwdNsrBsrKeepaAsin(Templates):

    def __init__(self, site_name="us", date_type="day", date_info="2026-02-02"):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dwd_nsr_bsr_keepa_asin"
        self.db_save_cate = "dwd_asin_cate_flag"
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # placeholder DataFrames, filled in read_data()/handle_data()
        self.df_save = self.spark.sql("select 1+1;")
        self.df_save_asin_cate = self.spark.sql("select 1+1;")
        self.df_asin_nsr = self.spark.sql("select 1+1;")
        self.df_asin_bsr = self.spark.sql("select 1+1;")
        self.df_asin_keepa = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=5)

    @staticmethod
    def get_date_30_days_ago(date_str: str, date_format: str = "%Y-%m-%d") -> str:
        # parse the string into a date object
        date_obj = datetime.strptime(date_str, date_format)
        # compute the date 30 days earlier
        thirty_days_ago = date_obj - timedelta(days=30)
        # format back to a string and return
        return thirty_days_ago.strftime(date_format)
        # # quick test
        # date_input = "2026-02-03"  # sample date
        # result = get_date_30_days_ago(date_input)
        # print(result)

    @staticmethod
    def is_saturday(date_str: str, fmt: str = "%Y-%m-%d") -> bool:
        dt = datetime.strptime(date_str, fmt)
        # weekday(): Monday=0 ... Saturday=5, Sunday=6
        return dt.weekday() == 5
        # # examples
        # print(is_saturday("2026-02-07"))  # True
        # print(is_saturday("2026-02-06"))  # False

    def read_data(self):
        thirty_days_ago = self.get_date_30_days_ago(date_str=self.date_info)
        print("1.1 reading bsr asins from the last 30 days")
        sql_bsr = f"select asin, date_info, 1 as asin_cate_flag from dim_bsr_asin_rank_history where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'"
        print("sql_bsr:", sql_bsr)
        self.df_asin_bsr = self.spark.sql(sqlQuery=sql_bsr).cache()
        self.df_asin_bsr.show(10, truncate=False)
        print("1.2 reading nsr asins from the last 30 days")
        sql_nsr = f"select asin, date_info, 2 as asin_cate_flag from dim_nsr_asin_rank_history where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'"
        print("sql_nsr:", sql_nsr)
        self.df_asin_nsr = self.spark.sql(sqlQuery=sql_nsr).cache()
        self.df_asin_nsr.show(10, truncate=False)
        print("1.3 reading keepa asins from the last 30 days")
        sql_keepa = f"select distinct(asin), date_info, 3 as asin_cate_flag from ods_keepa_finder_asin where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'"
        print("sql_keepa:", sql_keepa)
        self.df_asin_keepa = self.spark.sql(sqlQuery=sql_keepa).cache()
        self.df_asin_keepa.show(10, truncate=False)

    def handle_data(self):
        df_bsr = self.df_asin_bsr.select("asin", "asin_cate_flag")
        df_nsr = self.df_asin_nsr.select("asin", "asin_cate_flag")
        df_keepa = self.df_asin_keepa.select("asin", "asin_cate_flag")
        # df_keepa = self.df_asin_keepa.select("asin", "asin_cate_flag")
        df_union = df_bsr.unionByName(df_nsr).unionByName(df_keepa)
        # df_union = df_bsr.unionByName(df_nsr)
        # per asin, collect the distinct source flags (1=bsr, 2=nsr, 3=keepa) into a sorted, comma-joined string
        df_result = (
            df_union
            .groupBy("asin")
            .agg(
                F.sort_array(F.collect_set("asin_cate_flag")).alias("flag_list")
            )
            .withColumn(
                "asin_cate_flag",
                F.concat_ws(",", F.col("flag_list"))
            )
            .drop("flag_list")
        )
        df_result.show(10, truncate=False)
        df_result.groupBy("asin_cate_flag").count().orderBy("count", ascending=False).show(truncate=False)
        self.df_save_asin_cate = df_result
        self.df_save_asin_cate = self.df_save_asin_cate.withColumn("site_name", F.lit(self.site_name))
        self.df_save_asin_cate.show(10, truncate=False)
        self.save_data_common(
            df_save=self.df_save_asin_cate,
            db_save=self.db_save_cate,
            partitions_num=self.partitions_num,
            partitions_by=["site_name"]
        )
        # self.df_save_asin_cate = self.df_save_asin_cate.withColumn("date_type", F.lit(self.date_type))
        # self.df_save_asin_cate = self.df_save_asin_cate.withColumn("date_info", F.lit(self.date_info))
        # df_asin_today = self.df_save.filter(f"date_info='{self.date_info}'")
        is_saturday_flag = self.is_saturday(self.date_info)
        self.df_save = self.df_asin_bsr.unionByName(self.df_asin_nsr, allowMissingColumns=True).unionByName(self.df_asin_keepa, allowMissingColumns=True)
        # keep each asin's earliest date_info across the three sources
        window = Window.partitionBy(['asin']).orderBy(F.asc("date_info"))
        self.df_save = self.df_save.withColumn(
            "rk", F.row_number().over(window=window)
        ).filter("rk=1").drop("rk").cache()
        self.df_save = self.df_save.select("asin", "date_info")
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        if is_saturday_flag:
            # on Saturdays keep the full 30-day window and stamp every row with today's date_info
            self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        else:
            # on other days keep only the asins first seen today
            self.df_save = self.df_save.filter(f"date_info='{self.date_info}'")
        self.df_save = self.df_save.join(
            df_result, on=['asin'], how='left'
        )
        self.df_save.show(10, truncate=False)
        # print(self.df_save.count())
        # self.df_save.groupBy("date_info").count().orderBy("count", ascending=False).show(truncate=False)
        # self.df_save.groupBy("asin_cate_flag").count().orderBy("count", ascending=False).show(truncate=False)
        # exit()

    # def save_data(self):
    #     pass


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
    handle_obj = DwdNsrBsrKeepaAsin(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
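
For reference, a minimal standalone sketch (local SparkSession and toy rows assumed; not part of this commit) of the asin_cate_flag aggregation used in handle_data(), where the flags 1/2/3 mark bsr/nsr/keepa membership:

# Standalone sketch of the flag aggregation. The explicit cast to array<string>
# is added here only so concat_ws accepts the collected integer flags.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").appName("asin_cate_flag_demo").getOrCreate()
rows = [("B01AAA", 1), ("B01AAA", 3), ("B02BBB", 2), ("B02BBB", 2)]
df = spark.createDataFrame(rows, ["asin", "asin_cate_flag"])
(
    df.groupBy("asin")
    .agg(F.sort_array(F.collect_set("asin_cate_flag")).alias("flag_list"))
    .withColumn("asin_cate_flag", F.concat_ws(",", F.col("flag_list").cast("array<string>")))
    .drop("flag_list")
    .show(truncate=False)
)
# Expected: B01AAA -> "1,3" (bsr + keepa), B02BBB -> "2" (nsr only)
spark.stop()

Sorting the collected set before concat_ws keeps the combined flag deterministic, so "1,3" never also shows up as "3,1".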
 import sys
 import os
+import time
 import pandas as pd
 sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
 # from utils.templates import Templates
@@ -70,10 +72,20 @@ class ImportStToPg14(object):
         self.df_save.to_sql(f"{self.site_name}_search_term_month", con=self.engine_pg14, index=False, if_exists="append")
         # pass

-    def run(self):
-        self.read_data()
-        self.handle_data()
-        self.save_data()
+    def run(self, num=0):
+        while num <= 3:
+            try:
+                self.read_data()
+                self.handle_data()
+                self.save_data()
+                break
+            except Exception as e:
+                print(f"Search term import to pg14 failed: {self.site_name}-{self.date_type}--{self.date_info}")
+                self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name=self.site_name)
+                self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name, site_name=self.site_name)
+                time.sleep(60)
+                num += 1
+                continue

 if __name__ == '__main__':
...
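
The run() change above wraps the read/handle/save steps in a retry loop that recreates the DB engines and sleeps 60 seconds between attempts. A generic, hedged sketch of the same pattern (run_with_retries, step_fns and rebuild_engines are hypothetical names, not project code):

# Generic retry-with-reconnect sketch mirroring the run() change above.
import time

def run_with_retries(step_fns, rebuild_engines, max_retries=3, wait_seconds=60):
    attempt = 0
    while attempt <= max_retries:
        try:
            for step in step_fns:
                step()          # read_data / handle_data / save_data in the real job
            return True         # all steps succeeded, stop retrying
        except Exception as exc:
            print(f"attempt {attempt} failed: {exc}")
            rebuild_engines()   # recreate the DB engines before the next attempt
            time.sleep(wait_seconds)
            attempt += 1
    return False                # exhausted all retries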
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.secure_db_client import get_remote_engine

engine = get_remote_engine(
    site_name="us",            # -> database "selection"
    db_type="postgresql_14",   # -> server-side alias "mysql"
)

site_name = 'us'
date_type = 'day'
date_info = '2026-02-04'

partitions = {
    'site_name': site_name,
    'date_type': date_type,
    'date_info': date_info,
}
cols_list = ['asin', 'asin_cate_flag', 'date_info']

engine.sqoop_raw_export(
    hive_table='dwd_nsr_bsr_keepa_asin',
    import_table=f'{site_name}_all_syn_st_day_{date_info.replace("-", "_")}',
    partitions=partitions,
    m=1,
    cols=','.join(cols_list)
)
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.secure_db_client import get_remote_engine

site_name = sys.argv[1]  # arg 1: site
date_type = sys.argv[2]  # arg 2: date type: day/week/4_week/month/quarter
date_info = sys.argv[3]  # arg 3: year-month-day/year-week/year-month/year-quarter, e.g. 2022-1
db_type = "mysql"

engine = get_remote_engine(
    site_name=site_name,
    db_type=db_type
)

hive_table = "ods_keepa_finder_asin"
partition_dict = {
    "site_name": site_name,
    "date_type": date_type,
    "date_info": date_info,
}
sql_query = f"select id, request_id, asin, tracking_since from keepa_finder_asin where 1=1 " \
            f"and \$CONDITIONS"

engine.sqoop_raw_import(
    query=sql_query,
    hive_table=hive_table,
    partitions=partition_dict,
    m=1,
)
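
Note on the free-form query above: Sqoop requires the literal $CONDITIONS token in a --query import so it can substitute its split predicates, and the backslash keeps the shell from expanding it when the command line is assembled. A hypothetical variant that also narrows the pull by date (the date_info column on keepa_finder_asin is an assumption, not confirmed by this commit):

# Hypothetical variant: filter by date while keeping Sqoop's required $CONDITIONS
# placeholder. Assumes keepa_finder_asin has a date_info column (not confirmed here).
sql_query = f"select id, request_id, asin, tracking_since from keepa_finder_asin " \
            f"where date_info = '{date_info}' and \$CONDITIONS"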