Commit 40fcc4ea by chenyuanjie

es月索引增加利润率字段

parent 31f19f7e
...@@ -26,6 +26,7 @@ class EsStDetail(TemplatesMysql): ...@@ -26,6 +26,7 @@ class EsStDetail(TemplatesMysql):
use_db="big_data_selection") use_db="big_data_selection")
# DataFrame对象初始化 # DataFrame对象初始化
self.df_synchronize = self.spark.sql("select 1+1") self.df_synchronize = self.spark.sql("select 1+1")
self.df_profit_rate = self.spark.sql("select 1+1")
if self.date_type == '4_week': if self.date_type == '4_week':
self.cur_date = self.get_date_from_week() self.cur_date = self.get_date_from_week()
else: else:
...@@ -102,13 +103,29 @@ class EsStDetail(TemplatesMysql): ...@@ -102,13 +103,29 @@ class EsStDetail(TemplatesMysql):
from {self.table_name} where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' from {self.table_name} where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
""" """
print("sql:", sql) print("sql:", sql)
self.df_synchronize = self.spark.sql(sqlQuery=sql) self.df_synchronize = self.spark.sql(sqlQuery=sql).repartition(40, 'asin')
self.df_synchronize = self.df_synchronize.repartition(40).withColumn(
sql = f"""
select asin, price, ocean_profit, air_profit from dim_asin_profit_rate_info where site_name='{self.site_name}'
"""
print("sql:", sql)
self.df_profit_rate = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').withColumn(
"profit_rate_extra",
F.struct(
F.col("ocean_profit").alias("ocean_profit"),
F.col("air_profit").alias("air_profit")
)
).drop("ocean_profit", "air_profit")
self.df_synchronize = self.df_synchronize.join(
self.df_profit_rate, on=['asin', 'price'], how='left'
).withColumn(
"img_type", F.split(F.col("img_type"), ",") "img_type", F.split(F.col("img_type"), ",")
).withColumn( ).withColumn(
"img_type", F.expr("transform(img_type, x -> cast(x as int))") "img_type", F.expr("transform(img_type, x -> cast(x as int))")
).withColumn(
'profit_key', F.concat_ws("_", F.col("asin"), F.col("price"))
).cache() ).cache()
self.df_synchronize.show(10, truncate=False)
# 同步数据前的准备工作 # 同步数据前的准备工作
def es_prepare(self): def es_prepare(self):
......
...@@ -485,6 +485,20 @@ class EsUtils(object): ...@@ -485,6 +485,20 @@ class EsUtils(object):
}, },
"tracking_since_type": { "tracking_since_type": {
"type": "short" "type": "short"
},
"profit_key": {
"type": "keyword"
},
"profit_rate_extra": {
"type": "object",
"properties": {
"ocean_profit": {
"type": "float"
},
"air_profit": {
"type": "float"
}
}
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment