こたつ&&みかん&&でーたべーす

DB 関連の話を中心に技術っぽい記事を書きます。

ScalarDB を使って Cassandra の Partition を跨いだ Tx を実行してみる

先日 NewSQL/分散SQLデータベース よろず勉強会 #1 にて「Cassandra で Partition を跨いだ Tx を実行できるようにする Accord というものが開発中である」というお話を聞いたのですが、「それって ScalarDB でもできるのでは?」ということでやってみました。

注意

このブログの内容は、あくまで個人の趣味の範囲で ScalarDB を使った検証を実施しているだけですので、あしからず。

また、サンプルとして記載しているコードではエラーハンドリング (エラー発生時の ROLLBACK 等) は考慮していません。実際に ScalarDB を使ってアプリを作る場合はそのあたりも考えないといけないので注意してください。ScalarDB でのエラーハンドリングについては Handle Exceptions に記載されているので、興味がある方はそちらもどうぞ。

環境

今回は以下の環境/バージョンで検証しています。

  • Ubuntu : 20.04 (WSL2)
  • OpenJDK : 17
  • Docker : 20.10.21
  • ScalarDB : 3.7.0

検証内容概要

以下のような感じで、ScalarDB を使って Cassandra 上の (複数パーティションを跨いだ) レコードを更新します。

+----------------------+
|       Java App       |
| +------------------+ |                                +-------------+
| |    Scalar DB     |--------------------------------->| Backend DB  |
| +------------------+ |                                | (Cassandra) |
+----------------------+                                +-------------+

検証

それでは早速検証してみます。

Cassandra 起動

Docker を使って Cassandra を起動します。localhost でアクセスするために -p 9042:9042 オプションを付けておきます。

$ docker run -d --name cassandra -p 9042:9042 cassandra:3.11.14
$ docker ps
CONTAINER ID   IMAGE               COMMAND                  CREATED          STATUS          PORTS                                                                          NAMES
196d8d087453   cassandra:3.11.14   "docker-entrypoint.s…"   23 seconds ago   Up 22 seconds   7000-7001/tcp, 7199/tcp, 9160/tcp, 0.0.0.0:9042->9042/tcp, :::9042->9042/tcp   cassandra

テーブル作成

今回は以下のようなテーブル foo をネームスペース ns 上に作成します。(実際に Cassandra 上に作成されるテーブルには ScalarDB 用のメタデータを格納するカラムも含まれます。)

+---------------------+------+
| columnName          | type |
+---------------------+------+
| pk (Partition Key)  | INT  |
| ck (Clustering Key) | INT  |
| v                   | INT  |
+---------------------+------+

上記のようなテーブルを、ScalarDB の Administrative API を使って作成します。具体的なコードは以下の通りです。

package com.example;

import java.util.Properties;

import com.scalar.db.api.DistributedTransactionAdmin;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.io.DataType;
import com.scalar.db.service.TransactionFactory;
import com.scalar.db.api.Scan;

public class CreateSchemaOnCassandra {
    public static void main(String[] args) throws Exception {

        Properties properties = new Properties();
        properties.setProperty("scalar.db.contact_points", "localhost");
        properties.setProperty("scalar.db.contact_port", "9042");
        properties.setProperty("scalar.db.storage", "cassandra");
        properties.setProperty("scalar.db.username", "cassandra");
        properties.setProperty("scalar.db.password", "cassandra");

        TransactionFactory transactionFactory = TransactionFactory.create(properties);
        DistributedTransactionAdmin admin = transactionFactory.getTransactionAdmin();

        boolean ifNotExist = true;
        // Create coordinator tables for ScalarDB
        admin.createCoordinatorTables(ifNotExist);
        // Create namespace
        admin.createNamespace("ns", ifNotExist);

        // Define a table metadata
        TableMetadata tableMetadata =
            TableMetadata.newBuilder()
                .addColumn("pk", DataType.INT)
                .addColumn("ck", DataType.TEXT)
                .addColumn("v", DataType.BIGINT)
                .addPartitionKey("pk")
                .addClusteringKey("ck", Scan.Ordering.Order.ASC)
                .build();

        // Create table
        admin.createTable("ns", "foo", tableMetadata, ifNotExist);

        admin.close();
    }
}

Cassandra に cqlsh で接続し、作成されたテーブルの情報を直接参照すると以下のようになっています。ScalarDB 関連のメタデータを含むカラムも併せて定義されていますが、pk が Partition Key、ck が Clustering Key として定義されていることが確認できます。

$ docker exec -it cassandra cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.14 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh>
cqlsh> DESCRIBE TABLE ns.foo;

CREATE TABLE ns.foo (
    pk int,
    ck int,
    before_tx_committed_at bigint,
    before_tx_id text,
    before_tx_prepared_at bigint,
    before_tx_state int,
    before_tx_version int,
    before_v int,
    tx_committed_at bigint,
    tx_id text,
    tx_prepared_at bigint,
    tx_state int,
    tx_version int,
    v int,
    PRIMARY KEY (pk, ck)
) WITH CLUSTERING ORDER BY (ck ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

cqlsh>

初期データの INSERT

検証用に以下のようなレコード (Partition Key が異なるレコード) を 2つ INSERT します。

+----+----+---+
| pk | ck | v |
+----+----+---+
| 1  | 1  | 1 |
| 2  | 2  | 2 |
+----+----+---+

上記のようなデータを、ScalarDB の Transactional API を使って INSERT します (ScalarDB の API としては Put を使います)。具体的なコードは以下の通りです。

package com.example;

import java.util.Properties;

import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Put;
import com.scalar.db.io.Key;
import com.scalar.db.service.TransactionFactory;

public class InsertRecordOnCassandra {
    public static void main(String[] args) throws Exception {

        Properties properties = new Properties();
        properties.setProperty("scalar.db.contact_points", "localhost");
        properties.setProperty("scalar.db.contact_port", "9042");
        properties.setProperty("scalar.db.storage", "cassandra");
        properties.setProperty("scalar.db.username", "cassandra");
        properties.setProperty("scalar.db.password", "cassandra");

        TransactionFactory transactionFactory = TransactionFactory.create(properties);
        DistributedTransactionManager manager = transactionFactory.getTransactionManager();

        // Begin Tx
        DistributedTransaction transaction = manager.begin();

        // INSERT INTO ns.foo (pk, ck, v) VALUES (1, 1, 1)
        Key partitionKey1 = Key.ofInt("pk", 1);
        Key clusteringKey1 = Key.ofInt("ck", 1);

        Put put1 = 
            Put.newBuilder()
                .namespace("ns")
                .table("foo")
                .partitionKey(partitionKey1)
                .clusteringKey(clusteringKey1)
                .intValue("v", 1)
                .build();

        transaction.put(put1);

        // INSERT INTO ns.foo (pk, ck, v) VALUES (2, 2, 2)
        Key partitionKey2 = Key.ofInt("pk", 2);
        Key clusteringKey2 = Key.ofInt("ck", 2);

        Put put2 = 
            Put.newBuilder()
                .namespace("ns")
                .table("foo")
                .partitionKey(partitionKey2)
                .clusteringKey(clusteringKey2)
                .intValue("v", 2)
                .build();

        transaction.put(put2);

        // Commit Tx
        transaction.commit();

        manager.close();
    }
}

Cassandra に cqlsh で接続し、実際に格納されたレコードの内容を確認しておきます。Partition が異なる (Partition Key が異なる) レコードが 2つ INSERT されていることが確認できます。

cqlsh> SELECT pk, ck, v FROM ns.foo;

 pk | ck | v
----+----+---
  1 |  1 | 1
  2 |  2 | 2

(2 rows)

実は、この時点でこの 2つのレコード (Partition が異なるレコード) は ScalarDB の Tx (同一の Tx) で INSERT されています。ScalarDB は Tx 用のメタデータをレコード内に保持する仕組みになっているため、Tx ID の情報を合わせて確認すると、「同じ Tx ID で (同じ Tx で) 2つのレコードが INSERT されている」ことが確認できます。

cqlsh> SELECT pk, ck, v, tx_id FROM ns.foo;

 pk | ck | v | tx_id
----+----+---+--------------------------------------
  1 |  1 | 1 | 53da60ca-5879-4e85-88d2-a7580d68e820
  2 |  2 | 2 | 53da60ca-5879-4e85-88d2-a7580d68e820

(2 rows)

Partition を跨いだレコードの UPDATE

さて、INSERT の時点で ScalarDB を利用して「Cassandra の『Partition を跨いだ複数のレコード』を『同一 Tx』で更新」できることがほぼ確認できてしまいましたが、せっかくなので UPDATE も実行してみます。先ほど INSERT したレコードのカラム v の値を以下のように UPDATE します。

+----+----+-----+
| pk | ck |  v  |
+----+----+-----+
| 1  | 1  | 100 |
| 2  | 2  | 200 |
+----+----+-----+

上記のようなデータを、ScalarDB の Transactional API を使って UPDATE します (ScalarDB の API としては Put を使います)。具体的なコードは以下の通りです。

※ScalarDB で既存のレコードを更新 (Put) する場合は、一度対象となるレコードを参照 (Get) する必要があります。このあたりの動作については こちらのスライド に記載されているので、興味がある方はこのスライドも参照してみてください。

package com.example;

import java.util.Properties;

import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Get;
import com.scalar.db.api.Put;
import com.scalar.db.io.Key;
import com.scalar.db.service.TransactionFactory;

public class UpdateOnCassandra {
    public static void main(String[] args) throws Exception {

        Properties properties = new Properties();
        properties.setProperty("scalar.db.contact_points", "localhost");
        properties.setProperty("scalar.db.contact_port", "9042");
        properties.setProperty("scalar.db.storage", "cassandra");
        properties.setProperty("scalar.db.username", "cassandra");
        properties.setProperty("scalar.db.password", "cassandra");

        TransactionFactory transactionFactory = TransactionFactory.create(properties);
        DistributedTransactionManager manager = transactionFactory.getTransactionManager();

        // Begin Tx
        DistributedTransaction transaction = manager.begin();

        // UPDATE foo SET v = 100 WHERE pk = 1 AND ck = 1
        Key partitionKey1 = Key.ofInt("pk", 1);
        Key clusteringKey1 = Key.ofInt("ck", 1);

        Get get1 = 
            Get.newBuilder()
                .namespace("ns")
                .table("foo")
                .partitionKey(partitionKey1)
                .clusteringKey(clusteringKey1)
                .build();
        transaction.get(get1);

        Put put1 = 
            Put.newBuilder()
                .namespace("ns")
                .table("foo")
                .partitionKey(partitionKey1)
                .clusteringKey(clusteringKey1)
                .intValue("v", 100)
                .build();
        transaction.put(put1);

        // UPDATE foo SET v = 200 WHERE pk = 2 AND ck = 2
        Key partitionKey2 = Key.ofInt("pk", 2);
        Key clusteringKey2 = Key.ofInt("ck", 2);
        Get get2 = 
            Get.newBuilder()
                .namespace("ns")
                .table("foo")
                .partitionKey(partitionKey2)
                .clusteringKey(clusteringKey2)
                .build();
        transaction.get(get2);

        Put put2 = 
            Put.newBuilder()
                .namespace("ns")
                .table("foo")
                .partitionKey(partitionKey2)
                .clusteringKey(clusteringKey2)
                .intValue("v", 200)
                .build();
        transaction.put(put2);

        // Commit Tx
        transaction.commit();

        manager.close();
    }
}

ScalarDB でレコードを UPDATE した後に、Cassandra に cqlsh で接続し、実際に格納されたレコードの内容を確認してみます。Tx ID の情報を合わせて確認すると、「同じ Tx ID で (同じ Tx で) Cassandra の Partition を跨いだ 2つのレコードが UPDATE されている」ことが確認できます。

cqlsh> SELECT pk, ck, v, tx_id FROM ns.foo;

 pk | ck | v   | tx_id
----+----+-----+--------------------------------------
  1 |  1 | 100 | 67e1ed69-8bee-478d-b9e5-7fb6774e36b3
  2 |  2 | 200 | 67e1ed69-8bee-478d-b9e5-7fb6774e36b3

(2 rows)

まとめ

ということで、ScalarDB を使うと「Cassandra 上の Partition が異なるレコードを 1つの Tx で UPDATE できる」ことが確認できました。このような感じで、ScalarDB を使うと NoSQL 上で ACID Transaction を実行できます。

なお、一貫性のある状態でレコードを参照 (SELECT) するためには ScalarDB を使って SELECT (ScalarDB の API としては Get) する必要があります。今回は、同一 Tx (ID) で更新されていることを確認するために、Cassandra に直接接続してメタデータを含むレコードの情報を参照しましたが、この方法では参照時のデータの一貫性が担保されないので、実際に ScalarDB を使ってアプリを作成する場合はご注意ください。