Commit 7839ed18 by chenyuanjie

Code for the ASIN information library

parent 45f83a44
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
class DwtAiAsinAdd(object):
def __init__(self, site_name="us", date_type="month", date_info="2024-10"):
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
self.spark = SparkUtil.get_spark_session(app_name)
# List of the last 6 months, including the current one
self.last_6_month = []
for i in range(0, 6):
self.last_6_month.append(CommonUtil.get_month_offset(self.date_info, -i))
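# Placeholder DataFrames; each is populated in read_data()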
self.df_base_asin = self.spark.sql(f"select 1+1;")
self.df_flow_asin_detail = self.spark.sql(f"select 1+1;")
self.df_fb_info = self.spark.sql(f"select 1+1;")
self.df_ods_asin_detail = self.spark.sql(f"select 1+1;")
self.df_ai_asin_detail = self.spark.sql(f"select 1+1;")
self.df_asin_bought_flag = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;")
def run(self):
self.read_data()
self.handle_data()
self.save_data()
def read_data(self):
# Read the base data of the ASIN information library
sql1 = f"""
select
asin,
asin_bought_month,
asin_bought_mom,
asin_bought_yoy,
asin_bought_month_flag,
asin_is_new_flag
from dwd_ai_asin_add
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
"""
self.df_base_asin = self.spark.sql(sqlQuery=sql1).repartition(40, 'asin').cache()
print("ASIN信息库基础数据如下:")
self.df_base_asin.show(10, truncate=True)
# Read flow ASIN (traffic product-selection) detail data; only ASINs with at least 50 monthly purchases are kept
sql2 = f"""
select
asin,
asin_weight,
asin_category_desc,
asin_img_url,
asin_title,
asin_brand_name,
account_name,
asin_buy_box_seller_type,
asin_launch_time,
asin_img_num,
case when variation_num > 0 then 1 else 0 end as variation_flag,
variation_num,
asin_ao_val,
category_first_id,
category_id,
parent_asin,
first_category_rank,
asin_price,
asin_rating,
asin_total_comments,
asin_launch_time_type,
asin_describe
from dwt_flow_asin
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
and asin_type in (0, 1)
and asin_bought_month >= 50
"""
self.df_flow_asin_detail = self.spark.sql(sqlQuery=sql2).repartition(40, 'asin').cache()
print("流量选品详情数据如下:")
self.df_flow_asin_detail.show(10, truncate=True)
# Read seller (store) data
sql3 = f"""
select
account_name,
seller_id,
fb_country_name,
business_addr
from dwt_fb_base_report
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
"""
self.df_fb_info = self.spark.sql(sqlQuery=sql3).dropDuplicates(['account_name']).cache()
print("店铺详情数据如下:")
self.df_fb_info.show(10, truncate=True)
# Read review_json_list and other detail fields from ods_asin_detail
sql4 = f"""
select
asin,
review_json_list,
product_json,
product_detail_json,
updated_at
from ods_asin_detail
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
"""
self.df_ods_asin_detail = self.spark.sql(sqlQuery=sql4)
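# Deduplicate by asin: keep only the most recently updated record per ASIN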
window = Window.partitionBy(['asin']).orderBy(
self.df_ods_asin_detail.updated_at.desc_nulls_last()
)
self.df_ods_asin_detail = self.df_ods_asin_detail.withColumn(
'rank', F.row_number().over(window=window)
).filter('rank = 1').drop('rank', 'updated_at').repartition(40, 'asin').cache()
print("ods详情数据如下:")
self.df_ods_asin_detail.show(10, truncate=True)
# Join the DataFrames into a single detail DataFrame
self.df_ai_asin_detail = self.df_base_asin.join(
self.df_flow_asin_detail, 'asin', 'left'
).join(
self.df_ods_asin_detail, 'asin', 'left'
).join(
self.df_fb_info, 'account_name', 'left'
).cache()
self.df_base_asin.unpersist()
self.df_flow_asin_detail.unpersist()
self.df_fb_info.unpersist()
self.df_ods_asin_detail.unpersist()
# Read the monthly-sales flag from dwd_ai_asin_add for the last 6 months
sql5 = f"""
select
asin,
asin_bought_month_flag
from dwd_ai_asin_add
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info in ({CommonUtil.list_to_insql(self.last_6_month)})
"""
self.df_asin_bought_flag = self.spark.sql(sqlQuery=sql5).repartition(40, 'asin').cache()
print("dwd_ai_asin_add月销标识数据如下:")
self.df_asin_bought_flag.show(10, truncate=True)
def handle_data(self):
# Ascending-product flag: sales rose in each of the last 6 consecutive months (asin_bought_month_flag == 1 for all 6)
self.df_asin_bought_flag = self.df_asin_bought_flag.groupBy('asin').agg(
F.sum(F.when(F.col('asin_bought_month_flag') == 1, 1).otherwise(0)).alias('sum_flag')
).withColumn(
'is_ascending_flag', F.when(F.col('sum_flag') == 6, 1).otherwise(0)
)
def save_data(self):
# Standardize field names
self.df_save = self.df_ai_asin_detail.join(
self.df_asin_bought_flag, 'asin', 'left'
).select(
F.col("asin"),
F.col("asin_weight").alias("weight"),
F.col("asin_bought_month").alias("bought_month"),
F.col("asin_category_desc").alias("category"),
F.col("asin_img_url").alias("img"),
F.col("asin_title").alias("title"),
F.col("asin_brand_name").alias("brand"),
F.col("account_name"),
F.col("business_addr").alias("account_addr"),
F.col("asin_buy_box_seller_type").alias("buy_box_seller_type"),
F.col("asin_launch_time").alias("launch_time"),
F.col("asin_img_num").alias("img_num"),
F.col("variation_flag"),
F.col("variation_num"),
F.col("asin_ao_val").alias("ao_val"),
F.col("category_first_id").alias("category_id"),
F.col("category_id").alias("category_current_id"),
F.col("parent_asin"),
F.col("first_category_rank").alias("bsr_rank"),
F.col("asin_price").alias("price"),
F.col("asin_rating").alias("rating"),
F.col("asin_total_comments").alias("total_comments"),
F.col("seller_id"),
F.col("fb_country_name"),
F.col("review_json_list"),
F.col("asin_launch_time_type").alias("launch_time_type"),
F.col("asin_describe").alias("describe"),
F.col("product_json"),
F.col("product_detail_json"),
F.col("asin_bought_mom").alias("bought_month_mom"),
F.col("asin_bought_yoy").alias("bought_month_yoy"),
F.col("asin_is_new_flag").alias("is_new_flag"),
F.col("is_ascending_flag"),
F.lit(self.site_name).alias("site_name"),
F.lit(self.date_type).alias("date_type"),
F.lit(self.date_info).alias("date_info")
).repartition(100).cache()
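# repartition(100) controls the number of write tasks, and hence output files, per partition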
# Persist the result
partition_by = ["site_name", "date_type", "date_info"]
hive_tb = "dwt_ai_asin_add"
hdfs_path = CommonUtil.build_hdfs_path(
hive_tb,
partition_dict={
"site_name": self.site_name,
"date_type": self.date_type,
"date_info": self.date_info,
}
)
HdfsUtils.delete_file_in_folder(hdfs_path)
print(f"正在进行数据存储,当前存储的表名为:{hive_tb},存储路径:{hdfs_path}")
self.df_save.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
print("success!")
if __name__ == "__main__":
site_name = sys.argv[1]
date_type = sys.argv[2]
date_info = sys.argv[3]
handle_obj = DwtAiAsinAdd(site_name=site_name, date_type=date_type, date_info=date_info)
handle_obj.run()
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.es_util import EsUtils
from pyspark.sql import functions as F
from utils.common_util import CommonUtil
class EsAiAsinAll(object):
def __init__(self, site_name):
self.site_name = site_name
self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}")
# Elasticsearch-related configuration
self.es_client = EsUtils.get_es_client()
self.es_index = f"{site_name}_ai_analyze_extra"
self.es_body = self.get_es_body()
self.es_options = self.get_es_options(self.es_index)
self.policy_name = f"{site_name}_ai_analyze_policy"
self.pipeline_id = f"{site_name}_ai_analyze_pipeline"
self.df_asin_detail = self.spark.sql(f"select 1+1;")
@staticmethod
def get_es_body():
return {
"settings": {
"number_of_shards": "3",
"number_of_replicas": "1"
},
"mappings": {
"properties": {
"asin": {
"type": "keyword"
},
"is_stable_flag": {
"type": "short"
},
"is_periodic_flag": {
"type": "short"
},
"is_ascending_flag": {
"type": "short"
},
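# A single integer field holds the whole month array; ES fields accept multiple values without an explicit array type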
"max_bought_month_arr": {
"type": "integer"
}
}
}
}
@staticmethod
def get_es_options(index_name):
return {
"es.nodes": EsUtils.__es_ip__,
"es.port": EsUtils.__es_port__,
"es.net.http.auth.user": EsUtils.__es_user__,
"es.net.http.auth.pass": EsUtils.__es_passwd__,
"es.mapping.id": "asin",
"es.resource": f"{index_name}/_doc",
"es.batch.write.refresh": "false",
"es.batch.write.retry.wait": "60s",
"es.batch.size.entries": "5000",
"es.nodes.wan.only": "false",
"es.batch.write.concurrency": "40",
"es.write.operation": "index"
}
def run(self):
self.read_data()
self.es_save()
self.create_enrich_policy()
self.create_enrich_pipeline()
def read_data(self):
sql = f"""
select
asin,
is_stable_flag,
is_periodic_flag,
is_ascending_flag,
max_month_last_12_month as max_bought_month_arr
from dwt_ai_asin_all
where site_name = '{self.site_name}'
"""
self.df_asin_detail = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').withColumn(
"max_bought_month_arr", F.split(F.col("max_bought_month_arr"), ",")
).withColumn(
"max_bought_month_arr", F.expr("transform(max_bought_month_arr, x -> cast(x as int))")
).cache()
print("ASIN信息库数据如下:")
self.df_asin_detail.show(10, True)
def es_save(self):
print(f"创建富集索引:{self.es_index}!")
EsUtils.create_index(self.es_index, self.es_client, self.es_body)
try:
self.df_asin_detail.write.format("org.elasticsearch.spark.sql") \
.options(**self.es_options) \
.mode("append") \
.save()
print(f"ES {self.es_index} 索引更新完毕!")
except Exception as e:
print("An error occurred while writing to Elasticsearch:", str(e))
CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES data update failed', f'Failed index: {self.es_index}')
def create_enrich_policy(self):
# print(f"创建富集策略:{self.policy_name}!")
# policy_body = {
# "match": {
# "indices": f"{self.es_index}",
# "match_field": "asin",
# "enrich_fields": ["is_stable_flag", "is_periodic_flag", "is_ascending_flag", "max_bought_month_arr"]
# }
# }
# self.es_client.enrich.put_policy(name=self.policy_name, body=policy_body)
print(f"刷新富集策略:{self.policy_name}!")
self.es_client.enrich.execute_policy(self.policy_name, request_timeout=1800)
def create_enrich_pipeline(self):
print(f"创建富集管道:{self.pipeline_id}!")
pipeline_body = {
"description": "ai asin analyze pipeline",
"processors": [
{
"enrich": {
"policy_name": self.policy_name,
"field": "asin",
"target_field": "last_year_extra",
"max_matches": 1,
"ignore_missing": True
},
}
]
}
self.es_client.ingest.put_pipeline(id=self.pipeline_id, body=pipeline_body)
if __name__ == "__main__":
site_name = sys.argv[1]
handle_obj = EsAiAsinAll(site_name)
handle_obj.run()
print("success!!!")
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
date_info = CommonUtil.get_sys_arg(3, None)
print(f"执行参数为{sys.argv}")
# Get the database engine
db_type = "postgresql_15"
engine = get_remote_engine(
site_name='us',
db_type=db_type
)
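# Monthly export table name, e.g. ai_asin_detail_month_2024_10 for the us site and <site>_ai_asin_detail_month_... for other sites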
if site_name == 'us':
export_tb = f"ai_asin_detail_month_{date_info.replace('-', '_')}"
else:
export_tb = f"{site_name}_ai_asin_detail_month_{date_info.replace('-', '_')}"
# Export data
engine.sqoop_raw_export(
hive_table="dwt_ai_asin_add",
import_table=export_tb,
partitions={
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
},
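# m: number of parallel export mappers (assumed to follow the Sqoop -m convention)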
m=30,
cols="site_name,asin,weight,bought_month,category,img,title,brand,account_name,account_addr,buy_box_seller_type,"
"launch_time,img_num,variation_flag,variation_num,ao_val,category_id,category_current_id,parent_asin,bsr_rank,"
"price,rating,total_comments,seller_id,fb_country_name,review_json_list,launch_time_type,describe,product_json,"
"product_detail_json,bought_month_mom,bought_month_yoy,is_new_flag,is_ascending_flag"
)
print("success")