"""Workflow gate and table-swap utility for the selection MySQL databases.

Usage: ``python <script> <site_name> <data_type>``

Depending on ``data_type`` the script either blocks until an upstream
workflow row reaches ``status=3``, marks an integration step as finished, or
promotes a ``*_copy1`` staging table to the live table and recreates an
empty staging table.
"""
import datetime
import os
import sys
import time

import pandas as pd

# Make the parent directory importable so the project-local ``utils`` package resolves.
sys.path.append(os.path.dirname(sys.path[0]))
from utils.templates_mysql import TemplatesMysql  # noqa: E402

# A staging table must span more ids than this before it may replace the live table.
_MIN_COPY_ROWS = 1000000
# Number of failed checks tolerated before the final (stopping) check.
_MAX_FAILED_CHECKS = 3
# Seconds to wait between two polls of the workflow table.
_POLL_SECONDS = 600

# DDL templates; ``{table}`` is replaced with the fully qualified staging table name.
# Identifiers are quoted with backticks: single quotes would make them string
# literals (a MySQL syntax error) and ``rank`` is a reserved word in MySQL 8.
_ASIN_LAST_4_WEEK_DDL = """CREATE TABLE {table} (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `asin` varchar(30) DEFAULT NULL,
  `ao_val` decimal(10,3) DEFAULT '0.000' COMMENT 'SP广告词+品牌广告词+视频广告词)/ asin自然词总数',
  `zr_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的zr类型的asin总数',
  `sp_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的sp类型的asin总数',
  `sb_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的sb(底部、头部)类型的asin总数',
  `vi_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的视频词类型的asin总数',
  `bs_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的Best Seller词类型的asin总数',
  `ac_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的AC词类型的asin总数',
  `tr_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的TR词类型的asin总数',
  `er_counts` int(10) DEFAULT NULL COMMENT '搜索词对应的ER类型的asin总数',
  `bsr_orders` int(10) DEFAULT '0' COMMENT 'bsr销量(统一使用当前月该asin最后一周销量,不用求总和)',
  `orders` int(10) DEFAULT NULL COMMENT '流量预估销量(统一使用当前月该asin最后一周销量,不用求总和)',
  `sales` decimal(20,4) DEFAULT '0.0000' COMMENT 'bsr销量*asin价格=销售额',
  `is_self` smallint(2) DEFAULT '1' COMMENT '是否为公司内部asin(1否,2是,3brand表没有抓到的asin)',
  `pt_category_id` int(2) DEFAULT NULL COMMENT 'bsr该asin的上级分类id',
  `one_category_id` int(2) DEFAULT NULL COMMENT 'bsr该asin的一级分类id',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'asin的URL',
  `title` varchar(400) DEFAULT NULL COMMENT '标题',
  `title_len` int(5) DEFAULT NULL COMMENT '标题长度',
  `price` decimal(10,3) DEFAULT '0.000' COMMENT '价格',
  `rating` decimal(10,1) DEFAULT NULL COMMENT '评分',
  `total_comments` int(20) DEFAULT NULL COMMENT '评论数',
  `buy_box_seller_type` double DEFAULT NULL COMMENT 'buybox卖家类型(1:Amazon,2:FBA,3FBM,4无BB卖家)',
  `page_inventory` double DEFAULT NULL COMMENT '页面库存(1:充足,2:1-20(正常),3:缺货)',
  `volume` varchar(30) DEFAULT NULL COMMENT '体积',
  `weight` double DEFAULT NULL COMMENT '重量',
  `rank` int(10) DEFAULT NULL COMMENT '排名',
  `launch_time` date DEFAULT NULL COMMENT '上架时间',
  `img_num` smallint(2) DEFAULT '0' COMMENT 'aisn的图片数量',
  `img_type` varchar(10) DEFAULT NULL COMMENT '所有图片类型(1:普通图片,2:视频,3:A+图片)',
  `activity_type` varchar(10) DEFAULT NULL COMMENT '1.coupon(with coupon、with coupon) 百分比 2.coupon(with coupon、with coupon) 金额 3.Join Prime(Prime Exclusive Discounts) 百分比 4.Join Prime(Prime Exclusive Discounts) 金额 5.Deal(Lightning Deal,7-day Deal,Deal OF The Day,Outlet deals,With Deal) 6.Top Deal(Top Deal) 7.Extra Savings(绑定促销,Extra Savings) 可允许该ASIN同时做两种促销',
  `one_two_val` decimal(10,3) DEFAULT NULL COMMENT 'coupon额度 activity_type 1、2对应的值',
  `three_four_val` decimal(10,3) DEFAULT NULL COMMENT 'Prime Exclusive Discounts额度 activity_type 3、4对应的值',
  `five_six_val` decimal(10,3) DEFAULT NULL COMMENT 'Join Prime额度activity_type对应5,6的值',
  `eight_val` decimal(10,0) DEFAULT NULL COMMENT '9:降低幅度的值 %',
  `qa_num` int(10) DEFAULT NULL COMMENT 'QA数量',
  `brand_name` varchar(50) DEFAULT NULL COMMENT '品牌名称',
  `variation_num` int(10) DEFAULT NULL COMMENT '变体数量',
  `one_star` int(2) DEFAULT NULL COMMENT '1星比例',
  `two_star` int(2) DEFAULT NULL COMMENT '2星比例',
  `three_star` int(2) DEFAULT NULL COMMENT '3星比例',
  `four_star` int(2) DEFAULT NULL COMMENT '4星比例',
  `five_star` int(2) DEFAULT NULL COMMENT '5星比例',
  `low_star` int(2) DEFAULT NULL COMMENT 'one_star + two_star + three_star 比例总和',
  `together_asin` varchar(255) DEFAULT NULL COMMENT '一起购买的asin,多个逗号拼接',
  `account_name` varchar(255) DEFAULT NULL COMMENT '亚马逊卖家账号名称',
  `account_id` bigint(2) DEFAULT NULL COMMENT '卖家账号id',
  `site_name` varchar(100) DEFAULT NULL COMMENT '国家名称',
  `updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (id),
  UNIQUE KEY `sel_asin` (asin) USING BTREE,
  KEY `sel_updat` (updated_at) USING BTREE,
  KEY `sel_ao` (ao_val) USING BTREE,
  KEY `sel_is_self` (is_self) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;"""

_ST_LAST_4_WEEK_DDL = """CREATE TABLE {table} (
  id int(11) NOT NULL AUTO_INCREMENT,
  week int(10) NOT NULL COMMENT '周',
  asin varchar(30) NOT NULL,
  search_term varchar(500) NOT NULL COMMENT '搜索词',
  ao_val decimal(10,3) DEFAULT '0.000' COMMENT '该asin的搜索词的AO值:当前asin的(sp总数+sb总数+视频总数)/自然总数',
  zr_counts int(10) DEFAULT NULL COMMENT '搜索词对应的zr类型的asin总数',
  sp_counts int(10) DEFAULT NULL COMMENT '搜索词对应的sp类型的asin总数',
  orders int(20) DEFAULT '0' COMMENT '预估销量',
  orders_sum int(20) DEFAULT NULL COMMENT '总销量',
  flow double(10,4) DEFAULT NULL COMMENT '流量占比',
  search_num int(10) DEFAULT NULL COMMENT '搜索量',
  search_rank int(10) DEFAULT NULL COMMENT '搜索词排名',
  adv_compet int(1) DEFAULT '0' COMMENT '广告竞品数',
  compet int(1) DEFAULT '0' COMMENT '竞品数',
  quantity_being_sold bigint(10) DEFAULT NULL COMMENT '关键词搜索量总条数',
  zr_page_rank int(1) DEFAULT NULL COMMENT 'zr_词类型具体位置(所有页所有行的具体位置)',
  zr_page int(1) DEFAULT NULL COMMENT 'zr_词类型具体页',
  zr_page_row int(1) DEFAULT NULL COMMENT 'zr_词类型具体页面的行数',
  zr_updated_at timestamp NULL DEFAULT NULL COMMENT 'zr_词类型抓取时间',
  sp_page int(1) DEFAULT NULL COMMENT 'sp_词类型具体页',
  sp_page_rank int(1) DEFAULT NULL COMMENT 'sp_词类型具体位置(所有页所有行的具体位置)',
  sp_page_row int(1) DEFAULT NULL COMMENT 'sp_词类型具体页面的行数',
  sp_updated_at timestamp NULL DEFAULT NULL COMMENT 'sp_词类型抓取时间',
  sb1_page int(1) DEFAULT NULL COMMENT 'sb_词类型具体页(头部)',
  sb1_updated_at timestamp NULL DEFAULT NULL COMMENT 'sb_词类型抓取时间(头部)',
  sb2_page int(1) DEFAULT NULL COMMENT 'sb_词类型具体页(尾部)',
  sb2_updated_at timestamp NULL DEFAULT NULL COMMENT 'sb_词类型抓取时间(尾部)',
  sb3_page int(1) DEFAULT NULL COMMENT 'sb_词类型具体页(视频)',
  sb3_updated_at timestamp NULL DEFAULT NULL COMMENT 'sb_词类型抓取时间(视频)',
  ac_page int(1) DEFAULT NULL COMMENT 'ac_词类型具体页',
  ac_updated_at timestamp NULL DEFAULT NULL COMMENT 'ac_词类型抓取时间',
  bs_page int(1) DEFAULT NULL COMMENT 'bs_词类型具体页',
  bs_updated_at timestamp NULL DEFAULT NULL COMMENT 'bs_词类型抓取时间',
  er_page int(1) DEFAULT NULL COMMENT 'er_词类型具体页',
  er_updated_at timestamp NULL DEFAULT NULL COMMENT 'er_词类型抓取时间',
  tr_page int(1) DEFAULT NULL COMMENT 'tr_词类型具体页',
  tr_updated_at timestamp NULL DEFAULT NULL COMMENT 'tr_词类型抓取时间',
  search_term_type varchar(50) DEFAULT NULL COMMENT '搜索词类型(1.主要流量词;2.精准长尾词;3.精准流量词;4.转化优质词;5.转化平稳词;6.转化流失词;7.出单词;8.无效曝光词)',
  created_at timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (id),
  KEY sel_st (search_term),
  KEY sel_asin (asin),
  KEY sel_week (week) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1020421895 DEFAULT CHARSET=utf8mb4;"""

_SELLER_ASIN_ACCOUNT_DETAIL_DDL = """CREATE TABLE {table} (
  id int(11) NOT NULL AUTO_INCREMENT,
  account_id int(11) NULL DEFAULT NULL COMMENT 'us_seller_account_syn表的 主键id',
  account_name varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '亚马逊卖家账号名称',
  asin varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  launch_time date NULL DEFAULT NULL COMMENT '上架日期',
  days_diff int(10) NULL DEFAULT NULL COMMENT '距离计算日期的间隔天数',
  is_asin_new int(2) NULL DEFAULT NULL COMMENT '是否是新品(间隔天数<=180天为新品,否则为老品)0:老品,1:新品',
  created_at timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP(0),
  updated_at timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0),
  PRIMARY KEY (id) USING BTREE,
  INDEX sel_is_asin_new(account_name, is_asin_new) USING BTREE,
  INDEX asin(asin) USING BTREE,
  INDEX sel_account_id(account_id, is_asin_new) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 72102642 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;"""


class Judge(TemplatesMysql):
    """Poll workflow status tables and rotate ``*_copy1`` staging tables.

    Args:
        site_name: Site code; ``"us"`` maps to database ``selection``, any
            other value to ``selection_<site_name>``.
        data_type: Step selector. Legend carried over from the original code:
            1: keywords, 2: asin detail, 3/4: store products, 5/6/7: ao_val,
            11: last_week, 12: last_4_week, 13: asin_last_4_week,
            14: st_month_<year>_<month> (last week of each month), 15: asin_st,
            21: brand_analytics_week, 22: brand_analytics_month.
            See ``run`` for the values that are actually handled.
    """

    # Message printed once the awaited workflow row is found, keyed by data_type.
    _DONE_MESSAGES = {
        1: "关键词已经抓完, 结束判断, 进行下一步操作",
        2: "asin详情已经抓完, 结束判断, 进行下一步操作",
        4: "店铺产品已经抓完, 结束判断, 进行下一步操作",
        7: "关键词的ao_val已经计算完成, 结束判断, 进行下一步操作",
    }

    def __init__(self, site_name='us', data_type=1):
        super().__init__()
        self.site_name = site_name
        self.data_type = data_type
        self.engine = self.mysql_connect()
        # ISO year/week are fixed at construction time and used as the workflow "week" key.
        self.year, self.week, self.week_day = datetime.datetime.now().isocalendar()
        self.current_date = time.strftime("%Y-%m-%d", time.localtime())
        print("current_date:", self.current_date)
        print("site_name, year, week, week_day:", self.site_name, self.year, self.week, self.week_day)
        self.df_judge = pd.DataFrame()
        self.db_name = "selection" if self.site_name == "us" else f"selection_{self.site_name}"

    def read_workflow(self):
        """Load the finished (``status=3``) workflow row for this week/site/data_type into ``self.df_judge``.

        ``selection.workflow_exhibition`` is queried for data_type 7 (ao_val
        calculation); every other data_type reads ``selection.workflow_crawling``.
        """
        table = "selection.workflow_exhibition" if self.data_type in [7] else "selection.workflow_crawling"
        sql = f"select * from {table} WHERE week='{self.year}_{self.week}' and site_name='{self.site_name}' and data_type={self.data_type} and status=3;"
        print("sql_read:", sql)
        self.df_judge = pd.read_sql(sql=sql, con=self.engine)

    def judge(self):
        """Block until ``self.df_judge`` holds exactly one row, re-reading the workflow every 10 minutes."""
        done_message = self._DONE_MESSAGES.get(self.data_type)
        if done_message is None:
            # Unknown data_type: warn, then poll like the known types (original behaviour).
            done_message = "输入类型有误, 请重新检查"
            print(done_message)
        while True:
            print("current_time:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
            if self.df_judge.shape[0] == 1:
                print("site_name, year, week:", self.site_name, self.year, self.week, done_message)
                break
            print("site_name, year, week:", self.site_name, self.year, self.week, "当前流程尚未完成,睡眠10min之后继续判断")
            time.sleep(_POLL_SECONDS)
            self.read_workflow()

    def _copy_table_id_span(self, copy_table):
        """Return ``max(id) - min(id) + 1`` for ``copy_table``, or 0 when the table has no rows.

        Both aggregates are fetched in a single row so that an empty or
        single-row table cannot collapse the result (as ``UNION`` would).
        """
        sql_read = f"SELECT min(id) AS min_id, max(id) AS max_id FROM {copy_table};"
        row = pd.read_sql(sql_read, con=self.engine).iloc[0]
        if pd.isna(row["min_id"]) or pd.isna(row["max_id"]):
            return 0
        return int(row["max_id"]) - int(row["min_id"]) + 1

    def _promote_copy_table(self, base_name):
        """Replace ``<site>_<base_name>`` with its ``_copy1`` table when the copy is large enough.

        The size check is made up to ``_MAX_FAILED_CHECKS + 1`` times.

        Returns:
            True if the live table was replaced, False if every check failed.
        """
        live_table = f"{self.db_name}.{self.site_name}_{base_name}"
        copy_table = f"{live_table}_copy1"
        for attempt in range(1, _MAX_FAILED_CHECKS + 2):
            print("开始检查数据")
            count = self._copy_table_id_span(copy_table)
            print("copy表数据量:", count)
            if count > _MIN_COPY_ROWS:
                print("3. 删除正式表,将copy表改成正式表")
                sql_drop = f"drop table if exists {live_table};"
                sql_rename = f"ALTER TABLE {copy_table} RENAME {live_table};"
                with self.engine.begin() as conn:
                    print("sql_drop:", sql_drop)
                    conn.execute(sql_drop)
                    print("sql_rename:", sql_rename)
                    conn.execute(sql_rename)
                return True
            if attempt > _MAX_FAILED_CHECKS:
                print("检查次数超过3次异常,停止")
                return False
            print(f"第{attempt}次检查异常,继续删除+改表面")
        return False

    def _recreate_copy_table(self, base_name, ddl_template):
        """Drop ``<site>_<base_name>_copy1`` if it exists and create it again from ``ddl_template``."""
        copy_table = f"{self.db_name}.{self.site_name}_{base_name}_copy1"
        sql_drop = f"drop table if exists {copy_table};"
        with self.engine.begin() as conn:
            print("sql_drop:", sql_drop)
            conn.execute(sql_drop)
            conn.execute(ddl_template.format(table=copy_table))

    def delete_and_rename_and_create_asin_last_4_week(self):
        """Promote ``<site>_asin_last_4_week_copy1`` to the live table and recreate an empty copy table.

        Unlike the search-term variant, no workflow status update is performed here.
        """
        # NOTE(review): as in the original, the copy table is dropped and recreated
        # even when the size check failed and nothing was promoted - confirm intent.
        self._promote_copy_table("asin_last_4_week")
        self._recreate_copy_table("asin_last_4_week", _ASIN_LAST_4_WEEK_DDL)
        print(f"site_name: {self.site_name}站点的_asin_last_4_week已经跑完,copy表已经建立")

    def delete_and_rename_and_create_st_last_4_week(self):
        """Promote ``<site>_last_4_week_st_copy1``, mark integration step 3 done, recreate the copy table."""
        # NOTE(review): as in the original, the status update and the copy-table
        # reset happen even when the size check failed - confirm intent.
        self._promote_copy_table("last_4_week_st")
        self.update_table_state(data_type=3)  # last_4_week
        self._recreate_copy_table("last_4_week_st", _ST_LAST_4_WEEK_DDL)
        print(f"site_name: {self.site_name}站点的last_4_week_st已经跑完,copy表已经建立")

    def update_table_state(self, data_type=1):
        """Set ``status=3`` in ``selection.workflow_integration`` for this week, site and ``data_type``."""
        table = "selection.workflow_integration"
        sql_update = f"update {table} set status=3 where week='{self.year}_{self.week}' and site_name='{self.site_name}' and data_type={data_type}"
        with self.engine.begin() as conn:
            print("sql_update:", sql_update)
            conn.execute(sql_update)

    def delete_and_rename_and_create_seller_asin_account_detail(self):
        """Promote ``<site>_seller_asin_account_detail_copy1`` to the live table and recreate an empty copy table."""
        # NOTE(review): as in the original, the copy table is dropped and recreated
        # even when the size check failed and nothing was promoted - confirm intent.
        self._promote_copy_table("seller_asin_account_detail")
        self._recreate_copy_table("seller_asin_account_detail", _SELLER_ASIN_ACCOUNT_DETAIL_DDL)
        print(f"site_name: {self.site_name}站点的_seller_asin_account_detail已经跑完,copy表已经建立")

    def run(self):
        """Execute the step selected by ``self.data_type``; unhandled values are a no-op.

        1, 2, 4, 7 wait for the upstream workflow row; 11 marks integration
        step 2 done; 12 rotates the search-term table; 13 rotates the asin
        table; 21 rotates the seller/asin account detail table.
        """
        if self.data_type in (1, 2, 4, 7):
            self.read_workflow()
            self.judge()
        elif self.data_type == 11:
            self.update_table_state(data_type=2)  # week_st
            print(f"site_name: {self.site_name}站点的week_st已经跑完")
        elif self.data_type == 12:
            self.delete_and_rename_and_create_st_last_4_week()
        elif self.data_type == 13:
            # Previously routed to the search-term rotation, leaving the asin rotation unreachable.
            self.delete_and_rename_and_create_asin_last_4_week()
        elif self.data_type == 21:
            self.delete_and_rename_and_create_seller_asin_account_detail()


if __name__ == '__main__':
    if len(sys.argv) < 3:
        sys.exit(f"usage: {sys.argv[0]} <site_name> <data_type>")
    site_name = sys.argv[1]  # argument 1: site
    data_type = int(sys.argv[2])  # argument 2: data type
    handle_obj = Judge(site_name=site_name, data_type=data_type)
    handle_obj.run()