核心内容摘要
柚子猫内射频:解锁耳朵里的奇幻旅程
CLI 的工作方式它连接谁bin/flink会连接到 Flink 配置文件里指定的 JobManager或你在命令里用--jobmanager host:port指定的 JM。
前提必须有一个可用的 Flink 部署本地、YARN、K8S、Standalone Session 等。
提交作业run最常用的入口
1 提交一个 JAR推荐加 --detached./bin/flink run\--detached\./examples/streaming/StateMachineExample.jar--detached提交完就返回不会一直挂在终端等作业结束输出里会给出JobID后续 list/savepoint/stop/cancel 都靠它把 JobID 存变量后续更方便exportJOB_IDcca7bc1061d61cf15238e92312c2fc
2
2 用 -D 传递配置发布时非常关键run支持-D传递额外配置例如./bin/flink run --detached\-Dpipeline.max-parallelism120\./your-job.jar这个能力对Application Mode特别重要你可以不改flink-conf.yaml直接在提交时把内存、并发、checkpoint 等配置传进集群。
注意提交到已存在的 Session 集群时一般只支持执行相关参数execution config生效别指望所有参数都能“改动集群级别行为”。
监控作业list查运行中/排队中./bin/flink list它会列出Running/Restarting Jobs运行中/重启中Scheduled Jobs已提交但尚未启动实战习惯提交后第一件事 list 一下确认 Job 状态不是立刻 FAILED/RESTARTING。
Savepoint可控的“状态快照”用于迁移/升级/回滚
1 创建 Savepoint./bin/flink savepoint\$JOB_ID\/tmp/flink-savepointssavepoint 目录可选如果execution.checkpointing.savepoint-dir没配置就必须在命令里带上成功后会返回一个 savepoint 路径后续--fromSavepoint用它
2 Savepoint 触发超时怎么办用 detached状态大时客户端等待 savepoint 完成可能超时TimeoutException。
解决方式是“触发后立刻返回”./bin/flink savepoint\$JOB_ID\/tmp/flink-savepoints\-detached这会返回一个 triggerId之后可以通过 REST API 查询该 trigger 的完成状态CLI 文档也建议这么做。
3 Dispose Savepoint删除 savepoint 数据与元信息./bin/flink savepoint\--dispose\/tmp/flink-savepoints/savepoint-xxx\$JOB_ID注意一个坑如果你的状态里有自定义 state/自定义类尤其 RocksDB statedispose 时可能需要提供原作业 jar否则会 ClassNotFound./bin/flink savepoint\--disposesavepointPath\--jarfilejarFile
手动触发 Checkpoint更偏“运维诊断/临时保底”./bin/flink checkpoint$JOB_ID如果你的作业默认跑的是 incremental checkpoint但你想强制做一次 full checkpoint./bin/flink checkpoint$JOB_ID--fullCheckpoint 和 Savepoint 的关键差异实战理解版Checkpoint系统为容错自动做也可手动触发更偏“持续容错”Savepoint人为控制用于“迁移/升级/回滚/停止再启动”
停作业stop vs cancel一个优雅一个粗暴
1 stop优雅停止并创建最终 Savepoint强烈推荐用于可恢复停机./bin/flink stop\--savepointPath /tmp/flink-savepoints\$JOB_IDstop 的语义是“从 source 到 sink”平滑停让 source 发最后一次 barrier生成 savepointsavepoint 成功后source 调用 cancel() 结束如果你要“彻底停机并清空事件时间相关的等待”可以加--drain./bin/flink stop\--savepointPath /tmp/flink-savepoints\--drain\$JOB_ID--drain会发送 MAX_WATERMARK触发 event-time timer比如窗口把“该出结果的都出完”。
注意想将来从 savepoint 恢复继续跑通常不要 drain否则可能引入恢复后的语义问题。
2 cancel直接取消不保证状态一致性/不做最终保存./bin/flink cancel$JOB_ID文档里提到--withSavepoint在 cancel 时顺便做 savepoint 这个功能已 deprecated生产建议用 stop 来做“取消 最终 savepoint”。
从 Savepoint 启动作业升级/迁移的核心套路./bin/flink run\--detached\--fromSavepoint /tmp/flink-savepoints/savepoint-xxx\./your-job.jar如果你的新版本作业删掉了某些算子导致 savepoint 里有“无法恢复的状态”但你仍想启动可以加./bin/flink run\--fromSavepointsavepointPath\--allowNonRestoredState\...这是“兼容演进”常用开关但它也意味着你明确接受丢弃某些旧状态。
CLI Actions 速查表你每天会用到的run提交并运行作业JAR/PyFlinkinfo打印优化后的执行图排查 SQL/Plan 很有用list列出运行/排队作业savepoint触发/清理 savepointcheckpoint手动触发 checkpoint含 fullstop优雅停止并生成最终 savepointcancel直接取消帮助命令./bin/flink --help ./bin/flinkaction--help
选择部署目标–target 一把梭Session / Application--target会覆盖execution.target的配置。
常见组合YARN./bin/flink run --target yarn-session... ./bin/flink run --target yarn-application...Kubernetes./bin/flink run --target kubernetes-session... ./bin/flink run --target kubernetes-application...Standalone./bin/flink run --targetlocal... ./bin/flink run --target remote...理解建议session提交到已存在集群共享 JM/TMapplication提交时起一个专属集群更适合隔离、参数化、CI/CD
PyFlink 提交不用 jar但要管 Python 环境与依赖
1
1 基础运行./bin/flink run --python examples/python/table/word_count.py先确认 Python 版本 ≥
9python --version
1
2 带依赖文件–pyFiles./bin/flink run\--python your_job.py\--pyFiles file:///user.txt,hdfs:///path/username.txt--pyFiles会加到 PYTHONPATH客户端与远端 python worker 都能用。
1
3 Python 里引用 Java UDF 或外部 connector–jarfile./bin/flink run\--python your_job.py\--jarfile your-udf-or-connector.jar
1
4 用模块方式提交–pyModule./bin/flink run\--pyModule word_count\--pyFiles examples/python/table
1
5 YARN application 模式跑 PyFlink典型生产形态你可以通过-D把 JM/TM 内存、应用名、ship-files 等都带上还能指定 venv、python 可执行文件./bin/flink run -t yarn-application\-Djobmanager.memory.process.size1024m\-Dtaskmanager.memory.process.size1024m\-Dyarn.application.nameApplicationName\-Dyarn.ship-files/path/to/shipfiles\-pyarch shipfiles/venv.zip\-pyclientexec venv.zip/venv/bin/python3\-pyexec venv.zip/venv/bin/python3\-pyfs shipfiles\-pym word_count一个现实限制-pyarch通过 blob server 分发单个归档文件大小上限 2GB超过要放到分布式文件系统再引用。
1