judge.py 18.9 KB
Newer Older
chenyuanjie committed

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates_mysql import TemplatesMysql
# from ..utils.templates_mysql import TemplatesMysql
import time
import datetime
import pandas as pd


class Judge(TemplatesMysql):
    """Poll workflow status tables and swap freshly built ``_copy1`` tables into place.

    Depending on ``data_type`` (see ``run``), an instance either:
      * waits until this week's row for the site/data_type has status=3 in a
        workflow table (data_type 1, 2, 4, 7), or
      * replaces a formal table with its ``_copy1`` counterpart and recreates an
        empty ``_copy1`` table for the next run (data_type 12, 13, 21), or
      * marks a step as finished in ``selection.workflow_integration`` (data_type 11).
    """

    def __init__(self, site_name='us', data_type=1):
        super().__init__()
        self.site_name = site_name
        # data_type codes:
        #   1: search terms, 2: asin details, 3/4: shop products, 5/6/7: ao_val
        #   11: last_week, 12: last_4_week, 13: asin_last_4_week,
        #   14: st_month_<year>_<month> (last week of each month), 15: asin_st
        #   21: brand_analytics_week, 22: brand_analytics_month
        # NOTE(review): run() maps 21 to seller_asin_account_detail, not brand_analytics_week — confirm.
        self.data_type = data_type
        self.engine = self.mysql_connect()
        # ISO calendar of "now"; year/week form the '<year>_<week>' key used in workflow tables.
        self.year, self.week, self.week_day = datetime.datetime.now().isocalendar()
        self.current_date = time.strftime("%Y-%m-%d", time.localtime())
        print("current_date:", self.current_date)
        print("site_name, year, week, week_day:", self.site_name, self.year, self.week, self.week_day)
        self.df_judge = pd.DataFrame()
        # The us site lives in database `selection`; other sites in `selection_<site>`.
        self.db_name = "selection" if self.site_name == "us" else f"selection_{self.site_name}"

    def read_workflow(self):
        """Load this week's finished (status=3) workflow rows for site/data_type into ``self.df_judge``."""
        # data_type 1/2/4: crawl progress (search terms, asin details, shop products) in workflow_crawling.
        # data_type 7: ao_val computation progress in workflow_exhibition.
        if self.data_type in [7]:
            table = "selection.workflow_exhibition"
        else:
            table = "selection.workflow_crawling"
        sql = f"select * from {table} WHERE week='{self.year}_{self.week}' and site_name='{self.site_name}' and data_type={self.data_type} and status=3;"
        print("sql_read:", sql)
        self.df_judge = pd.read_sql(sql=sql, con=self.engine)

    def judge(self, poll_seconds=600):
        """Block until ``self.df_judge`` contains a finished workflow row.

        ``self.df_judge`` is expected to have been loaded by ``read_workflow``
        before the first call. While it is empty, sleep ``poll_seconds`` and
        re-read the workflow table.

        Args:
            poll_seconds: seconds to sleep between polls (default 600 = 10 min).
        """
        finished_messages = {
            1: "关键词已经抓完, 结束判断, 进行下一步操作",
            2: "asin详情已经抓完, 结束判断, 进行下一步操作",
            4: "店铺产品已经抓完, 结束判断, 进行下一步操作",
            7: "关键词的ao_val已经计算完成, 结束判断, 进行下一步操作",
        }
        print_words = finished_messages.get(self.data_type)
        if print_words is None:
            print_words = "输入类型有误, 请重新检查"
            print(print_words)
        while True:
            print("current_time:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
            # Any status=3 row means the step is done; requiring exactly one row
            # would poll forever if the workflow table holds duplicates.
            if not self.df_judge.empty:
                print("site_name, year, week:", self.site_name, self.year, self.week, print_words)
                break
            print("site_name, year, week:", self.site_name, self.year, self.week, "当前流程尚未完成,睡眠10min之后继续判断")
            time.sleep(poll_seconds)
            self.read_workflow()

    def _copy_table_id_span(self, table):
        """Return ``max(id) - min(id) + 1`` for ``{db_name}.{table}``, or 0 if it has no rows.

        This is an id-range estimate of the row count, not an exact ``COUNT(*)``.
        A single one-row query is used so an empty or single-row table cannot
        break the result. The previous ``UNION`` of min/max de-duplicated those
        cases down to one row.
        """
        sql_read = f"SELECT min(id) AS min_id, max(id) AS max_id FROM {self.db_name}.{table};"
        df = pd.read_sql(sql_read, con=self.engine)
        min_id = df["min_id"].iloc[0]
        max_id = df["max_id"].iloc[0]
        if pd.isna(min_id) or pd.isna(max_id):
            return 0
        return int(max_id) - int(min_id) + 1

    def _swap_copy_table(self, table, min_span=1000000, max_checks=4):
        """Drop ``{table}`` and rename ``{table}_copy1`` to it once the copy looks loaded.

        The copy counts as loaded when its id span exceeds ``min_span``. The check
        is attempted up to ``max_checks`` times, back to back with no delay.

        Returns:
            True if the formal table was replaced, False if every check failed.
        """
        formal = f"{self.db_name}.{table}"
        copy = f"{self.db_name}.{table}_copy1"
        for num in range(1, max_checks + 1):
            print("开始检查数据")
            count = self._copy_table_id_span(f"{table}_copy1")
            print("copy表数据量:", count)
            if count > min_span:
                print("3. 删除正式表,将copy表改成正式表")
                # NOTE(review): MySQL DDL auto-commits, so drop+rename is not atomic here.
                with self.engine.begin() as conn:
                    sql_drop = f"drop table if exists {formal};"
                    sql_rename = f"ALTER TABLE {copy} RENAME {formal};"
                    print("sql_drop:", sql_drop)
                    conn.execute(sql_drop)
                    print("sql_rename:", sql_rename)
                    conn.execute(sql_rename)
                return True
            if num < max_checks:
                print(f"第{num}次检查异常,继续删除+改表面")
        print(f"检查次数超过{max_checks - 1}次异常,停止")
        return False

    def _recreate_copy_table(self, table, sql_create):
        """Drop ``{table}_copy1`` if it still exists, then execute ``sql_create``."""
        with self.engine.begin() as conn:
            sql_drop = f"drop table if exists {self.db_name}.{table}_copy1;"
            print("sql_drop:", sql_drop)
            conn.execute(sql_drop)
            conn.execute(sql_create)

    def delete_and_rename_and_create_asin_last_4_week(self):
        """Swap ``<site>_asin_last_4_week_copy1`` into place and recreate an empty copy table."""
        table = f"{self.site_name}_asin_last_4_week"
        self._swap_copy_table(table)
        # self.update_table_state(data_type=3)  # last_4_week
        # Identifiers are backtick-quoted: single quotes are string literals in MySQL
        # (a syntax error here), and `rank` is a reserved word in MySQL 8.
        sql_create = f"""CREATE TABLE {self.db_name}.{self.site_name}_asin_last_4_week_copy1 (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `asin` varchar(30) DEFAULT NULL,
  `ao_val` decimal(10,3) DEFAULT '0.000' COMMENT 'SP广告词+品牌广告词+视频广告词)/ asin自然词总数',
  `zr_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的zr类型的asin总数',
  `sp_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的sp类型的asin总数',
  `sb_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的sb(底部、头部)类型的asin总数',
  `vi_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的视频词类型的asin总数',
  `bs_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的Best Seller词类型的asin总数',
  `ac_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的AC词类型的asin总数',
  `tr_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的TR词类型的asin总数',
  `er_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的ER类型的asin总数',
  `bsr_orders` int(10) DEFAULT '0' COMMENT 'bsr销量(统一使用当前月该asin最后一周销量,不用求总和)',
  `orders` int(10) DEFAULT NULL COMMENT '流量预估销量(统一使用当前月该asin最后一周销量,不用求总和)',
  `sales` decimal(20,4) DEFAULT '0.0000' COMMENT 'bsr销量*asin价格=销售额',
  `is_self` smallint(2) DEFAULT '1' COMMENT '是否为公司内部asin(1否,2是,3brand表没有抓到的asin)',
  `pt_category_id` int(2) DEFAULT NULL COMMENT 'bsr该asin的上级分类id',
  `one_category_id` int(2) DEFAULT NULL COMMENT 'bsr该asin的一级分类id',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'asin的URL',
  `title` varchar(400) DEFAULT NULL COMMENT '标题',
  `title_len` int(5) DEFAULT NULL COMMENT '标题长度',
  `price` decimal(10,3) DEFAULT '0.000' COMMENT '价格',
  `rating` decimal(10,1) DEFAULT NULL COMMENT '评分',
  `total_comments` int(20) DEFAULT NULL COMMENT '评论数',
  `buy_box_seller_type` double DEFAULT NULL COMMENT 'buybox卖家类型(1:Amazon,2:FBA,3FBM,4无BB卖家)',
  `page_inventory` double DEFAULT NULL COMMENT '页面库存(1:充足,2:1-20(正常),3:缺货)',
  `volume` varchar(30) DEFAULT NULL COMMENT '体积',
  `weight` double DEFAULT NULL COMMENT '重量',
  `rank` int(10) DEFAULT NULL COMMENT '排名',
  `launch_time` date DEFAULT NULL COMMENT '上架时间',
  `img_num` smallint(2) DEFAULT '0' COMMENT 'aisn的图片数量',
  `img_type` varchar(10) DEFAULT NULL COMMENT '所有图片类型(1:普通图片,2:视频,3:A+图片)',
  `activity_type` varchar(10) DEFAULT NULL COMMENT '1.coupon(with coupon、with coupon) 百分比 2.coupon(with coupon、with coupon) 金额 3.Join Prime(Prime Exclusive Discounts) 百分比 4.Join Prime(Prime Exclusive Discounts) 金额 5.Deal(Lightning Deal,7-day Deal,Deal OF The Day,Outlet deals,With  Deal)  6.Top Deal(Top Deal) 7.Extra Savings(绑定促销,Extra Savings)  可允许该ASIN同时做两种促销',
  `one_two_val` decimal(10,3) DEFAULT NULL COMMENT 'coupon额度 activity_type 1、2对应的值',
  `three_four_val` decimal(10,3) DEFAULT NULL COMMENT 'Prime Exclusive Discounts额度 activity_type 3、4对应的值',
  `five_six_val` decimal(10,3) DEFAULT NULL COMMENT 'Join Prime额度activity_type对应5,6的值',
  `eight_val` decimal(10,0) DEFAULT NULL COMMENT '9:降低幅度的值 %',
  `qa_num` int(10) DEFAULT NULL COMMENT 'QA数量',
  `brand_name` varchar(50) DEFAULT NULL COMMENT '品牌名称',
  `variation_num` int(10) DEFAULT NULL COMMENT '变体数量',
  `one_star` int(2) DEFAULT NULL COMMENT '1星比例',
  `two_star` int(2) DEFAULT NULL COMMENT '2星比例',
  `three_star` int(2) DEFAULT NULL COMMENT '3星比例',
  `four_star` int(2) DEFAULT NULL COMMENT '4星比例',
  `five_star` int(2) DEFAULT NULL COMMENT '5星比例',
  `low_star` int(2) DEFAULT NULL COMMENT 'one_star + two_star + three_star 比例总和',
  `together_asin` varchar(255) DEFAULT NULL COMMENT '一起购买的asin,多个逗号拼接',
  `account_name` varchar(255) DEFAULT NULL COMMENT '亚马逊卖家账号名称',
  `account_id` bigint(2) DEFAULT NULL COMMENT '卖家账号id',
  `site_name` varchar(100) DEFAULT NULL COMMENT '国家名称',
  `updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `sel_asin` (`asin`) USING BTREE,
  KEY `sel_updat` (`updated_at`) USING BTREE,
  KEY `sel_ao` (`ao_val`) USING BTREE,
  KEY `sel_is_self` (`is_self`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;"""
        self._recreate_copy_table(table, sql_create)
        print(f"site_name: {self.site_name}站点的_asin_last_4_week已经跑完,copy表已经建立")

    def delete_and_rename_and_create_st_last_4_week(self):
        """Swap ``<site>_last_4_week_st_copy1`` into place, mark last_4_week done, recreate the copy.

        The workflow_integration status (data_type=3) is only set when the swap
        actually happened. Otherwise downstream steps would see a stale formal
        table reported as finished.
        """
        table = f"{self.site_name}_last_4_week_st"
        if self._swap_copy_table(table):
            self.update_table_state(data_type=3)  # last_4_week
        else:
            print("copy表检查未通过, 不更新workflow_integration状态")
        sql_create = f"""CREATE TABLE {self.db_name}.{self.site_name}_last_4_week_st_copy1 (
  id int(11) NOT NULL AUTO_INCREMENT,
  week int(10) NOT NULL COMMENT '周',
  asin varchar(30) NOT NULL,
  search_term varchar(500) NOT NULL COMMENT '搜索词',
  ao_val decimal(10,3) DEFAULT '0.000' COMMENT '该asin的搜索词的AO值:当前asin的(sp总数+sb总数+视频总数)/自然总数',
  zr_counts int(10) DEFAULT NULL COMMENT '搜索词对应的zr类型的asin总数',
  sp_counts int(10) DEFAULT NULL COMMENT '搜索词对应的sp类型的asin总数',
  orders int(20) DEFAULT '0' COMMENT '预估销量',
  orders_sum int(20) DEFAULT NULL COMMENT '总销量',
  flow double(10,4) DEFAULT NULL COMMENT '流量占比',
  search_num int(10) DEFAULT NULL COMMENT '搜索量',
  search_rank int(10) DEFAULT NULL COMMENT '搜索词排名',
  adv_compet int(1) DEFAULT '0' COMMENT '广告竞品数',
  compet int(1) DEFAULT '0' COMMENT '竞品数',
  quantity_being_sold bigint(10) DEFAULT NULL COMMENT '关键词搜索量总条数',
  zr_page_rank int(1) DEFAULT NULL COMMENT 'zr_词类型具体位置(所有页所有行的具体位置)',
  zr_page int(1) DEFAULT NULL COMMENT 'zr_词类型具体页',
  zr_page_row int(1) DEFAULT NULL COMMENT 'zr_词类型具体页面的行数',
  zr_updated_at timestamp NULL DEFAULT NULL COMMENT 'zr_词类型抓取时间',
  sp_page int(1) DEFAULT NULL COMMENT 'sp_词类型具体页',
  sp_page_rank int(1) DEFAULT NULL COMMENT 'sp_词类型具体位置(所有页所有行的具体位置)',
  sp_page_row int(1) DEFAULT NULL COMMENT 'sp_词类型具体页面的行数',
  sp_updated_at timestamp NULL DEFAULT NULL COMMENT 'sp_词类型抓取时间',
  sb1_page int(1) DEFAULT NULL COMMENT 'sb_词类型具体页(头部)',
  sb1_updated_at timestamp NULL DEFAULT NULL COMMENT 'sb_词类型抓取时间(头部)',
  sb2_page int(1) DEFAULT NULL COMMENT 'sb_词类型具体页(尾部)',
  sb2_updated_at timestamp NULL DEFAULT NULL COMMENT 'sb_词类型抓取时间(尾部)',
  sb3_page int(1) DEFAULT NULL COMMENT 'sb_词类型具体页(视频)',
  sb3_updated_at timestamp NULL DEFAULT NULL COMMENT 'sb_词类型抓取时间(视频)',
  ac_page int(1) DEFAULT NULL COMMENT 'ac_词类型具体页',
  ac_updated_at timestamp NULL DEFAULT NULL COMMENT 'ac_词类型抓取时间',
  bs_page int(1) DEFAULT NULL COMMENT 'bs_词类型具体页',
  bs_updated_at timestamp NULL DEFAULT NULL COMMENT 'bs_词类型抓取时间',
  er_page int(1) DEFAULT NULL COMMENT 'er_词类型具体页',
  er_updated_at timestamp NULL DEFAULT NULL COMMENT 'er_词类型抓取时间',
  tr_page int(1) DEFAULT NULL COMMENT 'tr_词类型具体页',
  tr_updated_at timestamp NULL DEFAULT NULL COMMENT 'tr_词类型抓取时间',
  search_term_type varchar(50) DEFAULT NULL COMMENT '搜索词类型(1.主要流量词;2.精准长尾词;3.精准流量词;4.转化优质词;5.转化平稳词;6.转化流失词;7.出单词;8.无效曝光词)',
  created_at timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (id),
  KEY sel_st (search_term),
  KEY sel_asin (asin),
  KEY sel_week (week) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1020421895 DEFAULT CHARSET=utf8mb4;"""
        self._recreate_copy_table(table, sql_create)
        print(f"site_name: {self.site_name}站点的last_4_week_st已经跑完,copy表已经建立")

    def update_table_state(self, data_type=1):
        """Set status=3 for this week's site row with ``data_type`` in selection.workflow_integration."""
        table = "selection.workflow_integration"
        with self.engine.begin() as conn:
            sql_update = f"update {table} set status=3 where week='{self.year}_{self.week}' and site_name='{self.site_name}' and data_type={data_type}"
            print("sql_update:", sql_update)
            conn.execute(sql_update)

    def delete_and_rename_and_create_seller_asin_account_detail(self):
        """Swap ``<site>_seller_asin_account_detail_copy1`` into place and recreate an empty copy table."""
        table = f"{self.site_name}_seller_asin_account_detail"
        self._swap_copy_table(table)
        sql_create = f"""CREATE TABLE {self.db_name}.{self.site_name}_seller_asin_account_detail_copy1 (
                  id int(11) NOT NULL AUTO_INCREMENT,
                  account_id int(11) NULL DEFAULT NULL COMMENT 'us_seller_account_syn表的 主键id',
                  account_name varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '亚马逊卖家账号名称',
                  asin varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
                  launch_time date NULL DEFAULT NULL COMMENT '上架日期',
                  days_diff int(10) NULL DEFAULT NULL COMMENT '距离计算日期的间隔天数',
                  is_asin_new int(2) NULL DEFAULT NULL COMMENT '是否是新品(间隔天数<=180天为新品,否则为老品)0:老品,1:新品',
                  created_at timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP(0),
                  updated_at timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0),
                  PRIMARY KEY (id) USING BTREE,
                  INDEX sel_is_asin_new(account_name, is_asin_new) USING BTREE,
                  INDEX asin(asin) USING BTREE,
                  INDEX sel_account_id(account_id, is_asin_new) USING BTREE
                ) ENGINE = InnoDB AUTO_INCREMENT = 72102642 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;"""
        self._recreate_copy_table(table, sql_create)
        print(f"site_name: {self.site_name}站点的_seller_asin_account_detail已经跑完,copy表已经建立")

    def run(self):
        """Dispatch on ``self.data_type``.

        * 1, 2, 4, 7: poll the workflow table until the upstream step is finished.
        * 11: mark week_st finished (workflow_integration data_type=2).
        * 12: swap/recreate ``<site>_last_4_week_st``.
        * 13: swap/recreate ``<site>_asin_last_4_week``.
        * 21: swap/recreate ``<site>_seller_asin_account_detail``.
        Other values do nothing.
        """
        if self.data_type in [1, 2, 4, 7]:
            self.read_workflow()
            self.judge()
        elif self.data_type == 11:
            self.update_table_state(data_type=2)  # week_st
            print(f"site_name: {self.site_name}站点的week_st已经跑完")
        elif self.data_type == 12:
            self.delete_and_rename_and_create_st_last_4_week()
        elif self.data_type == 13:
            # 13 is asin_last_4_week; it previously re-ran the st_last_4_week swap by mistake.
            self.delete_and_rename_and_create_asin_last_4_week()
        elif self.data_type == 21:
            self.delete_and_rename_and_create_seller_asin_account_detail()


if __name__ == '__main__':
    # Usage: python judge.py <site_name> <data_type>
    if len(sys.argv) < 3:
        sys.exit(f"usage: {sys.argv[0]} <site_name> <data_type>")
    site_name = sys.argv[1]  # argument 1: site name, e.g. 'us'
    data_type = int(sys.argv[2])  # argument 2: integer data_type code handled by Judge.run
    handle_obj = Judge(site_name=site_name, data_type=data_type)
    handle_obj.run()