# comment_analytics.py
import random
import sys
# sys.path.append("/home/yswg/amazon_selection/")
import time
import traceback
import pandas as pd
import re
import nltk
# from utils.utils import Utils
from collections import Counter
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk.sentiment import SentimentIntensityAnalyzer  # sentiment-scoring model (VADER)
from sqlalchemy import text
from app.context import create_engine_db
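
# NLTK resources this module depends on (download once beforehand, e.g.):
#   nltk.download('punkt'); nltk.download('stopwords'); nltk.download('wordnet')
#   nltk.download('averaged_perceptron_tagger'); nltk.download('vader_lexicon')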

pd.set_option('expand_frame_repr', False)  # show all columns
pd.set_option('display.max_colwidth', 100)  # max characters shown per column


class CommentAnalytics:

    def __init__(self, site_name='uk', asin='', phrases_counts=20):
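        """Initialize for one analysis task.

        site_name: Amazon site code (e.g. 'us', 'uk'); asin: target product
        ASIN; phrases_counts: max number of key phrases to keep (default 20).
        """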
        # super(CommentAnalytics, self).__init__()
        self.site_name = site_name
        self.asin = asin
        self.phrases_counts = int(phrases_counts)  # may arrive as a string from the API payload
        # self.engine_pg = self.class_db.db_connection(db_type="pg", db_node="h15", site_name=self.site_name)
        self.engine_pg = create_engine_db(site_name=self.site_name)
        self.table_name_read = self.get_table_name_read_by_asin()
        self.table_name_save = f"{self.site_name}_asin_comment_analytics"
        self.df_asin = pd.DataFrame()
        self.df_save = pd.DataFrame()

    def init_refresh(self, site_name, asin, phrases_counts):
        self.site_name = site_name
        self.asin = asin
        self.phrases_counts = int(phrases_counts)  # may arrive as a string from the API payload
        # self.engine_pg = self.class_db.db_connection(db_type="pg", db_node="h15", site_name=self.site_name)
        self.engine_pg = create_engine_db(site_name=self.site_name)
        self.table_name_read = self.get_table_name_read_by_asin()
        self.table_name_save = f"{self.site_name}_asin_comment_analytics"
        self.df_asin = pd.DataFrame()
        self.df_save = pd.DataFrame()

    def _hd_clean_and_extract(self, text):
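        """Clean one merged review text and extract its top key phrases.

        Returns a 4-tuple: the full candidate phrase list, plus comma-joined
        strings of the top `phrases_counts` phrases, their counts, and each
        count's share of the kept total (rounded to 4 decimals).
        """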
        # strip punctuation, emoji and underscores
        text = re.sub(r'[^\w\s]|_+', '', text)
        # lowercase the text
        text = text.lower()
        # load English stopwords
        stop_words = set(stopwords.words('english'))
        # tokenize and drop stopwords
        words = nltk.word_tokenize(text)
        words = [word for word in words if word not in stop_words]
        # map Penn Treebank POS tags to the WordNet codes the lemmatizer expects
        pos_tags = []
        for word, tag in nltk.pos_tag(words):
            if tag.startswith('NN'):
                pos_tags.append((word, 'n'))
            elif tag.startswith('VB'):
                pos_tags.append((word, 'v'))
            elif tag.startswith('JJ'):
                pos_tags.append((word, 'a'))
            elif tag.startswith('R'):
                pos_tags.append((word, 'r'))
            else:
                pos_tags.append((word, None))
        # merge singular/plural and other inflected forms with WordNetLemmatizer
        lemmatizer = WordNetLemmatizer()
        pos_tags = [(lemmatizer.lemmatize(word, pos) if pos else word, pos) for word, pos in pos_tags]
        # extract candidate phrases with nltk.ngrams: 2-, 3- and 4-grams
        phrases_list = []
        for i in range(2, 5):
            ngrams = nltk.ngrams(pos_tags, i)
            for ngram in ngrams:
                # keep only phrases that end in a noun or adjective
                if ngram[-1][1] == 'n' or ngram[-1][1] == 'a':
                    phrase = ' '.join([word[0] for word in ngram])
                    phrases_list.append(phrase)
        phrases_counter = Counter(phrases_list)
        # sort by count, descending
        sorted_items = sorted(phrases_counter.items(), key=lambda x: x[1], reverse=True)
        keys_list, values_list = [], []
        for k, v in sorted_items:
            keys_list.append(k)
            values_list.append(str(v))
        # keep at most phrases_counts phrases (slicing is a no-op on shorter lists)
        keys_list, values_list = keys_list[:self.phrases_counts], values_list[:self.phrases_counts]
        phrases_str = ",".join(keys_list)
        phrases_counts_str = ",".join(values_list)
        # total count of the kept phrases
        phrases_counts_total = sum(map(int, values_list))
        # share of each phrase count relative to the total
        percentages = [(int(x) / phrases_counts_total) for x in values_list]
        # round each share to 4 decimal places
        phrases_counts_rate_list = [str(round(x, 4)) for x in percentages]
        phrases_counts_rate_str = ",".join(phrases_counts_rate_list)
        # print("total phrase count:", phrases_counts_total)
        # print("share of each phrase in the total:", phrases_counts_rate_str)
        return phrases_list, phrases_str, phrases_counts_str, phrases_counts_rate_str

    def get_table_name_read_by_asin(self):
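        """Route an ASIN to its sharded comment table by its first three
        characters, e.g. ASIN 'B07XYZ1234' on site 'us' maps to
        'us_asin_comment_b07'."""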
        if self.asin.lower()[:3] in ['b01', 'b02', 'b03', 'b04', 'b05', 'b06']:
            table_name_read = f'{self.site_name}_asin_comment_b01_b06'
        elif self.asin.lower()[:3] in ['b00', 'b07', 'b08', 'b09']:
            table_name_read = f'{self.site_name}_asin_comment_{self.asin.lower()[:3]}'
        else:
            table_name_read = f'{self.site_name}_asin_comment_other'
        print("asin, table_name_read:", self.asin, table_name_read)
        return table_name_read

    def read_data(self):
        sql = f"select asin, content, rating from {self.table_name_read} where asin='{self.asin}';"
        print("sql:", sql)
        self.df_asin = pd.read_sql(sql, con=self.engine_pg)
        print(self.df_asin.shape)
        print(self.df_asin.head())

    def handle_data(self):
        # merge reviews per asin into one text blob per rating bucket
        df_asin_good = self.df_asin.loc[self.df_asin.rating.isin([4, 5])].groupby(['asin']).agg(
            {"content": " ".join}).reset_index()
        df_asin_bad = self.df_asin.loc[self.df_asin.rating.isin([1, 2])].groupby(['asin']).agg(
            {"content": " ".join}).reset_index()
        df_asin_neu = self.df_asin.loc[self.df_asin.rating.isin([3])].groupby(['asin']).agg(
            {"content": " ".join}).reset_index()
        df_asin_mix = self.df_asin.groupby(['asin']).agg({"content": " ".join}).reset_index()
        df_asin_good['rating_level'] = '4~5'
        df_asin_bad['rating_level'] = '1~2'
        df_asin_neu['rating_level'] = '3'
        df_asin_mix['rating_level'] = '1~5'
        self.df_save = pd.concat([df_asin_mix, df_asin_good, df_asin_bad, df_asin_neu])
        # self.df_save = self.df_asin.groupby(['asin']).agg({"content": " ".join}).reset_index()
        # self.df_save['rating'] = '1~5'
        # clean the English review text, extract key phrases, count frequencies and compute frequency shares
        self.df_save[['phrases_list', 'phrases_str', 'phrases_counts_str', 'phrases_counts_rate_str']] = self.df_save.content.apply(self._hd_clean_and_extract).apply(pd.Series)
        print(self.df_save.head())
        print(self.df_save.shape, self.df_save.columns)
        self._hd_emotional_analysis()

    def _hd_emotional_analysis(self):
        # create the SentimentIntensityAnalyzer (VADER)
        sia = SentimentIntensityAnalyzer()
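        # polarity_scores returns a dict of the form
        # {'neg': ..., 'neu': ..., 'pos': ..., 'compound': ...}; it is stored
        # below as its string representation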
        self.df_save['sentiment_score'] = self.df_save.content.apply(lambda x: sia.polarity_scores(x))
        self.df_save['sentiment_score'] = self.df_save['sentiment_score'].astype("U")

    def save_data(self):
        print("开始存储评论分析的结果数据: 先删除已经存在的,再存储")
        with self.engine_pg.begin() as conn:
            sql = f"delete from {self.table_name_save} where asin='{self.asin}' and phrases_counts={self.phrases_counts}"
            conn.execute(text(sql))
        self.df_save = self.df_save.loc[:, ['asin', 'rating_level', 'phrases_str', 'phrases_counts_str', 'phrases_counts_rate_str', 'sentiment_score']]
        self.df_save['phrases_counts'] = self.phrases_counts
        print(self.df_save.head())
        self.df_save.to_sql(self.table_name_save, con=self.engine_pg, if_exists="append", index=False)

    def run(self, site_name, asin, phrases_counts):
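        """Analyze one ASIN end to end.

        Skips the run if a result for this asin + phrases_counts pair was
        saved within the last 24 hours; on any exception, waits 10-30s,
        rebuilds the DB engine and retries.
        """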
        while True:
            try:
                self.init_refresh(site_name=site_name, asin=asin, phrases_counts=phrases_counts)
                # first check whether this asin + phrases_counts pair was processed within the last 24h; if so, end immediately
                sql = f"select asin, phrases_counts from {self.table_name_save} where asin='{self.asin}' and phrases_counts={self.phrases_counts} and created_time >= now() - interval '24 hour';"
                print("sql:", sql)
                df_judge = pd.read_sql(sql, con=self.engine_pg)
                if df_judge.shape[0] > 0:
                    # a record from the last 24h already exists, so skip this run
                    print("Found a record within the last 24h; exiting")
                else:
                    self.read_data()
                    self.handle_data()
                    self.save_data()
                break
            except Exception as e:
                print("error:", traceback.format_exc(e))
                print("随机等待10-30s:", time.sleep(random.randint(10, 30)))
                # self.engine_pg = self.class_db.db_connection(db_type="pg", db_node="h15", site_name=self.site_name)
                self.engine_pg = create_engine_db(site_name=self.site_name)
                continue


def get_comment_api(data):
    import requests
    import json
    # url = 'http://192.168.2.77:5000/analyze_comments'
    # url = 'http://127.0.0.1:5000/analyze_comments'
    url = 'http://113.100.143.162:5000/analyze_comments'
    # url = 'http://192.168.10.225:5000/analyze_comments'
    # url = 'http://192.168.2.41:5000/analyze_comments'
    # url = 'http://192.168.10.221:5000/analyze_comments'
    headers = {'Content-type': 'application/json'}
    # data = {'arg1': 'value1', 'arg2': 'value2'}  # example payload
    response = requests.post(url, headers=headers, data=json.dumps(data))
    # {"stat{self.site_name}_code": 200, "message": "success"}.json()
    print(response.json())
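
# A minimal sketch of the server side this client assumes (hypothetical: the
# actual /analyze_comments service is not part of this file). It would simply
# map the JSON payload onto CommentAnalytics.run(). Kept commented out so the
# module stays import-safe:
#
#   from flask import Flask, request, jsonify
#
#   app = Flask(__name__)
#   handle_obj = CommentAnalytics()
#
#   @app.route('/analyze_comments', methods=['POST'])
#   def analyze_comments():
#       payload = request.get_json()
#       handle_obj.run(site_name=payload['site_name'], asin=payload['asin'],
#                      phrases_counts=int(payload['phrases_counts']))
#       return jsonify({"status_code": 200, "message": "success"})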


if __name__ == '__main__':
    # handle_obj = CommentAnalytics(site_name='us', asin='B00CBAWIIY', phrases_counts=20)  # keep at most 20 key phrases by default
    # handle_obj.run()

    # site_name = sys.argv[1]  # arg 1: site
    # asin = sys.argv[2]  # arg 2: ASIN
    # phrases_counts = sys.argv[3]  # arg 3: number of key phrases to extract
    # data = {"site_name": site_name, "asin": asin, "phrases_counts": phrases_counts}
    data = {"site_name": 'us', "asin": 'B00061MWH2', "phrases_counts": '40'}
    # data = {"site_name": 'us', "asin": 'B0C8BJL1D3', "phrases_counts": '40'}
    get_comment_api(data=data)