Catalog(目录)提供了关于数据库、表格和访问数据所需的信息的元数据,以及统一的 API 来管理元数据,验证连接,让元数据对 Sources(数据源)、Sinks(数据汇)和 Web 可访问。

Catalog 让用户能够引用其数据系统中的现有元数据,并自动映射到 SeaTunnel 的对应元数据。总之,Catalog 大大简化了使用用户现有系统开始使用 SeaTunnel 的步骤,并显著增强了用户体验。

Catalog 功能的重要性

目前,许多现有功能都是基于 Catalog 实现的,例如 CDC(变更数据捕获)多表同步功能,我们使用 Catalog 获取表格和字段列表。


【资料图】

Apache SeaTunnel 目前正在设计一个叫做 SaveMode 的功能,它是由连接器实现的,用于支持目标表中现有表格结构和数据的处理。这些功能也是基于 Catalog 实现的。

Catalog 是如何设计的?如何实现一个新的 Catalog?以下是详细介绍。

Catalog API

初始化操作

注意:目录名称目前没有被使用,预计会提供给 Web 后端进行保存和查询。

Javapublic interface CatalogFactory extends Factory { String factoryIdentifier(); OptionRule optionRule(); Catalog createCatalog(String catalogName, ReadonlyConfig options); } public interface Catalog extends AutoCloseable { void open() throws CatalogException; void close() throws CatalogException; }

数据库操作

javapublic interface Catalog extends AutoCloseable { // -------------------------------------------------------------------------------------------- // 数据库 // -------------------------------------------------------------------------------------------- String getDefaultDatabase() throws CatalogException; boolean databaseExists(String databaseName) throws CatalogException; List listDatabases() throws CatalogException; void createDatabase(String databaseName, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException; void dropDatabase(String databaseName, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException; }

表格操作

javapublic interface Catalog extends AutoCloseable { // -------------------------------------------------------------------------------------------- // 表格 // -------------------------------------------------------------------------------------------- List listTables(String databaseName) throws CatalogException, DatabaseNotExistException; boolean tableExists(TablePath tablePath) throws CatalogException; CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException; void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException; void dropTable(TablePath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException; }

这里是一个已经实现的示例。

MySQL Catalog

MySQL Catalog 的使用方式:

配置文件配置

conf[source/sink] { [connector-factory-id] { catalog { factory = "MySQL" username = "test" password = "123456" base-url = "jdbc:mysql://localhost:5432/db" table-names = [ "db.table" ] } } }

如何使用 Catalog

对于支持 Catalog 的连接器,我们将打开一个 Catalog 参数来配置所使用的 Catalog:

示例

sqlenv { "job.mode"=STREAMING "job.name"="cdc_mysql_to_mysql" "checkpoint.interval"="2000" "custom_parameters"="" } source { MySQL-CDC { parallelism = 1 catalog { factory = "MySQL" # 默认情况下,Catalog 将使用与连接器同名的选项 } username = "mysqluser" password = "mysqlpw" database-names = ["seatunnel-test"] table-pattern = "seatunnel-test\\.orders_\\d+" base-url = "jdbc:mysql://localhost:54508/seatunnel-test" } } sink { jdbc { url = "jdbc:mysql://localhost:4000/test" driver = "com.mysql.cj.jdbc.Driver" catalog { factory = "MySQL" username = "root" password = "" base-url = "jdbc:mysql://localhost:4000/test" table-pattern = "seatunnel-test2\\.orders_\\d+" } user = "root" password = "" query = "insert into sink(age, name) values(?,?)" } }

未来规划

目前,我们只实现了部分 Catalog。未来,我们计划扩大 Catalog 的实现范围,包括更多支持 Catalog 的连接器,这将使更多的连接器支持 SaveMode 和自动表格创建等功能。

Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

衷心欢迎更多人加入!

我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

本文由 白鲸开源 提供发布支持!

推荐内容