修炼者
修炼者
发布于 2026-05-06 / 1 阅读
0
0

TDengine

从源码构建

# Install the basic build toolchain and libraries
sudo apt-get install -y gcc cmake build-essential git libjansson-dev libsnappy-dev liblzma-dev zlib1g-dev pkg-config libtool autoconf automake groff

# Go is only used to build the auxiliary components (adapter, keeper, explorer, ...); skip it if you do not need them
sudo apt install -y golang-go

# Fetch the source of one specific release tag
git clone --depth 1 --branch ver-3.4.1.0 https://github.com/taosdata/TDengine.git
cd TDengine/

# Configure the build directory. The second invocation re-configures the same directory with
# -DBUILD_KEEPER=true and -DBUILD_HTTP=false added; run only the first one if you do not need those options.
cmake -B build -DBUILD_TOOLS=true -DBUILD_CONTRIB=true
cmake -B build -DBUILD_TOOLS=true -DBUILD_CONTRIB=true -DBUILD_KEEPER=true -DBUILD_HTTP=false

# Compile as the normal user, then install. "install" is a build target, so it has to be passed with --target.
cmake --build build/
sudo cmake --build build/ --target install

配置

# Review /etc/taos/taos.cfg first; usually only the log and data directories need to be changed. Then start the server.
sudo systemctl start taosd

# The first login needs no password; run it with sudo if you have no write permission for the log directory
taos

-- Inside the taos shell: set the root password, then create an additional user
ALTER USER root PASS 'abc123456@';

CREATE USER gauss PASS 'Gauss20250922@';

# Once a password is set, pass it when starting the client again.
# -p<password> supplies the password (-f would name a SQL script file to execute);
# the value is the root password assigned by the ALTER USER statement.
sudo taos -uroot -pabc123456@

-- Stream computing needs an snode; create one from the taos shell.
-- List the dnodes first to pick the dnode id.
SHOW DNODES;
CREATE SNODE ON DNODE 1;

-- Remove the snode again
DROP SNODE ON DNODE 1;

管理数据库

创建

-- Create db_tick with the last-row cache model, millisecond timestamp precision and KEEP 36500.
-- NOTE(review): KEEP without a unit is presumably in days, i.e. 36500 days (~100 years);
-- the original note asked for a ten-year retention, which would be KEEP 3650 — confirm the intent.
CREATE DATABASE `db_tick`
    CACHEMODEL 'last_row'
    KEEP 36500
    PRECISION 'ms';

删除

-- Drop the whole database. The author notes that dropping only a super table
-- without its subtables is a hidden risk in TDengine, hence the database-level drop.
-- IF EXISTS keeps the statement re-runnable when the database is already gone.
DROP DATABASE IF EXISTS `db_tick`;

查看

-- List all databases
SHOW DATABASES;
-- Show the DDL of a database (\G prints the result vertically)
SHOW CREATE DATABASE db_tick\G;
-- List the super tables of every database from the information schema
SELECT stable_name, db_name, `tags`
FROM information_schema.ins_stables
ORDER BY stable_name;
-- Switch to a database
USE db_tick;
-- List the super tables of the current database
SHOW STABLES;
-- Show the DDL of a super table
SHOW CREATE STABLE t_tick\G;

创建超级表

-- Super table t_tick: the source table every stream in this document reads from.
-- dir is the TINYINT the streams branch on (1 / 2 / 3); subtables are tagged by symbol.
CREATE STABLE `t_tick` (
    `ts`          TIMESTAMP,
    `ask`         DOUBLE,
    `bid`         DOUBLE,
    `close`       DOUBLE,
    `vol`         DOUBLE,
    `dir`         TINYINT,
    `create_time` TIMESTAMP
) TAGS (
    `symbol` VARCHAR(64)
);

-- Super table t_kline_s_01: target of stream_tick_to_kline_s_01
-- (h / l / o / c, vol and turn_over per window), tagged by symbol.
CREATE STABLE `t_kline_s_01` (
    `ts`          TIMESTAMP,
    `h`           DOUBLE,
    `l`           DOUBLE,
    `o`           DOUBLE,
    `c`           DOUBLE,
    `vol`         DOUBLE,
    `turn_over`   DOUBLE,
    `create_time` TIMESTAMP
) TAGS (
    `symbol` VARCHAR(64)
);

-- Super table t_kline_dir_s_01: target of stream_tick_to_kline_dir_s_01
-- (per-window vol sums split by dir), tagged by symbol.
CREATE STABLE `t_kline_dir_s_01` (
    `ts`                 TIMESTAMP,
    `cumulative_bid_vol` DOUBLE,
    `cumulative_ask_vol` DOUBLE,
    `cumulative_mid_vol` DOUBLE,
    `create_time`        TIMESTAMP
) TAGS (
    `symbol` VARCHAR(64)
);

-- Super table t_kline_vol_s_01: target of stream_tick_to_kline_vol_s_01
-- (min / max / last of the running signed volume per window), tagged by symbol.
CREATE STABLE `t_kline_vol_s_01` (
    `ts`          TIMESTAMP,
    `min_bid_vol` DOUBLE,
    `max_bid_vol` DOUBLE,
    `fin_bid_vol` DOUBLE,
    `create_time` TIMESTAMP
) TAGS (
    `symbol` VARCHAR(64)
);

-- Super table t_footprint_s_01: target of stream_tick_to_footprint_s_01
-- (one row per price with vol split by dir), tagged by symbol.
CREATE STABLE `t_footprint_s_01` (
    `ts`          TIMESTAMP,
    `price`       DOUBLE,
    `bid_vol`     DOUBLE,
    `ask_vol`     DOUBLE,
    `mid_vol`     DOUBLE,
    `create_time` TIMESTAMP
) TAGS (
    `symbol` VARCHAR(64)
);

创建流

-- (Re)create stream_tick_to_kline_s_01: for every t_tick subtable, aggregate each
-- non-overlapping 1s window (interval = sliding) into one row of t_kline_s_01:
-- h/l/o/c = max/min/first/last of close, vol = sum(vol), turn_over = sum(vol*close).
-- IF EXISTS lets the script run on a fresh database where the stream does not exist yet.
drop stream if exists stream_tick_to_kline_s_01;
create stream stream_tick_to_kline_s_01
  interval(1s) sliding(1s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_s_01
  output_subtable(concat('t_kline_s_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    cast(max(close) as double) as h,
    cast(min(close) as double) as l,
    cast(first(close) as double) as o,
    cast(last(close) as double) as c,
    cast(sum(vol) as double) as vol,
    cast(sum(vol*close) as double) as turn_over,
    _twend as create_time
  from %%tbname
  where ts >= _twstart and ts < _twend;

-- (Re)create stream_tick_to_footprint_s_01: for every t_tick subtable and 1s window,
-- emit one row per distinct close price (group by close) into t_footprint_s_01,
-- with vol summed separately for dir = 1 (bid_vol), 3 (ask_vol) and 2 (mid_vol).
-- IF EXISTS lets the script run on a fresh database where the stream does not exist yet.
drop stream if exists stream_tick_to_footprint_s_01;
create stream stream_tick_to_footprint_s_01
  interval(1s) sliding(1s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_footprint_s_01
  output_subtable(concat('t_footprint_s_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    first(ts) as ts,
    close as price,
    cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
    cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
    cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
    _twend as create_time
  from %%tbname
  where ts >= _twstart and ts < _twend
  group by close;

-- (Re)create stream_tick_to_kline_dir_s_01: per t_tick subtable and 1s window, sum vol by dir
-- (1 -> cumulative_bid_vol, 3 -> cumulative_ask_vol, 2 -> cumulative_mid_vol).
-- Fixes: IF EXISTS on the drop so a first run does not fail; the upper bound was
-- "ts <= _twend" and is now exclusive like in the kline and footprint streams, so a tick
-- exactly on a window boundary is not summed into two adjacent windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
drop stream if exists stream_tick_to_kline_dir_s_01;
create stream stream_tick_to_kline_dir_s_01
  interval(1s) sliding(1s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_dir_s_01
  output_subtable(concat('t_kline_dir_s_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts >= _twstart and ts < _twend;

-- (Re)create stream_tick_to_kline_vol_s_01: per db_tick.t_tick subtable and 1s window, build the
-- running sum (csum) of vol * (2 - dir) (dir 1 adds vol, 2 adds 0, 3 subtracts vol) and store its
-- min / max / last value as min_bid_vol / max_bid_vol / fin_bid_vol. FILL_HISTORY(1) is kept as written.
-- Fixes: IF EXISTS on the drop so a first run does not fail; the upper bound was
-- "ts <= _twend" and is now exclusive like in the kline and footprint streams, so a boundary
-- tick is not folded into two adjacent windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
drop stream if exists stream_tick_to_kline_vol_s_01;
create stream stream_tick_to_kline_vol_s_01
  interval(1s) sliding(1s)
from db_tick.t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m) |FILL_HISTORY(1) )
into db_tick.t_kline_vol_s_01
  output_subtable(concat('t_kline_vol_s_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts >= _twstart and ts < _twend)b;

测试

创建1,5,15s,1,5,15,30m,1,4h,1d,1w这11个周期的流计算,每个都有基于tick的四个计算包括kline、klvol、kldir、footprint。如下脚本。

超级表

创建

-- Test script: source super table t_tick that all test streams read from (tagged by symbol)
CREATE STABLE `t_tick` (`ts` TIMESTAMP, `ask` DOUBLE, `bid` DOUBLE, `close` DOUBLE, `vol` DOUBLE, `dir` TINYINT, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

-- Result super tables for the 1-second period: kline, kldir, klvol, footprint
CREATE STABLE `t_kline_s_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_s_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_s_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_s_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

-- Result super tables for the 5-second period: kline, kldir, klvol, footprint
CREATE STABLE `t_kline_s_05` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_s_05` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_s_05` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_s_05` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

-- Result super tables for the 15-second period: kline, kldir, klvol, footprint
CREATE STABLE `t_kline_s_15` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_s_15` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_s_15` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_s_15` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

-- Result super tables for the 1-minute period: kline, kldir, klvol, footprint
CREATE STABLE `t_kline_m_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

-- Result super tables for the 5-minute period: kline, kldir, klvol, footprint
CREATE STABLE `t_kline_m_05` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_05` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_05` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_05` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

-- Result super tables for the 15-minute period: kline, kldir, klvol, footprint
CREATE STABLE `t_kline_m_15` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_15` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_15` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_15` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

-- Result super tables for the 30-minute period: kline, kldir, klvol, footprint
CREATE STABLE `t_kline_m_30` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_30` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_30` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_30` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

-- Result super tables for the 1-hour period: kline, kldir, klvol, footprint
CREATE STABLE `t_kline_h_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_h_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_h_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_h_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

-- Result super tables named for the 4-hour period (h_04): kline, kldir, klvol, footprint
CREATE STABLE `t_kline_h_04` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_h_04` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_h_04` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_h_04` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

-- Result super tables named for the 1-day period (d_01): kline, kldir, klvol, footprint
CREATE STABLE `t_kline_d_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_d_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_d_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_d_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

-- Result super tables named for the 1-week period (w_01): kline, kldir, klvol, footprint
CREATE STABLE `t_kline_w_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_w_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_w_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_w_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

删除

-- Tear-down for the test script: drop every result super table
-- (footprint, klvol, kldir, kline; longest period first) and finally the source table t_tick.
DROP STABLE IF EXISTS `t_footprint_w_01`;
DROP STABLE IF EXISTS `t_footprint_d_01`;
DROP STABLE IF EXISTS `t_footprint_h_04`;
DROP STABLE IF EXISTS `t_footprint_h_01`;
DROP STABLE IF EXISTS `t_footprint_m_30`;
DROP STABLE IF EXISTS `t_footprint_m_15`;
DROP STABLE IF EXISTS `t_footprint_m_05`;
DROP STABLE IF EXISTS `t_footprint_m_01`;
DROP STABLE IF EXISTS `t_footprint_s_15`;
DROP STABLE IF EXISTS `t_footprint_s_05`;
DROP STABLE IF EXISTS `t_footprint_s_01`;
DROP STABLE IF EXISTS `t_klvol_w_01`;
DROP STABLE IF EXISTS `t_klvol_d_01`;
DROP STABLE IF EXISTS `t_klvol_h_04`;
DROP STABLE IF EXISTS `t_klvol_h_01`;
DROP STABLE IF EXISTS `t_klvol_m_30`;
DROP STABLE IF EXISTS `t_klvol_m_15`;
DROP STABLE IF EXISTS `t_klvol_m_05`;
DROP STABLE IF EXISTS `t_klvol_m_01`;
DROP STABLE IF EXISTS `t_klvol_s_15`;
DROP STABLE IF EXISTS `t_klvol_s_05`;
DROP STABLE IF EXISTS `t_klvol_s_01`;
DROP STABLE IF EXISTS `t_kldir_w_01`;
DROP STABLE IF EXISTS `t_kldir_d_01`;
DROP STABLE IF EXISTS `t_kldir_h_04`;
DROP STABLE IF EXISTS `t_kldir_h_01`;
DROP STABLE IF EXISTS `t_kldir_m_30`;
DROP STABLE IF EXISTS `t_kldir_m_15`;
DROP STABLE IF EXISTS `t_kldir_m_05`;
DROP STABLE IF EXISTS `t_kldir_m_01`;
DROP STABLE IF EXISTS `t_kldir_s_15`;
DROP STABLE IF EXISTS `t_kldir_s_05`;
DROP STABLE IF EXISTS `t_kldir_s_01`;
DROP STABLE IF EXISTS `t_kline_w_01`;
DROP STABLE IF EXISTS `t_kline_d_01`;
DROP STABLE IF EXISTS `t_kline_h_04`;
DROP STABLE IF EXISTS `t_kline_h_01`;
DROP STABLE IF EXISTS `t_kline_m_30`;
DROP STABLE IF EXISTS `t_kline_m_15`;
DROP STABLE IF EXISTS `t_kline_m_05`;
DROP STABLE IF EXISTS `t_kline_m_01`;
DROP STABLE IF EXISTS `t_kline_s_15`;
DROP STABLE IF EXISTS `t_kline_s_05`;
DROP STABLE IF EXISTS `t_kline_s_01`;
DROP STABLE IF EXISTS `t_tick`;

创建


-- Stream stream_tick_to_kline_s_01: for every t_tick subtable, aggregate each
-- non-overlapping 1s window (interval = sliding) into one row of t_kline_s_01:
-- h/l/o/c = max/min/first/last of close, vol = sum(vol), turn_over = sum(vol*close).
CREATE STREAM stream_tick_to_kline_s_01
  INTERVAL(1s) SLIDING(1s)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kline_s_01
  OUTPUT_SUBTABLE(CONCAT('t_kline_s_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    CAST(MAX(close) AS DOUBLE) AS h,
    CAST(MIN(close) AS DOUBLE) AS l,
    CAST(FIRST(close) AS DOUBLE) AS o,
    CAST(LAST(close) AS DOUBLE) AS c,
    CAST(SUM(vol) AS DOUBLE) AS vol,
    CAST(SUM(vol * close) AS DOUBLE) AS turn_over,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart AND ts < _twend;

-- Stream stream_tick_to_footprint_s_01: for every t_tick subtable and 1s window,
-- emit one row per distinct close price (GROUP BY close) into t_footprint_s_01,
-- with vol summed separately for dir = 1 (bid_vol), 3 (ask_vol) and 2 (mid_vol).
CREATE STREAM stream_tick_to_footprint_s_01
  INTERVAL(1s) SLIDING(1s)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_footprint_s_01
  OUTPUT_SUBTABLE(CONCAT('t_footprint_s_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    FIRST(ts) AS ts,
    close AS price,
    CAST(SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS DOUBLE) AS bid_vol,
    CAST(SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS DOUBLE) AS ask_vol,
    CAST(SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS DOUBLE) AS mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart AND ts < _twend
  GROUP BY close;

-- Stream stream_tick_to_kldir_s_01: per t_tick subtable and 1s window, sum vol by dir
-- (1 -> cumulative_bid_vol, 3 -> cumulative_ask_vol, 2 -> cumulative_mid_vol).
-- Fix: the upper bound was "ts <= _twend"; it is now exclusive like in the kline and
-- footprint streams, so a tick exactly on a window boundary is not summed into two windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
create stream stream_tick_to_kldir_s_01
  interval(1s) sliding(1s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_s_01
  output_subtable(concat('t_kldir_s_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts >= _twstart and ts < _twend;

-- Stream stream_tick_to_klvol_s_01: per t_tick subtable and 1s window, build the running sum
-- (csum) of vol * (2 - dir) (dir 1 adds vol, 2 adds 0, 3 subtracts vol) and store its
-- min / max / last value as min_bid_vol / max_bid_vol / fin_bid_vol.
-- Fix: the upper bound was "ts <= _twend"; it is now exclusive like in the kline and
-- footprint streams, so a boundary tick is not folded into two adjacent windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
create stream stream_tick_to_klvol_s_01
  interval(1s) sliding(1s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_s_01
  output_subtable(concat('t_klvol_s_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts >= _twstart and ts < _twend)b;

-- Stream stream_tick_to_kline_s_05: for every t_tick subtable, aggregate each
-- non-overlapping 5s window (interval = sliding) into one row of t_kline_s_05:
-- h/l/o/c = max/min/first/last of close, vol = sum(vol), turn_over = sum(vol*close).
CREATE STREAM stream_tick_to_kline_s_05
  INTERVAL(5s) SLIDING(5s)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kline_s_05
  OUTPUT_SUBTABLE(CONCAT('t_kline_s_05_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    CAST(MAX(close) AS DOUBLE) AS h,
    CAST(MIN(close) AS DOUBLE) AS l,
    CAST(FIRST(close) AS DOUBLE) AS o,
    CAST(LAST(close) AS DOUBLE) AS c,
    CAST(SUM(vol) AS DOUBLE) AS vol,
    CAST(SUM(vol * close) AS DOUBLE) AS turn_over,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart AND ts < _twend;

-- Stream stream_tick_to_footprint_s_05: for every t_tick subtable and 5s window,
-- emit one row per distinct close price (GROUP BY close) into t_footprint_s_05,
-- with vol summed separately for dir = 1 (bid_vol), 3 (ask_vol) and 2 (mid_vol).
CREATE STREAM stream_tick_to_footprint_s_05
  INTERVAL(5s) SLIDING(5s)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_footprint_s_05
  OUTPUT_SUBTABLE(CONCAT('t_footprint_s_05_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    FIRST(ts) AS ts,
    close AS price,
    CAST(SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS DOUBLE) AS bid_vol,
    CAST(SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS DOUBLE) AS ask_vol,
    CAST(SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS DOUBLE) AS mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart AND ts < _twend
  GROUP BY close;

-- Stream stream_tick_to_kldir_s_05: per t_tick subtable and 5s window, sum vol by dir
-- (1 -> cumulative_bid_vol, 3 -> cumulative_ask_vol, 2 -> cumulative_mid_vol).
-- Fix: the upper bound was "ts <= _twend"; it is now exclusive like in the kline and
-- footprint streams, so a tick exactly on a window boundary is not summed into two windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
create stream stream_tick_to_kldir_s_05
  interval(5s) sliding(5s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_s_05
  output_subtable(concat('t_kldir_s_05_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts >= _twstart and ts < _twend;

-- Stream stream_tick_to_klvol_s_05: per t_tick subtable and 5s window, build the running sum
-- (csum) of vol * (2 - dir) (dir 1 adds vol, 2 adds 0, 3 subtracts vol) and store its
-- min / max / last value as min_bid_vol / max_bid_vol / fin_bid_vol.
-- Fix: the upper bound was "ts <= _twend"; it is now exclusive like in the kline and
-- footprint streams, so a boundary tick is not folded into two adjacent windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
create stream stream_tick_to_klvol_s_05
  interval(5s) sliding(5s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_s_05
  output_subtable(concat('t_klvol_s_05_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts >= _twstart and ts < _twend)b;

-- Stream stream_tick_to_kline_s_15: for every t_tick subtable, aggregate each
-- non-overlapping 15s window (interval = sliding) into one row of t_kline_s_15:
-- h/l/o/c = max/min/first/last of close, vol = sum(vol), turn_over = sum(vol*close).
CREATE STREAM stream_tick_to_kline_s_15
  INTERVAL(15s) SLIDING(15s)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kline_s_15
  OUTPUT_SUBTABLE(CONCAT('t_kline_s_15_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    CAST(MAX(close) AS DOUBLE) AS h,
    CAST(MIN(close) AS DOUBLE) AS l,
    CAST(FIRST(close) AS DOUBLE) AS o,
    CAST(LAST(close) AS DOUBLE) AS c,
    CAST(SUM(vol) AS DOUBLE) AS vol,
    CAST(SUM(vol * close) AS DOUBLE) AS turn_over,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart AND ts < _twend;

-- Stream stream_tick_to_footprint_s_15: for every t_tick subtable and 15s window,
-- emit one row per distinct close price (GROUP BY close) into t_footprint_s_15,
-- with vol summed separately for dir = 1 (bid_vol), 3 (ask_vol) and 2 (mid_vol).
CREATE STREAM stream_tick_to_footprint_s_15
  INTERVAL(15s) SLIDING(15s)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_footprint_s_15
  OUTPUT_SUBTABLE(CONCAT('t_footprint_s_15_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    FIRST(ts) AS ts,
    close AS price,
    CAST(SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS DOUBLE) AS bid_vol,
    CAST(SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS DOUBLE) AS ask_vol,
    CAST(SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS DOUBLE) AS mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart AND ts < _twend
  GROUP BY close;

-- Stream stream_tick_to_kldir_s_15: per t_tick subtable and 15s window, sum vol by dir
-- (1 -> cumulative_bid_vol, 3 -> cumulative_ask_vol, 2 -> cumulative_mid_vol).
-- Fix: the upper bound was "ts <= _twend"; it is now exclusive like in the kline and
-- footprint streams, so a tick exactly on a window boundary is not summed into two windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
create stream stream_tick_to_kldir_s_15
  interval(15s) sliding(15s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_s_15
  output_subtable(concat('t_kldir_s_15_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts >= _twstart and ts < _twend;

-- Stream stream_tick_to_klvol_s_15: per t_tick subtable and 15s window, build the running sum
-- (csum) of vol * (2 - dir) (dir 1 adds vol, 2 adds 0, 3 subtracts vol) and store its
-- min / max / last value as min_bid_vol / max_bid_vol / fin_bid_vol.
-- Fix: the upper bound was "ts <= _twend"; it is now exclusive like in the kline and
-- footprint streams, so a boundary tick is not folded into two adjacent windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
create stream stream_tick_to_klvol_s_15
  interval(15s) sliding(15s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_s_15
  output_subtable(concat('t_klvol_s_15_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts >= _twstart and ts < _twend)b;

-- Stream stream_tick_to_kline_m_01: for every t_tick subtable, aggregate each
-- non-overlapping 1m window (interval = sliding) into one row of t_kline_m_01:
-- h/l/o/c = max/min/first/last of close, vol = sum(vol), turn_over = sum(vol*close).
CREATE STREAM stream_tick_to_kline_m_01
  INTERVAL(1m) SLIDING(1m)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kline_m_01
  OUTPUT_SUBTABLE(CONCAT('t_kline_m_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    CAST(MAX(close) AS DOUBLE) AS h,
    CAST(MIN(close) AS DOUBLE) AS l,
    CAST(FIRST(close) AS DOUBLE) AS o,
    CAST(LAST(close) AS DOUBLE) AS c,
    CAST(SUM(vol) AS DOUBLE) AS vol,
    CAST(SUM(vol * close) AS DOUBLE) AS turn_over,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart AND ts < _twend;

-- Stream stream_tick_to_footprint_m_01: for every t_tick subtable and 1m window,
-- emit one row per distinct close price (GROUP BY close) into t_footprint_m_01,
-- with vol summed separately for dir = 1 (bid_vol), 3 (ask_vol) and 2 (mid_vol).
CREATE STREAM stream_tick_to_footprint_m_01
  INTERVAL(1m) SLIDING(1m)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_footprint_m_01
  OUTPUT_SUBTABLE(CONCAT('t_footprint_m_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    FIRST(ts) AS ts,
    close AS price,
    CAST(SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS DOUBLE) AS bid_vol,
    CAST(SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS DOUBLE) AS ask_vol,
    CAST(SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS DOUBLE) AS mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart AND ts < _twend
  GROUP BY close;

-- Stream stream_tick_to_kldir_m_01: per t_tick subtable and 1m window, sum vol by dir
-- (1 -> cumulative_bid_vol, 3 -> cumulative_ask_vol, 2 -> cumulative_mid_vol).
-- Fix: the upper bound was "ts <= _twend"; it is now exclusive like in the kline and
-- footprint streams, so a tick exactly on a window boundary is not summed into two windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
create stream stream_tick_to_kldir_m_01
  interval(1m) sliding(1m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_m_01
  output_subtable(concat('t_kldir_m_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts >= _twstart and ts < _twend;

-- Stream stream_tick_to_klvol_m_01: per t_tick subtable and 1m window, build the running sum
-- (csum) of vol * (2 - dir) (dir 1 adds vol, 2 adds 0, 3 subtracts vol) and store its
-- min / max / last value as min_bid_vol / max_bid_vol / fin_bid_vol.
-- Fix: the upper bound was "ts <= _twend"; it is now exclusive like in the kline and
-- footprint streams, so a boundary tick is not folded into two adjacent windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
create stream stream_tick_to_klvol_m_01
  interval(1m) sliding(1m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_m_01
  output_subtable(concat('t_klvol_m_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts >= _twstart and ts < _twend)b;

-- Stream stream_tick_to_kline_m_05: for every t_tick subtable, aggregate each
-- non-overlapping 5m window (interval = sliding) into one row of t_kline_m_05:
-- h/l/o/c = max/min/first/last of close, vol = sum(vol), turn_over = sum(vol*close).
CREATE STREAM stream_tick_to_kline_m_05
  INTERVAL(5m) SLIDING(5m)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kline_m_05
  OUTPUT_SUBTABLE(CONCAT('t_kline_m_05_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    CAST(MAX(close) AS DOUBLE) AS h,
    CAST(MIN(close) AS DOUBLE) AS l,
    CAST(FIRST(close) AS DOUBLE) AS o,
    CAST(LAST(close) AS DOUBLE) AS c,
    CAST(SUM(vol) AS DOUBLE) AS vol,
    CAST(SUM(vol * close) AS DOUBLE) AS turn_over,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart AND ts < _twend;

-- Stream stream_tick_to_footprint_m_05: for every t_tick subtable and 5m window,
-- emit one row per distinct close price (GROUP BY close) into t_footprint_m_05,
-- with vol summed separately for dir = 1 (bid_vol), 3 (ask_vol) and 2 (mid_vol).
CREATE STREAM stream_tick_to_footprint_m_05
  INTERVAL(5m) SLIDING(5m)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_footprint_m_05
  OUTPUT_SUBTABLE(CONCAT('t_footprint_m_05_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    FIRST(ts) AS ts,
    close AS price,
    CAST(SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS DOUBLE) AS bid_vol,
    CAST(SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS DOUBLE) AS ask_vol,
    CAST(SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS DOUBLE) AS mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart AND ts < _twend
  GROUP BY close;

-- Stream stream_tick_to_kldir_m_05: per t_tick subtable and 5m window, sum vol by dir
-- (1 -> cumulative_bid_vol, 3 -> cumulative_ask_vol, 2 -> cumulative_mid_vol).
-- Fix: the upper bound was "ts <= _twend"; it is now exclusive like in the kline and
-- footprint streams, so a tick exactly on a window boundary is not summed into two windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
create stream stream_tick_to_kldir_m_05
  interval(5m) sliding(5m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_m_05
  output_subtable(concat('t_kldir_m_05_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts >= _twstart and ts < _twend;

-- Stream stream_tick_to_klvol_m_05: per t_tick subtable and 5m window, build the running sum
-- (csum) of vol * (2 - dir) (dir 1 adds vol, 2 adds 0, 3 subtracts vol) and store its
-- min / max / last value as min_bid_vol / max_bid_vol / fin_bid_vol.
-- Fix: the upper bound was "ts <= _twend"; it is now exclusive like in the kline and
-- footprint streams, so a boundary tick is not folded into two adjacent windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
create stream stream_tick_to_klvol_m_05
  interval(5m) sliding(5m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_m_05
  output_subtable(concat('t_klvol_m_05_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts >= _twstart and ts < _twend)b;

-- Stream stream_tick_to_kline_m_15: for every t_tick subtable, aggregate each
-- non-overlapping 15m window (interval = sliding) into one row of t_kline_m_15:
-- h/l/o/c = max/min/first/last of close, vol = sum(vol), turn_over = sum(vol*close).
CREATE STREAM stream_tick_to_kline_m_15
  INTERVAL(15m) SLIDING(15m)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kline_m_15
  OUTPUT_SUBTABLE(CONCAT('t_kline_m_15_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    CAST(MAX(close) AS DOUBLE) AS h,
    CAST(MIN(close) AS DOUBLE) AS l,
    CAST(FIRST(close) AS DOUBLE) AS o,
    CAST(LAST(close) AS DOUBLE) AS c,
    CAST(SUM(vol) AS DOUBLE) AS vol,
    CAST(SUM(vol * close) AS DOUBLE) AS turn_over,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart AND ts < _twend;

-- Stream stream_tick_to_footprint_m_15: for every t_tick subtable and 15m window,
-- emit one row per distinct close price (GROUP BY close) into t_footprint_m_15,
-- with vol summed separately for dir = 1 (bid_vol), 3 (ask_vol) and 2 (mid_vol).
CREATE STREAM stream_tick_to_footprint_m_15
  INTERVAL(15m) SLIDING(15m)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_footprint_m_15
  OUTPUT_SUBTABLE(CONCAT('t_footprint_m_15_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    FIRST(ts) AS ts,
    close AS price,
    CAST(SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS DOUBLE) AS bid_vol,
    CAST(SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS DOUBLE) AS ask_vol,
    CAST(SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS DOUBLE) AS mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart AND ts < _twend
  GROUP BY close;

-- Stream stream_tick_to_kldir_m_15: per t_tick subtable and 15m window, sum vol by dir
-- (1 -> cumulative_bid_vol, 3 -> cumulative_ask_vol, 2 -> cumulative_mid_vol).
-- Fix: the upper bound was "ts <= _twend"; it is now exclusive like in the kline and
-- footprint streams, so a tick exactly on a window boundary is not summed into two windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
create stream stream_tick_to_kldir_m_15
  interval(15m) sliding(15m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_m_15
  output_subtable(concat('t_kldir_m_15_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts >= _twstart and ts < _twend;

-- Stream stream_tick_to_klvol_m_15: per t_tick subtable and 15m window, build the running sum
-- (csum) of vol * (2 - dir) (dir 1 adds vol, 2 adds 0, 3 subtracts vol) and store its
-- min / max / last value as min_bid_vol / max_bid_vol / fin_bid_vol.
-- Fix: the upper bound was "ts <= _twend"; it is now exclusive like in the kline and
-- footprint streams, so a boundary tick is not folded into two adjacent windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
create stream stream_tick_to_klvol_m_15
  interval(15m) sliding(15m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_m_15
  output_subtable(concat('t_klvol_m_15_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts >= _twstart and ts < _twend)b;

-- Stream stream_tick_to_kline_m_30: for every t_tick subtable, aggregate each
-- non-overlapping 30m window (interval = sliding) into one row of t_kline_m_30:
-- h/l/o/c = max/min/first/last of close, vol = sum(vol), turn_over = sum(vol*close).
CREATE STREAM stream_tick_to_kline_m_30
  INTERVAL(30m) SLIDING(30m)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kline_m_30
  OUTPUT_SUBTABLE(CONCAT('t_kline_m_30_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    CAST(MAX(close) AS DOUBLE) AS h,
    CAST(MIN(close) AS DOUBLE) AS l,
    CAST(FIRST(close) AS DOUBLE) AS o,
    CAST(LAST(close) AS DOUBLE) AS c,
    CAST(SUM(vol) AS DOUBLE) AS vol,
    CAST(SUM(vol * close) AS DOUBLE) AS turn_over,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart AND ts < _twend;

-- Stream stream_tick_to_footprint_m_30: for every t_tick subtable and 30m window,
-- emit one row per distinct close price (GROUP BY close) into t_footprint_m_30,
-- with vol summed separately for dir = 1 (bid_vol), 3 (ask_vol) and 2 (mid_vol).
CREATE STREAM stream_tick_to_footprint_m_30
  INTERVAL(30m) SLIDING(30m)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_footprint_m_30
  OUTPUT_SUBTABLE(CONCAT('t_footprint_m_30_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    FIRST(ts) AS ts,
    close AS price,
    CAST(SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS DOUBLE) AS bid_vol,
    CAST(SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS DOUBLE) AS ask_vol,
    CAST(SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS DOUBLE) AS mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart AND ts < _twend
  GROUP BY close;

-- Stream stream_tick_to_kldir_m_30: per t_tick subtable and 30m window, sum vol by dir
-- (1 -> cumulative_bid_vol, 3 -> cumulative_ask_vol, 2 -> cumulative_mid_vol).
-- Fix: the upper bound was "ts <= _twend"; it is now exclusive like in the kline and
-- footprint streams, so a tick exactly on a window boundary is not summed into two windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
create stream stream_tick_to_kldir_m_30
  interval(30m) sliding(30m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kldir_m_30
  output_subtable(concat('t_kldir_m_30_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts >= _twstart and ts < _twend;

-- Stream stream_tick_to_klvol_m_30: per t_tick subtable and 30m window, build the running sum
-- (csum) of vol * (2 - dir) (dir 1 adds vol, 2 adds 0, 3 subtracts vol) and store its
-- min / max / last value as min_bid_vol / max_bid_vol / fin_bid_vol.
-- Fix: the upper bound was "ts <= _twend"; it is now exclusive like in the kline and
-- footprint streams, so a boundary tick is not folded into two adjacent windows.
-- NOTE(review): assumes interval windows are half-open [_twstart, _twend) — confirm.
create stream stream_tick_to_klvol_m_30
  interval(30m) sliding(30m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_m_30
  output_subtable(concat('t_klvol_m_30_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts >= _twstart and ts < _twend)b;

-- 1-hour candlesticks built from t_tick, written to one subtable per symbol.
-- h / l are the max / min of close, o / c the first / last close, vol is
-- sum(vol) and turn_over is sum(vol * close); create_time carries _twend.
CREATE STREAM stream_tick_to_kline_h_01
  INTERVAL(1h) SLIDING(1h)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kline_h_01
  OUTPUT_SUBTABLE(CONCAT('t_kline_h_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    CAST(MAX(close) AS DOUBLE) AS h,
    CAST(MIN(close) AS DOUBLE) AS l,
    CAST(FIRST(close) AS DOUBLE) AS o,
    CAST(LAST(close) AS DOUBLE) AS c,
    CAST(SUM(vol) AS DOUBLE) AS vol,
    CAST(SUM(vol * close) AS DOUBLE) AS turn_over,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart
    AND ts < _twend;

-- 1-hour footprint: for each window, the volume traded at every distinct
-- close price. bid_vol / mid_vol / ask_vol sum vol where dir is 1 / 2 / 3;
-- ts is the first tick timestamp seen at that price inside the window.
CREATE STREAM stream_tick_to_footprint_h_01
  INTERVAL(1h) SLIDING(1h)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_footprint_h_01
  OUTPUT_SUBTABLE(CONCAT('t_footprint_h_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    FIRST(ts) AS ts,
    close AS price,
    CAST(SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS DOUBLE) AS bid_vol,
    CAST(SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS DOUBLE) AS ask_vol,
    CAST(SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS DOUBLE) AS mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart
    AND ts < _twend
  GROUP BY close;

-- 1-hour directional volume: per window, total vol split by dir
-- (1 -> cumulative_bid_vol, 3 -> cumulative_ask_vol, 2 -> cumulative_mid_vol).
-- The time filter is half-open, [_twstart, _twend), matching the kline and
-- footprint streams; the earlier closed range (ts <= _twend) let a tick stamped
-- exactly on a window boundary be summed into two adjacent windows.
-- NOTE(review): assumes _twend equals the next window's _twstart - confirm
-- against the TDengine stream placeholder documentation.
CREATE STREAM stream_tick_to_kldir_h_01
  INTERVAL(1h) SLIDING(1h)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kldir_h_01
  OUTPUT_SUBTABLE(CONCAT('t_kldir_h_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS cumulative_bid_vol,
    SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS cumulative_ask_vol,
    SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS cumulative_mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart
    AND ts < _twend;

-- 1-hour net-volume path: inside each window the subquery keeps a running
-- sum of vol * (2 - dir) (dir 1 adds vol, dir 2 adds 0, dir 3 subtracts vol);
-- the outer query records that running sum's minimum, maximum and final value.
-- The time filter is half-open, [_twstart, _twend), matching the kline and
-- footprint streams; the earlier closed range (ts <= _twend) let a tick stamped
-- exactly on a window boundary enter two adjacent windows.
-- NOTE(review): assumes _twend equals the next window's _twstart - confirm
-- against the TDengine stream placeholder documentation.
CREATE STREAM stream_tick_to_klvol_h_01
  INTERVAL(1h) SLIDING(1h)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_klvol_h_01
  OUTPUT_SUBTABLE(CONCAT('t_klvol_h_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    MIN(a) AS min_bid_vol,
    MAX(a) AS max_bid_vol,
    LAST(a) AS fin_bid_vol,
    _twend AS create_time
  FROM (
    SELECT
      ts,
      CSUM(vol * CAST(2 - dir AS DOUBLE)) AS a
    FROM %%tbname
    WHERE ts >= _twstart
      AND ts < _twend
  ) b;

-- 4-hour candlesticks built from t_tick, written to one subtable per symbol.
-- h / l are the max / min of close, o / c the first / last close, vol is
-- sum(vol) and turn_over is sum(vol * close); create_time carries _twend.
CREATE STREAM stream_tick_to_kline_h_04
  INTERVAL(4h) SLIDING(4h)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kline_h_04
  OUTPUT_SUBTABLE(CONCAT('t_kline_h_04_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    CAST(MAX(close) AS DOUBLE) AS h,
    CAST(MIN(close) AS DOUBLE) AS l,
    CAST(FIRST(close) AS DOUBLE) AS o,
    CAST(LAST(close) AS DOUBLE) AS c,
    CAST(SUM(vol) AS DOUBLE) AS vol,
    CAST(SUM(vol * close) AS DOUBLE) AS turn_over,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart
    AND ts < _twend;

-- 4-hour footprint: for each window, the volume traded at every distinct
-- close price. bid_vol / mid_vol / ask_vol sum vol where dir is 1 / 2 / 3;
-- ts is the first tick timestamp seen at that price inside the window.
CREATE STREAM stream_tick_to_footprint_h_04
  INTERVAL(4h) SLIDING(4h)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_footprint_h_04
  OUTPUT_SUBTABLE(CONCAT('t_footprint_h_04_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    FIRST(ts) AS ts,
    close AS price,
    CAST(SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS DOUBLE) AS bid_vol,
    CAST(SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS DOUBLE) AS ask_vol,
    CAST(SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS DOUBLE) AS mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart
    AND ts < _twend
  GROUP BY close;

-- 4-hour directional volume: per window, total vol split by dir
-- (1 -> cumulative_bid_vol, 3 -> cumulative_ask_vol, 2 -> cumulative_mid_vol).
-- The time filter is half-open, [_twstart, _twend), matching the kline and
-- footprint streams; the earlier closed range (ts <= _twend) let a tick stamped
-- exactly on a window boundary be summed into two adjacent windows.
-- NOTE(review): assumes _twend equals the next window's _twstart - confirm
-- against the TDengine stream placeholder documentation.
CREATE STREAM stream_tick_to_kldir_h_04
  INTERVAL(4h) SLIDING(4h)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kldir_h_04
  OUTPUT_SUBTABLE(CONCAT('t_kldir_h_04_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS cumulative_bid_vol,
    SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS cumulative_ask_vol,
    SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS cumulative_mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart
    AND ts < _twend;

-- 4-hour net-volume path: inside each window the subquery keeps a running
-- sum of vol * (2 - dir) (dir 1 adds vol, dir 2 adds 0, dir 3 subtracts vol);
-- the outer query records that running sum's minimum, maximum and final value.
-- The time filter is half-open, [_twstart, _twend), matching the kline and
-- footprint streams; the earlier closed range (ts <= _twend) let a tick stamped
-- exactly on a window boundary enter two adjacent windows.
-- NOTE(review): assumes _twend equals the next window's _twstart - confirm
-- against the TDengine stream placeholder documentation.
CREATE STREAM stream_tick_to_klvol_h_04
  INTERVAL(4h) SLIDING(4h)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_klvol_h_04
  OUTPUT_SUBTABLE(CONCAT('t_klvol_h_04_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    MIN(a) AS min_bid_vol,
    MAX(a) AS max_bid_vol,
    LAST(a) AS fin_bid_vol,
    _twend AS create_time
  FROM (
    SELECT
      ts,
      CSUM(vol * CAST(2 - dir AS DOUBLE)) AS a
    FROM %%tbname
    WHERE ts >= _twstart
      AND ts < _twend
  ) b;

-- 1-day candlesticks built from t_tick, written to one subtable per symbol.
-- h / l are the max / min of close, o / c the first / last close, vol is
-- sum(vol) and turn_over is sum(vol * close); create_time carries _twend.
CREATE STREAM stream_tick_to_kline_d_01
  INTERVAL(1d) SLIDING(1d)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kline_d_01
  OUTPUT_SUBTABLE(CONCAT('t_kline_d_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    CAST(MAX(close) AS DOUBLE) AS h,
    CAST(MIN(close) AS DOUBLE) AS l,
    CAST(FIRST(close) AS DOUBLE) AS o,
    CAST(LAST(close) AS DOUBLE) AS c,
    CAST(SUM(vol) AS DOUBLE) AS vol,
    CAST(SUM(vol * close) AS DOUBLE) AS turn_over,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart
    AND ts < _twend;

-- 1-day footprint: for each window, the volume traded at every distinct
-- close price. bid_vol / mid_vol / ask_vol sum vol where dir is 1 / 2 / 3;
-- ts is the first tick timestamp seen at that price inside the window.
CREATE STREAM stream_tick_to_footprint_d_01
  INTERVAL(1d) SLIDING(1d)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_footprint_d_01
  OUTPUT_SUBTABLE(CONCAT('t_footprint_d_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    FIRST(ts) AS ts,
    close AS price,
    CAST(SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS DOUBLE) AS bid_vol,
    CAST(SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS DOUBLE) AS ask_vol,
    CAST(SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS DOUBLE) AS mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart
    AND ts < _twend
  GROUP BY close;

-- 1-day directional volume: per window, total vol split by dir
-- (1 -> cumulative_bid_vol, 3 -> cumulative_ask_vol, 2 -> cumulative_mid_vol).
-- The time filter is half-open, [_twstart, _twend), matching the kline and
-- footprint streams; the earlier closed range (ts <= _twend) let a tick stamped
-- exactly on a window boundary be summed into two adjacent windows.
-- NOTE(review): assumes _twend equals the next window's _twstart - confirm
-- against the TDengine stream placeholder documentation.
CREATE STREAM stream_tick_to_kldir_d_01
  INTERVAL(1d) SLIDING(1d)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kldir_d_01
  OUTPUT_SUBTABLE(CONCAT('t_kldir_d_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS cumulative_bid_vol,
    SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS cumulative_ask_vol,
    SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS cumulative_mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart
    AND ts < _twend;

-- 1-day net-volume path: inside each window the subquery keeps a running
-- sum of vol * (2 - dir) (dir 1 adds vol, dir 2 adds 0, dir 3 subtracts vol);
-- the outer query records that running sum's minimum, maximum and final value.
-- The time filter is half-open, [_twstart, _twend), matching the kline and
-- footprint streams; the earlier closed range (ts <= _twend) let a tick stamped
-- exactly on a window boundary enter two adjacent windows.
-- NOTE(review): assumes _twend equals the next window's _twstart - confirm
-- against the TDengine stream placeholder documentation.
CREATE STREAM stream_tick_to_klvol_d_01
  INTERVAL(1d) SLIDING(1d)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_klvol_d_01
  OUTPUT_SUBTABLE(CONCAT('t_klvol_d_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    MIN(a) AS min_bid_vol,
    MAX(a) AS max_bid_vol,
    LAST(a) AS fin_bid_vol,
    _twend AS create_time
  FROM (
    SELECT
      ts,
      CSUM(vol * CAST(2 - dir AS DOUBLE)) AS a
    FROM %%tbname
    WHERE ts >= _twstart
      AND ts < _twend
  ) b;

-- 1-week candlesticks built from t_tick, written to one subtable per symbol.
-- h / l are the max / min of close, o / c the first / last close, vol is
-- sum(vol) and turn_over is sum(vol * close); create_time carries _twend.
CREATE STREAM stream_tick_to_kline_w_01
  INTERVAL(1w) SLIDING(1w)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kline_w_01
  OUTPUT_SUBTABLE(CONCAT('t_kline_w_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    CAST(MAX(close) AS DOUBLE) AS h,
    CAST(MIN(close) AS DOUBLE) AS l,
    CAST(FIRST(close) AS DOUBLE) AS o,
    CAST(LAST(close) AS DOUBLE) AS c,
    CAST(SUM(vol) AS DOUBLE) AS vol,
    CAST(SUM(vol * close) AS DOUBLE) AS turn_over,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart
    AND ts < _twend;

-- 1-week footprint: for each window, the volume traded at every distinct
-- close price. bid_vol / mid_vol / ask_vol sum vol where dir is 1 / 2 / 3;
-- ts is the first tick timestamp seen at that price inside the window.
CREATE STREAM stream_tick_to_footprint_w_01
  INTERVAL(1w) SLIDING(1w)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_footprint_w_01
  OUTPUT_SUBTABLE(CONCAT('t_footprint_w_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    FIRST(ts) AS ts,
    close AS price,
    CAST(SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS DOUBLE) AS bid_vol,
    CAST(SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS DOUBLE) AS ask_vol,
    CAST(SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS DOUBLE) AS mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart
    AND ts < _twend
  GROUP BY close;

-- 1-week directional volume: per window, total vol split by dir
-- (1 -> cumulative_bid_vol, 3 -> cumulative_ask_vol, 2 -> cumulative_mid_vol).
-- The time filter is half-open, [_twstart, _twend), matching the kline and
-- footprint streams; the earlier closed range (ts <= _twend) let a tick stamped
-- exactly on a window boundary be summed into two adjacent windows.
-- NOTE(review): assumes _twend equals the next window's _twstart - confirm
-- against the TDengine stream placeholder documentation.
CREATE STREAM stream_tick_to_kldir_w_01
  INTERVAL(1w) SLIDING(1w)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_kldir_w_01
  OUTPUT_SUBTABLE(CONCAT('t_kldir_w_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    SUM(CASE WHEN dir = 1 THEN vol ELSE 0.0 END) AS cumulative_bid_vol,
    SUM(CASE WHEN dir = 3 THEN vol ELSE 0.0 END) AS cumulative_ask_vol,
    SUM(CASE WHEN dir = 2 THEN vol ELSE 0.0 END) AS cumulative_mid_vol,
    _twend AS create_time
  FROM %%tbname
  WHERE ts >= _twstart
    AND ts < _twend;

-- 1-week net-volume path: inside each window the subquery keeps a running
-- sum of vol * (2 - dir) (dir 1 adds vol, dir 2 adds 0, dir 3 subtracts vol);
-- the outer query records that running sum's minimum, maximum and final value.
-- The time filter is half-open, [_twstart, _twend), matching the kline and
-- footprint streams; the earlier closed range (ts <= _twend) let a tick stamped
-- exactly on a window boundary enter two adjacent windows.
-- NOTE(review): assumes _twend equals the next window's _twstart - confirm
-- against the TDengine stream placeholder documentation.
CREATE STREAM stream_tick_to_klvol_w_01
  INTERVAL(1w) SLIDING(1w)
FROM t_tick
  PARTITION BY tbname, symbol
  STREAM_OPTIONS(IGNORE_NODATA_TRIGGER | MAX_DELAY(1m))
INTO t_klvol_w_01
  OUTPUT_SUBTABLE(CONCAT('t_klvol_w_01_', symbol))
  TAGS (symbol BINARY(64) AS symbol)
AS
  SELECT
    _twstart AS ts,
    MIN(a) AS min_bid_vol,
    MAX(a) AS max_bid_vol,
    LAST(a) AS fin_bid_vol,
    _twend AS create_time
  FROM (
    SELECT
      ts,
      CSUM(vol * CAST(2 - dir AS DOUBLE)) AS a
    FROM %%tbname
    WHERE ts >= _twstart
      AND ts < _twend
  ) b;

删除


-- Tear-down script: drops every tick-derived stream, grouped by timeframe
-- from the widest window (1 week) down to the narrowest (1 second).
-- IF EXISTS keeps the script re-runnable when some streams are already gone.

-- 1 week
DROP STREAM IF EXISTS stream_tick_to_klvol_w_01;
DROP STREAM IF EXISTS stream_tick_to_kldir_w_01;
DROP STREAM IF EXISTS stream_tick_to_footprint_w_01;
DROP STREAM IF EXISTS stream_tick_to_kline_w_01;

-- 1 day
DROP STREAM IF EXISTS stream_tick_to_klvol_d_01;
DROP STREAM IF EXISTS stream_tick_to_kldir_d_01;
DROP STREAM IF EXISTS stream_tick_to_footprint_d_01;
DROP STREAM IF EXISTS stream_tick_to_kline_d_01;

-- 4 hours
DROP STREAM IF EXISTS stream_tick_to_klvol_h_04;
DROP STREAM IF EXISTS stream_tick_to_kldir_h_04;
DROP STREAM IF EXISTS stream_tick_to_footprint_h_04;
DROP STREAM IF EXISTS stream_tick_to_kline_h_04;

-- 1 hour
DROP STREAM IF EXISTS stream_tick_to_klvol_h_01;
DROP STREAM IF EXISTS stream_tick_to_kldir_h_01;
DROP STREAM IF EXISTS stream_tick_to_footprint_h_01;
DROP STREAM IF EXISTS stream_tick_to_kline_h_01;

-- 30 minutes
DROP STREAM IF EXISTS stream_tick_to_klvol_m_30;
DROP STREAM IF EXISTS stream_tick_to_kldir_m_30;
DROP STREAM IF EXISTS stream_tick_to_footprint_m_30;
DROP STREAM IF EXISTS stream_tick_to_kline_m_30;

-- 15 minutes
DROP STREAM IF EXISTS stream_tick_to_klvol_m_15;
DROP STREAM IF EXISTS stream_tick_to_kldir_m_15;
DROP STREAM IF EXISTS stream_tick_to_footprint_m_15;
DROP STREAM IF EXISTS stream_tick_to_kline_m_15;

-- 5 minutes
DROP STREAM IF EXISTS stream_tick_to_klvol_m_05;
DROP STREAM IF EXISTS stream_tick_to_kldir_m_05;
DROP STREAM IF EXISTS stream_tick_to_footprint_m_05;
DROP STREAM IF EXISTS stream_tick_to_kline_m_05;

-- 1 minute
DROP STREAM IF EXISTS stream_tick_to_klvol_m_01;
DROP STREAM IF EXISTS stream_tick_to_kldir_m_01;
DROP STREAM IF EXISTS stream_tick_to_footprint_m_01;
DROP STREAM IF EXISTS stream_tick_to_kline_m_01;

-- 15 seconds
DROP STREAM IF EXISTS stream_tick_to_klvol_s_15;
DROP STREAM IF EXISTS stream_tick_to_kldir_s_15;
DROP STREAM IF EXISTS stream_tick_to_footprint_s_15;
DROP STREAM IF EXISTS stream_tick_to_kline_s_15;

-- 5 seconds
DROP STREAM IF EXISTS stream_tick_to_klvol_s_05;
DROP STREAM IF EXISTS stream_tick_to_kldir_s_05;
DROP STREAM IF EXISTS stream_tick_to_footprint_s_05;
DROP STREAM IF EXISTS stream_tick_to_kline_s_05;

-- 1 second
DROP STREAM IF EXISTS stream_tick_to_klvol_s_01;
DROP STREAM IF EXISTS stream_tick_to_kldir_s_01;
DROP STREAM IF EXISTS stream_tick_to_footprint_s_01;
DROP STREAM IF EXISTS stream_tick_to_kline_s_01;

查看流在节点的分布

 -- Ad-hoc inspection: return every column of every row in information_schema.ins_streams.
 SELECT *
 FROM information_schema.ins_streams;

写入tick

从指定的kafka写入tick。观察stream的运行。

存量

获得最大吞吐量

增量

查看持续运行稳定性

查询

查询品类列表含最新tick

-- Per symbol: the last ts, close and dir recorded in t_tick.
-- Sorted by last close, highest first; returns 10 rows starting at offset 0.
SELECT
  symbol,
  LAST(ts) AS lts,
  LAST(close) AS lclose,
  LAST(dir) AS ldir
FROM t_tick
GROUP BY symbol
ORDER BY lclose DESC
LIMIT 0, 10;

集群

taosd社区版本支持数据复制集,以及计算、查询分离,以合理分担压力。可使用三台机器形成数据复制实例,并将流计算放在一个节点,查询放在一个节点,主管理放在一个节点。各节点的/etc/taos/taos.cfg可如下配置:

hosts

每一个节点fqdn就是主机名,并需要在/etc/hosts配置,确保网络是通的。



192.168.7.111 snode
192.168.7.186 mnode
192.168.7.87  qnode

mnode

# taos.cfg for the host named "mnode" (the management node of this setup).
# firstEp is the end point of the first dnode in the cluster; this dnode and
# the CLI utility connect to it on start-up.
firstEp         mnode:6030
fqdn            mnode
serverPort      6030
# NOTE(review): the support* keys presumably cap how many vnodes / snodes /
# qnodes this dnode may host - confirm against the TDengine configuration reference.
supportVnodes   1
supportSnodes   0
supportQnodes   0

snode

# taos.cfg for the host named "snode" (the node meant to run stream computing).
# It joins the cluster through the first end point, mnode:6030.
firstEp         mnode:6030
fqdn            snode
serverPort      6030
# NOTE(review): the support* keys presumably cap how many vnodes / snodes /
# qnodes this dnode may host - confirm against the TDengine configuration reference.
supportVnodes   256
supportSnodes   1
supportQnodes   1

qnode

# taos.cfg for the host named "qnode" (the node meant to serve queries).
# It joins the cluster through the first end point, mnode:6030.
firstEp         mnode:6030
fqdn            qnode
serverPort      6030
# NOTE(review): the support* keys presumably cap how many vnodes / snodes /
# qnodes this dnode may host - confirm against the TDengine configuration reference.
supportVnodes   256
supportSnodes   0
supportQnodes   1
-- Run inside the taos CLI: create one snode on each of dnodes 1, 3 and 4
-- (use the ids reported by "show dnodes").
CREATE SNODE ON DNODE 1;
CREATE SNODE ON DNODE 3;
CREATE SNODE ON DNODE 4;

taos> show dnodes;
     id      |            endpoint            | vnodes | support_vnodes |    status    |       create_time       |       reboot_time       |              note              |
=============================================================================================================================================================================
           1 | tdengine-0.186:6030            |      3 |             29 | ready        | 2026-04-24 18:16:11.228 | 2026-05-06 17:51:10.269 |                                |
           3 | ct4dev:6030                    |      2 |            256 | ready        | 2026-05-06 16:35:02.383 | 2026-05-07 13:40:44.757 |                                |
           4 | tdengine-7.87:6030             |      2 |            256 | ready        | 2026-05-06 17:19:23.917 | 2026-05-06 17:51:46.640 |                                |
Query OK, 3 row(s) in set (0.005232s)

taos> show snodes;
     id      |            endpoint            |       create_time       |  replicaId  |          asReplicaOf           |
========================================================================================================================
           1 | tdengine-0.186:6030            | 2026-05-06 18:12:28.481 |           3 | 3, 4                           |
           3 | ct4dev:6030                    | 2026-05-06 16:54:59.151 |           1 | 1                              |
           4 | tdengine-7.87:6030             | 2026-05-06 18:12:30.914 |           1 | None                           |
Query OK, 3 row(s) in set (0.005251s)

订阅

consumer>tree
.
├── pom.xml
├── src
│   ├── main
│   │   └── java
│   │       └── com
│   │           └── taos
│   │               └── App.java
│   └── test
│       └── java
│           └── com
│               └── taos
│                   └── AppTest.java
// export TDENGINE_JDBC_URL="jdbc:TAOS-RS://192.168.7.186:6041/rest/tmq?user=root&password=A7fK9xQ2LmR8tY4Z"
package com.taos;

import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;

import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

/**
 * Minimal TMQ consumer: subscribes to the topic "tick2" and prints every
 * received record to standard output, one record per line.
 *
 * <p>The connection URL is read from the TDENGINE_JDBC_URL environment variable.
 */
public class App {
    /** Fragment of the driver error message (reported as 0x2350) that only means "no data yet". */
    private static final String NO_DATA_MESSAGE = "message type is not data";

    /** Pause between polls after a "no data" error, in milliseconds. */
    private static final long NO_DATA_BACKOFF_MS = 50L;

    /**
     * Polls the topic forever and prints each record.
     *
     * @param args unused
     * @throws SQLException if the consumer cannot be created or subscribed, or if polling
     *                      fails with anything other than the benign "no data" error
     * @throws IllegalStateException if TDENGINE_JDBC_URL is not set
     */
    public static void main(String[] args) throws SQLException {

        String url = System.getenv("TDENGINE_JDBC_URL");
        if (url == null || url.isEmpty()) {
            // Properties.setProperty rejects null values with a bare NullPointerException;
            // fail early with a message that names the missing variable instead.
            throw new IllegalStateException("environment variable TDENGINE_JDBC_URL is not set");
        }

        Properties properties = new Properties();
        properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
        properties.setProperty(TMQConstants.CONNECT_URL, url);
        properties.setProperty(TMQConstants.CONNECT_TIMEOUT, "10000");
        properties.setProperty(TMQConstants.CONNECT_MESSAGE_TIMEOUT, "10000");
        properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
        properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
        properties.setProperty(TMQConstants.GROUP_ID, "gId");
        properties.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.jdbc.tmq.MapDeserializer");

        // try-with-resources closes the consumer on every exit path.
        try (TaosConsumer<Map<String, Object>> consumer = new TaosConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("tick2"));

            // Consume until the process is stopped or the thread is interrupted.
            while (true) {
                try {
                    // poll is wrapped in its own try/catch so the benign "no data" error
                    // does not end the loop.
                    ConsumerRecords<Map<String, Object>> consumerRecords = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<Map<String, Object>> r : consumerRecords) {
                        printRecord(r.value());
                    }
                } catch (Exception e) {
                    if (!isNoDataError(e)) {
                        // Anything other than the "no data" signal is a real failure.
                        throw e;
                    }
                    // No data available: back off briefly, then poll again.
                    try {
                        Thread.sleep(NO_DATA_BACKOFF_MS);
                    } catch (InterruptedException interrupted) {
                        // Keep the interrupt visible to callers and stop consuming.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }

    }

    /** Returns true when the exception carries the driver's "no data" message; null-safe. */
    private static boolean isNoDataError(Exception e) {
        String message = e.getMessage();
        return message != null && message.contains(NO_DATA_MESSAGE);
    }

    /** Prints one record as "key : value" pairs on a single line; byte[] symbols are decoded as UTF-8. */
    private static void printRecord(Map<String, Object> bean) {
        bean.forEach((k, v) -> {
            if ("symbol".equals(k) && v instanceof byte[]) {
                // Decode with an explicit charset instead of the platform default.
                String symbol = new String((byte[]) v, java.nio.charset.StandardCharsets.UTF_8);
                System.out.print(k + " : " + symbol + " ");
            } else {
                System.out.print(k + " : " + v + " ");
            }
        });
        System.out.println();
    }
}

package com.taos;

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;

/**
 * Unit test for simple App.
 */
public class AppTest extends TestCase {

    /**
     * Builds the test case.
     *
     * @param testName name handed to the TestCase constructor
     */
    public AppTest(String testName) {
        super(testName);
    }

    /**
     * @return a suite containing the tests declared in this class
     */
    public static Test suite() {
        return new TestSuite(AppTest.class);
    }

    /**
     * Placeholder test that always passes.
     */
    public void testApp() {
        assertTrue(true);
    }
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- Coordinates of the consumer example -->
  <groupId>com.taos</groupId>
  <artifactId>consumer</artifactId>
  <packaging>jar</packaging>
  <version>1.0.0</version>
  <name>consumer</name>
  <url>http://maven.apache.org</url>

  <!-- UTF-8 sources, compiled for Java 1.8 -->
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- Test-only dependency used by AppTest -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <!-- TDengine JDBC driver; provides the com.taosdata.jdbc.tmq classes used by App -->
    <dependency>
      <groupId>com.taosdata.jdbc</groupId>
      <artifactId>taos-jdbcdriver</artifactId>
      <version>3.2.1</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- During "package", also build a jar-with-dependencies whose main class is com.taos.App -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.3.0</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.taos.App</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

异常

文件数量

在本文描述场景中,tdengine将打开近7000个文件。如果文件句柄不够,将造成流无法创建新的数据,甚至taosd因为无法打开wal文件而崩溃。

另外,当taos受制于文件句柄、内存时,会吃光系统内存,导致无法登录系统。可通过systemd服务配置限制其CPU和内存上限,确保服务器不崩溃。

# /etc/sysctl.conf
# After editing this file, apply the settings with: sudo sysctl -p
# (that command is run in a shell; it is not a line of this file)
kernel.core_pattern = core.%e.%p.%t
vm.swappiness = 0
vm.vfs_cache_pressure = 50
vm.dirty_ratio = 20
vm.dirty_background_ratio = 10
vm.dirty_expire_centisecs = 2000
# The two lines below are the ones that raise the open-file limits:
# fs.nr_open is the per-process ceiling, fs.file-max the system-wide one.
fs.nr_open = 1048576
fs.file-max = 2097152

# cat /etc/systemd/system/taosd.service
[Unit]
Description=TDengine server service
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
ExecStartPre=/usr/local/taos/bin/startPre.sh
ExecStart=/usr/bin/taosd
StandardOutput=null

# CPU cap: 200% means at most two full CPU cores.
CPUQuota=200%
# Memory: 6 GB soft limit, 8 GB hard limit.
MemoryHigh=6G
MemoryMax=8G

# Process resource limits are left unbounded.
LimitNOFILE=infinity
LimitNPROC=infinity
LimitCORE=infinity

# Start / stop timeouts and restart policy.
TimeoutStartSec=0
TimeoutStopSec=1000000s
Restart=always
StartLimitBurst=3
StartLimitInterval=900s

[Install]
WantedBy=multi-user.target

数据迁移

运行中的数据库会将EP(end point,即fqdn:port)记入全局配置,此后cfg等配置文件中的相应设置全部无效。比如之前使用ip-port启动的服务,如果直接拷贝迁移到其他地方,将还是只能从原先设定的ip-port启动。


评论