Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                

CLOVER🍀

That was when it all began.

Infinispan 14.0の新しいHot Rod Client APIを試して諦めたという話

これは、なにをしたくて書いたもの?

先日、Infinispan 14.0.0.Finalがリリースされました。

Infinispan 14.0.0.Final

更新内容の中に、新しいHot Rod Clientが含まれているというので試してみました。

結論から言うと、現時点では従来のHot Rod Clientを使っていた方が良さそうです。
Infinispan 14.0.0.Finalの時点でこのページを見た人は、読むのはここまでにしておいた方が無難かもですね。

エントリー自体を書くのをやめてもよかったのですが、ソースコードもいろいろ見たのでせっかくなので書き残しておいて今後の
アップデート時に見返したいなと思いまして。

久しぶりにInfinispanの大きめの新機能を触って、「いろいろ踏み抜いたなー」という気分になりました。前もちょいちょい踏んでいたので、
やや懐かしい感が。

新しいHot Rod Client

こちらのブログには、新しいHot Rod Clientについて以下のように紹介されています。

  • 完全に再設計した新しいHot Rod Client
  • プログラミングモデルは、同期、非同期、MutinyのAPIの中から選択する

Infinispan 14.0.0.Final

サンプルコードは以下のように紹介されています。
※間違ってそうとか、動かないとかいろいろあるんですが…

try (SyncContainer infinispan = Infinispan.create("hotrod://localhost")) {
    // Sync
    SyncCache<String, String> mycache = infinispan.sync().caches().get("mycache");
    mycache.set("key", "value");
    String value = mycache.get("key");
    // set with options
    mycache.set("key", "anothervalue", writeOptions().lifespan(Duration.ofHours(1)).timeout(Duration.ofMillis(500)).build());

    // Async
    infinispan.async().caches()
        .get("mycache").thenApply(c ->
            c.set("key", "value").thenApply(ignore ->
                c.get("key").thenApply(value ->
                    c.set("key", "anothervalue",
                                writeOptions().lifespan(Duration.ofHours(1)).timeout(Duration.ofMillis(500)).build()))
    ));

    // Mutiny
    infinispan.mutiny().caches()
        .get("mycache").map(c ->
            c.query("age > :age").param("age", 80).skip(5).limit(10).find())
            .subscribe().with(System.out::println);
}

新しいAPIについては、14.0.0.Dev03のリリース時に少し紹介がありました。

Infinispan 14.0.0.Dev03

今回のHot Rod Clientに関係ありそうな範囲は、以下ですね。

  • EmbeddedとRemoteで共通のAPI
  • 同期と非同期、そしてMutiny用のAPIを明確に分離

現時点ではHot Rod Client向けの実装のみが出ていますが、そのうちEmbeddedの方も提供されそうですね。EmbeddedはInfinispan 15での
リリースを目標にしているようですが、どうでしょう。

Infinispan 14.0のドキュメントの一覧はこちらなのですが、新しいHot Rod Clientに関する情報はありません。

Infinispan 14.0 documentation index

Hot Rod Java Clientのドキュメントは、あくまで既存のClient APIに関する説明になります。

Using Hot Rod Java clients

というわけで、今回は実装とブログの内容を見つつ試してみたいと思います。なお、オチは最初に書きました。

新しいHot Rod Clientのモジュールはこちら。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/client/hotrod

テストコードは非同期版のみがあります。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/client/hotrod/src/test/java/org/infinispan/hotrod

従来のHot Rod Clientはこちら。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/client/hotrod-client

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.4 2022-07-19
OpenJDK Runtime Environment (build 17.0.4+8-Ubuntu-120.04)
OpenJDK 64-Bit Server VM (build 17.0.4+8-Ubuntu-120.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.4, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-126-generic", arch: "amd64", family: "unix"

Infinispan Serverは、172.17.0.2で動作しているものとします。

$ java --version
openjdk 17.0.4.1 2022-08-12
OpenJDK Runtime Environment Temurin-17.0.4.1+1 (build 17.0.4.1+1)
OpenJDK 64-Bit Server VM Temurin-17.0.4.1+1 (build 17.0.4.1+1, mixed mode, sharing)


$ bin/server.sh --version

Infinispan Server 14.0.0.Final (Flying Saucer)
Copyright (C) Red Hat Inc. and/or its affiliates and other contributors
License Apache License, v. 2.0. http://www.apache.org/licenses/LICENSE-2.0

起動は、以下のコマンドで。

$ bin/server.sh \
    -b 0.0.0.0 \
    -Djgroups.tcp.address=`hostname -i`

準備

まずは、Infinispan Serverの準備を行います。ユーザーとCacheを定義しましょう。

管理CLIで、管理ユーザーとアプリケーションユーザーを作成。

$ bin/cli.sh user create -g admin -p password ispn-admin
$ bin/cli.sh user create -g application -p password ispn-user

管理CLIでログイン。

$ bin/cli.sh -c-
Username: ispn-admin
Password:
[e6c550bca326-45515@cluster//containers/default]>

Distributed Cacheを2つ作成します。

create cache --template=org.infinispan.DIST_SYNC simpleCache
create cache --template=org.infinispan.DIST_SYNC bookCache

定義の確認。

describe caches/simpleCache
{
  "simpleCache" : {
    "distributed-cache" : {
      "mode" : "SYNC",
      "remote-timeout" : "17500",
      "statistics" : true,
      "locking" : {
        "concurrency-level" : "1000",
        "acquire-timeout" : "15000",
        "striping" : false
      },
      "state-transfer" : {
        "timeout" : "60000"
      }
    }
  }
}



describe caches/bookCache
{
  "bookCache" : {
    "distributed-cache" : {
      "mode" : "SYNC",
      "remote-timeout" : "17500",
      "statistics" : true,
      "locking" : {
        "concurrency-level" : "1000",
        "acquire-timeout" : "15000",
        "striping" : false
      },
      "state-transfer" : {
        "timeout" : "60000"
      }
    }
  }
}

こちらを使うように、アプリケーションを作成します。

テストコードの準備

確認は、テストコードで行うことにします。

Maven依存関係等。

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.infinispan</groupId>
                <artifactId>infinispan-bom</artifactId>
                <version>14.0.0.Final</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>jakarta.platform</groupId>
                <artifactId>jakarta.jakartaee-bom</artifactId>
                <version>8.0.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.junit</groupId>
                <artifactId>junit-bom</artifactId>
                <version>5.9.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-hotrod</artifactId>
        </dependency>
        <dependency>
            <groupId>jakarta.transaction</groupId>
            <artifactId>jakarta.transaction-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.infinispan.protostream</groupId>
            <artifactId>protostream-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.23.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

せっかくなので、ProtoStreamも使うことにします。

エンティティクラス。

src/main/java/org/littlewings/infinispan/remote/newclient/Book.java

package org.littlewings.infinispan.remote.newclient;

import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.protostream.descriptors.Type;

public class Book {
    @ProtoField(number = 1)
    String isbn;

    @ProtoField(number = 2)
    String title;

    @ProtoField(number = 3, type = Type.INT32, defaultValue = "0")
    int price;

    @ProtoFactory
    public static Book create(String isbn, String title, int price) {
        Book book = new Book();
        book.setIsbn(isbn);
        book.setTitle(title);
        book.setPrice(price);

        return book;
    }

    // getter/settterは省略
}

MarshallerおよびProtocol BuffersのIDLを生成するためのSerializationContextInitializerインターフェースのサブインターフェースを作成します。
今はそのサブインターフェースであるGeneratedSchemaインターフェースを継承するのが良さそうですが。

src/main/java/org/littlewings/infinispan/remote/newclient/EntitiesInitializer.java

package org.littlewings.infinispan.remote.newclient;

import org.infinispan.protostream.GeneratedSchema;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;

@AutoProtoSchemaBuilder(
        includeClasses = {
                Book.class
        },
        schemaFileName = "entities.proto",
        schemaFilePath = "proto/",
        schemaPackageName = "remote_newclient")
public interface EntitiesInitializer extends GeneratedSchema {
}

テストコードの雛形。

src/test/java/org/littlewings/infinispan/remote/newclient/HotRodNewClientTest.java

package org.littlewings.infinispan.remote.newclient;

import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.IntStream;

import org.infinispan.api.Infinispan;
import org.infinispan.api.async.AsyncCache;
import org.infinispan.api.async.AsyncContainer;
import org.infinispan.api.mutiny.MutinyContainer;
import org.infinispan.api.sync.SyncCache;
import org.infinispan.api.sync.SyncContainer;
import org.infinispan.api.sync.events.cache.SyncCacheContinuousQueryListener;
import org.infinispan.hotrod.configuration.HotRodConfiguration;
import org.infinispan.hotrod.configuration.HotRodConfigurationBuilder;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class HotRodNewClientTest {

    // ここに、テストを書く!
}

では、テストを書いていきましょう。

Infinispan Serverに接続する

まずはInfinispan Serverに接続する必要があります。以下のようなソースコードになります(Cacheは同期Cacheを使っています)。

    @Test
    public void connectInfinispanServerUsingURI() {
        String uriString = "hotrod://ispn-user:password@172.17.0.2:11222";
        URI uri = URI.create(uriString);

        try (Infinispan infinispan = Infinispan.create(uri);
             // または
             // try (Infinispan infinispan = Infinispan.create(uriString)) {
             SyncContainer container = infinispan.sync()) {
            SyncCache<String, String> cache =
                    container
                            .caches()
                            .get("simpleCache");

            cache.clear();
        }
    }

Infinispan#createに接続URIStringまたはjava.net.URIとして渡すと、Infinispanインスタンスが返ってきます。

        try (Infinispan infinispan = Infinispan.create(uri);
             // または
             // try (Infinispan infinispan = Infinispan.create(uriString)) {

ここから、使いたいAPIの種類に応じてメソッドを呼び出し、コンテナを取得します。メソッドは、同期ならsync、非同期ならasync
Mutinyならmutinyです。

今回は、同期APIを使っています。

             SyncContainer container = infinispan.sync()) {

Infinispanインスタンスおよびコンテナのインスタンスは、終了時にcloseします。
コンテナも、呼び出したメソッドに応じてSyncContainerAsyncContainerMutinyContainerの3種類のいずれかが返ってきます。

また、接続URI以外にもConfigurationインスタンスを渡すことで、Infinispan Serverへ接続できます。

    @Test
    public void connectInfinispanServerUsingConfiguration() {
        HotRodConfiguration configuration =
                new HotRodConfigurationBuilder()
                        .addServers("172.17.0.2:11222")
                        .security()
                        .authentication()
                        .username("ispn-user")
                        .password("password".toCharArray())
                        .build();

        try (Infinispan infinispan = Infinispan.create(configuration);
             SyncContainer container = infinispan.sync()) {
            SyncCache<String, String> cache =
                    container
                            .caches()
                            .get("simpleCache");

            cache.clear();
        }
    }

同期APIを使う

Infinispan Serverへの接続方法がわかったところで、APIを使っていってみましょう。まずは同期APIから。

    @Test
    public void simpleSyncCache() {
        URI uri = URI.create("hotrod://ispn-user:password@172.17.0.2:11222");

        try (Infinispan infinispan = Infinispan.create(uri);
             SyncContainer container = infinispan.sync()) {
            SyncCache<String, String> cache =
                    container
                            .caches()
                            .get("simpleCache");

            IntStream
                    .rangeClosed(1, 100)
                    .forEach(i -> cache.set("key" + i, "value" + i));

            assertThat(cache.get("key1")).isEqualTo("value1");
            assertThat(cache.get("key50")).isEqualTo("value50");
            assertThat(cache.get("key100")).isEqualTo("value100");

            cache.clear();

            assertThat(cache.get("key1")).isNull();
            assertThat(cache.get("key50")).isNull();
            assertThat(cache.get("key100")).isNull();
        }
    }

接続方法を紹介した時に少し出てきていましたが、SyncContainer#cachesからSyncCaches#getSyncCacheを取得します。

            SyncCache<String, String> cache =
                    container
                            .caches()
                            .get("simpleCache");

次に、自分で作成したエンティティを使ってみます。

    @Test
    public void bookSyncCache() {
        // URIではPropertiesの部分は読まない
        //URI uri = URI.create("hotrod://ispn-user:password@172.17.0.2:11222?context-initializers=org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl");

        HotRodConfiguration configuration =
                new HotRodConfigurationBuilder()
                        .addServers("172.17.0.2:11222")
                        .addContextInitializer("org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl")
                        .security()
                        .authentication()
                        .username("ispn-user")
                        .password("password".toCharArray())
                        .build();

        try (Infinispan infinispan = Infinispan.create(configuration);
             SyncContainer container = infinispan.sync()) {
            SyncCache<String, Book> cache =
                    container
                            .caches()
                            .get("bookCache");

            List<Book> books = List.of(
                    Book.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344),
                    Book.create("978-1785285332", "Getting Started with Hazelcast - Second Edition", 5484),
                    Book.create("978-0359439379", "The Apache Ignite Book", 9964),
                    Book.create("978-1783988181", "Mastering Redis", 8719),
                    Book.create("978-1492080510", "High Performance MySQL", 6428)
            );

            books.forEach(b -> cache.set(b.getIsbn(), b));

            assertThat(cache.get("978-1782169970").getTitle())
                    .isEqualTo("Infinispan Data Grid Platform Definitive Guide");
            assertThat(cache.get("978-1782169970").getPrice())
                    .isEqualTo(5344);
            assertThat(cache.get("978-0359439379").getTitle())
                    .isEqualTo("The Apache Ignite Book");
            assertThat(cache.get("978-0359439379").getPrice())
                    .isEqualTo(9964);

            cache.clear();

            assertThat(cache.get("978-1782169970")).isNull();
            assertThat(cache.get("978-1782169970")).isNull();
        }
    }

驚くことに、こちらは接続URIではなくConfigurationインスタンスを使わないとうまく動作しません…。

APIの紹介や、うまく動かなかったところは最後にまとめて書きますね。

非同期API

次は、非同期API

    @Test
    public void simpleAsyncCache() {
        URI uri = URI.create("hotrod://ispn-user:password@172.17.0.2:11222");

        try (Infinispan infinispan = Infinispan.create(uri);
             AsyncContainer container = infinispan.async()) {
            AsyncCache<String, String> cache =
                    container
                            .caches()
                            .<String, String>get("simpleCache")
                            .toCompletableFuture()
                            .join();

            IntStream
                    .rangeClosed(1, 100)
                    .<CompletionStage<?>>mapToObj(i -> cache.set("key" + i, "value" + i))
                    .map(CompletionStage::toCompletableFuture)
                    .forEach(CompletableFuture::join);

            cache
                    .get("key1")
                    .thenAccept(value -> assertThat(value).isEqualTo("value1"))
                    .thenCompose(v -> cache.get("key50"))
                    .thenAccept(value -> assertThat(value).isEqualTo("value50"))
                    .thenCompose(v -> cache.get("key100"))
                    .thenAccept(value -> assertThat(value).isEqualTo("value100"))
                    .toCompletableFuture()
                    .join();

            cache.clear().toCompletableFuture().join();

            cache
                    .get("key1")
                    .thenAccept(value -> assertThat(value).isNull())
                    .thenCompose(v -> cache.get("key50"))
                    .thenAccept(value -> assertThat(value).isNull())
                    .thenCompose(v -> cache.get("key100"))
                    .thenAccept(value -> assertThat(value).isNull())
                    .toCompletableFuture()
                    .join();
        }
    }

こちらは、Infinispan#asyncを使用してAsyncContainerを取得します。

             AsyncContainer container = infinispan.async()) {
            AsyncCache<String, String> cache =
                    container
                            .caches()
                            .<String, String>get("simpleCache")
                            .toCompletableFuture()
                            .join();

あとは、各操作の戻り値がCompletionStageとなっている点に注意しつつ(entriesなどのまとまって操作を行うメソッドはFlow.Publisher
なっているものもあります)、操作を行います。

エンティティを使った例。

    @Test
    public void bookAsyncCache() {
        HotRodConfiguration configuration =
                new HotRodConfigurationBuilder()
                        .addServers("172.17.0.2:11222")
                        .addContextInitializer("org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl")
                        .security()
                        .authentication()
                        .username("ispn-user")
                        .password("password".toCharArray())
                        .build();

        try (Infinispan infinispan = Infinispan.create(configuration);
             AsyncContainer container = infinispan.async()) {
            AsyncCache<String, Book> cache =
                    container
                            .caches()
                            .<String, Book>get("bookCache")
                            .toCompletableFuture()
                            .join();

            List<Book> books = List.of(
                    Book.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344),
                    Book.create("978-1785285332", "Getting Started with Hazelcast - Second Edition", 5484),
                    Book.create("978-0359439379", "The Apache Ignite Book", 9964),
                    Book.create("978-1783988181", "Mastering Redis", 8719),
                    Book.create("978-1492080510", "High Performance MySQL", 6428)
            );

            books
                    .stream()
                    .map(b -> cache.set(b.getIsbn(), b).toCompletableFuture())
                    .forEach(CompletableFuture::join);

            cache
                    .get("978-1782169970")
                    .thenAccept(b -> {
                        assertThat(b.getTitle()).isEqualTo("Infinispan Data Grid Platform Definitive Guide");
                        assertThat(b.getPrice()).isEqualTo(5344);
                    })
                    .thenCompose(v -> cache.get("978-0359439379"))
                    .thenAccept(b -> {
                        assertThat(b.getTitle()).isEqualTo("The Apache Ignite Book");
                        assertThat(b.getPrice()).isEqualTo(9964);
                    })
                    .toCompletableFuture()
                    .join();

            cache.clear().toCompletableFuture().join();

            cache
                    .get("978-1782169970")
                    .thenAccept(b -> assertThat(b).isNull())
                    .thenCompose(v -> cache.get("978-0359439379"))
                    .thenAccept(b -> assertThat(b).isNull())
                    .toCompletableFuture()
                    .join();
        }
    }

Mutiny API

最後はMutiny…正確にはSmallRye Mutinyですが、こちらは以下になります。

    @Test
    public void simpleMutinyCache() {
        URI uri = URI.create("hotrod://ispn-user:password@172.17.0.2:11222");

        try (Infinispan infinispan = Infinispan.create(uri);
             MutinyContainer container = infinispan.mutiny()) {
            assertThatThrownBy(() ->
                    container
                            .caches()
                            .<String, String>get("simpleCache")
                            .await()
                            .indefinitely()
            )
                    .isInstanceOf(ClassCastException.class)
                    .hasMessage("class java.util.concurrent.CompletableFuture cannot be cast to class org.infinispan.hotrod.impl.cache.RemoteCache (java.util.concurrent.CompletableFuture is in module java.base of loader 'bootstrap'; org.infinispan.hotrod.impl.cache.RemoteCache is in unnamed module of loader 'app')");
        }
    }

残念ですが、現時点ではMutinyCacheを取得する際に失敗します。

追記) Infinispan 14.0.4.Finalで修正されたので、追加のエントリーを書きました

Infinispan 14.0の新しいHot Rod Client APIのMutiny版を試す - CLOVER🍀

新しいAPIについて

ここからは、新しいAPIについていろいろ書いていきたいと思います。

infinispan-apiモジュール

今回、Infinispanに関するimport文のほとんどはorg.infinispan.apiのものでした。

import org.infinispan.api.Infinispan;
import org.infinispan.api.async.AsyncCache;
import org.infinispan.api.async.AsyncContainer;
import org.infinispan.api.mutiny.MutinyContainer;
import org.infinispan.api.sync.SyncCache;
import org.infinispan.api.sync.SyncContainer;
import org.infinispan.api.sync.events.cache.SyncCacheContinuousQueryListener;
import org.infinispan.hotrod.configuration.HotRodConfiguration;
import org.infinispan.hotrod.configuration.HotRodConfigurationBuilder;

これは、infinispan-apiというモジュールです。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/api

README.adocを見ると、どうも作成途中であることと、実験的でInfinispan 10で変更される可能性がある旨が書かれています。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/README.adoc

Infinispan 10の時点で、このモジュールはあったんですよね。

https://github.com/infinispan/infinispan/tree/10.0.0.Final/api

これ、Infinispan 10の時に1度作成された新しいAPIが再度形を変えたものな気がしますね。

Infinispan 10.0.0.Beta4

API 2.0という扱いのようです(以前のこのチケットのタイトルは「New Reactive API」でした)。

[ISPN-9893] API 2.0 - Red Hat Issue Tracker

そして、Infinispan 10時点の新しいAPIのモジュールは空っぽになっています。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/client/infinispan-key-value-store-hotrod

というわけで、今回のモジュールは過去のAPIモジュールが再構築されたものである、といえそうです。

Infinispan#create

今回のAPIは、Infinispan#createURIConfigurationを渡すと、実体がロードされるようになっています。

        try (Infinispan infinispan = Infinispan.create(uri);

        // または
        try (Infinispan infinispan = Infinispan.create(configuration);

これは、Service Loaderの仕組みで実現しているようです。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/Infinispan.java#L34-L35

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/Infinispan.java#L48-L49

JDBCドライバーあたりと同じですね。

構成

Infinispan#syncInfinispan#asyncInfinispan#mutinyを呼び出すと、呼び出したメソッドに応じたContainerが取得できます。

   /**
    * Returns a synchronous version of the Infinispan API
    *
    * @return
    */
   SyncContainer sync();

   /**
    * Returns an asynchronous version of the Infinispan API
    *
    * @return
    */
   AsyncContainer async();

   /**
    * Returns a mutiny version of the Infinispan API
    *
    * @return
    */
   MutinyContainer mutiny();

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/Infinispan.java#L57-L76

ContainerからはCacheMultiMapStrongCounterWeakCounterLockといった様々なデータ構造を取得することができ、
エントリーポイントになっていると言えます。

SyncContainer

public interface SyncContainer extends Infinispan {

   SyncCaches caches();

   SyncMultiMaps multiMaps();

   SyncStrongCounters strongCounters();

   SyncWeakCounters weakCounters();

   SyncLocks locks();

   void listen(SyncContainerListener listener, ContainerListenerEventType... types);

   <T> T batch(Function<SyncContainer, T> function);
}

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/sync/SyncContainer.java

AsyncContainer

public interface AsyncContainer extends Infinispan {

   AsyncCaches caches();

   AsyncMultiMaps multiMaps();

   AsyncStrongCounters strongCounters();

   AsyncWeakCounters weakCounters();

   AsyncLocks locks();

   Flow.Publisher<ContainerEvent> listen(ContainerListenerEventType... types);

   <T> CompletionStage<T> batch(Function<AsyncContainer, CompletionStage<T>> function);
}

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/async/AsyncContainer.java

MutinyContainer

public interface MutinyContainer extends Infinispan {
   MutinyCaches caches();

   MutinyMultiMaps multiMaps();

   MutinyStrongCounters strongCounters();

   MutinyWeakCounters weakCounters();

   MutinyLocks locks();

   /**
    * @param types
    * @return
    */
   Multi<ContainerEvent> listen(ContainerListenerEventType... types);

   <R> Uni<R> execute(String name, Object... args);

   <T> Uni<T> batch(Function<MutinyContainer, Uni<T>> function);
}

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/mutiny/MutinyContainer.java

今回使ったのはCacheのみですが。

そして、各Containerが配置されているパッケージ内に、各種データ構造に対するインターフェースが配置されています。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/api/src/main/java/org/infinispan/api/sync

https://github.com/infinispan/infinispan/tree/14.0.0.Final/api/src/main/java/org/infinispan/api/async

https://github.com/infinispan/infinispan/tree/14.0.0.Final/api/src/main/java/org/infinispan/api/mutiny

Cacheのインターフェース。ついに、java.util.Mapインターフェースを実装しなくなりましたね。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/sync/SyncCache.java

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/async/AsyncCache.java

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/mutiny/MutinyCache.java

実装は、別モジュール(今回はHot Rod Clientであるinfinispan-hotrod)に含まれています。

同期、非同期、Mutinyの実現

今回のベースと成るAPIは、同期、非同期、Mutinyの3種類で提供されます。

インターフェースはinfinispan-apiモジュールにありましたが、実装はどういった形で実現しているんでしょうか。

実装を見ていると、基本的には非同期およびRxJavaがベースになっています。つまりHotRodAsyncCacheが1番オリジナルに近いです。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/HotRodAsyncCache.java

SyncCacheの場合は、CompletableFuture#getで同期的に動いているように見せかけています。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/HotRodSyncCache.java

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/impl/Util.java#L44-L76

MutinyCacheの場合はCompletableFutureUniへ、Flow.PublisherMultiに変換することでSmallRye MutinyのAPIとして
見せています。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/HotRodMutinyCache.java

なお、いずれのCacheRemoteCacheというインターフェースが実体になっていて、こちらがCompletionStageFlow.Publisher
APIの基盤として使用しています。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/impl/cache/RemoteCacheImpl.java

ちなみに、このようなCompletableFutureFlow.Publisherをベースにしているものの、同期呼び出しはCompletableFutureの待ち合わせで
実現するといった方法は、既存のHot Rod Clientも同じです。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheSupport.java

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java

新しいHot Rod Clientと既存のHot Rod Clientの関係

現時点で2系統のHot Rod Clientモジュールがあるわけですが、両者の関係はというと、関係ありません。ブログに書いてあるとおり、
新しいHot Rod Clientは完全に再設計しているようです。

こちらが新しいHot Rod Client。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/client/hotrod

こちらが既存のHot Rod Client。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/client/hotrod-client

最初は従来のHot Rod Clientのラッパーになっていたりするのでは?とも思いましたが、そんなことはありませんでした。

まだまだできないことが多い

RemoteCacheImplを見ているとわかりますが、UnsupportedOperationExceptionを投げてくるメソッドがそれなりにあります。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/impl/cache/RemoteCacheImpl.java

Configurationの取得、Near Cacheへのリスナーの追加、Cacheのサイズ計算、リスナーの追加、Entry Processorトランザクションなど。

また、SyncCacheのみできないこともあります。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/HotRodSyncCache.java

確認のコード。

    @Test
    public void simpleSyncCacheUnsupportedOperation() {
        URI uri = URI.create("hotrod://ispn-user:password@172.17.0.2:11222");

        try (Infinispan infinispan = Infinispan.create(uri);
             SyncContainer container = infinispan.sync()) {
            SyncCache<String, String> cache =
                    container
                            .caches()
                            .get("simpleCache");

            assertThatThrownBy(() -> cache.entries())
                    .isInstanceOf(UnsupportedOperationException.class);
            assertThatThrownBy(() -> cache.keys())
                    .isInstanceOf(UnsupportedOperationException.class);
            assertThatThrownBy(() -> cache.listen(new SyncCacheContinuousQueryListener<>() {
            }))
                    .isInstanceOf(UnsupportedOperationException.class);

            assertThatThrownBy(() -> cache.estimateSize())
                    .isInstanceOf(UnsupportedOperationException.class);

        }
    }

ハマったこと

ここでは、ハマったことを少し書いておきます。

依存関係にjakarta.transaction-apiが必要

最初、依存関係にinfinispan-hotrodprotostream-processorだけを足して試していたのですが、Infinispan Serverに接続できずに
苦労しました。

        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-hotrod</artifactId>
        </dependency>
        <dependency>
            <groupId>org.infinispan.protostream</groupId>
            <artifactId>protostream-processor</artifactId>
            <optional>true</optional>
        </dependency>

URIを指定しての接続方法だと、ログも出ておらず「なんだろう?」と思ってトレースしたのですが、javax.transactionパッケージのクラスに
対してClassNotFoundExceptionを起こしていたようです。

org.infinispan.api.exception.InfinispanConfigurationException: No factory to handle URI hotrod://ispn-user:password@172.17.0.2:11222

    at org.infinispan.api.Infinispan.create(Infinispan.java:40)

後から気づきましたが、Configurationを渡す方だともっとハデにコケてくれました…。

java.lang.NoClassDefFoundError: javax/transaction/RollbackException

    at org.infinispan.hotrod.impl.HotRodTransport.<init>(HotRodTransport.java:91)
    at org.infinispan.hotrod.HotRod.<init>(HotRod.java:16)
    at org.infinispan.hotrod.HotRodFactory.create(HotRodFactory.java:29)
    at org.infinispan.api.Infinispan.create(Infinispan.java:49)
    at org.littlewings.infinispan.remote.newclient.HotRodNewClientTest.connectInfinispanServerUsingConfiguration(HotRodNewClientTest.java:53)
    
       〜省略〜

    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.ClassNotFoundException: javax.transaction.RollbackException
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
    ... 75 more

pom.xml上はoptionalになっていますが、実質必須ですね…。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/pom.xml#L73-L77

URI指定でSerializationContextInitializerが指定できない

先に書いた部分で、以下のように書いていた事象ですね。

        // URIではPropertiesの部分は読まない
        //URI uri = URI.create("hotrod://ispn-user:password@172.17.0.2:11222?context-initializers=org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl");

これはどうして発生するかというと、URIをパースして得られるProperties

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/impl/HotRodURI.java#L113

ConfigurationBuilder内で無視しているからです…。

   @Override
   public HotRodConfigurationBuilder withProperties(Properties properties) {
      //FIXME
      return this;
   }

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/configuration/HotRodConfigurationBuilder.java#L313-L317

FIXMEが付いているので、修正されるのを待ちましょう…。

Mutinyで動作しない

MutinyCacheを取得しようとすると、謎のClassCastExceptionが発生します。

java.lang.ClassCastException: class java.util.concurrent.CompletableFuture cannot be cast to class org.infinispan.hotrod.impl.cache.RemoteCache (java.util.concurrent.CompletableFuture is in module java.base of loader 'bootstrap'; org.infinispan.hotrod.impl.cache.RemoteCache is in unnamed module of loader 'app')

    at org.infinispan.hotrod.HotRodMutinyCaches.lambda$get$2(HotRodMutinyCaches.java:35)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor.onItem(UniOnItemTransform.java:36)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.forward(UniCreateFromKnownItem.java:38)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.access$100(UniCreateFromKnownItem.java:26)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem.subscribe(UniCreateFromKnownItem.java:23)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransform.subscribe(UniOnItemTransform.java:22)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:60)
    at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:65)
    at io.smallrye.mutiny.groups.UniAwait.indefinitely(UniAwait.java:46)

        〜省略〜

HotRodMutinyCachesクラスを見てみると、バッチリFIXMEと書いてあります。

   @Override
   public <K, V> Uni<MutinyCache<K, V>> create(String name, CacheConfiguration cacheConfiguration) {
      // FIXME
      return Uni.createFrom().item(hotrod.transport.getRemoteCache(name)).map(r -> new HotRodMutinyCache<>(hotrod, (RemoteCache<K, V>) r));
   }

   @Override
   public <K, V> Uni<MutinyCache<K, V>> create(String name, String template) {
      // FIXME
      return Uni.createFrom().item(hotrod.transport.getRemoteCache(name)).map(r -> new HotRodMutinyCache<>(hotrod, (RemoteCache<K, V>) r));
   }

   @Override
   public <K, V> Uni<MutinyCache<K, V>> get(String name) {
      return Uni.createFrom().item(hotrod.transport.getRemoteCache(name)).map(r -> new HotRodMutinyCache<>(hotrod, (RemoteCache<K, V>) r));
   }

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/HotRodMutinyCaches.java#L21-L36

HotRod#transport#getRemoteCacheの結果はCompletableFutureなので、RemoteCacheにキャストできずに失敗しています…。

まあ、テストはAsyncCache分しかなかったですしね…。

追記) Infinispan 14.0.4.Finalで修正されたので、追加のエントリーを書きました

Infinispan 14.0の新しいHot Rod Client APIのMutiny版を試す - CLOVER🍀

まとめ

Infinispan 14.0で追加された、新しいHot Rod Client APIを試してみました。

が、ドキュメントがなかったり、動かなかったり、機能がまだ足りなかったりという状態なことがわかりました。

まあ、まだできたばっかりですし、実装が進むのをもうちょっと待つことにします。

Infinispan 10.0の時にも、新しいAPIを試してみたもののそのまま話題に挙がらなくなった時のことを思い出しましたが…。

Infinispan 10のNew Reactive API(Hot Rod)を試す - CLOVER🍀

今回作成したソースコードは、こちらに置いています。

https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-new-hotrod-client-underconstruction