1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import os
import re
import sys
import pandas as pd
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import col, lit
from utils.db_util import DbTypes, DBUtil
from pyspark.sql.types import StructType, StructField, StringType
class UpdateStKeepaSyn(object):
    """Append ASINs that are new this month to the yearly keepa syn table.

    The job reads every ``asin`` from ``{site_name}_all_syn_st_month_{YYYY_MM}``
    (through the ``postgresql_14`` engine) and from
    ``{site_name}_st_keepa_syn_{YYYY}`` (through the ``postgresql`` engine).
    It keeps the ASINs that appear only in the monthly table and appends them
    to the keepa table over JDBC with ``state = 7`` and a 4-character
    ``asin_trun_4`` prefix column.
    """

    # JDBC target used by save_data().
    # NOTE(review): credentials are committed in source; they would be better
    # supplied by DBUtil or external configuration. Presumably this is the same
    # database that engine_pg15 reads the keepa table from -- confirm.
    JDBC_URL = "jdbc:postgresql://192.168.10.224:5433/selection"
    JDBC_USER = "yswg_postgres"
    JDBC_PASSWORD = "yswg_postgres"
    # Number of Spark partitions (hash-partitioned on asin) for each input set.
    NUM_PARTITIONS = 80

    def __init__(self, site_name="us", date_type="month", date_info="2022-01"):
        """Validate the run parameters and open the Spark / database handles.

        Args:
            site_name: Site prefix used in every table name (e.g. ``"us"``).
            date_type: Stored and included in the Spark application name;
                not otherwise used by this class.
            date_info: Expected in ``YYYY-MM`` form (default ``"2022-01"``).
                It selects the monthly syn table, and its year part selects
                the keepa table.

        Raises:
            ValueError: if ``site_name`` or ``date_info`` (with ``-`` mapped
                to ``_``) contains anything other than ASCII letters, digits
                or ``_``.
        """
        # Table names are interpolated into SQL because identifiers cannot be
        # bound as parameters. Reject unsafe input before starting Spark or
        # opening any connection.
        self._validate_identifier(site_name)
        self._validate_identifier(date_info.replace('-', '_'))

        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.year = date_info.split('-')[0]
        self.schema = StructType([StructField('asin', StringType(), False)])
        # Database connections.
        self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name, site_name=self.site_name)
        self.engine_pg15 = DBUtil.get_db_engine(db_type=DbTypes.postgresql.name, site_name=self.site_name)
        # DataFrames, populated by read_data() / handle_data().
        self.df_pg14 = None
        self.df_pg15 = None
        self.df_save = None

    @staticmethod
    def _validate_identifier(value):
        """Return *value* if it consists only of ASCII letters, digits and ``_``.

        Raises:
            ValueError: if *value* is empty or contains any other character.
        """
        if not re.fullmatch(r"[A-Za-z0-9_]+", value):
            raise ValueError(f"unsafe SQL identifier fragment: {value!r}")
        return value

    def _read_asin_df(self, sql, engine):
        """Run *sql* via pandas and return a deduplicated, cached Spark DataFrame.

        Rows with a NULL ``asin`` are dropped first. ``self.schema`` declares
        ``asin`` non-nullable, so Spark's schema verification can reject them.
        NOTE(review): the whole result set is loaded into driver memory.
        """
        pdf = pd.read_sql(sql, con=engine)
        pdf = pdf.dropna(subset=['asin'])
        sdf = self.spark.createDataFrame(pdf, schema=self.schema)
        return sdf.drop_duplicates(['asin']).repartition(self.NUM_PARTITIONS, 'asin').cache()

    def run(self):
        """Execute the job: read both ASIN sets, diff them, append the new ones."""
        self.read_data()
        self.handle_data()
        self.save_data()

    def read_data(self):
        """Load the monthly and yearly ASIN sets into cached Spark DataFrames."""
        dt_info = self.date_info.replace('-', '_')
        # Monthly syn table, read through the postgresql_14 engine.
        sql_pg14 = f"select asin from {self.site_name}_all_syn_st_month_{dt_info};"
        self.df_pg14 = self._read_asin_df(sql_pg14, self.engine_pg14)
        print("从pg14的syn表读取最新月份asin:")
        self.df_pg14.show(10)
        # Yearly keepa syn table, read through the postgresql engine.
        sql_pg15 = f"select asin from {self.site_name}_st_keepa_syn_{self.year};"
        self.df_pg15 = self._read_asin_df(sql_pg15, self.engine_pg15)
        print("从pg15的keepa表读取所有asin:")
        self.df_pg15.show(10)

    def handle_data(self):
        """Keep ASINs present in df_pg14 but absent from df_pg15, tagged for insert."""
        # left_anti keeps only df_pg14 rows that have no matching asin in df_pg15.
        # state=7: the value's meaning is defined outside this file -- TODO confirm.
        self.df_save = (
            self.df_pg14.join(self.df_pg15, on='asin', how='left_anti')
            .withColumn('state', lit(7))
            .withColumn('asin_trun_4', col('asin').substr(1, 4))  # substr is 1-based
            .cache()
        )
        # Fully materialize df_save's cache while the inputs are still cached.
        # Only then release the inputs, so the join does not recompute them.
        new_count = self.df_save.count()
        self.df_pg14.unpersist()
        self.df_pg15.unpersist()
        self.df_save.show(10)
        print("本月新增asin数量为:")
        print(new_count)

    def save_data(self):
        """Append df_save (asin, state, asin_trun_4) to the yearly keepa table via JDBC."""
        (
            self.df_save.write.format("jdbc")
            .option("url", self.JDBC_URL)
            .option("dbtable", f"{self.site_name}_st_keepa_syn_{self.year}")
            .option("user", self.JDBC_USER)
            .option("password", self.JDBC_PASSWORD)
            .mode("append")
            .save()
        )
if __name__ == "__main__":
    # Command line: <site_name> <date_type> <date_info>, e.g. `us month 2022-01`.
    # Fail with a usage line rather than an IndexError traceback when
    # arguments are missing.
    if len(sys.argv) < 4:
        sys.exit(f"usage: {sys.argv[0]} <site_name> <date_type> <date_info>")
    site_name, date_type, date_info = sys.argv[1:4]
    handle_obj = UpdateStKeepaSyn(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()