ClickHouse数据导入
1.INSERT
2.文件
# Bulk-load a delimited text file into ClickHouse via clickhouse-client.
#   --format_csv_delimiter=$'\001'      : field separator is the ^A (0x01) control character
#   --max_partitions_per_insert_block   : max partitions one INSERT block may touch
#   --input_format_allow_errors_num     : number of malformed input rows tolerated
clickhouse-client -h wxpt-ck-01 --port="9000" -u default --password='abc123456' --format_csv_delimiter=$'\001' --max_partitions_per_insert_block=3000 --input_format_allow_errors_num=100000 --query="INSERT INTO default.tb_test_distributed FORMAT CSV" < CK_TEST.txt
--format_csv_delimiter 字段分隔符
--max_partitions_per_insert_block 此次插入最大分区数
--input_format_allow_errors_num 可忽略的错误条数
3.HDFS
需在ck建立HDFS映射表,高可用集群可以将配置文件加入config.xml中
<hdfs>
<libhdfs3_conf>/data/software/comm_config/hdfs-site.xml</libhdfs3_conf>
</hdfs>
创建HDFS映射表
-- Mapping table backed by the ClickHouse HDFS engine: rows are read on query
-- directly from the TSV files matching the glob below; no data is stored in
-- ClickHouse itself.
-- NOTE(review): PHONE_NUMBER is UInt64 — assumes phone numbers never carry
-- leading zeros or a '+' prefix; confirm against the upstream export format.
CREATE TABLE default.hdfs_home_page_touch_collect_local
(
ACTIVITY_ID String COMMENT '活动ID',
RECOMMEND_TYPE String COMMENT '推荐类型',
ACTIVITY_NAME String COMMENT '活动名称',
ACTIVITY_START DateTime COMMENT '活动开始时间',
ACTIVITY_END DateTime COMMENT '活动结束时间',
CREATOR String COMMENT '创建人',
ORG_NAME String COMMENT '组织名称',
GOODS_ID String COMMENT '商品ID',
GOODS_TABLE_ID UInt64 COMMENT '商品表ID',
SECTION_TYPE String COMMENT '业务类型',
FIRST_DATA_TYPE String COMMENT '一级分类',
SECOND_DATA_TYPE String COMMENT '二级分类',
POSITION_CODE String COMMENT '位置编码',
POSITION_NAME String COMMENT '位置名称',
PHONE_NUMBER UInt64 COMMENT '手机号码',
RECOM_PV UInt64 COMMENT '推荐数',
CLICK_PV UInt64 COMMENT '点击数',
ORDER_PV UInt64 COMMENT '订单数',
GOODS_URL String COMMENT '商品URL',
GOODS_NAME String COMMENT '商品名称',
GOODS_CHANNEL String COMMENT '商品触点编码',
ACT_ORDER_PV UInt64 COMMENT '活动级订单数',
OPERA_DAY String COMMENT '统计日期'
) ENGINE HDFS('hdfs://cluster1/home/hadoop/baiyk/tb_dwa_home_page_touch_collect_bak/*', 'TSV');
导出数据到CK集群表中
-- Copy all rows from the HDFS-mapped table into the ClickHouse cluster table.
-- No column list is given on the INSERT, so columns are matched by position:
-- the SELECT list below must stay in the exact order of
-- report.tb_home_page_touch_collect's definition.
INSERT INTO report.tb_home_page_touch_collect
SELECT ACTIVITY_ID,
RECOMMEND_TYPE,
ACTIVITY_NAME,
ACTIVITY_START,
ACTIVITY_END,
CREATOR,
ORG_NAME,
GOODS_ID,
GOODS_TABLE_ID,
SECTION_TYPE,
FIRST_DATA_TYPE,
SECOND_DATA_TYPE,
POSITION_CODE,
POSITION_NAME,
PHONE_NUMBER,
RECOM_PV,
CLICK_PV,
ORDER_PV,
GOODS_URL,
GOODS_NAME,
GOODS_CHANNEL,
ACT_ORDER_PV,
OPERA_DAY
FROM default.hdfs_home_page_touch_collect_local;
4.三方工具seaTunnel
官网
下载解压后,配置SPARK_HOME
vim ./config/seatunnel-env.sh
编写配置文件,参考
vim ./config/spark.hive2CK.conf
# Spark runtime settings for the seaTunnel job.
env {
spark.app.name = "Hive2CK_tb_st_user_lable_new"
spark.executor.instances = 2
# Enable dynamic executor allocation; if resource usage gets too heavy,
# executor cores and memory can be pinned here as well.
spark.shuffle.service.enabled = true
spark.dynamicAllocation.enabled = true
}
source {
hive {
# The Hive source query must be written as a single line (a quoted HOCON
# string cannot span physical lines); per-field processing can be done in
# the SQL itself. transform(col, x -> to_json(x)) presumably serializes the
# nested *_preference columns to JSON strings for ClickHouse — verify.
# NOTE(review): source table is spelled "tb_st_user_lable_new" ("lable")
# while the sink table is "tb_st_user_label_new" — confirm both names exist.
# Fixed: year pattern was 'yyyMMdd'; 'yyyyMMdd' is the intended 8-digit date.
pre_sql = "SELECT phone_number, user_id, age, sex, name, user_sys, wo_credit, contact, prov_id, prov_name, city_id, cb_city_id, city_name, is_5g_city, is_tencent, product_base, is_2i_user, is_online, chnl_kind_id, product_id, product_name, product_alias_name, product_id_dg, product_id_dg_name, innet_age_day, innet_age_month, innet_date, is_5g_model, curr_terminal_length, brand_id, brand_name, model_id, model_name, is_first_pay, is_second_pay, first_pay_fee, pay_amount_all, kyye_fee, recent_pay_time, recent_pay_amount, month_fee, last_month_arpu, last_2month_avg_arpu, last_3month_avg_arpu, last_6month_avg_arpu, voice_extra_fee, svip_and_month_fee, last_month_flux_fee, last_2month_avg_flux_fee, last_3month_avg_flux_fee, last_2month_max_flux_fee, total_flux, total_call, expen_in_flux_rate, expen_in_call_rate, totaler_bytes, totaler_bytes_rate, is_new_reg_low_freq, out_jf_times, is_low_flux_preference, is_low_call_preference, is_open_intl, open_intl_date, hkmotw_data, intl_data, out_active_call, in_active_call, last_3month_avg_total_flux, last_6month_avg_total_flux, last_3month_avg_out_jf_times, last_6month_avg_out_jf_times, is_silence_user, user_desc, is_susp_card_user, is_fake_user, stop_days, half_stop_days, half_stop_num_last_60, boot_state, is_family_product, is_ykrh, ykrh_type, is_yyrh, yyrh_type, is_5g_user, dual_slot, customer_type, is_innet, is_acct, offnet_date, is_5g_product, is_free_flux, is_free_call, is_opencard, contract_type, is_high_risk_3month, is_second_pay_3month, transform(video_preference, x -> to_json(x)) video_preference, transform(game_preference, x -> to_json(x)) game_preference, contact_code, is_newinnet_user, is_double_flow_effective, is_second_pay_partake, is_high_risk_partake, is_5g_up_dg, is_prime_equity_package, is_prime_equity_similar, is_commit_low, is_low_pay_user, is_jinli_member, is_prime_equity_5g, is_prime_equity_5g_contract, is_fee_help_deduction, is_preferential_benefits, off_net_prob, transfer_net_prob, credit_scores, user_stability, is_loyal_user, is_vip_user, user_value_rank, rank_clustering_net_duration, rank_clustering_age, rank_clustering_total_fee, rank_clustering_total_flux, rank_clustering_user_value, bill_flux, ct_times, product_expen_in, product_expen_out, expen_in_call, expen_in_flux_g, expen_in_flux_m, is_5g_up_dg_month, is_5g_up_td_month, is_supervip_dg_month, is_supervip_td_month, is_cndx_dg_month, is_cndx_td_month, is_newinnet_pay_partake, is_half_stop_partake, is_2i_product, is_svip, is_2i_flow_fill, is_tencent_free_flow, is_share_prime_equity, is_5g_relative, is_shouting_6month, transform(read_preference, x -> to_json(x)) read_preference, transform(chat_preference, x -> to_json(x)) chat_preference, transform(shop_preference, x -> to_json(x)) shop_preference, transform(learn_preference, x -> to_json(x)) learn_preference, transform(travel_preference, x -> to_json(x)) travel_preference, transform(life_preference, x -> to_json(x)) life_preference, transform(health_preference, x -> to_json(x)) health_preference, transform(financial_preference, x -> to_json(x)) financial_preference, transform(utilities_preference, x -> to_json(x)) utilities_preference, is_jiasubao, jiasubao_time_rate, jiasubao_num_rate, is_flux_plunge, is_time_plunge, is_new_supervip, is_not_equity_supervip, is_not_equity_supervip_same_month, off_net_prob_no_wk, off_net_prob_wk, is_supervip_not_5g_user, is_supervip_not_5g_user_type, is_dg_5g_active, is_td_weixi_active, is_game_preference, is_video_preference, family_card_mainsub, family_card_amount, service_type_old, product_id_5g, prime_equity_list, id_sx_4g_svip, id_sx_5g_svip, id_sx_cndx, id_sx_5g_up, id_td_5g_up, id_td_4g_svip, id_td_5g_svip, id_td_svip, id_td_cndx, id_td_5g_jyb, id_sx_5g_us, id_td_5g_us, date_format(current_date(), 'yyyyMMdd') opera_day FROM raw_layer.tb_st_user_lable_new"
result_table_name = "source_view_table"
}
}
transform {
# Transform stage (intentionally empty): splitting or reshaping of individual
# fields could also be done here instead of in the source SQL.
}
sink {
clickhouse {
source_table_name = "source_view_table"
# ClickHouse cluster endpoint (HTTP port 8123)
host = "xxxx:8123"
clickhouse.socket_timeout = 50000
database = "raw_layer"
table = "tb_st_user_label_new"
# Target table columns, matched by name against the source view. Must be one
# physical line: a quoted HOCON string cannot span lines (the original was
# split mid-identifier inside "credit_scores").
# NOTE(review): sink table is "tb_st_user_label_new" but the Hive source reads
# "tb_st_user_lable_new" — confirm the spelling difference is intentional.
fields = ["phone_number","user_id","age","sex","name","user_sys","wo_credit","contact","prov_id","prov_name","city_id","cb_city_id","city_name","is_5g_city","is_tencent","product_base","is_2i_user","is_online","chnl_kind_id","product_id","product_name","product_alias_name","product_id_dg","product_id_dg_name","innet_age_day","innet_age_month","innet_date","is_5g_model","curr_terminal_length","brand_id","brand_name","model_id","model_name","is_first_pay","is_second_pay","first_pay_fee","pay_amount_all","kyye_fee","recent_pay_time","recent_pay_amount","month_fee","last_month_arpu","last_2month_avg_arpu","last_3month_avg_arpu","last_6month_avg_arpu","voice_extra_fee","svip_and_month_fee","last_month_flux_fee","last_2month_avg_flux_fee","last_3month_avg_flux_fee","last_2month_max_flux_fee","total_flux","total_call","expen_in_flux_rate","expen_in_call_rate","totaler_bytes","totaler_bytes_rate","is_new_reg_low_freq","out_jf_times","is_low_flux_preference","is_low_call_preference","is_open_intl","open_intl_date","hkmotw_data","intl_data","out_active_call","in_active_call","last_3month_avg_total_flux","last_6month_avg_total_flux","last_3month_avg_out_jf_times","last_6month_avg_out_jf_times","is_silence_user","user_desc","is_susp_card_user","is_fake_user","stop_days","half_stop_days","half_stop_num_last_60","boot_state","is_family_product","is_ykrh","ykrh_type","is_yyrh","yyrh_type","is_5g_user","dual_slot","customer_type","is_innet","is_acct","offnet_date","is_5g_product","is_free_flux","is_free_call","is_opencard","contract_type","is_high_risk_3month","is_second_pay_3month","video_preference","game_preference","contact_code","is_newinnet_user","is_double_flow_effective","is_second_pay_partake","is_high_risk_partake","is_5g_up_dg","is_prime_equity_package","is_prime_equity_similar","is_commit_low","is_low_pay_user","is_jinli_member","is_prime_equity_5g","is_prime_equity_5g_contract","is_fee_help_deduction","is_preferential_benefits","off_net_prob","transfer_net_prob","credit_scores","user_stability","is_loyal_user","is_vip_user","user_value_rank","rank_clustering_net_duration","rank_clustering_age","rank_clustering_total_fee","rank_clustering_total_flux","rank_clustering_user_value","bill_flux","ct_times","product_expen_in","product_expen_out","expen_in_call","expen_in_flux_g","expen_in_flux_m","is_5g_up_dg_month","is_5g_up_td_month","is_supervip_dg_month","is_supervip_td_month","is_cndx_dg_month","is_cndx_td_month","is_newinnet_pay_partake","is_half_stop_partake","is_2i_product","is_svip","is_2i_flow_fill","is_tencent_free_flow","is_share_prime_equity","is_5g_relative","is_shouting_6month","read_preference","chat_preference","shop_preference","learn_preference","travel_preference","life_preference","health_preference","financial_preference","utilities_preference","is_jiasubao","jiasubao_time_rate","jiasubao_num_rate","is_flux_plunge","is_time_plunge","is_new_supervip","is_not_equity_supervip","is_not_equity_supervip_same_month","off_net_prob_no_wk","off_net_prob_wk","is_supervip_not_5g_user","is_supervip_not_5g_user_type","is_dg_5g_active","is_td_weixi_active","is_game_preference","is_video_preference","family_card_mainsub","family_card_amount","service_type_old","product_id_5g","prime_equity_list","id_sx_4g_svip","id_sx_5g_svip","id_sx_cndx","id_sx_5g_up","id_td_5g_up","id_td_4g_svip","id_td_5g_svip","id_td_svip","id_td_cndx","id_td_5g_jyb","id_sx_5g_us","id_td_5g_us","opera_day"]
username = "default"
password = "abc123456"
# Maximum rows per insert batch
bulk_size = 20000
# Enable split mode; only meaningful for Distributed tables
split_mode = true
# Sharding key used when split_mode is enabled
sharding_key = "cityHash64(phone_number, user_id)"
}
}
执行
# Submit the seaTunnel job to YARN in client mode.
# Fixed: config filename case must match the file created above
# (spark.hive2CK.conf) — Linux filesystems are case-sensitive.
./bin/start-seatunnel-spark.sh --config ./config/spark.hive2CK.conf --master yarn -e client
启动后会提交到YARN上,然后执行SPARK-SQL
5.seaTunnel的ClickHouseFile插件
还未测试,介绍
Comments | NOTHING