Commit 5c0104c0 by wangrui

Merge branch 'developer' of 47.106.101.75:abel_cjy/Amazon-Selection-Data into developer

parents 87f9f56d 7e1f38dc
.idea/
.venv/
.gitignore
Pyspark_job/utils/__pycache__/
Pyspark_job/ct
Pyspark_job/sqoop_ct
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/Pyspark_job" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="Python 3.8 (Amazon-Selection)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
......
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8 (Amazon-Selection)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import col
if __name__ == '__main__':
spark = SparkUtil.get_spark_session("ABA_2023_10_12_export")
sql1 = """
select
date_info,
search_term,
st_bsr_cate_1_id_new as category_id,
market_cycle_type,
is_first_text,
is_ascending_text,
is_high_return_text,
is_search_text,
st_movie_label,
st_brand_label,
bsr_orders,
st_word_num,
st_num,
rank
from dwt_aba_st_analytics
where site_name = 'us'
and date_type = 'month'
and date_info in ('2023-10','2023-11','2023-12');
"""
df_dwt_aba_st_analytics = spark.sql(sql1).cache()
sql2 = """
select
category_id,
en_name
from dim_bsr_category_tree
where site_name = 'us'
and category_parent_id = 0;
"""
df_dim_bsr_category_tree = spark.sql(sql2).cache()
sql3 = """
select
search_term,
rank_change_rate,
rank_rate_of_change,
date_info
from dwt_aba_last_change_rate
where site_name = 'us'
and date_type = 'month'
and date_info in ('2023-10','2023-11','2023-12');
"""
df_dwt_aba_last_change_rate = spark.sql(sql3).cache()
# Filter for search terms that meet the criteria
df_dwt_aba_st_analytics = df_dwt_aba_st_analytics.filter(
"(is_first_text = 1) or (is_ascending_text = 1) or (market_cycle_type in (1, 2))"
)
df_save = df_dwt_aba_st_analytics.join(
df_dim_bsr_category_tree, on='category_id', how='left'
).join(
df_dwt_aba_last_change_rate, on=['date_info', 'search_term'], how='left'
)
df_save = df_save.select(
col('date_info').alias('year_month'),
col('search_term'),
col('en_name').alias('category'),
col('market_cycle_type'),
col('is_first_text'),
col('is_ascending_text'),
col('is_high_return_text'),
col('is_search_text'),
col('st_movie_label').alias('movie_label'),
col('st_brand_label').alias('brand_label'),
col('bsr_orders'),
col('st_word_num').alias('word_counts'),
col('st_num').alias('word_frequency'),
col('rank'),
col('rank_change_rate').alias('year_on_year'),
col('rank_rate_of_change').alias('month_on_month')
)
df_save.repartition(5).show(10, truncate=True)
df_save.write.saveAsTable(name='tmp_aba_2023_export', format='hive', mode='append')
spark.stop()
import os
import re
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql.functions import count, explode, split, udf, lit
from pyspark.sql.types import ArrayType, StringType
if __name__ == '__main__':
date_info = CommonUtil.get_sys_arg(1, None)
spark = SparkUtil.get_spark_session("ABA_2023_10_12_word_frequency")
# UDF: split a phrase into groups of 2 consecutive words (bigrams)
def split_tow_by_tow(search_term):
words = search_term.split()
pairs = []
for i in range(len(words) - 1):
pairs.append(words[i] + ' ' + words[i + 1])
return pairs
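# e.g. split_tow_by_tow("wireless phone charger") -> ["wireless phone", "phone charger"]; single-word terms return []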
u_split_tow_by_tow = udf(split_tow_by_tow, ArrayType(StringType()))
# UDF: split a phrase into groups of 3 consecutive words (trigrams)
def split_three_by_three(search_term):
words = search_term.split()
triplets = []
for i in range(len(words) - 2):
triplets.append(words[i] + ' ' + words[i + 1] + ' ' + words[i + 2])
return triplets
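# e.g. split_three_by_three("stainless steel water bottle") -> ["stainless steel water", "steel water bottle"]; terms with fewer than 3 words return []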
u_split_three_by_three = udf(split_three_by_three, ArrayType(StringType()))
# UDF: strip extraneous characters
def characters_to_remove(search_term):
pattern = r'\s[^\w\s%\']+?\s'
cleaned_text = re.sub(pattern, ' ', search_term)
cleaned_text = cleaned_text.replace('\n', ' ')
return cleaned_text
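# characters_to_remove only strips punctuation runs that stand alone between whitespace (keeping %, apostrophes, and anything
# attached to a word); e.g. "phone case - black" -> "phone case black", while "usb-c cable" is left unchanged.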
u_characters_to_remove = udf(characters_to_remove, StringType())
sql = f"""
select
search_term
from dwt_aba_st_analytics
where site_name = 'us'
and date_type = 'month'
and date_info = '{date_info}';
"""
df_aba = spark.sql(sql).cache()
df_aba = df_aba.select(
u_characters_to_remove(df_aba['search_term']).alias('search_term')
)
df_one_word = df_aba.select(
explode(split(df_aba['search_term'], ' ')).alias('word')
).groupby(
['word']
).agg(
count('word').alias('word_frequency')
).filter(
'word_frequency >= 50'
).withColumn(
'date_info',
lit(f'{date_info}-1')
)
df_tow_word = df_aba.select(
explode(u_split_tow_by_tow(df_aba['search_term'])).alias('word')
).groupby(
['word']
).agg(
count('word').alias('word_frequency')
).filter(
'word_frequency >= 50'
).withColumn(
'date_info',
lit(f'{date_info}-2')
)
df_three_word = df_aba.select(
explode(u_split_three_by_three(df_aba['search_term'])).alias('word')
).groupby(
['word']
).agg(
count('word').alias('word_frequency')
).filter(
'word_frequency >= 50'
).withColumn(
'date_info',
lit(f'{date_info}-3')
)
df_one_word.write.saveAsTable(name='tmp_word_frequency', format='hive', mode='append', partitionBy='date_info')
df_tow_word.write.saveAsTable(name='tmp_word_frequency', format='hive', mode='append', partitionBy='date_info')
df_three_word.write.saveAsTable(name='tmp_word_frequency', format='hive', mode='append', partitionBy='date_info')
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import count, explode, lit, desc, sum
from pyspark.sql.types import ArrayType, StringType
from textblob import Word
from googletrans import Translator
class ABA2023YearWordFrequency(object):
def __init__(self):
self.spark = SparkUtil.get_spark_session("spark_task: aba_2023_year_word_frequency")
self.df_aba_2023 = self.spark.sql(f"select 1+1;")
self.df_beside_category = self.spark.sql(f"select 1+1;")
self.df_translate = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;")
self.df_save1 = self.spark.sql(f"select 1+1;")
self.df_save2 = self.spark.sql(f"select 1+1;")
self.df_agg = self.spark.sql(f"select 1+1;")
# Custom UDFs
self.u_get_singular_form = self.spark.udf.register('get_singular_form', self.get_singular_form, StringType())
self.u_word_tokenize = self.spark.udf.register('word_tokenize', self.word_tokenize, ArrayType(StringType()))
# self.u_word_translate = self.spark.udf.register('word_translate', self.word_translate, StringType())
@staticmethod
def get_singular_form(word: str):
"""
Convert a word to its singular form
"""
if word:
singular_form = Word(word).lemmatize("n")
# word_object = Word(word)
# singular_form = word_object.singularize()
return singular_form
return word
@staticmethod
def word_tokenize(title: str):
"""
Tokenizer
"""
from nltk.tokenize import word_tokenize
result = word_tokenize(title, "english")
return result
# @staticmethod
# def word_translate(word: str):
# if word:
# try:
# translator = Translator()
# result = translator.translate(word, src='en', dest='zh-cn')
# return result.text
# except Exception as e:
# # 处理其他未知错误
# print(f"An unexpected error occurred: {e}")
# return None
# return None
def read_data(self):
sql1 = f"""
select
search_term,
category_id
from dwt_aba_last365
where site_name = 'us'
and date_type = 'last365day'
and date_info = '2023-12';
"""
self.df_aba_2023 = self.spark.sql(sql1).cache()
print("df_aba_2023的数量:")
print(self.df_aba_2023.count())
sql2 = f"""
select
category_id
from dim_bsr_category_tree
where site_name = 'us'
and en_name in ('Audible Books & Originals', 'Books', 'Kindle Store', 'Apps & Games', 'Movies & TV', 'CDs & Vinyl', 'Software', 'Video Games')
and category_parent_id = 0;
"""
self.df_beside_category = self.spark.sql(sql2).cache()
print("df_beside_category的数量:")
print(self.df_beside_category.count())
sql3 = f"""
select
word,
simple_cn as cn
from tmp_en_dict;
"""
self.df_translate = self.spark.sql(sql3).cache()
print("df_translate的数量:")
print(self.df_translate.count())
def handle_data(self):
self.df_save = self.df_aba_2023.join(
self.df_beside_category, on='category_id', how='left_anti'
).select('search_term')
self.df_save = self.df_save.select(explode(self.u_word_tokenize(self.df_save['search_term'])).alias('word'))
self.df_save = self.df_save.groupby(['word']).agg(
count('word').alias('word_frequency')
)
self.df_save = self.df_save.join(
self.df_translate, on='word', how='left'
).withColumn(
'word_singular_form',
self.u_get_singular_form(self.df_save['word'])
).cache()
self.df_save1 = self.df_save.select(
'word', 'word_frequency', 'cn'
).orderBy(
desc('word_frequency')
).withColumn(
'date_info',
lit('2023')
)
print("df_save1的数量:")
print(self.df_save1.count())
self.df_save1.write.saveAsTable(name='tmp_word_frequency', format='hive', mode='append', partitionBy='date_info')
print("df_save1存储完成!")
self.df_agg = self.df_save.groupby(['word_singular_form']).agg(
sum('word_frequency').alias('word_frequency')
)
self.df_save2 = self.df_save.select('word', 'cn', 'word_singular_form').join(
self.df_agg, on='word_singular_form', how='left'
).select(
'word', 'word_frequency', 'cn'
).orderBy(
desc('word_frequency')
).withColumn(
'date_info',
lit('2023-merge')
)
print("df_save2的数量:")
print(self.df_save2.count())
self.df_save2.write.saveAsTable(name='tmp_word_frequency', format='hive', mode='append', partitionBy='date_info')
print("df_save2存储完成!")
if __name__ == '__main__':
obj = ABA2023YearWordFrequency()
obj.read_data()
obj.handle_data()
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.DolphinschedulerHelper import DolphinschedulerHelper
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
if __name__ == '__main__':
start_date = CommonUtil.get_sys_arg(1, None)
end_date = CommonUtil.get_sys_arg(2, None)
spark_session = SparkUtil.get_spark_session("re-run-aba-month")
sql = f"""
select distinct year_month as date_info from dim_date_20_to_30 where year_month >= '{start_date}' and year_month < '{end_date}';
"""
date_df = spark_session.sql(sql)
date_df.show()
date_list = sorted([d.asDict().get("date_info") for d in date_df.collect()])
print(date_list)
for date_info in date_list:
startParams = {
"site_name": "us",
"date_type": "month",
"date_info": date_info
}
print(startParams)
DolphinschedulerHelper.start_and_watch_process_instance(
"big_data_selection",
process_df_name='月-重跑ABA四分位',
startParams={
"site_name": "us",
"date_type": "month",
"date_info": date_info
}
)
CommonUtil.send_wx_msg(["huangjian", "chenyuanjie"], "【月-重跑ABA四分位】重跑完成", "")
pass
def asin_to_number(asin):
"""
Convert a 10-character ASIN string to a unique number.
This function assumes that ASIN consists of uppercase letters and digits.
"""
def char_to_number(char):
if char.isdigit():
return int(char)
else:
return ord(char) - 55 # 'A' -> 10, 'B' -> 11, ..., 'Z' -> 35
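# e.g. char_to_number('7') -> 7, char_to_number('B') -> 11, char_to_number('Z') -> 35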
if len(asin) != 10:
raise ValueError("ASIN must be 10 characters long")
base = 36
asin_number = 0
for i, char in enumerate(reversed(asin)):
asin_number += char_to_number(char) * (base ** i)
# Take the result modulo 1 billion so it fits in the range 0-999,999,999
return asin_number % 1000000000
if __name__ == '__main__':
x = asin_to_number('B0CGY4LZQ3')
print(x)
s = f'us_asin_image_part{int(x / 1000_0000) + 1}'
print(s)
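# Illustrative reading of the code above: since asin_to_number() returns a value in 0-999,999,999,
# int(x / 1000_0000) + 1 maps every ASIN into one of 100 buckets (us_asin_image_part1 ... us_asin_image_part100),
# each covering a 10-million-wide slice of the hashed range.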
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql.window import Window
from pyspark.storagelevel import StorageLevel
from pyspark.sql import functions as F
class DwtMerchantwordsStDetailMerge(Templates):
def __init__(self, site_name='us'):
super().__init__()
self.site_name = site_name
self.batch = '2024-1'
self.db_save = 'dwt_merchantwords_st_detail_merge'
self.spark = self.create_spark_object(
app_name=f"DwtMerchantwordsStDetailMerge: {self.site_name}, {self.batch}")
self.partitions_num = 15
self.partitions_by = ['site_name', 'batch']
self.df = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;")
hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.db_save}/site_name={self.site_name}/batch={self.batch}"
print(f"清除hdfs目录中.....{hdfs_path}")
HdfsUtils.delete_hdfs_file(hdfs_path)
def read_data(self):
print("读取dwt_merchantwords_st_detail数据")
sql = f"""
select
keyword,
volume,
avg_3m,
avg_12m,
depth,
results_count,
sponsored_ads_count,
page_1_reviews,
appearance,
last_seen,
update_time,
lang,
batch as last_batch
from dwt_merchantwords_st_detail
where site_name = '{self.site_name}'
and batch in ('2023-1', '2024-1');
"""
self.df = self.spark.sql(sqlQuery=sql)
self.df = self.df.repartition(80).persist(StorageLevel.MEMORY_ONLY)
self.df.show(10, truncate=True)
def handle_data(self):
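# Deduplicate by keyword: rank each keyword's rows by batch (newest first, nulls last) and keep only the top-ranked row,
# i.e. the record from the most recent batch.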
window = Window.partitionBy('keyword').orderBy(
F.desc_nulls_last('last_batch')
)
self.df = self.df.withColumn("u_rank", F.row_number().over(window=window))
self.df = self.df.filter('u_rank=1').drop('u_rank')
self.df_save = self.df.withColumn(
'site_name',
F.lit(self.site_name)
).withColumn(
'batch',
F.lit(self.batch)
)
if __name__ == '__main__':
site_name = sys.argv[1]
handle_obj = DwtMerchantwordsStDetailMerge(site_name=site_name)
handle_obj.run()
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from utils.templates import Templates
from pyspark.sql import functions as F
class FlowAsinLast30days(Templates):
def __init__(self):
super().__init__()
self.db_save = "tmp_flow_asin_last30days"
self.spark = self.create_spark_object(app_name="FlowAsinLast30days")
self.partitions_num = 20
self.partition_dict = {}
self.df_es = self.spark.sql(f"select 1+1;")
self.df_parent = self.spark.sql(f"select 1+1;")
self.df_joined = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;")
def read_data(self):
self.df_es = self.spark.read.format("org.elasticsearch.spark.sql")\
.option("es.nodes", "192.168.10.217")\
.option("es.port", "9200")\
.option("es.net.http.auth.user", "elastic")\
.option("es.net.http.auth.pass", "selection2021.+")\
.option("es.resource", "us_st_detail_last_4_week")\
.option("es.query", '{"query": {"match_all": {}}}')\
.load()
columns = ["asin", "first_category_rank", "asin_bought_month", "total_comments", "variation_num", "site_name", "account_name"]
self.df_es = self.df_es.select(columns).cache()
self.df_es.show()
sql = f"""
select
asin,
parent_asin
from
ods_asin_variat;
"""
self.df_parent = self.spark.sql(sqlQuery=sql).cache()
def handle_data(self):
# self.df_parent = self.df_parent.groupby(["parent_asin"]).agg(F.count("asin").alias("variation_num"))
self.df_joined = self.df_es.join(self.df_parent, "asin", "left")
self.df_joined = self.df_joined\
.withColumn("parent_asin_is_null", F.when(F.col("parent_asin").isNull(), F.lit(1)).otherwise(F.lit(0)))\
.withColumn("parent_asin_exist", F.when(F.col("parent_asin").isNotNull(), F.lit(1)).otherwise(F.lit(0)))
def save_data(self):
self.df_save = self.df_joined
hdfs_path_asin_info = CommonUtil.build_hdfs_path(self.db_save, partition_dict=self.partition_dict)
print(f"清除hdfs目录中:{hdfs_path_asin_info}")
HdfsUtils.delete_file_in_folder(hdfs_path_asin_info)
print(f"当前存储的表名为:{self.db_save}")
self.df_save.write.saveAsTable(name=self.db_save, format='hive', mode='append')
print("success")
if __name__ == '__main__':
obj = FlowAsinLast30days()
obj.run()
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.DolphinschedulerHelper import DolphinschedulerHelper
from utils.common_util import CommonUtil
if __name__ == '__main__':
date_list = ["2024-02","2024-01","2023-12","2023-11","2023-10","2023-09"]
for date_info in date_list:
startParams = {
"site_name": "us",
"date_type": "month",
"date_info": date_info
}
print(startParams)
DolphinschedulerHelper.start_and_watch_process_instance(
"big_data_selection",
process_df_name='export_dwt_flow_asin_api',
startParams=startParams
)
CommonUtil.send_wx_msg(["chenyuanjie", "wangrui4"], "【export_dwt_flow_asin_api】导出完成", "")
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, ArrayType
from urllib.parse import quote
from datetime import datetime
if __name__ == '__main__':
date_info = CommonUtil.get_sys_arg(1, None)
n = CommonUtil.get_sys_arg(2, 0)
hive_tb = "dwt_merchantwords_st_detail"
export_tb = "us_merchantwords_search_term_month_syn_2024"
spark = SparkUtil.get_spark_session(f"export: {hive_tb}")
# Export 4,000,000 rows per batch
batch_size = (int(n)-1) * 4000000
start_index = 1 + batch_size
end_index = 4000000 + batch_size
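# e.g. n=1 exports rows 1-4,000,000 (ordered by keyword), n=2 exports rows 4,000,001-8,000,000, and so on.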
# Build the three Amazon search URLs (pages 1-3) for a search term
def build_urls(search_term):
url_template = "https://www.amazon.com/s?k={search_term}&page={page_number}"
# Percent-encode the term, then escape the characters the search URL is sensitive to
encoded = quote(search_term, 'utf-8').replace("'", '%27').replace("/", '%2F')
encoded = encoded.replace(' ', '+').replace('&', '%26').replace('#', '%23').replace('(', '%28').replace(')', '%29')
return [url_template.format(search_term=encoded, page_number=page) for page in (1, 2, 3)]
# Register the Python function as a UDF
spark.udf.register("build_urls", build_urls, ArrayType(StringType()))
# Read existing data from the PostgreSQL database
# df_pg = spark.read.format("jdbc") \
# .option("url", "jdbc:postgresql://192.168.10.225:5432/selection") \
# .option("dbtable", export_tb) \
# .option("user", "yswg_postgres") \
# .option("password", "yswg_postgres") \
# .load()
# df_pg = df_pg\
# .select("search_term") \
# .drop_duplicates(["search_term"]) \
# .repartition(70) \
# .cache()
# Read data from the Hive table
df_hive = spark.sql(f"SELECT keyword FROM {hive_tb}")
df_hive = df_hive\
.withColumn("row_num", row_number().over(Window.orderBy("keyword")))\
.filter(f"row_num BETWEEN {start_index} AND {end_index}")\
.select("keyword")\
.repartition(10) \
.cache()
# Filter out keywords that contain Chinese characters
df_hive = df_hive.filter(~df_hive["keyword"].rlike("[\u4e00-\u9fff]"))
# Filter out keywords that already exist in the target database
# df_hive = df_hive.join(df_pg, df_hive["keyword"] == df_pg["search_term"], "leftanti")
# If there is no data to export, exit
if df_hive.count() == 0:
print("-------数据已全部导出!-------")
quit()
df_hive = df_hive.selectExpr("keyword AS search_term")
df_hive = df_hive.selectExpr("search_term", "explode(build_urls(search_term)) AS url")
df_hive = df_hive.withColumn("date_info", lit(date_info))
# Export the data to the PostgreSQL database
df_hive.write.format("jdbc") \
.option("url", "jdbc:postgresql://192.168.10.225:5432/selection") \
.option("dbtable", export_tb) \
.option("user", "yswg_postgres") \
.option("password", "yswg_postgres") \
.mode("append") \
.save()
spark.stop()
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql.functions import row_number, lit, length
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, ArrayType
from urllib.parse import quote
if __name__ == '__main__':
date_info = CommonUtil.get_sys_arg(1, None)
n = CommonUtil.get_sys_arg(2, 0)
import_tb = "search_term_result_year"
export_tb = "us_merchantwords_search_term_month_syn_2024"
spark = SparkUtil.get_spark_session("MerchantwordsSRToPG16")
# Export 4,000,000 rows per batch
batch_size = (int(n)-1) * 4000000
start_index = 1 + batch_size
end_index = 4000000 + batch_size
# Build the three Amazon search URLs (pages 1-3) for a search term
def build_urls(search_term):
url_template = "https://www.amazon.com/s?k={search_term}&page={page_number}"
# Percent-encode the term, then escape the characters the search URL is sensitive to
encoded = quote(search_term, 'utf-8').replace("'", '%27').replace("/", '%2F')
encoded = encoded.replace(' ', '+').replace('&', '%26').replace('#', '%23').replace('(', '%28').replace(')', '%29')
return [url_template.format(search_term=encoded, page_number=page) for page in (1, 2, 3)]
# Register the Python function as a UDF
spark.udf.register("build_urls", build_urls, ArrayType(StringType()))
# Read existing data from the StarRocks database
df = spark.read.format("jdbc") \
.option("url", "jdbc:mysql://192.168.10.151:19030/test") \
.option("dbtable", import_tb) \
.option("user", "chenyuanjie") \
.option("password", "chenyuanjie12345") \
.load()
df = df.withColumn(
"row_num",
row_number().over(Window.orderBy("search_term"))
).filter(f"row_num BETWEEN {start_index} AND {end_index}").repartition(20).cache()
# Filter out search terms that contain Chinese characters
df = df.filter(~df["search_term"].rlike("[\u4e00-\u9fff]"))
# If there is no data to export, exit
if df.count() == 0:
print("-------数据已全部导出!-------")
quit()
df = df.selectExpr("search_term", "explode(build_urls(search_term)) AS url")
df = df.filter(length(df['url']) <= 450)
df = df.withColumn("date_info", lit(date_info))
# Export the data to the PostgreSQL database
df.write.format("jdbc") \
.option("url", "jdbc:postgresql://192.168.10.225:5432/selection") \
.option("dbtable", export_tb) \
.option("user", "yswg_postgres") \
.option("password", "yswg_postgres") \
.mode("append") \
.save()
spark.stop()
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.StarRocksHelper import StarRocksHelper
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
spark = SparkUtil.get_spark_session("ods_asin_detail_sr_to_hive")
partition_dict = {
"site_name": 'us',
"date_type": 'month',
"date_info": '2024-03'
}
hdfs_path = CommonUtil.build_hdfs_path('ods_asin_detail_test', partition_dict=partition_dict)
HdfsUtils.delete_hdfs_file(hdfs_path)
connection_info = StarRocksHelper.get_connection_info('selection')
df_sr = spark.read.format("starrocks") \
.option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
.option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
.option("starrocks.table.identifier", "test.ods_asin_detail_test2") \
.option("starrocks.user", connection_info['user']) \
.option("starrocks.password", connection_info['pwd']) \
.option("starrocks.request.tablet.size", "1") \
.option("starrocks.batch.size", "40960") \
.option("starrocks.exec.mem.limit", "21474836480") \
.load()
print("读取完毕")
df_sr = df_sr.repartition(50)
partitions_by = ['site_name', 'date_type', 'date_info']
df_sr.write.saveAsTable(name='ods_asin_detail_test', format='hive', mode='append', partitionBy=partitions_by)
spark.stop()
# Create the LZO index and repair the metadata
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb='ods_asin_detail_test')
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.StarRocksHelper import StarRocksHelper
if __name__ == '__main__':
spark = SparkUtil.get_spark_session("ods_asin_detail_to_sr_test")
sql = """
select
*
from ods_asin_detail
where site_name = 'us'
and date_type = 'month'
and date_info = '2024-03'
"""
df_hive = spark.sql(sql).repartition(40)
connection_info = StarRocksHelper.get_connection_info('selection')
df_hive.write.format("starrocks") \
.option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
.option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
.option("starrocks.table.identifier", "test.ods_asin_detail_test") \
.option("starrocks.user", connection_info['user']) \
.option("starrocks.password", connection_info['pwd']) \
.option("starrocks.write.flush.interval.ms", "10000") \
.option("starrocks.write.properties.column_separator", "~!@#$%^&*~!@#$%^&*") \
.mode("append") \
.save()
print("导出完毕")
spark.stop()
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.DolphinschedulerHelper import DolphinschedulerHelper
from utils.common_util import CommonUtil
if __name__ == '__main__':
date_list = ['2022-02', '2022-03', '2022-04', '2022-05', '2022-06',
'2022-07', '2022-08', '2022-09', '2022-10', '2022-11', '2022-12']
for date_info in date_list:
print(f"当前执行的分区为:{date_info}")
success_flag = DolphinschedulerHelper.start_and_watch_process_instance(
"big_data_selection",
process_df_name='ABA品牌标签调整重跑_api',
startParams={
"site_name": "us",
"date_type": "month",
"date_info": date_info,
"wx_user": "chenyuanjie"
}
)
if success_flag:
continue
else:
CommonUtil.send_wx_msg(["chenyuanjie"], f"ABA品牌标签调整重跑_api {date_info} 执行失败")
break
CommonUtil.send_wx_msg(["chenyuanjie"], "ABA品牌标签调整重跑_api 2022年 执行结束")
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.DolphinschedulerHelper import DolphinschedulerHelper
from utils.common_util import CommonUtil
if __name__ == '__main__':
date_list = ['2023-11', '2023-12']
for date_info in date_list:
print(f"当前执行的分区为:{date_info}")
success_flag = DolphinschedulerHelper.start_and_watch_process_instance(
"big_data_selection",
process_df_name='ABA品牌标签调整重跑_api',
startParams={
"site_name": "us",
"date_type": "month",
"date_info": date_info,
"wx_user": "chenyuanjie"
}
)
if success_flag:
continue
else:
CommonUtil.send_wx_msg(["chenyuanjie"], f"ABA品牌标签调整重跑_api {date_info} 执行失败")
break
CommonUtil.send_wx_msg(["chenyuanjie"], "ABA品牌标签调整重跑_api 2023年 执行结束")
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.DolphinschedulerHelper import DolphinschedulerHelper
from utils.common_util import CommonUtil
if __name__ == '__main__':
date_list = ['2024-01', '2024-02', '2024-03', '2024-04',
'2024-05', '2024-06', '2024-07', '2024-08']
for date_info in date_list:
print(f"当前执行的分区为:{date_info}")
success_flag = DolphinschedulerHelper.start_and_watch_process_instance(
"big_data_selection",
process_df_name='ABA品牌标签调整重跑_api',
startParams={
"site_name": "us",
"date_type": "month",
"date_info": date_info,
"wx_user": "chenyuanjie"
}
)
if success_flag:
continue
else:
CommonUtil.send_wx_msg(["chenyuanjie"], f"ABA品牌标签调整重跑_api {date_info} 执行失败")
break
CommonUtil.send_wx_msg(["chenyuanjie"], "ABA品牌标签调整重跑_api 2024年 执行结束")
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, ArrayType
from urllib.parse import quote
from datetime import datetime
if __name__ == '__main__':
date_info = CommonUtil.get_sys_arg(1, None)
year, month, day = date_info.split("-")
table = f"us_merchantwords_brand_analytics_2024_{month}_{day}"
spark = SparkUtil.get_spark_session(f"us_merchantwords_brand_analytics_2024:pg2pg,{date_info}")
df = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://113.100.143.162:5432/selection") \
.option("dbtable", table) \
.option("user", "yswg_postgres") \
.option("password", "yswg_postgres") \
.load()
df.write.format("jdbc") \
.option("url", "jdbc:postgresql://113.100.143.162:5443/selection") \
.option("dbtable", table) \
.option("user", "yswg_postgres") \
.option("password", "yswg_postgres") \
.mode("append") \
.save()
spark.stop()
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import lit, col
from pyspark.sql.types import StringType, ArrayType
from urllib.parse import quote
if __name__ == '__main__':
export_tb = "de_merchantwords_search_term_month_syn_2024"
spark = SparkUtil.get_spark_session("MerchantwordsSupplement")
# Build the three Amazon search URLs (pages 1-3) for a search term
def build_urls(search_term):
url_template = "https://www.amazon.de/s?k={search_term}&page={page_number}"
# Percent-encode the term, then escape the characters the search URL is sensitive to
encoded = quote(search_term, 'utf-8').replace("'", '%27').replace("/", '%2F')
encoded = encoded.replace(' ', '+').replace('&', '%26').replace('#', '%23').replace('(', '%28').replace(')', '%29')
return [url_template.format(search_term=encoded, page_number=page) for page in (1, 2, 3)]
# Register the Python function as a UDF
spark.udf.register("build_urls", build_urls, ArrayType(StringType()))
sql1 = """
select
keyword,
volume,
st_monthly_sales,
greatest(results_count, asin_total_num) as asin_total_num,
st_sp_counts,
st_zr_counts
from dwt_merchantwords_merge
where site_name = 'de'
and batch = '2024-07-01'
"""
df_dwt_merchantwords_merge = spark.sql(sql1)
# sql2 = """
# select
# keyword
# from dwt_merchantwords_st_detail
# where site_name = 'de'
# and batch = '2024-1'
# """
# df_dwt_merchantwords_st_detail = spark.sql(sql2)
# Keywords with more than 80 total products but no monthly sales
df1 = df_dwt_merchantwords_merge.filter('asin_total_num > 80 and st_monthly_sales <= 0').select('keyword')
print("产品总数大于80且没有月销:" + str(df1.count()))
# Keywords with notable search volume but no SP ad results
df2 = df_dwt_merchantwords_merge.filter('volume >= 1 and st_sp_counts <= 0').select('keyword')
print("搜索量较大且没有sp广告词:" + str(df2.count()))
# Keywords whose organic (zr) result count is <= 0
df3 = df_dwt_merchantwords_merge.filter('st_zr_counts <= 0').select('keyword')
print("自然词总数 <= 0的部分:" + str(df3.count()))
# # 过滤掉keyword含有中文的数据
# df_hive = df_hive.filter(~df_hive["keyword"].rlike("[\u4e00-\u9fff]"))
df_save = df1.union(df2).union(df3).drop_duplicates(['keyword'])
df_save = df_save.selectExpr("keyword AS search_term")
df_save = df_save.selectExpr("search_term", "explode(build_urls(search_term)) AS url")
df_save = df_save.withColumn("date_info", lit('2024-06-26'))
# Export the data to the PostgreSQL database
df_save.write.format("jdbc") \
.option("url", "jdbc:postgresql://192.168.10.225:5433/selection_de") \
.option("dbtable", export_tb) \
.option("user", "yswg_postgres") \
.option("password", "yswg_postgres") \
.mode("append") \
.save()
spark.stop()
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import lit, col
from pyspark.sql.types import StringType, ArrayType
from urllib.parse import quote
if __name__ == '__main__':
export_tb = "us_merchantwords_search_term_month_syn_2024"
spark = SparkUtil.get_spark_session("MerchantwordsSupplement")
# Build the three Amazon search URLs (pages 1-3) for a search term
def build_urls(search_term):
url_template = "https://www.amazon.com/s?k={search_term}&page={page_number}"
# Percent-encode the term, then escape the characters the search URL is sensitive to
encoded = quote(search_term, 'utf-8').replace("'", '%27').replace("/", '%2F')
encoded = encoded.replace(' ', '+').replace('&', '%26').replace('#', '%23').replace('(', '%28').replace(')', '%29')
return [url_template.format(search_term=encoded, page_number=page) for page in (1, 2, 3)]
# Register the Python function as a UDF
spark.udf.register("build_urls", build_urls, ArrayType(StringType()))
sql1 = """
select
keyword,
volume,
st_zr_counts,
st_sp_counts
from dwt_merchantwords_merge
where site_name = 'us'
and batch = '2024-07-01'
"""
df_dwt_merchantwords_merge = spark.sql(sql1)
# Keywords with notable search volume but no SP ad results
df1 = df_dwt_merchantwords_merge.filter('volume >= 1 and st_sp_counts <= 0').select('keyword')
print("搜索量较大且没有sp广告词:" + str(df1.count()))
# Keywords whose organic (zr) result count is <= 0
df2 = df_dwt_merchantwords_merge.filter('st_zr_counts <= 0').select('keyword')
print("自然词总数 <= 0的部分:" + str(df2.count()))
# # 过滤掉keyword含有中文的数据
# df_hive = df_hive.filter(~df_hive["keyword"].rlike("[\u4e00-\u9fff]"))
df_save = df1.union(df2).drop_duplicates(['keyword'])
df_save = df_save.selectExpr("keyword AS search_term")
df_save = df_save.selectExpr("search_term", "explode(build_urls(search_term)) AS url")
df_save = df_save.withColumn("date_info", lit('2024-06-26'))
# Export the data to the PostgreSQL database
df_save.write.format("jdbc") \
.option("url", "jdbc:postgresql://192.168.10.225:5433/selection") \
.option("dbtable", export_tb) \
.option("user", "yswg_postgres") \
.option("password", "yswg_postgres") \
.mode("append") \
.save()
spark.stop()
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.DorisHelper import DorisHelper
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
if __name__ == '__main__':
spark = SparkUtil.get_spark_session('aba_to_doris_test')
sql = f"""
select *
from dwt_aba_last365
where site_name = 'us'
and date_type = 'month'
and date_info = '2024-10';
"""
df_aba = spark.sql(sql).drop('site_name', 'date_type').cache()
df_aba = df_aba.withColumn(
'date_info', F.concat(F.regexp_replace('date_info', '-', ''), F.lit('01'))
)
df_aba.show(10, True)
columns = df_aba.columns
columns_str = ",".join(columns)
DorisHelper.spark_export_with_columns(df_aba, 'test', 'dwt_aba_last365', columns_str)
print('导出完成')
from openai import OpenAI
api_key = "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"
client = OpenAI(api_key=api_key)
completion = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{
"role": "user",
"content": "Write a haiku about recursion in programming."
}
]
)
print(completion.choices[0].message)
import requests
response = requests.post(
f"https://api.stability.ai/v2beta/stable-image/generate/ultra",
headers={
"authorization": f"sk-f2iOAkResIloOY3yE6xk2LlQbVrtQi3EczZDjA3n9ns7bmeR",
"accept": "image/*"
},
files={"none": ''},
data={
"prompt": "A little cat is in a bedroom with a bed, TV, and sofa",
"output_format": "webp",
},
)
if response.status_code == 200:
with open("./cat01.webp", 'wb') as file:
file.write(response.content)
else:
raise Exception(str(response.json()))
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, BooleanType, MapType
"""
Word frequency of tokenized MerchantWords search terms
"""
def is_number(str):
"""
Check whether a string is a number
:param str:
:return:
"""
import re
return re.match(r"^-?\d+\.?\d+$", str) is not None
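# Note: the pattern requires at least two digit characters, so "12", "3.5" and "-7.25" match but a single digit like "7"
# does not (and the parameter name shadows the built-in str).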
def word_tokenize(keyword: str):
import re
keyword = re.sub(r'(\d+\.?\d*|-|\"|,|,|?|\?|/|、|)', '', keyword).strip()
from nltk.tokenize import word_tokenize
result = word_tokenize(keyword, "english")
# Punctuation tokens to filter out
filter_arr = [
" ", "\t", "\r", "\n", "(", ")", ",", ",", "[", "]", "、", "-", ":", "&", "|", "+", "``", "'", "'", "\""
]
return list(filter(lambda x: not is_number(x) and x not in filter_arr, result))
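# word_tokenize() strips digits and some punctuation via re.sub, tokenizes with NLTK, then drops tokens that are
# pure numbers (per is_number) or that appear in filter_arr.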
def run():
spark = SparkUtil.get_spark_session("app_name")
udf_word_tokenize = F.udf(word_tokenize, ArrayType(StringType()))
keywords_all = spark.sql("select keyword from dwt_merchantwords_st_detail where site_name='us'").cache()
df_all = keywords_all.withColumn("word", F.explode(udf_word_tokenize(F.col("keyword"))))
df_all = df_all.groupby(F.col("word")) \
.agg(F.count("word").alias("frequency")) \
.orderBy(F.col("frequency").desc()) \
.select(
F.col("word"),
F.col("frequency"),
F.lit("us").alias("site_name")
)
hive_tb = 'tmp_word_frequency'
# # 去重
partition_dict = {
"site_name": "us"
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
HdfsUtils.delete_hdfs_file(hdfs_path)
partition_by = list(partition_dict.keys())
print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
df_all.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
def word_pluralize(keyword: str):
from textblob import Word
# singular form
singularize = Word(keyword).singularize().string
# plural form
pluralize = Word(singularize).pluralize().string
result = {
"text": keyword,
"singularize": singularize,
"pluralize": pluralize,
"pluralizeFlag": keyword == pluralize,
"not_regular": keyword not in [singularize, pluralize]
}
return result
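# Illustrative example: word_pluralize("boxes") -> {"text": "boxes", "singularize": "box", "pluralize": "boxes",
# "pluralizeFlag": True, "not_regular": False}; pluralizeFlag marks words already in plural form, not_regular marks
# words that match neither the computed singular nor plural form.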
def word_stem(keyword: str):
from nltk.stem.snowball import SnowballStemmer
stemmer = SnowballStemmer("english", ignore_stopwords=False)
return stemmer.stem(keyword)
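# e.g. word_stem("running") -> "run" (Snowball/Porter2 stemming; illustrative)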
def word_test():
spark = SparkUtil.get_spark_session("word_test")
udf_word_pluralize = F.udf(word_pluralize, StructType(
[
StructField('text', StringType(), True),
StructField('singularize', StringType(), True),
StructField('pluralize', StringType(), True),
StructField('pluralizeFlag', BooleanType(), True),
StructField('not_regular', BooleanType(), True),
]
))
udf_word_stem = F.udf(word_stem, StringType())
keywords_all = spark.sql("select word,frequency from tmp_word_frequency").cache()
keywords_all = keywords_all.withColumn("resultMap", udf_word_pluralize(F.col("word"))).select(
F.col("word"),
F.col("frequency"),
F.col("resultMap").getField("singularize").alias("singularize"),
F.col("resultMap").getField("pluralize").alias("pluralize"),
F.col("resultMap").getField("pluralizeFlag").alias("pluralizeFlag"),
F.col("resultMap").getField("not_regular").alias("not_regular"),
).where("(pluralizeFlag == true) or (not_regular == true)")
# Compute the word stems
keywords_all = keywords_all.withColumn("word_stem", udf_word_stem(F.col("word")))
keywords_all = keywords_all.withColumn("singularize_stem", udf_word_stem(F.col("singularize")))
keywords_all = keywords_all.withColumn("pluralize_stem", udf_word_stem(F.col("pluralize")))
hive_tb = 'tmp_word_not_regular_v2'
keywords_all.write.saveAsTable(name=hive_tb, format='hive', mode='append')
print("success")
def word_for_download():
spark = SparkUtil.get_spark_session("word_for_calc")
keywords_all = spark.sql("""
select word
from tmp_for_market
order by volume desc
""")
CommonUtil.df_export_csv(spark, keywords_all, csv_name='word_for_calc', limit=200 * 10000)
print("success")
pass
if __name__ == '__main__':
# word_for_calc()
word_for_download()
print("success")
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from pyspark.sql.types import StringType
from utils.templates import Templates
from google.cloud import translate_v2 as translate
class Test(Templates):
def __init__(self):
super().__init__()
self.spark = self.create_spark_object(app_name=f"test")
self.df_st = self.spark.sql(f"select 1+1;")
self.translate_client = translate.Client()
# Custom UDF
self.u_translate_text = self.spark.udf.register('translate_text', self.translate_text, StringType())
def translate_text(self, word: str, target_language='zh'):
result = self.translate_client.translate(word, target_language=target_language)
return result['translatedText']
def read_data(self):
sql1 = f"""
select
search_term
from dwt_aba_last365
where site_name = 'us'
and date_type = 'last365day'
and date_info = '2023-12';
"""
self.df_st = self.spark.sql(sql1).limit(20).cache()
def handle_data(self):
self.df_st = self.df_st.withColumn(
'translate_text',
self.u_translate_text(self.df_st['search_term'])
)
self.df_st.show(20, False)
if __name__ == '__main__':
handle_obj = Test()
handle_obj.run()
import os
import sys
import json
sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
date_info = CommonUtil.get_sys_arg(3, None)
assert site_name is not None, "site_name 不能为空!"
assert date_type is not None, "date_type 不能为空!"
assert date_info is not None, "date_info 不能为空!"
hive_table = f"dwt_flow_asin"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
# Get the partition to check
msg_params = ""
# Parse partition_dict into partition query conditions
partition_conditions = []
for key, value in partition_dict.items():
if value is not None:
msg_params += f"{value} "
partition_conditions.append(f"{key} = '{value}'")
base_msg = f"{hive_table} {msg_params} "
site_name = partition_dict.get("site_name")
date_type = partition_dict.get("date_type")
spark_session = SparkUtil.get_spark_sessionV3("check_fields_rule")
# Load the maintained field-verification config table
config_table_query = f"""select * from hive_field_verify_config
where table_name ='{hive_table}'
and site_name = '{site_name}'
and use_flag = 1 """
conn_info = DBUtil.get_connection_info('postgresql', 'us')
check_field_df = SparkUtil.read_jdbc_query(
session=spark_session,
url=conn_info["url"],
pwd=conn_info["pwd"],
username=conn_info["username"],
query=config_table_query
)
# Collect the verification rules
check_field_list = check_field_df.select('field_name', 'verify_desc', 'verify_type', 'config_json',
'msg_usr_list').collect()
if not check_field_list:
print("============================无验证匹配条件跳过验证===================================")
exit()
# Create a DataFrame to store the verification results
# Define the column structure
schema = StructType([
StructField("验证描述", StringType(), True),
StructField("验证类型", StringType(), True),
StructField("校验字段", StringType(), True),
StructField("校验条件查询占比", StringType(), True),
StructField("验证占比临界值上限", StringType(), True),
StructField("验证占比临界值下限", StringType(), True),
StructField("是否验证通过", IntegerType(), True),
])
# Create an empty DataFrame with the defined schema
check_df = spark_session.createDataFrame([], schema)
# Assemble the verification SQL
query = f"""
SELECT COUNT(1) AS total_count
FROM {hive_table}
"""
# Append the WHERE conditions
if partition_conditions:
query_total = query + f" WHERE {' AND '.join(partition_conditions)}"
# Run the SQL to get the total row count used for verification
total_df = spark_session.sql(query_total).cache()
total_count = int(total_df.collect()[0]['total_count'])
for row in check_field_list:
vertify_flag = True
field_name = row['field_name']
verify_type = row['verify_type']
config_json = json.loads(row['config_json'])
msg_usr = row['msg_usr_list']
msg_usr_list = [user.strip() for user in msg_usr.split(",")] if msg_usr else []
sql_condition = config_json['sql_condition']
partition_conf_list = config_json['partition_conf']
for conf in partition_conf_list:
conf_site_name = conf["site_name"]
conf_date_type = conf["date_type"]
if site_name == conf_site_name and date_type == conf_date_type:
vertify_flag = True
break
else:
vertify_flag = False
# No matching dimension configuration
if not vertify_flag:
break
# Append the extra query condition
if sql_condition:
query_field_check = query_total + f" AND {sql_condition} "
check_count_df = spark_session.sql(query_field_check).cache()
check_count = int(check_count_df.collect()[0]['total_count'])
calcult_rate = round((check_count / total_count), 3)
waring_max = conf['max_rate']
waring_min = conf['min_rate']
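# A field passes verification (verify_flag = 1) when the measured ratio falls within [min_rate, max_rate] from the matched partition config.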
verify_flag = 1 if (calcult_rate <= waring_max) and (calcult_rate >= waring_min) else 0
ratio_df = spark_session.createDataFrame([(row['verify_desc'],verify_type,field_name,calcult_rate,waring_max,waring_min,verify_flag)],schema).repartition(1)
check_df = check_df.unionByName(ratio_df, False)
if check_df.count() < 1 :
print("无验证项验证")
exit()
check_df.show(50, truncate=False)
# Check whether any verification failed
schema_flag = bool(check_df.select(F.min("是否验证通过").alias("result")).first().asDict()['result'])
if not schema_flag:
msg = f"数据表:{hive_table} {msg_params},计算数据存在验证不通过,请检查数据是否异常!!具体信息请查看日志!!"
CommonUtil.send_wx_msg(['chenjianyun'], f"\u26A0 {hive_table} {msg_params}流程数据导出前验证异常", msg)
spark_session.stop()
pass
\ No newline at end of file
def word_tokenize(title: str):
"""
Tokenizer
"""
from nltk.tokenize import word_tokenize
result = word_tokenize(title, "english")
return result
if __name__ == '__main__':
aba = "nation's bravest tales of courage and heroism"
print(word_tokenize(aba))
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
if __name__ == '__main__':
export_tb = "de_brand_analytics_month"
spark = SparkUtil.get_spark_session("update_de_brand_analytics_month_2024_05")
sql1 = """
select
search_term
from ods_st_quantity_being_sold
where site_name = 'de'
and date_type = 'month'
and date_info = '2024-05'
and quantity_being_sold in (16, 48)
"""
df_aba = spark.sql(sql1)
sql2 = """
select
search_term,
quantity_being_sold
from dwt_merchantwords_merge
where site_name = 'de'
"""
df_me = spark.sql(sql2)
df_save = df_aba.join(
df_me, on='search_term', how='inner'
)
# Export the data to the PostgreSQL database
df_save.write.format("jdbc") \
.option("url", "jdbc:postgresql://192.168.10.223:5433/selection_de") \
.option("dbtable", export_tb) \
.option("user", "yswg_postgres") \
.option("password", "yswg_postgres") \
.mode("append") \
.save()
spark.stop()
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from pyspark.sql.functions import row_number
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
if __name__ == '__main__':
spark = SparkUtil.get_spark_session("UpdateMerchantwords")
hive_tb = 'dwd_merchantwords_measure'
partition_dict = {
"site_name": 'us',
"batch": '2023-01'
}
sql1 = f"""
select
keyword,
lang,
st_ao_val,
st_zr_flow_proportion,
min_bid,
max_bid,
suggested_bid,
volume,
avg_3m,
avg_12m,
asin_total_num,
asin_num,
self_asin_num,
self_asin_proportion,
st_sp_counts,
st_zr_counts,
st_monthly_sales,
listing_sales_avg,
reviews_avg,
rating_avg,
price_avg,
depth
from dwd_merchantwords_measure
where site_name = 'us'
and batch = '2023-01';
"""
df_dwd = spark.sql(sqlQuery=sql1).cache()
df_dwd = df_dwd.repartition(80)
sql2 = f"""
select
keyword,
results_count,
sponsored_ads_count,
page_1_reviews,
appearance,
last_seen,
update_time
from dwt_merchantwords_st_detail
where site_name = 'us'
and batch = '2023-1';
"""
df_merchantwords_detail = spark.sql(sqlQuery=sql2)
df_merchantwords_detail = df_merchantwords_detail \
.withColumn("row_num", row_number().over(Window.orderBy("keyword"))) \
.filter("row_num BETWEEN 1 AND 12000000") \
.repartition(80) \
.drop("row_num") \
.cache()
df = df_dwd.join(df_merchantwords_detail, 'keyword', 'left')
df = df.withColumn(
'site_name',
F.lit('us')
).withColumn(
'batch',
F.lit('2023-01')
)
CommonUtil.save_or_update_table(spark_session=spark,
hive_tb_name=hive_tb,
partition_dict=partition_dict,
df_save=df,
drop_exist_tmp_flag=True)
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import count, col
class WordFrequency(object):
def __init__(self):
self.spark = SparkUtil.get_spark_session("us_aba_last365_word_frequency")
def run(self):
sql1 = f"""
select search_term, date_info
from dwt_aba_st_analytics
where site_name = 'us'
and date_type = 'month'
and date_info in
('2024-10', '2024-09', '2024-08', '2024-07', '2024-06', '2024-05',
'2024-04', '2024-03', '2024-02', '2024-01', '2023-12', '2023-11')
and rank <= 1000000
and st_brand_label = 1;
"""
df_st = self.spark.sql(sql1).cache()
print("df_st数量是:")
print(df_st.count())
sql2 = f"""
select search_term, first_match_brand as brand, date_info
from dws_st_brand_info
where site_name = 'us'
and date_type = 'month'
and date_info in
('2024-10', '2024-09', '2024-08', '2024-07', '2024-06', '2024-05',
'2024-04', '2024-03', '2024-02', '2024-01', '2023-12', '2023-11')
and st_brand_label = 1;
"""
df_brand = self.spark.sql(sql2).cache()
print("df_brand数量是:")
print(df_brand.count())
df_save = df_st.join(
df_brand, on=['date_info', 'search_term'], how='left'
).drop('date_info')
print("df_save数量是:")
print(df_save.count())
df_save = df_save.groupby(['brand']).agg(
count('brand').alias('frequency')
).orderBy('frequency', ascending=False)
df_save.show(20, False)
df_save = df_save.withColumn("frequency", col("frequency").cast("int"))
total_sum = df_save.select("frequency").groupBy().sum().collect()[0][0]
if total_sum == df_st.count():
print('验证成功')
else:
print('验证失败')
output_path = "hdfs:///user/chenyuanjie/test1/"
df_save.write.mode("overwrite").format("csv").option("delimiter", "^").option("lineSep", "\n").option("header", "false").option("compression", "none").save(output_path)
if __name__ == '__main__':
obj = WordFrequency()
obj.run()
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
hive_tb = "tmp_us_st_keepa_syn_2024"
hdfs_path = "/home/big_data_selection/tmp/tmp_us_st_keepa_syn_2024"
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
asin
from us_st_keepa_syn_2024
where 1 = 1
and \$CONDITIONS
"""
db_type = "postgresql"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name='us',
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name='us',
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# Delete the HDFS path before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import col, lit
from utils.StarRocksHelper import StarRocksHelper
if __name__ == '__main__':
spark = SparkUtil.get_spark_session("us_st_keepa_syn_2024_export")
# Read existing data from the StarRocks database
sql = """
select distinct asin from selection.us_asin_latest_detail where date_info = '2024-06'
"""
df_sr = StarRocksHelper.spark_import_with_sql(spark, sql).repartition(80, 'asin').cache()
print("starrocks读取:")
df_sr.show(10)
sql = """
select asin from tmp_us_st_keepa_syn_2024;
"""
df_pg = spark.sql(sql).drop_duplicates(['asin']).repartition(80, 'asin').cache()
print("pg读取:")
df_pg.show(10)
df = df_sr.subtract(df_pg)
df_sr.unpersist()
df_pg.unpersist()
df = df.withColumn(
'state',
lit(7)
).withColumn(
'asin_trun_4',
col('asin').substr(1, 4)
)
df.show(10)
print(df.count())
df.write.format("jdbc") \
.option("url", "jdbc:postgresql://192.168.10.224:5433/selection") \
.option("dbtable", "us_st_keepa_syn_2024") \
.option("user", "yswg_postgres") \
.option("password", "yswg_postgres") \
.mode("append") \
.save()
spark.stop()
import os
import sys
from sqlalchemy.dialects.postgresql import pypostgresql
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.StarRocksHelper import StarRocksHelper
if __name__ == '__main__':
spark = SparkUtil.get_spark_session("us_st_keepa_syn_2024_export")
# Read existing data from the StarRocks database
sql = """
select distinct asin from selection.us_asin_latest_detail where date_info = '2024-06' and (asin_launch_time>'2024-07-19' or asin_launch_time<'1990-01-01')
"""
df_sr = StarRocksHelper.spark_import_with_sql(spark, sql).repartition(80, 'asin').cache()
print("starrocks读取:")
df_sr.show(10)
sql = """
select asin from tmp_us_st_keepa_syn_2024;
"""
df_pg = spark.sql(sql).drop_duplicates(['asin']).repartition(80, 'asin').cache()
print("pg读取:")
df_pg.show(10)
df = df_sr.subtract(df_pg)
print(df.count())
df_sr.unpersist()
df_pg.unpersist()
update_asin = df.select("asin").rdd.map(lambda row: row[0]).collect()
print(update_asin)
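# Note: the IN clause below is built from Python's tuple repr, so it assumes update_asin has at least two elements;
# an empty or single-element list would produce invalid SQL (e.g. "IN ('B0...',)").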
pg_engine = DBUtil.get_db_engine('postgresql', 'us')
with pg_engine.begin() as conn:
update_query = f"""
UPDATE us_st_keepa_syn_2024 SET state = 5 WHERE asin IN {tuple(update_asin)}
"""
conn.execute(update_query)
spark.stop()
import os
import sys
from pyspark.sql.types import ArrayType, FloatType, StructType, StructField, StringType
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
class VerifyRank(object):
def __init__(self):
self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}")
def run(self):
sql = f"""
select
search_term,
rank,
date_info
from ods_brand_analytics
where site_name = 'us'
and date_type = 'week'
and date_info >= '2024-01'
and rank < 100000
"""
df_all = self.spark.sql(sql).repartition(40, 'search_term').cache()
def leave_one_out_means(structs):
ranks = [x['rank'] for x in structs]
date_infos = [x['date_info'] for x in structs]
total_sum = sum(ranks)
n = len(ranks)
if n > 1:
means = [round((total_sum - rank) / (n - 1), 2) for rank in ranks]
else:
means = [ranks[0]]
result = [{"means": mean, "date_info": date_info} for mean, date_info in zip(means, date_infos)]
return result
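# e.g. ranks [10, 20, 30] -> means [25.0, 20.0, 15.0]: each entry is the average of the other ranks (leave-one-out mean).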
leave_one_out_means_udf = F.udf(leave_one_out_means, ArrayType(StructType([
StructField("means", FloatType(), True),
StructField("date_info", StringType(), True)
])))
df_agg = df_all.groupBy("search_term").agg(
F.collect_list(F.struct("rank", "date_info")).alias("collect_row")
# F.collect_list("rank").alias("values")
)
df_agg = df_agg.withColumn(
"collect_row", leave_one_out_means_udf(F.col("collect_row"))
)
def calc_quantiles(structs):
values = [x['means'] for x in structs]
values = sorted(values)  # sort the values within the group
n = len(values)
# Compute the positions of Q1 and Q3 (based on the 25% and 75% positions)
q1_index = int(n * 0.25)
q3_index = int(n * 0.75)
if n > 1:
q1 = values[q1_index]
q3 = values[q3_index]
else:
q1 = values[0]
q3 = values[0]
return [float(q1), float(q3)]
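# Simple positional quantiles over the sorted leave-one-out means: Q1 is the value at index int(n * 0.25) and Q3 at
# index int(n * 0.75), e.g. with 8 values Q1 is the 3rd smallest and Q3 the 7th smallest.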
quantile_udf = F.udf(calc_quantiles, ArrayType(FloatType()))
df_agg = df_agg.withColumn(
"quantiles", quantile_udf(F.col("collect_row"))
).withColumn(
"q1", F.col("quantiles")[0]
).withColumn(
"q3", F.col("quantiles")[1]
).withColumn(
"iqr", F.expr("q3 - q1")
).withColumn(
"lower_bound", F.expr("q1 - 100 * iqr")
).withColumn(
"upper_bound", F.expr("q3 + 100 * iqr")
).select(
'search_term', 'collect_row', 'lower_bound', 'upper_bound'
).repartition(40, 'search_term')
df_save = df_agg.withColumn(
"filtered_collect_row",
F.filter(
"collect_row",
lambda x: (x["means"] < F.col("lower_bound")) | (x["means"] > F.col("upper_bound"))
)
).filter(
F.size(F.col("filtered_collect_row")) > 0
).withColumn(
"has_2024_08",
F.exists(
"filtered_collect_row",
lambda x: x["date_info"].like("2024-08%")
)
).filter(
~F.col("has_2024_08")  # filter out rows that include '2024-08'
).select(
'search_term', 'filtered_collect_row', 'lower_bound', 'upper_bound'
)
df_save.show(20, False)
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
date_info = CommonUtil.get_sys_arg(3, None)
obj = VerifyRank()
obj.run()
@@ -11,7 +11,7 @@ dev_work_place = "/opt/module/spark/demo/py_demo"
def get_git_work_place(branch_name: str) -> str:
if branch_name == 'develop':
if branch_name == 'developer':
return "/root/git_work_place"
usrName = CommonUtil.safeIndex(branch_name.split("/"), 1, None)
assert usrName is not None, "根据分支获取用户名失败!"
@@ -32,7 +32,7 @@ def crlf_2_lf(full_path):
def zip_yswgutils_to_hdfs():
git_work_place = get_git_work_place("develop")
git_work_place = get_git_work_place("developer")
dist_src = f"{git_work_place}/Pyspark_job"
work_place = "/tmp"
os.chdir(work_place)
@@ -77,7 +77,7 @@ def git_update(branch_name):
if len(os.listdir(work_place)) <= 0:
cmds = [
'git init',
'git remote add origin http://47.106.101.75/selection/Amazon-Selection.git',
'git remote add origin http://47.106.101.75/abel_cjy/Amazon-Selection-Data.git',
'git config core.sparseCheckout true',
f'echo "{src}" >> .git/info/sparse-checkout',
'git pull --depth=1 origin master',
@@ -98,7 +98,7 @@ def git_update(branch_name):
if output.startswith("There is no tracking information for the current branch."):
update_flag = False
if update_flag and branch_name == 'develop':
if update_flag and branch_name == 'developer':
# Back up the original directory
cmd = f"cp -r {dev_work_place} {dev_work_place}_back"
os.popen(cmd)
@@ -156,8 +156,8 @@ def zip_yswgutils_to_hdfs_local():
if __name__ == '__main__':
branch_name = CommonUtil.get_sys_arg(1, "develop")
branch_name = CommonUtil.get_sys_arg(1, "developer")
update_flag = git_update(branch_name)
# Update partitions
if update_flag and branch_name == 'develop':
if update_flag and branch_name == 'developer':
zip_yswgutils_to_hdfs()
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name must not be empty!"
# hive_tb = "tmp_asin_state"
#
# partition_dict = {
# "site_name": site_name
# }
#
# hdfs_path = CommonUtil.build_hdfs_path(hive_tb , partition_dict)
# print(f"hdfs_path is {hdfs_path}")
#
# query = f"""
# select
# asin,
# state,
# created_at,
# updated_at,
# 3 as flag
# from us_all_syn_st_history_2022
# where 1 = 1
# and \$CONDITIONS
# """
# print(query)
# db_type = "mysql"
# empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
# site_name=site_name,
# query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
#
# if not empty_flag:
# sh = CommonUtil.build_import_sh_v2(site_name=site_name,
# db_type=db_type,
# query=query,
# hdfs_path=hdfs_path,
# map_num=15,
# key="state"
# )
#
# client = SSHUtil.get_ssh_client()
# SSHUtil.exec_command_async(client, sh, ignore_err=False)
# CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
# client.close()
#
# # post-import check -- verify data consistency
# CommonUtil.check_import_sync_num(db_type=db_type,
# partition_dict=partition_dict,
# import_query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
# export to the PG database
db_type = "postgresql"
export_tb = "us_all_syn_st_asin"
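# build_export_sh presumably assembles a Sqoop-style export command that pushes the
# given Hive table/partition into the PG table (an assumption based on the helper's
# name and arguments; the implementation lives in utils.common_util).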
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="tmp_asin_state_copy",
export_tb=export_tb,
col=[
"asin",
"state"
],
partition_dict={
"site_name": site_name
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
if __name__ == '__main__':
# 导出到pg数据库
db_type = "postgresql_cluster"
export_tb = "de_st_month_2022_9_old"
sh = CommonUtil.build_export_sh(
site_name="de",
db_type=db_type,
hive_tb="tmp_st_month_2110_2208",
export_tb=export_tb,
col=[
"week",
"asin",
"search_term",
"ao_val",
"orders",
"orders_sum",
"flow",
"order_flow",
"search_num",
"search_rank",
"quantity_being_sold",
"adv_compet",
"zr_page_rank",
"zr_page",
"zr_page_row",
"sp_page",
"sp_page_rank",
"sp_page_row",
"sb1_page",
"sb2_page",
"sb3_page",
"ac_page",
"bs_page",
"er_page",
"tr_page",
"search_term_type",
"created_at",
"updated_at"
],
partition_dict={
"site_name": "de",
"year_month": "2022-9-old"
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
date_info = CommonUtil.get_sys_arg(3, None)
print(f"执行参数为{sys.argv}")
db_type = "postgresql"
print("导出到PG库中")
year_str = CommonUtil.safeIndex(date_info.split("-"), 0, None)
suffix = str(date_info).replace("-", "_")
base_tb = f"{site_name}_aba_last_top_asin"
export_master_tb = f"{base_tb}_{year_str}"
export_tb = f"{base_tb}_{suffix}"
next_month = CommonUtil.get_month_offset(date_info, 1)
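# Table naming used here: {site}_aba_last_top_asin_{year} is the yearly master table
# and {site}_aba_last_top_asin_{yyyy_mm} the monthly child table. The child table is
# recreated below, loaded from Hive, and then attached to the master for the range
# [date_info, next_month) via DBUtil.add_pg_part.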
engine = DBUtil.get_db_engine(db_type, site_name)
with engine.connect() as connection:
sql = f"""
drop table if exists {export_tb};
create table if not exists {export_tb}
(
like {export_master_tb} including comments
);
"""
print("================================执行sql================================")
print(sql)
connection.execute(sql)
# export table
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="dwt_st_top_asin_info",
export_tb=export_tb,
col=[
"site_name",
"search_term_id",
"search_term",
"asin",
"date_info",
"data_type",
"zr_rank",
"created_time",
"updated_time"
],
partition_dict={
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
# create indexes and swap in the partition
DBUtil.add_pg_part(
engine,
source_tb_name=export_tb,
part_master_tb=export_master_tb,
part_val={
"from": [date_info],
"to": [next_month]
},
cp_index_flag=True,
)
print("success")
print("success")
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
year_month = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "site_name must not be empty!"
assert year_month is not None, "year_month must not be empty!"
year,month = year_month.split("-")
hive_tb = "tmp_st_month_2110_2208"
partition_dict = {
"site_name": site_name,
"year_month": year_month
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb,partition_dict)
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
week,
asin,
search_term,
ao_val,
orders,
orders_sum,
flow,
order_flow,
search_num,
search_rank,
quantity_being_sold,
adv_compet,
zr_page_rank,
zr_page,
zr_page_row,
sp_page,
sp_page_rank,
sp_page_row,
sb1_page,
sb2_page,
sb3_page,
ac_page,
bs_page,
er_page,
tr_page,
search_term_type,
created_at,
updated_at,
id
from {site_name}_st_month_{year}_{month}
where 1 = 1
and \$CONDITIONS
"""
print(query)
db_type = "mysql"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
if not empty_flag:
sh = CommonUtil.build_import_sh_v2(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path,
map_num=10,
key="id"
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
year_month = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "site_name must not be empty!"
assert year_month is not None, "year_month must not be empty!"
year,month = year_month.split("-")
# export to the PG database
db_type = "postgresql_cluster"
export_tb = f"{site_name}_st_month_{year}_{month}"
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="tmp_st_month_2110_2208",
export_tb=export_tb,
col=[
"week",
"asin",
"search_term",
"ao_val",
"orders",
"orders_sum",
"flow",
"order_flow",
"search_num",
"search_rank",
"quantity_being_sold",
"adv_compet",
"zr_page_rank",
"zr_page",
"zr_page_row",
"sp_page",
"sp_page_rank",
"sp_page_row",
"sb1_page",
"sb2_page",
"sb3_page",
"ac_page",
"bs_page",
"er_page",
"tr_page",
"search_term_type",
"created_at",
"updated_at"
],
partition_dict={
"site_name": site_name,
"year_month": year_month
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
year_month = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "site_name must not be empty!"
assert year_month is not None, "year_month must not be empty!"
year,month = year_month.split("-")
hive_tb = "tmp_st_month_2209_2303"
partition_dict = {
"site_name": site_name,
"year_month": year_month
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb,partition_dict)
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
id,
search_term,
st_ao_val,
st_type,
st_rank,
st_rank_avg,
st_search_num,
st_search_rate,
st_search_sum,
st_adv_counts,
st_quantity_being_sold,
asin,
asin_st_zr_orders,
asin_st_zr_orders_sum,
asin_st_zr_flow,
asin_st_sp_orders,
asin_st_sp_orders_sum,
asin_st_sp_flow,
st_asin_zr_page,
st_asin_zr_page_row,
st_asin_zr_page_rank,
st_asin_zr_updated_at,
st_asin_sp_page,
st_asin_sp_page_rank,
st_asin_sp_page_row,
st_asin_sp_updated_at,
st_asin_sb1_page,
st_asin_sb1_updated_at,
st_asin_sb2_page,
st_asin_sb2_updated_at,
st_asin_sb3_page,
st_asin_sb3_updated_at,
st_asin_ac_page,
st_asin_ac_updated_at,
st_asin_bs_page,
st_asin_bs_updated_at,
st_asin_er_page,
st_asin_er_updated_at,
st_asin_tr_page,
st_asin_tr_updated_at,
created_at,
updated_at
from {site_name}_st_month_{year}_{month}
where 1 = 1
and \$CONDITIONS
"""
print(query)
db_type = "mysql"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
if not empty_flag:
sh = CommonUtil.build_import_sh_v2(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path,
map_num=10,
key="id"
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
year_month = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "site_name must not be empty!"
assert year_month is not None, "year_month must not be empty!"
year,month = year_month.split("-")
# export to the PG database
db_type = "postgresql_cluster"
export_tb = f"{site_name}_st_month_{year}_{month}"
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="tmp_st_month_2209_2303",
export_tb=export_tb,
col=[
"search_term",
"st_ao_val",
"st_type",
"st_rank",
"st_rank_avg",
"st_search_num",
"st_search_rate",
"st_search_sum",
"st_adv_counts",
"st_quantity_being_sold",
"asin",
"asin_st_zr_orders",
"asin_st_zr_orders_sum",
"asin_st_zr_flow",
"asin_st_sp_orders",
"asin_st_sp_orders_sum",
"asin_st_sp_flow",
"st_asin_zr_page",
"st_asin_zr_page_row",
"st_asin_zr_page_rank",
"st_asin_zr_updated_at",
"st_asin_sp_page",
"st_asin_sp_page_rank",
"st_asin_sp_page_row",
"st_asin_sp_updated_at",
"st_asin_sb1_page",
"st_asin_sb1_updated_at",
"st_asin_sb2_page",
"st_asin_sb2_updated_at",
"st_asin_sb3_page",
"st_asin_sb3_updated_at",
"st_asin_ac_page",
"st_asin_ac_updated_at",
"st_asin_bs_page",
"st_asin_bs_updated_at",
"st_asin_er_page",
"st_asin_er_updated_at",
"st_asin_tr_page",
"st_asin_tr_updated_at",
"created_at",
"updated_at"
],
partition_dict={
"site_name": site_name,
"year_month": year_month
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
if __name__ == '__main__':
num = int('2024-05'.split('-')[-1])
print(num)
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
year_month = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "site_name must not be empty!"
assert year_month is not None, "year_month must not be empty!"
year,month = year_month.split("-")
hive_tb = "tmp_st_month_2110_2208"
partition_dict = {
"site_name": site_name,
"year_month": year_month
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb,partition_dict)
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
week,
asin,
search_term,
ao_val,
orders,
orders_sum,
flow,
order_flow,
search_num,
search_rank,
quantity_being_sold,
adv_compet,
zr_page_rank,
zr_page,
zr_page_row,
sp_page,
sp_page_rank,
sp_page_row,
sb1_page,
sb2_page,
sb3_page,
ac_page,
bs_page,
er_page,
tr_page,
search_term_type,
created_at,
updated_at,
id
from {site_name}_st_month_{year}_{month}
where 1 = 1
and \$CONDITIONS
"""
print(query)
db_type = "mysql"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
if not empty_flag:
sh = CommonUtil.build_import_sh_v2(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path,
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
pass
import json
import subprocess
from datetime import datetime, time
import sys
from pyspark.sql import SparkSession
from Pyspark_job.utils import common_util
from Pyspark_job.utils import DolphinschedulerHelper
from yswg_utils.common_df import get_asin_unlanuch_df
from Pyspark_job.utils.spark_util import SparkUtil
import script.pg14_to_pg6 as sc
from Pyspark_job.script import post_to_dolphin
if __name__ == '__main__':
# date_info = '2023_34'
# table_names = f"us_search_term_rank_er_{date_info}," \
# f"us_search_term_rank_hr_{date_info},us_search_term_rank_tr_{date_info},us_other_search_term_{date_info}," \
# f"us_brand_analytics_{date_info}"
# post_to_dolphin.DolphinschedulerHelper.start_process_instance('us', '2023-34', table_names, 'aba')
str.upper("seatunnel")
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = "us"
hive_tb = f"tmp_asin_image"
partition_dict = {
"site_name": "us14",
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
asin,
img_url,
img_order_by,
created_at,
updated_at,
data_type
from {site_name}_asin_image_pyb_copy
where 1 = 1
and \$CONDITIONS
"""
print(query)
db_type = "postgresql_14"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path,
map_num=10,
key='id')
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
# # export to the PG database
# db_type = "postgresql"
# export_tb = f"{site_name}_asin_image_copy"
#
# # export table
# sh = CommonUtil.build_export_sh(
# site_name=site_name,
# db_type=db_type,
# hive_tb="tmp_asin_image_copy",
# export_tb=export_tb,
# col=[
# "asin",
# "img_url",
# "img_order_by",
# "created_at",
# "updated_at",
# "data_type"
# ],
# partition_dict={
# "site_name": site_name
# }
# )
# client = SSHUtil.get_ssh_client()
# SSHUtil.exec_command_async(client, sh, ignore_err=False)
# client.close()
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = "de"
# export to the PG database
db_type = "postgresql"
export_tb = "de_asin_image_copy"
# export table
sh = CommonUtil.build_export_sh(
site_name="de",
db_type=db_type,
hive_tb="tmp_asin_image_lzo",
export_tb=export_tb,
col=[
"asin",
"img_url",
"img_order_by",
"created_at",
"updated_at",
"data_type"
],
partition_dict={
"site_name": "de"
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name must not be empty!"
# hive_tb = f"tmp_asin_b09"
# partition_dict = {
# "site_name": site_name,
# }
# hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
# print(f"hdfs_path is {hdfs_path}")
#
# query = f"""
# select
# asin,
# price,
# rating,
# total_comments,
# page_inventory,
# `rank`,
# img_num,
# ao_val,
# bsr_orders,
# sales,
# data_at,
# created_at,
# updated_at,
# year_week,
# '{site_name}' as site_name
# from {site_name}_asin_b09
# where 1 = 1
# and \$CONDITIONS
# """
# print(query)
# db_type = "mysql"
# empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
# site_name=site_name,
# query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
#
# if not empty_flag:
# sh = CommonUtil.build_import_sh(site_name=site_name,
# db_type=db_type,
# query=query,
# hdfs_path=hdfs_path)
# # delete the HDFS path before importing
# HdfsUtils.delete_hdfs_file(hdfs_path)
# client = SSHUtil.get_ssh_client()
# SSHUtil.exec_command_async(client, sh, ignore_err=False)
# CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
# client.close()
#
# # post-import check -- verify data consistency
# CommonUtil.check_import_sync_num(db_type=db_type,
# partition_dict=partition_dict,
# import_query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
# export to the PG database
db_type = "postgresql"
export_tb = f"{site_name}_asin_b09"
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="tmp_asin_b09",
export_tb=export_tb,
col=[
"asin",
"price",
"rating",
"total_comments",
"page_inventory",
"rank",
"img_num",
"ao_val",
"bsr_orders",
"sales",
"data_at",
"created_at",
"updated_at",
"year_week"
],
partition_dict={
"site_name": site_name,
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name must not be empty!"
# hive_tb = f"tmp_asin_detail_trend_month"
#
# partition_dict = {
# "site_name": site_name,
# }
# hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
# print(f"hdfs_path is {hdfs_path}")
#
# query = f"""
# select
# asin,
# ym,
# rank_rise,
# rank_change,
# ao_rise,
# ao_change,
# price_rise,
# price_change,
# orders_rise,
# orders_change,
# rating_rise,
# rating_change,
# comments_rise,
# comments_change,
# bsr_orders_rise,
# bsr_orders_change,
# sales_rise,
# sales_change,
# variation_num,
# variation_rise,
# variation_change,
# created_at,
# updated_at
# from {site_name}_asin_detail_trend_month
# where 1 = 1
# and \$CONDITIONS
# """
# print(query)
# db_type = "mysql"
# empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
# site_name=site_name,
# query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
#
# if not empty_flag:
# sh = CommonUtil.build_import_sh(site_name=site_name,
# db_type=db_type,
# query=query,
# hdfs_path=hdfs_path)
#
# client = SSHUtil.get_ssh_client()
# SSHUtil.exec_command_async(client, sh, ignore_err=False)
# CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
# client.close()
#
# # post-import check -- verify data consistency
# CommonUtil.check_import_sync_num(db_type=db_type,
# partition_dict=partition_dict,
# import_query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
# export to the PG database
db_type = "postgresql"
export_tb = f"{site_name}_asin_detail_trend_month"
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="tmp_asin_detail_trend_month",
export_tb=export_tb,
col=[
"asin",
"ym",
"rank_rise",
"rank_change",
"ao_rise",
"ao_change",
"price_rise",
"price_change",
"orders_rise",
"orders_change",
"rating_rise",
"rating_change",
"comments_rise",
"comments_change",
"bsr_orders_rise",
"bsr_orders_change",
"sales_rise",
"sales_change",
"variation_num",
"variation_rise",
"variation_change",
"created_at",
"updated_at"
],
partition_dict={
"site_name": site_name,
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
import os
import sys
from pyspark.storagelevel import StorageLevel
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
from pyspark.sql.types import StringType
# window function utilities for group-wise ranking
from pyspark.sql.window import Window
from pyspark.sql import functions as F
class AsinState(Templates):
def __init__(self):
super().__init__()
self.site_name = "us"
self.db_save = f"tmp_asin_state_copy"
self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
self.df_save = self.spark.sql(f"select 1+1;")
self.df = self.spark.sql(f"select 1+1;")
self.partitions_by = ['site_name']
self.reset_partitions(partitions_num=1)
def read_data(self):
sql = f"""
select
asin,
state,
updated_at,
flag,
site_name
from
tmp_asin_state
where
site_name = 'us';
"""
self.df = self.spark.sql(sqlQuery=sql).cache()
def handle_data(self):
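# Deduplicate by asin: rank each asin's rows, preferring the smallest flag and, within
# the same flag, the most recent updated_at, then keep only the top-ranked row.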
df_window = Window.partitionBy(["asin"]).orderBy(self.df.flag.asc(), self.df.updated_at.desc())
self.df = self.df.withColumn("rk", F.row_number().over(window=df_window))
self.df_save = self.df.filter("rk = 1")
self.df_save = self.df_save.drop("flag").drop("rk").drop("updated_at")
if __name__ == "__main__":
handle_obj = AsinState()
handle_obj.run()
import os
import sys
from pyspark.storagelevel import StorageLevel
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
from pyspark.sql.types import StringType
# window function utilities for group-wise ranking
from pyspark.sql.window import Window
from pyspark.sql import functions as F
class DeAsinImage(Templates):
def __init__(self):
super().__init__()
self.site_name = "de"
self.db_save = f"tmp_asin_image_copy"
self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
self.df_save = self.spark.sql(f"select 1+1;")
self.df1 = self.spark.sql(f"select 1+1;")
self.df2 = self.spark.sql(f"select 1+1;")
self.df = self.spark.sql(f"select 1+1;")
self.partitions_by = ['site_name']
self.reset_partitions(partitions_num=1)
def read_data(self):
sql1 = f"""
select
*,
1 as flag
from
tmp_asin_image
where
site_name = 'de1';
"""
sql2 = f"""
select
*,
2 as flag
from
tmp_asin_image
where
site_name = 'de';
"""
self.df1 = self.spark.sql(sqlQuery=sql1).cache()
self.df2 = self.spark.sql(sqlQuery=sql2).cache()
def handle_data(self):
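# Union the 'de1' and 'de' snapshots and keep, per asin, the rows from the source with
# the smallest flag. dense_rank (unlike row_number) keeps every image row of the
# preferred source, so an asin can retain multiple image records.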
self.df = self.df1.unionAll(self.df2)
df_window = Window.partitionBy(["asin"]).orderBy(self.df.flag.asc())
self.df = self.df.withColumn("rk",F.dense_rank().over(window=df_window))
self.df_save = self.df.filter("rk = 1")
self.df_save = self.df_save.drop("flag").drop("rk")
if __name__ == "__main__":
handle_obj = DeAsinImage()
handle_obj.run()
import os
import sys
from pyspark.storagelevel import StorageLevel
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
from pyspark.sql.types import StringType
# window function utilities for group-wise ranking
from pyspark.sql.window import Window
from pyspark.sql import functions as F
class UsAsinImage(Templates):
def __init__(self):
super().__init__()
self.site_name = "uk"
self.db_save = f"tmp_asin_image_lzo"
self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
self.df_save = self.spark.sql(f"select 1+1;")
self.df1 = self.spark.sql(f"select 1+1;")
self.df2 = self.spark.sql(f"select 1+1;")
self.df = self.spark.sql(f"select 1+1;")
self.partitions_by = ['site_name']
self.reset_partitions(partitions_num=1)
def read_data(self):
sql1 = f"""
select
*,
1 as flag
from
tmp_asin_image_copy
where
site_name = 'us14'
limit 10;
"""
sql2 = f"""
select
*,
2 as flag
from
tmp_asin_image_copy
where
site_name = 'us6'
limit 10;
"""
self.df1 = self.spark.sql(sqlQuery=sql1).cache()
self.df2 = self.spark.sql(sqlQuery=sql2).cache()
self.df = self.df1.unionAll(self.df2)
df_window = Window.partitionBy(["asin"]).orderBy(self.df.flag.desc())
self.df = self.df.withColumn("rk",F.dense_rank().over(window=df_window))
self.df_save = self.df.filter("rk = 1")
self.df_save = self.df_save.withColumn("site_name", F.lit("us"))
self.df_save = self.df_save.drop("flag").drop("rk")
self.df_save.show()
if __name__ == "__main__":
obj = UsAsinImage()
obj.read_data()
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = "us"
hive_tb = f"tmp_asin_image"
partition_dict = {
"site_name": "us6",
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
asin,
img_url,
img_order_by,
created_at,
updated_at,
data_type
from {site_name}_asin_image
where 1 = 1
and \$CONDITIONS
"""
print(query)
db_type = "postgresql"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
# # export to the PG database
# db_type = "postgresql"
# export_tb = f"{site_name}_asin_image_copy"
#
# # export table
# sh = CommonUtil.build_export_sh(
# site_name=site_name,
# db_type=db_type,
# hive_tb="tmp_asin_image_copy",
# export_tb=export_tb,
# col=[
# "asin",
# "img_url",
# "img_order_by",
# "created_at",
# "updated_at",
# "data_type"
# ],
# partition_dict={
# "site_name": site_name
# }
# )
# client = SSHUtil.get_ssh_client()
# SSHUtil.exec_command_async(client, sh, ignore_err=False)
# client.close()
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = "us"
#
# hive_tb = "tmp_bs_category_asin"
#
# hdfs_path = CommonUtil.build_hdfs_path(hive_tb)
# print(f"hdfs_path is {hdfs_path}")
#
# query = """
# select
# asin,
# cate_1_id,
# cate_current_id,
# week,
# `year_month`,
# created_at,
# updated_at
# from us_bs_category_asin
# where 1 = 1
# and \$CONDITIONS
# """
# print(query)
# db_type = "mysql"
# empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
# site_name=site_name,
# query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
#
# if not empty_flag:
# sh = CommonUtil.build_import_sh(site_name=site_name,
# db_type=db_type,
# query=query,
# hdfs_path=hdfs_path)
# # delete the HDFS path before importing
# HdfsUtils.delete_hdfs_file(hdfs_path)
# client = SSHUtil.get_ssh_client()
# SSHUtil.exec_command_async(client, sh, ignore_err=False)
# CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
# client.close()
#
# # post-import check -- verify data consistency
# CommonUtil.check_import_sync_num(db_type=db_type,
# import_query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
# export to the PG database
db_type = "postgresql"
export_tb = "us_bs_category_asin"
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="tmp_bs_category_asin",
export_tb=export_tb,
col=[
"asin",
"cate_1_id",
"cate_current_id",
"week",
"year_month",
"created_at",
"updated_at"
],
partition_dict={
"site_name": site_name,
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
if __name__ == '__main__':
# export to the PG database
db_type = "postgresql"
export_tb = f"us_st_year_week"
sh = CommonUtil.build_export_sh(
site_name="us",
db_type=db_type,
hive_tb="dim_st_year_week",
export_tb=export_tb,
col=[
"search_term",
"st_key",
"year_week"
],
partition_dict={
"site_name": "us",
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
import os
import tkinter
import zipfile
from datetime import datetime
from tkinter.messagebox import *
from Pyspark_job.utils.hdfs_utils import HdfsUtils
import paramiko
ssh_host = "hadoop5"
ssh_port = 22
ssh_user = "root"
ssh_pwd = "LrmkEqypH4ZV4S4jA3gq3tSRTNsp2gpjqupLDM5K"
def crlf_2_lf(full_path):
"""
Convert a Windows-style .sh script to Unix line endings
:param full_path:
:return:
"""
WINDOWS_LINE_ENDING = b'\r\n'
UNIX_LINE_ENDING = b'\n'
with open(full_path, 'rb') as open_file:
content = open_file.read()
return content.replace(WINDOWS_LINE_ENDING, UNIX_LINE_ENDING)
def handle_zip(dir_name, filename):
zfName = filename
result_path = os.path.join(os.getcwd(), zfName)
try:
os.remove(result_path)
except:
pass
foo = zipfile.ZipFile(zfName, 'w')
for root, dirs, files in os.walk(dir_name):
for f in files:
full_path = os.path.join(root, f)
zip_path = full_path[len(dir_name) + 1:]
if full_path.endswith("sh"):
foo.writestr(zinfo_or_arcname=zip_path, data=crlf_2_lf(full_path))
pass
else:
foo.write(full_path, zip_path)
foo.close()
print("压缩文件成功!!")
print(f"压缩文件目录为{result_path}")
return result_path
def put_and_unzip(local_file, remote_dir):
window = tkinter.Tk()
window.withdraw()
result = askquestion('Confirm', f'Deploy to directory {remote_dir}?')
if (result == 'no'):
return
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname=ssh_host, port=ssh_port, username=ssh_user, password=ssh_pwd)
print("连接远程服务器成功...")
# 时间后缀
suffix = datetime.now().strftime("%m_%d_%H_%M")
cmd = f"mv {remote_dir} {remote_dir}_back_{suffix}"
print("执行命令中")
client.exec_command(cmd)
cmd = f"mkdir {remote_dir}"
client.exec_command(cmd)
print(f"备份远程目录{remote_dir}下的文件中.....")
sftp = client.open_sftp()
file_name = local_file[local_file.rfind("\\") + 1:]
remote_path = f"{remote_dir}/{file_name}"
print(f"上传文件【{local_file}】到远程【{remote_path}】中...")
sftp.put(local_file, remote_path)
print("上传成功!!")
cmd = f""" cd {remote_dir} && unzip -d {remote_dir} {file_name}"""
client.exec_command(cmd)
print(f"解压远程压缩文件{remote_path}中.....")
cmd = f"rm -rf {remote_path}"
client.exec_command(cmd)
print(f"删除远程压缩文件{remote_path}中.....")
client.close()
print("success")
pass
def replace_file(local_file, remote_file):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname=ssh_host, port=ssh_port, username=ssh_user, password=ssh_pwd)
print("连接远程服务器成功...")
sftp = client.open_sftp()
print(f"上传文件【{local_file}】到远程【{remote_file}】中...")
sftp.put(local_file, remote_file)
print("上传成功!!")
pass
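# Zip the local Pyspark_job directory (normalizing .sh files to LF line endings) and
# upload it to HDFS at /lib/yswg_utils.zip, replacing any existing copy.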
def zip_yswgutils_to_hdfs():
here = os.path.abspath(os.path.dirname(__file__))
src = os.path.join(here, "Pyspark_job")
# fixed file name
filename = "yswg_utils.zip"
result_path = os.path.join(here, filename)
if os.path.exists(result_path):
os.remove(result_path)
foo = zipfile.ZipFile(filename, 'w')
for root, dirs, files in os.walk(src):
for f in files:
full_path = os.path.join(root, f)
zip_path = full_path[len(src) + 1:]
if full_path.endswith("sh"):
foo.writestr(zinfo_or_arcname=zip_path, data=crlf_2_lf(full_path))
pass
else:
foo.write(full_path, zip_path)
foo.close()
print("上传环境到hdfs中.................")
hdfs_path = f"/lib/{filename}"
client = HdfsUtils.get_hdfs_cilent()
client.delete(hdfs_path)
client.upload(hdfs_path, result_path, cleanup=True)
print("删除本地包中.................")
os.remove(result_path)
pass
if __name__ == '__main__':
zip_yswgutils_to_hdfs()
# local_dir = "E:\Amazon-Selection\\Pyspark_job"
# remote_dir = "/opt/module/spark/demo/py_demo"
# local_dir = "E:\Amazon-Selection\\Pyspark_job"
# remote_dir = "/opt/module/spark-3.2.0-bin-hadoop3.2/python/lib/"
# local_dir = "E:\Amazon-Selection\\Pyspark_job"
# remote_dir = "/tmp/wjc_py"
# result_path = handle_zip(local_dir, "result.zip")
# put_and_unzip(result_path, remote_dir)
# local_file = r"E:\Amazon-Selection\Pyspark_job\dwd\dwd_asin_title_number.py"
# remote_file = "/opt/module/spark/demo/py_demo/dwd/dwd_asin_title_number.py"
# local_file = r"E:\Amazon-Selection\Pyspark_job\yswg_utils\dist\yswg_utils-0.1-py3.9.egg"
# remote_file = "E:\Amazon-Selection\Pyspark_job\yswg_utils\dist\yswg_utils-0.1-py3.9.egg"
# replace_file(local_file, remote_file)
pass