Commit 508bfd64 by chenyuanjie

最近30天asin导出计算利润率

parent 42cac787
......@@ -4,9 +4,11 @@ import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.es_util import EsUtils
from pyspark.sql import functions as F, Window
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from datetime import datetime, timedelta
class DwtFlowKeepaAsin(object):
......@@ -39,10 +41,32 @@ class DwtFlowKeepaAsin(object):
and asin_price is not null
and asin_price > 0
"""
self.df_flow_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin')
window = Window.partitionBy(['asin', 'price']).orderBy(
self.df_flow_asin.source_month.desc_nulls_last()
)
# Monthly flow-asin rows from the SQL above. Price is rounded to 2 decimal
# places and cast to decimal(10,2) so it matches the ES-sourced rows below
# exactly when they are unioned and deduplicated on (asin, price).
df_flow_asin_month = self.spark.sql(sqlQuery=sql) \
.withColumn('price', F.round(F.col('price'), 2).cast('decimal(10,2)'))
# Read asins from ES for the last 30 days that are still missing profit rate
# NOTE(review): datetime.now() is naive local time — assumes the ES
# asin_crawl_date values are stored in the same timezone; TODO confirm.
days_30_ago = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d 00:00:00")
es_read_options = {
"es.nodes": EsUtils.__es_ip__,
"es.port": EsUtils.__es_port__,
"es.net.http.auth.user": EsUtils.__es_user__,
"es.net.http.auth.pass": EsUtils.__es_passwd__,
"es.nodes.wan.only": "false",
# Return dates as plain strings instead of rich date objects
"es.mapping.date.rich": "false",
"es.scroll.size": "2000",
# Only pull the fields the downstream select() actually uses
"es.read.field.include": "asin,price,category_first_id,asin_crawl_date",
# Filter server-side: price > 0, crawled within the last 30 days, and the
# profit_rate_extra.ocean_profit field does not exist yet (i.e. profit
# rate still missing). Doubled braces are f-string escapes for literal JSON.
"es.query": f'{{"query":{{"bool":{{"must":[{{"range":{{"price":{{"gt":0}}}}}},{{"range":{{"asin_crawl_date":{{"gte":"{days_30_ago}"}}}}}}],"must_not":{{"exists":{{"field":"profit_rate_extra.ocean_profit"}}}}}}}}}}'
}
# Load the ES index, normalize price the same way as the monthly frame, and
# derive source_month (yyyy-MM) from the crawl date so both sources share
# the same schema for the union below.
df_flow_asin_30day = self.spark.read.format("org.elasticsearch.spark.sql") \
.options(**es_read_options) \
.load(f"{self.site_name}_flow_asin_30day") \
.withColumn('price', F.round(F.col('price'), 2).cast('decimal(10,2)')) \
.withColumn('source_month', F.date_format(F.col('asin_crawl_date'), 'yyyy-MM')) \
.select('asin', 'price', 'category_first_id', 'source_month')
# Merge both parts; deduplicate on (asin, price), keeping the row with the
# most recent source_month (nulls sort last so dated rows win over null ones).
self.df_flow_asin = df_flow_asin_month.union(df_flow_asin_30day).repartition(40, 'asin')
window = Window.partitionBy(['asin', 'price']).orderBy(F.col('source_month').desc_nulls_last())
self.df_flow_asin = self.df_flow_asin.withColumn(
'rank', F.row_number().over(window=window)
).filter('rank = 1').drop('rank').cache()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment