1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.db_util import DBUtil, DbTypes
from utils.spark_util import SparkUtil
from yswg_utils.common_df import get_asin_unlanuch_df
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType
import re
"""
asin 标题 卖点 查询
"""
class ExportAsinDesc(object):
    """Export ASIN title/description rows whose title mentions a site code.

    ``run`` reads rows from PostgreSQL, keeps those whose title matches
    :meth:`udf_has_keyword`, drops ASINs returned by ``get_asin_unlanuch_df``
    and overwrites the Hive table ``export_asin_desc_tmp`` with the result.
    ``export`` is not implemented yet.
    """

    def __init__(self, site_name):
        """Create the Spark session and register the title-filter UDF.

        :param site_name: site identifier; selects the PostgreSQL connection
            and is forwarded to ``get_asin_unlanuch_df``.
        """
        app_name = self.__class__.__name__
        self.site_name = site_name
        self.spark = SparkUtil.get_spark_session(app_name)
        # Registered under a SQL name (usable in Spark SQL expressions); the
        # returned handle is also used directly as a Column function in run().
        self.udf_has_keyword_reg = self.spark.udf.register(
            'udf_has_keyword_reg', self.udf_has_keyword, BooleanType()
        )

    @staticmethod
    def udf_has_keyword(text: str):
        """Return True if *text* contains a standalone site code.

        The codes US, UK, FR, DE, IT, ES, MX and CA are matched
        case-insensitively as whole words (delimited by regex word
        boundaries), so "US plug" and "uk-plug" match while "USB" does not.
        ``None`` yields False.
        """
        if text is None:
            return False
        # A word boundary already covers the "preceded/followed by whitespace"
        # case of the old ``(\s|\b)`` alternation, so this is equivalent.
        # NOTE(review): with IGNORECASE, "it", "us", "de", "es" and "ca" also
        # match ordinary words (e.g. "keep it dry", "contact us"), making this
        # filter far broader than "mentions a site code" -- confirm intent.
        match = re.search(
            r"\b(?:US|UK|FR|DE|IT|ES|MX|CA)\b", text, flags=re.IGNORECASE
        )
        return match is not None

    def run(self):
        """Filter source rows and overwrite Hive table ``export_asin_desc_tmp``.

        Output columns: asin, title, describe, site_name, created_at.
        """
        # NOTE(review): the source table is hard-coded to the ``us_`` snapshot
        # of 2023-08-14 even though the DB connection depends on
        # ``self.site_name`` -- confirm this is intended for non-US sites.
        sql = """
select asin,
title,
describe,
created_at,
site as site_name
from us_self_asin_detail_2023_08_14
where title is not null
"""
        print(sql)
        conn_info = DBUtil.get_connection_info(db_type=DbTypes.postgresql.name, site_name=self.site_name)
        # The JDBC result feeds exactly one action (saveAsTable below), so it
        # is not cached; caching would only pin memory for the session.
        source_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql,
        )
        # Use the UDF's boolean result directly rather than comparing it with
        # the integer literal 1 (which depends on implicit type coercion).
        save_df = source_df.where(self.udf_has_keyword_reg(F.col("title")))
        # Exclude delisted (unlaunched) ASINs.
        unlanuch_df = get_asin_unlanuch_df(self.site_name, spark_session=self.spark)
        save_df = save_df.join(unlanuch_df, on=['asin'], how='left_anti')
        save_df = save_df.select(
            F.col("asin"),
            F.col("title"),
            F.col("describe"),
            F.col("site_name"),
            F.col("created_at"),
        )
        save_df.write.saveAsTable(name="export_asin_desc_tmp", format='hive', mode='overwrite')

    def export(self):
        """Export the result. Not implemented yet; does nothing.

        :return: None
        """
        # TODO: implement; an earlier draft pointed at
        # ``CommonUtil.df_export_csv(self.spark, ...)``.
if __name__ == '__main__':
    # CLI: <script> <site_name> [method]; method defaults to "run".
    site_name = CommonUtil.get_sys_arg(1, None)
    method = CommonUtil.get_sys_arg(2, "run")
    # Explicit checks instead of ``assert`` so validation survives ``python -O``.
    if site_name is None:
        raise ValueError("站点不能为空!")
    if method is None:
        raise ValueError("method不能为空!")
    # Reject unknown methods before constructing the exporter, which starts a
    # Spark session; previously a typo silently did nothing.
    if method not in ('run', 'export'):
        raise ValueError(f"unknown method: {method!r}, expected 'run' or 'export'")
    obj = ExportAsinDesc(site_name)
    if method == 'run':
        obj.run()
    else:
        obj.export()