import random
import sys
# sys.path.append("/home/yswg/amazon_selection/")
import time
import traceback

import pandas as pd
import re
import nltk
# from utils.utils import Utils
from collections import Counter
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk.sentiment import SentimentIntensityAnalyzer  # sentiment scoring model
from app.context import create_engine_db

pd.set_option('expand_frame_repr', False)  # print all DataFrame columns
pd.set_option('display.max_colwidth', 100)  # limit how many characters are printed per column
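
# Note: the text processing below relies on several NLTK data packages. If they are
# missing in your environment, a one-time download such as the following is needed
# (the names are the standard NLTK package identifiers):
#   import nltk
#   nltk.download('punkt')                        # tokenizer used by nltk.word_tokenize
#   nltk.download('averaged_perceptron_tagger')   # POS tagger used by nltk.pos_tag
#   nltk.download('stopwords')                    # English stopword list
#   nltk.download('wordnet')                      # data for WordNetLemmatizer
#   nltk.download('vader_lexicon')                # lexicon for SentimentIntensityAnalyzer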


class CommentAnalytics():

    def __init__(self, site_name='uk', asin='', phrases_counts=20):
        # super(CommentAnalytics, self).__init__()
        self.site_name = site_name
        self.asin = asin
        self.phrases_counts = phrases_counts
        # self.engine_pg = self.class_db.db_connection(db_type="pg", db_node="h15", site_name=self.site_name)
        self.engine_pg = create_engine_db(site_name=self.site_name)
        self.table_name_read = self.get_table_name_read_by_asin()
        self.table_name_save = f"{self.site_name}_asin_comment_analytics"
        self.df_asin = pd.DataFrame()
        self.df_save = pd.DataFrame()

    def init_refresh(self, site_name, asin, phrases_counts):
        # Reset per-request state so one long-lived instance can serve multiple requests
        self.site_name = site_name
        self.asin = asin
        self.phrases_counts = int(phrases_counts)  # may arrive as a string in the API payload
        # self.engine_pg = self.class_db.db_connection(db_type="pg", db_node="h15", site_name=self.site_name)
        self.engine_pg = create_engine_db(site_name=self.site_name)
        self.table_name_read = self.get_table_name_read_by_asin()
        self.table_name_save = f"{self.site_name}_asin_comment_analytics"
        self.df_asin = pd.DataFrame()
        self.df_save = pd.DataFrame()

    # @staticmethod
    def _hd_clean_and_extract(self, text):
        # Strip punctuation, emoji-like symbols and underscores
        text = re.sub(r'[^\w\s]|_+', '', text)
        # Lowercase the text
        text = text.lower()
        # Load English stopwords
        stop_words = set(stopwords.words('english'))
        # Tokenize and drop stopwords
        words = nltk.word_tokenize(text)
        words = [word for word in words if word not in stop_words]
        # Map Penn Treebank tags to the WordNet POS tags expected by the lemmatizer
        pos_tags = []
        for word, tag in nltk.pos_tag(words):
            if tag.startswith('NN'):
                pos_tags.append((word, 'n'))
            elif tag.startswith('VB'):
                pos_tags.append((word, 'v'))
            elif tag.startswith('JJ'):
                pos_tags.append((word, 'a'))
            elif tag.startswith('R'):
                pos_tags.append((word, 'r'))
            else:
                pos_tags.append((word, None))
        # Lemmatize with WordNetLemmatizer to merge singular/plural and inflected forms
        lemmatizer = WordNetLemmatizer()
        pos_tags = [(lemmatizer.lemmatize(word, pos) if pos else word, pos) for word, pos in pos_tags]
        # Extract candidate phrases with nltk.ngrams, from 2-grams up to 4-grams
        phrases_list = []
        for i in range(2, 5):
            ngrams = nltk.ngrams(pos_tags, i)
            for ngram in ngrams:
                # Keep only phrases that end in a noun or an adjective
                if ngram[-1][1] == 'n' or ngram[-1][1] == 'a':
                    phrase = ' '.join([word[0] for word in ngram])
                    phrases_list.append(phrase)
        phrases_counter = Counter(phrases_list)
        # Sort by frequency, descending
        sorted_items = sorted(phrases_counter.items(), key=lambda x: x[1], reverse=True)
        keys_list, values_list = [], []
        for k, v in sorted_items:
            keys_list.append(k)
            values_list.append(str(v))
        if len(keys_list) >= self.phrases_counts:
            keys_list, values_list = keys_list[:self.phrases_counts], values_list[:self.phrases_counts]
        phrases_str = ",".join(keys_list)
        phrases_counts_str = ",".join(values_list)
        # Total frequency of the kept phrases
        phrases_counts_total = sum(map(int, values_list))
        # Share of each phrase in the total frequency
        percentages = [(int(x) / phrases_counts_total) for x in values_list]
        # Round each share to 4 decimal places
        phrases_counts_rate_list = [str(round(x, 4)) for x in percentages]
        phrases_counts_rate_str = ",".join(phrases_counts_rate_list)
        # print("total phrase frequency:", phrases_counts_total)
        # print("share of each phrase in the total frequency:", phrases_counts_rate_str)
        return phrases_list, phrases_str, phrases_counts_str, phrases_counts_rate_str

    def get_table_name_read_by_asin(self):
        # Comment data is sharded into tables by ASIN prefix; pick the matching shard
        if self.asin.lower()[:3] in ['b01', 'b02', 'b03', 'b04', 'b05', 'b06']:
            table_name_read = f'{self.site_name}_asin_comment_b01_b06'
        elif self.asin.lower()[:3] in ['b00', 'b07', 'b08', 'b09']:
            table_name_read = f'{self.site_name}_asin_comment_{self.asin.lower()[:3]}'
        else:
            table_name_read = f'{self.site_name}_asin_comment_other'
        print("asin, table_name_read:", self.asin, table_name_read)
        return table_name_read

    def read_data(self):
        sql = f"select asin, content, rating from {self.table_name_read} where asin='{self.asin}';"
        print("sql:", sql)
        self.df_asin = pd.read_sql(sql, con=self.engine_pg)
        print(self.df_asin.shape)
        print(self.df_asin.head())

    def handle_data(self):
        # Concatenate review text per ASIN, split by rating level
        df_asin_good = self.df_asin.loc[self.df_asin.rating.isin([4, 5])].groupby(['asin']).agg(
            {"content": " ".join}).reset_index()
        df_asin_bad = self.df_asin.loc[self.df_asin.rating.isin([1, 2])].groupby(['asin']).agg(
            {"content": " ".join}).reset_index()
        df_asin_neu = self.df_asin.loc[self.df_asin.rating.isin([3])].groupby(['asin']).agg(
            {"content": " ".join}).reset_index()
        df_asin_mix = self.df_asin.groupby(['asin']).agg({"content": " ".join}).reset_index()
        df_asin_good['rating_level'] = '4~5'
        df_asin_bad['rating_level'] = '1~2'
        df_asin_neu['rating_level'] = '3'
        df_asin_mix['rating_level'] = '1~5'
        self.df_save = pd.concat([df_asin_mix, df_asin_good, df_asin_bad, df_asin_neu])
        # self.df_save = self.df_asin.groupby(['asin']).agg({"content": " ".join}).reset_index()
        # self.df_save['rating'] = '1~5'
        # Clean the English review text, extract key phrases, count frequencies and frequency shares
        self.df_save[['phrases_list', 'phrases_str', 'phrases_counts_str', 'phrases_counts_rate_str']] = self.df_save.content.apply(self._hd_clean_and_extract).apply(pd.Series)
        print(self.df_save.head())
        print(self.df_save.shape, self.df_save.columns)
        self._hd_emotional_analysis()

    def _hd_emotional_analysis(self):
        # Create a SentimentIntensityAnalyzer and score the concatenated review text
        sia = SentimentIntensityAnalyzer()
        self.df_save['sentiment_score'] = self.df_save.content.apply(lambda x: sia.polarity_scores(x))
        self.df_save['sentiment_score'] = self.df_save['sentiment_score'].astype("U")

    def save_data(self):
        print("Storing the comment-analysis results: delete any existing rows first, then insert")
        with self.engine_pg.begin() as conn:
            sql = f"delete from {self.table_name_save} where asin='{self.asin}' and phrases_counts={self.phrases_counts}"
            conn.execute(sql)
        self.df_save = self.df_save.loc[:, ['asin', 'rating_level', 'phrases_str', 'phrases_counts_str', 'phrases_counts_rate_str', 'sentiment_score']]
        self.df_save['phrases_counts'] = self.phrases_counts
        print(self.df_save.head())
        self.df_save.to_sql(self.table_name_save, con=self.engine_pg, if_exists="append", index=False)

    def run(self, site_name, asin, phrases_counts):
        while True:
            try:
                self.init_refresh(site_name=site_name, asin=asin, phrases_counts=phrases_counts)
                # First check whether this asin + phrases_counts was already processed in the last 24h; if so, finish immediately
                sql = f"select asin, phrases_counts from {self.table_name_save} where asin='{self.asin}' and phrases_counts={self.phrases_counts} and created_time >= now() - interval '24 hour';"
                print("sql:", sql)
                df_judge = pd.read_sql(sql, con=self.engine_pg)
                if df_judge.shape[0] > 0:
                    # A record from the last 24h already exists, so exit without recomputing
                    print("Found a record from the last 24h in the database; exiting")
                else:
                    self.read_data()
                    self.handle_data()
                    self.save_data()
                break
            except Exception:
                print("error:", traceback.format_exc())
                wait_seconds = random.randint(10, 30)
                print(f"Waiting a random 10-30s before retrying: {wait_seconds}s")
                time.sleep(wait_seconds)
                # self.engine_pg = self.class_db.db_connection(db_type="pg", db_node="h15", site_name=self.site_name)
                self.engine_pg = create_engine_db(site_name=self.site_name)
                continue


def get_comment_api(data):
    import requests
    import json
    # url = 'http://192.168.2.77:5000/analyze_comments'
    # url = 'http://127.0.0.1:5000/analyze_comments'
    url = 'http://113.100.143.162:5000/analyze_comments'
    # url = 'http://192.168.10.225:5000/analyze_comments'
    # url = 'http://192.168.2.41:5000/analyze_comments'
    # url = 'http://192.168.10.221:5000/analyze_comments'
    headers = {'Content-type': 'application/json'}
    # data = {'arg1': 'value1', 'arg2': 'value2'}  # example payload shape
    response = requests.post(url, headers=headers, data=json.dumps(data))
    # expected response body: {"status_code": 200, "message": "success"}
    print(response.json())


if __name__ == '__main__':
    # handle_obj = CommentAnalytics(site_name='us', asin='B00CBAWIIY', phrases_counts=20)  # keep at most 20 key phrases by default
    # handle_obj.run()
    # site_name = sys.argv[1]  # arg 1: site
    # asin = sys.argv[2]  # arg 2: ASIN code
    # phrases_counts = sys.argv[3]  # arg 3: number of key phrases to extract
    # data = {"site_name": site_name, "asin": asin, "phrases_counts": phrases_counts}
    data = {"site_name": 'us', "asin": 'B00061MWH2', "phrases_counts": '40'}
    # data = {"site_name": 'us', "asin": 'B0C8BJL1D3', "phrases_counts": '40'}
    get_comment_api(data=data)
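

# The '/analyze_comments' service that get_comment_api() posts to is not defined in this
# file. The commented sketch below is a hypothetical, minimal Flask endpoint illustrating
# how the posted JSON (site_name, asin, phrases_counts) could be mapped onto
# CommentAnalytics.run(); the route path matches the URLs above, but the real server
# implementation may differ.
#
#   from flask import Flask, request, jsonify
#
#   app = Flask(__name__)
#   handle_obj = CommentAnalytics()  # long-lived worker, refreshed per request via init_refresh()
#
#   @app.route('/analyze_comments', methods=['POST'])
#   def analyze_comments():
#       data = request.get_json()
#       handle_obj.run(site_name=data['site_name'],
#                      asin=data['asin'],
#                      phrases_counts=int(data['phrases_counts']))
#       return jsonify({"status_code": 200, "message": "success"})
#
#   app.run(host='0.0.0.0', port=5000)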