I use the key.field.name option of Debezium's ByLogicalTableRouter SMT
in my MySQL source connector to add a field to the message key.
A message looks like this after landing on the topic (shown as key:value):
{"id":20,"__PKtableowner":"reviewDB.review.search_user_02"}:{"before":null,"after":{"search_user_all_shards.Value":{"id":20,"name":{"string":"oliver"},"email":{"string":"[email protected]"},"department":{"string":"sales"},"modified":"2021-01-06T09:27:02Z"}},"source":{"version":"1.0.2.Final","connector":"mysql","name":"reviewDB","ts_ms":1609925222000,"snapshot":{"string":"false"},"db":"review","table":{"string":"search_user_02"},"server_id":1,"gtid":null,"file":"binlog.000002","pos":14630,"row":0,"thread":{"long":13},"query":null},"op":"c","ts_ms":{"long":1609925222956}}
Here, the key is
{"id":20,"__PKtableowner":"reviewDB.review.search_user_02"}
and the value is
{"before":null,"after":{"search_user_all_shards.Value":{"id":20,"name":{"string":"oliver"},"email":{"string":"[email protected]"},"department":{"string":"sales"},"modified":"2021-01-06T09:27:02Z"}},"source":{"version":"1.0.2.Final","connector":"mysql","name":"reviewDB","ts_ms":1609925222000,"snapshot":{"string":"false"},"db":"review","table":{"string":"search_user_02"},"server_id":1,"gtid":null,"file":"binlog.000002","pos":14630,"row":0,"thread":{"long":13},"query":null},"op":"c","ts_ms":{"long":1609925222956}}
In my sink connector (HdfsSinkConnector) I need to take the key field "__PKtableowner":"reviewDB.review.search_user_02"
and write it out as a column or field in HDFS or Hive.
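To make the goal concrete, here is a rough sketch of the record value I would like the sink to write for the message above. It is only an illustration built from that sample message: it assumes the value has already been flattened by the unwrap transform, and it leaves out the email column and the fields added by my InsertField transforms for brevity.

```json
{
  "id": 20,
  "name": "oliver",
  "department": "sales",
  "modified": "2021-01-06T09:27:02Z",
  "__PKtableowner": "reviewDB.review.search_user_02"
}
```

The only part that matters is the last field: the value copied from the message key into the record value.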
The only SMT I found is ValueToKey (linked below), but it doesn't fit my use case because it copies fields from the message value into the key, not from the key into the value. I've also tried InsertField, CreateKey, ExtractField and almost every other transformation in the documentation below, with no luck.
https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html
I'm looking for a KeyToValue kind of SMT, or any other workaround.
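For illustration, this is roughly the shape of sink configuration I am hoping for. The keyToValue alias, the com.example.KeyToValue class and its fields property are all hypothetical placeholders, not an SMT I know to exist. Only the unwrap entries are taken from my current sink configuration.

```json
{
  "transforms": "unwrap,keyToValue",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.keyToValue.type": "com.example.KeyToValue",
  "transforms.keyToValue.fields": "__PKtableowner"
}
```

The idea is that such a transform would run after unwrap, so the key field would be added to the flattened row rather than to the Debezium envelope.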
Below are my source and sink configurations.
Source:
{
"name": "REVIEW__MYSQL__search_user__source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.history.kafka.topic": "review.search_user_logs",
"database.history.consumer.max.block.ms": "3000",
"include.schema.changes": "false",
"database.history.consumer.session.timeout.ms": "30000",
"database.history.kafka.consumer.group": "compose-connect-group",
"snapshot.new.tables": "parallel",
"database.history.kafka.sasl.mechanism": "GSSAPI",
"database.whitelist": "review",
"database.history.producer.sasl.mechanism": "GSSAPI",
"database.user": "root",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"time.precision.mode": "connect",
"database.server.name": "reviewDB",
"database.port": "3306",
"database.history.consumer.heartbeat.interval.ms": "1000",
"min.row.count.to.stream.results": "0",
"database.hostname": "mysql",
"database.password": "example",
"database.history.consumer.sasl.mechanism": "GSSAPI",
"snapshot.mode": "when_needed",
"table.whitelist": "review.search_user_(.*)",
"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "reviewDB.review.search_user_(.*)",
"transforms.Reroute.topic.replacement": "search_user_all_shards",
"transforms.Reroute.key.field.name": "__PKtableowner"
}
}
Sink:
{
"name": "REVIEW__MYSQL__search_user__sink",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"topics.dir": "/_incr_files",
"flush.size": 1,
"tasks.max": 1,
"timezone": "UTC",
"rotate.interval.ms": 5000,
"locale": "en",
"hadoop.home": "/etc/hadoop",
"logs.dir": "/_incr_files_wal",
"hive.integration": "false",
"partition.duration.ms": "20000",
"hadoop.conf.dir": "/etc/hadoop",
"topics": "search_user_all_shards",
"hdfs.url": "hdfs://namenode:9000",
"transforms": "unwrap,insertTopicOffset,insertTimeStamp",
"transforms.insertTimeStamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.insertTimeStamp.timestamp.field": "spdb_landing_timestamp",
"transforms.insertTopicOffset.offset.field": "spdb_topic_offset",
"transforms.insertTopicOffset.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"schema.compatibility": "NONE",
"path.format": "'partition'=YYYY-MM-dd-HH",
"partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner"
}
}