从源码构建
# 安装构建基本环境
sudo apt-get install -y gcc cmake build-essential git libjansson-dev libsnappy-dev liblzma-dev zlib1g-dev pkg-config libtool autoconf automake groff
# 用来安装adaptor keeper explore之类的辅助,如果不要可以不安装go
sudo apt install -y golang-go
# 指定版本下载源码
git clone --depth 1 --branch ver-3.4.1.0 https://github.com/taosdata/TDengine.git
cd TDengine/
#
cmake -B build -DBUILD_TOOLS=true -DBUILD_CONTRIB=true
cmake -B build -DBUILD_TOOLS=true -DBUILD_CONTRIB=true -DBUILD_KEEPER=true -DBUILD_HTTP=false
cmake --build build/ install
配置
# 可检查下/etc/taos.cfg的配置,一般只需要修改下日志、数据的存放路径,然后可直接启动服务
systemctrl start taos
# 首次登录无密码,如果无日志写入权限,需要sudo
taos
# 进入之后,可设置密码,创建用户
alter user root pass 'abc123456@';
create user gauss pass 'Gauss20250922@';
# 设定密码后,再次使用客户端
sudo taos -uroot -fb123456@
# 如果要使用流计算,需要创建sndoe,再taos命令下
show dnodes;
create snode on dnode 1;
# 删除snode
drop snode on dnode 1;管理数据库
创建
# tdengine的数据库有一些配置选项,这里希望缓存最后数据,提供十年存放周期,时间精度为ms
CREATE DATABASE `db_tick`
CACHEMODEL 'last_row'
KEEP 36500
PRECISION 'ms' ;删除
# 删除数据库 : tdengine如果只删除超级表,不删除子表,会存在隐患
drop database `db_tick`;查看
# 查看数据库列表
show databases;
# 查看数据库DDL
show create database db_tick\G;
# 查看数据库里的超级表
select stable_name,db_name,`tags` from information_schema.ins_stables order by stable_name;
# 选定指定数据库
use db_tick;
# 显示超级表列表
show stables;
# 查看超级表DDL
show create stable t_tick\G;创建超级表
CREATE STABLE `t_tick` (
`ts` TIMESTAMP,
`ask` DOUBLE,
`bid` DOUBLE,
`close` DOUBLE,
`vol` DOUBLE,
`dir` TINYINT,
`create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;
CREATE STABLE `t_kline_s_01` (
`ts` TIMESTAMP,
`h` DOUBLE,
`l` DOUBLE,
`o` DOUBLE,
`c` DOUBLE,
`vol` DOUBLE,
`turn_over` DOUBLE,
`create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;
CREATE STABLE `t_kline_dir_s_01` (
`ts` TIMESTAMP,
`cumulative_bid_vol` DOUBLE,
`cumulative_ask_vol` DOUBLE,
`cumulative_mid_vol` DOUBLE,
`create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;
CREATE STABLE `t_kline_vol_s_01` (
`ts` TIMESTAMP,
`min_bid_vol` DOUBLE,
`max_bid_vol` DOUBLE,
`fin_bid_vol` DOUBLE,
`create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;
CREATE STABLE `t_footprint_s_01` (
`ts` TIMESTAMP,
`price` DOUBLE,
`bid_vol` DOUBLE,
`ask_vol` DOUBLE,
`mid_vol` DOUBLE,
`create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;创建流
drop stream stream_tick_to_kline_s_01;
create stream stream_tick_to_kline_s_01
interval(1s) sliding(1s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_s_01
output_subtable(concat('t_kline_s_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
drop stream stream_tick_to_footprint_s_01;
create stream stream_tick_to_footprint_s_01
interval(1s) sliding(1s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_footprint_s_01
output_subtable(concat('t_footprint_s_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
drop stream stream_tick_to_kline_dir_s_01;
create stream stream_tick_to_kline_dir_s_01
interval(1s) sliding(1s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_dir_s_01
output_subtable(concat('t_kline_dir_s_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
drop stream stream_tick_to_kline_vol_s_01;
create stream stream_tick_to_kline_vol_s_01
interval(1s) sliding(1s)
from db_tick.t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m) |FILL_HISTORY(1) )
into db_tick.t_kline_vol_s_01
output_subtable(concat('t_kline_vol_s_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;测试
创建1,5,15s,1,5,15,30m,1,4h,1d,1w这11个周期的流计算,每个都有基于tick的四个计算包括kline、klvol、kldir、footprint。如下脚本。
超级表
创建
CREATE STABLE `t_tick` (`ts` TIMESTAMP, `ask` DOUBLE, `bid` DOUBLE, `close` DOUBLE, `vol` DOUBLE, `dir` TINYINT, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_s_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_s_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_s_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_s_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_s_05` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_s_05` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_s_05` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_s_05` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_s_15` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_s_15` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_s_15` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_s_15` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_m_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_m_05` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_05` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_05` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_05` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_m_15` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_15` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_15` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_15` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_m_30` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_30` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_30` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_30` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_h_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_h_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_h_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_h_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_h_04` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_h_04` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_h_04` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_h_04` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_d_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_d_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_d_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_d_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_w_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_w_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_w_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_w_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
删除
drop stable if exists `t_footprint_w_01`;
drop stable if exists `t_footprint_d_01`;
drop stable if exists `t_footprint_h_04`;
drop stable if exists `t_footprint_h_01`;
drop stable if exists `t_footprint_m_30`;
drop stable if exists `t_footprint_m_15`;
drop stable if exists `t_footprint_m_05`;
drop stable if exists `t_footprint_m_01`;
drop stable if exists `t_footprint_s_15`;
drop stable if exists `t_footprint_s_05`;
drop stable if exists `t_footprint_s_01`;
drop stable if exists `t_klvol_w_01`;
drop stable if exists `t_klvol_d_01`;
drop stable if exists `t_klvol_h_04`;
drop stable if exists `t_klvol_h_01`;
drop stable if exists `t_klvol_m_30`;
drop stable if exists `t_klvol_m_15`;
drop stable if exists `t_klvol_m_05`;
drop stable if exists `t_klvol_m_01`;
drop stable if exists `t_klvol_s_15`;
drop stable if exists `t_klvol_s_05`;
drop stable if exists `t_klvol_s_01`;
drop stable if exists `t_kldir_w_01`;
drop stable if exists `t_kldir_d_01`;
drop stable if exists `t_kldir_h_04`;
drop stable if exists `t_kldir_h_01`;
drop stable if exists `t_kldir_m_30`;
drop stable if exists `t_kldir_m_15`;
drop stable if exists `t_kldir_m_05`;
drop stable if exists `t_kldir_m_01`;
drop stable if exists `t_kldir_s_15`;
drop stable if exists `t_kldir_s_05`;
drop stable if exists `t_kldir_s_01`;
drop stable if exists `t_kline_w_01`;
drop stable if exists `t_kline_d_01`;
drop stable if exists `t_kline_h_04`;
drop stable if exists `t_kline_h_01`;
drop stable if exists `t_kline_m_30`;
drop stable if exists `t_kline_m_15`;
drop stable if exists `t_kline_m_05`;
drop stable if exists `t_kline_m_01`;
drop stable if exists `t_kline_s_15`;
drop stable if exists `t_kline_s_05`;
drop stable if exists `t_kline_s_01`;
drop stable if exists `t_tick`;
流
创建
create stream stream_tick_to_kline_s_01
interval(1s) sliding(1s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_s_01
output_subtable(concat('t_kline_s_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_s_01
interval(1s) sliding(1s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_footprint_s_01
output_subtable(concat('t_footprint_s_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_s_01
interval(1s) sliding(1s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_s_01
output_subtable(concat('t_kldir_s_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_s_01
interval(1s) sliding(1s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_s_01
output_subtable(concat('t_klvol_s_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_s_05
interval(5s) sliding(5s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_s_05
output_subtable(concat('t_kline_s_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_s_05
interval(5s) sliding(5s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_footprint_s_05
output_subtable(concat('t_footprint_s_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_s_05
interval(5s) sliding(5s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_s_05
output_subtable(concat('t_kldir_s_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_s_05
interval(5s) sliding(5s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_s_05
output_subtable(concat('t_klvol_s_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_s_15
interval(15s) sliding(15s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_s_15
output_subtable(concat('t_kline_s_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_s_15
interval(15s) sliding(15s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_footprint_s_15
output_subtable(concat('t_footprint_s_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_s_15
interval(15s) sliding(15s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_s_15
output_subtable(concat('t_kldir_s_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_s_15
interval(15s) sliding(15s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_s_15
output_subtable(concat('t_klvol_s_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_m_01
interval(1m) sliding(1m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_m_01
output_subtable(concat('t_kline_m_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_m_01
interval(1m) sliding(1m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_footprint_m_01
output_subtable(concat('t_footprint_m_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_m_01
interval(1m) sliding(1m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_m_01
output_subtable(concat('t_kldir_m_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_m_01
interval(1m) sliding(1m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_m_01
output_subtable(concat('t_klvol_m_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_m_05
interval(5m) sliding(5m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_m_05
output_subtable(concat('t_kline_m_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_m_05
interval(5m) sliding(5m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_footprint_m_05
output_subtable(concat('t_footprint_m_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_m_05
interval(5m) sliding(5m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_m_05
output_subtable(concat('t_kldir_m_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_m_05
interval(5m) sliding(5m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_m_05
output_subtable(concat('t_klvol_m_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_m_15
interval(15m) sliding(15m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_m_15
output_subtable(concat('t_kline_m_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_m_15
interval(15m) sliding(15m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_footprint_m_15
output_subtable(concat('t_footprint_m_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_m_15
interval(15m) sliding(15m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_m_15
output_subtable(concat('t_kldir_m_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_m_15
interval(15m) sliding(15m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_m_15
output_subtable(concat('t_klvol_m_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_m_30
interval(30m) sliding(30m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_m_30
output_subtable(concat('t_kline_m_30_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_m_30
interval(30m) sliding(30m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_footprint_m_30
output_subtable(concat('t_footprint_m_30_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_m_30
interval(30m) sliding(30m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_m_30
output_subtable(concat('t_kldir_m_30_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_m_30
interval(30m) sliding(30m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_m_30
output_subtable(concat('t_klvol_m_30_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_h_01
interval(1h) sliding(1h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_h_01
output_subtable(concat('t_kline_h_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_h_01
interval(1h) sliding(1h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_footprint_h_01
output_subtable(concat('t_footprint_h_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_h_01
interval(1h) sliding(1h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_h_01
output_subtable(concat('t_kldir_h_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_h_01
interval(1h) sliding(1h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_h_01
output_subtable(concat('t_klvol_h_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_h_04
interval(4h) sliding(4h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_h_04
output_subtable(concat('t_kline_h_04_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_h_04
interval(4h) sliding(4h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_footprint_h_04
output_subtable(concat('t_footprint_h_04_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_h_04
interval(4h) sliding(4h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_h_04
output_subtable(concat('t_kldir_h_04_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_h_04
interval(4h) sliding(4h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_h_04
output_subtable(concat('t_klvol_h_04_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_d_01
interval(1d) sliding(1d)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_d_01
output_subtable(concat('t_kline_d_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_d_01
interval(1d) sliding(1d)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_footprint_d_01
output_subtable(concat('t_footprint_d_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_d_01
interval(1d) sliding(1d)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_d_01
output_subtable(concat('t_kldir_d_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_d_01
interval(1d) sliding(1d)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_d_01
output_subtable(concat('t_klvol_d_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_w_01
interval(1w) sliding(1w)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_w_01
output_subtable(concat('t_kline_w_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_w_01
interval(1w) sliding(1w)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_footprint_w_01
output_subtable(concat('t_footprint_w_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_w_01
interval(1w) sliding(1w)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_w_01
output_subtable(concat('t_kldir_w_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_w_01
interval(1w) sliding(1w)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_w_01
output_subtable(concat('t_klvol_w_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
删除
drop stream if exists stream_tick_to_klvol_w_01;
drop stream if exists stream_tick_to_kldir_w_01;
drop stream if exists stream_tick_to_footprint_w_01;
drop stream if exists stream_tick_to_kline_w_01;
drop stream if exists stream_tick_to_klvol_d_01;
drop stream if exists stream_tick_to_kldir_d_01;
drop stream if exists stream_tick_to_footprint_d_01;
drop stream if exists stream_tick_to_kline_d_01;
drop stream if exists stream_tick_to_klvol_h_04;
drop stream if exists stream_tick_to_kldir_h_04;
drop stream if exists stream_tick_to_footprint_h_04;
drop stream if exists stream_tick_to_kline_h_04;
drop stream if exists stream_tick_to_klvol_h_01;
drop stream if exists stream_tick_to_kldir_h_01;
drop stream if exists stream_tick_to_footprint_h_01;
drop stream if exists stream_tick_to_kline_h_01;
drop stream if exists stream_tick_to_klvol_m_30;
drop stream if exists stream_tick_to_kldir_m_30;
drop stream if exists stream_tick_to_footprint_m_30;
drop stream if exists stream_tick_to_kline_m_30;
drop stream if exists stream_tick_to_klvol_m_15;
drop stream if exists stream_tick_to_kldir_m_15;
drop stream if exists stream_tick_to_footprint_m_15;
drop stream if exists stream_tick_to_kline_m_15;
drop stream if exists stream_tick_to_klvol_m_05;
drop stream if exists stream_tick_to_kldir_m_05;
drop stream if exists stream_tick_to_footprint_m_05;
drop stream if exists stream_tick_to_kline_m_05;
drop stream if exists stream_tick_to_klvol_m_01;
drop stream if exists stream_tick_to_kldir_m_01;
drop stream if exists stream_tick_to_footprint_m_01;
drop stream if exists stream_tick_to_kline_m_01;
drop stream if exists stream_tick_to_klvol_s_15;
drop stream if exists stream_tick_to_kldir_s_15;
drop stream if exists stream_tick_to_footprint_s_15;
drop stream if exists stream_tick_to_kline_s_15;
drop stream if exists stream_tick_to_klvol_s_05;
drop stream if exists stream_tick_to_kldir_s_05;
drop stream if exists stream_tick_to_footprint_s_05;
drop stream if exists stream_tick_to_kline_s_05;
drop stream if exists stream_tick_to_klvol_s_01;
drop stream if exists stream_tick_to_kldir_s_01;
drop stream if exists stream_tick_to_footprint_s_01;
drop stream if exists stream_tick_to_kline_s_01;
查看流在节点的分布
select * from information_schema.ins_streams;写入tick
从指定的kafka写入tick。观察stream的运行。
存量
获得最大吞吐量
增量
查看持续运行稳定性
查询
查询品类列表含最新tick
select symbol,last(ts) as lts,last(close) as lclose,last(dir) as ldir from t_tick group by symbol order by lclose desc limit 0,10;集群
taosd社区版本支持计算复制集、计算、查询分离,以合理分担压力。可使用三台机器形成数据复制实例,并将流计算放在一个节点,查询放在一个节点,主管理在一个节点。可如下配置/etc/taos/taos.cfg
hosts
每一个节点fqdn就是主机名,并需要在/etc/hosts配置,确保网络是通的。
192.168.7.111 snode
192.168.7.186 mnode
192.168.7.87 qnode
mnode
# The end point of the first dnode in the cluster to be connected to when this dnode or the CLI utility is started
firstEp mnode:6030
fqdn mnode
serverPort 6030
supportVnodes 1
supportSnodes 0
supportQnodes 0snode
firstEp mnode:6030
fqdn snode
serverPort 6030
supportVnodes 256
supportQnodes 1
supportSnodes 1
qnode
firstEp mnode:6030
fqdn qnode
serverPort 6030
supportVnodes 256
supportQnodes 1
supportSnodes 0
create snode on dnode 1;
create snode on dnode 3;
create snode on dnode 4;
taos> show dnodes;
id | endpoint | vnodes | support_vnodes | status | create_time | reboot_time | note |
=============================================================================================================================================================================
1 | tdengine-0.186:6030 | 3 | 29 | ready | 2026-04-24 18:16:11.228 | 2026-05-06 17:51:10.269 | |
3 | ct4dev:6030 | 2 | 256 | ready | 2026-05-06 16:35:02.383 | 2026-05-07 13:40:44.757 | |
4 | tdengine-7.87:6030 | 2 | 256 | ready | 2026-05-06 17:19:23.917 | 2026-05-06 17:51:46.640 | |
Query OK, 3 row(s) in set (0.005232s)
taos> show snodes;
id | endpoint | create_time | replicaId | asReplicaOf |
========================================================================================================================
1 | tdengine-0.186:6030 | 2026-05-06 18:12:28.481 | 3 | 3, 4 |
3 | ct4dev:6030 | 2026-05-06 16:54:59.151 | 1 | 1 |
4 | tdengine-7.87:6030 | 2026-05-06 18:12:30.914 | 1 | None |
Query OK, 3 row(s) in set (0.005251s)
订阅
consumer>tree
.
├── pom.xml
├── src
│ ├── main
│ │ └── java
│ │ └── com
│ │ └── taos
│ │ └── App.java
│ └── test
│ └── java
│ └── com
│ └── taos
│ └── AppTest.java
// export TDENGINE_JDBC_URL="jdbc:TAOS-RS://192.168.7.186:6041/rest/tmq?user=root&password=A7fK9xQ2LmR8tY4Z"
package com.taos;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
public class App {
public static void main(String[] args) throws SQLException {
String url = System.getenv("TDENGINE_JDBC_URL");
Properties properties = new Properties();
properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
properties.setProperty(TMQConstants.CONNECT_URL, url);
properties.setProperty(TMQConstants.CONNECT_TIMEOUT, "10000");
properties.setProperty(TMQConstants.CONNECT_MESSAGE_TIMEOUT, "10000");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.GROUP_ID, "gId");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.jdbc.tmq.MapDeserializer");
try (TaosConsumer<Map<String, Object>> consumer = new TaosConsumer<>(properties)) {
consumer.subscribe(Collections.singletonList("tick2"));
// 无限循环消费
while (true) {
try {
// 核心:poll 套一层 try-catch,忽略末尾的信号错误
ConsumerRecords<Map<String, Object>> consumerRecords = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Map<String, Object>> r : consumerRecords) {
Map<String, Object> bean = r.value();
bean.forEach((k, v) -> {
if ("symbol".equals(k) && v instanceof byte[]) {
System.out.print(k + " : " + new String((byte[]) v) + " ");
} else {
System.out.print(k + " : " + v + " ");
}
});
System.out.println();
}
} catch (Exception e) {
// 忽略 0x2350 这个正常错误
if (e.getMessage().contains("message type is not data")) {
// 无数据,休息一下继续轮询
try { Thread.sleep(50); } catch (InterruptedException ignored) {}
continue;
} else {
// 其他错误才抛出
throw e;
}
}
}
}
}
}
package com.taos;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
/**
* Unit test for simple App.
*/
public class AppTest
extends TestCase
{
/**
* Create the test case
*
* @param testName name of the test case
*/
public AppTest( String testName )
{
super( testName );
}
/**
* @return the suite of tests being tested
*/
public static Test suite()
{
return new TestSuite( AppTest.class );
}
/**
* Rigourous Test :-)
*/
public void testApp()
{
assertTrue( true );
}
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.taos</groupId>
<artifactId>consumer</artifactId>
<packaging>jar</packaging>
<version>1.0.0</version>
<name>consumer</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.taos.App</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
异常
文件数量
在本文描述场景中,tdengine将打开近7000个文件。如果文件句柄不够,将造成流无法创建新的 数据,甚至taosd因为无法打开wal文件而崩溃。
另外当taos受制于文件句柄,内存时,将吃光系统内存,导致无法登录系统。可通过服务限制其最高cpu和内存,确保服务器不崩溃。
#/etc/sysctl.conf
sudo sysctl -p
kernel.core_pattern = core.%e.%p.%t
vm.swappiness = 0
vm.vfs_cache_pressure = 50
vm.dirty_ratio = 20
vm.dirty_background_ratio = 10
vm.dirty_expire_centisecs = 2000
# 如下两行
fs.nr_open = 1048576
fs.file-max = 2097152
#cat /etc/systemd/system/taosd.service
[Unit]
Description=TDengine server service
After=network-online.target
Wants=network-online.target
[Service]
# CPU 限制:200% 表示最多使用 2 个完整的 CPU 核心
CPUQuota=200%
# 内存限制:8GB 硬上限
MemoryMax=8G
# 内存限制:6GB 软上限
MemoryHigh=6G
Type=simple
ExecStart=/usr/bin/taosd
ExecStartPre=/usr/local/taos/bin/startPre.sh
TimeoutStopSec=1000000s
LimitNOFILE=infinity
LimitNPROC=infinity
LimitCORE=infinity
TimeoutStartSec=0
StandardOutput=null
Restart=always
StartLimitBurst=3
StartLimitInterval=900s
[Install]
WantedBy=multi-user.target
数据迁移
运行中的数据库,会将EP计入全局配置,cfg等全部无效。比如之前使用ip-port启动的服务,如果直接拷贝迁移到其他地方,将还是只能从原先设定的ip-port启动。