Commit 160f062d by fangxingjun

no message

parent 0f3fa922
...@@ -54,6 +54,7 @@ class DimStBrandInfo(object): ...@@ -54,6 +54,7 @@ class DimStBrandInfo(object):
self.df_asin_measure = self.spark.sql(f"select 1+1;") self.df_asin_measure = self.spark.sql(f"select 1+1;")
self.df_base_brand = self.spark.sql(f"select 1+1;") self.df_base_brand = self.spark.sql(f"select 1+1;")
self.df_brand_black = self.spark.sql(f"select 1+1;") self.df_brand_black = self.spark.sql(f"select 1+1;")
self.df_brand_st_exclude = self.spark.sql(f"select 1+1;")
self.df_st_key = self.spark.sql(f"select 1+1;") self.df_st_key = self.spark.sql(f"select 1+1;")
def run(self): def run(self):
...@@ -128,6 +129,17 @@ class DimStBrandInfo(object): ...@@ -128,6 +129,17 @@ class DimStBrandInfo(object):
query=pg_sql query=pg_sql
) )
# 读取ods_brand_st_exclude,取排除品牌词的关键词
sql = f"""select lower(trim(search_term)) as st_brand_name_lower, 2 as black_flag
from ods_brand_st_exclude
where site_name='{self.site_name}';"""
print("sql:", sql)
self.df_brand_st_exclude = self.spark.sql(sqlQuery=sql).cache()
self.df_brand_black = self.df_brand_black.unionByName(self.df_brand_st_exclude)
window = Window.partitionBy("st_brand_name_lower").orderBy(F.col("black_flag").desc())
self.df_brand_black = self.df_brand_black.withColumn("_rn", F.row_number().over(window)) \
.filter("_rn = 1").select("st_brand_name_lower", "black_flag")
def handle_data(self): def handle_data(self):
# 100w内搜索词和st_asin关联,找到范围内, 搜索词和asin的关系 # 100w内搜索词和st_asin关联,找到范围内, 搜索词和asin的关系
......
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
    # Script entry point: import the per-site MySQL table
    # `{site_name}_brand_st_exclude` into the Hive table `ods_brand_st_exclude`,
    # using `site_name` as the partition value.
    #
    # Usage: the site name is read from the first command-line argument.
    site_name = CommonUtil.get_sys_arg(1, None)
    if site_name is None:
        # Explicit raise instead of `assert`: assertions are stripped when
        # Python runs with -O, which would let a missing argument through.
        raise ValueError("site_name 不能为空!")

    db_type = "mysql"
    import_tb = f"{site_name}_brand_st_exclude"
    cols = "id, search_term, created_at, updated_at"

    # The query text still contains a literal backslash before `$CONDITIONS`,
    # exactly as before. It is spelled `\\$` because `\$` is not a valid Python
    # escape sequence and triggers a SyntaxWarning on recent interpreters.
    # NOTE(review): the backslash presumably protects `$CONDITIONS` from shell
    # expansion when the Sqoop command is built — confirm in sqoop_raw_import.
    query = f"""
    select
        {cols}
    from {import_tb}
    where 1 = 1
    and \\$CONDITIONS
    """

    hive_tb = "ods_brand_st_exclude"
    partition_dict = {
        "site_name": site_name,
    }

    engine = get_remote_engine(
        site_name=site_name,
        db_type=db_type,
    )
    engine.sqoop_raw_import(
        query=query,
        hive_table=hive_tb,
        partitions=partition_dict,
    )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment