spring整合thrift

jopen 11年前發布 | 67K 次閱讀 Spring JEE框架

整合thrift主要分客戶端和服務端兩部分。用spring主要是為了調用和發布服務方便。代碼時公司的一個高手寫的,這里記錄下,以便學習。

先看客戶端代碼。thrift的服務端暴露服務,客戶端則是調用。以平臺共通服務為例:

<!-- 平臺共通服務 -->
   <bean id="standardService"
        class="framework.support.thrift.ThriftHttpProxyFactoryBean">
        <property name="serviceUrl" value="${cs.url}/standardService" />
        <property name="serviceInterface"
            value="api.standard.StandardService$Iface" />
    </bean>

看配置就知道使用了spring的FactoryBean模式,在ThriftHttpProxyFactoryBean里面使用了httpClient獲取thrift暴露的接口。

private HttpClient httpClient; // FactoryBean初始化httpClient 
public ThriftHttpProxyFactoryBean() {
        SchemeRegistry schemeRegistry = new SchemeRegistry();
        schemeRegistry.register(new Scheme("http", 80, PlainSocketFactory.getSocketFactory()));
        schemeRegistry.register(new Scheme("https", 443, SSLSocketFactory.getSocketFactory()));
        PoolingClientConnectionManager connectionManager = new PoolingClientConnectionManager(schemeRegistry);
        connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL_CONNECTIONS);
        connectionManager.setDefaultMaxPerRoute(DEFAULT_MAX_CONNECTIONS_PER_ROUTE);
        this.httpClient = new DefaultHttpClient(connectionManager);
        setReadTimeout(DEFAULT_READ_TIMEOUT_MILLISECONDS);
    }

然后factoryBean繼承的方法:

public T getObject() {
        return (T) getServiceProxy(); // 服務對象不在本地,使用代理
    }

    public Class<?> getObjectType() {
        return getServiceInterface(); // 服務對象類型,就是配置里面的serviceInterface
    }

    public boolean isSingleton() {
        return true;
    }

ThriftHttpProxyFactoryBean有個父類ThriftClientInterceptor,它繼承了UrlBasedRemoteAccessor并實現了MethodInterceptor。先看父類ThriftClientInterceptor的afterPropertiesSet:

public void afterPropertiesSet() {
        super.afterPropertiesSet();
        if (getServiceInterface() == null) {
            throw new IllegalArgumentException("property serviceInterface is required.");
        }
        .....
        ProxyFactory pf = new ProxyFactory(getServiceInterface(), this);//用當前對象包裝接口
        this.serviceProxy = pf.getProxy(getBeanClassLoader());
    }

ThriftHttpProxyFactoryBean實現返回的就是上面的serviceProxy,所以我們接下來看ThriftClientInterceptor的invoke方法。這段是精華。

public Object invoke(MethodInvocation invocation) throws Throwable {
        Method method = invocation.getMethod();
        Object[] args = invocation.getArguments();
        if (serviceRegistry != null) {
            ServiceReference sr = serviceRegistry.getService(getServiceInterface());
            if (sr != null) {
                try {
                    return method.invoke(sr.getService(), args);
                } catch (InvocationTargetException e) {
                    throw e.getTargetException();
                }
            }
        }
        String name = method.getName();
        if (args.length == 0) {
            if ("toString".equals(name)) {
                return "Thrift proxy for service URL [" + getServiceUrl() + "]";
            } else if ("hashCode".equals(name)) {
                return getServiceUrl().hashCode();
            }
        } else if (args.length == 1 && "equals".equals(name)) {
            return getServiceUrl().equals(args[0]);
        }
        Object client = clientConstructor.newInstance(protocolFactory.getProtocol(getTransport()));
        Assert.notNull(client, "the Thrift RPC client was not correctly created. Aborting.");
        ClassLoader originalClassLoader = overrideThreadContextClassLoader();
        try {
            return method.invoke(client, args);
        } catch (InvocationTargetException e) {
            Throwable target = e.getTargetException();
            if (target instanceof TApplicationException && ((TApplicationException) target).getType() == TApplicationException.MISSING_RESULT) {
                return null;
            }
            throw convertException(target);
        } catch (Throwable ex) {
            throw convertException(ex);
        } finally {
            resetThreadContextClassLoader(originalClassLoader);
        }
    }
serviceRegistry這個其實類型于連接池,注冊服務的地方在服務端,這種模式必須客戶端服務端代碼在一個tomcat里面,避免網絡請求。不過我覺得如果分開部署其實也可以使用一個連接池緩存對象,這樣就只請求一次即可不過服務端代碼有改動客戶端就得想辦法更新連接池了,稍微復雜點。重點看通過網絡獲取服務對象的內容: protocolFactory.getProtocol(getTransport())

protocolFactory 是thrift的類型 TProtocolFactory,而getTransport()內容如下:

protected TTransport getTransport() {
        return new THttpClient(getServiceUrl(), getHttpClient());
    }

這里都使用了thrift自帶的一些類。

————————————————————分割線————————————————————

然后是服務端代碼, 先看服務端暴露服務的配置:

<!-- Thrift export -->
    <bean name="/paymentService" class="framework.support.thrift.ThriftHttpServiceExporter">
        <property name="service">
            <bean class="payment.service.impl.ThriftPaymentServiceImpl"/>
        </property>
        <property name="serviceInterface" value="api.payment.PaymentService$Iface"/>
    </bean>
很明顯使用的spring的HttpRequestHandler框架。ThriftHttpServiceExporter實現了 HttpRequestHandler接口并且有個父類ThriftExporter,而ThriftExporter繼承了spring的RemoteExporter。

我們先看ThriftHttpServiceExporter的處理請求的方法:

public void handleRequest(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        if (!"POST".equals(request.getMethod()) && metadataXml != null) {
            response.setContentType("text/xml; charset=UTF-8");
            FileCopyUtils.copy(metadataXml.openStream(), response.getOutputStream());
            return;
        }
        InputStream in = request.getInputStream();
        OutputStream out = response.getOutputStream();
        try {
            ThriftContextHolder.init();
            response.setContentType("application/x-thrift");
            TTransport transport = new TIOStreamTransport(in, out);
            TProtocol protocol = request.getParameter("_json") != null ? jsonProtocolFactory.getProtocol(transport) : getProtocolFactory().getProtocol(transport);
            doInvoke(protocol, protocol);
        } catch (Throwable e) {
            response.setContentType("text/plain; charset=UTF-8");
            response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
            e.printStackTrace(new PrintWriter(out, true));
            if (LOG.isErrorEnabled()) {
                LOG.error("Thrift server direct error", e);
            }
        } finally {
            ThriftContextHolder.reset();
        }
    }
就是拿到請求調用thrift方法處理。doInvoke內容:
protected void doInvoke(TProtocol in, TProtocol out) throws Throwable {
        processor.process(in, out);
    }

在ThriftExport類里面的afterPropertiesSet里面:

public void afterPropertiesSet() throws Exception {
        if (serviceRegistry != null) {
            List<MethodInterceptor> il = new ArrayList<MethodInterceptor>();
            il.add(new ClassLoaderInterceptor(getBeanClassLoader()));
            if (threadResourceManager != null) {
                il.add(new ThreadResourceSupportInterceptor(threadResourceManager));
            }
            serviceRegistry.exportService(getServiceInterface(), getProxyForService0(il));
        }
        super.setInterceptors(ArrayUtils.add(interceptors, new AnyExceptionConvertInterceptor()));
        this.processor = ThriftHelper.buildProcessor(getServiceInterface(), getProxyForService());
    }

可以看到serviceRegistry的使用。

spring整合thrift服務主要內容就這些了。主要還是使用了spring的Remote的框架。

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!