-- create join table --
create table join_table with ('command'='get', 'value.data.structure'='row') like sink_redis
-- create result table --
create table result_table(uid VARCHAR, username VARCHAR, score double, score2 double) with ('connector'='print')
-- create source table --
create table source_table(uid VARCHAR, username VARCHAR, proc_time as procTime()) with ('connector'='datagen', 'fields.uid.kind'='sequence', 'fields.uid.start'='1', 'fields.uid.end'='2')
-- 关联查询维表,获得维表的多个字段值 --
insert
into
result_table
select
s.uid,
s.username,
j.score, -- 来自维表
j.score2 -- 来自维表
from
source_table as s
join join_table for system_time as of s.proc_time as j on
j.uid = s.uid
用自带例子 ,收到数据
-- 创建表 create table sink_redis(uid VARCHAR,score double,score2 double ) with ( 'connector' = 'redis', 'host' = '10.11.69.176', 'port' = '6379', 'redis-mode' = 'single', 'password' = '***', 'command' = 'SET', 'value.data.structure' = 'row'); -- 'value.data.structure'='row':整行内容保存至value并以'\01'分割 -- 写入测试数据,score、score2为需要被关联查询出的两个维度 insert into sink_redis select from (values ('1', 10.3, 10.1));
-- 在redis中,value的值为: "1\x0110.3\x0110.1" -- -- 写入结束 --
-- create join table -- create table join_table with ('command'='get', 'value.data.structure'='row') like sink_redis
-- create result table -- create table result_table(uid VARCHAR, username VARCHAR, score double, score2 double) with ('connector'='print')
-- create source table -- create table source_table(uid VARCHAR, username VARCHAR, proc_time as procTime()) with ('connector'='datagen', 'fields.uid.kind'='sequence', 'fields.uid.start'='1', 'fields.uid.end'='2')
-- 关联查询维表,获得维表的多个字段值 -- insert into result_table select s.uid, s.username, j.score, -- 来自维表 j.score2 -- 来自维表 from source_table as s join join_table for system_time as of s.proc_time as j on j.uid = s.uid