JBoss DataGrid的集群部署與訪問
集群部署
JDG的緩存模式包括本地(Local)模式和集群(Clustered)模式。本項目采用多節點的Clustered模式部署,數據在多個節點的子集間進行復制,而不是同步復制到所有的節點。使用子集復制可以提升容錯的效率但對可伸縮性不會造成太大影響。在使用Clustered模式部署之前,應該配置JGroup。
1. 使用UDP方式廣播。
l 適用于大的集群(超過100節點);
l 適用于Invalidation和Replication模式;
l 提高socket通信的效率。
2. 使用TCP方式廣播。
更適合于distribution模式的小規模(少于100節點)的集群,這時因為TCP協議在點對點通信中更加高效。
Clustered模式又分為Invalidation、Replication和Distribution模式。
Distribution模式
JDG的Distribution模式可以存儲緩存數據在集群的子集節點,而不是存儲數據到每一個節點中。通常存儲到多于1個節點來提供數據冗余和容錯。
Distribution模式使用一致性Hash算法從集群中選擇存儲數據的節點,一致性Hash算法配置為一個緩存數據存儲到多個副本。副本數的設置需要平衡性能和容錯,過多的副本會影響性能而過少的副本會造成節點失效時丟失數據。
在Distribution模式中,一個put操作會執行num_copies次遠程調用,同樣在任意節點的get操作會執行至少一次的遠程調用。實際上,get操作甚至也會執行num_copies次遠程調用,但它們是并行的,只要有一個返回,結果就會被返回給調用者。
讀一致性
由于get操作是并行從多個節點取數并使用第一個返回的結果,在使用異步方式時可能會導致數據的不一致。注意這種情況只會出現在異步調用方式,如果是同步方式,則不會出現讀不一致。
配置緩存容器和緩存
JDG的Cache在使用時,必須在standalone/configuration/clustered.xml配置文件中設置要使用的Cache配置。如果客戶端訪問了未配置命名的Cache,將視為非法操作。
在<subsystemxmlns="urn:infinispan:server:core:5.2"default-cache-container="clustered"> …… </subsystem>中配置cache-container和distributed-cache,本項目使用分布式緩存模式。
節點<cache-containername="clustered" default-cache="default">中增加對Cache的配置節點,如下面的名稱為default的cache(默認緩存配置):
<cache-container name="clustered" default-cache="default"> <transport executor="infinispan-transport" lock-timeout="60000"/> <distributed-cache name="default" mode="SYNC" segments="20" owners="2" remote-timeout="30000" start="EAGER"> <locking isolation="READ_COMMITTED" acquire-timeout="30000" concurrency-level="1000" striping="false"/> <transaction mode="NONE"/> </distributed-cache> </cache-container> |
以上的配置項目并非所有的都是必須,各節點說明見下:
#定義緩存默認的過期策略
#lifespan: 緩存條目最大的生存時間,單位毫秒,-1表示從不過期。
# max-idle: 緩存條目的最大空閑時間,單位毫秒,-1表示從不過期。如果空閑時間超過該值,條目將被視為過期。
#interval: 從緩存中清除過期條目的執行間隔時間(毫秒)。如果要完全禁用定期回收任務,設置為-1。
<expirationlifespan=”2000” max-idle=”1000”/>
#定義緩存默認的回收策略
#eviction: 指定回收策略。可用的回收策略包括UNORDERED,FIFO,LIRS和NONE(禁用回收策略)。
#max-entries: 指定Cache實例中的最大條目數量,-1表示不限制。實際取值為>=選定值的2的冪的最小值
<evictionstrategy="LRU" max-entries="10000"/>
#定義緩存鎖
#isolation: 定義隔離級別。可用的隔離級別包括NONE,READ_UNCOMMITED,READ_COMMITED,REPEATABLE_READ,SERIALIZABLE,默認REPEATABLE_READ
#striping: 鎖條帶化。如果為true,使用共享鎖池來維護所有需要鎖定的條目。否則,為每個條目創建一個鎖。鎖條帶化有助于控制內存占用,但可能會降低系統的并發能力。
#acquire-timeout: 嘗試獲取一個特定鎖的最大超時時間。
#concurrency-level: 鎖容器的并發級別。根據和infinispan并發交互的線程數量來調整這個值。
#concurrent-updates: 僅用于非事務緩存。如果設置為true(默認值),則緩存在并發更新時保持數據的一致性。對于集群模式將帶來額外的RPC成本,如果你的應用不會并發寫數據,禁用該標志以提升性能。
<lockingisolation="READ_COMMITTED" acquire-timeout="30000"concurrency-level="1000" striping="false"/>
#定義緩存事務
#雖然可以定義服務器緩存支持事務,但目前沒有可用的協議來支持事務能力。
<transactionmode="NONE"/>
啟動JDG集群
可以把JDG部署在多個節點構成一個集群,目前我們的項目使用的是數據緩存服務使用默認3個節點的分布式集群。在啟動時各個節點的nodeName必須予以區分,例如分別命名為nodeA、nodeB和nodeC。
JDG是運行在JBoss容器中的,在啟動時即啟動了JBoss,如果服務器已經運行了JBOSS,請在啟動時設置端口offset,以避免沖突。
同時為了避免和JBoss服務器的JBOSS_HOME沖突,定義環境變量JDG_HOME(例如 exportJDG_HOME=/usr/local/jboss-datagrid-server-6.1.0),并修改bin /clustered.sh中的所有JBOSS_HOME替換為JDG_HOME。
以nodeA節點為例(JDG對外訪問的端口為11222,下面offset后為11322),服務器ip是192.168.1.100,執行啟動命令為:
./clustered.sh-Djboss.socket.binding.port-offset=100 -Djboss.bind.address=192.168.1.100 -Djboss.node.name=nodeA
客戶端訪問
遠程客戶端可以使用REST, memcached或HotRod協議,我們這里使用HotRod協議,它是一種二進制協議,性能較好,同時它提供了自動的負載均衡和failover。
配置文件
客戶端訪問緩存服務端的配置文件默認命名為hotrod-client.properties。定義解釋見下。
############JDG 服務器配置############
##請求均衡策略,default =org.infinispan.client.hotrod.impl.transport.tcp.RoundRobinBalancingStrategy
#infinispan.client.hotrod.request_balancing_strategy=
##服務器列表,default = 127.0.0.1:11222
infinispan.client.hotrod.server_list=192.168.1.100:11322;192.168.1.101:11322;192.168.1.102:11322
##是否強迫返回值, default = false
#infinispan.client.hotrod.force_return_values=
##TCP_NO_DELAY, default = true
#infinispan.client.hotrod.tcp_no_delay=
##啟動時是否發送PING請求來獲取CLUSTER拓撲, default = true
#infinispan.client.hotrod.ping_on_startup=
##控制使用的傳輸機制,目前只支持TCP, default =org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory
#infinispan.client.hotrod.transport_factory=
##序列化使用的Marshaller, default =org.infinispan.marshall.jboss.GenericJBossMarshaller
##如果要降低傳輸負載,可以配置為ApacheAvroMarshaller
#infinispan.client.hotrod.marshaller=
##指定自定義的AsyncExecutorFactory, default =org.infinispan.client.hotrod.impl.async.DefaultAsyncExecutorFactory
#infinispan.client.hotrod.async_executor_factory=
##指定并發線程池大小, default = 10
#infinispan.client.hotrod.default_executor_factory.pool_size=
##指定并發隊列大小, default = 100000
#infinispan.client.hotrod.default_executor_factory.queue_size=
##Hash函數實現的版本及一致性Hash算法,和HotRod的服務器版本相關
#infinispan.client.hotrod.hash_function_impl.1=
##序列化和反序列化鍵的緩存允許字節數,目的是避免數組大小調整, default =64
#infinispan.client.hotrod.key_size_estimate=
##序列化和反序列化值的緩存允許字節數,目的是避免數組大小調整, default =512
#infinispan.client.hotrod.value_size_estimate=
##socket讀超時, default = 60000 (60 seconds)
infinispan.client.hotrod.socket_timeout=50000
##socket連接超時, default = 60000 (60 seconds)
infinispan.client.hotrod.connect_timeout=10000
##指定客戶端使用的協議版本, default = 1.1,其他值還有1.0
#infinispan.client.hotrod.protocol_version=
##指定錯誤時的重試次數, default = 10
#infinispan.client.hotrod.max_retries=
############連接池配置############
##指定每個服務器的最大連接數,負值表示沒有限制,默認-1
maxActive=100
##指定服務器組內允許的全局持久連接的數量,負值表示沒有限制,默認-1
maxTotal=100
##指定每個服務器空閑持久連接的最大數,負值表示沒有限制,默認-1
maxIdle=20
##指定當連接池耗盡時,服務器如何響應:
##0-拋出異常給調用者
##1-阻塞調用者,直到有空閑的連接
##2-創建一個新的連接(不受maxActive的限制)
##默認值是1
#whenExhaustedAction=1
##檢查空閑連接的Eviction線程每次運行間隔的時間,默認是2分鐘
#timeBetweenEvictionRunsMillis=120000
##在空閑池中的連接存在多長時間需要被銷毀,負值表示沒有空閑連接被銷毀,默認值為30分鐘
#minEvictableIdleTimeMillis=1800000
##指定在Eviction線程執行時,空閑的連接是否通過發送一個TCP數據包到服務器來驗證,
##即無法驗證的連接將從池中被清除
##默認值為true
#testWhileIdle=true
##指定每個服務器最小的可用連接的空閑線程數。默認值為1
#minIdle=1客戶端訪問代碼
import java.net.URL; import java.util.Map;
import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCacheManager; import org.infinispan.client.hotrod.ServerStatistics;
public class Quickstart {
public static void main(String[] args) {
URL resource = Thread.currentThread().getContextClassLoader() .getResource("hotrod-client.properties"); RemoteCacheManager cacheContainer = new RemoteCacheManager(resource, true);
//獲得一個遠程的Cache RemoteCache cache = cacheContainer.getCache("myCache");
//put數據到緩存中,然后確認是否存在 cache.put("name", "paul"); if(cache.get("name").equals("paul")){ System.out.println("Cache Hit!"); } else { System.out.println("Cache Miss!"); }
//刪除緩存數據 cache.remove("name");
cacheContainer.stop(); } } |