春季集成 - 延迟器在使用JDBCMESSAGESTORE和MYSQL时开始发送零消息有效载荷,可与H2一起使用
我的流量以下是很好的流程,直到我将Messagestore添加到延迟器部分(以获取持久消息)。这一切都与嵌入式H2数据库一起使用。如果我切换到MySQL数据库,则无效。
现在发生的事情是,当延迟消息通过Follortimeout Flow将延迟的消息路由回到提交频道时,在调用句柄函数时,消息有效负载为null。
handle<Submission> { p, _ -> poll(p) }
这很奇怪,就像我在follortimeout流中调试代码时,有效载荷被填充。
有人知道为什么我得到这种行为吗?我在启动时或通过其余端点发送消息,可能需要几个来复制错误。
使用MySQL作为消息存储运行时的错误:
2022-05-19 15:32:43.684 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL update
2022-05-19 15:32:43.684 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [DELETE from INT_MESSAGE where MESSAGE_ID=? and REGION=?]
2022-05-19 15:32:43.684 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.datasource.DataSourceUtils : Fetching JDBC Connection from DataSource
2022-05-19 15:32:43.685 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.StatementCreatorUtils : Setting SQL statement parameter value: column index 1, parameter value [581f4a27-a2b2-92ff-7741-5efdf1a0af73], value class [java.lang.String], SQL type unknown
2022-05-19 15:32:43.685 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.StatementCreatorUtils : Setting SQL statement parameter value: column index 2, parameter value [DEFAULT], value class [java.lang.String], SQL type unknown
2022-05-19 15:32:43.689 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : SQL update affected 1 rows
2022-05-19 15:32:43.689 DEBUG 46627 --- [ scheduling-1] o.s.i.jdbc.store.JdbcMessageStore : Removing messages from group with group key=c54c4771-3f5d-3764-9d5f-f3c3c13d7451
2022-05-19 15:32:43.689 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing SQL batch update [DELETE from INT_GROUP_TO_MESSAGE where GROUP_KEY=? and MESSAGE_ID=? and REGION=?] with a batch size of 100
2022-05-19 15:32:43.689 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [DELETE from INT_GROUP_TO_MESSAGE where GROUP_KEY=? and MESSAGE_ID=? and REGION=?]
2022-05-19 15:32:43.689 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.datasource.DataSourceUtils : Fetching JDBC Connection from DataSource
2022-05-19 15:32:43.689 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.support.JdbcUtils : JDBC driver supports batch updates
2022-05-19 15:32:43.689 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Sending SQL batch update #1 with 1 items
2022-05-19 15:32:43.694 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing SQL batch update [DELETE from INT_MESSAGE where MESSAGE_ID=? and REGION=?] with a batch size of 100
2022-05-19 15:32:43.694 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [DELETE from INT_MESSAGE where MESSAGE_ID=? and REGION=?]
2022-05-19 15:32:43.694 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.datasource.DataSourceUtils : Fetching JDBC Connection from DataSource
2022-05-19 15:32:43.694 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.support.JdbcUtils : JDBC driver supports batch updates
2022-05-19 15:32:43.694 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Sending SQL batch update #1 with 1 items
2022-05-19 15:32:43.697 DEBUG 46627 --- [ scheduling-1] o.s.i.jdbc.store.JdbcMessageStore : Updating MessageGroup: c54c4771-3f5d-3764-9d5f-f3c3c13d7451
2022-05-19 15:32:43.697 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL update
2022-05-19 15:32:43.697 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [UPDATE INT_MESSAGE_GROUP set UPDATED_DATE=? where GROUP_KEY=? and REGION=?]
2022-05-19 15:32:43.697 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.datasource.DataSourceUtils : Fetching JDBC Connection from DataSource
2022-05-19 15:32:43.697 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.StatementCreatorUtils : Setting SQL statement parameter value: column index 1, parameter value [2022-05-19 15:32:43.697], value class [java.sql.Timestamp], SQL type unknown
2022-05-19 15:32:43.697 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.StatementCreatorUtils : Setting SQL statement parameter value: column index 2, parameter value [c54c4771-3f5d-3764-9d5f-f3c3c13d7451], value class [java.lang.String], SQL type unknown
2022-05-19 15:32:43.697 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.StatementCreatorUtils : Setting SQL statement parameter value: column index 3, parameter value [DEFAULT], value class [java.lang.String], SQL type unknown
2022-05-19 15:32:43.701 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : SQL update affected 1 rows
2022-05-19 15:32:43.701 DEBUG 46627 --- [ scheduling-1] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'poll'; defined in: 'class path resource [com/example/springintegrationdemo/ChannelConfiguration.class]'; from source: 'com.example.springintegrationdemo.ChannelConfiguration.poll()'', message: GenericMessage [payload=Submission(submissionId=2, description=hello, delay=1000, status=NOT_READY), headers={replyChannel=nullChannel, pollCount=1, id=d2b60caa-1dd3-f38e-02ff-688ab279b38c, timestamp=1652970762671}]
2022-05-19 15:32:43.701 DEBUG 46627 --- [ scheduling-1] o.s.i.router.MethodInvokingRouter : bean 'pollOrTimeOut.router#0' for component 'pollOrTimeOut.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/example/springintegrationdemo/SubmissionConfiguration.class]'; from source: 'bean method pollOrTimeOut' received message: GenericMessage [payload=Submission(submissionId=2, description=hello, delay=1000, status=NOT_READY), headers={replyChannel=nullChannel, pollCount=1, id=d2b60caa-1dd3-f38e-02ff-688ab279b38c, timestamp=1652970762671}]
2022-05-19 15:32:43.701 TRACE 46627 --- [ scheduling-1] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'submissions'
2022-05-19 15:32:43.702 DEBUG 46627 --- [ scheduling-1] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'submissions'; defined in: 'class path resource [com/example/springintegrationdemo/ChannelConfiguration.class]'; from source: 'com.example.springintegrationdemo.ChannelConfiguration.submissions()'', message: GenericMessage [payload=Submission(submissionId=2, description=hello, delay=1000, status=NOT_READY), headers={replyChannel=nullChannel, pollCount=1, id=d2b60caa-1dd3-f38e-02ff-688ab279b38c, timestamp=1652970762671}]
2022-05-19 15:32:43.702 DEBUG 46627 --- [ scheduling-1] o.s.i.t.MessageTransformingHandler : bean 'submissionFlow.header-enricher#1' for component 'submissionFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/example/springintegrationdemo/SubmissionConfiguration.class]'; from source: 'bean method submissionFlow' received message: GenericMessage [payload=Submission(submissionId=2, description=hello, delay=1000, status=NOT_READY), headers={replyChannel=nullChannel, pollCount=1, id=d2b60caa-1dd3-f38e-02ff-688ab279b38c, timestamp=1652970762671}]
2022-05-19 15:32:43.702 DEBUG 46627 --- [ scheduling-1] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'submissionFlow.channel#0'; defined in: 'class path resource [com/example/springintegrationdemo/SubmissionConfiguration.class]'; from source: 'bean method submissionFlow'', message: GenericMessage [payload=Submission(submissionId=2, description=hello, delay=1000, status=NOT_READY), headers={replyChannel=nullChannel, pollCount=1, id=d2b60caa-1dd3-f38e-02ff-688ab279b38c, timestamp=1652970762671}]
2022-05-19 15:32:43.702 DEBUG 46627 --- [ scheduling-1] o.s.i.handler.ServiceActivatingHandler : ServiceActivator for [org.springframework.integration.handler.LambdaMessageProcessor@aa1b4da] (submissionFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1) received message: GenericMessage [payload=Submission(submissionId=2, description=hello, delay=1000, status=NOT_READY), headers={replyChannel=nullChannel, pollCount=1, id=d2b60caa-1dd3-f38e-02ff-688ab279b38c, timestamp=1652970762671}]
2022-05-19 15:32:43.702 DEBUG 46627 --- [ scheduling-1] o.s.integration.handler.DelayHandler : Release flow threw an exception for message: GenericMessage [payload=org.springframework.integration.handler.DelayHandler$DelayedMessageWrapper@f90e37f0, headers={replyChannel=nullChannel, pollCount=1, id=581f4a27-a2b2-92ff-7741-5efdf1a0af73, timestamp=1652970762671}]
org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.LambdaMessageProcessor@aa1b4da] (submissionFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1)]; nested exception is java.lang.NullPointerException: Parameter specified as non-null is null: method com.example.springintegrationdemo.SubmissionConfiguration.poll, parameter input
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.5.11.jar:5.5.11]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.11.jar:5.5.11]
在此处:
package com.example.springintegrationdemo
import org.springframework.boot.ApplicationRunner
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.annotation.MessagingGateway
import org.springframework.integration.channel.DirectChannel
import org.springframework.integration.dsl.MessageChannels
import org.springframework.integration.dsl.StandardIntegrationFlow
import org.springframework.integration.dsl.integrationFlow
import org.springframework.integration.jdbc.store.JdbcMessageStore
import org.springframework.integration.jdbc.store.channel.ChannelMessageStoreQueryProvider
import org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider
import org.springframework.integration.store.MessageGroupStore
import org.springframework.messaging.Message
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController
import java.io.Serializable
import java.util.concurrent.atomic.AtomicInteger
import javax.sql.DataSource
data class Submission(val submissionId: String,val description : String, val delay : Long, val status : String) : Serializable
@SpringBootApplication
class SpringIntegrationDemoApplication {
@MessagingGateway(defaultRequestChannel = "submissions")
interface SubmissionGateway {
fun poll(submission: Submission)
}
@Bean
fun jdbcChannelMessageStore(dataSource: DataSource): JdbcMessageStore {
return JdbcMessageStore(dataSource)
}
@Bean
fun queryProvider(): ChannelMessageStoreQueryProvider = MySqlChannelMessageStoreQueryProvider()
@Bean
fun runner(gateway: SubmissionGateway): ApplicationRunner? {
return ApplicationRunner {
gateway.poll(Submission("mySubmissionId", "my submission", 1000L, ""))
}
}
}
fun main(args: Array<String>) {
runApplication<SpringIntegrationDemoApplication>(*args)
}
@Configuration
class ChannelConfiguration {
@Bean
fun submissions(): DirectChannel = MessageChannels.direct().get()
@Bean
fun ready(): DirectChannel = MessageChannels.direct().get()
@Bean
fun notReady(): DirectChannel = MessageChannels.direct().get()
@Bean
fun poll(): DirectChannel = MessageChannels.direct().get()
@Bean
fun timeout(): DirectChannel = MessageChannels.direct().get()
}
@Configuration
class SubmissionConfiguration(val channels: ChannelConfiguration) {
val notReady = "NOT_READY"
val ready = "READY"
private val pollCount = "pollCount"
@Bean
fun submissionFlow(): StandardIntegrationFlow = integrationFlow(channels.submissions()) {
enrichHeaders {
headerFunction<Any>(pollCount) {
AtomicInteger()
}
}
handle<Submission> { p, _ -> poll(p) }
route<Message<Submission>> {
when (it.payload.status) {
ready -> channels.ready()
else -> channels.notReady()
}
}
}
fun poll(input: Submission): Submission {
val status = if ((0..10).random() == 0) ready else notReady
println("in polling input is $input, result is $status")
return input.copy(status = status)
}
@Bean
fun readyFlow(): StandardIntegrationFlow = integrationFlow(channels.ready()) {
handle {
println("Handling ready message $it")
}
}
@Bean
fun notReadyFlow(messageStore: MessageGroupStore): StandardIntegrationFlow = integrationFlow(channels.notReady()) {
delay("delayer.messageGroupId") {
messageStore(messageStore)
delayFunction<Submission> {
it.headers[pollCount, AtomicInteger::class.java]?.getAndIncrement()
println(it.headers[pollCount])
it.payload.delay
}
}
channel("poll")
}
@Bean
fun pollOrTimeOut(): StandardIntegrationFlow = integrationFlow(channels.poll()) {
route<Message<*>, String>({
val count = it.headers[pollCount].toString().toInt()
if (count > 10) {
"timeout"
} else {
"submissions"
}
}
) {
}
}
@Bean
fun timeoutFlow(): StandardIntegrationFlow = integrationFlow(channels.timeout()) {
handle {
println("Handling timeout message $it")
}
}
}
@RestController
class SubmissionController(val submissionGateway: SpringIntegrationDemoApplication.SubmissionGateway) {
@PostMapping("/")
fun save(@RequestBody subMission: Submission) {
return submissionGateway.poll(subMission)
}
}
Resources/schema.sql:
CREATE TABLE INT_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CREATED_DATE TIMESTAMP NOT NULL,
MESSAGE_BYTES LONGVARBINARY,
constraint INT_MESSAGE_PK primary key (MESSAGE_ID, REGION)
);
CREATE INDEX INT_MESSAGE_IX1 ON INT_MESSAGE (CREATED_DATE);
CREATE TABLE INT_GROUP_TO_MESSAGE (
GROUP_KEY CHAR(36) NOT NULL,
MESSAGE_ID CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
constraint INT_GROUP_TO_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION)
);
CREATE TABLE INT_MESSAGE_GROUP (
GROUP_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CONDITION VARCHAR(255),
COMPLETE BIGINT,
LAST_RELEASED_SEQUENCE BIGINT,
CREATED_DATE TIMESTAMP NOT NULL,
UPDATED_DATE TIMESTAMP DEFAULT NULL,
constraint INT_MESSAGE_GROUP_PK primary key (GROUP_KEY, REGION)
);
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
CREATE SEQUENCE INT_MESSAGE_SEQ START WITH 1 INCREMENT BY 1;
CREATE TABLE INT_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL ,
MESSAGE_BYTES LONGVARBINARY,
REGION VARCHAR(100) NOT NULL,
constraint INT_CHANNEL_MESSAGE_PK primary key (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE INDEX INT_CHANNEL_MSG_DELETE_IDX ON INT_CHANNEL_MESSAGE (REGION, GROUP_KEY, MESSAGE_ID);
-- This is only needed if the message group store property 'priorityEnabled' is true
-- CREATE UNIQUE INDEX INT_CHANNEL_MSG_PRIORITY_IDX ON INT_CHANNEL_MESSAGE (REGION, GROUP_KEY, MESSAGE_PRIORITY DESC, CREATED_DATE, MESSAGE_SEQUENCE);
CREATE TABLE INT_METADATA_STORE (
METADATA_KEY VARCHAR(255) NOT NULL,
METADATA_VALUE VARCHAR(4000),
REGION VARCHAR(100) NOT NULL,
constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
);
build.gradle.kts:
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
id("org.springframework.boot") version "2.6.7"
id("io.spring.dependency-management") version "1.0.11.RELEASE"
kotlin("jvm") version "1.6.21"
kotlin("plugin.spring") version "1.6.21"
kotlin("plugin.serialization") version "1.6.21"
java
}
group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11
repositories {
mavenCentral()
}
dependencies {
listOf("integration","webflux","data-jpa","web").forEach {
implementation("org.springframework.boot:spring-boot-starter-$it")
}
implementation("org.springframework.integration:spring-integration-jdbc")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2")
runtimeOnly("com.h2database:h2")
developmentOnly("org.springframework.boot:spring-boot-devtools")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.integration:spring-integration-test")
}
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
jvmTarget = "11"
}
}
tasks.withType<Test> {
useJUnitPlatform()
}
I have the flow below which all worked fine until I added a messageStore to the delayer section (to get persistent messages). This all worked with an embedded h2 database. If I then switch to a mysql database it doesn't work.
What happens now is that when the delayed message gets routed back to the submission channel by the pollOrTimeout flow, the message payload is null when it calls the handle function.
handle<Submission> { p, _ -> poll(p) }
This is odd as when I debug the code in the pollOrTimeout flow, the payload is populated.
Does anyone know why I'm getting this behaviour? I send a message at startup, or via the rest endpoint, it can take a couple of goes to replicate the error.
Error when running with mysql as the message store:
2022-05-19 15:32:43.684 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL update
2022-05-19 15:32:43.684 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [DELETE from INT_MESSAGE where MESSAGE_ID=? and REGION=?]
2022-05-19 15:32:43.684 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.datasource.DataSourceUtils : Fetching JDBC Connection from DataSource
2022-05-19 15:32:43.685 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.StatementCreatorUtils : Setting SQL statement parameter value: column index 1, parameter value [581f4a27-a2b2-92ff-7741-5efdf1a0af73], value class [java.lang.String], SQL type unknown
2022-05-19 15:32:43.685 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.StatementCreatorUtils : Setting SQL statement parameter value: column index 2, parameter value [DEFAULT], value class [java.lang.String], SQL type unknown
2022-05-19 15:32:43.689 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : SQL update affected 1 rows
2022-05-19 15:32:43.689 DEBUG 46627 --- [ scheduling-1] o.s.i.jdbc.store.JdbcMessageStore : Removing messages from group with group key=c54c4771-3f5d-3764-9d5f-f3c3c13d7451
2022-05-19 15:32:43.689 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing SQL batch update [DELETE from INT_GROUP_TO_MESSAGE where GROUP_KEY=? and MESSAGE_ID=? and REGION=?] with a batch size of 100
2022-05-19 15:32:43.689 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [DELETE from INT_GROUP_TO_MESSAGE where GROUP_KEY=? and MESSAGE_ID=? and REGION=?]
2022-05-19 15:32:43.689 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.datasource.DataSourceUtils : Fetching JDBC Connection from DataSource
2022-05-19 15:32:43.689 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.support.JdbcUtils : JDBC driver supports batch updates
2022-05-19 15:32:43.689 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Sending SQL batch update #1 with 1 items
2022-05-19 15:32:43.694 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing SQL batch update [DELETE from INT_MESSAGE where MESSAGE_ID=? and REGION=?] with a batch size of 100
2022-05-19 15:32:43.694 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [DELETE from INT_MESSAGE where MESSAGE_ID=? and REGION=?]
2022-05-19 15:32:43.694 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.datasource.DataSourceUtils : Fetching JDBC Connection from DataSource
2022-05-19 15:32:43.694 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.support.JdbcUtils : JDBC driver supports batch updates
2022-05-19 15:32:43.694 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Sending SQL batch update #1 with 1 items
2022-05-19 15:32:43.697 DEBUG 46627 --- [ scheduling-1] o.s.i.jdbc.store.JdbcMessageStore : Updating MessageGroup: c54c4771-3f5d-3764-9d5f-f3c3c13d7451
2022-05-19 15:32:43.697 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL update
2022-05-19 15:32:43.697 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [UPDATE INT_MESSAGE_GROUP set UPDATED_DATE=? where GROUP_KEY=? and REGION=?]
2022-05-19 15:32:43.697 DEBUG 46627 --- [ scheduling-1] o.s.jdbc.datasource.DataSourceUtils : Fetching JDBC Connection from DataSource
2022-05-19 15:32:43.697 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.StatementCreatorUtils : Setting SQL statement parameter value: column index 1, parameter value [2022-05-19 15:32:43.697], value class [java.sql.Timestamp], SQL type unknown
2022-05-19 15:32:43.697 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.StatementCreatorUtils : Setting SQL statement parameter value: column index 2, parameter value [c54c4771-3f5d-3764-9d5f-f3c3c13d7451], value class [java.lang.String], SQL type unknown
2022-05-19 15:32:43.697 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.StatementCreatorUtils : Setting SQL statement parameter value: column index 3, parameter value [DEFAULT], value class [java.lang.String], SQL type unknown
2022-05-19 15:32:43.701 TRACE 46627 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : SQL update affected 1 rows
2022-05-19 15:32:43.701 DEBUG 46627 --- [ scheduling-1] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'poll'; defined in: 'class path resource [com/example/springintegrationdemo/ChannelConfiguration.class]'; from source: 'com.example.springintegrationdemo.ChannelConfiguration.poll()'', message: GenericMessage [payload=Submission(submissionId=2, description=hello, delay=1000, status=NOT_READY), headers={replyChannel=nullChannel, pollCount=1, id=d2b60caa-1dd3-f38e-02ff-688ab279b38c, timestamp=1652970762671}]
2022-05-19 15:32:43.701 DEBUG 46627 --- [ scheduling-1] o.s.i.router.MethodInvokingRouter : bean 'pollOrTimeOut.router#0' for component 'pollOrTimeOut.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/example/springintegrationdemo/SubmissionConfiguration.class]'; from source: 'bean method pollOrTimeOut' received message: GenericMessage [payload=Submission(submissionId=2, description=hello, delay=1000, status=NOT_READY), headers={replyChannel=nullChannel, pollCount=1, id=d2b60caa-1dd3-f38e-02ff-688ab279b38c, timestamp=1652970762671}]
2022-05-19 15:32:43.701 TRACE 46627 --- [ scheduling-1] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'submissions'
2022-05-19 15:32:43.702 DEBUG 46627 --- [ scheduling-1] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'submissions'; defined in: 'class path resource [com/example/springintegrationdemo/ChannelConfiguration.class]'; from source: 'com.example.springintegrationdemo.ChannelConfiguration.submissions()'', message: GenericMessage [payload=Submission(submissionId=2, description=hello, delay=1000, status=NOT_READY), headers={replyChannel=nullChannel, pollCount=1, id=d2b60caa-1dd3-f38e-02ff-688ab279b38c, timestamp=1652970762671}]
2022-05-19 15:32:43.702 DEBUG 46627 --- [ scheduling-1] o.s.i.t.MessageTransformingHandler : bean 'submissionFlow.header-enricher#1' for component 'submissionFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/example/springintegrationdemo/SubmissionConfiguration.class]'; from source: 'bean method submissionFlow' received message: GenericMessage [payload=Submission(submissionId=2, description=hello, delay=1000, status=NOT_READY), headers={replyChannel=nullChannel, pollCount=1, id=d2b60caa-1dd3-f38e-02ff-688ab279b38c, timestamp=1652970762671}]
2022-05-19 15:32:43.702 DEBUG 46627 --- [ scheduling-1] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'submissionFlow.channel#0'; defined in: 'class path resource [com/example/springintegrationdemo/SubmissionConfiguration.class]'; from source: 'bean method submissionFlow'', message: GenericMessage [payload=Submission(submissionId=2, description=hello, delay=1000, status=NOT_READY), headers={replyChannel=nullChannel, pollCount=1, id=d2b60caa-1dd3-f38e-02ff-688ab279b38c, timestamp=1652970762671}]
2022-05-19 15:32:43.702 DEBUG 46627 --- [ scheduling-1] o.s.i.handler.ServiceActivatingHandler : ServiceActivator for [org.springframework.integration.handler.LambdaMessageProcessor@aa1b4da] (submissionFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1) received message: GenericMessage [payload=Submission(submissionId=2, description=hello, delay=1000, status=NOT_READY), headers={replyChannel=nullChannel, pollCount=1, id=d2b60caa-1dd3-f38e-02ff-688ab279b38c, timestamp=1652970762671}]
2022-05-19 15:32:43.702 DEBUG 46627 --- [ scheduling-1] o.s.integration.handler.DelayHandler : Release flow threw an exception for message: GenericMessage [payload=org.springframework.integration.handler.DelayHandler$DelayedMessageWrapper@f90e37f0, headers={replyChannel=nullChannel, pollCount=1, id=581f4a27-a2b2-92ff-7741-5efdf1a0af73, timestamp=1652970762671}]
org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.LambdaMessageProcessor@aa1b4da] (submissionFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1)]; nested exception is java.lang.NullPointerException: Parameter specified as non-null is null: method com.example.springintegrationdemo.SubmissionConfiguration.poll, parameter input
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.5.11.jar:5.5.11]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.11.jar:5.5.11]
Main code here:
package com.example.springintegrationdemo
import org.springframework.boot.ApplicationRunner
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.annotation.MessagingGateway
import org.springframework.integration.channel.DirectChannel
import org.springframework.integration.dsl.MessageChannels
import org.springframework.integration.dsl.StandardIntegrationFlow
import org.springframework.integration.dsl.integrationFlow
import org.springframework.integration.jdbc.store.JdbcMessageStore
import org.springframework.integration.jdbc.store.channel.ChannelMessageStoreQueryProvider
import org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider
import org.springframework.integration.store.MessageGroupStore
import org.springframework.messaging.Message
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController
import java.io.Serializable
import java.util.concurrent.atomic.AtomicInteger
import javax.sql.DataSource
data class Submission(val submissionId: String,val description : String, val delay : Long, val status : String) : Serializable
@SpringBootApplication
class SpringIntegrationDemoApplication {
@MessagingGateway(defaultRequestChannel = "submissions")
interface SubmissionGateway {
fun poll(submission: Submission)
}
@Bean
fun jdbcChannelMessageStore(dataSource: DataSource): JdbcMessageStore {
return JdbcMessageStore(dataSource)
}
@Bean
fun queryProvider(): ChannelMessageStoreQueryProvider = MySqlChannelMessageStoreQueryProvider()
@Bean
fun runner(gateway: SubmissionGateway): ApplicationRunner? {
return ApplicationRunner {
gateway.poll(Submission("mySubmissionId", "my submission", 1000L, ""))
}
}
}
fun main(args: Array<String>) {
runApplication<SpringIntegrationDemoApplication>(*args)
}
@Configuration
class ChannelConfiguration {
@Bean
fun submissions(): DirectChannel = MessageChannels.direct().get()
@Bean
fun ready(): DirectChannel = MessageChannels.direct().get()
@Bean
fun notReady(): DirectChannel = MessageChannels.direct().get()
@Bean
fun poll(): DirectChannel = MessageChannels.direct().get()
@Bean
fun timeout(): DirectChannel = MessageChannels.direct().get()
}
@Configuration
class SubmissionConfiguration(val channels: ChannelConfiguration) {
val notReady = "NOT_READY"
val ready = "READY"
private val pollCount = "pollCount"
@Bean
fun submissionFlow(): StandardIntegrationFlow = integrationFlow(channels.submissions()) {
enrichHeaders {
headerFunction<Any>(pollCount) {
AtomicInteger()
}
}
handle<Submission> { p, _ -> poll(p) }
route<Message<Submission>> {
when (it.payload.status) {
ready -> channels.ready()
else -> channels.notReady()
}
}
}
fun poll(input: Submission): Submission {
val status = if ((0..10).random() == 0) ready else notReady
println("in polling input is $input, result is $status")
return input.copy(status = status)
}
@Bean
fun readyFlow(): StandardIntegrationFlow = integrationFlow(channels.ready()) {
handle {
println("Handling ready message $it")
}
}
@Bean
fun notReadyFlow(messageStore: MessageGroupStore): StandardIntegrationFlow = integrationFlow(channels.notReady()) {
delay("delayer.messageGroupId") {
messageStore(messageStore)
delayFunction<Submission> {
it.headers[pollCount, AtomicInteger::class.java]?.getAndIncrement()
println(it.headers[pollCount])
it.payload.delay
}
}
channel("poll")
}
@Bean
fun pollOrTimeOut(): StandardIntegrationFlow = integrationFlow(channels.poll()) {
route<Message<*>, String>({
val count = it.headers[pollCount].toString().toInt()
if (count > 10) {
"timeout"
} else {
"submissions"
}
}
) {
}
}
@Bean
fun timeoutFlow(): StandardIntegrationFlow = integrationFlow(channels.timeout()) {
handle {
println("Handling timeout message $it")
}
}
}
@RestController
class SubmissionController(val submissionGateway: SpringIntegrationDemoApplication.SubmissionGateway) {
@PostMapping("/")
fun save(@RequestBody subMission: Submission) {
return submissionGateway.poll(subMission)
}
}
resources/schema.sql:
CREATE TABLE INT_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CREATED_DATE TIMESTAMP NOT NULL,
MESSAGE_BYTES LONGVARBINARY,
constraint INT_MESSAGE_PK primary key (MESSAGE_ID, REGION)
);
CREATE INDEX INT_MESSAGE_IX1 ON INT_MESSAGE (CREATED_DATE);
CREATE TABLE INT_GROUP_TO_MESSAGE (
GROUP_KEY CHAR(36) NOT NULL,
MESSAGE_ID CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
constraint INT_GROUP_TO_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION)
);
CREATE TABLE INT_MESSAGE_GROUP (
GROUP_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CONDITION VARCHAR(255),
COMPLETE BIGINT,
LAST_RELEASED_SEQUENCE BIGINT,
CREATED_DATE TIMESTAMP NOT NULL,
UPDATED_DATE TIMESTAMP DEFAULT NULL,
constraint INT_MESSAGE_GROUP_PK primary key (GROUP_KEY, REGION)
);
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
CREATE SEQUENCE INT_MESSAGE_SEQ START WITH 1 INCREMENT BY 1;
CREATE TABLE INT_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL ,
MESSAGE_BYTES LONGVARBINARY,
REGION VARCHAR(100) NOT NULL,
constraint INT_CHANNEL_MESSAGE_PK primary key (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE INDEX INT_CHANNEL_MSG_DELETE_IDX ON INT_CHANNEL_MESSAGE (REGION, GROUP_KEY, MESSAGE_ID);
-- This is only needed if the message group store property 'priorityEnabled' is true
-- CREATE UNIQUE INDEX INT_CHANNEL_MSG_PRIORITY_IDX ON INT_CHANNEL_MESSAGE (REGION, GROUP_KEY, MESSAGE_PRIORITY DESC, CREATED_DATE, MESSAGE_SEQUENCE);
CREATE TABLE INT_METADATA_STORE (
METADATA_KEY VARCHAR(255) NOT NULL,
METADATA_VALUE VARCHAR(4000),
REGION VARCHAR(100) NOT NULL,
constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
);
build.gradle.kts:
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
id("org.springframework.boot") version "2.6.7"
id("io.spring.dependency-management") version "1.0.11.RELEASE"
kotlin("jvm") version "1.6.21"
kotlin("plugin.spring") version "1.6.21"
kotlin("plugin.serialization") version "1.6.21"
java
}
group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11
repositories {
mavenCentral()
}
dependencies {
listOf("integration","webflux","data-jpa","web").forEach {
implementation("org.springframework.boot:spring-boot-starter-$it")
}
implementation("org.springframework.integration:spring-integration-jdbc")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2")
runtimeOnly("com.h2database:h2")
developmentOnly("org.springframework.boot:spring-boot-devtools")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.integration:spring-integration-test")
}
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
jvmTarget = "11"
}
}
tasks.withType<Test> {
useJUnitPlatform()
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
这是由于具有Spring Dev工具依赖性而引起的。感谢Artem和Gary在指出我朝正确方向的评论中的线索。
我已经上传了一个复制问题的示例项目。按照readme.md中的说明进行复制。
https://github.com/frayneposset/springintegrategrategrategrategrategrategrategrategrategrategrategrategrategrategrategrategrategrategratemo
即可解决问题,删除上述依赖性。
替代修复是使用这样的自定义避难所:
This was caused by having the spring dev tools dependency. Thanks to Artem and Gary for the clues in the comments that pointed me in the right direction.
I've uploaded a sample project that replicates the issue. Follow the instructions in the README.md to replicate.
https://github.com/frayneposset/springintegratondemo
To fix the problem, simply remove the above dependency.
Alternative fix is to use a custom deserializer like so: