实时Flink作业创建
点击新建按钮,弹出【新建作业】弹窗。选择Flink作业类型,目前Flink作业分为两大类:FlinkSQL作业 和Flink自定义作业。
FlinkSQL:用户通过写SQL的方式就能够进行实时Flink作业的开发。
Flink自定义:用户需要在线下编辑好Flink作业包,上传到平台之上。
选择【FlinkSQL】类型后,输入作业名称,点击【确定】。在【任务开发】列表中,显示创建任务。
选择【FlinkJar】类型后,点击【确定】,进行在【任务开发】列表中显示创建的任务。
实时Flink作业编辑
FlinkSQL作业
创建的FlinkSQL作业中,用户能够通过类SQL脚本进行任务开发。
SQL脚本模板如下:
--创建kafka表
drop table if EXISTS kafka_run_log;
create table kafka_run_log(
carId int,
fuel_consumption int,
traffic string,
speed int,
latitude double,
longitude double,
create_time TIMESTAMP(3)
) with (
'connector.type' = 'kafka',
'connector.version' = '0.10',
'connector.topic' = 'vehicle_run_info',
'connector.properties.bootstrap.servers' = ' XX.XX.XX.XX:9093',
'connector.startup-mode' = 'latest-offset',
'connector.properties.enable.auto.commit' = 'false',
'format.type' = 'csv'
);
/*
创建mysql表语句
*/
drop table if EXISTS mysql_run_log;
CREATE TABLE mysql_run_log (
carId int,
fuel_consumption INT,
traffic varchar(50),
speed int,
latitude double,
longitude double,
create_time TIMESTAMP(3)
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://XX.XX.XX.XX:3306/car_run_log?useSSL=false',
'connector.table' = 'mysql_run_log',
'connector.username' = 'edap_test',
'connector.password' = 'edap_test@1'
);
insert into
mysql_run_log
select
carId,
fuel_consumption,
traffic,
speed,
latitude,
longitude,
create_time
from
kafka_run_log;
select
count(carId)
from
mysql_run_log;
用户可以在脚本编辑框中,编写相应的FlinkSQL脚本。
FlinkJAR作业
用户可以在开发界面中调整jar包的存储路径。
实时Flink作业配置
在Flink作业开发完成之后,点击【基本信息】,显示可视化作业的基本信息,并能够进行描述修改。
点击【参数设置】,弹出Flink作业的参数设置。设置的参数可以在Flink作业中进行引用。
点击【引擎设置】弹出引擎设置弹出框。设置引擎相关的配置信息。
实时Flink作业的保存及测试运行
Flink作业开发完成后,点击上面【保存】按钮,进行作业保存。
点击【执行】,进行Flink作业测试运行,且在【执行信息】中弹出执行日志信息。