Commit 9a8892d9 by chenyuanjie

年度信息库

parent 508bfd64
"""
@Author : CT
@Description : 年度ASIN信息数据聚合
@SourceTable :
dwt_ai_asin_add (近12个月)
dwt_flow_asin (关联获取together_asin、title_len)
{site_name}_ai_asin_analyze_detail (AI分析数据,唯一)
@SinkTable : dwt_ai_asin_year
@CreateTime : 2025/03/03
@UpdateTime : 2025/03/03
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from pyspark.sql import functions as F, Window
from pyspark.sql.types import IntegerType, StringType
from pyspark.storagelevel import StorageLevel
class DwtAiAsinYear(object):
    """
    Yearly ASIN information aggregation, written to Hive table ``dwt_ai_asin_year``.

    Aggregation rules (one output row per asin):
      - Source rows come from the last 12 months of ``dwt_ai_asin_add``.
      - ``asin`` is the unique key of the output.
      - ``bought_month_sum`` is the sum of the asin's monthly sales over the year.
      - Q1..Q4 quarterly sales columns are derived from the per-calendar-month sales.
      - ``total_appear_month_str`` records which calendar months the asin appeared in.
      - Every other attribute keeps the value from the latest month the asin
        appeared in.
      - The AI-analysis table is already unique per asin, so a plain join suffices.
    """

    def __init__(self, site_name: str, date_type: str, date_info: str):
        """
        :param site_name: site/marketplace name, used as partition value and
                          as prefix of the PostgreSQL AI-analysis table name
        :param date_type:  date granularity of the source partitions
        :param date_info:  anchor month (e.g. "2025-02"); the job reads this
                           month plus the preceding 11 months
        """
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{self.site_name}:{self.date_type}:{self.date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dwt_ai_asin_year"
        # PostgreSQL connection info.
        # NOTE(review): hard-coded to the "us" database even though site_name is
        # parameterized -- confirm this is intentional for non-US sites.
        self.pg_conn = DBUtil.get_connection_info("postgresql", "us")
        # Global DataFrame placeholders; the dummy "select 1+1" frames are
        # replaced with real data in read_data()/handle_data().
        self.df_ai_asin_add = self.spark.sql(f"select 1+1;")
        self.df_flow_asin = self.spark.sql(f"select 1+1;")
        self.df_base = self.spark.sql(f"select 1+1;")
        self.df_monthly_sales = self.spark.sql(f"select 1+1;")
        self.df_ai_analyze = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        # List of the last 12 months: date_info itself plus the 11 months before it.
        self.last_12_month = []
        for i in range(0, 12):
            self.last_12_month.append(CommonUtil.get_month_offset(self.date_info, -i))

    def run(self):
        """Execute the job end to end: read -> transform -> write."""
        self.read_data()
        self.handle_data()
        self.save_data()

    def read_data(self):
        """
        Read each source separately, then join:
          1. the last 12 months of ``dwt_ai_asin_add``;
          2. ``dwt_flow_asin`` for together_asin / title_len (plus price fields);
          3. the AI-analysis table in PostgreSQL (already unique per asin).
        """
        # 1. Read the last 12 months of dwt_ai_asin_add.
        sql1 = f"""
        select
            asin,
            weight,
            bought_month,
            category,
            img,
            title,
            brand,
            account_name,
            account_addr,
            buy_box_seller_type,
            launch_time,
            img_num,
            variation_flag,
            variation_num,
            ao_val,
            category_id,
            category_current_id,
            parent_asin,
            bsr_rank,
            price,
            rating,
            total_comments,
            seller_id,
            fb_country_name,
            launch_time_type,
            `describe`,
            bought_month_mom,
            bought_month_yoy,
            is_new_flag,
            is_ascending_flag,
            date_info
        from dwt_ai_asin_add
        where site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info in ({CommonUtil.list_to_insql(self.last_12_month)})
        """
        # DISK_ONLY: this frame is reused for the join in step 4 and is too
        # large to hold in memory.
        self.df_ai_asin_add = self.spark.sql(sql1).repartition(80, "asin", "date_info") \
            .persist(StorageLevel.DISK_ONLY)
        print("dwt_ai_asin_add近12个月数据读取完成,示例数据如下:")
        self.df_ai_asin_add.show(10, truncate=True)
        # 2. Read dwt_flow_asin for together_asin / title_len (and price deltas).
        sql2 = f"""
        select
            asin,
            together_asin,
            asin_title_len as title_len,
            asin_price as flow_price,
            asin_price_mom as price_mom,
            asin_price_yoy as price_yoy,
            date_info
        from dwt_flow_asin
        where site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info in ({CommonUtil.list_to_insql(self.last_12_month)})
        and asin_type in (0, 1)
        and asin_bought_month >= 50
        """
        self.df_flow_asin = self.spark.sql(sql2).repartition(80, "asin", "date_info") \
            .persist(StorageLevel.DISK_ONLY)
        print("dwt_flow_asin数据读取完成,示例数据如下:")
        self.df_flow_asin.show(10, truncate=True)
        # 3. Read the AI-analysis data from PostgreSQL (one row per asin).
        # NOTE(review): the table name is interpolated from site_name; site_name
        # comes from the CLI, so this is trusted input, but keep it that way.
        sql3 = f"""
        select
            asin,
            id as analyze_id,
            package_quantity,
            material,
            color,
            appearance,
            size,
            function,
            shape,
            scene_title,
            scene_comment,
            uses,
            theme,
            crowd,
            short_desc,
            title_pic_flag,
            title_word_flag,
            title_pic_content,
            title_word_content,
            array_to_string(package_quantity_arr, ',') as package_quantity_arr,
            package_quantity_flag,
            label_content,
            multi_color_flag,
            multi_color_content,
            festival
        from {self.site_name}_ai_asin_analyze_detail
        """
        self.df_ai_analyze = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=self.pg_conn["url"],
            pwd=self.pg_conn["pwd"],
            username=self.pg_conn["username"],
            query=sql3
        ).repartition(40, 'asin').persist(StorageLevel.DISK_ONLY)
        print("AI分析数据如下:")
        self.df_ai_analyze.show(10, truncate=True)
        # 4. Left-join dwt_ai_asin_add with dwt_flow_asin on (asin, month) so
        # flow attributes line up with the same month's snapshot.
        self.df_base = self.df_ai_asin_add.join(
            self.df_flow_asin, on=['asin', 'date_info'], how='left'
        ).persist(StorageLevel.DISK_ONLY)
        print("关联后数据如下:")
        self.df_base.show(10, truncate=True)

    def handle_data(self):
        """Transformation pipeline; each step is documented on its own method."""
        # Keep, per asin, only the latest month's attribute row.
        self.handle_latest_appear()
        # Pivot the 12 monthly sales (drives quarterly sums and appearance months).
        self.handle_monthly_sales()
        # Merge everything and project the final column list.
        self.handle_merge()

    def handle_latest_appear(self):
        """
        For every asin keep the row from the latest month it appeared in,
        using row_number() over a window ordered by date_info descending.
        """
        window = Window.partitionBy("asin").orderBy(F.col("date_info").desc_nulls_last())
        self.df_base = self.df_base.withColumn(
            "row_num", F.row_number().over(window)
        ).filter(
            F.col("row_num") == 1
        ).drop("row_num").cache()
        print("每个ASIN取最新出现月份数据完成,示例数据如下:")
        self.df_base.show(10, truncate=True)

    def handle_monthly_sales(self):
        """
        Compute the 12 per-month sales columns, used for:
          1. quarterly sales sums;
          2. the "months appeared" string;
          3. the yearly sales total.
        """
        # Re-read the 12 months of monthly sales.
        # NOTE(review): this repeats the scan done in read_data(); selecting
        # asin/bought_month/date_info from self.df_ai_asin_add would avoid it.
        sql = f"""
        select
            asin,
            bought_month,
            date_info
        from dwt_ai_asin_add
        where site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info in ({CommonUtil.list_to_insql(self.last_12_month)})
        """
        df_sales = self.spark.sql(sql).repartition(80, "asin", "date_info").cache()
        # Pivot rows to columns: one sales column per month.
        df_pivot = df_sales.groupBy("asin").pivot("date_info", self.last_12_month).agg(
            F.first("bought_month").alias("bought_month")
        ).cache()
        # Rename columns by calendar month number: "2024-07" -> bought_month_7.
        # The 12 months are consecutive, so each number 1..12 occurs exactly once.
        for month_str in self.last_12_month:
            month_num = int(month_str.split('-')[-1])
            df_pivot = df_pivot.withColumnRenamed(month_str, f"bought_month_{month_num}")
        # Quarterly sales: calendar quarters (Jan-Mar, ...), not a rolling window.
        df_pivot = df_pivot.withColumn(
            "q1_bought_month",
            F.expr("coalesce(bought_month_1,0) + coalesce(bought_month_2,0) + coalesce(bought_month_3,0)")
        ).withColumn(
            "q2_bought_month",
            F.expr("coalesce(bought_month_4,0) + coalesce(bought_month_5,0) + coalesce(bought_month_6,0)")
        ).withColumn(
            "q3_bought_month",
            F.expr("coalesce(bought_month_7,0) + coalesce(bought_month_8,0) + coalesce(bought_month_9,0)")
        ).withColumn(
            "q4_bought_month",
            F.expr("coalesce(bought_month_10,0) + coalesce(bought_month_11,0) + coalesce(bought_month_12,0)")
        )
        # Yearly total = sum of the four quarters.
        df_pivot = df_pivot.withColumn(
            "bought_month_sum",
            F.col("q1_bought_month") + F.col("q2_bought_month") +
            F.col("q3_bought_month") + F.col("q4_bought_month")
        )
        # Appearance months: calendar month numbers where the asin had a
        # (non-null) sales value, joined into a comma-separated string.
        df_pivot = df_pivot.withColumn(
            "appear_month_list",
            F.array(
                F.when(F.col("bought_month_1").isNotNull(), F.lit(1)),
                F.when(F.col("bought_month_2").isNotNull(), F.lit(2)),
                F.when(F.col("bought_month_3").isNotNull(), F.lit(3)),
                F.when(F.col("bought_month_4").isNotNull(), F.lit(4)),
                F.when(F.col("bought_month_5").isNotNull(), F.lit(5)),
                F.when(F.col("bought_month_6").isNotNull(), F.lit(6)),
                F.when(F.col("bought_month_7").isNotNull(), F.lit(7)),
                F.when(F.col("bought_month_8").isNotNull(), F.lit(8)),
                F.when(F.col("bought_month_9").isNotNull(), F.lit(9)),
                F.when(F.col("bought_month_10").isNotNull(), F.lit(10)),
                F.when(F.col("bought_month_11").isNotNull(), F.lit(11)),
                F.when(F.col("bought_month_12").isNotNull(), F.lit(12))
            )
        ).withColumn(
            # F.array keeps nulls for months without sales; strip them here.
            "appear_month_list",
            F.expr("filter(appear_month_list, x -> x is not null)")
        ).withColumn(
            "total_appear_month_str",
            F.concat_ws(",", F.col("appear_month_list"))
        ).drop("appear_month_list")
        # Keep only the derived columns (including the per-month sales).
        self.df_monthly_sales = df_pivot.select(
            "asin",
            "bought_month_sum",
            "q1_bought_month",
            "q2_bought_month",
            "q3_bought_month",
            "q4_bought_month",
            "total_appear_month_str",
            "bought_month_1",
            "bought_month_2",
            "bought_month_3",
            "bought_month_4",
            "bought_month_5",
            "bought_month_6",
            "bought_month_7",
            "bought_month_8",
            "bought_month_9",
            "bought_month_10",
            "bought_month_11",
            "bought_month_12"
        ).cache()
        df_sales.unpersist()
        print("季度销量和出现时间计算完成,示例数据如下:")
        self.df_monthly_sales.show(10, truncate=True)

    def handle_merge(self):
        """
        Join the latest-month base rows with the monthly-sales aggregates and the
        AI-analysis data, then project the final (alphabetically ordered) column
        list plus the partition columns. The inner join on the AI data means only
        asins with an AI-analysis row are kept.
        """
        self.df_save = self.df_base.join(
            self.df_monthly_sales, "asin", "left"
        ).join(
            self.df_ai_analyze, "asin", "inner"
        ).withColumn(
            # Lookup key for the downstream profit-rate join: "<asin>_<price>".
            "profit_key", F.concat_ws("_", F.col("asin"), F.col("price"))
        ).select(
            F.col("account_addr"),
            F.col("account_name"),
            F.col("analyze_id"),
            F.col("ao_val"),
            F.col("appearance"),
            F.col("asin"),
            F.col("bought_month"),
            F.col("bought_month_1").cast(IntegerType()),
            F.col("bought_month_2").cast(IntegerType()),
            F.col("bought_month_3").cast(IntegerType()),
            F.col("bought_month_4").cast(IntegerType()),
            F.col("bought_month_5").cast(IntegerType()),
            F.col("bought_month_6").cast(IntegerType()),
            F.col("bought_month_7").cast(IntegerType()),
            F.col("bought_month_8").cast(IntegerType()),
            F.col("bought_month_9").cast(IntegerType()),
            F.col("bought_month_10").cast(IntegerType()),
            F.col("bought_month_11").cast(IntegerType()),
            F.col("bought_month_12").cast(IntegerType()),
            F.col("bought_month_mom"),
            F.col("bought_month_sum"),
            F.col("bought_month_yoy"),
            F.col("brand"),
            F.col("bsr_rank"),
            F.col("buy_box_seller_type"),
            F.col("category"),
            F.col("category_current_id"),
            F.col("category_id"),
            F.col("color"),
            F.col("crowd"),
            F.col("describe"),
            F.col("fb_country_name"),
            F.col("festival"),
            F.col("function"),
            F.col("img"),
            F.col("img_num"),
            F.col("is_ascending_flag"),
            F.col("is_new_flag"),
            F.col("label_content"),
            F.col("launch_time"),
            F.col("launch_time_type"),
            F.col("material"),
            F.col("multi_color_content"),
            F.col("multi_color_flag"),
            F.col("package_quantity"),
            F.col("package_quantity_arr"),
            F.col("package_quantity_flag"),
            F.col("parent_asin"),
            F.col("price"),
            F.col("price_mom"),
            F.col("price_yoy"),
            F.col("profit_key"),
            F.col("q1_bought_month").cast(IntegerType()),
            F.col("q2_bought_month").cast(IntegerType()),
            F.col("q3_bought_month").cast(IntegerType()),
            F.col("q4_bought_month").cast(IntegerType()),
            F.col("rating"),
            F.col("scene_comment"),
            F.col("scene_title"),
            F.col("seller_id"),
            F.col("shape"),
            F.col("short_desc"),
            F.col("size"),
            F.col("theme"),
            F.col("title"),
            F.col("title_len"),
            F.col("title_pic_content"),
            F.col("title_pic_flag"),
            F.col("title_word_content"),
            F.col("title_word_flag"),
            F.col("together_asin"),
            F.col("total_appear_month_str"),
            F.col("total_comments"),
            F.col("uses"),
            F.col("variation_flag"),
            F.col("variation_num"),
            F.col("weight"),
            # ===== partition columns =====
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info")
        ).cache()
        # Release upstream cached frames now that df_save is materialized.
        self.df_ai_asin_add.unpersist()
        self.df_flow_asin.unpersist()
        self.df_base.unpersist()
        self.df_monthly_sales.unpersist()
        self.df_ai_analyze.unpersist()
        print("数据合并完成,示例数据如下:")
        self.df_save.show(10, truncate=True)

    def save_data(self):
        """
        Write the result to the Hive table: delete the target partition's HDFS
        directory first, then append -- together this replaces the partition.
        """
        # Repartition to bound the number of output files.
        self.df_save = self.df_save.repartition(40)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb}, 分区为{partition_by}")
        hdfs_path = CommonUtil.build_hdfs_path(
            self.hive_tb,
            partition_dict={
                "site_name": self.site_name,
                "date_type": self.date_type,
                "date_info": self.date_info,
            }
        )
        print(f"清除hdfs目录中: {hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)
        self.df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("数据存储成功!")
if __name__ == '__main__':
    # CLI entry point: spark-submit ... <site_name> <date_type> <date_info>
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # Validate with explicit raises instead of `assert`: assert statements are
    # stripped when Python runs with -O, which would let missing args through.
    if site_name is None:
        raise ValueError("site_name 不能为空!")
    if date_type is None:
        raise ValueError("date_type 不能为空!")
    if date_info is None:
        raise ValueError("date_info 不能为空!")
    obj = DwtAiAsinYear(site_name=site_name, date_type=date_type, date_info=date_info)
    obj.run()
"""
@Author : CT
@Description : 年度ASIN聚合数据导出ES
@SourceTable :
dwt_ai_asin_year (年度聚合数据)
dim_asin_profit_rate_info (利润率数据)
@SinkIndex : {site_name}_ai_asin_analyze_detail_last365_day
@CreateTime : 2026/03/13
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from utils.es_util import EsUtils
from datetime import datetime
from pyspark.sql import functions as F
class EsAiAsinYear(object):
    """
    Export the yearly ASIN aggregate (``dwt_ai_asin_year``) to Elasticsearch,
    enriched with profit-rate data from ``dim_asin_profit_rate_info``, into the
    index ``{site_name}_ai_asin_analyze_detail_last365_day``.
    """

    def __init__(self, site_name: str, date_type: str, date_info: str):
        """
        :param site_name: site/marketplace name; prefixes the ES index/pipeline
        :param date_type: partition date granularity of the Hive source
        :param date_info: partition month to export
        """
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}")
        self.es_client = EsUtils.get_es_client()
        self.es_index = f"{self.site_name}_ai_asin_analyze_detail_last365_day"
        # Ingest pipeline applied server-side on write (see es.ingest.pipeline).
        self.es_pipeline = f"{self.site_name}_ai_analyze_pipeline"
        self.es_options = self.get_es_options(self.es_index, self.es_pipeline)
        # DataFrame placeholders, filled in read_data()/handle_data().
        self.df_year = self.spark.sql("select 1+1;")
        self.df_profit_rate = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    @staticmethod
    def get_index_body():
        """
        Return the ES index definition (settings + mappings).

        Settings: English snowball-stemming analyzer for indexing, a synonym-graph
        search analyzer (updateable synonyms file), and a lowercase normalizer for
        case-insensitive keyword fields.
        """
        return {
            "settings": {
                "number_of_shards": "3",
                "number_of_replicas": "1",
                "analysis": {
                    "filter": {
                        "en_snowball": {
                            "type": "snowball",
                            "language": "English"
                        },
                        "en_synonym": {
                            "type": "synonym_graph",
                            "synonyms_path": "analysis/synonyms_en.txt",
                            "updateable": "true"
                        }
                    },
                    "analyzer": {
                        "en_analyzer": {
                            "type": "custom",
                            "tokenizer": "standard",
                            "filter": ["lowercase", "en_snowball"]
                        },
                        # Search-time only analyzer: adds the synonym graph
                        # (updateable synonyms must not be used at index time).
                        "en_search_analyzer": {
                            "tokenizer": "standard",
                            "filter": ["lowercase", "en_synonym", "en_snowball"]
                        }
                    },
                    "normalizer": {
                        "lowercase_normalizer": {
                            "type": "custom",
                            "char_filter": [],
                            "filter": ["lowercase"]
                        }
                    }
                }
            },
            "mappings": {
                "properties": {
                    "account_addr": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "account_name": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "analyze_id": {"type": "integer"},
                    "ao_val": {"type": "scaled_float", "scaling_factor": 100.0},
                    "appearance": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "asin": {"type": "keyword", "index": True},
                    "bought_month": {"type": "integer"},
                    "bought_month_1": {"type": "integer"},
                    "bought_month_2": {"type": "integer"},
                    "bought_month_3": {"type": "integer"},
                    "bought_month_4": {"type": "integer"},
                    "bought_month_5": {"type": "integer"},
                    "bought_month_6": {"type": "integer"},
                    "bought_month_7": {"type": "integer"},
                    "bought_month_8": {"type": "integer"},
                    "bought_month_9": {"type": "integer"},
                    "bought_month_10": {"type": "integer"},
                    "bought_month_11": {"type": "integer"},
                    "bought_month_12": {"type": "integer"},
                    "bought_month_mom": {"type": "scaled_float", "scaling_factor": 100.0},
                    "bought_month_sum": {"type": "integer"},
                    "bought_month_yoy": {"type": "scaled_float", "scaling_factor": 100.0},
                    "brand": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "bsr_rank": {"type": "integer"},
                    "buy_box_seller_type": {"type": "keyword"},
                    "category": {"type": "keyword"},
                    "category_current_id": {"type": "keyword"},
                    "category_id": {"type": "keyword"},
                    "color": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "crowd": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "date_info": {"type": "keyword"},
                    "describe": {"type": "keyword"},
                    "fb_country_name": {"type": "keyword"},
                    "festival": {"type": "keyword"},
                    "function": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "img": {"type": "keyword"},
                    "img_num": {"type": "integer"},
                    "is_ascending_flag": {"type": "integer"},
                    "is_new_flag": {"type": "integer"},
                    "label_content": {"type": "text", "fields": {"keyword": {"type": "keyword", "normalizer": "lowercase_normalizer"}}},
                    "launch_time": {"type": "keyword"},
                    "launch_time_type": {"type": "integer"},
                    # NOTE(review): last_year_extra is mapped here but never set by
                    # this job -- presumably filled by the ingest pipeline or a
                    # later update; confirm.
                    "last_year_extra": {"type": "object", "properties": {"asin": {"type": "keyword"}, "is_stable_flag": {"type": "integer"}, "is_periodic_flag": {"type": "integer"}, "is_ascending_flag": {"type": "integer"}, "max_bought_month_arr": {"type": "integer"}}},
                    "material": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "multi_color_content": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "multi_color_flag": {"type": "keyword"},
                    "package_quantity": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "package_quantity_arr": {"type": "integer"},
                    "package_quantity_flag": {"type": "keyword"},
                    "parent_asin": {"type": "keyword"},
                    "price": {"type": "scaled_float", "scaling_factor": 100.0},
                    "price_mom": {"type": "float"},
                    "price_yoy": {"type": "float"},
                    "profit_key": {"type": "keyword"},
                    "profit_rate_extra": {"type": "object", "properties": {"profit_key": {"type": "keyword"}, "ocean_profit": {"type": "float"}, "air_profit": {"type": "float"}}},
                    "q1_bought_month": {"type": "integer"},
                    "q2_bought_month": {"type": "integer"},
                    "q3_bought_month": {"type": "integer"},
                    "q4_bought_month": {"type": "integer"},
                    "rating": {"type": "scaled_float", "scaling_factor": 100.0},
                    "scene_comment": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "scene_title": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "seller_id": {"type": "keyword"},
                    "shape": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "short_desc": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "site_name": {"type": "keyword"},
                    "size": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "theme": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "title": {"type": "text", "analyzer": "en_analyzer", "search_analyzer": "en_search_analyzer"},
                    "title_len": {"type": "integer"},
                    "title_pic_content": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "title_pic_flag": {"type": "keyword"},
                    "title_word_content": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "title_word_flag": {"type": "keyword"},
                    "together_asin": {"type": "keyword"},
                    "total_appear_month": {"type": "integer"},
                    "total_appear_month_str":{"type": "keyword"},
                    "total_comments": {"type": "integer"},
                    "uses": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                    "variation_flag": {"type": "integer"},
                    "variation_num": {"type": "integer"},
                    "weight": {"type": "keyword", "normalizer": "lowercase_normalizer"}
                }
            }
        }

    @staticmethod
    def get_es_options(index_name, pipeline_id):
        """
        Build the elasticsearch-hadoop write options.

        Documents are keyed by asin (es.mapping.id) with the "index" write
        operation, so re-running the export overwrites existing documents.
        """
        return {
            "es.nodes": EsUtils.__es_ip__,
            "es.port": EsUtils.__es_port__,
            "es.net.http.auth.user": EsUtils.__es_user__,
            "es.net.http.auth.pass": EsUtils.__es_passwd__,
            "es.mapping.id": "asin",
            "es.resource": f"{index_name}/_doc",
            "es.batch.write.refresh": "false",
            "es.batch.write.retry.wait": "60s",
            "es.batch.size.entries": "5000",
            "es.nodes.wan.only": "false",
            "es.batch.write.concurrency": "40",
            "es.write.operation": "index",
            "es.ingest.pipeline": f"{pipeline_id}"
        }

    def run(self):
        """Execute the export end to end: read -> transform -> write to ES."""
        self.read_data()
        self.handle_data()
        self.save_data()

    def read_data(self):
        """Read the yearly aggregate partition and the profit-rate dimension."""
        # 1. Read the yearly aggregate for the requested partition.
        sql1 = f"""
        select
            account_addr,
            account_name,
            analyze_id,
            ao_val,
            appearance,
            asin,
            bought_month,
            bought_month_1,
            bought_month_2,
            bought_month_3,
            bought_month_4,
            bought_month_5,
            bought_month_6,
            bought_month_7,
            bought_month_8,
            bought_month_9,
            bought_month_10,
            bought_month_11,
            bought_month_12,
            bought_month_mom,
            bought_month_sum,
            bought_month_yoy,
            brand,
            bsr_rank,
            buy_box_seller_type,
            category,
            category_current_id,
            category_id,
            color,
            crowd,
            `describe`,
            fb_country_name,
            festival,
            `function`,
            img,
            img_num,
            is_ascending_flag,
            is_new_flag,
            label_content,
            launch_time,
            launch_time_type,
            material,
            multi_color_content,
            multi_color_flag,
            package_quantity,
            package_quantity_arr,
            package_quantity_flag,
            parent_asin,
            price,
            price_mom,
            price_yoy,
            profit_key,
            q1_bought_month,
            q2_bought_month,
            q3_bought_month,
            q4_bought_month,
            rating,
            scene_comment,
            scene_title,
            seller_id,
            shape,
            short_desc,
            size,
            theme,
            title,
            title_len,
            title_pic_content,
            title_pic_flag,
            title_word_content,
            title_word_flag,
            together_asin,
            total_appear_month_str,
            total_comments,
            uses,
            variation_flag,
            variation_num,
            weight,
            site_name,
            date_info
        from dwt_ai_asin_year
        where site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info = '{self.date_info}'
        """
        self.df_year = self.spark.sql(sql1).repartition(40, 'asin').cache()
        print("年度聚合数据读取完成,示例数据如下:")
        self.df_year.show(10, truncate=True)
        # 2. Read the profit-rate dimension (joined later on asin + price).
        sql2 = f"""
        select asin, price, ocean_profit, air_profit
        from dim_asin_profit_rate_info
        where site_name = '{self.site_name}'
        """
        self.df_profit_rate = self.spark.sql(sql2).repartition(40, 'asin').cache()
        print("利润率数据读取完成,示例数据如下:")
        self.df_profit_rate.show(10, truncate=True)

    def handle_data(self):
        """
        Shape the rows for ES:
          - fold ocean/air profit into a ``profit_rate_extra`` struct
            (null when both components are null, so ES gets no empty object);
          - split ``package_quantity_arr`` / ``total_appear_month_str``
            comma-strings into int arrays;
          - project the final column list.
        """
        # Build the profit_rate_extra struct.
        df_profit = self.df_profit_rate.withColumn(
            "profit_rate_extra",
            F.when(
                F.col("ocean_profit").isNull() & F.col("air_profit").isNull(),
                F.lit(None)
            ).otherwise(
                F.struct(
                    F.col("ocean_profit").alias("ocean_profit"),
                    F.col("air_profit").alias("air_profit")
                )
            )
        ).drop("ocean_profit", "air_profit")
        # package_quantity_arr: comma-string -> int array
        # total_appear_month: total_appear_month_str split on commas -> int array
        self.df_save = self.df_year.join(
            df_profit, on=["asin", "price"], how="left"
        ).withColumn(
            "package_quantity_arr",
            F.expr("transform(split(package_quantity_arr, ','), x -> cast(x as int))")
        ).withColumn(
            "total_appear_month",
            F.expr("transform(split(total_appear_month_str, ','), x -> cast(x as int))")
        ).select(
            "account_addr",
            "account_name",
            "analyze_id",
            "ao_val",
            "appearance",
            "asin",
            "bought_month",
            "bought_month_1",
            "bought_month_2",
            "bought_month_3",
            "bought_month_4",
            "bought_month_5",
            "bought_month_6",
            "bought_month_7",
            "bought_month_8",
            "bought_month_9",
            "bought_month_10",
            "bought_month_11",
            "bought_month_12",
            "bought_month_mom",
            "bought_month_sum",
            "bought_month_yoy",
            "brand",
            "bsr_rank",
            "buy_box_seller_type",
            "category",
            "category_current_id",
            "category_id",
            "color",
            "crowd",
            "date_info",
            # `describe` / `function` are SQL keywords; F.col avoids parser issues.
            F.col("describe"),
            "fb_country_name",
            "festival",
            F.col("function"),
            "img",
            "img_num",
            "is_ascending_flag",
            "is_new_flag",
            "label_content",
            "launch_time",
            "launch_time_type",
            "material",
            "multi_color_content",
            "multi_color_flag",
            "package_quantity",
            "package_quantity_arr",
            "package_quantity_flag",
            "parent_asin",
            "price",
            "price_mom",
            "price_yoy",
            "profit_key",
            "profit_rate_extra",
            "q1_bought_month",
            "q2_bought_month",
            "q3_bought_month",
            "q4_bought_month",
            "rating",
            "scene_comment",
            "scene_title",
            "seller_id",
            "shape",
            "short_desc",
            "site_name",
            "size",
            "theme",
            "title",
            "title_len",
            "title_pic_content",
            "title_pic_flag",
            "title_word_content",
            "title_word_flag",
            "together_asin",
            "total_appear_month",
            "total_appear_month_str",
            "total_comments",
            "uses",
            "variation_flag",
            "variation_num",
            "weight"
        ).cache()
        print("数据处理完成,示例数据如下:")
        self.df_save.show(10, truncate=True)
        self.df_year.unpersist()
        self.df_profit_rate.unpersist()

    def save_data(self):
        """
        Create the index (if needed) and bulk-write via elasticsearch-hadoop.
        Success/failure is reported over WeChat; the broad except is a deliberate
        top-level boundary so a failed export still notifies instead of only
        raising on the driver.
        """
        EsUtils.create_index(self.es_index, self.es_client, self.get_index_body())
        try:
            self.df_save.repartition(40).write.format("org.elasticsearch.spark.sql") \
                .options(**self.es_options) \
                .mode("append") \
                .save()
            print(f"ES {self.es_index} 索引更新完毕!")
            CommonUtil.send_wx_msg(['chenyuanjie'], 'ES年度ASIN数据更新完成', f'索引:{self.es_index} {self.site_name} {self.date_type} {self.date_info}')
        except Exception as e:
            print("An error occurred while writing to Elasticsearch:", str(e))
            CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES数据更新失败', f'失败索引:{self.es_index}')
if __name__ == "__main__":
    # CLI entry point: spark-submit ... <site_name> <date_type> <date_info>
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # Validate with explicit raises instead of `assert`: assert statements are
    # stripped when Python runs with -O, which would let missing args through.
    if site_name is None:
        raise ValueError("site_name 不能为空!")
    if date_type is None:
        raise ValueError("date_type 不能为空!")
    if date_info is None:
        raise ValueError("date_info 不能为空!")
    print("开始执行时间:", datetime.now().strftime("%Y-%m-%d %H:%M"))
    obj = EsAiAsinYear(site_name=site_name, date_type=date_type, date_info=date_info)
    obj.run()
    print("执行结束时间:", datetime.now().strftime("%Y-%m-%d %H:%M"))
    print("success!!!")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment