修炼者
修炼者
发布于 2026-05-29 / 4 阅读
0
0

Flink

安装

下载

# Option A: start from the prebuilt binary release (Flink 1.18.1)
wget https://archive.apache.org/dist/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
# Unpack it; this creates the flink-1.18.1/ distribution directory
tar xzf flink-1.18.1-bin-scala_2.12.tgz

# Option B: build from source. B1) full clone, then switch to the tag
# 1. Clone the Flink source
git clone https://github.com/apache/flink.git
cd flink
# 2. Fetch all tags & remote branches
git fetch --tags

# 3. Check out the tag release-2.2.1
git checkout release-2.2.1

# Confirm the checked-out version
git status
git describe --tags

# B2) Alternatively, clone only the wanted tag (shallow)
git clone --depth 1 --branch release-2.2.1 https://github.com/apache/flink.git flink-2.2.1
cd flink-2.2.1
cd flink-2.2.1

编译环境

使用国内镜像

mkdir -p .mvn
# NOTE: Maven does not load .mvn/settings.xml by itself; pass it explicitly with
# `-s .mvn/settings.xml` on every mvn / mvnw invocation, or the mirrors are ignored.
vim .mvn/settings.xml

<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">

  <localRepository>${user.home}/.m2/repository</localRepository>

  <mirrors>
    <!-- Aliyun public repository: intercepts every repository except the Apache
         snapshot repository, whose id in the Apache parent POM is "apache.snapshots" -->
    <mirror>
      <id>aliyunmaven</id>
      <name>阿里云公共仓库</name>
      <url>https://maven.aliyun.com/repository/public</url>
      <mirrorOf>*,!apache.snapshots</mirrorOf>
    </mirror>
    <!-- Apache snapshot repository (only used for SNAPSHOT artifacts) -->
    <mirror>
      <id>apache-snapshots</id>
      <name>Apache Snapshots</name>
      <url>https://repository.apache.org/snapshots/</url>
      <mirrorOf>apache.snapshots</mirrorOf>
    </mirror>
  </mirrors>

  <!-- Optional: redefine central / redhat-ga to point at Aliyun so nothing goes to the Red Hat repository -->
  <profiles>
    <profile>
      <id>aliyun</id>
      <repositories>
        <repository>
          <id>central</id>
          <url>https://maven.aliyun.com/repository/public</url>
          <releases><enabled>true</enabled></releases>
          <snapshots><enabled>false</enabled></snapshots>
        </repository>
        <repository>
          <id>redhat-ga</id>
          <url>https://maven.aliyun.com/repository/public</url>
          <releases><enabled>true</enabled></releases>
          <snapshots><enabled>false</enabled></snapshots>
        </repository>
      </repositories>
    </profile>
  </profiles>
  <activeProfiles>
    <activeProfile>aliyun</activeProfile>
  </activeProfiles>
</settings>

Java版本

# Building with JDK 21 failed, so switch to JDK 17 (and Maven 3.9) via SDKMAN.
# The leading "$" is the shell prompt, not part of the command.
$ sdk use java 17.0.19-amzn
$ sdk use maven 3.9.14

构建

# Build the binary distribution without tests, targeting JDK 17.
# -s: use the mirror settings in .mvn/settings.xml (Maven does not read that file on its own).
./mvnw -s .mvn/settings.xml clean package -DskipTests -Djdk17 -Pjava17-target

运行

构建成功后,可看到如下目录内容,这里就是二进制发行包。

$flink/flink-dist/target/flink-2.2.1-bin/flink-2.2.1$ tree -L 1
.
├── LICENSE
├── README.txt
├── bin
├── conf
├── examples
├── lib
├── log
├── opt
└── plugins

7 directories, 2 files

# 使用如下命令启动flink

启动集群

# Change the REST / web UI port in the built distribution's config.yaml
# (only the rest.port line below is changed; the rest is the stock file)
vi flink-dist/target/flink-2.2.1-bin/flink-2.2.1/conf/config.yaml
#==============================================================================
# Rest & web frontend
#==============================================================================

rest:
  # The address to which the REST client will connect to
  address: localhost
  # The address that the REST & web server binds to
  # By default, this is localhost, which prevents the REST & web server from
  # being able to communicate outside of the machine/container it is running on.
  #
  # To enable this, set the bind address to one that has access to outside-facing
  # network interface, such as 0.0.0.0.
  # bind-address: localhost
  # # The port to which the REST client connects to. If rest.bind-port has
  # # not been specified, then the server will bind to this port as well.
  # Port chosen for this setup; the web UI is then opened at http://localhost:7081
  port: 7081
  # # Port range for the REST and web server to bind to.
  # bind-port: 8080-8090


$./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host pc-bdm.
Starting taskexecutor daemon on host pc-bdm.
#然后可访问页面 http://localhost:7081 

启动示例作业

$ ./bin/flink run examples/streaming/WordCount.jar --input README.txt --output /tmp/wordcount-result

Job has been submitted with JobID 729e5212662f9028c7e318765cdcde9b
Program execution finished
Job with JobID 729e5212662f9028c7e318765cdcde9b has finished.
Job Runtime: 1898 ms
# 然后可看到如下输出
$ cat /tmp/wordcount-result/2026-05-28--14/part-e5ae6dc8-24d3-4d01-9ec0-e42ab6f0467e-0
(for,1)
(the,1)
(latest,1)
(information,1)
(about,1)
(flink,1)
......

停止集群

# Stop the cluster started with ./bin/start-cluster.sh
./bin/stop-cluster.sh

功能测试

MySQL->PostgreSQL的CDC同步

需要构建连接器,然后执行Flink的脚本。

PostgreSQL

# Source tarballs for every version: https://ftp.postgresql.org/pub/source/
wget https://ftp.postgresql.org/pub/source/v18.1/postgresql-18.1.tar.bz2
tar xjvf postgresql-18.1.tar.bz2
# meson must be run from inside the source tree
cd postgresql-18.1

# Build dependencies. Refresh the package index first.
# bison/flex/perl are needed to build recent PostgreSQL tarballs; the -dev packages
# match the features enabled below (OpenSSL, readline, libxml2, ICU).
sudo apt update
sudo apt install -y build-essential meson ninja-build pkg-config bison flex perl \
  libssl-dev libreadline-dev libxml2-dev libicu-dev
# systemd notify support (Type=notify in the service unit) plus zstd/lz4 compression
sudo apt install -y libzstd-dev libsystemd-dev liblz4-dev
sudo apt install -y uuid-dev

# Generate the ninja build. Features are forced on ("enabled") so configuration
# fails loudly if a dependency is missing instead of silently building without it.
meson setup build --prefix=/usr/local/pgsql \
  -Dssl=openssl -Dreadline=enabled -Dlibxml=enabled -Dicu=enabled \
  -Dsystemd=enabled -Dzstd=enabled -Dlz4=enabled

# Compile with ninja on all cores
ninja -C build -j"$(nproc)"
# Install into /usr/local/pgsql
sudo ninja -C build install

# Setup: a system account "postgres" whose home directory doubles as the data
# directory (PGDATA=/home/postgres in the unit file below).
# NOTE(review): the directory is created before useradd, presumably so `useradd -m`
# does not copy skeleton dotfiles into it (initdb below needs it empty) — confirm.
sudo mkdir -p /home/postgres
sudo useradd -r -m -d /home/postgres -s /bin/bash postgres
# NOTE(review): this also hands ownership of the installed binaries under
# /usr/local/pgsql to postgres; likely only the data directory needs to be owned by it.
sudo chown -R postgres:postgres /usr/local/pgsql /home/postgres

# systemd service unit (file contents follow)
sudo vim /etc/systemd/system/postgresql.service
[Unit]
Description=PostgreSQL 18 database server
After=network.target

[Service]
# Type=notify assumes the server binary was built with systemd support — verify the build
Type=notify
User=postgres
Group=postgres

Environment=PGDATA=/home/postgres
ExecStart=/usr/local/pgsql/bin/postgres -D /home/postgres
# Reload configuration by sending SIGHUP to the main server process
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
TimeoutSec=30

[Install]
WantedBy=multi-user.target

# Initialize the database cluster in the data directory; run only once
sudo -u postgres /usr/local/pgsql/bin/initdb -D /home/postgres

# Register the new unit, enable it at boot and start it
sudo systemctl daemon-reload
sudo systemctl enable postgresql
sudo systemctl start postgresql

# Check status / follow the service log
systemctl status postgresql
journalctl -u postgresql -f


允许远程连接

sudo vim /home/postgres/pg_hba.conf

# TYPE  DATABASE  USER   ADDRESS   METHOD
# Local socket and loopback connections: no password (convenient for local admin)
local   all       all              trust
host    all       all   127.0.0.1/32 trust
host    all       all   ::1/128      trust
# Remote connections must authenticate with a password. "trust" here would let
# anyone who can reach port 5432 log in as any user, including postgres.
host    all       all   0.0.0.0/0    scram-sha-256

Flink需要的CDC

修改配置,支持Flink。sudo vim /home/postgres/postgresql.conf

# ===== Basics =====
listen_addresses = '*'
port = 5432

# ===== Flink CDC essentials =====
# Logical decoding is required for CDC replication slots (takes effect only after a restart)
wal_level = logical
max_replication_slots = 8
max_wal_senders = 8
max_logical_replication_workers = 8

# ===== Recommended =====
shared_buffers = 512MB
work_mem = 16MB
maintenance_work_mem = 128MB

# ===== zstd =====
# "on" selects pglz; name the method explicitly to actually use zstd
# (requires a server built with zstd support)
wal_compression = zstd

重启服务

# wal_level (and the other settings above) only take effect after a restart;
# "start" is a no-op on an already running service
sudo systemctl restart postgresql
sudo -u postgres /usr/local/pgsql/bin/psql postgres

-- Remote connections authenticate with a password, so set one for the user
ALTER USER postgres WITH PASSWORD 'Pg@123456';

创建用户

设定用户的密码

-- Create the user Flink connects as (REPLICATION permits replication connections).
-- psql only treats "--" as a comment; a "#" line would break the next statement.
CREATE USER flinkuser WITH REPLICATION PASSWORD 'flink123';
-- NOTE: SUPERUSER is broader than CDC needs; it is used here so the user can read
-- every table without separate GRANTs. Restrict it outside of test setups.
ALTER USER flinkuser WITH SUPERUSER;

CREATE PUBLICATION flink_pub FOR ALL TABLES;
SELECT * FROM pg_publication;

source:
  type: postgres-cdc
  hostname: localhost
  port: 5432
  username: flinkuser
  password: flink123
  database-name: postgres
  schema-name: public
  table-name: "public.*"
  # test_decoding is not a plugin Debezium supports, and it ignores publications.
  # pgoutput is built into PostgreSQL and streams through the publication above.
  decoding.plugin.name: pgoutput
  debezium.publication.name: flink_pub
  slot.name: flink_slot

创建数据库

CREATE DATABASE db_jygt;
-- Check that it exists
\l
-- Switch to the database
\c db_jygt

-- psql meta-command cheat sheet (comments on their own lines: psql meta-commands
-- take the rest of the line as arguments)
-- Switch database
\c dbname
-- List all databases
\l
-- List tables in the current database
\dt
-- Show a table's structure
\d t_abcjs_staff
-- Quit psql
\q

授权

GRANT CONNECT ON DATABASE db_jygt TO flinkuser;

-- Create the publication CDC reads changes from (publications are per database)
CREATE PUBLICATION flink_pub FOR ALL TABLES;
-- Check it
SELECT * FROM pg_publication;

# Use it from Flink CDC like this
database-name: db_jygt
username: flinkuser
password: flink123
# pgoutput (not test_decoding) is the supported plugin that uses the publication
decoding.plugin.name: pgoutput


-- List all publications
SELECT * FROM pg_publication;

-- List the tables each publication includes
SELECT * FROM pg_publication_tables;

创建表

-- Target schema (same name as the database) for the PostgreSQL copy of t_abcjs_staff
CREATE SCHEMA IF NOT EXISTS db_jygt;

-- Unqualified names below are created in / resolved against db_jygt
SET search_path = db_jygt;

-- Same columns as the MySQL source table t_abcjs_staff
CREATE TABLE t_abcjs_staff (
    f_id            SERIAL PRIMARY KEY,
    f_name          VARCHAR(255) NOT NULL,
    f_content       TEXT NOT NULL,
    f_state         INT DEFAULT 0,
    f_create_time   TIMESTAMP,
    f_modify_time   TIMESTAMP NOT NULL
                      DEFAULT CURRENT_TIMESTAMP,
    f_user_id       INT,
    f_catalog       INT
);

CREATE INDEX idx_name
    ON t_abcjs_staff (f_name);

CREATE INDEX idx_user_id
    ON t_abcjs_staff (f_user_id);

-- Trigger: refresh f_modify_time on every UPDATE, emulating MySQL's
-- "ON UPDATE CURRENT_TIMESTAMP" column attribute
CREATE OR REPLACE FUNCTION db_jygt.update_modify_time()
RETURNS TRIGGER AS $$
BEGIN
    NEW.f_modify_time = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_t_abcjs_staff_modify_time
BEFORE UPDATE ON db_jygt.t_abcjs_staff
FOR EACH ROW
EXECUTE FUNCTION db_jygt.update_modify_time();


db_jygt=# \d t_abcjs_staff;
                                          Table "db_jygt.t_abcjs_staff"
    Column     |            Type             | Collation | Nullable |                   Default
---------------+-----------------------------+-----------+----------+---------------------------------------------
 f_id          | integer                     |           | not null | nextval('t_abcjs_staff_f_id_seq'::regclass)
 f_name        | character varying(255)      |           | not null |
 f_content     | text                        |           | not null |
 f_state       | integer                     |           |          | 0
 f_create_time | timestamp without time zone |           |          |
 f_modify_time | timestamp without time zone |           | not null | CURRENT_TIMESTAMP
 f_user_id     | integer                     |           |          |
 f_catalog     | integer                     |           |          |
Indexes:
    "t_abcjs_staff_pkey" PRIMARY KEY, btree (f_id)
    "idx_name" btree (f_name)
    "idx_user_id" btree (f_user_id)
Publications:
    "flink_pub"
Triggers:
    trg_t_abcjs_staff_modify_time BEFORE UPDATE ON t_abcjs_staff FOR EACH ROW EXECUTE FUNCTION update_modify_time()

MySQL

下载

(原文此处内容缺失:应为下载并解压 MySQL 源码包、然后 cd 进入源码目录的命令,之后再执行下面的编译步骤。)

编译

sudo apt update
sudo apt install -y gcc g++ cmake make bison flex libncurses-dev libssl-dev libz-dev libcurl4-openssl-dev libboost-all-dev git pkg-config

# tirpc headers are a required build dependency
sudo apt install -y libtirpc-dev

# Out-of-source build directory; -p keeps the step re-runnable
mkdir -p build && cd build

# CMake configuration (optimized build, debug mode off).
# MYSQL_DATADIR / MYSQL_UNIX_ADDR are only compiled-in defaults; keep them equal to
# datadir / socket in /etc/my.cnf (/data01/mysql/...) so client tools find the
# server socket without an explicit --socket.
cmake .. \
  -DCMAKE_INSTALL_PREFIX=/usr/local/mysql \
  -DMYSQL_DATADIR=/data01/mysql/lib \
  -DMYSQL_UNIX_ADDR=/data01/mysql/run/mysql.sock \
  -DMYSQL_USER=mysql \
  -DDEFAULT_CHARSET=utf8mb4 \
  -DDEFAULT_COLLATION=utf8mb4_unicode_ci \
  -DWITH_INNOBASE_STORAGE_ENGINE=1 \
  -DWITH_MYISAM_STORAGE_ENGINE=1 \
  -DWITH_SSL=system \
  -DWITH_ZLIB=system \
  -DWITH_BOOST=bundled \
  -DENABLED_LOCAL_INFILE=1 \
  -DWITH_DEBUG=0 \
  -DWITH_AUTHENTICATION_CLIENT_PLUGINS=ON \
  -DCMAKE_BUILD_TYPE=RelWithDebInfo

# Compile on all CPU cores
make -j"$(nproc)"

# Install into /usr/local/mysql
sudo make install

创建用户

# Dedicated account; its home directory holds all MySQL runtime files
sudo mkdir -p /data01/mysql
sudo useradd mysql -d /data01/mysql/
sudo chown -R mysql:mysql /data01/mysql
# log/, run/ (socket + pid) and lib/ (datadir), created as the mysql user.
# Done non-interactively: `sudo su - mysql` would open a shell and stop the script.
sudo -u mysql mkdir -p /data01/mysql/log /data01/mysql/run /data01/mysql/lib

初始化

# Initialize the (empty) data directory; this generates a temporary root password,
# printed in the output as shown below — note it down for the first login
sudo /usr/local/mysql/bin/mysqld \
  --initialize \
  --user=mysql \
  --basedir=/usr/local/mysql \
  --datadir=/data01/mysql/lib/

# 注意如下内容

2026-05-29T04:17:54.777428Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: Tq(aOW9x3TPy
# Quoted 'EOF': the file is written literally, with no shell expansion
sudo tee /etc/my.cnf >/dev/null <<'EOF'
[mysqld]
basedir=/usr/local/mysql
datadir=/data01/mysql/lib/
socket=/data01/mysql/run/mysql.sock
mysqlx_socket=/data01/mysql/run/mysqlx.sock
pid-file=/data01/mysql/run/mysqld.pid
log_error=/data01/mysql/log/error.log

default-storage-engine=InnoDB
character-set-server=utf8mb4
collation-server=utf8mb4_unicode_ci
max_connections=200

bind-address=0.0.0.0
port=3306
skip-networking = OFF

# Flink CDC settings: row-based binlog with full row images
server-id = 100
log_bin = /data01/mysql/lib/mysql-bin
binlog_format = ROW
binlog_row_image = FULL
# Binlog retention: 7 days, in seconds
binlog_expire_logs_seconds = 604800
binlog_transaction_compression = OFF

# Read by mysql / mysqladmin / mysqldump: use the same socket as the server,
# otherwise they fall back to the socket path compiled into the client
[client]
socket=/data01/mysql/run/mysql.sock
EOF
# systemd service: runs mysqld as the mysql user with /etc/my.cnf, restarting it on failure
sudo tee /etc/systemd/system/mysqld.service >/dev/null <<EOF
[Unit]
Description=MySQL Server
After=network.target

[Service]
User=mysql
Group=mysql
ExecStart=/usr/local/mysql/bin/mysqld --defaults-file=/etc/my.cnf
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

# Start now and enable at boot
sudo systemctl daemon-reload
sudo systemctl start mysqld
sudo systemctl enable mysqld

# Check that port 3306 is listening
ss -tulpn | grep 3306
# or
netstat -tulpn | grep 3306

-- After logging in with the temporary password, it must be changed first
ALTER USER 'root'@'localhost' IDENTIFIED BY 'root1234';

-- For remote login, also create a root account for any host.
-- NOTE: root@'%' with a weak password is only acceptable on a test machine.
CREATE USER 'root'@'%' IDENTIFIED BY 'root1234';
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;
-- No FLUSH PRIVILEGES needed: ALTER USER / CREATE USER / GRANT reload the
-- in-memory grant tables immediately.
exit;

-- When connecting with DBeaver: jdbc:mysql://192.168.7.87:3306/?allowPublicKeyRetrieval=true&useSSL=false

创建DB和表

mysql> create database db_jygt;
Query OK, 1 row affected (0.86 sec)

mysql> use db_jygt
Database changed
mysql> CREATE TABLE `t_abcjs_staff` (
    ->   `f_id` int(11) NOT NULL AUTO_INCREMENT,
    ->   `f_name` varchar(255) COLLATE utf8mb4_bin NOT NULL,
    ->   `f_content` text COLLATE utf8mb4_bin NOT NULL,
    ->   `f_state` int(11) DEFAULT '0',
    ->   `f_create_time` datetime DEFAULT NULL,
    ->   `f_modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    ->   `f_user_id` int(11) DEFAULT NULL,
    ->   `f_catalog` int(11) DEFAULT NULL,
    ->   PRIMARY KEY (`f_id`) USING BTREE,
    ->   KEY `idx_name` (`f_name`) USING BTREE,
    ->   KEY `idx_user_id` (`f_user_id`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=110265 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Query OK, 0 rows affected, 4 warnings (2.85 sec)

CDC用户

-- Account for the Flink mysql-cdc source: table reads (SELECT, SHOW DATABASES)
-- plus binlog access (REPLICATION SLAVE, REPLICATION CLIENT)
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'fl@123456';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;

-- Check that the binlog is enabled; the trailing comment is the expected value
show variables like 'log_bin'; -- ON
show variables like 'binlog_format'; -- ROW
show variables like 'binlog_row_image'; -- FULL
-- Check server-id (must match my.cnf)
show variables like 'server_id'; -- 100

# Privileges for using MySQL as a sink (write / DDL only)
CREATE USER 'flink_sink'@'%' IDENTIFIED BY 'fl@123456';
GRANT INSERT, UPDATE, DELETE, ALTER, CREATE, DROP ON *.* TO 'flink_sink'@'%';
FLUSH PRIVILEGES;

启动集群后,运行命令flink-dist/target/flink-2.2.1-bin/flink-2.2.1/bin/sql-client.sh,进入交互界面:

创建源表

-- IF EXISTS keeps this re-runnable (the table does not exist on the first run)
DROP TABLE IF EXISTS mysql_t_abcjs_staff;


-- MySQL CDC source table. Credentials are the flink_cdc account created on MySQL
-- for CDC; 'server-id' must differ from the MySQL server's own server-id (100).
CREATE TABLE mysql_t_abcjs_staff (
    f_id INT,
    f_name STRING,
    f_content STRING,
    f_state INT,
    f_create_time TIMESTAMP(3),
    f_modify_time TIMESTAMP(3),
    f_user_id INT,
    f_catalog INT,
    PRIMARY KEY (f_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'www.aigrow.space',
    'port' = '3306',
    'username' = 'flink_cdc',
    'password' = 'fl@123456',
    'database-name' = 'db_jygt',
    'table-name' = 't_abcjs_staff',
    'server-id' = '5401',
    'server-time-zone' = 'Asia/Shanghai'
);

创建目标表

-- JDBC sink table: writes into db_jygt.t_abcjs_staff in the PostgreSQL database db_jygt
CREATE TABLE pg_t_abcjs_staff (
    f_id INT,
    f_name STRING,
    f_content STRING,
    f_state INT,
    f_create_time TIMESTAMP(3),
    f_modify_time TIMESTAMP(3),
    f_user_id INT,
    f_catalog INT,
    PRIMARY KEY (f_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://192.168.7.186:5432/db_jygt',
    'table-name' = 'db_jygt.t_abcjs_staff',   
    'username' = 'postgres',
    'password' = 'Pg@123456',
    'driver' = 'org.postgresql.Driver'
);

-- List the tables registered in the current catalog
SHOW TABLES;

启动

Flink SQL> INSERT INTO pg_t_abcjs_staff SELECT * FROM mysql_t_abcjs_staff;

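这里总是报错,提示找不到匹配的数据源 connector(典型报错如 `Could not find any factory for identifier 'mysql-cdc'`)。SQL Client 只会从 Flink 的 lib/ 目录加载连接器,因此需要把对应版本的连接器和 JDBC 驱动 jar 放进 lib/ 并重启集群。由于没能找到与 2.2.1 适配的连接器版本,只好降级到 1.18.1: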

# Download the JDBC connector, the MySQL / PostgreSQL JDBC drivers and the MySQL CDC
# connector straight into lib/ of the Flink 1.18.1 distribution, where the SQL
# client picks them up. Run from the extracted flink-1.18.1 directory.
wget -P lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.18/flink-connector-jdbc-3.2.0-1.18.jar
wget -P lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar
wget -P lib/ https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.4/postgresql-42.7.4-all.jar
wget -P lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.1/flink-sql-connector-mysql-cdc-3.2.1.jar

# Restart the cluster so the new jars are on the classpath
./bin/stop-cluster.sh
./bin/start-cluster.sh


评论