安装
下载
#直接使用构建好的包开始
wget https://archive.apache.org/dist/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
#基于源码则从这里: 一、克隆后切换
# 1. 克隆 Flink 源码
git clone https://github.com/apache/flink.git
cd flink
# 2. 拉取所有标签 & 远程分支
git fetch --tags
# 3. 切换到指定 tag: release-2.2.1
git checkout release-2.2.1
# 查看当前版本确认
git status
git describe --tags
# 二、只拉取指定tag
git clone --depth 1 --branch release-2.2.1 https://github.com/apache/flink.git flink-2.2.1
cd flink-2.2.1
编译环境
使用国内镜像
mkdir -p .mvn
vim .mvn/settings.xml
<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
<localRepository>${user.home}/.m2/repository</localRepository>
<mirrors>
<!-- 阿里云中央仓库,拦截所有请求 -->
<mirror>
<id>aliyunmaven</id>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
<mirrorOf>*,!apache-snapshots</mirrorOf>
</mirror>
<!-- Apache 快照源(仅快照包使用) -->
<mirror>
<id>apache-snapshots</id>
<name>Apache Snapshots</name>
<url>https://repo.apache.org/snapshots/</url>
<mirrorOf>apache-snapshots</mirrorOf>
</mirror>
</mirrors>
<!-- 可选:统一仓库配置,强制不走红帽源 -->
<profiles>
<profile>
<id>aliyun</id>
<repositories>
<repository>
<id>central</id>
<url>https://maven.aliyun.com/repository/public</url>
<releases><enabled>true</enabled></releases>
<snapshots><enabled>false</enabled></snapshots>
</repository>
<repository>
<id>redhat-ga</id>
<url>https://maven.aliyun.com/repository/public</url>
<releases><enabled>true</enabled></releases>
<snapshots><enabled>false</enabled></snapshots>
</repository>
</repositories>
</profile>
</profiles>
<activeProfiles>
<activeProfile>aliyun</activeProfile>
</activeProfiles>
</settings>
Java版本
# 用21构建有错误,切换为17
$ sdk use java 17.0.19-amzn
$ sdk use maven 3.9.14
构建
./mvnw clean package -DskipTests -Djdk17 -Pjava17-target
运行
构建成功后,可看到如下目录内容,这里就是二进制发行包。
$flink/flink-dist/target/flink-2.2.1-bin/flink-2.2.1$ tree -L 1
.
├── LICENSE
├── README.txt
├── bin
├── conf
├── examples
├── lib
├── log
├── opt
└── plugins
7 directories, 2 files
# 使用如下命令启动flink
启动集群
# 修改服务端口
vi flink-dist/target/flink-2.2.1-bin/flink-2.2.1/conf/config.yaml
#==============================================================================
# Rest & web frontend
#==============================================================================
rest:
  # The address to which the REST client will connect to
  address: localhost
  # The address that the REST & web server binds to
  # By default, this is localhost, which prevents the REST & web server from
  # being able to communicate outside of the machine/container it is running on.
  #
  # To enable this, set the bind address to one that has access to outside-facing
  # network interface, such as 0.0.0.0.
  # bind-address: localhost
  # # The port to which the REST client connects to. If rest.bind-port has
  # # not been specified, then the server will bind to this port as well.
  port: 7081
  # # Port range for the REST and web server to bind to.
  # bind-port: 8080-8090
$./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host pc-bdm.
Starting taskexecutor daemon on host pc-bdm.
#然后可访问页面 http://localhost:7081
启动示例作业
$ ./bin/flink run examples/streaming/WordCount.jar --input README.txt --output /tmp/wordcount-result
Job has been submitted with JobID 729e5212662f9028c7e318765cdcde9b
Program execution finished
Job with JobID 729e5212662f9028c7e318765cdcde9b has finished.
Job Runtime: 1898 ms
# 然后可看到如下输出
$ cat /tmp/wordcount-result/2026-05-28--14/part-e5ae6dc8-24d3-4d01-9ec0-e42ab6f0467e-0
(for,1)
(the,1)
(latest,1)
(information,1)
(about,1)
(flink,1)
......
停止集群
./bin/stop-cluster.sh
功能测试
MySQL->PostgreSQL的CDC同步
需要构建连接器,然后执行Flink的脚本。
PostgreSQL
# 不同版本的源码: https://ftp.postgresql.org/pub/source/
wget https://ftp.postgresql.org/pub/source/v18.1/postgresql-18.1.tar.bz2
tar xjvf postgresql-18.1.tar.bz2
sudo apt install meson
# 支持systemd
sudo apt update
sudo apt install -y libzstd-dev libsystemd-dev
sudo apt install -y uuid-dev
sudo apt install -y liblz4-dev
# 使用meson构建ninja脚本
meson setup build --prefix=/usr/local/pgsql -Dssl=openssl -Dreadline=enabled -Dlibxml=enabled -Dicu=enabled
# 使用ninja构建
ninja -C build -j$(nproc)
# 安装
sudo ninja -C build install
# 配置
sudo mkdir -p /home/postgres
sudo useradd -r -m -d /home/postgres -s /bin/bash postgres
sudo chown -R postgres:postgres /usr/local/pgsql /home/postgres
# service
sudo vim /etc/systemd/system/postgresql.service
[Unit]
Description=PostgreSQL 18 database server
After=network.target
[Service]
Type=notify
User=postgres
Group=postgres
Environment=PGDATA=/home/postgres
ExecStart=/usr/local/pgsql/bin/postgres -D /home/postgres
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
TimeoutSec=30
[Install]
WantedBy=multi-user.target
# 初始化数据库,只做一次
sudo -u postgres /usr/local/pgsql/bin/initdb -D /home/postgres
# 启动服务
sudo systemctl daemon-reload
sudo systemctl enable postgresql
sudo systemctl start postgresql
# 查看状态
systemctl status postgresql
journalctl -u postgresql -f
允许远程连接
sudo vim /home/postgres/pg_hba.conf
# TYPE DATABASE USER ADDRESS METHOD
local all all trust
host all all 127.0.0.1/32 trust
host all all ::1/128 trust
host all all 0.0.0.0/0 trust
# 注意:trust 表示无需密码即可连接,仅适用于测试环境;生产环境应改用 scram-sha-256
Flink需要的CDC
修改配置,支持Flink:
sudo vim /home/postgres/postgresql.conf
# ===== 基础 =====
listen_addresses = '*'
port = 5432
# ===== Flink CDC 核心 =====
wal_level = logical
max_replication_slots = 8
max_wal_senders = 8
max_logical_replication_workers = 8
# ===== 推荐 =====
shared_buffers = 512MB
work_mem = 16MB
maintenance_work_mem = 128MB
# ===== zstd =====
wal_compression = on
重启服务
sudo systemctl restart postgresql
sudo -u postgres /usr/local/pgsql/bin/psql postgres
# 远程连接必须为用户设定密码
ALTER USER postgres WITH PASSWORD 'Pg@123456';
创建用户
设定用户的密码
# 创建flink用的用户
CREATE USER flinkuser WITH REPLICATION PASSWORD 'flink123';
ALTER USER flinkuser WITH SUPERUSER;
CREATE PUBLICATION flink_pub FOR ALL TABLES;
SELECT * FROM pg_publication;
Flink CDC连接信息
source:
type: postgres-cdc
hostname: localhost
port: 5432
username: flinkuser
password: flink123
database-name: postgres
schema-name: public
table-name: "public.*"
decoding.plugin.name: test_decoding
slot.name: flink_slot
创建数据库
CREATE DATABASE db_jygt;
# 查看是否存在
\l
# 切换到数据库
\c db_jygt
\c dbname
切换数据库
\l
列出所有数据库
\dt
当前库列出表
\d t_abcjs_staff
查看表结构
\q
退出 psql
授权
GRANT CONNECT ON DATABASE db_jygt TO flinkuser;
# 创建发布(publication),这是 CDC 需要的
CREATE PUBLICATION flink_pub FOR ALL TABLES;
# 选择
SELECT * FROM pg_publication;
# Flink CDC这样使用
database-name: db_jygt
username: flinkuser
password: flink123
decoding.plugin.name: test_decoding
-- 查看所有 publication
SELECT * FROM pg_publication;
-- 查看 publication 包含的表
SELECT * FROM pg_publication_tables;
创建表
CREATE SCHEMA IF NOT EXISTS db_jygt;
SET search_path = db_jygt;
CREATE TABLE t_abcjs_staff (
f_id SERIAL PRIMARY KEY,
f_name VARCHAR(255) NOT NULL,
f_content TEXT NOT NULL,
f_state INT DEFAULT 0,
f_create_time TIMESTAMP,
f_modify_time TIMESTAMP NOT NULL
DEFAULT CURRENT_TIMESTAMP,
f_user_id INT,
f_catalog INT
);
CREATE INDEX idx_name
ON t_abcjs_staff (f_name);
CREATE INDEX idx_user_id
ON t_abcjs_staff (f_user_id);
-- 触发器
CREATE OR REPLACE FUNCTION db_jygt.update_modify_time()
RETURNS TRIGGER AS $$
BEGIN
NEW.f_modify_time = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_t_abcjs_staff_modify_time
BEFORE UPDATE ON db_jygt.t_abcjs_staff
FOR EACH ROW
EXECUTE FUNCTION db_jygt.update_modify_time();
db_jygt=# \d t_abcjs_staff;
Table "db_jygt.t_abcjs_staff"
Column | Type | Collation | Nullable | Default
---------------+-----------------------------+-----------+----------+---------------------------------------------
f_id | integer | | not null | nextval('t_abcjs_staff_f_id_seq'::regclass)
f_name | character varying(255) | | not null |
f_content | text | | not null |
f_state | integer | | | 0
f_create_time | timestamp without time zone | | |
f_modify_time | timestamp without time zone | | not null | CURRENT_TIMESTAMP
f_user_id | integer | | |
f_catalog | integer | | |
Indexes:
"t_abcjs_staff_pkey" PRIMARY KEY, btree (f_id)
"idx_name" btree (f_name)
"idx_user_id" btree (f_user_id)
Publications:
"flink_pub"
Triggers:
trg_t_abcjs_staff_modify_time BEFORE UPDATE ON t_abcjs_staff FOR EACH ROW EXECUTE FUNCTION update_modify_time()
MySQL
下载
编译
sudo apt update
sudo apt install -y gcc g++ cmake make bison flex libncurses-dev libssl-dev libz-dev libcurl4-openssl-dev libboost-all-dev git pkg-config
# 依赖tirpc
sudo apt install libtirpc-dev
# 建编译目录
mkdir build && cd build
# CMake 配置(生产优化,关闭 debug)
cmake .. \
-DCMAKE_INSTALL_PREFIX=/usr/local/mysql \
-DMYSQL_DATADIR=/var/lib/mysql \
-DMYSQL_UNIX_ADDR=/var/run/mysql/mysql.sock \
-DMYSQL_USER=mysql \
-DDEFAULT_CHARSET=utf8mb4 \
-DDEFAULT_COLLATION=utf8mb4_unicode_ci \
-DWITH_INNOBASE_STORAGE_ENGINE=1 \
-DWITH_MYISAM_STORAGE_ENGINE=1 \
-DWITH_SSL=system \
-DWITH_ZLIB=system \
-DWITH_BOOST=bundled \
-DENABLED_LOCAL_INFILE=1 \
-DWITH_DEBUG=0 \
-DWITH_AUTHENTICATION_CLIENT_PLUGINS=ON \
-DCMAKE_BUILD_TYPE=RelWithDebInfo
# 编译(用全部 CPU 核心,快)
make -j$(nproc)
# 安装到系统
sudo make install
创建用户
sudo mkdir -p /data01/mysql
sudo useradd mysql -d /data01/mysql/
sudo chown -R mysql:mysql /data01/mysql
sudo su - mysql
mkdir log run lib
初始化
# 初始化(生成临时 root 密码)
sudo /usr/local/mysql/bin/mysqld \
--initialize \
--user=mysql \
--basedir=/usr/local/mysql \
--datadir=/data01/mysql/lib/
# 注意如下内容
2026-05-29T04:17:54.777428Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: Tq(aOW9x3TPy
sudo tee /etc/my.cnf >/dev/null <<EOF
[mysqld]
basedir=/usr/local/mysql
datadir=/data01/mysql/lib/
socket=/data01/mysql/run/mysql.sock
mysqlx_socket=/data01/mysql/run/mysqlx.sock
pid-file=/data01/mysql/run/mysqld.pid
log_error=/data01/mysql/log/error.log
default-storage-engine=InnoDB
character-set-server=utf8mb4
collation-server=utf8mb4_unicode_ci
max_connections=200
bind-address=0.0.0.0
port=3306
skip-networking = OFF
# Flink CDC 配置
server-id = 100
log_bin = /data01/mysql/lib/mysql-bin
binlog_format = ROW
binlog_row_image = FULL
# 7天过期,单位秒
binlog_expire_logs_seconds = 604800
binlog_transaction_compression = OFF
EOF
# systemd 服务
sudo tee /etc/systemd/system/mysqld.service >/dev/null <<EOF
[Unit]
Description=MySQL Server
After=network.target
[Service]
User=mysql
Group=mysql
ExecStart=/usr/local/mysql/bin/mysqld --defaults-file=/etc/my.cnf
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
# 启动 + 开机自启
sudo systemctl daemon-reload
sudo systemctl start mysqld
sudo systemctl enable mysqld
# 查看 3306 端口监听
ss -tulpn | grep 3306
# 或
netstat -tulpn | grep 3306
# 使用临时密码登录进去之后,需要修改
ALTER USER 'root'@'localhost' IDENTIFIED BY 'root1234';
-- 先刷新权限(必须步骤)
FLUSH PRIVILEGES;
-- 如需远程登录,一并设置
CREATE USER 'root'@'%' IDENTIFIED BY 'root1234';
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;
FLUSH PRIVILEGES;
FLUSH PRIVILEGES;
exit;
-- 如果用dbeaver连接,jdbc:mysql://192.168.7.87:3306/?allowPublicKeyRetrieval=true&useSSL=false
创建DB和表
mysql> create database db_jygt;
Query OK, 1 row affected (0.86 sec)
mysql> use db_jygt
Database changed
mysql> CREATE TABLE `t_abcjs_staff` (
-> `f_id` int(11) NOT NULL AUTO_INCREMENT,
-> `f_name` varchar(255) COLLATE utf8mb4_bin NOT NULL,
-> `f_content` text COLLATE utf8mb4_bin NOT NULL,
-> `f_state` int(11) DEFAULT '0',
-> `f_create_time` datetime DEFAULT NULL,
-> `f_modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
-> `f_user_id` int(11) DEFAULT NULL,
-> `f_catalog` int(11) DEFAULT NULL,
-> PRIMARY KEY (`f_id`) USING BTREE,
-> KEY `idx_name` (`f_name`) USING BTREE,
-> KEY `idx_user_id` (`f_user_id`)
-> ) ENGINE=InnoDB AUTO_INCREMENT=110265 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Query OK, 0 rows affected, 4 warnings (2.85 sec)
CDC用户
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'fl@123456';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;
-- 看 binlog 是否开启
show variables like 'log_bin'; -- ON
show variables like 'binlog_format'; -- ROW
show variables like 'binlog_row_image'; -- FULL
-- 看 server-id
show variables like 'server_id'; -- 100
# 作为目的端的权限
CREATE USER 'flink_sink'@'%' IDENTIFIED BY 'fl@123456';
GRANT INSERT, UPDATE, DELETE, ALTER, CREATE, DROP ON *.* TO 'flink_sink'@'%';
FLUSH PRIVILEGES;
Flink
启动集群后,运行命令flink-dist/target/flink-2.2.1-bin/flink-2.2.1/bin/sql-client.sh,进入交互界面:
创建源表
DROP TABLE mysql_t_abcjs_staff;
CREATE TABLE mysql_t_abcjs_staff (
f_id INT,
f_name STRING,
f_content STRING,
f_state INT,
f_create_time TIMESTAMP(3),
f_modify_time TIMESTAMP(3),
f_user_id INT,
f_catalog INT,
PRIMARY KEY (f_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'www.aigrow.space',
'port' = '3306',
'username' = 'user',
'password' = 'user@1234',
'database-name' = 'db_jygt',
'table-name' = 't_abcjs_staff',
'server-id' = '5401',
'server-time-zone' = 'Asia/Shanghai'
);
创建目标表
CREATE TABLE pg_t_abcjs_staff (
f_id INT,
f_name STRING,
f_content STRING,
f_state INT,
f_create_time TIMESTAMP(3),
f_modify_time TIMESTAMP(3),
f_user_id INT,
f_catalog INT,
PRIMARY KEY (f_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://192.168.7.186:5432/db_jygt',
'table-name' = 'db_jygt.t_abcjs_staff',
'username' = 'postgres',
'password' = 'Pg@123456',
'driver' = 'org.postgresql.Driver'
);
SHOW TABLES;
启动
Flink SQL> INSERT INTO pg_t_abcjs_staff SELECT * FROM mysql_t_abcjs_staff;
这里总报错,无法匹配到合适的数据源 connector。只好降级到1.18.1
# 下载 MySQL、PostgreSQL 的 JDBC 驱动及连接器
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.18/flink-connector-jdbc-3.2.0-1.18.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar
wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.4/postgresql-42.7.4-all.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.1/flink-sql-connector-mysql-cdc-3.2.1.jar