import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.db_util import DBUtil, DbTypes
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window
from pyspark.sql import SparkSession


def get_update_df(spark: SparkSession, module: str, site_name: str, fileds: list):
    """Read the latest manual edit of each field from sys_edit_log and merge the
    per-field results into one DataFrame keyed by edit_key_id."""
    info = DBUtil.get_connection_info(DbTypes.postgresql.name, 'us')
    df_all = []
    for filed in fileds:
        # Keep only the most recent non-admin edit per (module, site, field, key).
        sql = f"""
        select edit_key_id,
               val_after as {filed}
        from (
            select filed,
                   edit_key_id,
                   val_after,
                   row_number() over (partition by module, site_name, filed, edit_key_id order by id desc) as last_row
            from sys_edit_log
            where val_after is not null
              and edit_key_id is not null
              and edit_key_id != ''
              and user_id != 'admin'
              and site_name = '{site_name}'
              and module in ('{module}')
              and filed in ('{filed}')
        ) tmp
        where last_row = 1
        """
        print(sql)
        df = SparkUtil.read_jdbc_query(
            session=spark,
            url=info['url'],
            pwd=info['pwd'],
            username=info['username'],
            query=sql
        )
        df_all.append(df)
    # Full outer join the per-field DataFrames on edit_key_id, so a key that was
    # edited for only some of the fields keeps null in the others.
    df_rel = df_all[0]
    for df in df_all[1:]:
        df_rel = df_rel.join(df, how="fullouter", on=['edit_key_id'])
    return df_rel
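
# A minimal usage sketch (illustrative only; the module and field names below are
# the ones wired up in update()):
#
#   spark = SparkUtil.get_spark_session("usr_mask_update")
#   df = get_update_df(spark, "ABA搜索词(新)", "us",
#                      fileds=['usr_mask_type', 'usr_mask_progress'])
#   df.show()  # columns: edit_key_id, usr_mask_type, usr_mask_progress
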
def update():
    spark = SparkUtil.get_spark_session("usr_mask_update")
    # Module name -> edited fields, join key, hive table holding the monthly
    # partitions, and the Postgres relation prefix to update.
    config_row = {
        # "ABA搜索词(新)" = ABA search terms (new)
        "ABA搜索词(新)": {
            "fileds": ['usr_mask_type', 'usr_mask_progress'],
            "key": "search_term",
            "hive_table": "dwt_aba_st_analytics",
            "rel_table": "us_aba_last_month",
        },
        # "店铺Feedback": {
        #     "fileds": ['usr_mask_type', 'usr_mask_progress'],
        #     "key": "search_term",
        #     "hive_table": "dwt_aba_st_analytics"
        # },
        # "AbaWordYear": {
        #     "fileds": ['usr_mask_type', 'usr_mask_progress'],
        #     "key": "search_term",
        #     "hive_table": "dwt_aba_last365"
        # }
    }
    for module in config_row.keys():
        fileds = config_row[module]['fileds']
        key = config_row[module]['key']
        hive_table = config_row[module]['hive_table']
        rel_table = config_row[module]['rel_table']
        site_name = 'us'
        target_update_tb = "usr_mask_update_tmp"
        # Stage the latest edits in a temporary Postgres table.
        df_all = get_update_df(spark, module, site_name, fileds=fileds)
        info = DBUtil.get_connection_info(DbTypes.postgresql_cluster.name, 'us')
        df_all.write.jdbc(info['url'], target_update_tb, mode='overwrite',
                          properties={'user': info['username'], 'password': info['pwd']})
        # List every monthly partition of the hive table since 2024-01.
        month_rows = CommonUtil.select_partitions_df(spark, hive_table) \
            .where(f"site_name = '{site_name}' and date_type = 'month' and date_info >= '2024-01'") \
            .select(F.col("date_info")) \
            .sort(F.col("date_info").desc()) \
            .toPandas().to_dict(orient='records')
        suffixs = [it['date_info'] for it in month_rows]
        DBUtil.get_db_engine(DbTypes.postgresql_cluster.name, site_name)
        for suffix in suffixs:
            suffix = suffix.replace("-", "_")
            update_sql = f"""
            update {rel_table}_{suffix} tb1
            set {",".join([f"{it} = tmp.{it}" for it in fileds])}
            from {target_update_tb} tmp
            where tb1.{key} = tmp.edit_key_id;
            """
            # Update the partitioned table via a join against the temp table.
            DBUtil.exec_sql(DbTypes.postgresql_cluster.name, site_name, update_sql, True)
        print("success")
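
# For reference, with the active config the SQL rendered for, e.g., the 2024_01
# partition looks roughly like this (the actual suffixes depend on which monthly
# partitions exist in hive):
#
#   update us_aba_last_month_2024_01 tb1
#   set usr_mask_type = tmp.usr_mask_type,usr_mask_progress = tmp.usr_mask_progress
#   from usr_mask_update_tmp tmp
#   where tb1.search_term = tmp.edit_key_id;
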
if __name__ == '__main__':
    update()
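
# Typical invocation is via spark-submit (the file name below is a placeholder;
# cluster settings are assumed to be handled inside SparkUtil.get_spark_session):
#
#   spark-submit usr_mask_update.py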