How to implement a Kafka consumer with an idempotent consumption template

Implementation steps

1. Switch Kafka offset commits from automatic to manual

```yaml
spring:
  kafka:
    consumer:
      # Whether to commit offsets automatically (default: true). To avoid duplicated
      # and lost messages, set it to false and commit offsets manually instead.
      enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:false}
```
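Disabling auto commit by itself is not enough for the `Acknowledgment` parameter used in the template below to be populated: the listener container's ack mode also has to be a manual mode (`MANUAL` or `MANUAL_IMMEDIATE`). Depending on the Spring Boot version, the `spring.kafka.listener.ack-mode` property can do this; otherwise a container factory bean works. The following is a minimal sketch, assuming spring-kafka 2.2.x and the Boot auto-configured `ConsumerFactory`; the class and bean names here are illustrative, not taken from the original article:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaManualAckConfig {

    // Overrides the default "kafkaListenerContainerFactory" so listener methods can
    // receive an Acknowledgment and offsets are committed only via ack.acknowledge().
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```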

2. Define an abstract base class as the consumer-side template

```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

@Slf4j
public abstract class BaseComusmeListener {

    @KafkaHandler
    public final void receive(@Payload String data,
            @Header(value = KafkaHeaders.RECEIVED_TOPIC, required = false) String receivedTopic,
            @Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String receivedMessageKey,
            @Header(value = KafkaHeaders.RECEIVED_TIMESTAMP, required = false) long receivedTimestamp,
            Acknowledgment ack) {
        KafkaComsumePayLoad kafkaComsumePayLoad =
                buildKafkaComsumePayLoad(data, receivedTimestamp, receivedTopic, receivedMessageKey);
        boolean isRepeateConsume = isRepeateConsume(kafkaComsumePayLoad);
        if (isRepeateConsume) {
            log.warn("messageKey:[{}], topic:[{}] received duplicate message data --> [{}]",
                    receivedMessageKey, receivedTopic, data);
            // acknowledge manually so the duplicate is not redelivered
            ack.acknowledge();
            return;
        }
        if (doBiz(kafkaComsumePayLoad)) {
            // acknowledge manually only after the business logic succeeded
            ack.acknowledge();
        }
    }

    /**
     * Whether this message has already been consumed.
     * @param kafkaComsumePayLoad the received payload
     */
    public abstract boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad);

    /**
     * Business processing; return true if the message was handled successfully.
     * @param kafkaComsumerPayLoad the received payload
     */
    public abstract boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad);

    private KafkaComsumePayLoad buildKafkaComsumePayLoad(String data, long receivedTimestamp,
            String receivedTopic, String receivedMessageKey) {
        return KafkaComsumePayLoad.builder()
                .data(data)
                .receivedTimestamp(receivedTimestamp)
                .receivedTopic(receivedTopic)
                .receivedMessageKey(receivedMessageKey)
                .build();
    }
}
```
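The payload object `KafkaComsumePayLoad` is referenced above but never shown in the original post. A minimal sketch, assuming Lombok is on the classpath, containing only the fields the template actually builds and reads:

```java
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

// Carries the raw message plus the metadata headers the template extracts.
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class KafkaComsumePayLoad {

    /** Raw message body as received from the topic. */
    private String data;

    /** Record timestamp (KafkaHeaders.RECEIVED_TIMESTAMP). */
    private long receivedTimestamp;

    /** Topic the record was read from. */
    private String receivedTopic;

    /** Record key, typically used as the idempotency key. */
    private String receivedMessageKey;
}
```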

3. Define a custom listener annotation (optional)

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.core.annotation.AliasFor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
@Documented
@Component
public @interface LybGeekKafkaListener {

    @AliasFor(annotation = KafkaListener.class, attribute = "id")
    String id() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "containerFactory")
    String containerFactory() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "topics")
    String[] topics() default {};

    @AliasFor(annotation = KafkaListener.class, attribute = "topicPattern")
    String topicPattern() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "topicPartitions")
    TopicPartition[] topicPartitions() default {};

    @AliasFor(annotation = KafkaListener.class, attribute = "containerGroup")
    String containerGroup() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "errorHandler")
    String errorHandler() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "groupId")
    String groupId() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "idIsGroup")
    boolean idIsGroup() default true;

    @AliasFor(annotation = KafkaListener.class, attribute = "clientIdPrefix")
    String clientIdPrefix() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "beanRef")
    String beanRef() default "__listener";

    @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
    String concurrency() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "autoStartup")
    String autoStartup() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "properties")
    String[] properties() default {};

    @AliasFor(annotation = Component.class, attribute = "value")
    String value() default "";
}
```

4. Override the KafkaListener annotation bean post-processor (optional)

Note: because the sample project uses a fairly old Spring Boot version, @LybGeekKafkaListener does not take effect on its own, so the annotation post-processor is reimplemented for it below.

```java
// Essentially a copy of Spring Kafka's KafkaListenerAnnotationBeanPostProcessor,
// adapted to scan @LybGeekKafkaListener instead of @KafkaListener.
public class LybGeekKafkaListenerAnnotationBeanPostProcessor<K, V>
        implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {

    private static final String GENERATED_ID_PREFIX = "org.springframework.kafka.KafkaListenerEndpointContainer#";

    /**
     * The bean name of the default {@link KafkaListenerContainerFactory}.
     */
    public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";

    private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));

    private final Log logger = LogFactory.getLog(getClass());

    private final ListenerScope listenerScope = new ListenerScope();

    private KafkaListenerEndpointRegistry endpointRegistry;

    private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;

    private DefaultListableBeanFactory beanFactory;

    private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory = new KafkaHandlerMethodFactoryAdapter();

    private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();

    private final AtomicInteger counter = new AtomicInteger();

    private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();

    private BeanExpressionContext expressionContext;

    private Charset charset = StandardCharsets.UTF_8;

    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }

    /**
     * Set the {@link KafkaListenerEndpointRegistry} that will hold the created
     * endpoint and manage the lifecycle of the related listener container.
     * @param endpointRegistry the {@link KafkaListenerEndpointRegistry} to set.
     */
    public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry) {
        this.endpointRegistry = endpointRegistry;
    }

    /**
     * Set the name of the {@link KafkaListenerContainerFactory} to use by default.
     * If none is specified, "kafkaListenerContainerFactory" is assumed to be defined.
     * @param containerFactoryBeanName the {@link KafkaListenerContainerFactory} bean name.
     */
    public void setDefaultContainerFactoryBeanName(String containerFactoryBeanName) {
        this.defaultContainerFactoryBeanName = containerFactoryBeanName;
    }

    /**
     * Set the {@link MessageHandlerMethodFactory} to use to configure the message
     * listener responsible to serve an endpoint detected by this processor.
     * By default, {@link DefaultMessageHandlerMethodFactory} is used and it
     * can be configured further to support additional method arguments
     * or to customize conversion and validation support. See
     * {@link DefaultMessageHandlerMethodFactory} Javadoc for more details.
     * @param messageHandlerMethodFactory the {@link MessageHandlerMethodFactory} instance.
     */
    public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory messageHandlerMethodFactory) {
        this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
    }

    /**
     * Making a {@link BeanFactory} available is optional; if not set,
     * {@link KafkaListenerConfigurer} beans won't get autodetected and an
     * {@link #setEndpointRegistry endpoint registry} has to be explicitly configured.
     * @param beanFactory the {@link BeanFactory} to be used.
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        this.beanFactory = (DefaultListableBeanFactory) beanFactory;
        if (beanFactory instanceof ConfigurableListableBeanFactory) {
            this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
            this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
                    this.listenerScope);
        }
    }

    /**
     * Set a charset to use when converting byte[] to String in method arguments.
     * Default UTF-8.
     * @param charset the charset.
     * @since 2.2
     */
    public void setCharset(Charset charset) {
        Assert.notNull(charset, "'charset' cannot be null");
        this.charset = charset;
    }

    @Override
    public void afterSingletonsInstantiated() {
        this.registrar.setBeanFactory(this.beanFactory);
        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, KafkaListenerConfigurer> instances =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
            for (KafkaListenerConfigurer configurer : instances.values()) {
                configurer.configureKafkaListeners(this.registrar);
            }
        }
        if (this.registrar.getEndpointRegistry() == null) {
            if (this.endpointRegistry == null) {
                Assert.state(this.beanFactory != null,
                        "BeanFactory must be set to find endpoint registry by bean name");
                this.endpointRegistry = this.beanFactory.getBean(
                        KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                        KafkaListenerEndpointRegistry.class);
            }
            this.registrar.setEndpointRegistry(this.endpointRegistry);
        }
        if (this.defaultContainerFactoryBeanName != null) {
            this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
        }
        // Set the custom handler method factory once resolved by the configurer
        MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
        if (handlerMethodFactory != null) {
            this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
        } else {
            addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
        }
        // Actually register all listeners
        this.registrar.afterPropertiesSet();
        // Removes the bean definition of the default @KafkaListener annotation processor
        beanFactory.removeBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME);
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            Collection<LybGeekKafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<>();
            Map<Method, Set<LybGeekKafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<LybGeekKafkaListener>>) method -> {
                        Set<LybGeekKafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("No @LybGeekKafkaListener annotations found on bean type: " + bean.getClass());
                }
            } else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<LybGeekKafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (LybGeekKafkaListener listener : entry.getValue()) {
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(annotatedMethods.size() + " @LybGeekKafkaListener methods processed on bean '"
                            + beanName + "': " + annotatedMethods);
                }
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }

    /*
     * AnnotationUtils.getRepeatableAnnotations does not look at interfaces
     */
    private Collection<LybGeekKafkaListener> findListenerAnnotations(Class<?> clazz) {
        Set<LybGeekKafkaListener> listeners = new HashSet<>();
        LybGeekKafkaListener ann = AnnotationUtils.findAnnotation(clazz, LybGeekKafkaListener.class);
        if (ann != null) {
            listeners.add(ann);
        }
        return listeners;
    }

    /*
     * AnnotationUtils.getRepeatableAnnotations does not look at interfaces
     */
    private Set<LybGeekKafkaListener> findListenerAnnotations(Method method) {
        Set<LybGeekKafkaListener> listeners = new HashSet<>();
        LybGeekKafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(method, LybGeekKafkaListener.class);
        if (ann != null) {
            listeners.add(ann);
        }
        return listeners;
    }

    private void processMultiMethodListeners(Collection<LybGeekKafkaListener> classLevelListeners,
            List<Method> multiMethods, Object bean, String beanName) {
        List<Method> checkedMethods = new ArrayList<>();
        Method defaultMethod = null;
        for (Method method : multiMethods) {
            Method checked = checkProxy(method, bean);
            KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
            if (annotation != null && annotation.isDefault()) {
                final Method toAssert = defaultMethod;
                Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
                        + toAssert.toString() + " and " + method.toString());
                defaultMethod = checked;
            }
            checkedMethods.add(checked);
        }
        for (LybGeekKafkaListener classLevelListener : classLevelListeners) {
            MultiMethodKafkaListenerEndpoint<K, V> endpoint =
                    new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
            processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
        }
    }

    protected void processKafkaListener(LybGeekKafkaListener kafkaListener, Method method, Object bean, String beanName) {
        Method methodToUse = checkProxy(method, bean);
        MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setMethod(methodToUse);
        processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
    }

    private Method checkProxy(Method methodArg, Object bean) {
        Method method = methodArg;
        if (AopUtils.isJdkDynamicProxy(bean)) {
            try {
                // Found a @LybGeekKafkaListener method on the target class for this JDK proxy ->
                // is it also present on the proxy itself?
                method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
                Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
                for (Class<?> iface : proxiedInterfaces) {
                    try {
                        method = iface.getMethod(method.getName(), method.getParameterTypes());
                        break;
                    } catch (NoSuchMethodException noMethod) {
                    }
                }
            } catch (SecurityException ex) {
                ReflectionUtils.handleReflectionException(ex);
            } catch (NoSuchMethodException ex) {
                throw new IllegalStateException(String.format(
                        "@LybGeekKafkaListener method '%s' found on bean target class '%s', " +
                        "but not found in any interface(s) for bean JDK proxy. Either " +
                        "pull the method up to an interface or switch to subclass (CGLIB) " +
                        "proxies by setting proxy-target-class/proxyTargetClass " +
                        "attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()), ex);
            }
        }
        return method;
    }

    protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, LybGeekKafkaListener kafkaListener,
            Object bean, Object adminTarget, String beanName) {
        String beanRef = kafkaListener.beanRef();
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.addListener(beanRef, bean);
        }
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(kafkaListener));
        endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
        endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
        endpoint.setTopics(resolveTopics(kafkaListener));
        endpoint.setTopicPattern(resolvePattern(kafkaListener));
        endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
        String group = kafkaListener.containerGroup();
        if (StringUtils.hasText(group)) {
            Object resolvedGroup = resolveExpression(group);
            if (resolvedGroup instanceof String) {
                endpoint.setGroup((String) resolvedGroup);
            }
        }
        String concurrency = kafkaListener.concurrency();
        if (StringUtils.hasText(concurrency)) {
            endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
        }
        String autoStartup = kafkaListener.autoStartup();
        if (StringUtils.hasText(autoStartup)) {
            endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
        }
        resolveKafkaProperties(endpoint, kafkaListener.properties());
        KafkaListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
            } catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
                        + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
                        + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
            }
        }
        endpoint.setBeanFactory(this.beanFactory);
        String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
        if (StringUtils.hasText(errorHandlerBeanName)) {
            endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
        }
        this.registrar.registerEndpoint(endpoint, factory);
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.removeListener(beanRef);
        }
    }

    private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint, String[] propertyStrings) {
        if (propertyStrings.length > 0) {
            Properties properties = new Properties();
            for (String property : propertyStrings) {
                String value = resolveExpressionAsString(property, "property");
                if (value != null) {
                    try {
                        properties.load(new StringReader(value));
                    } catch (IOException e) {
                        this.logger.error("Failed to load property " + property + ", continuing...", e);
                    }
                }
            }
            endpoint.setConsumerProperties(properties);
        }
    }

    private String getEndpointId(LybGeekKafkaListener kafkaListener) {
        if (StringUtils.hasText(kafkaListener.id())) {
            return resolveExpressionAsString(kafkaListener.id(), "id");
        } else {
            return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
        }
    }

    private String getEndpointGroupId(LybGeekKafkaListener kafkaListener, String id) {
        String groupId = null;
        if (StringUtils.hasText(kafkaListener.groupId())) {
            groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
        }
        if (groupId == null && kafkaListener.idIsGroup() && StringUtils.hasText(kafkaListener.id())) {
            groupId = id;
        }
        return groupId;
    }

    private TopicPartitionInitialOffset[] resolveTopicPartitions(LybGeekKafkaListener kafkaListener) {
        TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
        List<TopicPartitionInitialOffset> result = new ArrayList<>();
        if (topicPartitions.length > 0) {
            for (TopicPartition topicPartition : topicPartitions) {
                result.addAll(resolveTopicPartitionsList(topicPartition));
            }
        }
        return result.toArray(new TopicPartitionInitialOffset[0]);
    }

    private String[] resolveTopics(LybGeekKafkaListener kafkaListener) {
        String[] topics = kafkaListener.topics();
        List<String> result = new ArrayList<>();
        if (topics.length > 0) {
            for (String topic1 : topics) {
                Object topic = resolveExpression(topic1);
                resolveAsString(topic, result);
            }
        }
        return result.toArray(new String[0]);
    }

    private Pattern resolvePattern(LybGeekKafkaListener kafkaListener) {
        Pattern pattern = null;
        String text = kafkaListener.topicPattern();
        if (StringUtils.hasText(text)) {
            Object resolved = resolveExpression(text);
            if (resolved instanceof Pattern) {
                pattern = (Pattern) resolved;
            } else if (resolved instanceof String) {
                pattern = Pattern.compile((String) resolved);
            } else if (resolved != null) {
                throw new IllegalStateException(
                        "topicPattern must resolve to a Pattern or String, not " + resolved.getClass());
            }
        }
        return pattern;
    }

    private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {
        Object topic = resolveExpression(topicPartition.topic());
        Assert.state(topic instanceof String,
                "topic in @TopicPartition must resolve to a String, not " + topic.getClass());
        Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty");
        String[] partitions = topicPartition.partitions();
        PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();
        Assert.state(partitions.length > 0 || partitionOffsets.length > 0,
                "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
        List<TopicPartitionInitialOffset> result = new ArrayList<>();
        for (String partition : partitions) {
            resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);
        }
        for (PartitionOffset partitionOffset : partitionOffsets) {
            TopicPartitionInitialOffset topicPartitionOffset =
                    new TopicPartitionInitialOffset((String) topic,
                            resolvePartition(topic, partitionOffset),
                            resolveInitialOffset(topic, partitionOffset),
                            isRelative(topic, partitionOffset));
            if (!result.contains(topicPartitionOffset)) {
                result.add(topicPartitionOffset);
            } else {
                throw new IllegalArgumentException(
                        String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
                                topicPartitionOffset));
            }
        }
        return result;
    }

    private Integer resolvePartition(Object topic, PartitionOffset partitionOffset) {
        Object partitionValue = resolveExpression(partitionOffset.partition());
        Integer partition;
        if (partitionValue instanceof String) {
            Assert.state(StringUtils.hasText((String) partitionValue),
                    "partition in @PartitionOffset for topic '" + topic + "' cannot be empty");
            partition = Integer.valueOf((String) partitionValue);
        } else if (partitionValue instanceof Integer) {
            partition = (Integer) partitionValue;
        } else {
            throw new IllegalArgumentException(String.format(
                    "@PartitionOffset for topic '%s' can't resolve '%s' as an Integer or String, resolved to '%s'",
                    topic, partitionOffset.partition(), partitionValue.getClass()));
        }
        return partition;
    }

    private Long resolveInitialOffset(Object topic, PartitionOffset partitionOffset) {
        Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
        Long initialOffset;
        if (initialOffsetValue instanceof String) {
            Assert.state(StringUtils.hasText((String) initialOffsetValue),
                    "'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
            initialOffset = Long.valueOf((String) initialOffsetValue);
        } else if (initialOffsetValue instanceof Long) {
            initialOffset = (Long) initialOffsetValue;
        } else {
            throw new IllegalArgumentException(String.format(
                    "@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'",
                    topic, partitionOffset.initialOffset(), initialOffsetValue.getClass()));
        }
        return initialOffset;
    }

    private boolean isRelative(Object topic, PartitionOffset partitionOffset) {
        Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent());
        Boolean relativeToCurrent;
        if (relativeToCurrentValue instanceof String) {
            relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue);
        } else if (relativeToCurrentValue instanceof Boolean) {
            relativeToCurrent = (Boolean) relativeToCurrentValue;
        } else {
            throw new IllegalArgumentException(String.format(
                    "@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'",
                    topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass()));
        }
        return relativeToCurrent;
    }

    @SuppressWarnings("unchecked")
    private void resolveAsString(Object resolvedValue, List<String> result) {
        if (resolvedValue instanceof String[]) {
            for (Object object : (String[]) resolvedValue) {
                resolveAsString(object, result);
            }
        } else if (resolvedValue instanceof String) {
            result.add((String) resolvedValue);
        } else if (resolvedValue instanceof Iterable) {
            for (Object object : (Iterable<Object>) resolvedValue) {
                resolveAsString(object, result);
            }
        } else {
            throw new IllegalArgumentException(String.format(
                    "@LybGeekKafkaListener can't resolve '%s' as a String", resolvedValue));
        }
    }

    @SuppressWarnings("unchecked")
    private void resolvePartitionAsInteger(String topic, Object resolvedValue,
            List<TopicPartitionInitialOffset> result) {
        if (resolvedValue instanceof String[]) {
            for (Object object : (String[]) resolvedValue) {
                resolvePartitionAsInteger(topic, object, result);
            }
        } else if (resolvedValue instanceof String) {
            Assert.state(StringUtils.hasText((String) resolvedValue),
                    "partition in @TopicPartition for topic '" + topic + "' cannot be empty");
            result.add(new TopicPartitionInitialOffset(topic, Integer.valueOf((String) resolvedValue)));
        } else if (resolvedValue instanceof Integer[]) {
            for (Integer partition : (Integer[]) resolvedValue) {
                result.add(new TopicPartitionInitialOffset(topic, partition));
            }
        } else if (resolvedValue instanceof Integer) {
            result.add(new TopicPartitionInitialOffset(topic, (Integer) resolvedValue));
        } else if (resolvedValue instanceof Iterable) {
            for (Object object : (Iterable<Object>) resolvedValue) {
                resolvePartitionAsInteger(topic, object, result);
            }
        } else {
            throw new IllegalArgumentException(String.format(
                    "@LybGeekKafkaListener for topic '%s' can't resolve '%s' as an Integer or String",
                    topic, resolvedValue));
        }
    }

    private String resolveExpressionAsString(String value, String attribute) {
        Object resolved = resolveExpression(value);
        if (resolved instanceof String) {
            return (String) resolved;
        } else if (resolved != null) {
            throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "
                    + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
        }
        return null;
    }

    private Integer resolveExpressionAsInteger(String value, String attribute) {
        Object resolved = resolveExpression(value);
        Integer result = null;
        if (resolved instanceof String) {
            result = Integer.parseInt((String) resolved);
        } else if (resolved instanceof Number) {
            result = ((Number) resolved).intValue();
        } else if (resolved != null) {
            throw new IllegalStateException(
                    "The [" + attribute + "] must resolve to a Number or a String that can be parsed as an Integer. "
                    + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
        }
        return result;
    }

    private Boolean resolveExpressionAsBoolean(String value, String attribute) {
        Object resolved = resolveExpression(value);
        Boolean result = null;
        if (resolved instanceof Boolean) {
            result = (Boolean) resolved;
        } else if (resolved instanceof String) {
            result = Boolean.parseBoolean((String) resolved);
        } else if (resolved != null) {
            throw new IllegalStateException(
                    "The [" + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. "
                    + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
        }
        return result;
    }

    private Object resolveExpression(String value) {
        return this.resolver.evaluate(resolve(value), this.expressionContext);
    }

    /**
     * Resolve the specified value if possible.
     * @param value the value to resolve
     * @return the resolved value
     * @see ConfigurableBeanFactory#resolveEmbeddedValue
     */
    private String resolve(String value) {
        if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) {
            return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
        }
        return value;
    }

    private void addFormatters(FormatterRegistry registry) {
        for (Converter<?, ?> converter : getBeansOfType(Converter.class)) {
            registry.addConverter(converter);
        }
        for (GenericConverter converter : getBeansOfType(GenericConverter.class)) {
            registry.addConverter(converter);
        }
        for (Formatter<?> formatter : getBeansOfType(Formatter.class)) {
            registry.addFormatter(formatter);
        }
    }

    private <T> Collection<T> getBeansOfType(Class<T> type) {
        if (LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ListableBeanFactory) {
            return ((ListableBeanFactory) LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory)
                    .getBeansOfType(type).values();
        } else {
            return Collections.emptySet();
        }
    }

    /**
     * An {@link MessageHandlerMethodFactory} adapter that offers a configurable underlying
     * instance to use. Useful if the factory to use is determined once the endpoints
     * have been registered but not created yet.
     * @see KafkaListenerEndpointRegistrar#setMessageHandlerMethodFactory
     */
    private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {

        private final DefaultFormattingConversionService defaultFormattingConversionService =
                new DefaultFormattingConversionService();

        private MessageHandlerMethodFactory messageHandlerMethodFactory;

        public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {
            this.messageHandlerMethodFactory = kafkaHandlerMethodFactory1;
        }

        @Override
        public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
            return getMessageHandlerMethodFactory().createInvocableHandlerMethod(bean, method);
        }

        private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
            if (this.messageHandlerMethodFactory == null) {
                this.messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory();
            }
            return this.messageHandlerMethodFactory;
        }

        private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
            Validator validator = LybGeekKafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
            if (validator != null) {
                defaultFactory.setValidator(validator);
            }
            defaultFactory.setBeanFactory(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory);
            ConfigurableBeanFactory cbf =
                    LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ConfigurableBeanFactory ?
                            (ConfigurableBeanFactory) LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory :
                            null;
            this.defaultFormattingConversionService.addConverter(
                    new BytesToStringConverter(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.charset));
            defaultFactory.setConversionService(this.defaultFormattingConversionService);
            List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
            // Annotation-based argument resolution
            argumentResolvers.add(new HeaderMethodArgumentResolver(this.defaultFormattingConversionService, cbf));
            argumentResolvers.add(new HeadersMethodArgumentResolver());
            // Type-based argument resolution
            final GenericMessageConverter messageConverter =
                    new GenericMessageConverter(this.defaultFormattingConversionService);
            argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
            argumentResolvers.add(new PayloadArgumentResolver(messageConverter, validator) {

                @Override
                public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
                    Object resolved = super.resolveArgument(parameter, message);
                    /*
                     * Replace KafkaNull list elements with null.
                     */
                    if (resolved instanceof List) {
                        List list = ((List) resolved);
                        for (int i = 0; i < list.size(); i++) {
                            if (list.get(i) instanceof KafkaNull) {
                                list.set(i, null);
                            }
                        }
                    }
                    return resolved;
                }

                @Override
                protected boolean isEmptyPayload(Object payload) {
                    return payload == null || payload instanceof KafkaNull;
                }

            });
            defaultFactory.setArgumentResolvers(argumentResolvers);
            defaultFactory.afterPropertiesSet();
            return defaultFactory;
        }

    }

    private static class BytesToStringConverter implements Converter<byte[], String> {

        private final Charset charset;

        BytesToStringConverter(Charset charset) {
            this.charset = charset;
        }

        @Override
        public String convert(byte[] source) {
            return new String(source, this.charset);
        }

    }

    private static class ListenerScope implements Scope {

        private final Map<String, Object> listeners = new HashMap<>();

        ListenerScope() {
            super();
        }

        public void addListener(String key, Object bean) {
            this.listeners.put(key, bean);
        }

        public void removeListener(String key) {
            this.listeners.remove(key);
        }

        @Override
        public Object get(String name, ObjectFactory<?> objectFactory) {
            return this.listeners.get(name);
        }

        @Override
        public Object remove(String name) {
            return null;
        }

        @Override
        public void registerDestructionCallback(String name, Runnable callback) {
        }

        @Override
        public Object resolveContextualObject(String key) {
            return this.listeners.get(key);
        }

        @Override
        public String getConversationId() {
            return null;
        }

    }

}
```
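The original post does not show how this post-processor is wired into the application context. A minimal sketch, assuming it is enough to declare it as a bean in a configuration class (the configuration class name is illustrative); the method is static because bean post-processors should be instantiated early in the container lifecycle:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class LybGeekKafkaListenerConfig {

    // Registers the custom post-processor so that beans annotated with
    // @LybGeekKafkaListener are scanned and their listener endpoints registered.
    @Bean
    public static LybGeekKafkaListenerAnnotationBeanPostProcessor<Object, Object> lybGeekKafkaListenerAnnotationBeanPostProcessor() {
        return new LybGeekKafkaListenerAnnotationBeanPostProcessor<>();
    }
}
```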

How the business side uses it

Example

```java
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;

@LybGeekKafkaListener(id = "createUser", topics = Constant.USER_TOPIC)
public class UserComsumer extends BaseComusmeListener {

    @Autowired
    private UserService userService;

    @Override
    public boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad) {
        User user = JSON.parseObject(kafkaComsumePayLoad.getData(), User.class);
        // treat the message as a duplicate if the user already exists
        return userService.isExistUserByUsername(user.getUsername());
    }

    @Override
    public boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad) {
        User user = JSON.parseObject(kafkaComsumerPayLoad.getData(), User.class);
        return userService.save(user);
    }
}
```
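For completeness, the producing side this consumer pairs with might look like the sketch below. The `Constant.USER_TOPIC` constant, the fastjson `JSON` helper, and `User.getUsername()` come from the example above, but the producer class itself and the choice of the username as the record key are assumptions, not taken from the original article:

```java
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class UserProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Sends the user as JSON; using the username as the record key populates the
    // receivedMessageKey seen by BaseComusmeListener and keeps records for the
    // same user on the same partition.
    public void sendUser(User user) {
        kafkaTemplate.send(Constant.USER_TOPIC, user.getUsername(), JSON.toJSONString(user));
    }
}
```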

Summary

Sometimes, no matter how many times we promote a practice, mistakes still slip through. In that case it is worth turning the practice into a tool and letting the tool enforce the convention. For example, when developers might not think a business flow through completely, we can abstract its core scenarios into template methods, and have developers supply only the concrete implementations on top of those abstractions.
