Commit 057dbb84 by fangxingjun

Merge branch 'developer' of 47.106.101.75:abel_cjy/Amazon-Selection-Data into developer

parents 0feb4e27 e92a6ecf
......@@ -3,14 +3,13 @@ import sys
import re
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# 导入udf公共方法
from yswg_utils.common_udf import udf_parse_bs_category
# from ..yswg_utils.common_udf import udf_parse_bs_category
# from yswg_utils.common_udf import udf_parse_bs_category
class DimBsAsinInfo(Templates):
......@@ -20,31 +19,19 @@ class DimBsAsinInfo(Templates):
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
# 初始化self.spark对
self.db_save = 'dim_asin_bs_info'
self.spark = self.create_spark_object(
app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
self.df_save = self.spark.sql("select 1+1;")
self.df_asin_node_id = self.spark.sql("select 1+1;")
self.df_bs_asin_detail = self.spark.sql("select 1+1;")
self.df_bs_category = self.spark.sql("select 1+1;")
# 定义 UDF 的返回类型,即一个包含三个 DoubleType 字段的 StructType
schema = StructType([
StructField('asin_bs_cate_1_id', StringType(), True),
StructField('asin_bs_cate_current_id', StringType(), True),
StructField('asin_bs_cate_1_rank', IntegerType(), True),
StructField('asin_bs_cate_current_rank', IntegerType(), True),
])
# self.u_parse_bs_category = F.udf(self.udf_parse_bs_category, schema)
self.u_parse_bs_category = F.udf(udf_parse_bs_category, schema)
# self.pattern1_dict = {
# "us": "(\d+).*?See Top 100 in ".lower(),
# "uk": "(\d+).*?See Top 100 in ".lower(),
# "de": "(\d+).*?Siehe Top 100 in ".lower(),
# "es": "(\d+).*?Ver el Top 100 en ".lower(),
# "fr": "(\d+).*?Voir les 100 premiers en ".lower(),
# "it": "(\d+).*?Visualizza i Top 100 nella categoria ".lower(),
# }
self.u_parse_bs_category = F.udf(self.udf_parse_bs_category, schema)
self.pattern1_dict = {
"us": "See Top 100 in ".lower(),
"uk": "See Top 100 in ".lower(),
......@@ -66,11 +53,15 @@ class DimBsAsinInfo(Templates):
self.get_year_week_tuple()
@staticmethod
def udf_parse_bs_category(asin_bs_sellers_rank_lower, last_herf, all_best_sellers_href, cate_current_pattern, cate_1_pattern):
# if (site_name == 'us' and date_type in ['month', 'month_week'] and date_info >= '2023-11') or (site_name != 'us' and date_type in ['week'] and date_info >= '2023-41'):
# href_list = all_best_sellers_href.split("&&&&")
def udf_parse_bs_category(asin_bs_sellers_rank_lower, last_herf, all_best_sellers_href, cate_current_pattern, cate_1_pattern, node_id):
"""
asin_bs_sellers_rank_lower: 底部分类字符串
last_herf: 最后一级分类链接
all_best_sellers_href: 所有分类链接
cate_current_pattern: 当前分类排名匹配规则
cate_1_pattern: 一级分类排名匹配规则
node_id: 页面头部抓取分类id
"""
# 1. 判断用哪个字段来解析分类
if str(all_best_sellers_href).lower() not in ['', 'none', 'null']:
bs_href = all_best_sellers_href
......@@ -80,8 +71,54 @@ class DimBsAsinInfo(Templates):
bs_href = ''
href_list = bs_href.replace("?tf=1", "").split("&&&&")
# 新增climate-pledge分类优化--若最后一级是climate-pledge的分类,则向前取
rank_flag = None
while True:
if '/climate-pledge' in href_list[-1] and len(href_list) >= 2:
href_list.pop()
rank_flag = True
else:
break
# 2. 解析一级和当前 分类 + 排名
# 2.1 提取分类
# 2.1 先检查 node_id 是否在 href_list 中
cate_1_id, cate_current_id, cate_1_rank, cate_current_rank = None, None, None, None
if node_id and len(href_list) > 1:
node_id_str = str(node_id)
matched_idx = None
for i, href in enumerate(href_list):
if node_id_str in href: # 判断node_id是否在url中出现
matched_idx = i
break
if matched_idx is not None:
# 提取对应分类ID
cate_current_id = re.findall('bestsellers/(.*)/ref', href_list[matched_idx])
cate_current_id = cate_current_id[0].split("/")[-1] if cate_current_id else None
# 一级分类还是取第一个
cate_1_id = re.findall('bestsellers/(.*)/ref', href_list[0])
cate_1_id = cate_1_id[0].split("/")[0] if cate_1_id else None
# 解析排名
if asin_bs_sellers_rank_lower is not None:
asin_bs_sellers_rank_lower2 = asin_bs_sellers_rank_lower.replace(".", "").replace(",", "").replace(
" 100 ", "")
else:
asin_bs_sellers_rank_lower2 = ''
rank_list = re.findall(cate_current_pattern, asin_bs_sellers_rank_lower2)
rank_list = [int(rank) for rank in rank_list]
# 如果 rank_list 长度和 href_list 对齐,则取对应位置的排名
if matched_idx < len(rank_list):
cate_current_rank = rank_list[matched_idx]
# 一级分类排名
if rank_list and cate_1_pattern in asin_bs_sellers_rank_lower:
cate_1_rank = rank_list[0]
return cate_1_id, cate_current_id, cate_1_rank, cate_current_rank
# 2.2 提取分类
if href_list:
if len(href_list) == 1:
cate_list = re.findall('bestsellers/(.*)/ref', href_list[0])
......@@ -93,20 +130,32 @@ class DimBsAsinInfo(Templates):
else:
cate_1_id, cate_current_id = None, None
else:
cate_1_id = re.findall('bestsellers/(.*)/ref', href_list[0])[0] if re.findall('bestsellers/(.*)/ref', href_list[0]) else None
cate_current_id = re.findall('bestsellers/(.*)/ref', href_list[-1])[0] if re.findall('bestsellers/(.*)/ref', href_list[-1]) else None
cate_1_id = re.findall('bestsellers/(.*)/ref', href_list[0])[0] if re.findall('bestsellers/(.*)/ref',
href_list[0]) else None
cate_current_id = re.findall('bestsellers/(.*)/ref', href_list[-1])[0] if re.findall(
'bestsellers/(.*)/ref',
href_list[
-1]) else None
if "/" in cate_1_id:
cate_1_id = cate_1_id.split("/")[0]
if "/" in cate_current_id:
cate_current_id = cate_current_id.split("/")[-1]
else:
cate_1_id, cate_current_id = None, None
# 2.2 提取排名
asin_bs_sellers_rank_lower2 = asin_bs_sellers_rank_lower.replace(",", "").replace(" 100 ", "")
# 2.3 提取排名
if asin_bs_sellers_rank_lower is not None:
asin_bs_sellers_rank_lower2 = asin_bs_sellers_rank_lower.replace(".", "").replace(",", "").replace(" 100 ",
"")
else:
asin_bs_sellers_rank_lower2 = ''
rank_list = re.findall(cate_current_pattern, asin_bs_sellers_rank_lower2) # 匹配排名
rank_list = [int(rank) for rank in rank_list] # 转换成int类型
# print("rank_list:", rank_list)
if rank_flag:
if len(rank_list) > len(href_list):
rank_list = rank_list[:len(href_list)]
if rank_list:
if len(rank_list) == 1:
if cate_1_pattern in asin_bs_sellers_rank_lower:
......@@ -170,6 +219,10 @@ class DimBsAsinInfo(Templates):
# 小写
self.df_bs_asin_detail = self.df_bs_asin_detail.withColumn("asin_bs_sellers_rank_lower",
F.lower("asin_bs_sellers_rank"))
# 关联node_id
self.df_bs_asin_detail = self.df_asin_node_id.join(
self.df_bs_asin_detail, 'asin', how='left'
)
# self.df_bs_asin_detail.show(10, truncate=False)
# 提取分类字符串中的asin_bs_cate_1_rank, asin_bs_cate_current_rank
# 生成当前分类匹配规则
......@@ -178,7 +231,7 @@ class DimBsAsinInfo(Templates):
self.df_bs_asin_detail = self.df_bs_asin_detail.withColumn(
'asin_bs_cate_ranks',
self.u_parse_bs_category('asin_bs_sellers_rank_lower', 'last_herf', 'all_best_sellers_href',
F.lit(cate_current_pattern), F.lit(cate_1_pattern))
F.lit(cate_current_pattern), F.lit(cate_1_pattern), 'asin_bs_cate_current_id_node')
)
# self.df_bs_asin_detail.show(10, truncate=False)
self.df_bs_asin_detail = self.df_bs_asin_detail \
......@@ -190,18 +243,15 @@ class DimBsAsinInfo(Templates):
.withColumn('asin_bs_cate_current_rank',
self.df_bs_asin_detail.asin_bs_cate_ranks.getField('asin_bs_cate_current_rank')) \
.drop('asin_bs_cate_ranks')
self.df_bs_asin_detail.show(10, truncate=False)
# self.df_bs_asin_detail.show(10, truncate=False)
# self.df_save = self.df_asin_node_id.join(
# self.df_bs_asin_detail, 'asin', how='left'
# ).join(
# self.df_category_desc_id, 'asin_bs_cate_current_id', how='left'
# )
self.df_save = self.df_asin_node_id.join(
self.df_bs_asin_detail, 'asin', how='left'
)
# 用node_id的分类去补充一级分类和当前分类
self.df_save = self.df_save.withColumn(
self.df_save = self.df_bs_asin_detail.withColumn(
"asin_bs_cate_1_id",
F.when(F.col("asin_bs_cate_1_id").isNull(), F.col("asin_bs_cate_1_id_node")).otherwise(F.col("asin_bs_cate_1_id"))
).withColumn(
......@@ -225,4 +275,4 @@ if __name__ == '__main__':
date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter
date_info = sys.argv[3] # 参数3:年-周/年-月/年-季, 比如: 2022-1
handle_obj = DimBsAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
handle_obj.run()
\ No newline at end of file
handle_obj.run()
......@@ -40,7 +40,7 @@ class DwtStThemeAgg(object):
self.u_theme_pattern = F.udf(udf_ele_mattch, StringType())
self.u_theme_contain_judge = F.udf(self.udf_theme_contain_judge, IntegerType())
self.u_judge_twin_words = F.udf(self.udf_judge_twin_words, IntegerType())
self.u_filter_sec_pattern_words = F.udf(self.udf_filter_sec_pattern_words, IntegerType())
self.u_filter_pattern_words = F.udf(self.udf_filter_pattern_words, IntegerType())
# 全局df初始化
self.df_st_base = self.spark.sql(f"select 1+1;")
......@@ -180,8 +180,7 @@ class DwtStThemeAgg(object):
return F.udf(udf_filter_blacklist, IntegerType())
@staticmethod
def udf_filter_sec_pattern_words(st_word, pattern_list):
# 标记一些特殊情况指定的二级词,方便后期过滤
def udf_filter_pattern_words(st_word, pattern_list):
filter_flag = 0
theme_list = ['combination', 'size']
if pattern_list:
......@@ -191,7 +190,7 @@ class DwtStThemeAgg(object):
# 进行单项 数字+month/months的所有二级词 和 数字连接t+ boys/girls的二级词特殊匹配
date_pattern = re.compile(r"(\d+(?:\.\d+)?) +(month|months)\b", flags=re.IGNORECASE)
numt_pattern = re.compile(r"((?:\d+)t)(?: +)(boys|girls|boy|girl)\b", flags=re.IGNORECASE)
other_pattern = re.compile(r"\b(women|men|man|woman|for|cute|fashion|kids?|adults?|girls?|boys?)\b", flags=re.IGNORECASE)
other_pattern = re.compile(r"\b(womens?|mens?|mans?|womans?|fors?|cutes?|fashions?|kids?|adults?|girls?|boys?)\b", flags=re.IGNORECASE)
if re.search(date_pattern, st_word):
return 1
if re.search(numt_pattern, st_word):
......@@ -350,8 +349,6 @@ class DwtStThemeAgg(object):
self.read_data()
# 模板词归一化处理
self.handle_base_pattern_data()
# 二级词单独处理
self.handle_sec_st()
# 将一级二级模板词和搜索词进行匹配,做中间存储
self.handle_st_filter_table()
# 统计各模板词的指标 pattern_type=0
......@@ -399,23 +396,18 @@ class DwtStThemeAgg(object):
'st_blacklist_flag', self.filter_blacklist_words(pd_match_blacklist)("search_term")
).filter('st_blacklist_flag != 1').cache()
# 处理二级词
def handle_sec_st(self):
self.df_sec_words = self.df_base_filter_date.filter('st_word_num = 2')
self.df_sec_words = self.df_sec_words.join(
def handle_st_filter_table(self):
# 过滤特殊词
self.df_base_filter_date = self.df_base_filter_date.join(
self.df_theme, on=['search_term'], how='left'
)
self.df_sec_words = self.df_sec_words.withColumn(
"filter_flag", self.u_filter_sec_pattern_words(F.col("search_term"), F.col("pattern_list"))
)
# 过滤掉被标记为1的数据
self.df_sec_words = self.df_sec_words.filter("filter_flag != 1")
self.df_sec_words = self.df_sec_words.select(
'search_term', 'st_word_num', 'st_bsr_cate_1_id_new', 'st_bsr_cate_current_id_new',
'rank', 'rank_change_rate', 'rank_rate_of_change'
)
).withColumn(
"filter_flag", self.u_filter_pattern_words(F.col("search_term"), F.col("pattern_list"))
).filter(
"filter_flag != 1"
).select(
'search_term', 'st_word_num', 'st_bsr_cate_1_id_new', 'st_bsr_cate_current_id_new', 'rank', 'rank_change_rate', 'rank_rate_of_change'
).cache()
def handle_st_filter_table(self):
df_st_filter_base = self.df_st_base.select(
F.col('st_key'),
F.col('search_term'),
......@@ -425,12 +417,6 @@ class DwtStThemeAgg(object):
F.lit(self.date_info).alias('date_info')
).cache()
# 将处理后的二级词和一级词合并
df_one_word = self.df_base_filter_date.filter('st_word_num = 1').select(
'search_term', 'st_word_num', 'st_bsr_cate_1_id_new', 'st_bsr_cate_current_id_new',
'rank', 'rank_change_rate', 'rank_rate_of_change'
)
self.df_base_filter_date = self.df_sec_words.unionByName(df_one_word).cache()
pattern_words = self.df_base_filter_date.select('search_term')
# 将数据转换成pandas_df
dict_df = pattern_words.toPandas()
......@@ -461,7 +447,6 @@ class DwtStThemeAgg(object):
df_list.append(df_union_filter)
for i in range(0, len(df_list), batch_size):
print(f"当前是word_batches的轮回:f{word_batches.index(word_batch)},当前写入表的df索引位置:{i + 1}")
tmp_df = []
tmp_df = df_list[i:i + batch_size]
result_df = self.udf_unionAll(*tmp_df)
result_df = result_df.repartition(1)
......
......@@ -48,7 +48,11 @@ class EsStDetail(TemplatesMysql):
self.record_table_name_field = f'{self.site_name}_flow_asin_last_month' if self.date_type == 'month' else f'{self.site_name}_flow_asin_last30day'
# elasticsearch相关配置
self.client = EsUtils.get_es_client()
self.es_options = EsUtils.get_es_options(self.es_index_name)
# 富集策略相关配置,用于更新 usr_mask_type 字段
self.policy_name1 = "user_mask_asin_policy"
self.policy_name2 = "user_mask_category_policy"
self.pipeline_id = "user_asin_mask_enrich_pipeline"
self.es_options = EsUtils.get_es_options(self.es_index_name, self.pipeline_id)
self.es_body = EsUtils.get_es_body()
# 正式导出需入导出记录表
......@@ -105,6 +109,10 @@ class EsStDetail(TemplatesMysql):
def es_prepare(self):
print("当前链接的es节点信息为:" + str(EsUtils.__es_ip__))
EsUtils.create_index(self.es_index_name, self.client, self.es_body)
# 执行富集策略
EsUtils.user_enrich_pipeline(self.client, self.pipeline_id, self.policy_name1, self.policy_name2)
self.client.enrich.execute_policy(name=self.policy_name1)
self.client.enrich.execute_policy(name=self.policy_name2)
if self.date_type != 'month':
if not EsUtils.exist_index_alias(self.alias_name, self.client):
EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)
......
......@@ -50,7 +50,11 @@ class KafkaFlowAsinDetail(Templates):
self.es_index_name = f"{self.topic_name}_test" if self.test_flag == 'test' else f"{self.topic_name}"
self.es_index_alias_name = f"{self.site_name}_st_detail_last_4_week_test" if self.test_flag == 'test' else f"{self.site_name}_st_detail_last_4_week"
self.es_index_body = EsUtils.get_es_body()
self.es_options = EsUtils.get_es_options(self.es_index_name)
# 富集策略相关配置,用于更新 usr_mask_type 字段
self.policy_name1 = "user_mask_asin_policy"
self.policy_name2 = "user_mask_category_policy"
self.pipeline_id = "user_asin_mask_enrich_pipeline"
self.es_options = EsUtils.get_es_options(self.es_index_name, self.pipeline_id)
self.db_save = 'kafka_flow_asin_detail'
self.app_name = self.get_app_name()
print(f"任务名称:{self.app_name}")
......@@ -235,7 +239,7 @@ class KafkaFlowAsinDetail(Templates):
cate_1_pattern = self.pattern1_dict[self.site_name]
df = df.withColumn("asin_bs_sellers_rank_lower", F.lower("best_sellers_rank"))
df = df.withColumn("asin_bs", self.u_parse_bs_category(
"asin_bs_sellers_rank_lower", "best_sellers_herf", "all_best_sellers_herf", F.lit(cate_current_pattern), F.lit(cate_1_pattern)))
"asin_bs_sellers_rank_lower", "best_sellers_herf", "all_best_sellers_herf", F.lit(cate_current_pattern), F.lit(cate_1_pattern), "node_id"))
df = df.withColumn("asin_bs_cate_1_id", df.asin_bs.getField("asin_bs_cate_1_id")) \
.withColumn("asin_bs_cate_current_id", df.asin_bs.getField("asin_bs_cate_current_id")) \
.withColumn("asin_bs_cate_1_rank", df.asin_bs.getField("asin_bs_cate_1_rank")) \
......@@ -828,6 +832,10 @@ class KafkaFlowAsinDetail(Templates):
# 创建对应es索引
EsUtils.create_index(self.es_index_name, self.client, self.es_index_body)
print("索引名称为:", self.es_index_name)
# 执行富集策略
self.client.enrich.execute_policy(name=self.policy_name1)
self.client.enrich.execute_policy(name=self.policy_name2)
EsUtils.user_enrich_pipeline(self.client, self.pipeline_id, self.policy_name1, self.policy_name2)
if not EsUtils.exist_index_alias(self.es_index_alias_name, self.client):
EsUtils.create_index_alias(self.es_index_name, self.es_index_alias_name, self.client)
else:
......
"""
@Author : HuangJian
@Description : asin详情表-周表
@SourceTable : us_asin_detail_2023_18
@SinkTable : ods_asin_detail
@CreateTime : 2022/05/18 14:55
@UpdateTime : 2022/05/18 14:55
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -24,151 +14,40 @@ if __name__ == '__main__':
assert date_type is not None, "date_type 不能为空!"
assert date_info is not None, "date_info 不能为空!"
hive_table = f"ods_asin_detail"
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
db_type = 'postgresql_14'
import_table = f"{site_name}_asin_detail_month_{d1}_{d2}"
hive_table = "ods_asin_detail"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
# 落表路径校验
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
# 日期拆分
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
if date_type == DateTypes.week.name:
# pg的分区周单位数是带0,如01、02、03
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
# 这里主要是区分db链接
if site_name == 'us' and date_info >= '2023-26':
db_type = 'postgresql'
if date_info >= '2023-34':
db_type = 'postgresql_14'
date_col = "launch_time,created_time as created_at,updated_time as updated_at"
new_col = ',describe'
else:
db_type = 'postgresql_14'
date_col = "launch_time,created_time as created_at,updated_time as updated_at"
new_col = ',describe'
print(f"同步连接的db_type:{db_type}")
# 这里主要是区分新增字段
# 18周新增字段weight_str
if date_info >= '2023-18':
new_col += ',weight_str'
# 21周新增字段package_quantity、pattern_name
if date_info >= '2023-21':
new_col += ',package_quantity,pattern_name'
# 49周新增字段follow_sellers
if date_info >= '2023-49':
new_col += ',follow_sellers'
# 51周新增字段product_description,buy_sales
if date_info >= '2023-51':
new_col += ',product_description,buy_sales'
# 2024-02周新增字段image_view
if date_info >= '2024-02':
new_col += ',image_view'
# # 2024-05周新增字段product_json,product_detail_json,review_ai_text,review_label_json
# if date_info >= '2024-05':
# new_col += ',product_json,product_detail_json,review_ai_text,review_label_json'
import_table = f"{site_name}_asin_detail_{d1}_{d2}"
if date_type == DateTypes.month.name or date_type == DateTypes.month_week.name:
db_type = 'postgresql_14'
date_col = "launch_time, created_time as created_at, updated_time as updated_at"
new_col = "describe, weight_str, package_quantity, pattern_name, follow_sellers, product_description, buy_sales, image_view, spider_int, " \
"lob_asin_json, seller_json, customer_reviews_json, product_json, product_detail_json, review_ai_text, review_label_json, sp_initial_seen_asins_json, " \
"sp_4stars_initial_seen_asins_json, sp_delivery_initial_seen_asins_json, compare_similar_asin_json, together_asin_json, min_match_asin_json, " \
"variat_num, current_asin, img_list, variat_list, parent_asin, bundles_this_asins_json, video_m3u8_url, result_list_json, bundle_asin_component_json"
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_table = f"{site_name}_asin_detail_month_{d1}_{d2}"
sql_query = f"""
select
id,
asin,
img_url,
title,
title_len,
price,
rating,
total_comments,
buy_box_seller_type,
page_inventory,
category,
volume,
weight,
rank,
{date_col},
category_state,
img_num,
img_type,
activity_type,
one_two_val,
three_four_val,
five_six_val,
eight_val,
qa_num,
one_star,
two_star,
three_star,
four_star,
five_star,
low_star,
together_asin,
brand,
ac_name,
material,
node_id,
data_type,
sp_num,
{new_col}
from {import_table}
where 1=1
and \$CONDITIONS
"""
# 进行schema和数据校验
CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'],
partition_dict=partition_dict)
# 生成导出脚本
import_sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path,
map_num=50,
key='id')
# 导入前先删除原始hdfs数据
HdfsUtils.delete_hdfs_file(hdfs_path)
# 创建ssh Client对象--用于执行cmd命令
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# 创建lzo索引和修复元数据
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# 关闭链接
client.close()
# 导入后检测--检测数据一致性
if date_type != 'month_week':
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'])
# 导入后验证--重点字段阈值预警
CommonUtil.check_fields_and_warning(hive_tb_name=hive_table, partition_dict=partition_dict)
cols = "id, asin, img_url, title, title_len, price, rating, total_comments, buy_box_seller_type, page_inventory, " \
"category, volume, weight, rank, launch_time, created_time as created_at, updated_time as updated_at, " \
"category_state, img_num, img_type, activity_type, one_two_val, three_four_val, five_six_val, eight_val, " \
"qa_num, one_star, two_star, three_star, four_star, five_star, low_star, together_asin, brand, ac_name, " \
"material, node_id, data_type, sp_num, describe, weight_str, package_quantity, pattern_name, follow_sellers, " \
"product_description, buy_sales, image_view, spider_int, lob_asin_json, seller_json, customer_reviews_json, " \
"product_json, product_detail_json, review_ai_text, review_label_json, sp_initial_seen_asins_json, " \
"sp_4stars_initial_seen_asins_json, sp_delivery_initial_seen_asins_json, compare_similar_asin_json, " \
"together_asin_json, min_match_asin_json, variat_num, current_asin, img_list, variat_list, parent_asin, " \
"bundles_this_asins_json, video_m3u8_url, result_list_json, bundle_asin_component_json, review_json_list"
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=f"SELECT {cols} FROM {import_table} WHERE 1=1 and $CONDITIONS",
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=50,
split_by='id'
)
pass
......@@ -2,16 +2,17 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DbTypes
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
import_tb = f"{site_name}_all_syn_st_asin"
db_type = DbTypes.postgresql.name
import_tb = f"{site_name}_all_syn_st_asin"
query = f"""
select asin,
state,
......@@ -20,32 +21,22 @@ if __name__ == '__main__':
where state = 4
and \$CONDITIONS
"""
hive_tb = "ods_asin_err_state"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = DbTypes.postgresql.name
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['wujicang']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# 导入前先删除
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
pass
......@@ -2,75 +2,59 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
hive_tb = "ods_bs_category"
db_type = "mysql"
import_tb = f"{site_name}_bs_category"
partition_dict = {
"site_name": site_name,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
id,
p_id,
ch_name,
en_name,
nodes_num,
path,
is_show,
one_category_id,
and_en_name,
leaf_node,
delete_time,
full_name,
category_id,
category_parent_id,
category_first_id,
category_state,
redirect_flag,
redirect_first_id,
created_at,
updated_at
select
id,
p_id,
ch_name,
en_name,
nodes_num,
path,
is_show,
one_category_id,
and_en_name,
leaf_node,
delete_time,
full_name,
category_id,
category_parent_id,
category_first_id,
category_state,
redirect_flag,
redirect_first_id,
created_at,
updated_at
from {import_tb}
where 1 = 1
and \$CONDITIONS
"""
"""
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
hive_tb = "ods_bs_category"
partition_dict = {
"site_name": site_name,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# 导入前先删除
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
# 导入后检测--检测数据一致性
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
pass
......@@ -2,9 +2,9 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil,DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -13,7 +13,19 @@ if __name__ == '__main__':
assert site_name is not None, "site_name 不能为空!"
assert date_type is not None, "date_type 不能为空!"
assert date_info is not None, "date_info 不能为空!"
year, week = date_info.split("-")
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
db_type = 'postgresql_14'
import_tb = f"{site_name}_bs_category_asin_detail_month_{d1}_{d2}"
query = f"""
select
id, asin, null as week, best_sellers_rank, created_time as created_at, updated_time as updated_at, last_herf, all_best_sellers_href
from {import_tb}
where 1=1
and \$CONDITIONS
"""
hive_tb = "ods_bs_category_asin_detail"
partition_dict = {
"site_name": site_name,
......@@ -21,77 +33,19 @@ if __name__ == '__main__':
"date_info": date_info,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
if date_type == DateTypes.week.name:
if site_name == "us":
if date_info >= '2023-18':
db_type = "postgresql"
if date_info >= '2023-34':
db_type = 'postgresql_14'
import_tb = f"{site_name}_bs_category_asin_detail_{year}_{week}"
cols = f"id,asin,{week} as week,best_sellers_rank,created_time as created_at,updated_time as updated_at,last_herf,all_best_sellers_href"
params = "1 = 1"
else:
db_type = "mysql"
import_tb = f"{site_name}_bs_category_asin_detail"
cols = "id,asin,week,best_sellers_rank,created_at,updated_at,last_herf"
params = f"week = {int(week)} and DATE_FORMAT(created_at,'%Y') = {year}"
else:
db_type = "postgresql_14"
import_tb = f"{site_name}_bs_category_asin_detail_{year}_{week}"
cols = f"id,asin,{week} as week,best_sellers_rank,created_time as created_at,updated_time as updated_at,last_herf,all_best_sellers_href"
params = "1 = 1"
if date_type == DateTypes.month.name or date_type == DateTypes.month_week.name:
# 日期拆分
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
if site_name in ['us', 'uk', 'de']:
db_type = 'postgresql_14'
# pg的分区单位数是带0,如01、02、03
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
cols = f"id,asin,null as week,best_sellers_rank,created_time as created_at,updated_time as updated_at,last_herf,all_best_sellers_href"
import_tb = f"{site_name}_bs_category_asin_detail_month_{d1}_{d2}"
params = f" 1=1 "
else:
print(f"其他站点{date_type}数据暂未明确,请检查是否dateType传输有误")
exit()
query = f"""
select
{cols}
from {import_tb}
where {params}
and \$CONDITIONS
"""
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# 导入前先删除
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# 导入后检测--检测数据一致性
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=50,
split_by='id'
)
pass
......@@ -2,72 +2,69 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_info = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "sitename 不能为空!"
assert date_info is not None, "date_info 不能为空!"
hive_tb = "ods_bs_category_top100_asin"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = "mysql"
if date_info == 'all':
query = f"""
select id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
select
id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
from {site_name}_bs_category_top100_asin
where 1 = 1
and \$CONDITIONS
"""
pass
"""
else:
query = f"""
select id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
select
id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
from {site_name}_bs_category_top100_asin
where 1 = 1
and date_info = '{date_info}'
and \$CONDITIONS
"""
pass
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['wujicang'])
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
"""
hive_tb = "ods_bs_category_top100_asin"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# 导入前先删除
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
pass
......@@ -2,61 +2,45 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
hive_tb = "ods_bsr_end"
db_type = "mysql"
import_tb = f"{site_name}_bsr_end"
query = f"""
select
id,
rank,
bsr_name,
created_at,
updated_at,
category_id
from {import_tb}
where 1 = 1
and \$CONDITIONS
"""
hive_tb = "ods_bsr_end"
partition_dict = {
"site_name": site_name,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
id,
rank,
bsr_name,
created_at,
updated_at,
category_id
from {import_tb}
where 1 = 1
and \$CONDITIONS
"""
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# 导入前先删除
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# 导入后检测--检测数据一致性
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
pass
......@@ -2,76 +2,69 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_info = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "sitename 不能为空!"
assert date_info is not None, "date_info 不能为空!"
hive_tb = "ods_new_releases_top100_asin"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = "mysql"
if date_info == 'all':
query = f"""
select id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
select
id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
from {site_name}_new_releases_top100_asin
where 1 = 1
and \$CONDITIONS
"""
pass
"""
else:
query = f"""
select id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
select
id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
from {site_name}_new_releases_top100_asin
where 1 = 1
and date_info = '{date_info}'
and \$CONDITIONS
"""
pass
"""
print("================================sql====================================")
print(query)
db_type = "mysql"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['wujicang']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
hive_tb = "ods_new_releases_top100_asin"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# 导入前先删除
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
pass
import os
import sys
import json
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import functions as F
def vertify_data(hive_table,partition_dict):
# 获取计算分区
msg_params = ""
# 解析partition_dict获取分区查询条件
partition_conditions = []
for key, value in partition_dict.items():
if value is not None:
msg_params += f"{value} "
partition_conditions.append(f"{key} = '{value}'")
base_msg = f"{hive_table} {msg_params} "
site_name = partition_dict.get("site_name")
spark_session = SparkUtil.get_spark_sessionV3("check_fields_rule")
# 获取维护的字段验证配置表数据
config_table_query = f"""select * from hive_field_verify_config
where table_name ='{hive_table}'
and site_name = '{site_name}'
and use_flag = 1 """
conn_info = DBUtil.get_connection_info('mysql', 'us')
check_field_df = SparkUtil.read_jdbc_query(
session=spark_session,
url=conn_info["url"],
pwd=conn_info["pwd"],
username=conn_info["username"],
query=config_table_query
)
# 获取验证消息
check_field_list = check_field_df.select('field_name', 'verify_desc', 'verify_type', 'config_json',
'msg_usr_list').collect()
if not check_field_list:
print("============================无验证匹配条件跳过验证===================================")
exit()
# 创建一个df用于储存验证情况
# 定义列的结构
schema = StructType([
StructField("验证描述", StringType(), True),
StructField("验证类型", StringType(), True),
StructField("校验字段", StringType(), True),
StructField("校验条件查询数值", StringType(), True),
StructField("验证临界值", StringType(), True),
StructField("是否验证通过", IntegerType(), True),
])
# 使用定义的结构创建空的 DataFrame
check_df = spark_session.createDataFrame([], schema)
for row in check_field_list:
vertify_flag = True
field_name = row['field_name']
verify_type = row['verify_type']
config_json = json.loads(row['config_json'])
msg_usr = row['msg_usr_list']
msg_usr_list = [user.strip() for user in msg_usr.split(",")] if msg_usr else []
partition_conf_list = config_json['partition_conf']
for conf in partition_conf_list:
conf_site_name = conf["site_name"]
conf_date_type = conf["date_type"]
if site_name == conf_site_name and date_type == conf_date_type:
vertify_flag = True
break
else:
vertify_flag = False
# assert base_rate is not None, f"未配置{field_name}验证周期{date_type}的基准值,请检查!"
# 没有合适的匹配维度
if not vertify_flag:
break
if verify_type == "自定义sql验证":
base_num = conf['max_rate']
confirm_sql = str(config_json['confirm_sql'])
base_condition = ' AND '.join(partition_conditions)
# 需把sql语句中的base_condition用时间周期的语句进行替换
confirm_sql = confirm_sql.replace("base_condition", base_condition)
confirm_df = spark_session.sql(confirm_sql)
confirm_row = confirm_df.collect()[0]
# 提取自定义sql中的验证结果
confirm_num = confirm_row["confirm_num"]
confirm_result = confirm_row["confirm_result"]
result_df = spark_session.createDataFrame(
[(row['verify_desc'], verify_type, field_name, confirm_num, base_num, confirm_result)],
schema).repartition(1)
elif verify_type == "分类销量最大排名验证":
sql_condition = config_json['sql_condition']
base_num = conf['max_rate']
confirm_sql = CommonUtil.generate_min_max_query(hive_table, field_name, partition_dict)
# 拼接外部查询条件
if sql_condition:
confirm_sql = confirm_sql + f" AND {sql_condition} "
confirm_df = spark_session.sql(confirm_sql)
confirm_row = confirm_df.collect()[0]
confirm_num = confirm_row["max_value"]
if confirm_num:
# 必须要大于该基准值才校验通过
confirm_result = 1 if (confirm_num >= base_num) else 0
else:
confirm_result = 0
result_df = spark_session.createDataFrame(
[(row['verify_desc'], verify_type, field_name, confirm_num, base_num, confirm_result)],
schema).repartition(1)
check_df = check_df.unionByName(result_df, False)
if check_df.count() < 1:
print("无验证项验证")
exit()
check_df.show(50, truncate=False)
schema_flag = bool(check_df.select(F.min("是否验证通过").alias("result")).first().asDict()['result'])
# print(schema_flag)
if not schema_flag:
msg = f"数据表:{hive_table} {msg_params},计算数据存在验证不通过,请检查数据是否异常!!具体信息请查看日志!!"
CommonUtil.send_wx_msg(['fangxingjun','pengyanbing','chenjianyun'], f"\u26A0 {hive_table} {msg_params}数据导入验证异常", msg)
raise Exception(msg)
spark_session.stop()
pass
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -138,72 +13,56 @@ if __name__ == '__main__':
assert site_name is not None, "site_name 不能为空!"
assert date_type is not None, "date_type 不能为空!"
assert date_info is not None, "date_info 不能为空!"
hive_tb = "ods_one_category_report"
db_type = "mysql"
assert date_type in ('week', 'month', 'month_week'), "入参date_type类型存在问题,请检查!"
assert date_type in ('week','month','month_week'), "入参date_type类型存在问题,请检查!"
db_type = "mysql"
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
# 该表为月表,因此如果传入week周期进行判断,获取周对应的月维度
if (date_type == 'week') and (date_info is not None):
engine = DBUtil.get_db_engine('mysql', 'us')
sql = f"""select `year_month` from date_20_to_30 where `year_week`='{date_info}' and week_day = 1 """
result = DBUtil.engine_exec_sql(engine, sql)
result = engine.read_sql(sql=sql)
year_month = result.scalar()
print(f"当前传入的周期为周维度,date_type:{date_type},date_info:{date_info};对应转换月为:{year_month}")
engine.dispose()
date_type = 'month'
date_info = year_month
year, month = date_info.split("-")
import_tb = f"{site_name}_one_category_report"
query = f"""
select
id,
cate_1_id,
name,
rank,
orders,
orders_day,
`year_month`,
week,
created_at,
updated_at,
category_id
from {import_tb}
where `year_month` = '{year}_{int(month)}'
and \$CONDITIONS
"""
hive_tb = "ods_one_category_report"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
import_tb = f"{site_name}_one_category_report"
cols = "id,cate_1_id,name,rank,orders,orders_day,`year_month`,week,created_at,updated_at,category_id"
query = f"""
select
{cols}
from {import_tb}
where `year_month` = '{year}_{int(month)}'
and \$CONDITIONS
"""
print(query)
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['fangxingjun','pengyanbin']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# 导入前先删除
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# 导入后检测--检测数据一致性
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['fangxingjun','pengyanbin']
)
vertify_data(hive_table=hive_tb, partition_dict=partition_dict)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
pass
"""
@Author : HuangJian
@Description : 各站点店铺asin详情表-- 月抓取
@SourceTable : us_other_search_term_data_2023_18
@SinkTable : ods_other_search_term_data
@CreateTime : 2022/05/23 09:55
@UpdateTime : 2022/05/23 09:55
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -25,96 +14,51 @@ if __name__ == '__main__':
assert date_type is not None, "date_type 不能为空!"
assert date_info is not None, "date_info 不能为空!"
hive_table = f"ods_other_search_term_data"
db_type = 'postgresql_14'
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_table = f"{site_name}_other_search_term_month_{d1}_{d2}"
sql_query = f"""
select
id,
search_term,
asin,
page,
buy_data,
label,
created_time,
updated_time,
asin_brand
from {import_table}
where 1=1
and \$CONDITIONS
"""
if site_name == 'us':
map_num = 20
else:
map_num = 5
hive_table = "ods_other_search_term_data"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
# 落表路径校验
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
# 日期拆分
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
db_type = ''
if date_type == DateTypes.week.name:
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
if site_name == 'us' and date_info >= '2023-18':
db_type = 'postgresql'
if date_info >= '2023-34':
db_type = 'postgresql_14'
# pg的分区周单位数是带0,如01、02、03
import_table = f"{site_name}_other_search_term_{d1}_{d2}"
else:
db_type = 'postgresql_14'
import_table = f"{site_name}_other_search_term_{d1}_{d2}"
if date_type == DateTypes.month.name or date_type == DateTypes.month_week.name:
if site_name in ['us', 'uk', 'de']:
db_type = 'postgresql_14'
# pg的分区单位数是带0,如01、02、03
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_table = f"{site_name}_other_search_term_month_{d1}_{d2}"
else :
print(f"其他站点{date_type}数据暂未明确,请检查是否dateType传输有误")
exit()
assert db_type != '', "未获取到db_type,请检查!"
sql_query = f"""
select
id,
search_term,
asin,
page,
buy_data,
label,
created_time,
updated_time,
asin_brand
from {import_table}
where 1=1
and \$CONDITIONS
"""
# 进行schema和数据校验
if site_name not in ('fr', 'it', 'es'):
CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['fangxingjun','pengyanbing','chenyuanjie']
,partition_dict = partition_dict)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
if site_name == 'us':
map_num = 20
else:
map_num = 5
# 生成导出脚本
import_sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path,
map_num=map_num,
key='id'
)
# 导入前先删除原始hdfs数据
HdfsUtils.delete_hdfs_file(hdfs_path)
# 创建ssh Client对象--用于执行cmd命令
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# 创建lzo索引和修复元数据
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# 关闭链接
client.close()
engine.sqoop_raw_import(
query=sql_query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=map_num,
split_by='id'
)
# 导入后检测--检测同步数据数据量的一致性
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=sql_query,
hive_tb_name=hive_table,
msg_usr=['fangxingjun','pengyanbing','chenyuanjie'])
\ No newline at end of file
pass
......@@ -2,10 +2,9 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -21,8 +20,6 @@ if __name__ == '__main__':
print("uk站点已无ac类型词,退出执行!")
sys.exit(0)
hive_tb = f"ods_search_term_{st_type}"
if st_type in ["zr", "sp"]:
cols = "search_term,asin,page,page_row,created_time,updated_time,id"
elif st_type in ["sb", "tr"]:
......@@ -30,92 +27,48 @@ if __name__ == '__main__':
else:
cols = "search_term,asin,page,created_time,updated_time,id"
# 日期拆分
db_type = 'postgresql_14'
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
if date_type == DateTypes.week.name:
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
if site_name == 'us' and date_info >= '2023-18':
db_type = 'postgresql'
# pg的分区周单位数是带0,如01、02、03
if date_info >= '2023-34':
db_type = 'postgresql_14'
import_tb = f"{site_name}_search_term_rank_{st_type}_{d1}_{d2}"
else:
db_type = 'postgresql_14'
import_tb = f"{site_name}_search_term_rank_{st_type}_{d1}_{d2}"
if date_type == DateTypes.month.name or date_type == DateTypes.month_week.name:
if site_name in ['us', 'uk', 'de']:
db_type = 'postgresql_14'
# pg的分区单位数是带0,如01、02、03
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_tb = f"{site_name}_search_term_rank_{st_type}_month_{d1}_{d2}"
else :
print(f"其他站点{date_type}数据暂未明确,请检查是否dateType传输有误")
exit()
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_tb = f"{site_name}_search_term_rank_{st_type}_month_{d1}_{d2}"
query = f"""
select {cols}
from {import_tb}
where 1 = 1
and \$CONDITIONS
select {cols} from {import_tb} where 1 = 1 and \$CONDITIONS
"""
print(f"当前链接的数据库为:{db_type},同步的表为:{import_tb}")
hive_tb = f"ods_search_term_{st_type}"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
if st_type in ['er', 'tr']:
empty_flag = False
print(f"st_type类型为{st_type},符合不检测类型跳过检测!")
else:
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['fangxingjun','pengyanbing','chenyuanjie'],
partition_dict=partition_dict
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
if not empty_flag:
# zr的数据量较大,同步时进行多进程同步
if st_type in ['zr']:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path,
map_num=10,
key='id')
if st_type == "zr":
if site_name == "us":
map_num = 40
else:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# 导入前先删除
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# 导入后检测--检测数据一致性
if date_type != 'month_week':
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['fangxingjun','pengyanbing','chenyuanjie'])
map_num = 15
elif st_type in ["sb", "sp"]:
if site_name == "us":
map_num = 6
else:
map_num = 2
else:
map_num = 1
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=map_num,
split_by='id'
)
pass
......@@ -2,61 +2,42 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
hive_tb = "ods_self_asin"
partition_dict = {
"site_name": site_name,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = "mysql"
query = f"""
select
id,
asin,
created_at as created_time,
updated_at as updated_time
id,
asin,
created_at as created_time,
updated_at as updated_time
from {site_name}_self_asin
where 1 = 1
and \$CONDITIONS
"""
db_type = "mysql"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# 导入前先删除
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# 导入后检测--检测数据一致性
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie'])
hive_tb = "ods_self_asin"
partition_dict = {
"site_name": site_name,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
pass
......@@ -2,96 +2,89 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DbTypes
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
date_info = CommonUtil.get_sys_arg(3, None)
assert site_name is not None, "sitename 不能为空!"
assert site_name is not None, "site_name 不能为空!"
assert date_info is not None, "date_info 不能为空!"
year = CommonUtil.reformat_date(date_info, "%Y-%m-%d", "%Y", )
hive_tb = "ods_self_asin_detail"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = DbTypes.postgresql.name
year = CommonUtil.reformat_date(date_info, "%Y-%m-%d", "%Y", )
query = f"""
select
asin,
img_url,
title,
title_len,
price,
rating,
total_comments,
buy_box_seller_type,
page_inventory,
category,
volume,
weight,
rank,
launch_time,
video_url,
add_url,
material,
created_at,
img_num,
img_type,
qa_num,
brand,
ac_name,
node_id,
sp_num,
mpn,
online_time,
describe,
one_star,
two_star,
three_star,
four_star,
five_star,
low_star,
asin_type,
is_coupon,
search_category,
weight_str,
account_name,
other_seller_name,
account_id
asin,
img_url,
title,
title_len,
price,
rating,
total_comments,
buy_box_seller_type,
page_inventory,
category,
volume,
weight,
rank,
launch_time,
video_url,
add_url,
material,
created_at,
img_num,
img_type,
qa_num,
brand,
ac_name,
node_id,
sp_num,
mpn,
online_time,
describe,
one_star,
two_star,
three_star,
four_star,
five_star,
low_star,
asin_type,
is_coupon,
search_category,
weight_str,
account_name,
other_seller_name,
account_id
from {site_name}_self_asin_detail_{year}
where 1 = 1
and site = '{site_name}'
and bsr_date_info = '{date_info}'
and date_info >= '{date_info}'
and \$CONDITIONS
"""
print("sql ======================================================")
print(query)
db_type = DbTypes.postgresql.name
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['wujicang']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
"""
hive_tb = "ods_self_asin_detail"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# 导入前先删除
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
pass
......@@ -3,22 +3,15 @@ import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
hive_table = "ods_self_asin_related_traffic"
partition_dict = {"site_name": site_name}
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = 'mysql'
import_table = f"{site_name}_self_asin_detail"
sql_query = f"""
select
id,
......@@ -37,17 +30,29 @@ if __name__ == '__main__':
and \$CONDITIONS
"""
# 生成导出脚本
import_sh = CommonUtil.build_import_sh(
site_name=site_name, db_type=db_type, query=sql_query, hdfs_path=hdfs_path, map_num=25, key='id'
hive_table = "ods_self_asin_related_traffic"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
if site_name == 'us':
map_num = 25
else:
map_num = 1
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=sql_query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=map_num,
split_by='id'
)
# 导入前先删除原始hdfs数据
HdfsUtils.delete_hdfs_file(hdfs_path)
# 创建ssh Client对象--用于执行cmd命令
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# 创建lzo索引和修复元数据
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# 关闭链接
client.close()
pass
......@@ -2,11 +2,10 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -16,17 +15,6 @@ if __name__ == '__main__':
assert date_type is not None, "date_type 不能为空!"
assert date_info is not None, "date_info 不能为空!"
hive_table = f"ods_seller_account_feedback"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
# 落表路径校验
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
suffix = str(date_info).replace("-", "_")
import_table = f"{site_name}_seller_account_feedback_{suffix}"
if date_type == DateTypes.month.name and date_info >= '2023-08':
......@@ -51,24 +39,24 @@ if __name__ == '__main__':
and \$CONDITIONS
"""
# 进行schema和数据校验
CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'])
hive_table = "ods_seller_account_feedback"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=sql_query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict
)
# 生成导出脚本
import_sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path)
# 导入前先删除原始hdfs数据
HdfsUtils.delete_hdfs_file(hdfs_path)
# 创建ssh Client对象--用于执行cmd命令
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# 创建lzo索引和修复元数据
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# 关闭链接
client.close()
pass
"""
@Author : HuangJian
@Description : 各站点店铺名称与店铺id关系全量表--传参为单站点
@SourceTable : us_seller_account_feedback
@SinkTable : ods_seller_account_feedback
@CreateTime : 2022/05/19 14:55
@UpdateTime : 2022/05/19 14:55
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
hive_table = f"ods_seller_account_syn"
db_type = 'mysql'
import_table = f"{site_name}_seller_account_syn_distinct"
sql_query = f"""
select
id,
account_name,
url,
state,
created_at,
updated_at,
seller_id
from {import_table}
where 1=1
and \$CONDITIONS
"""
hive_table = "ods_seller_account_syn"
partition_dict = {
"site_name": site_name
}
# 落表路径校验
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
import_table = f"{site_name}_seller_account_syn_distinct"
db_type = 'mysql'
sql_query = f"""
select
id,
account_name,
url,
state,
created_at,
updated_at,
seller_id
from {import_table}
where 1=1
and \$CONDITIONS
"""
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
# 进行schema和数据校验
CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'])
engine.sqoop_raw_import(
query=sql_query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict
)
# 生成导出脚本
import_sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path)
# 导入前先删除原始hdfs数据
HdfsUtils.delete_hdfs_file(hdfs_path)
# 创建ssh Client对象--用于执行cmd命令
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# 创建lzo索引和修复元数据
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# 关闭链接
client.close()
pass
"""
@Author : HuangJian
@Description : 各站点店铺名称与asin关系全量表--传参为单站点
@SourceTable : us_seller_account_feedback
@SinkTable : ods_seller_account_feedback
@CreateTime : 2022/05/19 14:55
@UpdateTime : 2022/05/19 14:55
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
hive_table = f"ods_seller_asin_account"
db_type = 'mysql'
import_table = f"{site_name}_seller_asin_account"
sql_query = f"""
select
id,
account_name,
asin,
created_at,
updated_at,
seller_id
from {import_table}
where 1=1
and \$CONDITIONS
"""
hive_table = "ods_seller_asin_account"
partition_dict = {
"site_name": site_name
}
# 落表路径校验
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
import_table = f"{site_name}_seller_asin_account"
db_type = 'mysql'
sql_query = f"""
select
id,
account_name,
asin,
created_at,
updated_at,
seller_id
from {import_table}
where 1=1
and \$CONDITIONS
"""
# 进行schema和数据校验
CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'])
# 生成导出脚本
import_sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path,
map_num=10,
key='id')
# 导入前先删除原始hdfs数据
HdfsUtils.delete_hdfs_file(hdfs_path)
# 创建ssh Client对象--用于执行cmd命令
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# 创建lzo索引和修复元数据
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# 关闭链接
client.close()
if site_name == 'us':
map_num = 100
else:
map_num = 40
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=sql_query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=map_num,
split_by='id'
)
pass
"""
@Author : HuangJian
@Description : 各站点店铺asin详情表-- 月抓取
@SourceTable : us_asin_detail_product_2023
@SinkTable : ods_asin_detail_product
@CreateTime : 2022/05/19 14:55
@UpdateTime : 2022/05/19 14:55
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -29,21 +17,8 @@ if __name__ == '__main__':
# 该表现在为月同步表,因此增加月类型校验
assert date_type == DateTypes.month.name, "date_type类型不对,应为month"
hive_table = f"ods_asin_detail_product"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
# 落表路径校验
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
# 日期拆分
suffix = str(date_info).replace("-", "_")
import_table = f"{site_name}_seller_asin_product_{suffix}"
# db_type = 'postgresql'
if date_type == DateTypes.month.name and date_info >= '2023-08':
db_type = 'postgresql_14'
else:
......@@ -51,44 +26,51 @@ if __name__ == '__main__':
print("当前链接的数据库为:", db_type)
sql_query = f"""
select
id,
null as account_id,
asin,
title,
img_url,
price,
rating,
total_comments,
null as week,
row_num,
created_at,
updated_at,
null as month,
seller_id
from {import_table}
where 1=1
and \$CONDITIONS
"""
select
id,
null as account_id,
asin,
title,
img_url,
price,
rating,
total_comments,
null as week,
row_num,
created_at,
updated_at,
null as month,
seller_id
from {import_table}
where 1=1
and \$CONDITIONS
"""
hive_table = "ods_asin_detail_product"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
if site_name == 'us':
map_num = 8
else:
map_num = 3
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
# 进行schema和数据校验
CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'])
engine.sqoop_raw_import(
query=sql_query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=map_num,
split_by='id'
)
# 生成导出脚本
import_sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path)
# 导入前先删除原始hdfs数据
HdfsUtils.delete_hdfs_file(hdfs_path)
# 创建ssh Client对象--用于执行cmd命令
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# 创建lzo索引和修复元数据
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# 关闭链接
client.close()
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -17,75 +14,51 @@ if __name__ == '__main__':
assert date_type is not None, "date_type 不能为空!"
assert date_info is not None, "date_info 不能为空!"
hive_table = f"ods_st_quantity_being_sold"
db_type = 'postgresql_14'
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_table = f"{site_name}_brand_analytics_month_{d1}_{d2}"
sql_query = f"""
select
id,
search_term,
quantity_being_sold,
date_info as date_flag,
created_time,
updated_time,
quantity_being_sold_str,
result_count,
departments
from {import_table}
where 1=1
and \$CONDITIONS
"""
hive_table = "ods_st_quantity_being_sold"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
# 落表路径校验
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
# 日期拆分
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
if date_type == DateTypes.week.name:
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
if site_name == 'us' and date_info >= '2023-18':
db_type = 'postgresql'
if date_info >= '2023-34':
db_type = 'postgresql_14'
else:
db_type = 'postgresql_14'
import_table = f"{site_name}_brand_analytics_{d1}_{d2}"
if site_name == 'us':
map_num = 4
else:
map_num = 1
if date_type == DateTypes.month.name or date_type == DateTypes.month_week.name:
if site_name in ['us', 'uk', 'de']:
db_type = 'postgresql_14'
# pg的分区单位数是带0,如01、02、03
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_table = f"{site_name}_brand_analytics_month_{d1}_{d2}"
else :
print(f"其他站点{date_type}数据暂未明确,请检查是否dateType传输有误")
exit()
sql_query = f"""
select
id,
search_term,
quantity_being_sold,
date_info as date_flag,
created_time,
updated_time,
quantity_being_sold_str,
result_count,
departments
from {import_table}
where 1=1
and \$CONDITIONS
"""
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
# 进行schema和数据校验
if site_name not in ('fr', 'it', 'es'):
CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['fangxingjun','chenyuanjie'])
engine.sqoop_raw_import(
query=sql_query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=map_num,
split_by='id'
)
# 生成导出脚本
import_sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path)
# 导入前先删除原始hdfs数据
HdfsUtils.delete_hdfs_file(hdfs_path)
# 创建ssh Client对象--用于执行cmd命令
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# 创建lzo索引和修复元数据
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# 关闭链接
client.close()
pass
......@@ -2,62 +2,41 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
hive_tb = "ods_theme"
db_type = "mysql"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
import_tb = f"{site_name}_theme"
cols = "id,theme_type_en,theme_type_ch,theme_en,theme_ch,created_at,updated_at"
cols = "id, theme_type_en, theme_type_ch, theme_en, theme_ch, created_at, updated_at"
query = f"""
select
{cols}
{cols}
from {import_tb}
where 1 = 1
and \$CONDITIONS
"""
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# 导入前先删除
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# 导入后检测--检测数据一致性
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie'])
hive_tb = "ods_theme"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
pass
# author : wangrui
# data : 2023/3/9 15:50
from elasticsearch import Elasticsearch
......@@ -27,7 +25,7 @@ class EsUtils(object):
# 获取elasticsearch相关配置
@staticmethod
def get_es_options(es_index_name):
def get_es_options(es_index_name, pipeline_id):
return {
"es.nodes": EsUtils.__es_ip__,
"es.port": EsUtils.__es_port__,
......@@ -40,7 +38,8 @@ class EsUtils(object):
"es.batch.size.entries": "5000",
"es.nodes.wan.only": "false",
"es.batch.write.concurrency": "30",
"es.write.operation": "upsert"
"es.write.operation": "index",
"es.ingest.pipeline": f"{pipeline_id}"
}
# 获取elasticsearch中索引配置信息
......@@ -487,7 +486,6 @@ class EsUtils(object):
index_name_list = list(alias_info.keys())
return index_name_list
#删除索引别名
@staticmethod
def delete_index_alias(alias_name, client):
......@@ -500,7 +498,79 @@ class EsUtils(object):
else:
print("索引别名不存在!")
@staticmethod
def user_enrich_pipeline(client, pipeline_id, policy_name1, policy_name2):
pipeline_body = {
"description": "asin flow user mask pipeline",
"processors": [
{
"enrich": {
"policy_name": f"{policy_name1}",
"field": "asin",
"target_field": "policy_add_1",
"max_matches": 1,
"ignore_missing": True
},
},
{
"enrich": {
"policy_name": f"{policy_name2}",
"field": "category_id",
"target_field": "policy_add_2",
"max_matches": 1,
"ignore_missing": True
},
},
{
"set": {
"field": "usr_mask_type",
"value": "{{policy_add_1.usr_mask_type}}",
"ignore_empty_value": True
}
},
{
"set": {
"field": "usr_mask_progress",
"value": "{{policy_add_1.usr_mask_progress}}",
"ignore_empty_value": True
}
},
{
"set": {
"field": "package_quantity",
"value": "{{policy_add_1.package_quantity}}",
"ignore_empty_value": True
}
},
{
"set": {
"field": "usr_mask_type",
"value": "{{policy_add_2.usr_mask_type}}",
"ignore_empty_value": True
}
},
{
"remove": {
"field": "policy_add_1",
"ignore_missing": True
}
},
{
"remove": {
"field": "policy_add_2",
"ignore_missing": True
}
},
{
"convert": {
"field": "package_quantity",
"type": "integer",
"ignore_missing": True
}
}
]
}
client.ingest.put_pipeline(id=pipeline_id, body=pipeline_body)
if __name__ == '__main__':
pass
......@@ -680,13 +680,14 @@ def udf_extract_weight_format(weight_str: str):
# 分类提取-返回: 一级/当前分类id+一级/当前分类排名
# 参考dim_asin_bs_info.py使用
def udf_parse_bs_category(asin_bs_sellers_rank_lower, last_herf, all_best_sellers_href, cate_current_pattern,
cate_1_pattern):
cate_1_pattern, node_id):
"""
asin_bs_sellers_rank_lower: 底部分类字符串
last_herf: 最后一级分类链接
all_best_sellers_href: 所有分类链接
cate_current_pattern: 当前分类排名匹配规则
cate_1_pattern: 一级分类排名匹配规则
node_id: 页面头部抓取分类id
"""
# if (site_name == 'us' and date_type in ['month', 'month_week'] and date_info >= '2023-11') or (site_name != 'us' and date_type in ['week'] and date_info >= '2023-41'):
......@@ -711,7 +712,43 @@ def udf_parse_bs_category(asin_bs_sellers_rank_lower, last_herf, all_best_seller
break
# 2. 解析一级和当前 分类 + 排名
# 2.1 提取分类
# 2.1 先检查 node_id 是否在 href_list 中
cate_1_id, cate_current_id, cate_1_rank, cate_current_rank = None, None, None, None
if node_id and len(href_list) > 1:
node_id_str = str(node_id)
matched_idx = None
for i, href in enumerate(href_list):
if node_id_str in href: # 判断node_id是否在url中出现
matched_idx = i
break
if matched_idx is not None:
# 提取对应分类ID
cate_current_id = re.findall('bestsellers/(.*)/ref', href_list[matched_idx])
cate_current_id = cate_current_id[0].split("/")[-1] if cate_current_id else None
# 一级分类还是取第一个
cate_1_id = re.findall('bestsellers/(.*)/ref', href_list[0])
cate_1_id = cate_1_id[0].split("/")[0] if cate_1_id else None
# 解析排名
if asin_bs_sellers_rank_lower is not None:
asin_bs_sellers_rank_lower2 = asin_bs_sellers_rank_lower.replace(".", "").replace(",", "").replace(" 100 ", "")
else:
asin_bs_sellers_rank_lower2 = ''
rank_list = re.findall(cate_current_pattern, asin_bs_sellers_rank_lower2)
rank_list = [int(rank) for rank in rank_list]
# 如果 rank_list 长度和 href_list 对齐,则取对应位置的排名
if matched_idx < len(rank_list):
cate_current_rank = rank_list[matched_idx]
# 一级分类排名
if rank_list and cate_1_pattern in asin_bs_sellers_rank_lower:
cate_1_rank = rank_list[0]
return cate_1_id, cate_current_id, cate_1_rank, cate_current_rank
# 2.2 提取分类
if href_list:
if len(href_list) == 1:
cate_list = re.findall('bestsellers/(.*)/ref', href_list[0])
......@@ -735,7 +772,7 @@ def udf_parse_bs_category(asin_bs_sellers_rank_lower, last_herf, all_best_seller
else:
cate_1_id, cate_current_id = None, None
# 2.2 提取排名
# 2.3 提取排名
if asin_bs_sellers_rank_lower is not None:
asin_bs_sellers_rank_lower2 = asin_bs_sellers_rank_lower.replace(".", "").replace(",", "").replace(" 100 ", "")
else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment