back to all blogsすべてのブログ投稿を表示

24.0.0.2におけるMicroProfile Reactive Messagingの否定応答など

image of author image of author
David Mueller and 浅田 かおり (翻訳) 2024年2月27日
他言語版へのリンク: English ,

24.0.0.2 リリースには、MicroProfile Reactive Messaging および MicroProfile Streams Operators の新機能 (否定応答、エミッター、backpressure のサポートなど) が含まれています。また、アプリケーションの本番環境での統合テストの構築に役立つ新しい Open Liberty ガイドも提供されます。

Open Liberty 24.0.0.2:

24.0.0.2で修正されたバグリストをご参照ください。

過去のセキュリティ脆弱性修正の一覧は、セキュリティ脆弱性(CVE)リストをご参照ください。

Develop and run your apps using 24.0.0.2

Mavenを使うときは下記の設定をお使いください。

<plugin>
    <groupId>io.openliberty.tools</groupId>
    <artifactId>liberty-maven-plugin</artifactId>
    <version>3.10</version>
</plugin>

Gradleの場合は、build.gradle ファイルに以下をインクルードします。

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath 'io.openliberty.tools:liberty-gradle-plugin:3.8'
    }
}
apply plugin: 'liberty'

コンテナ・イメージの場合はこちらです。

FROM icr.io/appcafe/open-liberty

ダウンロード・ページをご参照ください。

IntelliJ IDEA, Visual Studio CodeまたはEclipse IDEをお使いの場合は、IDE内で効率的な開発、テスト、デバッグ、アプリケーション管理を行うためのオープンソースLiberty開発者ツールをお試しください。

MicroProfile Reactive Messaging 3.0およびMicroProfile Streams Operators 3.0用の否定応答など

MicroProfile Reactive Messaging 3.0 では、否定応答、emitter、backpressure のサポートなど、MicroProfile Reactive Messaging 1.0 からの多くの新機能と変更が導入されています。これらの機能は Jakarta EE 9 以降と互換性があります。

MicroProfile Reactive Messaging または MicroProfile Streams Operators を使用するアプリケーションは、通常、リアクティブストリームに沿って渡されるメッセージを消費、生成、処理する CDI Bean で構成されます。これらのメッセージは、アプリケーションの内部で使用することも、外部のメッセージブローカを介して送受信することもできます。MicroProfile Reactive Messaging は、MicroProfile Streams Operators を使用して、メソッド間のチャネルを介して、または Kafka などのメッセージングソリューションにメッセージを渡し、メッセージの回復力のあるストレージを提供します。

両方の機能を一緒に使用するには、Microprofile Reactive Messaging 3.0 機能を server.xml ファイルに追加します

<feature>mpReactiveMessaging-3.0</feature>

MicroProfile Reactive Messaging 3.0 を有効にすると、MicroProfile Reactive Streams Operators 3.0 機能も自動的に有効になります。

MicroProfile Reactive Messaging を使用せずに MicroProfile Reactive Streams Operators 3.0 を使用するには、Microprofile Reactive Streams Operators 3.0 機能を server.xml ファイルに追加します:

<feature>mpReactiveStreams-3.0</feature>

否定的な応答

MicroProfile Reactive Messaging 1.0 では、メッセージは肯定的な応答しかできませんでした。ペイロードに問題がある場合、または例外的な動作が発生した場合、ストリーム内で発生した問題を表示または処理するメカニズムはありませんでした。新しい否定応答 (nack) サポートは、これらのイベントを送信または処理することができます。

以下の例では、受信メッセージを明示的に否定的に応答しています

@Incoming("data")
@Outgoing("out")
public Message<String> process(Message<String> m) {
    String s = m.getPayload();
    if (s.equalsIgnoreCase("b")) {
        // we cannot fail, we must nack explicitly.
        m.nack(new IllegalArgumentException("b"));
        return null;
    }
    return m.withPayload(s.toUpperCase());
}

確認応答ストラテジーを定義せずにメッセージを受け取るメソッドのシグネチャーは、デフォルトで MANUAL ストラテジーになります。あなたのコードは、メッセージを否定的(ack())または肯定的(nack())に確認する責任を負います。前の例では、メッセージは送信チャネルのダウンストリームで確認することができます。

次の例は、否定応答を引き起こす例外をスローします

@Incoming("data")
@Outgoing("out")
public String process(String s) {
    if (s.equalsIgnoreCase("b")) {
        throw new IllegalArgumentException("b");
    }
    return s.toUpperCase();
}

確認応答ストラテジーを定義せずにペイロードを受け取るメソッドのシグネチャのデフォルトは POST_PROCESSING ストラテジーです。実装では、メソッドまたはチェーンが完了した後に、メッセージに対する ack()nack() 呼び出しを処理します。上流のデータは IllegalArgumentException という理由で否定応答を受け取ります。例外がスローされた場合、実装はメッセージに対する nack() 呼び出しを実行します。

エミッター

MicroProfile Reactive Messaging 1.0 では、RESTful リソースや Bean のような命令型のコードを統合する明確な方法がありませんでした。バージョン3.0では、エミッターが2つのモデルの橋渡しをします。

次の例では、CDI を使用して RESTful リソースにエミッターを挿入し、指定されたチャネルにメッセージを配置します

@Inject
@Channel(CHANNEL_NAME)
Emitter<String> emitter;

@POST
@Path("/payload")
public CompletionStage<Void> emitPayload(String payload){
    CompletionStage<Void> cs = emitter.send(payload);
    return cs;
}

@POST
@Path("/message")
public CompletionStage<Void> emitPayload(String payload){
    CompletableFuture<Void> ackCf = new CompletableFuture<>();
    emitter.send(Message.of(payload,
        () -> {
            ackCf.complete(null);
            return CompletableFuture.completedFuture(null);
        },
        t -> {
            ackCf.completeExceptionally(t);
            return CompletableFuture.completedFuture(null);
        }));
    return ackCf;
}

エミッターを定義する際、ペイロードまたはメッセージのコンテンツとして送信されるオブジェクトのタイプを指定します。

エミッターがペイロードを送信する場合、MicroProfile Reactive Messaging はメッセージに対する ack()nack() 呼び出しを自動的に処理します。しかし、エミッターがメッセージを送信する場合、送信側のコードはメッセージが下流で否定応答されるか肯定応答されるかを処理する必要があります。

Backpressureサポート

Backpressure サポートは、消費されるよりも速く発行されるメッセージやペイロードを処理します。Backpressureストラテジーは、このような状況におけるアプリケーションの動作を定義します。以下の例では、バッファは最大300メッセージを保持し、新しいメッセージが送られたときにバッファが一杯になると例外をスローします

@Inject @Channel("myChannel")
@OnOverflow(value=OnOverflow.Strategy.BUFFER, bufferSize=300)
private Emitter<String> emitter;

public void publishMessage() {
  emitter.send("a");
  emitter.send("b");
  emitter.complete();
}

以下のBackpressureストラテジーを定義可能です

  • BUFFER - bufferSize が設定されている場合は、その値によって決まるサイズのバッファを使用します。そうでない場合は、mp.messaging.emitter.default-buffer-size MicroProfile Config プロパティが存在する場合は、その値がサイズになります。これらの値のどちらも定義されていない場合、デフォルトのサイズは 128 です。バッファが一杯になると、send メソッドから例外がスローされます。これは、他のストラテジーが定義されていない場合のデフォルトのストラテジーです。

  • DROP - ダウンストリームが追いつかない場合、最新の値をドロップします。エミッターによって放出された新しい値は無視されます。

  • FAIL - ダウンストリームが追いつかない場合にエラーを伝搬します。それ以上の値は出力されません。

  • LATEST- 最新の値のみを保持し、ダウンストリームが追いつかない場合は以前の値をすべて削除します。

  • NONE - backpressureシグナルを無視し、ストラテジーを実行するのは下流の消費者に任せます。

  • THROW_EXCEPTION - ダウンストリームが追いつかない場合、送信メソッドから例外をスローします。

  • UNBOUNDED_BUFFER - バインドされていないバッファを使用してください。値が消費されるよりも速く追加され続けると、アプリケーションはメモリ不足になるかもしれません。

Liberty-kafkaコネクターの新しいオプション

Open Liberty は、MicroProfile Reactive Messaging で使用する Kafka コネクターを提供し、Kafka をメッセージングの仲介として使用してメッセージを送受信します。このリリースでは、コネクターに2つの新しいオプション、fast.ackcontext.service が追加されました。これらのオプションはMicroProfile Config properties として設定します。

fast.ackを使った受信応答の設定

fast.ack` boolean 属性は、受信チャンネルに対するコネクターの確認応答を決定します。

  • true: 確認応答は、Kafka Connectorが確認応答シグナルを受信するとすぐに完了として報告されます。

  • false: パーティション・オフセットがKafkaブローカーにコミットされるまで、確認応答は完了として報告されません。この処理中にエラーが発生した場合、確認応答は失敗として報告されます。

fast.ackliberty-kafka コネクタまたは受信チャネルの属性として定義されます。送信チャネルで指定した場合は無視されます。

mp.messaging.connector.liberty-kafka.fast.ack=false

mp.messaginging.incoming.foo.connector=liberty-kafka
mp.messaginging.incoming.foo.fast.ack=true

この例では、コネクターはすべてのチャンネルで使用される標準値を設定します。しかし、foo チャンネルは true 値を使用します。これは、チャンネル属性がコネクター属性よりも優先されるためです。

mpReactiveMessaging-1.0 では、オプションのデフォルト値は false です。 mpReactiveMessaging-3.0 では、オプションのデフォルト値は true です。

context.serviceで非同期タスクを管理する

context.service 属性は非同期タスクに使用するコンテキストサービスを指定します。 context.service 属性の値は server.xml ファイルで定義された context.service インスタンスの id 属性への参照として指定します。

以下の`server.xml`ファイルでは、3つの異なるコンテキストサービスが一意のIDで定義されています。

<contextService id=rst/>
<contextService id=uvw/>
<contextService id=xyz/>

アプリケーションの microprofile-config.properties ファイルでは、最初のコンテキストサービスがコネクターに設定されています。アプリケーションには3つのチャネルがあります。def チャンネルは独自の context.service インスタンスを指定しないので、コネクターで定義されたものを使用します。2番目と3番目のチャネルは、独自のサービスを定義して使用します。

mp.messaging.connector.liberty-kafka.context.service=rst

mp.messaging.incoming.def.connector=liberty-kafka
mp.messaging.incoming.foo.connector=liberty-kafka
mp.messaging.incoming.foo.context.service=uvw
mp.messaging.outgoing.bar.connector=liberty-kafka
mp.messaging.outgoing.bar.context.service=xyz

Jakarta Concurrency フィーチャーが server.xml ファイルで有効になっている場合、デフォルトのコンテキストサービスが使用されます。この機能が有効になっていない場合は、Open Liberty 組み込みのコンテキストサービスが、非同期タスクをキャプチャして適用するコンテキストタイプのセットリストとともに使用されます。

詳しくはこちらをご覧ください

新規のガイド: "Testcontainersで本番に近い統合テストを構築する"

新しいガイド、Testcontainersで本番に近い統合テストを構築するTestカテゴリの下に公開されています。 このガイドでは、Testcontainers と JUnit を使って、Java マイクロサービスの統合テストを本番さながらに記述する方法を学びます。ローカルでのセットアップや前提条件を気にすることなく、ホストされた環境でこのガイドを実行することもできます。このガイドのクラウド ホスト版にアクセスするには、ガイド コード ペインの Run in cloud ボタンをクリックします。

== 今すぐOpen Liberty 24.0.0.2を入手する

Stack Overflowで質問する align="center"