/
ReferenceConfig.java
514 lines (462 loc) · 21.7 KB
/
ReferenceConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.config;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.bytecode.Wrapper;
import org.apache.dubbo.common.constants.RegistryConstants;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.config.annotation.Reference;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.config.event.ReferenceConfigDestroyedEvent;
import org.apache.dubbo.config.event.ReferenceConfigInitializedEvent;
import org.apache.dubbo.config.support.Parameter;
import org.apache.dubbo.config.utils.ConfigValidationUtils;
import org.apache.dubbo.event.Event;
import org.apache.dubbo.event.EventDispatcher;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.ProxyFactory;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.apache.dubbo.rpc.cluster.support.ClusterUtils;
import org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.AsyncMethodInfo;
import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.model.ServiceRepository;
import org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol;
import org.apache.dubbo.rpc.service.GenericService;
import org.apache.dubbo.rpc.support.ProtocolUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SEPARATOR_CHAR;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.LOCALHOST_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROXY_CLASS_REF;
import static org.apache.dubbo.common.constants.CommonConstants.REVISION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SEMICOLON_SPLIT_PATTERN;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.SUBSCRIBED_SERVICE_NAMES_KEY;
import static org.apache.dubbo.common.utils.NetUtils.isInvalidLocalHost;
import static org.apache.dubbo.common.utils.StringUtils.splitToSet;
import static org.apache.dubbo.config.Constants.DUBBO_IP_TO_REGISTRY;
import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL;
import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
/**
* Please avoid using this class for any new application,
* use {@link ReferenceConfigBase} instead.
*/
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
public static final Logger logger = LoggerFactory.getLogger(ReferenceConfig.class);
/**
* The {@link Protocol} implementation with adaptive functionality,it will be different in different scenarios.
* A particular {@link Protocol} implementation is determined by the protocol attribute in the {@link URL}.
* For example:
*
* <li>when the url is registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=dubbo-sample,
* then the protocol is <b>RegistryProtocol</b></li>
*
* <li>when the url is dubbo://224.5.6.7:1234/org.apache.dubbo.config.api.DemoService?application=dubbo-sample, then
* the protocol is <b>DubboProtocol</b></li>
* <p>
* Actually,when the {@link ExtensionLoader} init the {@link Protocol} instants,it will automatically wraps two
* layers, and eventually will get a <b>ProtocolFilterWrapper</b> or <b>ProtocolListenerWrapper</b>
*/
private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
/**
* The {@link Cluster}'s implementation with adaptive functionality, and actually it will get a {@link Cluster}'s
* specific implementation who is wrapped with <b>MockClusterInvoker</b>
*/
private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
/**
* A {@link ProxyFactory} implementation that will generate a reference service's proxy,the JavassistProxyFactory is
* its default implementation
*/
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
/**
* The interface proxy reference
*/
private transient volatile T ref;
/**
* The invoker of the reference service
*/
private transient volatile Invoker<?> invoker;
/**
* The flag whether the ReferenceConfig has been initialized
*/
private transient volatile boolean initialized;
/**
* whether this ReferenceConfig has been destroyed
*/
private transient volatile boolean destroyed;
private final ServiceRepository repository;
private DubboBootstrap bootstrap;
/**
* The service names that the Dubbo interface subscribed.
*
* @since 2.7.8
*/
private String services;
public ReferenceConfig() {
super();
this.repository = ApplicationModel.getServiceRepository();
}
public ReferenceConfig(Reference reference) {
super(reference);
this.repository = ApplicationModel.getServiceRepository();
}
/**
* Get a string presenting the service names that the Dubbo interface subscribed.
* If it is a multiple-values, the content will be a comma-delimited String.
*
* @return non-null
* @see RegistryConstants#SUBSCRIBED_SERVICE_NAMES_KEY
* @since 2.7.8
*/
@Parameter(key = SUBSCRIBED_SERVICE_NAMES_KEY)
public String getServices() {
return services;
}
/**
* It's an alias method for {@link #getServices()}, but the more convenient.
*
* @return the String {@link List} presenting the Dubbo interface subscribed
* @since 2.7.8
*/
@Parameter(excluded = true)
public Set<String> getSubscribedServices() {
return splitToSet(getServices(), COMMA_SEPARATOR_CHAR);
}
/**
* Set the service names that the Dubbo interface subscribed.
*
* @param services If it is a multiple-values, the content will be a comma-delimited String.
* @since 2.7.8
*/
public void setServices(String services) {
this.services = services;
}
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
public synchronized void destroy() {
if (ref == null) {
return;
}
if (destroyed) {
return;
}
destroyed = true;
try {
invoker.destroy();
} catch (Throwable t) {
logger.warn("Unexpected error occured when destroy invoker of ReferenceConfig(" + url + ").", t);
}
invoker = null;
ref = null;
// dispatch a ReferenceConfigDestroyedEvent since 2.7.4
dispatch(new ReferenceConfigDestroyedEvent(this));
}
public synchronized void init() {
if (initialized) {
return;
}
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
checkAndUpdateSubConfigs();
checkStubAndLocal(interfaceClass);
ConfigValidationUtils.checkMock(interfaceClass, this);
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, CONSUMER_SIDE);
ReferenceConfigBase.appendRuntimeParameters(map);
if (!ProtocolUtils.isGeneric(generic)) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
}
}
map.put(INTERFACE_KEY, interfaceName);
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
// remove 'default.' prefix for configs from ConsumerConfig
// appendParameters(map, consumer, Constants.DEFAULT_KEY);
AbstractConfig.appendParameters(map, consumer);
AbstractConfig.appendParameters(map, this);
Map<String, AsyncMethodInfo> attributes = null;
if (CollectionUtils.isNotEmpty(getMethods())) {
attributes = new HashMap<>();
for (MethodConfig methodConfig : getMethods()) {
AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
String retryKey = methodConfig.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(methodConfig.getName() + ".retries", "0");
}
}
AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
if (asyncMethodInfo != null) {
// consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
attributes.put(methodConfig.getName(), asyncMethodInfo);
}
}
}
String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
if (StringUtils.isEmpty(hostToRegistry)) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(REGISTER_IP_KEY, hostToRegistry);
serviceMetadata.getAttachments().putAll(map);
ref = createProxy(map);
serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
consumerModel.setProxyObject(ref);
consumerModel.init(attributes);
initialized = true;
checkInvokerAvailable();
// dispatch a ReferenceConfigInitializedEvent since 2.7.4
dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) {
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
urls.clear();
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (UrlUtils.isRegistry(url)) {
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// if protocols not injvm checkRegistry
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
checkRegistry();
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
if (urls.size() == 1) {
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
} else { // not a registry url, must be direct invoke.
String cluster = CollectionUtils.isNotEmpty(invokers)
? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT)
: Cluster.DEFAULT;
invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
}
}
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
private void checkInvokerAvailable() throws IllegalStateException {
if (shouldCheck() && !invoker.isAvailable()) {
invoker.destroy();
throw new IllegalStateException("Failed to check the status of the service "
+ interfaceName
+ ". No provider available for the service "
+ (group == null ? "" : group + "/")
+ interfaceName +
(version == null ? "" : ":" + version)
+ " from the url "
+ invoker.getUrl()
+ " to the consumer "
+ NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
}
/**
* This method should be called right after the creation of this class's instance, before any property in other config modules is used.
* Check each config modules are created properly and override their properties if necessary.
*/
public void checkAndUpdateSubConfigs() {
if (StringUtils.isEmpty(interfaceName)) {
throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
}
completeCompoundConfigs(consumer);
if (consumer != null) {
if (StringUtils.isEmpty(registryIds)) {
setRegistryIds(consumer.getRegistryIds());
}
}
// get consumer's global configuration
checkDefault();
this.refresh();
if (getGeneric() == null && getConsumer() != null) {
setGeneric(getConsumer().getGeneric());
}
if (ProtocolUtils.isGeneric(generic)) {
interfaceClass = GenericService.class;
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, getMethods());
}
//init serivceMetadata
serviceMetadata.setVersion(getVersion());
serviceMetadata.setGroup(getGroup());
serviceMetadata.setDefaultGroup(getGroup());
serviceMetadata.setServiceType(getActualInterface());
serviceMetadata.setServiceInterfaceName(interfaceName);
// TODO, uncomment this line once service key is unified
serviceMetadata.setServiceKey(URL.buildKey(interfaceName, group, version));
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(interfaceClass);
repository.registerConsumer(
serviceMetadata.getServiceKey(),
serviceDescriptor,
this,
null,
serviceMetadata);
resolveFile();
ConfigValidationUtils.validateReferenceConfig(this);
postProcessConfig();
}
/**
* Figure out should refer the service in the same JVM from configurations. The default behavior is true
* 1. if injvm is specified, then use it
* 2. then if a url is specified, then assume it's a remote call
* 3. otherwise, check scope parameter
* 4. if scope is not specified but the target service is provided in the same JVM, then prefer to make the local
* call, which is the default behavior
*/
protected boolean shouldJvmRefer(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
boolean isJvmRefer;
if (isInjvm() == null) {
// if a url is specified, don't do local reference
if (url != null && url.length() > 0) {
isJvmRefer = false;
} else {
// by default, reference local service if there is
isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
}
} else {
isJvmRefer = isInjvm();
}
return isJvmRefer;
}
/**
* Dispatch an {@link Event event}
*
* @param event an {@link Event event}
* @since 2.7.5
*/
protected void dispatch(Event event) {
EventDispatcher.getDefaultExtension().dispatch(event);
}
public DubboBootstrap getBootstrap() {
return bootstrap;
}
public void setBootstrap(DubboBootstrap bootstrap) {
this.bootstrap = bootstrap;
}
private void postProcessConfig() {
List<ConfigPostProcessor> configPostProcessors = ExtensionLoader.getExtensionLoader(ConfigPostProcessor.class)
.getActivateExtension(URL.valueOf("configPostProcessor://"), (String[]) null);
configPostProcessors.forEach(component -> component.postProcessReferConfig(this));
}
// just for test
Invoker<?> getInvoker() {
return invoker;
}
}