judge.py 18.9 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates_mysql import TemplatesMysql
# from ..utils.templates_mysql import TemplatesMysql
import time
import datetime
import pandas as pd


class Judge(TemplatesMysql):

    def __init__(self, site_name='us', data_type=1):
        super().__init__()
        self.site_name = site_name
        # 1: 关键词, 2: asin详情, 3, 4: 店铺产品, 5, 6, 7: ao_val,
        # 11: last_week, 12: last_4_week, 13: asin_last_4_week, 14: st_month_年_月(每个月最后一周), 15: asin_st
        # 21: brand_analytics_week, 22: brand_analytics_month
        self.data_type = data_type
        self.engine = self.mysql_connect()
        self.year, self.week, self.week_day = datetime.datetime.now().isocalendar()
        self.current_date = time.strftime("%Y-%m-%d", time.localtime())
        print("current_date:", self.current_date)
        print("site_name, year, week, week_day:", self.site_name, self.year, self.week, self.week_day)
        self.df_judge = pd.DataFrame()
        self.db_name = "selection" if self.site_name == "us" else f"selection_{self.site_name}"

    def read_workflow(self):
        # 1.1 data_type的值为1,2,4时, 监听workflow_crawling表的search_term和asin_detail和店铺产品抓取情况
        table = "selection.workflow_crawling"
        # 1.2 data_type的值为7时, 监听workflow_exhibition表的ao_va计算情况
        if self.data_type in [7]:
            table = "selection.workflow_exhibition"
        sql = f"select * from {table} WHERE week='{self.year}_{self.week}' and site_name='{self.site_name}' and data_type={self.data_type} and status=3;"
        print("sql_read:", sql)
        self.df_judge = pd.read_sql(sql=sql, con=self.engine)

    def judge(self):
        if self.data_type in [1, 2]:
            if self.data_type == 1:
                print_words = "关键词已经抓完, 结束判断, 进行下一步操作"
            else:
                print_words = "asin详情已经抓完, 结束判断, 进行下一步操作"
        elif self.data_type in [4]:
            print_words = "店铺产品已经抓完, 结束判断, 进行下一步操作"
        elif self.data_type in [7]:
            print_words = "关键词的ao_val已经计算完成, 结束判断, 进行下一步操作"
        else:
            print_words = "输入类型有误, 请重新检查"
            print(print_words)
        while True:
            if self.df_judge.shape[0] == 1:
                print("current_time:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
                print("site_name, year, week:", self.site_name, self.year, self.week, print_words)
                break
            else:
                print("current_time:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
                print("site_name, year, week:", self.site_name, self.year, self.week, "当前流程尚未完成,睡眠10min之后继续判断")
                time.sleep(600)
                self.read_workflow()
                continue

    def delete_and_rename_and_create_asin_last_4_week(self):
        num = 0
        while True:
            num += 1
            print("开始检查数据")
            sql_read = f"""SELECT min(id) as counts FROM {self.db_name}.{self.site_name}_asin_last_4_week_copy1 union
                    SELECT max(id) as counts FROM {self.db_name}.{self.site_name}_asin_last_4_week_copy1;"""
            df = pd.read_sql(sql_read, con=self.engine)
            count = list(df.counts)[1] - list(df.counts)[0] + 1
            print("copy表数据量:", count)
            if count > 1000000:
                print("3. 删除正式表,将copy表改成正式表")
                with self.engine.begin() as conn:
                    sql_drop = f"drop table if exists {self.db_name}.{self.site_name}_asin_last_4_week;"
                    sql_rename = f"ALTER TABLE {self.db_name}.{self.site_name}_asin_last_4_week_copy1 RENAME {self.db_name}.{self.site_name}_asin_last_4_week;"
                    print("sql_drop:", sql_drop)
                    conn.execute(sql_drop)
                    print("sql_rename:", sql_rename)
                    conn.execute(sql_rename)
                break
            else:
                if num > 3:
                    print("检查次数超过3次异常,停止")
                    break
                else:
                    print(f"第{num}次检查异常,继续删除+改表面")
                    continue
        # self.update_table_state(data_type=3)  # last_4_week
        with self.engine.begin() as conn:
            sql_drop = f"drop table if exists {self.db_name}.{self.site_name}_asin_last_4_week_copy1;"
            print("sql_drop:", sql_drop)
            conn.execute(sql_drop)
            sql_create = f"""CREATE TABLE {self.db_name}.{self.site_name}_asin_last_4_week_copy1 (
  'id' int(11) NOT NULL AUTO_INCREMENT,
  'asin' varchar(30) DEFAULT NULL,
  'ao_val' decimal(10,3) DEFAULT '0.000' COMMENT 'SP广告词+品牌广告词+视频广告词)/ asin自然词总数',
  'zr_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的zr类型的asin总数',
  'sp_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的sp类型的asin总数',
  'sb_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的sb(底部、头部)类型的asin总数',
  'vi_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的视频词类型的asin总数',
  'bs_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的Best Seller词类型的asin总数',
  'ac_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的AC词类型的asin总数',
  'tr_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的TR词类型的asin总数',
  'er_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的ER类型的asin总数',
  'bsr_orders' int(10) DEFAULT '0' COMMENT 'bsr销量(统一使用当前月该asin最后一周销量,不用求总和)',
  'orders' int(10) DEFAULT NULL COMMENT '流量预估销量(统一使用当前月该asin最后一周销量,不用求总和)',
  'sales' decimal(20,4) DEFAULT '0.0000' COMMENT 'bsr销量*asin价格=销售额',
  'is_self' smallint(2) DEFAULT '1' COMMENT '是否为公司内部asin(1否,2是,3brand表没有抓到的asin)',
  'pt_category_id' int(2) DEFAULT NULL COMMENT 'bsr该asin的上级分类id',
  'one_category_id' int(2) DEFAULT NULL COMMENT 'bsr该asin的一级分类id',
  'img_url' varchar(200) DEFAULT NULL COMMENT 'asin的URL',
  'title' varchar(400) DEFAULT NULL COMMENT '标题',
  'title_len' int(5) DEFAULT NULL COMMENT '标题长度',
  'price' decimal(10,3) DEFAULT '0.000' COMMENT '价格',
  'rating' decimal(10,1) DEFAULT NULL COMMENT '评分',
  'total_comments' int(20) DEFAULT NULL COMMENT '评论数',
  'buy_box_seller_type' double DEFAULT NULL COMMENT 'buybox卖家类型(1:Amazon,2:FBA,3FBM,4无BB卖家)',
  'page_inventory' double DEFAULT NULL COMMENT '页面库存(1:充足,2:1-20(正常),3:缺货)',
  'volume' varchar(30) DEFAULT NULL COMMENT '体积',
  'weight' double DEFAULT NULL COMMENT '重量',
  'rank' int(10) DEFAULT NULL COMMENT '排名',
  'launch_time' date DEFAULT NULL COMMENT '上架时间',
  'img_num' smallint(2) DEFAULT '0' COMMENT 'aisn的图片数量',
  'img_type' varchar(10) DEFAULT NULL COMMENT '所有图片类型(1:普通图片,2:视频,3:A+图片)',
  'activity_type' varchar(10) DEFAULT NULL COMMENT '1.coupon(with coupon、with coupon) 百分比 2.coupon(with coupon、with coupon) 金额 3.Join Prime(Prime Exclusive Discounts) 百分比 4.Join Prime(Prime Exclusive Discounts) 金额 5.Deal(Lightning Deal,7-day Deal,Deal OF The Day,Outlet deals,With  Deal)  6.Top Deal(Top Deal) 7.Extra Savings(绑定促销,Extra Savings)  可允许该ASIN同时做两种促销',
  'one_two_val' decimal(10,3) DEFAULT NULL COMMENT 'coupon额度 activity_type 1、2对应的值',
  'three_four_val' decimal(10,3) DEFAULT NULL COMMENT 'Prime Exclusive Discounts额度 activity_type 3、4对应的值',
  'five_six_val' decimal(10,3) DEFAULT NULL COMMENT 'Join Prime额度activity_type对应5,6的值',
  'eight_val' decimal(10,0) DEFAULT NULL COMMENT '9:降低幅度的值 %',
  'qa_num' int(10) DEFAULT NULL COMMENT 'QA数量',
  'brand_name' varchar(50) DEFAULT NULL COMMENT '品牌名称',
  'variation_num' int(10) DEFAULT NULL COMMENT '变体数量',
  'one_star' int(2) DEFAULT NULL COMMENT '1星比例',
  'two_star' int(2) DEFAULT NULL COMMENT '2星比例',
  'three_star' int(2) DEFAULT NULL COMMENT '3星比例',
  'four_star' int(2) DEFAULT NULL COMMENT '4星比例',
  'five_star' int(2) DEFAULT NULL COMMENT '5星比例',
  'low_star' int(2) DEFAULT NULL COMMENT 'one_star + two_star + three_star 比例总和',
  'together_asin' varchar(255) DEFAULT NULL COMMENT '一起购买的asin,多个逗号拼接',
  'account_name' varchar(255) DEFAULT NULL COMMENT '亚马逊卖家账号名称',
  'account_id' bigint(2) DEFAULT NULL COMMENT '卖家账号id',
  'site_name' varchar(100) DEFAULT NULL COMMENT '国家名称',
  'updated_at' timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  'created_at' timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (id),
  UNIQUE KEY 'sel_asin' (asin) USING BTREE,
  KEY 'sel_updat' (updated_at) USING BTREE,
  KEY 'sel_ao' (ao_val) USING BTREE,
  KEY 'sel_is_self' (is_self) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;"""
            conn.execute(sql_create)
        print(f"site_name: {self.site_name}站点的_asin_last_4_week已经跑完,copy表已经建立")

    def delete_and_rename_and_create_st_last_4_week(self):
        num = 0
        while True:
            num += 1
            print("开始检查数据")
            sql_read = f"""SELECT min(id) as counts FROM {self.db_name}.{self.site_name}_last_4_week_st_copy1 union
            SELECT max(id) as counts FROM {self.db_name}.{self.site_name}_last_4_week_st_copy1;"""
            df = pd.read_sql(sql_read, con=self.engine)
            count = list(df.counts)[1] - list(df.counts)[0] + 1
            print("copy表数据量:", count)
            if count > 1000000:
                print("3. 删除正式表,将copy表改成正式表")
                with self.engine.begin() as conn:
                    sql_drop = f"drop table if exists {self.db_name}.{self.site_name}_last_4_week_st;"
                    sql_rename = f"ALTER TABLE {self.db_name}.{self.site_name}_last_4_week_st_copy1 RENAME {self.db_name}.{self.site_name}_last_4_week_st;"
                    print("sql_drop:", sql_drop)
                    conn.execute(sql_drop)
                    print("sql_rename:", sql_rename)
                    conn.execute(sql_rename)
                break
            else:
                if num > 3:
                    print("检查次数超过3次异常,停止")
                    break
                else:
                    print(f"第{num}次检查异常,继续删除+改表面")
                    continue
        self.update_table_state(data_type=3)  # last_4_week
        with self.engine.begin() as conn:
            sql_drop = f"drop table if exists {self.db_name}.{self.site_name}_last_4_week_st_copy1;"
            print("sql_drop:", sql_drop)
            conn.execute(sql_drop)
            sql_create = f"""CREATE TABLE {self.db_name}.{self.site_name}_last_4_week_st_copy1 (
  id int(11) NOT NULL AUTO_INCREMENT,
  week int(10) NOT NULL COMMENT '周',
  asin varchar(30) NOT NULL,
  search_term varchar(500) NOT NULL COMMENT '搜索词',
  ao_val decimal(10,3) DEFAULT '0.000' COMMENT '该asin的搜索词的AO值:当前asin的(sp总数+sb总数+视频总数)/自然总数',
  zr_counts int(10) DEFAULT NULL COMMENT '搜索词对应的zr类型的asin总数',
  sp_counts int(10) DEFAULT NULL COMMENT '搜索词对应的sp类型的asin总数',
  orders int(20) DEFAULT '0' COMMENT '预估销量',
  orders_sum int(20) DEFAULT NULL COMMENT '总销量',
  flow double(10,4) DEFAULT NULL COMMENT '流量占比',
  search_num int(10) DEFAULT NULL COMMENT '搜索量',
  search_rank int(10) DEFAULT NULL COMMENT '搜索词排名',
  adv_compet int(1) DEFAULT '0' COMMENT '广告竞品数',
  compet int(1) DEFAULT '0' COMMENT '竞品数',
  quantity_being_sold bigint(10) DEFAULT NULL COMMENT '关键词搜索量总条数',
  zr_page_rank int(1) DEFAULT NULL COMMENT 'zr_词类型具体位置(所有页所有行的具体位置)',
  zr_page int(1) DEFAULT NULL COMMENT 'zr_词类型具体页',
  zr_page_row int(1) DEFAULT NULL COMMENT 'zr_词类型具体页面的行数',
  zr_updated_at timestamp NULL DEFAULT NULL COMMENT 'zr_词类型抓取时间',
  sp_page int(1) DEFAULT NULL COMMENT 'sp_词类型具体页',
  sp_page_rank int(1) DEFAULT NULL COMMENT 'sp_词类型具体位置(所有页所有行的具体位置)',
  sp_page_row int(1) DEFAULT NULL COMMENT 'sp_词类型具体页面的行数',
  sp_updated_at timestamp NULL DEFAULT NULL COMMENT 'sp_词类型抓取时间',
  sb1_page int(1) DEFAULT NULL COMMENT 'sb_词类型具体页(头部)',
  sb1_updated_at timestamp NULL DEFAULT NULL COMMENT 'sb_词类型抓取时间(头部)',
  sb2_page int(1) DEFAULT NULL COMMENT 'sb_词类型具体页(尾部)',
  sb2_updated_at timestamp NULL DEFAULT NULL COMMENT 'sb_词类型抓取时间(尾部)',
  sb3_page int(1) DEFAULT NULL COMMENT 'sb_词类型具体页(视频)',
  sb3_updated_at timestamp NULL DEFAULT NULL COMMENT 'sb_词类型抓取时间(视频)',
  ac_page int(1) DEFAULT NULL COMMENT 'ac_词类型具体页',
  ac_updated_at timestamp NULL DEFAULT NULL COMMENT 'ac_词类型抓取时间',
  bs_page int(1) DEFAULT NULL COMMENT 'bs_词类型具体页',
  bs_updated_at timestamp NULL DEFAULT NULL COMMENT 'bs_词类型抓取时间',
  er_page int(1) DEFAULT NULL COMMENT 'er_词类型具体页',
  er_updated_at timestamp NULL DEFAULT NULL COMMENT 'er_词类型抓取时间',
  tr_page int(1) DEFAULT NULL COMMENT 'tr_词类型具体页',
  tr_updated_at timestamp NULL DEFAULT NULL COMMENT 'tr_词类型抓取时间',
  search_term_type varchar(50) DEFAULT NULL COMMENT '搜索词类型(1.主要流量词;2.精准长尾词;3.精准流量词;4.转化优质词;5.转化平稳词;6.转化流失词;7.出单词;8.无效曝光词)',
  created_at timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (id),
  KEY sel_st (search_term),
  KEY sel_asin (asin),
  KEY sel_week (week) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1020421895 DEFAULT CHARSET=utf8mb4;"""
            conn.execute(sql_create)
        print(f"site_name: {self.site_name}站点的last_4_week_st已经跑完,copy表已经建立")

    def update_table_state(self, data_type=1):
        table = "selection.workflow_integration"
        with self.engine.begin() as conn:
            sql_update = f"update {table} set status=3 where week='{self.year}_{self.week}' and site_name='{self.site_name}' and data_type={data_type}"
            print("sql_update:", sql_update)
            conn.execute(sql_update)

    def delete_and_rename_and_create_seller_asin_account_detail(self):
        num = 0
        while True:
            num += 1
            print("开始检查数据")
            sql_read = f"""SELECT min(id) as counts FROM {self.db_name}.{self.site_name}_seller_asin_account_detail_copy1 union
                            SELECT max(id) as counts FROM {self.db_name}.{self.site_name}_seller_asin_account_detail_copy1;"""
            df = pd.read_sql(sql_read, con=self.engine)
            count = list(df.counts)[1] - list(df.counts)[0] + 1
            print("copy表数据量:", count)
            if count > 1000000:
                print("3. 删除正式表,将copy表改成正式表")
                with self.engine.begin() as conn:
                    sql_drop = f"drop table if exists {self.db_name}.{self.site_name}_seller_asin_account_detail;"
                    sql_rename = f"ALTER TABLE {self.db_name}.{self.site_name}_seller_asin_account_detail_copy1 RENAME {self.db_name}.{self.site_name}_seller_asin_account_detail;"
                    print("sql_drop:", sql_drop)
                    conn.execute(sql_drop)
                    print("sql_rename:", sql_rename)
                    conn.execute(sql_rename)
                break
            else:
                if num > 3:
                    print("检查次数超过3次异常,停止")
                    break
                else:
                    print(f"第{num}次检查异常,继续删除+改表面")
                    continue
        with self.engine.begin() as conn:
            sql_drop = f"drop table if exists {self.db_name}.{self.site_name}_seller_asin_account_detail_copy1;"
            print("sql_drop:", sql_drop)
            conn.execute(sql_drop)
            sql_create = f"""CREATE TABLE {self.db_name}.{self.site_name}_seller_asin_account_detail_copy1 (
                  id int(11) NOT NULL AUTO_INCREMENT,
                  account_id int(11) NULL DEFAULT NULL COMMENT 'us_seller_account_syn表的 主键id',
                  account_name varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '亚马逊卖家账号名称',
                  asin varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
                  launch_time date NULL DEFAULT NULL COMMENT '上架日期',
                  days_diff int(10) NULL DEFAULT NULL COMMENT '距离计算日期的间隔天数',
                  is_asin_new int(2) NULL DEFAULT NULL COMMENT '是否是新品(间隔天数<=180天为新品,否则为老品)0:老品,1:新品',
                  created_at timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP(0),
                  updated_at timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0),
                  PRIMARY KEY (id) USING BTREE,
                  INDEX sel_is_asin_new(account_name, is_asin_new) USING BTREE,
                  INDEX asin(asin) USING BTREE,
                  INDEX sel_account_id(account_id, is_asin_new) USING BTREE
                ) ENGINE = InnoDB AUTO_INCREMENT = 72102642 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;"""
            conn.execute(sql_create)
        print(f"site_name: {self.site_name}站点的_seller_asin_account_detail已经跑完,copy表已经建立")

    def run(self):
        if self.data_type in [1, 2, 4, 7]:
            self.read_workflow()
            self.judge()
        elif self.data_type in range(11, 21, 1):
            if self.data_type in [11]:
                self.update_table_state(data_type=2)  # week_st
                print(f"site_name: {self.site_name}站点的week_st已经跑完")
            if self.data_type in [12]:
                self.delete_and_rename_and_create_st_last_4_week()
            if self.data_type in [13]:
                self.delete_and_rename_and_create_st_last_4_week()
        elif self.data_type in range(21, 31, 1):
            if self.data_type in [21]:
                self.delete_and_rename_and_create_seller_asin_account_detail()
            pass


if __name__ == '__main__':
    site_name = sys.argv[1]  # 参数1:站点
    data_type = int(sys.argv[2])  # 参数1:站点
    handle_obj = Judge(site_name=site_name, data_type=data_type)
    handle_obj.run()