从源码构建
# 安装构建基本环境
sudo apt-get install -y gcc cmake build-essential git libjansson-dev libsnappy-dev liblzma-dev zlib1g-dev pkg-config libtool autoconf automake groff
# 用来安装adaptor keeper explore之类的辅助,如果不要可以不安装go
sudo apt install -y golang-go
# 指定版本下载源码
git clone --depth 1 --branch ver-3.4.1.0 https://github.com/taosdata/TDengine.git
cd TDengine/
#
cmake -B build -DBUILD_TOOLS=true -DBUILD_CONTRIB=true
cmake -B build -DBUILD_TOOLS=true -DBUILD_CONTRIB=true -DBUILD_KEEPER=true -DBUILD_HTTP=false
cmake --build build/ install
配置
# 可检查下/etc/taos.cfg的配置,一般只需要修改下日志、数据的存放路径,然后可直接启动服务
systemctrl start taos
# 首次登录无密码,如果无日志写入权限,需要sudo
taos
# 进入之后,可设置密码,创建用户
alter user root pass 'abc123456@';
create user gauss pass 'Gauss20250922@';
# 设定密码后,再次使用客户端
sudo taos -uroot -fb123456@
# 如果要使用流计算,需要创建sndoe,再taos命令下
show dnodes;
create snode on dnode 1;
# 删除snode
drop snode on dnode 1;taos.cfg
-- 第一个节点的配置
firstEp 192.168.7.186:6030
fqdn 192.168.7.186
serverPort 6030
# logDir /var/log/taos
# dataDir /var/lib/taos
# tempDir /tmp/
supportVnodes 256
timezone Asia/Shanghai
-- 后期加入的数据节点1的配置
firstEp 192.168.7.111:6030
fqdn 192.168.7.87
serverPort 6030
logDir /data01/gauss/soft/taos/log
dataDir /data01/gauss/soft/taos/dat
tempDir /data01/gauss/soft/taos/tmp
supportVnodes 256
timezone Asia/Shanghai
-- 后期加入的数据节点2的配置
firstEp 192.168.7.186:6030
fqdn 192.168.7.111
serverPort 6030
logDir /data/gaoyong/soft/taos/log_new/
dataDir /data/gaoyong/soft/taos/data_new/
tempDir /data/gaoyong/soft/taos/tmp_new/
supportVnodes 256
timezone Asia/Shanghai节点
首次启动顺序,先第一个节点,taos登录,使用root用户,密码为空
-- 修改root的密码
alter user root pass "root@123456";
-- 添加用户
create user gauss pass "gauss@123456";
-- 添加数据节点
create dnode "192.168.7.87";
create dnode "192.168.7.111";
-- 启动另外两个节点,可看到状态从offline到ready
show dnodes;
-- 创建管理节点 :第一个节点默认时mnode
create mnode on dnode 2;
create mnode on dnode 3;
show mnodes;
-- 创建流计算节点
create snode on dnode 1;
create snode on dnode 2;
create snode on dnode 3;
show snodes;环境变量
SHOW LOCAL VARIABLES;
数据库
创建
# tdengine的数据库有一些配置选项,这里希望缓存最后数据,提供十年存放周期,时间精度为ms
CREATE DATABASE `db_tick`
CACHEMODEL 'last_row'
KEEP 36500
PRECISION 'ms' ;删除
# 删除数据库 : tdengine如果只删除超级表,不删除子表,会存在隐患
drop database `db_tick`;查看
# 查看数据库列表
show databases;
# 查看数据库DDL
show create database db_tick\G;
# 查看数据库里的超级表
select stable_name,db_name,`tags` from information_schema.ins_stables order by stable_name;
# 选定指定数据库
use db_tick;
# 显示超级表列表
show stables;
# 查看超级表DDL
show create stable t_tick\G;超级表
创建
CREATE STABLE `t_tick` (
`ts` TIMESTAMP,
`ask` DOUBLE,
`bid` DOUBLE,
`close` DOUBLE,
`vol` DOUBLE,
`dir` TINYINT,
`create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;
CREATE STABLE `t_kline_s_01` (
`ts` TIMESTAMP,
`h` DOUBLE,
`l` DOUBLE,
`o` DOUBLE,
`c` DOUBLE,
`vol` DOUBLE,
`turn_over` DOUBLE,
`create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;
CREATE STABLE `t_kline_dir_s_01` (
`ts` TIMESTAMP,
`cumulative_bid_vol` DOUBLE,
`cumulative_ask_vol` DOUBLE,
`cumulative_mid_vol` DOUBLE,
`create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;
CREATE STABLE `t_kline_vol_s_01` (
`ts` TIMESTAMP,
`min_bid_vol` DOUBLE,
`max_bid_vol` DOUBLE,
`fin_bid_vol` DOUBLE,
`create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;
CREATE STABLE `t_footprint_s_01` (
`ts` TIMESTAMP,
`price` DOUBLE,
`bid_vol` DOUBLE,
`ask_vol` DOUBLE,
`mid_vol` DOUBLE,
`create_time` TIMESTAMP
) TAGS (`symbol` VARCHAR(64)) ;流
流中如果要创建表名,需要确保名字合法。比如基于symbol创建的表,可能存在.之类的特殊符号。
replace(symbol, '.', '_')创建
drop stream stream_tick_to_kline_s_01;
create stream stream_tick_to_kline_s_01
interval(1s) sliding(1s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m) |watermark(10s))
into t_kline_s_01
output_subtable(concat('t_kline_s_01_', replace(symbol, '.', '_')))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
drop stream stream_tick_to_footprint_s_01;
create stream stream_tick_to_footprint_s_01
interval(1s) sliding(1s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)|watermark(10s))
into t_footprint_s_01
output_subtable(concat('t_footprint_s_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
drop stream stream_tick_to_kline_dir_s_01;
create stream stream_tick_to_kline_dir_s_01
interval(1s) sliding(1s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_dir_s_01
output_subtable(concat('t_kline_dir_s_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
drop stream stream_tick_to_kline_vol_s_01;
create stream stream_tick_to_kline_vol_s_01
interval(1s) sliding(1s)
from db_tick.t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m) |watermark(10s))
into db_tick.t_kline_vol_s_01
output_subtable(concat('t_kline_vol_s_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;查看运行状态
select stream_name ,stream_id,`snodeLeader`,`snodeReplica`,status from information_schema.ins_streams order by stream_name;测试
创建1,5,15s,1,5,15,30m,1,4h,1d,1w这11个周期的流计算,每个都有基于tick的四个计算包括kline、klvol、kldir、footprint。如下脚本。
超级表
创建
CREATE STABLE `t_tick` (`ts` TIMESTAMP, `ask` DOUBLE, `bid` DOUBLE, `close` DOUBLE, `vol` DOUBLE, `dir` TINYINT, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_s_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_s_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_s_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_s_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_s_05` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_s_05` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_s_05` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_s_05` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_s_15` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_s_15` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_s_15` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_s_15` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_m_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_m_05` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_05` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_05` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_05` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_m_15` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_15` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_15` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_15` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_m_30` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_m_30` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_m_30` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_m_30` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_h_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_h_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_h_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_h_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_h_04` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_h_04` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_h_04` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_h_04` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_d_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_d_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_d_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_d_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kline_w_01` (`ts` TIMESTAMP, `h` DOUBLE, `l` DOUBLE, `o` DOUBLE, `c` DOUBLE, `vol` DOUBLE, `turn_over` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_kldir_w_01` (`ts` TIMESTAMP, `cumulative_bid_vol` DOUBLE, `cumulative_ask_vol` DOUBLE, `cumulative_mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_klvol_w_01` (`ts` TIMESTAMP, `min_bid_vol` DOUBLE, `max_bid_vol` DOUBLE, `fin_bid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
CREATE STABLE `t_footprint_w_01` (`ts` TIMESTAMP, `price` DOUBLE, `bid_vol` DOUBLE, `ask_vol` DOUBLE, `mid_vol` DOUBLE, `create_time` TIMESTAMP) TAGS (`symbol` VARCHAR(64)) SECURE_DELETE 0;
删除
drop stable if exists `t_footprint_w_01`;
drop stable if exists `t_footprint_d_01`;
drop stable if exists `t_footprint_h_04`;
drop stable if exists `t_footprint_h_01`;
drop stable if exists `t_footprint_m_30`;
drop stable if exists `t_footprint_m_15`;
drop stable if exists `t_footprint_m_05`;
drop stable if exists `t_footprint_m_01`;
drop stable if exists `t_footprint_s_15`;
drop stable if exists `t_footprint_s_05`;
drop stable if exists `t_footprint_s_01`;
drop stable if exists `t_klvol_w_01`;
drop stable if exists `t_klvol_d_01`;
drop stable if exists `t_klvol_h_04`;
drop stable if exists `t_klvol_h_01`;
drop stable if exists `t_klvol_m_30`;
drop stable if exists `t_klvol_m_15`;
drop stable if exists `t_klvol_m_05`;
drop stable if exists `t_klvol_m_01`;
drop stable if exists `t_klvol_s_15`;
drop stable if exists `t_klvol_s_05`;
drop stable if exists `t_klvol_s_01`;
drop stable if exists `t_kldir_w_01`;
drop stable if exists `t_kldir_d_01`;
drop stable if exists `t_kldir_h_04`;
drop stable if exists `t_kldir_h_01`;
drop stable if exists `t_kldir_m_30`;
drop stable if exists `t_kldir_m_15`;
drop stable if exists `t_kldir_m_05`;
drop stable if exists `t_kldir_m_01`;
drop stable if exists `t_kldir_s_15`;
drop stable if exists `t_kldir_s_05`;
drop stable if exists `t_kldir_s_01`;
drop stable if exists `t_kline_w_01`;
drop stable if exists `t_kline_d_01`;
drop stable if exists `t_kline_h_04`;
drop stable if exists `t_kline_h_01`;
drop stable if exists `t_kline_m_30`;
drop stable if exists `t_kline_m_15`;
drop stable if exists `t_kline_m_05`;
drop stable if exists `t_kline_m_01`;
drop stable if exists `t_kline_s_15`;
drop stable if exists `t_kline_s_05`;
drop stable if exists `t_kline_s_01`;
drop stable if exists `t_tick`;
流
创建
create stream stream_tick_to_kline_s_01
interval(1s) sliding(1s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_s_01
output_subtable(concat('t_kline_s_01_', replace(replace(symbol,'.','_'),'&','_')))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_s_01
interval(1s) sliding(1s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_s_01
output_subtable(concat('t_footprint_s_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_s_01
interval(1s) sliding(1s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_s_01
output_subtable(concat('t_kldir_s_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_s_01
interval(1s) sliding(1s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_s_01
output_subtable(concat('t_klvol_s_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_s_05
interval(5s) sliding(5s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_s_05
output_subtable(concat('t_kline_s_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_s_05
interval(5s) sliding(5s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_s_05
output_subtable(concat('t_footprint_s_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_s_05
interval(5s) sliding(5s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_s_05
output_subtable(concat('t_kldir_s_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_s_05
interval(5s) sliding(5s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_klvol_s_05
output_subtable(concat('t_klvol_s_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_s_15
interval(15s) sliding(15s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_s_15
output_subtable(concat('t_kline_s_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_s_15
interval(15s) sliding(15s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_s_15
output_subtable(concat('t_footprint_s_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_s_15
interval(15s) sliding(15s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_s_15
output_subtable(concat('t_kldir_s_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_s_15
interval(15s) sliding(15s)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_s_15
output_subtable(concat('t_klvol_s_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_m_01
interval(1m) sliding(1m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_m_01
output_subtable(concat('t_kline_m_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_m_01
interval(1m) sliding(1m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_m_01
output_subtable(concat('t_footprint_m_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_m_01
interval(1m) sliding(1m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_m_01
output_subtable(concat('t_kldir_m_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_m_01
interval(1m) sliding(1m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_m_01
output_subtable(concat('t_klvol_m_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_m_05
interval(5m) sliding(5m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_m_05
output_subtable(concat('t_kline_m_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_m_05
interval(5m) sliding(5m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_m_05
output_subtable(concat('t_footprint_m_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_m_05
interval(5m) sliding(5m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_m_05
output_subtable(concat('t_kldir_m_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_m_05
interval(5m) sliding(5m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_m_05
output_subtable(concat('t_klvol_m_05_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_m_15
interval(15m) sliding(15m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_m_15
output_subtable(concat('t_kline_m_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_m_15
interval(15m) sliding(15m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_m_15
output_subtable(concat('t_footprint_m_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_m_15
interval(15m) sliding(15m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_m_15
output_subtable(concat('t_kldir_m_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_m_15
interval(15m) sliding(15m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_m_15
output_subtable(concat('t_klvol_m_15_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_m_30
interval(30m) sliding(30m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_m_30
output_subtable(concat('t_kline_m_30_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_m_30
interval(30m) sliding(30m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_m_30
output_subtable(concat('t_footprint_m_30_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_m_30
interval(30m) sliding(30m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_m_30
output_subtable(concat('t_kldir_m_30_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_m_30
interval(30m) sliding(30m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_m_30
output_subtable(concat('t_klvol_m_30_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_h_01
interval(1h) sliding(1h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_h_01
output_subtable(concat('t_kline_h_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_h_01
interval(1h) sliding(1h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_h_01
output_subtable(concat('t_footprint_h_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_h_01
interval(1h) sliding(1h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_h_01
output_subtable(concat('t_kldir_h_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_h_01
interval(1h) sliding(1h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_h_01
output_subtable(concat('t_klvol_h_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_h_04
interval(4h) sliding(4h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_h_04
output_subtable(concat('t_kline_h_04_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_h_04
interval(4h) sliding(4h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_h_04
output_subtable(concat('t_footprint_h_04_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_h_04
interval(4h) sliding(4h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_h_04
output_subtable(concat('t_kldir_h_04_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_h_04
interval(4h) sliding(4h)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_h_04
output_subtable(concat('t_klvol_h_04_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_d_01
interval(1d) sliding(1d)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_d_01
output_subtable(concat('t_kline_d_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_d_01
interval(1d) sliding(1d)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_d_01
output_subtable(concat('t_footprint_d_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_d_01
interval(1d) sliding(1d)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_d_01
output_subtable(concat('t_kldir_d_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_d_01
interval(1d) sliding(1d)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_d_01
output_subtable(concat('t_klvol_d_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
create stream stream_tick_to_kline_w_01
interval(1w) sliding(1w)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kline_w_01
output_subtable(concat('t_kline_w_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;
create stream stream_tick_to_footprint_w_01
interval(1w) sliding(1w)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_footprint_w_01
output_subtable(concat('t_footprint_w_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
first(ts) as ts,
close as price,
cast(sum(case when dir =1 then vol else 0.0 end) as double) as bid_vol,
cast(sum(case when dir =3 then vol else 0.0 end) as double) as ask_vol,
cast(sum(case when dir =2 then vol else 0.0 end) as double) as mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend
group by close;
create stream stream_tick_to_kldir_w_01
interval(1w) sliding(1w)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_kldir_w_01
output_subtable(concat('t_kldir_w_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
sum(case when dir =1 then vol else 0.0 end) as cumulative_bid_vol,
sum(case when dir =3 then vol else 0.0 end) as cumulative_ask_vol,
sum(case when dir =2 then vol else 0.0 end) as cumulative_mid_vol,
_twend as create_time
from %%tbname
where ts>= _twstart and ts <= _twend;
create stream stream_tick_to_klvol_w_01
interval(1w) sliding(1w)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m)| watermark(10s))
into t_klvol_w_01
output_subtable(concat('t_klvol_w_01_', symbol))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
min(a) as min_bid_vol,
max(a) as max_bid_vol,
last(a) as fin_bid_vol,
_twend as create_time
from (select
ts,
csum(vol * cast(2-dir as double)) as a
from %%tbname
where ts>= _twstart and ts <= _twend)b;
删除
drop stream if exists stream_tick_to_klvol_w_01;
drop stream if exists stream_tick_to_kldir_w_01;
drop stream if exists stream_tick_to_footprint_w_01;
drop stream if exists stream_tick_to_kline_w_01;
drop stream if exists stream_tick_to_klvol_d_01;
drop stream if exists stream_tick_to_kldir_d_01;
drop stream if exists stream_tick_to_footprint_d_01;
drop stream if exists stream_tick_to_kline_d_01;
drop stream if exists stream_tick_to_klvol_h_04;
drop stream if exists stream_tick_to_kldir_h_04;
drop stream if exists stream_tick_to_footprint_h_04;
drop stream if exists stream_tick_to_kline_h_04;
drop stream if exists stream_tick_to_klvol_h_01;
drop stream if exists stream_tick_to_kldir_h_01;
drop stream if exists stream_tick_to_footprint_h_01;
drop stream if exists stream_tick_to_kline_h_01;
drop stream if exists stream_tick_to_klvol_m_30;
drop stream if exists stream_tick_to_kldir_m_30;
drop stream if exists stream_tick_to_footprint_m_30;
drop stream if exists stream_tick_to_kline_m_30;
drop stream if exists stream_tick_to_klvol_m_15;
drop stream if exists stream_tick_to_kldir_m_15;
drop stream if exists stream_tick_to_footprint_m_15;
drop stream if exists stream_tick_to_kline_m_15;
drop stream if exists stream_tick_to_klvol_m_05;
drop stream if exists stream_tick_to_kldir_m_05;
drop stream if exists stream_tick_to_footprint_m_05;
drop stream if exists stream_tick_to_kline_m_05;
drop stream if exists stream_tick_to_klvol_m_01;
drop stream if exists stream_tick_to_kldir_m_01;
drop stream if exists stream_tick_to_footprint_m_01;
drop stream if exists stream_tick_to_kline_m_01;
drop stream if exists stream_tick_to_klvol_s_15;
drop stream if exists stream_tick_to_kldir_s_15;
drop stream if exists stream_tick_to_footprint_s_15;
drop stream if exists stream_tick_to_kline_s_15;
drop stream if exists stream_tick_to_klvol_s_05;
drop stream if exists stream_tick_to_kldir_s_05;
drop stream if exists stream_tick_to_footprint_s_05;
drop stream if exists stream_tick_to_kline_s_05;
drop stream if exists stream_tick_to_klvol_s_01;
drop stream if exists stream_tick_to_kldir_s_01;
drop stream if exists stream_tick_to_footprint_s_01;
drop stream if exists stream_tick_to_kline_s_01;
查看流在节点的分布
select * from information_schema.ins_streams;写入tick
从指定的kafka写入tick。观察stream的运行。
存量
获得最大吞吐量
增量
查看持续运行稳定性
查询
查询品类列表含最新tick
select symbol,last(ts) as lts,last(close) as lclose,last(dir) as ldir from t_tick group by symbol order by lclose desc limit 0,10;场景
行情这类基于tick生成kline、footprint、vol、dir后,可通过如下语句查询。
查看总共多少个品类
tick写入时,按照品类做了子表,查询meta数据可获得品类数量。
-- 导出品类列表到特定文件
select replace(table_name,'t_tick_',''),* from information_schema.ins_tables where stable_name ="t_tick" order by table_name >> /home/gauss/stat/symbol.csv;
-- 直接获得品类也行
select replace(table_name,'t_tick_','') from information_schema.ins_tables where stable_name ="t_tick" order by table_name >> /home/gauss/stat/symbol.txt;
品类最新价格与时间
taos> select distinct(symbol) as symbol,last(close) as lclose,last(ts) as lts from t_tick group by symbol order by lclose >> /tmp/symbol_last_price_by_p
rice.txt;
Query OK, 34263 row(s) in set (5.670962s)
taos> select distinct(symbol) as symbol,last(close) as lclose,last(ts) as lts from t_tick group by symbol order by lts >> /tmp/symbol_last_price_by_ts.t
Query OK, 34263 row(s) in set (5.875431s)
taos> select distinct(symbol) as symbol,last(close) as lclose,last(ts) as lts from t_tick group by symbol order by symbol >> /tmp/symbol_last_price_by_symbol.txt;
Query OK, 34263 row(s) in set (8.643509s)足迹图
-- d
select * from t_footprint_d_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1d) = "2026-05-14 00:00:00";
-- h
select * from t_footprint_h_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1h) = "2026-05-14 09:00:00";
-- m
select * from t_footprint_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m) = "2026-05-14 09:40:00";
-- s
select * from t_footprint_s_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1s) = "2026-05-14 09:40:00";
-- 5
select first(ts),symbol,price,sum(bid_vol),sum(ask_vol),sum(mid_vol),first(create_time) from t_footprint_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m)>= "2026-05-14 09:40:00" and TIMETRUNCATE(ts,1m)< "2026-05-14 09:45:00" group by symbol,price order by price;
-- 10
select first(ts),symbol,price,sum(bid_vol),sum(ask_vol),sum(mid_vol),first(create_time) from t_footprint_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m)>= "2026-05-14 09:40:00" and TIMETRUNCATE(ts,1m)< "2026-05-14 09:50:00" group by symbol,price order by price;
-- 15
select first(ts),symbol,price,sum(bid_vol),sum(ask_vol),sum(mid_vol),first(create_time) from t_footprint_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m)>= "2026-05-14 09:45:00" and TIMETRUNCATE(ts,1m)< "2026-05-14 10:00:00" group by symbol,price order by pric
e;
-- 30
select first(ts),symbol,price,sum(bid_vol),sum(ask_vol),sum(mid_vol),first(create_time) from t_footprint_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m)>= "2026-05-14 09:45:00" and TIMETRUNCATE(ts,1m)< "2026-05-14 10:00:00" group by symbol,price order by price;K线
select * from t_kline_d_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1d) = "2026-05-14 00:00:00";
select * from t_kline_h_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1h) = "2026-05-14 09:00:00";
select * from t_kline_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m) = "2026-05-14 09:40:00";
select * from t_kline_s_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1s) = "2026-05-14 09:40:00";成交方向
select * from t_kldir_d_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1d) = "2026-05-14 00:00:00";
select * from t_kldir_h_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1h) = "2026-05-14 09:00:00";
select * from t_kldir_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m) = "2026-05-14 09:40:00";
select * from t_kldir_s_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1s) = "2026-05-14 09:40:00";买盘量k线
select * from t_klvol_d_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1d) = "2026-05-14 00:00:00";
select * from t_klvol_h_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1h) = "2026-05-14 09:00:00";
select * from t_klvol_m_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1m) = "2026-05-14 09:40:00";
select * from t_klvol_s_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1s) = "2026-05-14 09:40:00";
-- 5
select first(ts),min(min_bid_vol),max(max_bid_vol),sum(fin_bid_vol) from t_klvol_s_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1s) >= "2026-05-14 09:40:00" and TIMETRUNCATE(ts,1s) < "2026-05-14 09:45:00" group by symbol ;
-- 30
select first(ts),min(min_bid_vol),max(max_bid_vol),sum(fin_bid_vol) from t_klvol_s_01 where symbol = "1100_601398" and TIMETRUNCATE(ts,1s) >= "2026-05-14 09:30:00" and TIMETRUNCATE(ts,1s) < "2026-05-14 10:00:00" group by symbol ;
集群
taosd社区版本支持计算复制集、计算、查询分离,以合理分担压力。可使用三台机器形成数据复制实例,并将流计算放在一个节点,查询放在一个节点,主管理在一个节点。可如下配置/etc/taos/taos.cfg
hosts
每一个节点fqdn就是主机名,并需要在/etc/hosts配置,确保网络是通的。
192.168.7.111 snode
192.168.7.186 mnode
192.168.7.87 qnode
mnode
# The end point of the first dnode in the cluster to be connected to when this dnode or the CLI utility is started
firstEp mnode:6030
fqdn mnode
serverPort 6030
supportVnodes 1
supportSnodes 0
supportQnodes 0默认firstEp就是mnode,taos命令增加mnodes时,所在节点可能会有error,时正常的提示,meta数据会从mnode同步。
-- 创建mnode要求基于dnode来,详细见dnode章节
create mnode on dnode 2;
create mnode on dnode 3;dnode
create dnode "192.168.7.87";
create dnode "192.168.7.111";
show mnodes;
show dnodes; -- 可以获得id,用来创建snodes,流计算需要snode,如果有多个snode,可以更可靠snode
firstEp mnode:6030
fqdn snode
serverPort 6030
supportVnodes 256
supportQnodes 1
supportSnodes 1
qnode
firstEp mnode:6030
fqdn qnode
serverPort 6030
supportVnodes 256
supportQnodes 1
supportSnodes 0
create snode on dnode 1;
create snode on dnode 3;
create snode on dnode 4;
taos> show dnodes;
id | endpoint | vnodes | support_vnodes | status | create_time | reboot_time | note |
=============================================================================================================================================================================
1 | tdengine-0.186:6030 | 3 | 29 | ready | 2026-04-24 18:16:11.228 | 2026-05-06 17:51:10.269 | |
3 | ct4dev:6030 | 2 | 256 | ready | 2026-05-06 16:35:02.383 | 2026-05-07 13:40:44.757 | |
4 | tdengine-7.87:6030 | 2 | 256 | ready | 2026-05-06 17:19:23.917 | 2026-05-06 17:51:46.640 | |
Query OK, 3 row(s) in set (0.005232s)
taos> show snodes;
id | endpoint | create_time | replicaId | asReplicaOf |
========================================================================================================================
1 | tdengine-0.186:6030 | 2026-05-06 18:12:28.481 | 3 | 3, 4 |
3 | ct4dev:6030 | 2026-05-06 16:54:59.151 | 1 | 1 |
4 | tdengine-7.87:6030 | 2026-05-06 18:12:30.914 | 1 | None |
Query OK, 3 row(s) in set (0.005251s)
订阅
创建主题
create topic `tick2` as select * from `db_tick`.`t_tick`可获得这个链接: tmq+ws://gauss:Gauss20250922%40@192.168.7.111:6041/tick2
消费数据
consumer>tree
.
├── pom.xml
├── src
│ ├── main
│ │ └── java
│ │ └── com
│ │ └── taos
│ │ └── App.java
│ └── test
│ └── java
│ └── com
│ └── taos
│ └── AppTest.java
// export TDENGINE_JDBC_URL="jdbc:TAOS-RS://192.168.7.186:6041/rest/tmq?user=root&password=A7fK9xQ2LmR8tY4Z"
package com.taos;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
public class App {
public static void main(String[] args) throws SQLException {
String url = System.getenv("TDENGINE_JDBC_URL");
Properties properties = new Properties();
properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
properties.setProperty(TMQConstants.CONNECT_URL, url);
properties.setProperty(TMQConstants.CONNECT_TIMEOUT, "10000");
properties.setProperty(TMQConstants.CONNECT_MESSAGE_TIMEOUT, "10000");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.GROUP_ID, "gId");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.jdbc.tmq.MapDeserializer");
try (TaosConsumer<Map<String, Object>> consumer = new TaosConsumer<>(properties)) {
consumer.subscribe(Collections.singletonList("tick2"));
// 无限循环消费
while (true) {
try {
// 核心:poll 套一层 try-catch,忽略末尾的信号错误
ConsumerRecords<Map<String, Object>> consumerRecords = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Map<String, Object>> r : consumerRecords) {
Map<String, Object> bean = r.value();
bean.forEach((k, v) -> {
if ("symbol".equals(k) && v instanceof byte[]) {
System.out.print(k + " : " + new String((byte[]) v) + " ");
} else {
System.out.print(k + " : " + v + " ");
}
});
System.out.println();
}
} catch (Exception e) {
String msg = e.getMessage();
// ✅ 忽略这两种常见“非错误”:
// 1. 消息类型不匹配
// 2. 读取超时(正常无数据)
if (msg.contains("message type is not data")
|| msg.contains("Fetch data timeout")) {
try { Thread.sleep(200); } catch (InterruptedException ignored) {}
continue;
}
// 其他错误才打印并退出
e.printStackTrace();
break;
}
}
}
}
}
package com.taos;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
/**
* Unit test for simple App.
*/
public class AppTest
extends TestCase
{
/**
* Create the test case
*
* @param testName name of the test case
*/
public AppTest( String testName )
{
super( testName );
}
/**
* @return the suite of tests being tested
*/
public static Test suite()
{
return new TestSuite( AppTest.class );
}
/**
* Rigourous Test :-)
*/
public void testApp()
{
assertTrue( true );
}
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.taos</groupId>
<artifactId>consumer</artifactId>
<packaging>jar</packaging>
<version>1.0.0</version>
<name>consumer</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.taos.App</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
异常
限定资源上限
TDengien如果不限定资源上线,一些节点会被拉死,导致无法工作之外,连登录都无法做到了。可通过Linux的Service进行资源上限控制。
# cat /etc/systemd/system/taosd.service
[Unit]
Description=TDengine server service
After=network-online.target
Wants=network-online.target
[Service]
CPUQuota=800%
MemoryMax=8G
MemoryHigh=6G
Type=simple
ExecStart=/usr/bin/taosd
ExecStartPre=/usr/local/taos/bin/startPre.sh
TimeoutStopSec=1000000s
LimitNOFILE=infinity
LimitNPROC=infinity
LimitCORE=infinity
TimeoutStartSec=0
StandardOutput=null
Restart=always
StartLimitBurst=3
StartLimitInterval=900s
[Install]
WantedBy=multi-user.target
杀死进程
kill -9 taosd后,taosd可能存在坏掉的wal,而且taos无法自行恢复。这应该是一个重大bug,证明其wal机制存在瑕疵。
删除数据
如果删除data目录数据,重启节点,这个节点将如同新启动的节点一样开始工作。如果这是一台firstEP,那就是新的,否则将从firstEP所在的集群同步数据。
rm -r -f $data,如果发生在单个mnode集群的mnode上,如果重启此节点,其他节点会即可同步数据,导致数据丢失无法找回。
如果不重启,mnode会一直报错,但是也只是报错,没有任何有意义的提示。
猜测应该是mnode保存了schema,schema丢失,其他数据也无法访问。
如果配置多个mnode,不确定删除一个,能否从另外的同步过来。这里面估计还要区分是否firstEP。待测试验证。
表名
流计算通常使用tag来作为表明,tag却不能包含特殊字符如.
SELECT DISTINCT symbol FROM t_tick WHERE symbol REGEXP '[^a-zA-Z0-9_]';
-- 创建的流可将symbol的特定字符替换,以满足表名命名规则
create stream stream_tick_to_kline_m_01
interval(1m) sliding(1m)
from t_tick
partition by tbname,symbol
stream_options(ignore_nodata_trigger | max_delay(1m))
into t_kline_m_01
output_subtable(concat('t_kline_m_01_', replace(symbol, '.', '_')))
tags (symbol binary(64) as symbol)
as
select
_twstart as ts,
cast(max(close) as double) as h,
cast(min(close) as double) as l,
cast(first(close) as double) as o,
cast(last(close) as double) as c,
cast(sum(vol) as double) as vol,
cast(sum(vol*close) as double) as turn_over,
_twend as create_time
from %%tbname
where ts>= _twstart and ts < _twend;文件数量
在本文描述场景中,tdengine将打开近7000个文件。如果文件句柄不够,将造成流无法创建新的 数据,甚至taosd因为无法打开wal文件而崩溃。
另外当taos受制于文件句柄,内存时,将吃光系统内存,导致无法登录系统。可通过服务限制其最高cpu和内存,确保服务器不崩溃。
#/etc/sysctl.conf
sudo sysctl -p
kernel.core_pattern = core.%e.%p.%t
vm.swappiness = 0
vm.vfs_cache_pressure = 50
vm.dirty_ratio = 20
vm.dirty_background_ratio = 10
vm.dirty_expire_centisecs = 2000
# 如下两行
fs.nr_open = 1048576
fs.file-max = 2097152
#cat /etc/systemd/system/taosd.service
[Unit]
Description=TDengine server service
After=network-online.target
Wants=network-online.target
[Service]
# CPU 限制:200% 表示最多使用 2 个完整的 CPU 核心
CPUQuota=200%
# 内存限制:8GB 硬上限
MemoryMax=8G
# 内存限制:6GB 软上限
MemoryHigh=6G
Type=simple
ExecStart=/usr/bin/taosd
ExecStartPre=/usr/local/taos/bin/startPre.sh
TimeoutStopSec=1000000s
LimitNOFILE=infinity
LimitNPROC=infinity
LimitCORE=infinity
TimeoutStartSec=0
StandardOutput=null
Restart=always
StartLimitBurst=3
StartLimitInterval=900s
[Install]
WantedBy=multi-user.target
数据迁移
运行中的数据库,会将节点配置信息写入数据目录的文件中,导致taos.cfg配置无效。比如之前使用ip-port启动的服务,如果直接拷贝数据目录到其他地方,将还是只能从原先设定的ip-port启动。例如:
cat $data/dnode/dnode.json
{
"dnodeId": 3,
"dnodeVer": "8",
"engineVer": "30000000",
"clusterId": "4482909662002860117",
"dropped": 0,
"dnodes": [{
"id": 1,
"fqdn": "192.168.7.186",
"port": 6030,
"isMnode": 1
}, {
"id": 2,
"fqdn": "192.168.7.111",
"port": 6030,
"isMnode": 1
}, {
"id": 3,
"fqdn": "192.168.7.87",
"port": 6030,
"isMnode": 1
}]
}
cat $dat/snode/snode3/snode.json
{
"deployed": 1,
"snodeId": "3",
"leader0": {
"nodeId": "0",
"inUse": "0",
"numOfEps": "0"
},
"leader1": {
"nodeId": "0",
"inUse": "0",
"numOfEps": "0"
},
"replica": {
"nodeId": "1",
"inUse": "0",
"numOfEps": "1",
"eps": [{
"fqdn": "192.168.7.186",
"port": "6030"
}]
}
}
cat $dat/mnode/mnode.json
{
"lastIndex": 142,
"deployed": 1,
"version": 1,
"encrypted": 0
}