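Spring Integration Java DSL has been merged into Spring Integration Core 5.0, which is a smart and obvious move because:
- Everyone starting a new Spring project based on Java config uses it
- SI Java DSL lets you use powerful new Java 8 features such as lambdas
- You can build your flow with the builder pattern, based on IntegrationFlowBuilder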
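To make the "builder pattern plus lambdas" point concrete, here is a minimal plain-Java sketch of the idea. MiniFlow and MiniFlowDemo are hypothetical names invented for this illustration; they are not part of Spring Integration and only mimic the chained transform/handle style of the DSL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for a flow builder; NOT the Spring Integration API.
final class MiniFlow<T> {
    // Everything the flow does to a payload before it reaches the handler.
    private final Function<Object, T> pipeline;

    private MiniFlow(Function<Object, T> pipeline) {
        this.pipeline = pipeline;
    }

    // Starts an empty flow that passes the payload through unchanged.
    static MiniFlow<Object> start() {
        return new MiniFlow<>(payload -> payload);
    }

    // Appends a transformation step, given as a lambda or method reference.
    <R> MiniFlow<R> transform(Function<? super T, ? extends R> step) {
        return new MiniFlow<R>(payload -> step.apply(pipeline.apply(payload)));
    }

    // Terminates the flow with a handler and returns the runnable flow.
    Consumer<Object> handle(Consumer<? super T> handler) {
        return payload -> handler.accept(pipeline.apply(payload));
    }
}

final class MiniFlowDemo {
    // Builds a two-step flow and pushes one message through it.
    static List<String> run() {
        List<String> sink = new ArrayList<>();
        Consumer<Object> flow = MiniFlow.start()
                .transform(Object::toString)
                .transform(String::toUpperCase)
                .handle(sink::add);
        flow.accept("hello");
        return sink;
    }
}
```

The real DSL works on messages and channels rather than bare payloads, but the reading experience is the same: each step is one chained call, and the behaviour of the step is a lambda.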
Let's look at how to use it in samples based on ActiveMQ JMS.
Maven dependencies:
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-kahadb-store</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-java-dsl -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
            <version>1.2.3.RELEASE</version>
        </dependency>
    </dependencies>
Sample 1: JMS inbound gateway
We have the following ServiceActivator:
    @Service
    public class ActiveMQEndpoint {

        // Receives every message published to "inboundChannel".
        @ServiceActivator(inputChannel = "inboundChannel")
        public void processMessage(final String inboundPayload) {
            System.out.println("Inbound message: " + inboundPayload);
        }
    }
If you want to send the inboundPayload from a JMS queue to this activator gateway-style with the SI Java DSL, use the DSL Jms factory:
    @Bean
    public DynamicDestinationResolver dynamicDestinationResolver() {
        return new DynamicDestinationResolver();
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory();
    }

    // Listens on the queue "jms.activemq.test".
    @Bean
    public DefaultMessageListenerContainer listenerContainer() {
        final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
        defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
        defaultMessageListenerContainer.setDestinationName("jms.activemq.test");
        return defaultMessageListenerContainer;
    }

    @Bean
    public MessageChannel inboundChannel() {
        return MessageChannels.direct("inboundChannel").get();
    }

    // Forwards each JMS message to inboundChannel.
    @Bean
    public JmsInboundGateway dataEndpoint() {
        return Jms.inboundGateway(listenerContainer())
                .requestChannel(inboundChannel()).get();
    }
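By returning a JmsInboundGatewaySpec from the dataEndpoint bean you can also send a reply to an SI channel or to a JMS destination. Check out the documentation.
Sample 2: JMS message-driven channel adapter
If you are looking for a replacement for the XML JMS configuration of a message-driven channel adapter, JmsMessageDrivenChannelAdapter is the way to go: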
    @Bean
    public DynamicDestinationResolver dynamicDestinationResolver() {
        return new DynamicDestinationResolver();
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory();
    }

    @Bean
    public DefaultMessageListenerContainer listenerContainer() {
        final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
        defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
        defaultMessageListenerContainer.setDestinationName("jms.activemq.test");
        return defaultMessageListenerContainer;
    }

    @Bean
    public MessageChannel inboundChannel() {
        return MessageChannels.direct("inboundChannel").get();
    }

    @Bean
    public JmsMessageDrivenChannelAdapter dataEndpoint() {
        // One-way delivery: the listener does not wait for a reply.
        final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
                new ChannelPublishingJmsMessageListener();
        channelPublishingJmsMessageListener.setExpectReply(false);
        final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
                new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);
        messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
        return messageDrivenChannelAdapter;
    }
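As in Sample 1, the inbound payload is delivered to the service activator.
Sample 3: JMS message-driven channel adapter with JAXB
In a typical scenario you want to accept XML as a text message over JMS, convert it to a JAXB stub and process it in a service activator. I will show you how to do this with the SI Java DSL, but first let's add two dependencies for XML processing: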
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-xml</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-oxm</artifactId>
    </dependency>
We will be accepting ship orders over JMS, so first we need an XSD, named shiporder.xsd:
    <?xml version="1.0" encoding="UTF-8"?>
    <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
        <xs:element name="shiporder">
            <xs:complexType>
                <xs:sequence>
                    <xs:element name="orderperson" type="xs:string"/>
                    <xs:element name="shipto">
                        <xs:complexType>
                            <xs:sequence>
                                <xs:element name="name" type="xs:string"/>
                                <xs:element name="address" type="xs:string"/>
                                <xs:element name="city" type="xs:string"/>
                                <xs:element name="country" type="xs:string"/>
                            </xs:sequence>
                        </xs:complexType>
                    </xs:element>
                    <xs:element name="item" maxOccurs="unbounded">
                        <xs:complexType>
                            <xs:sequence>
                                <xs:element name="title" type="xs:string"/>
                                <xs:element name="note" type="xs:string" minOccurs="0"/>
                                <xs:element name="quantity" type="xs:positiveInteger"/>
                                <xs:element name="price" type="xs:decimal"/>
                            </xs:sequence>
                        </xs:complexType>
                    </xs:element>
                </xs:sequence>
                <xs:attribute name="orderid" type="xs:string" use="required"/>
            </xs:complexType>
        </xs:element>
    </xs:schema>
Add the JAXB Maven plugin to generate the JAXB stubs:
    <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>jaxb2-maven-plugin</artifactId>
        <version>2.3.1</version>
        <executions>
            <execution>
                <id>xjc-schema1</id>
                <goals>
                    <goal>xjc</goal>
                </goals>
                <configuration>
                    <!-- XSD files to generate stubs from. -->
                    <sources>
                        <source>src/main/resources/xsds/shiporder.xsd</source>
                    </sources>
                    <!-- Package name of the generated sources. -->
                    <packageName>com.example.stubs</packageName>
                    <outputDirectory>src/main/java</outputDirectory>
                    <clearOutputDir>false</clearOutputDir>
                </configuration>
            </execution>
        </executions>
    </plugin>
With the stub classes and everything else ready, here is the Java DSL JMS message-driven adapter with the JAXB magic:
    /**
     * Sample 3: JMS message driven adapter with JAXB
     */
    @Bean
    public JmsMessageDrivenChannelAdapter dataEndpoint() {
        final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
                new ChannelPublishingJmsMessageListener();
        channelPublishingJmsMessageListener.setExpectReply(false);
        // Unmarshal the XML text message into a JAXB stub before publishing it.
        channelPublishingJmsMessageListener.setMessageConverter(
                new MarshallingMessageConverter(shipOrdersMarshaller()));
        final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
                new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);
        messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
        return messageDrivenChannelAdapter;
    }

    @Bean
    public Jaxb2Marshaller shipOrdersMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        // Package of the generated stubs (see the plugin's packageName).
        marshaller.setContextPath("com.example.stubs");
        return marshaller;
    }
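Writing what used to be XML configuration in Java gives you a lot of power and flexibility. To complete this sample, the service activator for inboundChannel looks like this: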
    /**
     * Sample 3
     * @param shipOrder the unmarshalled JAXB stub
     */
    @ServiceActivator(inputChannel = "inboundChannel")
    public void processMessage(final Shiporder shipOrder) {
        System.out.println(shipOrder.getOrderid());
        System.out.println(shipOrder.getOrderperson());
    }
To test the flow, you can send the following XML to the JMS queue via JConsole:
    <?xml version="1.0" encoding="UTF-8"?>
    <shiporder orderid="889923"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:noNamespaceSchemaLocation="shiporder.xsd">
        <orderperson>John Smith</orderperson>
        <shipto>
            <name>Ola Nordmann</name>
            <address>Langgt 23</address>
            <city>4000 Stavanger</city>
            <country>Norway</country>
        </shipto>
        <item>
            <title>Empire Burlesque</title>
            <note>Special Edition</note>
            <quantity>1</quantity>
            <price>10.90</price>
        </item>
        <item>
            <title>Hide your heart</title>
            <quantity>1</quantity>
            <price>9.90</price>
        </item>
    </shiporder>
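Before pasting a test message into JConsole it can help to check that it is well-formed and carries the two fields the service activator prints. The following is only a sketch using the JDK's DOM parser, not the JAXB path the sample itself uses; ShipOrderXmlCheck is a hypothetical helper name, and the element and attribute names are the ones from the sample message.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

// Hypothetical helper for local sanity checks; not part of the sample project.
final class ShipOrderXmlCheck {

    // Returns {orderid, orderperson} read from a shiporder document.
    static String[] summarize(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Element root = doc.getDocumentElement();
            String orderPerson = root.getElementsByTagName("orderperson")
                    .item(0)
                    .getTextContent();
            return new String[] { root.getAttribute("orderid"), orderPerson };
        } catch (Exception e) {
            // Malformed XML or a missing element ends up here.
            throw new IllegalArgumentException("Not a usable shiporder message", e);
        }
    }
}
```

If summarize throws for your message, the adapter would most likely fail to unmarshal it as well, so it is cheaper to find that out before the message reaches the queue.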
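Sample 4: JMS message-driven channel adapter with JAXB and payload root routing
Another typical case is to accept XML as a JMS text message, convert it to a JAXB stub and route the payload to a service activator according to the payload root type. SI Java DSL of course supports all kinds of routing; I will show you how to route by payload type.
First, add the following XSD to the folder that contains shiporder.xsd and name it purchaseorder.xsd: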
    <xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                xmlns:tns="http://tempuri.org/PurchaseOrderSchema.xsd"
                targetNamespace="http://tempuri.org/PurchaseOrderSchema.xsd"
                elementFormDefault="qualified">
        <xsd:element name="PurchaseOrder">
            <xsd:complexType>
                <xsd:sequence>
                    <xsd:element name="ShipTo" type="tns:USAddress" maxOccurs="2"/>
                    <xsd:element name="BillTo" type="tns:USAddress"/>
                </xsd:sequence>
                <xsd:attribute name="OrderDate" type="xsd:date"/>
            </xsd:complexType>
        </xsd:element>
        <xsd:complexType name="USAddress">
            <xsd:sequence>
                <xsd:element name="name" type="xsd:string"/>
                <xsd:element name="street" type="xsd:string"/>
                <xsd:element name="city" type="xsd:string"/>
                <xsd:element name="state" type="xsd:string"/>
                <xsd:element name="zip" type="xsd:integer"/>
            </xsd:sequence>
            <xsd:attribute name="country" type="xsd:NMTOKEN" fixed="US"/>
        </xsd:complexType>
    </xsd:schema>
Then add it to the JAXB Maven plugin configuration:
    <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>jaxb2-maven-plugin</artifactId>
        <version>2.3.1</version>
        <executions>
            <execution>
                <id>xjc-schema1</id>
                <goals>
                    <goal>xjc</goal>
                </goals>
                <configuration>
                    <!-- XSD files to generate stubs from. -->
                    <sources>
                        <source>src/main/resources/xsds/shiporder.xsd</source>
                        <source>src/main/resources/xsds/purchaseorder.xsd</source>
                    </sources>
                    <!-- Package name of the generated sources. -->
                    <packageName>com.example.stubs</packageName>
                    <outputDirectory>src/main/java</outputDirectory>
                    <clearOutputDir>false</clearOutputDir>
                </configuration>
            </execution>
        </executions>
    </plugin>
Run mvn clean install to generate the JAXB stubs for the new XSD. Now for the promised payload root mapping:
    @Bean
    public Jaxb2Marshaller ordersMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setContextPath("com.example.stubs");
        return marshaller;
    }

    /**
     * Sample 4: JMS message driven adapter with JAXB and payload routing.
     * @return the adapter publishing unmarshalled payloads to inboundChannel
     */
    @Bean
    public JmsMessageDrivenChannelAdapter dataEndpoint() {
        final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
                new ChannelPublishingJmsMessageListener();
        channelPublishingJmsMessageListener.setMessageConverter(
                new MarshallingMessageConverter(ordersMarshaller()));
        final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
                new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);
        messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
        return messageDrivenChannelAdapter;
    }

    @Bean
    public IntegrationFlow payloadRootMapping() {
        // Route on the payload's class; each class gets its own subflow.
        return IntegrationFlows.from(inboundChannel()).<Object, Class<?>>route(Object::getClass, m -> m
                .subFlowMapping(Shiporder.class, sf -> sf.handle((MessageHandler) message -> {
                    final Shiporder shipOrder = (Shiporder) message.getPayload();
                    System.out.println(shipOrder.getOrderperson());
                    System.out.println(shipOrder.getOrderid());
                }))
                .subFlowMapping(PurchaseOrder.class, sf -> sf.handle((MessageHandler) message -> {
                    final PurchaseOrder purchaseOrderType = (PurchaseOrder) message.getPayload();
                    System.out.println(purchaseOrderType.getBillTo().getName());
                }))
        ).get();
    }
Pay attention to the payloadRootMapping bean. Let's go through the important parts:
- <Object, Class<?>>route: the input coming from inboundChannel is an Object, and routing is performed on its Class<?>
- subFlowMapping(Shiporder.class, ...: handles ship orders.
- subFlowMapping(PurchaseOrder.class, ...: handles purchase orders.
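Stripped of the messaging infrastructure, routing by payload type is a lookup from the payload's class to a handler. Here is a minimal plain-Java sketch of that idea; PayloadTypeRouter is a hypothetical class made up for this illustration and is not how Spring Integration implements its router.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of class-based routing; NOT the Spring Integration router.
final class PayloadTypeRouter {
    // One handler per exact payload class, mirroring one subflow per mapping.
    private final Map<Class<?>, Function<Object, String>> routes = new LinkedHashMap<>();

    // Registers a handler for payloads whose class is exactly the given type.
    <T> PayloadTypeRouter mapping(Class<T> type, Function<? super T, String> handler) {
        routes.put(type, payload -> handler.apply(type.cast(payload)));
        return this;
    }

    // Picks the handler by payload.getClass(), the same key Object::getClass yields.
    String route(Object payload) {
        Function<Object, String> handler = routes.get(payload.getClass());
        if (handler == null) {
            throw new IllegalArgumentException("No mapping for " + payload.getClass().getName());
        }
        return handler.apply(payload);
    }
}
```

For example, new PayloadTypeRouter().mapping(String.class, s -> "text:" + s).mapping(Integer.class, i -> "number:" + i) sends a String payload to the first handler and an Integer payload to the second. Because the key is the exact class, a subclass of a mapped type is not matched in this sketch.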
To test the ship order payload, use the XML from Sample 3; to test the purchase order payload, use the following XML:
    <?xml version="1.0" encoding="UTF-8"?>
    <PurchaseOrder OrderDate="1900-01-01" xmlns="http://tempuri.org/PurchaseOrderSchema.xsd">
        <ShipTo country="US">
            <name>name1</name>
            <street>street1</street>
            <city>city1</city>
            <state>state1</state>
            <zip>1</zip>
        </ShipTo>
        <ShipTo country="US">
            <name>name2</name>
            <street>street2</street>
            <city>city2</city>
            <state>state2</state>
            <zip>-79228162514264337593543950335</zip>
        </ShipTo>
        <BillTo country="US">
            <name>name1</name>
            <street>street1</street>
            <city>city1</city>
            <state>state1</state>
            <zip>1</zip>
        </BillTo>
    </PurchaseOrder>
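Both payloads should be routed according to the subflow mapping.
Sample 5: IntegrationFlowAdapter
Besides the other implementations of Enterprise Integration Patterns (check them out), I need to mention IntegrationFlowAdapter. By extending this class and implementing the buildFlow method like this: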
    @Component
    public class MyFlowAdapter extends IntegrationFlowAdapter {

        @Autowired
        private ConnectionFactory rabbitConnectionFactory;

        @Override
        protected IntegrationFlowDefinition<?> buildFlow() {
            return from(Amqp.inboundAdapter(this.rabbitConnectionFactory, "myQueue"))
                    .<String, String>transform(String::toLowerCase)
                    .channel(c -> c.queue("myFlowAdapterOutput"));
        }
    }
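you can wrap repeated bean declarations into a single component and give them the flow you need. Such a component can then be configured and handed to the calling code as one class instance!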
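The underlying idea is the template method pattern: the base class owns the wiring, and a subclass only says what the flow looks like. Below is a minimal plain-Java sketch of that shape. FlowAdapterSketch and PrefixingEndpoint are hypothetical names for this illustration; the real IntegrationFlowAdapter builds a message flow registered in the application context, not a simple function.

```java
import java.util.function.Function;

// Hypothetical base class that mimics the idea only; NOT IntegrationFlowAdapter.
abstract class FlowAdapterSketch {
    private Function<String, String> flow;

    // Subclasses describe their flow here, as with buildFlow() in the DSL.
    protected abstract Function<String, String> buildFlow();

    // Builds the flow lazily on first use, then runs the payload through it.
    final String send(String payload) {
        if (flow == null) {
            flow = buildFlow();
        }
        return flow.apply(payload);
    }
}

// A configurable component: constructor arguments parameterise the flow.
final class PrefixingEndpoint extends FlowAdapterSketch {
    private final String queueName;

    PrefixingEndpoint(String queueName) {
        this.queueName = queueName;
    }

    @Override
    protected Function<String, String> buildFlow() {
        Function<String, String> lower = String::toLowerCase;
        return lower.andThen(payload -> queueName + ":" + payload);
    }
}
```

Calling new PrefixingEndpoint("jms.activemq.test").send("HELLO") lower-cases the payload and prefixes it with the queue name. The point is that the caller sees one configured object and none of the plumbing.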
So let's make Sample 3 from the repo a bit shorter by defining a base class for all JMS endpoints, with the repeated beans declared in it:
    public class JmsEndpoint extends IntegrationFlowAdapter {

        private String queueName;
        private String channelName;
        private String contextPath;

        /**
         * @param queueName   JMS destination to listen on
         * @param channelName channel the unmarshalled payload is sent to
         * @param contextPath package of the JAXB stubs
         */
        public JmsEndpoint(String queueName, String channelName, String contextPath) {
            this.queueName = queueName;
            this.channelName = channelName;
            this.contextPath = contextPath;
        }

        @Override
        protected IntegrationFlowDefinition<?> buildFlow() {
            return from(Jms.messageDrivenChannelAdapter(listenerContainer())
                    .jmsMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()))
            ).channel(channelName);
        }

        @Bean
        public Jaxb2Marshaller shipOrdersMarshaller() {
            Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
            marshaller.setContextPath(contextPath);
            return marshaller;
        }

        @Bean
        public DynamicDestinationResolver dynamicDestinationResolver() {
            return new DynamicDestinationResolver();
        }

        @Bean
        public ActiveMQConnectionFactory connectionFactory() {
            return new ActiveMQConnectionFactory();
        }

        @Bean
        public DefaultMessageListenerContainer listenerContainer() {
            final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
            defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
            defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
            defaultMessageListenerContainer.setDestinationName(queueName);
            return defaultMessageListenerContainer;
        }

        @Bean
        public MessageChannel inboundChannel() {
            return MessageChannels.direct(channelName).get();
        }
    }
Declaring a JMS endpoint for a particular queue is now easy:
    @Bean
    public JmsEndpoint jmsEndpoint() {
        return new JmsEndpoint("jms.activemq.test", "inboundChannel", "com.example.stubs");
    }
The service activator for inboundChannel:
    /**
     * Sample 3, 5
     * @param shipOrder the unmarshalled JAXB stub
     */
    @ServiceActivator(inputChannel = "inboundChannel")
    public void processMessage(final Shiporder shipOrder) {
        System.out.println(shipOrder.getOrderid());
        System.out.println(shipOrder.getOrderperson());
    }
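You should not miss out on IntegrationFlowAdapter in your projects. I like the concept.
I recently started using Spring Integration Java DSL in a new Spring Boot based project at Embedit. Even with some configuration needed, I have found it very useful:
- It is easy to debug, without adding configuration such as a wiretap.
- It is much easier to read. Yes, even with lambdas!
- It is powerful. In Java configuration you now have many options.
Source code: https://bitbucket.org/tomask79/spring-integration-java-dsl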
Original article: https://www.jdon.com/51378