ScalarDB を使って Cassandra の Partition を跨いだ Tx を実行してみる
先日 NewSQL/分散SQLデータベース よろず勉強会 #1 にて「Cassandra で Partition を跨いだ Tx を実行できるようにする Accord というものが開発中である」というお話を聞いたのですが、「それって ScalarDB でもできるのでは?」ということでやってみました。
注意
このブログの内容は、あくまで個人の趣味の範囲で ScalarDB を使った検証を実施しているだけですので、あしからず。
また、サンプルとして記載しているコードではエラーハンドリング (エラー発生時の ROLLBACK 等) は考慮していません。実際に ScalarDB を使ってアプリを作る場合はそのあたりも考えないといけないので注意してください。ScalarDB でのエラーハンドリングについては Handle Exceptions に記載されているので、興味がある方はそちらもどうぞ。
環境
今回は以下の環境/バージョンで検証しています。
- Ubuntu : 20.04 (WSL2)
- OpenJDK : 17
- Docker : 20.10.21
- ScalarDB : 3.7.0
検証内容概要
以下のような感じで、ScalarDB を使って Cassandra 上の (複数パーティションを跨いだ) レコードを更新します。
+----------------------+ | Java App | | +------------------+ | +-------------+ | | Scalar DB |--------------------------------->| Backend DB | | +------------------+ | | (Cassandra) | +----------------------+ +-------------+
検証
それでは早速検証してみます。
Cassandra 起動
Docker を使って Cassandra を起動します。localhost
でアクセスするために -p 9042:9042
オプションを付けておきます。
$ docker run -d --name cassandra -p 9042:9042 cassandra:3.11.14
$ docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 196d8d087453 cassandra:3.11.14 "docker-entrypoint.s…" 23 seconds ago Up 22 seconds 7000-7001/tcp, 7199/tcp, 9160/tcp, 0.0.0.0:9042->9042/tcp, :::9042->9042/tcp cassandra
テーブル作成
今回は以下のようなテーブル foo
をネームスペース ns
上に作成します。(実際に Cassandra 上に作成されるテーブルには ScalarDB 用のメタデータを格納するカラムも含まれます。)
+---------------------+------+ | columnName | type | +---------------------+------+ | pk (Partition Key) | INT | | ck (Clustering Key) | INT | | v | INT | +---------------------+------+
上記のようなテーブルを、ScalarDB の Administrative API を使って作成します。具体的なコードは以下の通りです。
package com.example; import java.util.Properties; import com.scalar.db.api.DistributedTransactionAdmin; import com.scalar.db.api.TableMetadata; import com.scalar.db.io.DataType; import com.scalar.db.service.TransactionFactory; import com.scalar.db.api.Scan; public class CreateSchemaOnCassandra { public static void main(String[] args) throws Exception { Properties properties = new Properties(); properties.setProperty("scalar.db.contact_points", "localhost"); properties.setProperty("scalar.db.contact_port", "9042"); properties.setProperty("scalar.db.storage", "cassandra"); properties.setProperty("scalar.db.username", "cassandra"); properties.setProperty("scalar.db.password", "cassandra"); TransactionFactory transactionFactory = TransactionFactory.create(properties); DistributedTransactionAdmin admin = transactionFactory.getTransactionAdmin(); boolean ifNotExist = true; // Create coordinator tables for ScalarDB admin.createCoordinatorTables(ifNotExist); // Create namespace admin.createNamespace("ns", ifNotExist); // Define a table metadata TableMetadata tableMetadata = TableMetadata.newBuilder() .addColumn("pk", DataType.INT) .addColumn("ck", DataType.TEXT) .addColumn("v", DataType.BIGINT) .addPartitionKey("pk") .addClusteringKey("ck", Scan.Ordering.Order.ASC) .build(); // Create table admin.createTable("ns", "foo", tableMetadata, ifNotExist); admin.close(); } }
Cassandra に cqlsh で接続し、作成されたテーブルの情報を直接参照すると以下のようになっています。ScalarDB 関連のメタデータを含むカラムも併せて定義されていますが、pk
が Partition Key、ck
が Clustering Key として定義されていることが確認できます。
$ docker exec -it cassandra cqlsh Connected to Test Cluster at 127.0.0.1:9042. [cqlsh 5.0.1 | Cassandra 3.11.14 | CQL spec 3.4.4 | Native protocol v4] Use HELP for help. cqlsh> cqlsh> DESCRIBE TABLE ns.foo; CREATE TABLE ns.foo ( pk int, ck int, before_tx_committed_at bigint, before_tx_id text, before_tx_prepared_at bigint, before_tx_state int, before_tx_version int, before_v int, tx_committed_at bigint, tx_id text, tx_prepared_at bigint, tx_state int, tx_version int, v int, PRIMARY KEY (pk, ck) ) WITH CLUSTERING ORDER BY (ck ASC) AND bloom_filter_fp_chance = 0.01 AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'} AND comment = '' AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'} AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'} AND crc_check_chance = 1.0 AND dclocal_read_repair_chance = 0.1 AND default_time_to_live = 0 AND gc_grace_seconds = 864000 AND max_index_interval = 2048 AND memtable_flush_period_in_ms = 0 AND min_index_interval = 128 AND read_repair_chance = 0.0 AND speculative_retry = '99PERCENTILE'; cqlsh>
初期データの INSERT
検証用に以下のようなレコード (Partition Key が異なるレコード) を 2つ INSERT します。
+----+----+---+ | pk | ck | v | +----+----+---+ | 1 | 1 | 1 | | 2 | 2 | 2 | +----+----+---+
上記のようなデータを、ScalarDB の Transactional API を使って INSERT します (ScalarDB の API としては Put
を使います)。具体的なコードは以下の通りです。
package com.example; import java.util.Properties; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Put; import com.scalar.db.io.Key; import com.scalar.db.service.TransactionFactory; public class InsertRecordOnCassandra { public static void main(String[] args) throws Exception { Properties properties = new Properties(); properties.setProperty("scalar.db.contact_points", "localhost"); properties.setProperty("scalar.db.contact_port", "9042"); properties.setProperty("scalar.db.storage", "cassandra"); properties.setProperty("scalar.db.username", "cassandra"); properties.setProperty("scalar.db.password", "cassandra"); TransactionFactory transactionFactory = TransactionFactory.create(properties); DistributedTransactionManager manager = transactionFactory.getTransactionManager(); // Begin Tx DistributedTransaction transaction = manager.begin(); // INSERT INTO ns.foo (pk, ck, v) VALUES (1, 1, 1) Key partitionKey1 = Key.ofInt("pk", 1); Key clusteringKey1 = Key.ofInt("ck", 1); Put put1 = Put.newBuilder() .namespace("ns") .table("foo") .partitionKey(partitionKey1) .clusteringKey(clusteringKey1) .intValue("v", 1) .build(); transaction.put(put1); // INSERT INTO ns.foo (pk, ck, v) VALUES (2, 2, 2) Key partitionKey2 = Key.ofInt("pk", 2); Key clusteringKey2 = Key.ofInt("ck", 2); Put put2 = Put.newBuilder() .namespace("ns") .table("foo") .partitionKey(partitionKey2) .clusteringKey(clusteringKey2) .intValue("v", 2) .build(); transaction.put(put2); // Commit Tx transaction.commit(); manager.close(); } }
Cassandra に cqlsh で接続し、実際に格納されたレコードの内容を確認しておきます。Partition が異なる (Partition Key が異なる) レコードが 2つ INSERT されていることが確認できます。
cqlsh> SELECT pk, ck, v FROM ns.foo; pk | ck | v ----+----+--- 1 | 1 | 1 2 | 2 | 2 (2 rows)
実は、この時点でこの 2つのレコード (Partition が異なるレコード) は ScalarDB の Tx (同一の Tx) で INSERT されています。ScalarDB は Tx 用のメタデータをレコード内に保持する仕組みになっているため、Tx ID の情報を合わせて確認すると、「同じ Tx ID で (同じ Tx で) 2つのレコードが INSERT されている」ことが確認できます。
cqlsh> SELECT pk, ck, v, tx_id FROM ns.foo; pk | ck | v | tx_id ----+----+---+-------------------------------------- 1 | 1 | 1 | 53da60ca-5879-4e85-88d2-a7580d68e820 2 | 2 | 2 | 53da60ca-5879-4e85-88d2-a7580d68e820 (2 rows)
Partition を跨いだレコードの UPDATE
さて、INSERT の時点で ScalarDB を利用して「Cassandra の『Partition を跨いだ複数のレコード』を『同一 Tx』で更新」できることがほぼ確認できてしまいましたが、せっかくなので UPDATE も実行してみます。先ほど INSERT したレコードのカラム v
の値を以下のように UPDATE します。
+----+----+-----+ | pk | ck | v | +----+----+-----+ | 1 | 1 | 100 | | 2 | 2 | 200 | +----+----+-----+
上記のようなデータを、ScalarDB の Transactional API を使って UPDATE します (ScalarDB の API としては Put
を使います)。具体的なコードは以下の通りです。
※ScalarDB で既存のレコードを更新 (Put
) する場合は、一度対象となるレコードを参照 (Get
) する必要があります。このあたりの動作については こちらのスライド に記載されているので、興味がある方はこのスライドも参照してみてください。
package com.example; import java.util.Properties; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Get; import com.scalar.db.api.Put; import com.scalar.db.io.Key; import com.scalar.db.service.TransactionFactory; public class UpdateOnCassandra { public static void main(String[] args) throws Exception { Properties properties = new Properties(); properties.setProperty("scalar.db.contact_points", "localhost"); properties.setProperty("scalar.db.contact_port", "9042"); properties.setProperty("scalar.db.storage", "cassandra"); properties.setProperty("scalar.db.username", "cassandra"); properties.setProperty("scalar.db.password", "cassandra"); TransactionFactory transactionFactory = TransactionFactory.create(properties); DistributedTransactionManager manager = transactionFactory.getTransactionManager(); // Begin Tx DistributedTransaction transaction = manager.begin(); // UPDATE foo SET v = 100 WHERE pk = 1 AND ck = 1 Key partitionKey1 = Key.ofInt("pk", 1); Key clusteringKey1 = Key.ofInt("ck", 1); Get get1 = Get.newBuilder() .namespace("ns") .table("foo") .partitionKey(partitionKey1) .clusteringKey(clusteringKey1) .build(); transaction.get(get1); Put put1 = Put.newBuilder() .namespace("ns") .table("foo") .partitionKey(partitionKey1) .clusteringKey(clusteringKey1) .intValue("v", 100) .build(); transaction.put(put1); // UPDATE foo SET v = 200 WHERE pk = 2 AND ck = 2 Key partitionKey2 = Key.ofInt("pk", 2); Key clusteringKey2 = Key.ofInt("ck", 2); Get get2 = Get.newBuilder() .namespace("ns") .table("foo") .partitionKey(partitionKey2) .clusteringKey(clusteringKey2) .build(); transaction.get(get2); Put put2 = Put.newBuilder() .namespace("ns") .table("foo") .partitionKey(partitionKey2) .clusteringKey(clusteringKey2) .intValue("v", 200) .build(); transaction.put(put2); // Commit Tx transaction.commit(); manager.close(); } }
ScalarDB でレコードを UPDATE した後に、Cassandra に cqlsh で接続し、実際に格納されたレコードの内容を確認してみます。Tx ID の情報を合わせて確認すると、「同じ Tx ID で (同じ Tx で) Cassandra の Partition を跨いだ 2つのレコードが UPDATE されている」ことが確認できます。
cqlsh> SELECT pk, ck, v, tx_id FROM ns.foo; pk | ck | v | tx_id ----+----+-----+-------------------------------------- 1 | 1 | 100 | 67e1ed69-8bee-478d-b9e5-7fb6774e36b3 2 | 2 | 200 | 67e1ed69-8bee-478d-b9e5-7fb6774e36b3 (2 rows)
まとめ
ということで、ScalarDB を使うと「Cassandra 上の Partition が異なるレコードを 1つの Tx で UPDATE できる」ことが確認できました。このような感じで、ScalarDB を使うと NoSQL 上で ACID Transaction を実行できます。
なお、一貫性のある状態でレコードを参照 (SELECT) するためには ScalarDB を使って SELECT (ScalarDB の API としては Get
) する必要があります。今回は、同一 Tx (ID) で更新されていることを確認するために、Cassandra に直接接続してメタデータを含むレコードの情報を参照しましたが、この方法では参照時のデータの一貫性が担保されないので、実際に ScalarDB を使ってアプリを作成する場合はご注意ください。