修炼者
修炼者
发布于 2026-05-06 / 106 阅读
0
0

TDengine

从源码构建

# 安装构建基本环境
sudo apt-get install -y gcc cmake build-essential git libjansson-dev   libsnappy-dev liblzma-dev zlib1g-dev pkg-config libtool autoconf automake groff

# 用来安装adaptor keeper explore之类的辅助,如果不要可以不安装go
sudo apt install -y golang-go

# 指定版本下载源码
git clone --depth 1 --branch ver-3.4.1.0 https://github.com/taosdata/TDengine.git
cd TDengine/

# 
cmake -B build -DBUILD_TOOLS=true -DBUILD_CONTRIB=true
cmake -B build -DBUILD_TOOLS=true -DBUILD_CONTRIB=true -DBUILD_KEEPER=true -DBUILD_HTTP=false
cmake --build build/ install

配置

# 可检查下/etc/taos.cfg的配置,一般只需要修改下日志、数据的存放路径,然后可直接启动服务
systemctrl start taos

# 首次登录无密码,如果无日志写入权限,需要sudo
taos

# 进入之后,可设置密码,创建用户
alter user root pass 'abc123456@';

create user gauss pass 'Gauss20250922@';

# 设定密码后,再次使用客户端
sudo taos -uroot -fb123456@

# 如果要使用流计算,需要创建sndoe,再taos命令下
show dnodes;
create snode on dnode 1;

# 删除snode
drop snode on dnode 1;

taos.cfg

-- 第一个节点的配置
firstEp                   192.168.7.186:6030
fqdn                      192.168.7.186
serverPort                6030
# logDir                    /var/log/taos
# dataDir                   /var/lib/taos
# tempDir                   /tmp/
supportVnodes              256
timezone                  Asia/Shanghai

-- 后期加入的数据节点1的配置
firstEp                   192.168.7.111:6030
fqdn                      192.168.7.87
serverPort                6030
logDir                    /data01/gauss/soft/taos/log
dataDir                   /data01/gauss/soft/taos/dat
tempDir                   /data01/gauss/soft/taos/tmp
supportVnodes             256
timezone                  Asia/Shanghai

-- 后期加入的数据节点2的配置
firstEp                   192.168.7.186:6030
fqdn                     192.168.7.111
serverPort                6030
logDir                    /data/gaoyong/soft/taos/log_new/
dataDir                   /data/gaoyong/soft/taos/data_new/
tempDir                   /data/gaoyong/soft/taos/tmp_new/
supportVnodes             256
timezone                  Asia/Shanghai

节点

首次启动顺序,先第一个节点,taos登录,使用root用户,密码为空

-- 修改root的密码
alter user root pass "root@123456";
-- 添加用户
create user gauss pass "gauss@123456";
-- 添加数据节点
create dnode "192.168.7.87";
create dnode "192.168.7.111";
-- 启动另外两个节点,可看到状态从offline到ready
show dnodes; 
-- 创建管理节点 :第一个节点默认时mnode
create mnode on dnode 2;
create mnode on dnode 3;
show mnodes;
-- 创建流计算节点
create snode on dnode 1;
create snode on dnode 2;
create snode on dnode 3;
show snodes;

环境变量

SHOW LOCAL VARIABLES;

数据库

创建

# tdengine的数据库有一些配置选项,这里希望缓存最后数据,提供十年存放周期,时间精度为ms
CREATE DATABASE `db_tick` 
   CACHEMODEL 'last_row' 
   KEEP 36500 
   PRECISION 'ms' ;

删除

# 删除数据库 : tdengine如果只删除超级表,不删除子表,会存在隐患
drop database `db_tick`;

查看

# 查看数据库列表
show databases;
# 查看数据库DDL
show create database db_tick\G;
# 查看数据库里的超级表
select stable_name,db_name,`tags` from information_schema.ins_stables order by stable_name;
# 选定指定数据库
use db_tick;
# 显示超级表列表
show stables;
# 查看超级表DDL
show create stable t_tick\G;

超级表

创建

CREATE STABLE `t_tick` (
  `ts` TIMESTAMP, 
  `ask` DOUBLE, 
  `bid` DOUBLE, 
  `close` DOUBLE, 
  `vol` DOUBLE, 
  `dir` TINYINT, 
  `create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;

CREATE STABLE `t_kline_s_01` (
  `ts` TIMESTAMP, 
  `h` DOUBLE, 
  `l` DOUBLE, 
  `o` DOUBLE, 
  `c` DOUBLE, 
  `vol` DOUBLE, 
  `turn_over` DOUBLE, 
  `create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;

CREATE STABLE `t_kline_dir_s_01` (
  `ts` TIMESTAMP, 
  `cumulative_bid_vol` DOUBLE, 
  `cumulative_ask_vol` DOUBLE, 
  `cumulative_mid_vol` DOUBLE, 
  `create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;

CREATE STABLE `t_kline_vol_s_01` (
  `ts` TIMESTAMP, 
  `min_bid_vol` DOUBLE, 
  `max_bid_vol` DOUBLE, 
  `fin_bid_vol` DOUBLE, 
  `create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;

CREATE STABLE `t_footprint_s_01` (
  `ts` TIMESTAMP, 
  `price` DOUBLE, 
  `bid_vol` DOUBLE, 
  `ask_vol` DOUBLE, 
  `mid_vol` DOUBLE, 
  `create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;

流中如果要创建表名,需要确保名字合法。比如基于symbol创建的表,可能存在.之类的特殊符号。

replace(symbol, '.', '_')

创建

drop stream stream_tick_to_kline_s_01;
create stream stream_tick_to_kline_s_01 
  interval(1s) sliding(1s) 
from t_tick 
  partition by tbname,symbol 
  stream_options(ignore_nodata_trigger | max_delay(1m) |watermark(10s)) 
into t_kline_s_01 
  output_subtable(concat('t_kline_s_01_', replace(symbol, '.', '_'))) 
  tags (symbol binary(64) as symbol) 
as 
  select 
    _twstart as ts, 
    cast(max(close) as double) as h, 
    cast(min(close) as double) as l, 
    cast(first(close) as double) as o, 
    cast(last(close) as double) as c, 
    cast(sum(vol) as double) as vol, 
    cast(sum(vol*close) as double) as turn_over,
    _twend  as create_time 
  from %%tbname 
  where ts>= _twstart and ts < _twend;

drop stream stream_tick_to_footprint_s_01;
create stream stream_tick_to_footprint_s_01 
  interval(1s) sliding(1s) 
from t_tick 
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)|watermark(10s)) 
into t_footprint_s_01 
  output_subtable(concat('t_footprint_s_01_', symbol)) 
  tags (symbol binary(64) as symbol) 
as 
  select 
    first(ts) as ts, 
    close as price, 
    cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol, 
    cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol, 
    cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol, 
    _twend as create_time 
  from %%tbname 
  where ts>= _twstart and ts < _twend 
  group by close;

drop stream stream_tick_to_kline_dir_s_01;
create stream stream_tick_to_kline_dir_s_01 
  interval(1s) sliding(1s) 
from t_tick 
  partition by tbname,symbol 
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s)) 
into t_kline_dir_s_01 
  output_subtable(concat('t_kline_dir_s_01_', symbol)) 
  tags (symbol binary(64) as symbol) 
as 
  select 
    _twstart as ts, 
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol, 
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol, 
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol, 
    _twend as create_time 
  from %%tbname 
  where ts>= _twstart and ts <= _twend;

drop stream stream_tick_to_kline_vol_s_01;
create stream stream_tick_to_kline_vol_s_01 
  interval(1s) sliding(1s) 
from db_tick.t_tick 
  partition by tbname,symbol 
  stream_options(ignore_nodata_trigger | max_delay(1m) |watermark(10s)) 
into db_tick.t_kline_vol_s_01 
  output_subtable(concat('t_kline_vol_s_01_', symbol)) 
  tags (symbol binary(64) as symbol) 
as 
  select 
    _twstart as ts, 
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select  
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname 
  where ts>= _twstart and ts <= _twend)b;

查看运行状态

select stream_name ,stream_id,`snodeLeader`,`snodeReplica`,status from information_schema.ins_streams order by stream_name;

测试

创建1,5,15s,1,5,15,30m,1,4h,1d,1w这11个周期的流计算,每个都有基于tick的四个计算包括kline、klvol、kldir、footprint。如下脚本。

超级表

创建

CREATE STABLE `t_tick` (`ts` TIMESTAMP, `ask` DOUBLE, `bid` DOUBLE, `close` DOUBLE, `vol` DOUBLE, `dir` TINYINT, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

CREATE STABLE `t_kline_s_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_s_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_s_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_s_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

CREATE STABLE `t_kline_s_05` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_s_05` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_s_05` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_s_05` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

CREATE STABLE `t_kline_s_15` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_s_15` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_s_15` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_s_15` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

CREATE STABLE `t_kline_m_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

CREATE STABLE `t_kline_m_05` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_05` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_05` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_05` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

CREATE STABLE `t_kline_m_15` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_15` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_15` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_15` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

CREATE STABLE `t_kline_m_30` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_30` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_30` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_30` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

CREATE STABLE `t_kline_h_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_h_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_h_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_h_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

CREATE STABLE `t_kline_h_04` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_h_04` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_h_04` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_h_04` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

CREATE STABLE `t_kline_d_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_d_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_d_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_d_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

CREATE STABLE `t_kline_w_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_w_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_w_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_w_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;

删除

drop stable if exists `t_footprint_w_01`;
drop stable if exists `t_footprint_d_01`;
drop stable if exists `t_footprint_h_04`;
drop stable if exists `t_footprint_h_01`;
drop stable if exists `t_footprint_m_30`;
drop stable if exists `t_footprint_m_15`;
drop stable if exists `t_footprint_m_05`;
drop stable if exists `t_footprint_m_01`;
drop stable if exists `t_footprint_s_15`;
drop stable if exists `t_footprint_s_05`;
drop stable if exists `t_footprint_s_01`;
drop stable if exists `t_klvol_w_01`;
drop stable if exists `t_klvol_d_01`;
drop stable if exists `t_klvol_h_04`;
drop stable if exists `t_klvol_h_01`;
drop stable if exists `t_klvol_m_30`;
drop stable if exists `t_klvol_m_15`;
drop stable if exists `t_klvol_m_05`;
drop stable if exists `t_klvol_m_01`;
drop stable if exists `t_klvol_s_15`;
drop stable if exists `t_klvol_s_05`;
drop stable if exists `t_klvol_s_01`;
drop stable if exists `t_kldir_w_01`;
drop stable if exists `t_kldir_d_01`;
drop stable if exists `t_kldir_h_04`;
drop stable if exists `t_kldir_h_01`;
drop stable if exists `t_kldir_m_30`;
drop stable if exists `t_kldir_m_15`;
drop stable if exists `t_kldir_m_05`;
drop stable if exists `t_kldir_m_01`;
drop stable if exists `t_kldir_s_15`;
drop stable if exists `t_kldir_s_05`;
drop stable if exists `t_kldir_s_01`;
drop stable if exists `t_kline_w_01`;
drop stable if exists `t_kline_d_01`;
drop stable if exists `t_kline_h_04`;
drop stable if exists `t_kline_h_01`;
drop stable if exists `t_kline_m_30`;
drop stable if exists `t_kline_m_15`;
drop stable if exists `t_kline_m_05`;
drop stable if exists `t_kline_m_01`;
drop stable if exists `t_kline_s_15`;
drop stable if exists `t_kline_s_05`;
drop stable if exists `t_kline_s_01`;
drop stable if exists `t_tick`;

创建


create stream stream_tick_to_kline_s_01
  interval(1s) sliding(1s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_s_01
  output_subtable(concat('t_kline_s_01_', replace(replace(symbol,'.','_'),'&','_')))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    cast(max(close) as double) as h,
    cast(min(close) as double) as l,
    cast(first(close) as double) as o,
    cast(last(close) as double) as c,
    cast(sum(vol) as double) as vol,
    cast(sum(vol*close) as double) as turn_over,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend;

create stream stream_tick_to_footprint_s_01
  interval(1s) sliding(1s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_s_01
  output_subtable(concat('t_footprint_s_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    first(ts) as ts,
    close as price,
    cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
    cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
    cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend
  group by close;

create stream stream_tick_to_kldir_s_01
  interval(1s) sliding(1s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_s_01
  output_subtable(concat('t_kldir_s_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts <= _twend;

create stream stream_tick_to_klvol_s_01
  interval(1s) sliding(1s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_s_01
  output_subtable(concat('t_klvol_s_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts>= _twstart and ts <= _twend)b;

create stream stream_tick_to_kline_s_05
  interval(5s) sliding(5s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_s_05
  output_subtable(concat('t_kline_s_05_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    cast(max(close) as double) as h,
    cast(min(close) as double) as l,
    cast(first(close) as double) as o,
    cast(last(close) as double) as c,
    cast(sum(vol) as double) as vol,
    cast(sum(vol*close) as double) as turn_over,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend;

create stream stream_tick_to_footprint_s_05
  interval(5s) sliding(5s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_s_05
  output_subtable(concat('t_footprint_s_05_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    first(ts) as ts,
    close as price,
    cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
    cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
    cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend
  group by close;

create stream stream_tick_to_kldir_s_05
  interval(5s) sliding(5s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_s_05
  output_subtable(concat('t_kldir_s_05_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts <= _twend;

create stream stream_tick_to_klvol_s_05
  interval(5s) sliding(5s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_s_05
  output_subtable(concat('t_klvol_s_05_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts>= _twstart and ts <= _twend)b;

create stream stream_tick_to_kline_s_15
  interval(15s) sliding(15s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_s_15
  output_subtable(concat('t_kline_s_15_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    cast(max(close) as double) as h,
    cast(min(close) as double) as l,
    cast(first(close) as double) as o,
    cast(last(close) as double) as c,
    cast(sum(vol) as double) as vol,
    cast(sum(vol*close) as double) as turn_over,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend;

create stream stream_tick_to_footprint_s_15
  interval(15s) sliding(15s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_s_15
  output_subtable(concat('t_footprint_s_15_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    first(ts) as ts,
    close as price,
    cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
    cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
    cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend
  group by close;

create stream stream_tick_to_kldir_s_15
  interval(15s) sliding(15s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_s_15
  output_subtable(concat('t_kldir_s_15_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts <= _twend;

create stream stream_tick_to_klvol_s_15
  interval(15s) sliding(15s)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_s_15
  output_subtable(concat('t_klvol_s_15_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts>= _twstart and ts <= _twend)b;

create stream stream_tick_to_kline_m_01
  interval(1m) sliding(1m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_m_01
  output_subtable(concat('t_kline_m_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    cast(max(close) as double) as h,
    cast(min(close) as double) as l,
    cast(first(close) as double) as o,
    cast(last(close) as double) as c,
    cast(sum(vol) as double) as vol,
    cast(sum(vol*close) as double) as turn_over,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend;

create stream stream_tick_to_footprint_m_01
  interval(1m) sliding(1m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_m_01
  output_subtable(concat('t_footprint_m_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    first(ts) as ts,
    close as price,
    cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
    cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
    cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend
  group by close;

create stream stream_tick_to_kldir_m_01
  interval(1m) sliding(1m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_m_01
  output_subtable(concat('t_kldir_m_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts <= _twend;

create stream stream_tick_to_klvol_m_01
  interval(1m) sliding(1m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_m_01
  output_subtable(concat('t_klvol_m_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts>= _twstart and ts <= _twend)b;

create stream stream_tick_to_kline_m_05
  interval(5m) sliding(5m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_m_05
  output_subtable(concat('t_kline_m_05_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    cast(max(close) as double) as h,
    cast(min(close) as double) as l,
    cast(first(close) as double) as o,
    cast(last(close) as double) as c,
    cast(sum(vol) as double) as vol,
    cast(sum(vol*close) as double) as turn_over,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend;

create stream stream_tick_to_footprint_m_05
  interval(5m) sliding(5m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_m_05
  output_subtable(concat('t_footprint_m_05_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    first(ts) as ts,
    close as price,
    cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
    cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
    cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend
  group by close;

create stream stream_tick_to_kldir_m_05
  interval(5m) sliding(5m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_m_05
  output_subtable(concat('t_kldir_m_05_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts <= _twend;

create stream stream_tick_to_klvol_m_05
  interval(5m) sliding(5m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_m_05
  output_subtable(concat('t_klvol_m_05_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts>= _twstart and ts <= _twend)b;

create stream stream_tick_to_kline_m_15
  interval(15m) sliding(15m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_m_15
  output_subtable(concat('t_kline_m_15_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    cast(max(close) as double) as h,
    cast(min(close) as double) as l,
    cast(first(close) as double) as o,
    cast(last(close) as double) as c,
    cast(sum(vol) as double) as vol,
    cast(sum(vol*close) as double) as turn_over,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend;

create stream stream_tick_to_footprint_m_15
  interval(15m) sliding(15m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_m_15
  output_subtable(concat('t_footprint_m_15_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    first(ts) as ts,
    close as price,
    cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
    cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
    cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend
  group by close;

create stream stream_tick_to_kldir_m_15
  interval(15m) sliding(15m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_m_15
  output_subtable(concat('t_kldir_m_15_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts <= _twend;

create stream stream_tick_to_klvol_m_15
  interval(15m) sliding(15m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_m_15
  output_subtable(concat('t_klvol_m_15_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts>= _twstart and ts <= _twend)b;

create stream stream_tick_to_kline_m_30
  interval(30m) sliding(30m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_m_30
  output_subtable(concat('t_kline_m_30_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    cast(max(close) as double) as h,
    cast(min(close) as double) as l,
    cast(first(close) as double) as o,
    cast(last(close) as double) as c,
    cast(sum(vol) as double) as vol,
    cast(sum(vol*close) as double) as turn_over,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend;

create stream stream_tick_to_footprint_m_30
  interval(30m) sliding(30m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_m_30
  output_subtable(concat('t_footprint_m_30_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    first(ts) as ts,
    close as price,
    cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
    cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
    cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend
  group by close;

create stream stream_tick_to_kldir_m_30
  interval(30m) sliding(30m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_m_30
  output_subtable(concat('t_kldir_m_30_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts <= _twend;

create stream stream_tick_to_klvol_m_30
  interval(30m) sliding(30m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_m_30
  output_subtable(concat('t_klvol_m_30_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts>= _twstart and ts <= _twend)b;

create stream stream_tick_to_kline_h_01
  interval(1h) sliding(1h)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_h_01
  output_subtable(concat('t_kline_h_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    cast(max(close) as double) as h,
    cast(min(close) as double) as l,
    cast(first(close) as double) as o,
    cast(last(close) as double) as c,
    cast(sum(vol) as double) as vol,
    cast(sum(vol*close) as double) as turn_over,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend;

create stream stream_tick_to_footprint_h_01
  interval(1h) sliding(1h)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_h_01
  output_subtable(concat('t_footprint_h_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    first(ts) as ts,
    close as price,
    cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
    cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
    cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend
  group by close;

create stream stream_tick_to_kldir_h_01
  interval(1h) sliding(1h)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_h_01
  output_subtable(concat('t_kldir_h_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts <= _twend;

create stream stream_tick_to_klvol_h_01
  interval(1h) sliding(1h)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_h_01
  output_subtable(concat('t_klvol_h_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts>= _twstart and ts <= _twend)b;

create stream stream_tick_to_kline_h_04
  interval(4h) sliding(4h)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_h_04
  output_subtable(concat('t_kline_h_04_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    cast(max(close) as double) as h,
    cast(min(close) as double) as l,
    cast(first(close) as double) as o,
    cast(last(close) as double) as c,
    cast(sum(vol) as double) as vol,
    cast(sum(vol*close) as double) as turn_over,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend;

create stream stream_tick_to_footprint_h_04
  interval(4h) sliding(4h)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_h_04
  output_subtable(concat('t_footprint_h_04_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    first(ts) as ts,
    close as price,
    cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
    cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
    cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend
  group by close;

create stream stream_tick_to_kldir_h_04
  interval(4h) sliding(4h)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_h_04
  output_subtable(concat('t_kldir_h_04_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts <= _twend;

create stream stream_tick_to_klvol_h_04
  interval(4h) sliding(4h)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_h_04
  output_subtable(concat('t_klvol_h_04_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts>= _twstart and ts <= _twend)b;

create stream stream_tick_to_kline_d_01
  interval(1d) sliding(1d)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_d_01
  output_subtable(concat('t_kline_d_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    cast(max(close) as double) as h,
    cast(min(close) as double) as l,
    cast(first(close) as double) as o,
    cast(last(close) as double) as c,
    cast(sum(vol) as double) as vol,
    cast(sum(vol*close) as double) as turn_over,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend;

create stream stream_tick_to_footprint_d_01
  interval(1d) sliding(1d)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_d_01
  output_subtable(concat('t_footprint_d_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    first(ts) as ts,
    close as price,
    cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
    cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
    cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend
  group by close;

create stream stream_tick_to_kldir_d_01
  interval(1d) sliding(1d)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_d_01
  output_subtable(concat('t_kldir_d_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts <= _twend;

create stream stream_tick_to_klvol_d_01
  interval(1d) sliding(1d)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_d_01
  output_subtable(concat('t_klvol_d_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts>= _twstart and ts <= _twend)b;

create stream stream_tick_to_kline_w_01
  interval(1w) sliding(1w)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_w_01
  output_subtable(concat('t_kline_w_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    cast(max(close) as double) as h,
    cast(min(close) as double) as l,
    cast(first(close) as double) as o,
    cast(last(close) as double) as c,
    cast(sum(vol) as double) as vol,
    cast(sum(vol*close) as double) as turn_over,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend;

create stream stream_tick_to_footprint_w_01
  interval(1w) sliding(1w)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_w_01
  output_subtable(concat('t_footprint_w_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    first(ts) as ts,
    close as price,
    cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
    cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
    cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend
  group by close;

create stream stream_tick_to_kldir_w_01
  interval(1w) sliding(1w)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_w_01
  output_subtable(concat('t_kldir_w_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
    sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
    sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts <= _twend;

create stream stream_tick_to_klvol_w_01
  interval(1w) sliding(1w)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_w_01
  output_subtable(concat('t_klvol_w_01_', symbol))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    min(a) as min_bid_vol,
    max(a) as max_bid_vol,
    last(a) as fin_bid_vol,
    _twend as create_time
  from (select
    ts,
    csum(vol * cast(2-dir as double)) as a
  from %%tbname
  where ts>= _twstart and ts <= _twend)b;

删除


drop stream if exists stream_tick_to_klvol_w_01;
drop stream if exists stream_tick_to_kldir_w_01;
drop stream if exists stream_tick_to_footprint_w_01;
drop stream if exists stream_tick_to_kline_w_01;

drop stream if exists stream_tick_to_klvol_d_01;
drop stream if exists stream_tick_to_kldir_d_01;
drop stream if exists stream_tick_to_footprint_d_01;
drop stream if exists stream_tick_to_kline_d_01;

drop stream if exists stream_tick_to_klvol_h_04;
drop stream if exists stream_tick_to_kldir_h_04;
drop stream if exists stream_tick_to_footprint_h_04;
drop stream if exists stream_tick_to_kline_h_04;

drop stream if exists stream_tick_to_klvol_h_01;
drop stream if exists stream_tick_to_kldir_h_01;
drop stream if exists stream_tick_to_footprint_h_01;
drop stream if exists stream_tick_to_kline_h_01;

drop stream if exists stream_tick_to_klvol_m_30;
drop stream if exists stream_tick_to_kldir_m_30;
drop stream if exists stream_tick_to_footprint_m_30;
drop stream if exists stream_tick_to_kline_m_30;

drop stream if exists stream_tick_to_klvol_m_15;
drop stream if exists stream_tick_to_kldir_m_15;
drop stream if exists stream_tick_to_footprint_m_15;
drop stream if exists stream_tick_to_kline_m_15;

drop stream if exists stream_tick_to_klvol_m_05;
drop stream if exists stream_tick_to_kldir_m_05;
drop stream if exists stream_tick_to_footprint_m_05;
drop stream if exists stream_tick_to_kline_m_05;

drop stream if exists stream_tick_to_klvol_m_01;
drop stream if exists stream_tick_to_kldir_m_01;
drop stream if exists stream_tick_to_footprint_m_01;
drop stream if exists stream_tick_to_kline_m_01;

drop stream if exists stream_tick_to_klvol_s_15;
drop stream if exists stream_tick_to_kldir_s_15;
drop stream if exists stream_tick_to_footprint_s_15;
drop stream if exists stream_tick_to_kline_s_15;

drop stream if exists stream_tick_to_klvol_s_05;
drop stream if exists stream_tick_to_kldir_s_05;
drop stream if exists stream_tick_to_footprint_s_05;
drop stream if exists stream_tick_to_kline_s_05;

drop stream if exists stream_tick_to_klvol_s_01;
drop stream if exists stream_tick_to_kldir_s_01;
drop stream if exists stream_tick_to_footprint_s_01;
drop stream if exists stream_tick_to_kline_s_01;

查看流在节点的分布

 select * from information_schema.ins_streams;

写入tick

从指定的kafka写入tick。观察stream的运行。

存量

获得最大吞吐量

增量

查看持续运行稳定性

查询

查询品类列表含最新tick

select symbol,last(ts) as lts,last(close) as lclose,last(dir) as ldir from t_tick group by symbol order by lclose desc limit 0,10;

场景

行情这类基于tick生成kline、footprint、vol、dir后,可通过如下语句查询。

查看总共多少个品类

tick写入时,按照品类做了子表,查询meta数据可获得品类数量。

-- 导出品类列表到特定文件
select replace(table_name,'t_tick_',''),* from information_schema.ins_tables where stable_name ="t_tick" order by table_name >> /home/gauss/stat/symbol.csv;

-- 直接获得品类也行
select replace(table_name,'t_tick_','') from information_schema.ins_tables where stable_name ="t_tick" order by table_name >> /home/gauss/stat/symbol.txt;

品类最新价格与时间

taos> select distinct(symbol) as symbol,last(close) as lclose,last(ts) as lts from t_tick group by symbol order by lclose >> /tmp/symbol_last_price_by_p
rice.txt;
Query OK, 34263 row(s) in set (5.670962s)

taos> select distinct(symbol) as symbol,last(close) as lclose,last(ts) as lts from t_tick group by symbol order by lts >> /tmp/symbol_last_price_by_ts.t
Query OK, 34263 row(s) in set (5.875431s)

taos> select distinct(symbol) as symbol,last(close) as lclose,last(ts) as lts from t_tick group by symbol order by symbol >> /tmp/symbol_last_price_by_symbol.txt;
Query OK, 34263 row(s) in set (8.643509s)

足迹图

-- d
select * from t_footprint_d_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1d) = "2026-05-14 00:00:00";
-- h
select * from t_footprint_h_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1h) = "2026-05-14 09:00:00";
-- m
select * from t_footprint_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m) = "2026-05-14 09:40:00";
-- s
select * from t_footprint_s_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1s) = "2026-05-14 09:40:00";

-- 5
select first(ts),symbol,price,sum(bid_vol),sum(ask_vol),sum(mid_vol),first(create_time) from t_footprint_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m)>= "2026-05-14 09:40:00" and  TIMETRUNCATE(ts,1m)< "2026-05-14 09:45:00" group by symbol,price order by price;
-- 10
select first(ts),symbol,price,sum(bid_vol),sum(ask_vol),sum(mid_vol),first(create_time) from t_footprint_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m)>= "2026-05-14 09:40:00" and  TIMETRUNCATE(ts,1m)< "2026-05-14 09:50:00" group by symbol,price order by price;
-- 15
select first(ts),symbol,price,sum(bid_vol),sum(ask_vol),sum(mid_vol),first(create_time) from t_footprint_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m)>= "2026-05-14 09:45:00" and  TIMETRUNCATE(ts,1m)< "2026-05-14 10:00:00" group by symbol,price order by pric
e;
-- 30
select first(ts),symbol,price,sum(bid_vol),sum(ask_vol),sum(mid_vol),first(create_time) from t_footprint_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m)>= "2026-05-14 09:45:00" and  TIMETRUNCATE(ts,1m)< "2026-05-14 10:00:00" group by symbol,price order by price;

K线

select * from t_kline_d_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1d) = "2026-05-14 00:00:00";

select * from t_kline_h_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1h) = "2026-05-14 09:00:00";

select * from t_kline_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m) = "2026-05-14 09:40:00";

select * from t_kline_s_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1s) = "2026-05-14 09:40:00";

成交方向

select * from t_kldir_d_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1d) = "2026-05-14 00:00:00";

select * from t_kldir_h_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1h) = "2026-05-14 09:00:00";

select * from t_kldir_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m) = "2026-05-14 09:40:00";

select * from t_kldir_s_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1s) = "2026-05-14 09:40:00";

买盘量k线

select * from t_klvol_d_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1d) = "2026-05-14 00:00:00";

select * from t_klvol_h_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1h) = "2026-05-14 09:00:00";

select * from t_klvol_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m) = "2026-05-14 09:40:00";

select * from t_klvol_s_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1s) = "2026-05-14 09:40:00";

-- 5
select first(ts),min(min_bid_vol),max(max_bid_vol),sum(fin_bid_vol) from t_klvol_s_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1s) >= "2026-05-14 09:40:00" and TIMETRUNCATE(ts,1s) < "2026-05-14 09:45:00" group by symbol ;

-- 30
select first(ts),min(min_bid_vol),max(max_bid_vol),sum(fin_bid_vol) from t_klvol_s_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1s) >= "2026-05-14 09:30:00" and TIMETRUNCATE(ts,1s) < "2026-05-14 10:00:00" group by symbol ;

集群

taosd社区版本支持计算复制集、计算、查询分离,以合理分担压力。可使用三台机器形成数据复制实例,并将流计算放在一个节点,查询放在一个节点,主管理在一个节点。可如下配置/etc/taos/taos.cfg

hosts

每一个节点fqdn就是主机名,并需要在/etc/hosts配置,确保网络是通的。



192.168.7.111 snode
192.168.7.186 mnode
192.168.7.87  qnode

mnode

# The end point of the first dnode in the cluster to be connected to when this dnode or the CLI utility is started
firstEp                   mnode:6030
fqdn                      mnode
serverPort                6030
supportVnodes             1
supportSnodes             0
supportQnodes             0

默认firstEp就是mnode,taos命令增加mnodes时,所在节点可能会有error,时正常的提示,meta数据会从mnode同步。

-- 创建mnode要求基于dnode来,详细见dnode章节
create mnode on dnode 2;
create mnode on dnode 3;

dnode

create dnode "192.168.7.87";
create dnode "192.168.7.111";

show mnodes;
show dnodes; --  可以获得id,用来创建snodes,流计算需要snode,如果有多个snode,可以更可靠

snode

firstEp                   mnode:6030
fqdn                     snode
serverPort                6030
supportVnodes             256
supportQnodes             1
supportSnodes             1

qnode

firstEp                   mnode:6030
fqdn                     qnode
serverPort                6030
supportVnodes             256
supportQnodes             1
supportSnodes             0
create snode on dnode 1;
create snode on dnode 3;
create snode on dnode 4;

taos> show dnodes;
     id      |            endpoint            | vnodes | support_vnodes |    status    |       create_time       |       reboot_time       |              note              |
=============================================================================================================================================================================
           1 | tdengine-0.186:6030            |      3 |             29 | ready        | 2026-04-24 18:16:11.228 | 2026-05-06 17:51:10.269 |                                |
           3 | ct4dev:6030                    |      2 |            256 | ready        | 2026-05-06 16:35:02.383 | 2026-05-07 13:40:44.757 |                                |
           4 | tdengine-7.87:6030             |      2 |            256 | ready        | 2026-05-06 17:19:23.917 | 2026-05-06 17:51:46.640 |                                |
Query OK, 3 row(s) in set (0.005232s)

taos> show snodes;
     id      |            endpoint            |       create_time       |  replicaId  |          asReplicaOf           |
========================================================================================================================
           1 | tdengine-0.186:6030            | 2026-05-06 18:12:28.481 |           3 | 3, 4                           |
           3 | ct4dev:6030                    | 2026-05-06 16:54:59.151 |           1 | 1                              |
           4 | tdengine-7.87:6030             | 2026-05-06 18:12:30.914 |           1 | None                           |
Query OK, 3 row(s) in set (0.005251s)

订阅

创建主题

create topic `tick2` as select * from `db_tick`.`t_tick`

可获得这个链接: tmq+ws://gauss:Gauss20250922%40@192.168.7.111:6041/tick2

消费数据

consumer>tree
.
├── pom.xml
├── src
│   ├── main
│   │   └── java
│   │       └── com
│   │           └── taos
│   │               └── App.java
│   └── test
│       └── java
│           └── com
│               └── taos
│                   └── AppTest.java
// export TDENGINE_JDBC_URL="jdbc:TAOS-RS://192.168.7.186:6041/rest/tmq?user=root&password=A7fK9xQ2LmR8tY4Z"
package com.taos;

import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;

import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class App {
    public static void main(String[] args) throws SQLException {

        String url = System.getenv("TDENGINE_JDBC_URL");

        Properties properties = new Properties();
        properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
        properties.setProperty(TMQConstants.CONNECT_URL, url);
        properties.setProperty(TMQConstants.CONNECT_TIMEOUT, "10000");
        properties.setProperty(TMQConstants.CONNECT_MESSAGE_TIMEOUT, "10000");
        properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
        properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
        properties.setProperty(TMQConstants.GROUP_ID, "gId");
        properties.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.jdbc.tmq.MapDeserializer");


        try (TaosConsumer<Map<String, Object>> consumer = new TaosConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("tick2"));

            // 无限循环消费
            while (true) {
                try {
                    // 核心:poll 套一层 try-catch,忽略末尾的信号错误
                    ConsumerRecords<Map<String, Object>> consumerRecords = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<Map<String, Object>> r : consumerRecords) {


                        Map<String, Object> bean = r.value();
                        bean.forEach((k, v) -> {
                            if ("symbol".equals(k) && v instanceof byte[]) {
                                System.out.print(k + " : " + new String((byte[]) v) + " ");
                            } else {
                                System.out.print(k + " : " + v + " ");
                            }
                        });
                        System.out.println();
                    }
                } catch (Exception e) {
                    String msg = e.getMessage();

                    // ✅ 忽略这两种常见“非错误”:
                    // 1. 消息类型不匹配
                    // 2. 读取超时(正常无数据)
                    if (msg.contains("message type is not data")
                        || msg.contains("Fetch data timeout")) {

                        try { Thread.sleep(200); } catch (InterruptedException ignored) {}
                        continue;
                    }

                    // 其他错误才打印并退出
                    e.printStackTrace();
                    break;
                }

            }
        }

    }
}


package com.taos;

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;

/**
 * Unit test for simple App.
 */
public class AppTest
    extends TestCase
{
    /**
     * Create the test case
     *
     * @param testName name of the test case
     */
    public AppTest( String testName )
    {
        super( testName );
    }

    /**
     * @return the suite of tests being tested
     */
    public static Test suite()
    {
        return new TestSuite( AppTest.class );
    }

    /**
     * Rigourous Test :-)
     */
    public void testApp()
    {
        assertTrue( true );
    }
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.taos</groupId>
        <artifactId>consumer</artifactId>
        <packaging>jar</packaging>
        <version>1.0.0</version>
        <name>consumer</name>
        <url>http://maven.apache.org</url>

        <properties>
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                <maven.compiler.source>1.8</maven.compiler.source>
                <maven.compiler.target>1.8</maven.compiler.target>
        </properties>

        <dependencies>
                <dependency>
                        <groupId>junit</groupId>
                        <artifactId>junit</artifactId>
                        <version>3.8.1</version>
                        <scope>test</scope>
                </dependency>

                <dependency>
                        <groupId>com.taosdata.jdbc</groupId>
                        <artifactId>taos-jdbcdriver</artifactId>
                        <version>3.2.1</version>
                </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.3.0</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.taos.App</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
</project>

异常

限定资源上限

TDengien如果不限定资源上线,一些节点会被拉死,导致无法工作之外,连登录都无法做到了。可通过Linux的Service进行资源上限控制。

# cat /etc/systemd/system/taosd.service
[Unit]
Description=TDengine server service
After=network-online.target
Wants=network-online.target

[Service]
CPUQuota=800%
MemoryMax=8G
MemoryHigh=6G
Type=simple
ExecStart=/usr/bin/taosd
ExecStartPre=/usr/local/taos/bin/startPre.sh
TimeoutStopSec=1000000s
LimitNOFILE=infinity
LimitNPROC=infinity
LimitCORE=infinity
TimeoutStartSec=0
StandardOutput=null
Restart=always
StartLimitBurst=3
StartLimitInterval=900s

[Install]
WantedBy=multi-user.target

杀死进程

kill -9 taosd后,taosd可能存在坏掉的wal,而且taos无法自行恢复。这应该是一个重大bug,证明其wal机制存在瑕疵。

删除数据

如果删除data目录数据,重启节点,这个节点将如同新启动的节点一样开始工作。如果这是一台firstEP,那就是新的,否则将从firstEP所在的集群同步数据。

rm -r -f $data,如果发生在单个mnode集群的mnode上,如果重启此节点,其他节点会即可同步数据,导致数据丢失无法找回。

如果不重启,mnode会一直报错,但是也只是报错,没有任何有意义的提示。

猜测应该是mnode保存了schema,schema丢失,其他数据也无法访问。

如果配置多个mnode,不确定删除一个,能否从另外的同步过来。这里面估计还要区分是否firstEP。待测试验证。

表名

流计算通常使用tag来作为表明,tag却不能包含特殊字符如.

SELECT DISTINCT symbol  FROM t_tick  WHERE symbol REGEXP '[^a-zA-Z0-9_]';

-- 创建的流可将symbol的特定字符替换,以满足表名命名规则
create stream stream_tick_to_kline_m_01
  interval(1m) sliding(1m)
from t_tick
  partition by tbname,symbol
  stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_m_01
  output_subtable(concat('t_kline_m_01_', replace(symbol, '.', '_')))
  tags (symbol binary(64) as symbol)
as
  select
    _twstart as ts,
    cast(max(close) as double) as h,
    cast(min(close) as double) as l,
    cast(first(close) as double) as o,
    cast(last(close) as double) as c,
    cast(sum(vol) as double) as vol,
    cast(sum(vol*close) as double) as turn_over,
    _twend as create_time
  from %%tbname
  where ts>= _twstart and ts < _twend;

文件数量

在本文描述场景中,tdengine将打开近7000个文件。如果文件句柄不够,将造成流无法创建新的 数据,甚至taosd因为无法打开wal文件而崩溃。

另外当taos受制于文件句柄,内存时,将吃光系统内存,导致无法登录系统。可通过服务限制其最高cpu和内存,确保服务器不崩溃。

#/etc/sysctl.conf
sudo sysctl -p
kernel.core_pattern = core.%e.%p.%t
vm.swappiness = 0
vm.vfs_cache_pressure = 50
vm.dirty_ratio = 20
vm.dirty_background_ratio = 10
vm.dirty_expire_centisecs = 2000
# 如下两行
fs.nr_open = 1048576
fs.file-max = 2097152

#cat /etc/systemd/system/taosd.service
[Unit]
Description=TDengine server service
After=network-online.target
Wants=network-online.target

[Service]
# CPU 限制:200% 表示最多使用 2 个完整的 CPU 核心
CPUQuota=200%
# 内存限制:8GB 硬上限
MemoryMax=8G
# 内存限制:6GB 软上限
MemoryHigh=6G
Type=simple
ExecStart=/usr/bin/taosd
ExecStartPre=/usr/local/taos/bin/startPre.sh
TimeoutStopSec=1000000s
LimitNOFILE=infinity
LimitNPROC=infinity
LimitCORE=infinity
TimeoutStartSec=0
StandardOutput=null
Restart=always
StartLimitBurst=3
StartLimitInterval=900s

[Install]
WantedBy=multi-user.target

数据迁移

运行中的数据库,会将节点配置信息写入数据目录的文件中,导致taos.cfg配置无效。比如之前使用ip-port启动的服务,如果直接拷贝数据目录到其他地方,将还是只能从原先设定的ip-port启动。例如:

cat $data/dnode/dnode.json
{
        "dnodeId":      3,
        "dnodeVer":     "8",
        "engineVer":    "30000000",
        "clusterId":    "4482909662002860117",
        "dropped":      0,
        "dnodes":       [{
                        "id":   1,
                        "fqdn": "192.168.7.186",
                        "port": 6030,
                        "isMnode":      1
                }, {
                        "id":   2,
                        "fqdn": "192.168.7.111",
                        "port": 6030,
                        "isMnode":      1
                }, {
                        "id":   3,
                        "fqdn": "192.168.7.87",
                        "port": 6030,
                        "isMnode":      1
                }]
}

cat $dat/snode/snode3/snode.json
{
        "deployed":     1,
        "snodeId":      "3",
        "leader0":      {
                "nodeId":       "0",
                "inUse":        "0",
                "numOfEps":     "0"
        },
        "leader1":      {
                "nodeId":       "0",
                "inUse":        "0",
                "numOfEps":     "0"
        },
        "replica":      {
                "nodeId":       "1",
                "inUse":        "0",
                "numOfEps":     "1",
                "eps":  [{
                                "fqdn": "192.168.7.186",
                                "port": "6030"
                        }]
        }
}

cat $dat/mnode/mnode.json
{
        "lastIndex":    142,
        "deployed":     1,
        "version":      1,
        "encrypted":    0
}


评论