Commit 5081bd47 by fangxingjun

no message

parent 6d450d6e
...@@ -4,30 +4,45 @@ sys.path.append(os.path.dirname(sys.path[0])) ...@@ -4,30 +4,45 @@ sys.path.append(os.path.dirname(sys.path[0]))
from utils.secure_db_client import get_remote_engine from utils.secure_db_client import get_remote_engine
engine = get_remote_engine( def export_data(site_name, date_type, date_info):
site_name="us", # -> database "selection" engine = get_remote_engine(
db_type="mysql", # -> 服务端 alias "mysql" site_name="us", # -> database "selection"
# user="fangxingjun", # -> 服务端 alias "mysql" db_type="mysql", # -> 服务端 alias "mysql"
# user_token="5f1b2e9c3a4d7f60" # 可不传,走默认 # user="fangxingjun", # -> 服务端 alias "mysql"
) # user_token="5f1b2e9c3a4d7f60" # 可不传,走默认
)
partitions = {
'site_name': site_name,
'date_type': date_type,
'date_info': date_info,
}
cols_list = ['asin', 'rating_and_comments_info', 'date_info']
import_table = f'{site_name}_self_rating_180day_copy'
target_table = f'{site_name}_self_rating_180day'
sql_drop = f"drop table if exists {import_table};"
sql_create = f"create table {import_table} like {target_table};"
engine.execute(sql_drop)
engine.execute(sql_create)
print(f"sql_drop: {sql_drop}")
print(f"sql_create: {sql_create}")
engine.sqoop_raw_export(
hive_table='dwt_self_asin_detail_rating',
import_table=import_table,
partitions=partitions,
m=1,
cols=','.join(cols_list)
)
# engine.swap_tables(
# source_table=import_table,
# target_table=target_table
# )
# site_name = 'us'
# date_type = 'month'
# date_info = '2026-01'
site_name = sys.argv[1] # 参数1:站点
date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter/day
date_info = sys.argv[3] # 参数3:年-周/年-月/年-季/年-月-日, 比如: 2022-1
partitions = {
'site_name': site_name,
'date_type': date_type,
'date_info': date_info,
}
cols_list = ['asin', 'rating_and_comments_info', 'date_info']
engine.sqoop_raw_export(
hive_table='dwt_self_asin_detail_rating',
import_table=f'{site_name}_self_rating_180day',
partitions=partitions,
m=1,
cols=','.join(cols_list)
)
if __name__ == '__main__':
# site_name = 'us'
# date_type = 'month'
# date_info = '2026-01'
site_name = sys.argv[1] # 参数1:站点
date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter/day
date_info = sys.argv[3] # 参数3:年-周/年-月/年-季/年-月-日, 比如: 2022-1
export_data(site_name, date_type, date_info)
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.secure_db_client import get_remote_engine
engine = get_remote_engine(
site_name="us", # -> database "selection"
db_type="mysql", # -> 服务端 alias "mysql"
# user="fangxingjun", # -> 服务端 alias "mysql"
# user_token="5f1b2e9c3a4d7f60" # 可不传,走默认
)
# site_name = 'us'
# date_type = 'month'
# date_info = '2026-01'
site_name = sys.argv[1] # 参数1:站点
date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter/day
date_info = sys.argv[3] # 参数3:年-周/年-月/年-季/年-月-日, 比如: 2022-1
partitions = {
'site_name': site_name,
'date_type': date_type,
'date_info': date_info,
}
cols_list = ['asin', 'rating_and_comments_info', 'date_info']
engine.sqoop_raw_export(
hive_table='dwt_self_asin_detail_rating',
import_table=f'{site_name}_self_rating_180day',
partitions=partitions,
m=1,
cols=','.join(cols_list)
)
...@@ -104,7 +104,6 @@ if __name__ == '__main__': ...@@ -104,7 +104,6 @@ if __name__ == '__main__':
REPLACE(REPLACE(REPLACE(bundle_asin_component_json, E'\n',' '), E'\r',' '), E'\t',' ') AS bundle_asin_component_json, REPLACE(REPLACE(REPLACE(bundle_asin_component_json, E'\n',' '), E'\r',' '), E'\t',' ') AS bundle_asin_component_json,
REPLACE(REPLACE(REPLACE(cart_type, E'\n',' '), E'\r',' '), E'\t',' ') AS cart_type, REPLACE(REPLACE(REPLACE(cart_type, E'\n',' '), E'\r',' '), E'\t',' ') AS cart_type,
REPLACE(REPLACE(REPLACE(site, E'\n',' '), E'\r',' '), E'\t',' ') AS site REPLACE(REPLACE(REPLACE(site, E'\n',' '), E'\r',' '), E'\t',' ') AS site
FROM {import_table} FROM {import_table}
WHERE date_info='{date_info}' AND \$CONDITIONS WHERE date_info='{date_info}' AND \$CONDITIONS
""" """
......
...@@ -28,6 +28,7 @@ site_name_db_dict = { ...@@ -28,6 +28,7 @@ site_name_db_dict = {
"it": "selection_it", "it": "selection_it",
} }
db_type_alias_map = { db_type_alias_map = {
"mysql": "mysql", # 阿里云mysql "mysql": "mysql", # 阿里云mysql
"postgresql_14": "postgresql_14", # pg14爬虫库-内网 "postgresql_14": "postgresql_14", # pg14爬虫库-内网
...@@ -41,7 +42,8 @@ db_type_alias_map = { ...@@ -41,7 +42,8 @@ db_type_alias_map = {
DEFAULT_SERVERS = [ DEFAULT_SERVERS = [
# "http://192.168.200.210:7777", # 内网-h5 # "http://192.168.200.210:7777", # 内网-h5
"http://192.168.200.210:7778", # 内网-h5 "http://61.145.136.61:7777", # 外网
# "http://192.168.200.210:7778", # 内网-h5
# "http://192.168.200.210:7778", # 内网-测试大数据-h5 # "http://192.168.200.210:7778", # 内网-测试大数据-h5
# "http://192.168.10.216:7777", # 内网-测试大数据-h6 # "http://192.168.10.216:7777", # 内网-测试大数据-h6
# "http://192.168.10.216:7777", # 内网-h6 # "http://192.168.10.216:7777", # 内网-h6
...@@ -273,6 +275,25 @@ class RemoteEngine: ...@@ -273,6 +275,25 @@ class RemoteEngine:
print(f"更新 {rows_updated} 行") print(f"更新 {rows_updated} 行")
return df return df
def swap_tables(self,
source_table: str,
target_table: str,
dry_run: bool = False):
payload = {
"db": self.db_type,
"site_name": self.site_name,
"source_table": source_table,
"target_table": target_table,
"dry_run": dry_run
}
resp = self._request("swap_tables", payload)
if not resp.get("ok"):
raise RuntimeError(resp)
return resp
def sqoop_raw_import(self, def sqoop_raw_import(self,
# site_name: str, # site_name: str,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment