import random
import sys
# sys.path.append("/home/yswg/amazon_selection/")
import time
import traceback
import pandas as pd
import re
import nltk
# from utils.utils import Utils
from collections import Counter
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk.sentiment import SentimentIntensityAnalyzer  # sentiment scoring model
from app.context import create_engine_db

pd.set_option('expand_frame_repr', False)  # show all columns
pd.set_option('display.max_colwidth', 100)  # max number of characters printed per column


class CommentAnalytics():
    def __init__(self, site_name='uk', asin='', phrases_counts=20):
        # super(CommentAnalytics, self).__init__()
        self.site_name = site_name
        self.asin = asin
        self.phrases_counts = phrases_counts
        # self.engine_pg = self.class_db.db_connection(db_type="pg", db_node="h15", site_name=self.site_name)
        self.engine_pg = create_engine_db(site_name=self.site_name)
        self.table_name_read = self.get_table_name_read_by_asin()
        self.table_name_save = f"{self.site_name}_asin_comment_analytics"
        self.df_asin = pd.DataFrame()
        self.df_save = pd.DataFrame()

    def init_refresh(self, site_name, asin, phrases_counts):
        self.site_name = site_name
        self.asin = asin
        self.phrases_counts = phrases_counts
        # self.engine_pg = self.class_db.db_connection(db_type="pg", db_node="h15", site_name=self.site_name)
        self.engine_pg = create_engine_db(site_name=self.site_name)
        self.table_name_read = self.get_table_name_read_by_asin()
        self.table_name_save = f"{self.site_name}_asin_comment_analytics"
        self.df_asin = pd.DataFrame()
        self.df_save = pd.DataFrame()

    # @staticmethod
    def _hd_clean_and_extract(self, text):
        # Remove punctuation and emoji
        text = re.sub(r'[^\w\s]|_+', '', text)
        # Lowercase the text
        text = text.lower()
        # Load stop words
        stop_words = set(stopwords.words('english'))
        # Tokenize the text and filter out stop words
        words = nltk.word_tokenize(text)
        words = [word for word in words if word not in stop_words]
        # Map the POS tags to the tag set used by the nltk lemmatizer
        pos_tags = []
        for word, tag in nltk.pos_tag(words):
            if tag.startswith('NN'):
                pos_tags.append((word, 'n'))
            elif tag.startswith('VB'):
                pos_tags.append((word, 'v'))
            elif tag.startswith('JJ'):
                pos_tags.append((word, 'a'))
            elif tag.startswith('R'):
                pos_tags.append((word, 'r'))
            else:
                pos_tags.append((word, None))
        # Use WordNetLemmatizer to merge singular/plural forms
        lemmatizer = WordNetLemmatizer()
        pos_tags = [(lemmatizer.lemmatize(word, pos) if pos else word, pos) for word, pos in pos_tags]
        # Use nltk's ngrams function to extract phrases: 2-grams, 3-grams and 4-grams
        phrases_list = []
        for i in range(2, 5):
            ngrams = nltk.ngrams(pos_tags, i)
            for ngram in ngrams:
                # Filter out invalid phrases; keep only noun phrases and adjective phrases
                if ngram[-1][1] == 'n' or ngram[-1][1] == 'a':
                    phrase = ' '.join([word[0] for word in ngram])
                    phrases_list.append(phrase)
        phrases_counter = Counter(phrases_list)
        # Sort by count in descending order
        sorted_items = sorted(phrases_counter.items(), key=lambda x: x[1], reverse=True)
        keys_list, values_list = [], []
        for k, v in sorted_items:
            keys_list.append(k)
            values_list.append(str(v))
        if len(keys_list) >= self.phrases_counts:
            keys_list, values_list = keys_list[:self.phrases_counts], values_list[:self.phrases_counts]
        phrases_str = ",".join(keys_list)
        phrases_counts_str = ",".join(values_list)
        # Total frequency of the kept phrases
        phrases_counts_total = sum(map(int, values_list))
        # Share of each phrase relative to the total frequency (guard against empty phrase lists)
        percentages = [(int(x) / phrases_counts_total) for x in values_list] if phrases_counts_total else []
        # Round each share to 4 decimal places
        phrases_counts_rate_list = [str(round(x, 4)) for x in percentages]
        phrases_counts_rate_str = ",".join(phrases_counts_rate_list)
        # print("Total phrase frequency:", phrases_counts_total)
        # print("Share of each phrase in the total frequency:", phrases_counts_rate_str)
        return phrases_list, phrases_str, phrases_counts_str, phrases_counts_rate_str
    def get_table_name_read_by_asin(self):
        if self.asin.lower()[:3] in ['b01', 'b02', 'b03', 'b04', 'b05', 'b06']:
            table_name_read = f'{self.site_name}_asin_comment_b01_b06'
        elif self.asin.lower()[:3] in ['b00', 'b07', 'b08', 'b09']:
            table_name_read = f'{self.site_name}_asin_comment_{self.asin.lower()[:3]}'
        else:
            table_name_read = f'{self.site_name}_asin_comment_other'
        print("asin, table_name_read:", self.asin, table_name_read)
        return table_name_read

    def read_data(self):
        sql = f"select asin, content, rating from {self.table_name_read} where asin='{self.asin}';"
        print("sql:", sql)
        self.df_asin = pd.read_sql(sql, con=self.engine_pg)
        print(self.df_asin.shape)
        print(self.df_asin.head())

    def handle_data(self):
        # Merge the reviews per asin for each rating bucket
        df_asin_good = self.df_asin.loc[self.df_asin.rating.isin([4, 5])].groupby(['asin']).agg(
            {"content": " ".join}).reset_index()
        df_asin_bad = self.df_asin.loc[self.df_asin.rating.isin([1, 2])].groupby(['asin']).agg(
            {"content": " ".join}).reset_index()
        df_asin_neu = self.df_asin.loc[self.df_asin.rating.isin([3])].groupby(['asin']).agg(
            {"content": " ".join}).reset_index()
        df_asin_mix = self.df_asin.groupby(['asin']).agg({"content": " ".join}).reset_index()
        df_asin_good['rating_level'] = '4~5'
        df_asin_bad['rating_level'] = '1~2'
        df_asin_neu['rating_level'] = '3'
        df_asin_mix['rating_level'] = '1~5'
        self.df_save = pd.concat([df_asin_mix, df_asin_good, df_asin_bad, df_asin_neu])
        # self.df_save = self.df_asin.groupby(['asin']).agg({"content": " ".join}).reset_index()
        # self.df_save['rating'] = '1~5'
        # Clean the English review text, extract key phrases, count phrase frequencies, and compute frequency shares
        self.df_save[['phrases_list', 'phrases_str', 'phrases_counts_str', 'phrases_counts_rate_str']] = \
            self.df_save.content.apply(self._hd_clean_and_extract).apply(pd.Series)
        print(self.df_save.head())
        print(self.df_save.shape, self.df_save.columns)
        self._hd_emotional_analysis()

    def _hd_emotional_analysis(self):
        # Create a SentimentIntensityAnalyzer object
        sia = SentimentIntensityAnalyzer()
        self.df_save['sentiment_score'] = self.df_save.content.apply(lambda x: sia.polarity_scores(x))
        self.df_save['sentiment_score'] = self.df_save['sentiment_score'].astype("U")

    def save_data(self):
        print("Saving the comment-analytics results: delete any existing rows first, then insert")
        with self.engine_pg.begin() as conn:
            sql = f"delete from {self.table_name_save} where asin='{self.asin}' and phrases_counts={self.phrases_counts}"
            conn.execute(sql)
        self.df_save = self.df_save.loc[:, ['asin', 'rating_level', 'phrases_str', 'phrases_counts_str',
                                            'phrases_counts_rate_str', 'sentiment_score']]
        self.df_save['phrases_counts'] = self.phrases_counts
        print(self.df_save.head())
        self.df_save.to_sql(self.table_name_save, con=self.engine_pg, if_exists="append", index=False)

    def run(self, site_name, asin, phrases_counts):
        while True:
            try:
                self.init_refresh(site_name=site_name, asin=asin, phrases_counts=phrases_counts)
                # Check whether this asin+phrases_counts was already processed in the last 24h; if so, stop here
                sql = f"select asin, phrases_counts from {self.table_name_save} where asin='{self.asin}' and phrases_counts={self.phrases_counts} and created_time >= now() - interval '24 hour';"
                print("sql:", sql)
                df_judge = pd.read_sql(sql, con=self.engine_pg)
                if df_judge.shape[0] > 0:
                    # A record exists within the last 24h, so exit without recomputing
                    print("A record already exists within the last 24h, exiting")
                else:
                    self.read_data()
                    self.handle_data()
                    self.save_data()
                break
            except Exception as e:
                print("error:", traceback.format_exc())
                wait_seconds = random.randint(10, 30)
                print("Waiting a random 10-30s:", wait_seconds)
                time.sleep(wait_seconds)
                # self.engine_pg = self.class_db.db_connection(db_type="pg", db_node="h15", site_name=self.site_name)
                self.engine_pg = create_engine_db(site_name=self.site_name)
                continue


def get_comment_api(data):
    import requests
    import json
    # url = 'http://192.168.2.77:5000/analyze_comments'
    # url = 'http://127.0.0.1:5000/analyze_comments'
    url = 'http://113.100.143.162:5000/analyze_comments'
    # url = 'http://192.168.10.225:5000/analyze_comments'
    # url = 'http://192.168.2.41:5000/analyze_comments'
    # url = 'http://192.168.10.221:5000/analyze_comments'
    headers = {'Content-type': 'application/json'}
    # data = {'arg1': 'value1', 'arg2': 'value2'}  # example payload
    response = requests.post(url, headers=headers, data=json.dumps(data))
    # Expected response body: {"status_code": 200, "message": "success"}
    print(response.json())


if __name__ == '__main__':
    # handle_obj = CommentAnalytics(site_name='us', asin='B00CBAWIIY', phrases_counts=20)  # keep at most 20 key phrases by default
    # handle_obj.run()
    # site_name = sys.argv[1]  # argument 1: site
    # asin = sys.argv[2]  # argument 2: ASIN
    # phrases_counts = sys.argv[3]  # argument 3: number of key phrases to extract
    # data = {"site_name": site_name, "asin": asin, "phrases_counts": phrases_counts}
    data = {"site_name": 'us', "asin": 'B00061MWH2', "phrases_counts": '40'}
    # data = {"site_name": 'us', "asin": 'B0C8BJL1D3', "phrases_counts": '40'}
    get_comment_api(data=data)
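
# The server behind /analyze_comments is not part of this module; the sketch below is an
# assumption of what a minimal Flask counterpart could look like, simply forwarding the
# posted JSON payload to CommentAnalytics.run(). It is kept as a comment so importing this
# module does not start or register anything.
#
#     from flask import Flask, request, jsonify
#
#     app = Flask(__name__)
#     handle_obj = CommentAnalytics()
#
#     @app.route('/analyze_comments', methods=['POST'])
#     def analyze_comments():
#         payload = request.get_json()
#         handle_obj.run(site_name=payload['site_name'], asin=payload['asin'],
#                        phrases_counts=int(payload['phrases_counts']))
#         return jsonify({"status_code": 200, "message": "success"})
#
#     if __name__ == '__main__':
#         app.run(host='0.0.0.0', port=5000)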