Commit 7e1f38dc by chenyuanjie

no message

parent 57100823
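# =============================================================================
# Appears to be a one-off export of ASIN state to PG. The original
# MySQL -> Hive import block is commented out below; the live code exports the
# deduplicated Hive table tmp_asin_state_copy (built by the AsinState Spark job
# later in this commit) to the PostgreSQL table us_all_syn_st_asin.
# =============================================================================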
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name must not be empty!"
# hive_tb = "tmp_asin_state"
#
# partition_dict = {
# "site_name": site_name
# }
#
# hdfs_path = CommonUtil.build_hdfs_path(hive_tb , partition_dict)
# print(f"hdfs_path is {hdfs_path}")
#
# query = f"""
# select
# asin,
# state,
# created_at,
# updated_at,
# 3 as flag
# from us_all_syn_st_history_2022
# where 1 = 1
# and \$CONDITIONS
# """
# print(query)
# db_type = "mysql"
# empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
# site_name=site_name,
# query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
#
# if not empty_flag:
# sh = CommonUtil.build_import_sh_v2(site_name=site_name,
# db_type=db_type,
# query=query,
# hdfs_path=hdfs_path,
# map_num=15,
# key="state"
# )
#
# client = SSHUtil.get_ssh_client()
# SSHUtil.exec_command_async(client, sh, ignore_err=False)
# CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
# client.close()
#
# # Post-import check -- verify data consistency
# CommonUtil.check_import_sync_num(db_type=db_type,
# partition_dict=partition_dict,
# import_query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
# Export to the PG database
db_type = "postgresql"
export_tb = "us_all_syn_st_asin"
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="tmp_asin_state_copy",
export_tb=export_tb,
col=[
"asin",
"state"
],
partition_dict={
"site_name": site_name
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
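# =============================================================================
# One-off export: the historical search-term month data for site "de" (Hive
# table tmp_st_month_2110_2208, partition year_month=2022-9-old) is pushed to
# the PostgreSQL cluster table de_st_month_2022_9_old.
# =============================================================================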
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
if __name__ == '__main__':
# Export to the PG database
db_type = "postgresql_cluster"
export_tb = "de_st_month_2022_9_old"
sh = CommonUtil.build_export_sh(
site_name="de",
db_type=db_type,
hive_tb="tmp_st_month_2110_2208",
export_tb=export_tb,
col=[
"week",
"asin",
"search_term",
"ao_val",
"orders",
"orders_sum",
"flow",
"order_flow",
"search_num",
"search_rank",
"quantity_being_sold",
"adv_compet",
"zr_page_rank",
"zr_page",
"zr_page_row",
"sp_page",
"sp_page_rank",
"sp_page_row",
"sb1_page",
"sb2_page",
"sb3_page",
"ac_page",
"bs_page",
"er_page",
"tr_page",
"search_term_type",
"created_at",
"updated_at"
],
partition_dict={
"site_name": "de",
"year_month": "2022-9-old"
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
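# =============================================================================
# Monthly top-ASIN export: clones a fresh monthly PG table from the yearly
# master {site}_aba_last_top_asin_{year}, sqoop-exports dwt_st_top_asin_info
# into it, then attaches it as a partition of the master. DBUtil.add_pg_part
# presumably wraps something like:
#   ALTER TABLE {master} ATTACH PARTITION {monthly_tb}
#       FOR VALUES FROM ('{date_info}') TO ('{next_month}');
# =============================================================================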
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
date_info = CommonUtil.get_sys_arg(3, None)
print(f"执行参数为{sys.argv}")
db_type = "postgresql"
print("导出到PG库中")
year_str = CommonUtil.safeIndex(date_info.split("-"), 0, None)
suffix = str(date_info).replace("-", "_")
base_tb = f"{site_name}_aba_last_top_asin"
export_master_tb = f"{base_tb}_{year_str}"
export_tb = f"{base_tb}_{suffix}"
next_month = CommonUtil.get_month_offset(date_info, 1)
engine = DBUtil.get_db_engine(db_type, site_name)
with engine.connect() as connection:
sql = f"""
drop table if exists {export_tb};
create table if not exists {export_tb}
(
like {export_master_tb} including comments
);
"""
print("================================执行sql================================")
print(sql)
connection.execute(sql)
# Build the export shell command
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="dwt_st_top_asin_info",
export_tb=export_tb,
col=[
"site_name",
"search_term_id",
"search_term",
"asin",
"date_info",
"data_type",
"zr_rank",
"created_time",
"updated_time"
],
partition_dict={
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
# Create indexes and attach the new table as a partition of the master
DBUtil.add_pg_part(
engine,
source_tb_name=export_tb,
part_master_tb=export_master_tb,
part_val={
"from": [date_info],
"to": [next_month]
},
cp_index_flag=True,
)
print("success")
print("success")
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
year_month = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "site_name must not be empty!"
assert year_month is not None, "year_month must not be empty!"
year, month = year_month.split("-")
hive_tb = "tmp_st_month_2110_2208"
partition_dict = {
"site_name": site_name,
"year_month": year_month
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
week,
asin,
search_term,
ao_val,
orders,
orders_sum,
flow,
order_flow,
search_num,
search_rank,
quantity_being_sold,
adv_compet,
zr_page_rank,
zr_page,
zr_page_row,
sp_page,
sp_page_rank,
sp_page_row,
sb1_page,
sb2_page,
sb3_page,
ac_page,
bs_page,
er_page,
tr_page,
search_term_type,
created_at,
updated_at,
id
from {site_name}_st_month_{year}_{month}
where 1 = 1
and \$CONDITIONS
"""
print(query)
db_type = "mysql"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
if not empty_flag:
sh = CommonUtil.build_import_sh_v2(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path,
map_num=10,
key="id"
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# Post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
pass
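# =============================================================================
# Backfill export: one site/month partition of tmp_st_month_2110_2208 is pushed
# back out to the PostgreSQL cluster table {site}_st_month_{year}_{month}.
# =============================================================================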
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
year_month = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "site_name must not be empty!"
assert year_month is not None, "year_month must not be empty!"
year, month = year_month.split("-")
# Export to the PG database
db_type = "postgresql_cluster"
export_tb = f"{site_name}_st_month_{year}_{month}"
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="tmp_st_month_2110_2208",
export_tb=export_tb,
col=[
"week",
"asin",
"search_term",
"ao_val",
"orders",
"orders_sum",
"flow",
"order_flow",
"search_num",
"search_rank",
"quantity_being_sold",
"adv_compet",
"zr_page_rank",
"zr_page",
"zr_page_row",
"sp_page",
"sp_page_rank",
"sp_page_row",
"sb1_page",
"sb2_page",
"sb3_page",
"ac_page",
"bs_page",
"er_page",
"tr_page",
"search_term_type",
"created_at",
"updated_at"
],
partition_dict={
"site_name": site_name,
"year_month": year_month
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
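# =============================================================================
# Backfill import for the 2022-09 .. 2023-03 window: same pattern as above, but
# with the newer, wider st_month schema, landing in tmp_st_month_2209_2303.
# =============================================================================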
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
year_month = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "site_name must not be empty!"
assert year_month is not None, "year_month must not be empty!"
year, month = year_month.split("-")
hive_tb = "tmp_st_month_2209_2303"
partition_dict = {
"site_name": site_name,
"year_month": year_month
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
id,
search_term,
st_ao_val,
st_type,
st_rank,
st_rank_avg,
st_search_num,
st_search_rate,
st_search_sum,
st_adv_counts,
st_quantity_being_sold,
asin,
asin_st_zr_orders,
asin_st_zr_orders_sum,
asin_st_zr_flow,
asin_st_sp_orders,
asin_st_sp_orders_sum,
asin_st_sp_flow,
st_asin_zr_page,
st_asin_zr_page_row,
st_asin_zr_page_rank,
st_asin_zr_updated_at,
st_asin_sp_page,
st_asin_sp_page_rank,
st_asin_sp_page_row,
st_asin_sp_updated_at,
st_asin_sb1_page,
st_asin_sb1_updated_at,
st_asin_sb2_page,
st_asin_sb2_updated_at,
st_asin_sb3_page,
st_asin_sb3_updated_at,
st_asin_ac_page,
st_asin_ac_updated_at,
st_asin_bs_page,
st_asin_bs_updated_at,
st_asin_er_page,
st_asin_er_updated_at,
st_asin_tr_page,
st_asin_tr_updated_at,
created_at,
updated_at
from {site_name}_st_month_{year}_{month}
where 1 = 1
and \$CONDITIONS
"""
print(query)
db_type = "mysql"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
if not empty_flag:
sh = CommonUtil.build_import_sh_v2(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path,
map_num=10,
key="id"
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# Post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
pass
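# =============================================================================
# Backfill export for the 2022-09 .. 2023-03 window: tmp_st_month_2209_2303 ->
# PostgreSQL cluster table {site}_st_month_{year}_{month}. Note that the column
# list omits the "id" column selected by the import query.
# =============================================================================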
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
year_month = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "site_name must not be empty!"
assert year_month is not None, "year_month must not be empty!"
year, month = year_month.split("-")
# Export to the PG database
db_type = "postgresql_cluster"
export_tb = f"{site_name}_st_month_{year}_{month}"
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="tmp_st_month_2209_2303",
export_tb=export_tb,
col=[
"search_term",
"st_ao_val",
"st_type",
"st_rank",
"st_rank_avg",
"st_search_num",
"st_search_rate",
"st_search_sum",
"st_adv_counts",
"st_quantity_being_sold",
"asin",
"asin_st_zr_orders",
"asin_st_zr_orders_sum",
"asin_st_zr_flow",
"asin_st_sp_orders",
"asin_st_sp_orders_sum",
"asin_st_sp_flow",
"st_asin_zr_page",
"st_asin_zr_page_row",
"st_asin_zr_page_rank",
"st_asin_zr_updated_at",
"st_asin_sp_page",
"st_asin_sp_page_rank",
"st_asin_sp_page_row",
"st_asin_sp_updated_at",
"st_asin_sb1_page",
"st_asin_sb1_updated_at",
"st_asin_sb2_page",
"st_asin_sb2_updated_at",
"st_asin_sb3_page",
"st_asin_sb3_updated_at",
"st_asin_ac_page",
"st_asin_ac_updated_at",
"st_asin_bs_page",
"st_asin_bs_updated_at",
"st_asin_er_page",
"st_asin_er_updated_at",
"st_asin_tr_page",
"st_asin_tr_updated_at",
"created_at",
"updated_at"
],
partition_dict={
"site_name": site_name,
"year_month": year_month
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
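# =============================================================================
# Scratch snippet: checks that the month part of a "YYYY-MM" string parses to
# an int (prints 5 here), presumably to validate the table-suffix logic above.
# =============================================================================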
if __name__ == '__main__':
num = int('2024-05'.split('-')[-1])
print(num)
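# =============================================================================
# Variant of the tmp_st_month_2110_2208 import above; unlike that script it
# passes no map_num/key to build_import_sh_v2, so the helper's defaults apply.
# =============================================================================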
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
year_month = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "site_name must not be empty!"
assert year_month is not None, "year_month must not be empty!"
year, month = year_month.split("-")
hive_tb = "tmp_st_month_2110_2208"
partition_dict = {
"site_name": site_name,
"year_month": year_month
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
week,
asin,
search_term,
ao_val,
orders,
orders_sum,
flow,
order_flow,
search_num,
search_rank,
quantity_being_sold,
adv_compet,
zr_page_rank,
zr_page,
zr_page_row,
sp_page,
sp_page_rank,
sp_page_row,
sb1_page,
sb2_page,
sb3_page,
ac_page,
bs_page,
er_page,
tr_page,
search_term_type,
created_at,
updated_at,
id
from {site_name}_st_month_{year}_{month}
where 1 = 1
and \$CONDITIONS
"""
print(query)
db_type = "mysql"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
if not empty_flag:
sh = CommonUtil.build_import_sh_v2(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path,
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# Post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
pass
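# =============================================================================
# Scratch file: the DolphinScheduler trigger is commented out and the only live
# statement is a no-op; most of the imports are unused leftovers.
# =============================================================================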
import json
import subprocess
from datetime import datetime, time
import sys
from pyspark.sql import SparkSession
from Pyspark_job.utils import common_util
from Pyspark_job.utils import DolphinschedulerHelper
from yswg_utils.common_df import get_asin_unlanuch_df
from Pyspark_job.utils.spark_util import SparkUtil
import script.pg14_to_pg6 as sc
from Pyspark_job.script import post_to_dolphin
if __name__ == '__main__':
# date_info = '2023_34'
# table_names = f"us_search_term_rank_er_{date_info}," \
# f"us_search_term_rank_hr_{date_info},us_search_term_rank_tr_{date_info},us_other_search_term_{date_info}," \
# f"us_brand_analytics_{date_info}"
# post_to_dolphin.DolphinschedulerHelper.start_process_instance('us', '2023-34', table_names, 'aba')
"seatunnel".upper()  # no-op placeholder
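# =============================================================================
# Import: sqoop us_asin_image_pyb_copy from the postgresql_14 source into Hive
# table tmp_asin_image, partition site_name=us14, then verify row counts. Note
# the split key "id" does not appear in the select list.
# =============================================================================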
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = "us"
hive_tb = "tmp_asin_image"
partition_dict = {
"site_name": "us14",
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
asin,
img_url,
img_order_by,
created_at,
updated_at,
data_type
from {site_name}_asin_image_pyb_copy
where 1 = 1
and \$CONDITIONS
"""
print(query)
db_type = "postgresql_14"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path,
map_num=10,
key='id')
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# Post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
# # Export to the PG database
# db_type = "postgresql"
# export_tb = f"{site_name}_asin_image_copy"
#
# # Build the export shell command
# sh = CommonUtil.build_export_sh(
# site_name=site_name,
# db_type=db_type,
# hive_tb="tmp_asin_image_copy",
# export_tb=export_tb,
# col=[
# "asin",
# "img_url",
# "img_order_by",
# "created_at",
# "updated_at",
# "data_type"
# ],
# partition_dict={
# "site_name": site_name
# }
# )
# client = SSHUtil.get_ssh_client()
# SSHUtil.exec_command_async(client, sh, ignore_err=False)
# client.close()
pass
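# =============================================================================
# Export: Hive table tmp_asin_image_lzo (partition site_name=de) -> PostgreSQL
# table de_asin_image_copy.
# =============================================================================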
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = "de"
# Export to the PG database
db_type = "postgresql"
export_tb = "de_asin_image_copy"
# Build the export shell command
sh = CommonUtil.build_export_sh(
site_name="de",
db_type=db_type,
hive_tb="tmp_asin_image_lzo",
export_tb=export_tb,
col=[
"asin",
"img_url",
"img_order_by",
"created_at",
"updated_at",
"data_type"
],
partition_dict={
"site_name": "de"
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
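# =============================================================================
# Export: tmp_asin_b09 -> PostgreSQL table {site}_asin_b09. The original
# MySQL -> Hive import block is kept below, commented out, for reference.
# =============================================================================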
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name must not be empty!"
# hive_tb = f"tmp_asin_b09"
# partition_dict = {
# "site_name": site_name,
# }
# hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
# print(f"hdfs_path is {hdfs_path}")
#
# query = f"""
# select
# asin,
# price,
# rating,
# total_comments,
# page_inventory,
# `rank`,
# img_num,
# ao_val,
# bsr_orders,
# sales,
# data_at,
# created_at,
# updated_at,
# year_week,
# '{site_name}' as site_name
# from {site_name}_asin_b09
# where 1 = 1
# and \$CONDITIONS
# """
# print(query)
# db_type = "mysql"
# empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
# site_name=site_name,
# query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
#
# if not empty_flag:
# sh = CommonUtil.build_import_sh(site_name=site_name,
# db_type=db_type,
# query=query,
# hdfs_path=hdfs_path)
# # Delete the HDFS path before importing
# HdfsUtils.delete_hdfs_file(hdfs_path)
# client = SSHUtil.get_ssh_client()
# SSHUtil.exec_command_async(client, sh, ignore_err=False)
# CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
# client.close()
#
# # Post-import check -- verify data consistency
# CommonUtil.check_import_sync_num(db_type=db_type,
# partition_dict=partition_dict,
# import_query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
# Export to the PG database
db_type = "postgresql"
export_tb = f"{site_name}_asin_b09"
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="tmp_asin_b09",
export_tb=export_tb,
col=[
"asin",
"price",
"rating",
"total_comments",
"page_inventory",
"rank",
"img_num",
"ao_val",
"bsr_orders",
"sales",
"data_at",
"created_at",
"updated_at",
"year_week"
],
partition_dict={
"site_name": site_name,
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
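# =============================================================================
# Export: tmp_asin_detail_trend_month -> PostgreSQL table
# {site}_asin_detail_trend_month; the original import block is commented out.
# =============================================================================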
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name must not be empty!"
# hive_tb = f"tmp_asin_detail_trend_month"
#
# partition_dict = {
# "site_name": site_name,
# }
# hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
# print(f"hdfs_path is {hdfs_path}")
#
# query = f"""
# select
# asin,
# ym,
# rank_rise,
# rank_change,
# ao_rise,
# ao_change,
# price_rise,
# price_change,
# orders_rise,
# orders_change,
# rating_rise,
# rating_change,
# comments_rise,
# comments_change,
# bsr_orders_rise,
# bsr_orders_change,
# sales_rise,
# sales_change,
# variation_num,
# variation_rise,
# variation_change,
# created_at,
# updated_at
# from {site_name}_asin_detail_trend_month
# where 1 = 1
# and \$CONDITIONS
# """
# print(query)
# db_type = "mysql"
# empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
# site_name=site_name,
# query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
#
# if not empty_flag:
# sh = CommonUtil.build_import_sh(site_name=site_name,
# db_type=db_type,
# query=query,
# hdfs_path=hdfs_path)
#
# client = SSHUtil.get_ssh_client()
# SSHUtil.exec_command_async(client, sh, ignore_err=False)
# CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
# client.close()
#
# # Post-import check -- verify data consistency
# CommonUtil.check_import_sync_num(db_type=db_type,
# partition_dict=partition_dict,
# import_query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
# Export to the PG database
db_type = "postgresql"
export_tb = f"{site_name}_asin_detail_trend_month"
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="tmp_asin_detail_trend_month",
export_tb=export_tb,
col=[
"asin",
"ym",
"rank_rise",
"rank_change",
"ao_rise",
"ao_change",
"price_rise",
"price_change",
"orders_rise",
"orders_change",
"rating_rise",
"rating_change",
"comments_rise",
"comments_change",
"bsr_orders_rise",
"bsr_orders_change",
"sales_rise",
"sales_change",
"variation_num",
"variation_rise",
"variation_change",
"created_at",
"updated_at"
],
partition_dict={
"site_name": site_name,
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
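# =============================================================================
# Spark job: deduplicate tmp_asin_state into tmp_asin_state_copy, keeping one
# row per asin -- lowest flag first, then most recent updated_at:
#   w = Window.partitionBy("asin").orderBy(F.asc("flag"), F.desc("updated_at"))
#   df.withColumn("rk", F.row_number().over(w)).filter("rk = 1")
# =============================================================================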
import os
import sys
from pyspark.storagelevel import StorageLevel
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
from pyspark.sql.types import StringType
# window functions for ranking within groups
from pyspark.sql.window import Window
from pyspark.sql import functions as F
class AsinState(Templates):
def __init__(self):
super().__init__()
self.site_name = "us"
self.db_save = "tmp_asin_state_copy"
self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
self.df_save = self.spark.sql(f"select 1+1;")
self.df = self.spark.sql(f"select 1+1;")
self.partitions_by = ['site_name']
self.reset_partitions(partitions_num=1)
def read_data(self):
sql = f"""
select
asin,
state,
updated_at,
flag,
site_name
from
tmp_asin_state
where
site_name = 'us';
"""
self.df = self.spark.sql(sqlQuery=sql).cache()
def handle_data(self):
df_window = Window.partitionBy(["asin"]).orderBy(self.df.flag.asc(), self.df.updated_at.desc())
self.df = self.df.withColumn("rk", F.row_number().over(window=df_window))
self.df_save = self.df.filter("rk = 1")
self.df_save = self.df_save.drop("flag").drop("rk").drop("updated_at")
if __name__ == "__main__":
handle_obj = AsinState()
handle_obj.run()
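# =============================================================================
# Spark job: merge the de1 and de partitions of tmp_asin_image into
# tmp_asin_image_copy, preferring flag=1 (de1) rows per asin. dense_rank (not
# row_number) is used so that every image row of the preferred source survives,
# rather than a single row per asin.
# =============================================================================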
import os
import sys
from pyspark.storagelevel import StorageLevel
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
from pyspark.sql.types import StringType
# window functions for ranking within groups
from pyspark.sql.window import Window
from pyspark.sql import functions as F
class DeAsinImage(Templates):
def __init__(self):
super().__init__()
self.site_name = "de"
self.db_save = "tmp_asin_image_copy"
self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
self.df_save = self.spark.sql(f"select 1+1;")
self.df1 = self.spark.sql(f"select 1+1;")
self.df2 = self.spark.sql(f"select 1+1;")
self.df = self.spark.sql(f"select 1+1;")
self.partitions_by = ['site_name']
self.reset_partitions(partitions_num=1)
def read_data(self):
sql1 = f"""
select
*,
1 as flag
from
tmp_asin_image
where
site_name = 'de1';
"""
sql2 = f"""
select
*,
2 as flag
from
tmp_asin_image
where
site_name = 'de';
"""
self.df1 = self.spark.sql(sqlQuery=sql1).cache()
self.df2 = self.spark.sql(sqlQuery=sql2).cache()
def handle_data(self):
self.df = self.df1.unionAll(self.df2)
df_window = Window.partitionBy(["asin"]).orderBy(self.df.flag.asc())
self.df = self.df.withColumn("rk", F.dense_rank().over(window=df_window))
self.df_save = self.df.filter("rk = 1")
self.df_save = self.df_save.drop("flag").drop("rk")
if __name__ == "__main__":
handle_obj = DeAsinImage()
handle_obj.run()
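# =============================================================================
# Spark scratch job: previews (10 rows per side, .show() only) a merge of the
# us14 and us6 partitions of tmp_asin_image_copy, preferring the higher flag.
# Only read_data() is called, never run(), so nothing is written. site_name is
# set to "uk", which looks inconsistent with the us* partitions being read.
# =============================================================================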
import os
import sys
from pyspark.storagelevel import StorageLevel
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
from pyspark.sql.types import StringType
# window functions for ranking within groups
from pyspark.sql.window import Window
from pyspark.sql import functions as F
class UsAsinImage(Templates):
def __init__(self):
super().__init__()
self.site_name = "uk"
self.db_save = "tmp_asin_image_lzo"
self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
self.df_save = self.spark.sql(f"select 1+1;")
self.df1 = self.spark.sql(f"select 1+1;")
self.df2 = self.spark.sql(f"select 1+1;")
self.df = self.spark.sql(f"select 1+1;")
self.partitions_by = ['site_name']
self.reset_partitions(partitions_num=1)
def read_data(self):
sql1 = f"""
select
*,
1 as flag
from
tmp_asin_image_copy
where
site_name = 'us14'
limit 10;
"""
sql2 = f"""
select
*,
2 as flag
from
tmp_asin_image_copy
where
site_name = 'us6'
limit 10;
"""
self.df1 = self.spark.sql(sqlQuery=sql1).cache()
self.df2 = self.spark.sql(sqlQuery=sql2).cache()
self.df = self.df1.unionAll(self.df2)
df_window = Window.partitionBy(["asin"]).orderBy(self.df.flag.desc())
self.df = self.df.withColumn("rk", F.dense_rank().over(window=df_window))
self.df_save = self.df.filter("rk = 1")
self.df_save = self.df_save.withColumn("site_name", F.lit("us"))
self.df_save = self.df_save.drop("flag").drop("rk")
self.df_save.show()
if __name__ == "__main__":
obj = UsAsinImage()
obj.read_data()
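# =============================================================================
# Import: sqoop us_asin_image from the postgresql source into Hive table
# tmp_asin_image, partition site_name=us6, then verify row counts.
# =============================================================================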
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = "us"
hive_tb = "tmp_asin_image"
partition_dict = {
"site_name": "us6",
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
asin,
img_url,
img_order_by,
created_at,
updated_at,
data_type
from {site_name}_asin_image
where 1 = 1
and \$CONDITIONS
"""
print(query)
db_type = "postgresql"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# Post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
# # Export to the PG database
# db_type = "postgresql"
# export_tb = f"{site_name}_asin_image_copy"
#
# # Build the export shell command
# sh = CommonUtil.build_export_sh(
# site_name=site_name,
# db_type=db_type,
# hive_tb="tmp_asin_image_copy",
# export_tb=export_tb,
# col=[
# "asin",
# "img_url",
# "img_order_by",
# "created_at",
# "updated_at",
# "data_type"
# ],
# partition_dict={
# "site_name": site_name
# }
# )
# client = SSHUtil.get_ssh_client()
# SSHUtil.exec_command_async(client, sh, ignore_err=False)
# client.close()
pass
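# =============================================================================
# Export: tmp_bs_category_asin -> PostgreSQL table us_bs_category_asin; the
# original MySQL -> Hive import block is commented out.
# =============================================================================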
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = "us"
#
# hive_tb = "tmp_bs_category_asin"
#
# hdfs_path = CommonUtil.build_hdfs_path(hive_tb)
# print(f"hdfs_path is {hdfs_path}")
#
# query = """
# select
# asin,
# cate_1_id,
# cate_current_id,
# week,
# `year_month`,
# created_at,
# updated_at
# from us_bs_category_asin
# where 1 = 1
# and \$CONDITIONS
# """
# print(query)
# db_type = "mysql"
# empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
# site_name=site_name,
# query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
#
# if not empty_flag:
# sh = CommonUtil.build_import_sh(site_name=site_name,
# db_type=db_type,
# query=query,
# hdfs_path=hdfs_path)
# # Delete the HDFS path before importing
# HdfsUtils.delete_hdfs_file(hdfs_path)
# client = SSHUtil.get_ssh_client()
# SSHUtil.exec_command_async(client, sh, ignore_err=False)
# CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
# client.close()
#
# # Post-import check -- verify data consistency
# CommonUtil.check_import_sync_num(db_type=db_type,
# import_query=query,
# hive_tb_name=hive_tb,
# msg_usr=['chenyuanjie']
# )
# Export to the PG database
db_type = "postgresql"
export_tb = "us_bs_category_asin"
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="tmp_bs_category_asin",
export_tb=export_tb,
col=[
"asin",
"cate_1_id",
"cate_current_id",
"week",
"year_month",
"created_at",
"updated_at"
],
partition_dict={
"site_name": site_name,
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass
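# =============================================================================
# Export: the dim_st_year_week dimension (partition site_name=us) -> PostgreSQL
# table us_st_year_week (columns search_term, st_key, year_week).
# =============================================================================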
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
if __name__ == '__main__':
# Export to the PG database
db_type = "postgresql"
export_tb = "us_st_year_week"
sh = CommonUtil.build_export_sh(
site_name="us",
db_type=db_type,
hive_tb="dim_st_year_week",
export_tb=export_tb,
col=[
"search_term",
"st_key",
"year_week"
],
partition_dict={
"site_name": "us",
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
pass