一种实现分布式消息处理的系统及方法

xiaoxiao2020-7-22  1

一种实现分布式消息处理的系统及方法
【专利摘要】本发明公开了一种实现分布式消息处理的系统及方法,用于提高消息处理系统的通用性及灵活性,该系统包括:表达式模块中包括有规则项、函数、运算符、表达式、宏变量及触发器;设置模块,用于接收用户利用表达式模块建立的消息处理逻辑表达式,并将消息处理逻辑表达式发送给同步模块;消息处理逻辑表达式由规则项、函数、运算符、表达式、宏变量中的一种或多种组成;同步模块,用于将消息处理逻辑表达式实时发送给消息处理模块;消息处理模块,用于根据消息处理逻辑表达式,更新自身保存的消息处理逻辑表达式;接收所需处理的消息,读取最新的消息处理逻辑表达式,按照最新的消息处理逻辑表达式对应的消息处理逻辑对所需处理的消息进行处理。
【专利说明】一种实现分布式消息处理的系统及方法
【技术领域】
[0001]本发明涉及消息处理【技术领域】,具体涉及一种实现分布式消息处理的系统及方法。
【背景技术】
[0002]随着互联网的业务发展,造就了很多的大型系统,这些大型系统一般都采用分布式的方式增加系统的并发能力。分布式消息处理系统一般在大型系统中处理各模块数据交互和逻辑、系统产生数据的加工处理等等,其特点是消息的异步和并发处理吞吐量大,横向扩展容易。
[0003]在现有技术中,分布式消息处理系统消息处理方式和逻辑都需要在系统启动前定义好,其消息处理流程和处理逻辑在系统运行期间不能改变、或者只能做微小变更。如果需要变更处理流程,需要管理人员变更配置重启系统,如果需要变更处理逻辑,需要开发人员修改和重新实现新的处理逻辑并重启系统。这样,如果消息处理逻辑发生改变,只能重新修改系统处理逻辑代码并重启系统,系统通用性很差。

【发明内容】

[0004]有鉴于此,本发明提供一种实现分布式消息处理的系统及方法,以解决现有技术中分布式消息处理系统存在通用性差、无法灵活变更的技术问题。
[0005]为解决上述问题,本发明提供的技术方案如下:
[0006]—种实现分布式消息处理的系统,所述系统包括:
[0007]表达式模块、设置模块、同步模块以及至少一个消息处理模块;
[0008]所述表达式模块中包括有规则项、函数、运算符、表达式、宏变量以及触发器;所述触发器用于当所需处理的消息满足消息处理逻辑表达式时触发对应的处理操作;
[0009]所述设置模块,用于接收用户利用所述表达式模块建立的消息处理逻辑表达式,并将所述消息处理逻辑表达式发送给同步模块;所述消息处理逻辑表达式由所述规则项、所述函数、所述运算符、所述表达式、所述宏变量中的一种或多种组成;
[0010]所述同步模块,用于将接收到的所述消息处理逻辑表达式实时发送给所述消息处理模块;
[0011]消息处理模块,用于根据所述同步模块实时发送的消息处理逻辑表达式,更新自身保存的消息处理逻辑表达式;接收所需处理的消息,读取最新的消息处理逻辑表达式,按照所述最新的消息处理逻辑表达式对应的消息处理逻辑对所述所需处理的消息进行处理。
[0012]相应的,所述系统还包括:
[0013]结果输出模块,用于将所述消息处理单元输出的处理结果发送至对应的结果接收装置,所述结果接收装置包括消息队列、短信发送系统、邮件发送系统、数据库以及系统模块。
[0014]相应的,所述消息处理模块包括:[0015]更新单元,用于根据所述同步模块实时发送的消息处理逻辑表达式,更新自身保存的消息处理逻辑表达式;
[0016]消息初选单元,用于接收所需处理的消息,读取最新的消息处理逻辑表达式,对所述所需处理的消息进行是否符合所述最新的消息处理逻辑表达式的筛选,将筛选通过的消息放入消息队列;
[0017]综合处理单元,用于从所述消息队列读取所述筛选通过的消息,读取最新的消息处理逻辑表达式;按照所述最新的消息处理逻辑表达式对应的消息处理逻辑对所述筛选通过的消息进行处理。
[0018]相应的,所述综合处理单元包括:
[0019]第一读取子单元,用于从所述消息队列读取所述筛选通过的消息;
[0020]第二读取子单元,用于读取最新的消息处理逻辑表达式;
[0021]拆分子单元,用于将所述最新的消息处理逻辑表达式拆分为表达式最小单元;
[0022]处理子单元,用于按照所述表达式最小单元对应的消息处理逻辑对所述筛选通过的消息进行处理。
[0023]相应的,所述消息处理单元还包括:
[0024]确认更新单元,用于在更新自身保存的消息处理逻辑表达式之后,判断更新是否成功,如果是,向所述同步模块发送同步成功信号,以使所述同步模块将所述同步成功信号发送给所述设置模块,如果否,向所述同步模块发送失败信号,以使所述同步模块将所述同步失败信号发送给所述设置模块,使所述设置模块重新将接收用户利用所述表达式模块建立的消息处理逻辑表达式发送给所述同步模块。
[0025]相应的,所述表达式模块还用于:
[0026]判断用户建立的消息处理逻辑表达式是否满足表达式合法性规则,如果是,触发所述设置模块将所述消息处理逻辑表达式发送给同步模块,如果否,通过所述设置模块提示用户重新建立消息处理逻辑表达式。
[0027]—种实现分布式消息处理的方法,所述方法包括:
[0028]设置模块接收用户利用表达式模块建立的消息处理逻辑表达式,并将所述消息处理逻辑表达式发送给同步模块;所述表达式模块中包括有规则项、函数、运算符、表达式、宏变量以及触发器;所述消息处理逻辑表达式由所述规则项、所述函数、所述运算符、所述表达式、所述宏变量中的一种或多种组成;所述触发器用于当所需处理的消息满足消息处理逻辑表达式时触发对应的处理操作;
[0029]所述同步模块将接收到的所述消息处理逻辑表达式实时发送给所述消息处理模块;
[0030]所述消息处理模块根据所述同步模块实时发送的消息处理逻辑表达式,更新自身保存的消息处理逻辑表达式;接收所需处理的消息,读取最新的消息处理逻辑表达式,按照所述最新的消息处理逻辑表达式对应的消息处理逻辑对所述所需处理的消息进行处理。
[0031]相应的,所述方法还包括:
[0032]结果输出模块将所述消息处理单元输出的处理结果发送至对应的结果接收装置,所述结果接收装置包括消息队列、短信发送系统、邮件发送系统、数据库以及系统模块。
[0033]相应的,所述按照所述最新的消息处理逻辑表达式对应的消息处理逻辑对所述所需处理的消息进行处理,包括:
[0034]对所述所需处理的消息进行是否符合所述最新的消息处理逻辑表达式的筛选,将筛选通过的消息放入消息队列;
[0035]从所述消息队列读取所述筛选通过的消息,按照所述最新的消息处理逻辑表达式对应的消息处理逻辑对所述筛选通过的消息进行处理。
[0036]相应的,所述按照所述最新的消息处理逻辑表达式对应的消息处理逻辑对所述筛选通过的消息进行处理,包括:
[0037]将所述最新的消息处理逻辑表达式拆分为表达式最小单元;
[0038]按照所述表达式最小单元对应的消息处理逻辑对所述筛选通过的消息进行处理。
[0039]相应的,在所述消息处理单元更新自身保存的消息处理逻辑表达式之后,所述方法还包括:
[0040]所述消息处理单元判断更新是否成功,如果是,向所述同步模块发送同步成功信号,以使所述同步模块将所述同步成功信号发送给所述设置模块,如果否,向所述同步模块发送失败信号,以使所述同步模块将所述同步失败信号发送给所述设置模块,使所述设置模块重新将接收用户利用所述表达式模块建立的消息处理逻辑表达式发送给所述同步模块。
[0041]相应的,所述方法还包括:
[0042]所述表达式模块判断用户建立的消息处理逻辑表达式是否满足表达式合法性规贝1J,如果是,触发所述设置模块将所述消息处理逻辑表达式发送给同步模块,如果否,通过所述设置模块提示用户重新建立消息处理逻辑表达式。
[0043]由此可见,本发明实施例具有如下有益效果:
[0044]本发明实施例中提供一套表达式系统,抽象各种消息处理逻辑和实现方式,可以将处理逻辑中的规则项、函数、宏变量以及相互之间的关系组成消息处理逻辑表达式,也即消息的处理逻辑可以使用表达式系统中的内容组合实现,具有灵活性;通过实时接收用户利用表达式系统建立的消息处理逻辑表达式,并同步到消息处理模块,在系统不需要重启的情况下,用户在线修改消息处理逻辑表达式就能够实现消息处理逻辑和流程的变更。在本发明实施例中消息的处理逻辑都可以使用逻辑表达式灵活组合实现,且消息处理逻辑可以随时变更,从而实现了一个通用的、高响应、灵活扩展的分布式消息处理系统。
【专利附图】

【附图说明】
[0045]图1为本发明实施例中提供的实现分布式消息处理的系统实施例的示意图;
[0046]图2为本发明实施例中提供的实现分布式消息处理的系统实施例的数据流图;
[0047]图3为本发明实施例中提供的实现分布式消息处理的方法实施例一的流程图;
[0048]图4为本发明实施例中提供的实现分布式消息处理的方法实施例二的流程图。
【具体实施方式】
[0049]为使本发明的上述目的、特征和优点能够更加明显易懂,下面结合附图和【具体实施方式】对本发明实施例作进一步详细的说明。
[0050]本发明实施例提供的实现分布式消息处理的系统及方法,是针对现有技术中消息处理方式和逻辑都需要在系统启动前定义好,无法中途变更;制定处理逻辑的同时,系统要有相对应的处理程序,灵活性很低;当有新种类数据时候,只能添加或修改系统处理逻辑代码实现新数据的处理,系统通用性很差的技术问题,提出从消息处理抽象角度考虑,设计实现一套逻辑表达式系统,以灵活实现实时消息处理变更的需求,其通过将各种处理方式、处理逻辑抽象为各种原子函数,这些原子函数能够组合成各种复杂的处理逻辑。通过表达式系统,用户只需要修改和建立各种消息处理逻辑表达式即可实现各种消息处理逻辑的整合。利用灵活通用的表达式系统,实现不用修改系统处理程序和重启系统情况下,变更消息处理逻辑实现新消息处理。
[0051]基于上述思想,参见图1所示,是本发明实施例提供的实现分布式消息处理的系统实施例,该系统可以包括:
[0052]表达式模块101、设置模块102、同步模块103以及至少一个消息处理模块104。
[0053]其中,表达式模块101中包括有规则项、函数、运算符、表达式、宏变量以及触发器;触发器用于当所需处理的消息满足消息处理逻辑表达式时触发对应的处理操作。
[0054]表达式模块是分布式消息处理系统能够实现动态组合变更消息处理逻辑的核心模块。该模块抽象出一套通用的表达式配置功能,用户能够使用定义的内容实现各种简单以及复杂的消息处理逻辑。表达式模块可以包括如下部分:触发器(trigger)、规则项(item)、函数(function)、表达式(rules)、宏变量(macro)、运算符(operator)。
[0055]规则项代表表达式模块中的规则子项,反映消息的种类、来源或者消息表达的含义,各种消息都可以抽象成不同的规则项。
[0056]函数为消息处理逻辑表达式中用于处理消息的逻辑操作,比如最大值、最小值、中间值、平局值、不同、差值、计数、总和、方差等等。函数和规则项配合使用可以组成消息的处理逻辑。
[0057]运算符代表表达式内部或表达式之间的关系及运算,例如包括大于(>)、小于(O、等于( = )、不等于(#)、关系与(&)、关系非(I)等运算操作。
[0058]表达式可以是利用上述的规则项、函数、运算符等按照用户的处理逻辑组成的表明某种类型的消息按照怎样的处理逻辑处理以及输出怎样的结果的公式,也即表达式可以表示一种消息的处理逻辑。
[0059]宏变量表示抽象的统一替代参数,宏变量可以是一种结果或变量的抽象,当具体到特定实体时,可以替换成具体的实体值。在定义消息处理输出结果时,为了定义通用输出结果模板,可以使用宏变量实现,在具体的实例中,宏变量会被替换成实体的值。
[0060]触发器是消息处理逻辑表达式和具体实体绑定后的特殊单位,是为了通知和标记某实体设置的消息处理逻辑表达式有数据满足条件,触发该实体处理满足条件的消息。如果不需要将结果绑定到单个实体,可以不使用触发器。
[0061]设置模块102,用于接收用户利用表达式模块建立的消息处理逻辑表达式,并将消息处理逻辑表达式发送给同步模块;消息处理逻辑表达式由规则项、函数、运算符、表达式、宏变量中的一种或多种组成。
[0062]设置模块是用户设置管理消息处理逻辑表达式的模块,例如用户可以增加、删除、修改、查询消息处理逻辑表达式,也即设置模块可以接收用户利用表达式模块增加、删除、修改的消息处理逻辑表达式,并为用户提供已有消息处理逻辑表达式的查询。消息处理逻辑表达式是消息处理流程和处理逻辑的体现,用户通过设置模块调用表达式模块中的内容(规则项、函数、运算符、表达式、宏变量)可以实现定义各种消息处理逻辑表达式,每一个消息处理逻辑表达式实现一种消息处理流程和逻辑。消息处理逻辑表达式的输出结果还可以与触发器绑定,当所需处理的消息满足消息处理逻辑表达式时触发对应的处理操作。
[0063]为了更清晰的理解消息处理逻辑表达式,通过一个具体示例对消息处理逻辑表达式进行说明。表达式{PD[DI_pdState].last(#l)}#10为一个逻辑比较简单的消息处理逻辑表达式,在这个表达式中H)为规则项,表示物理磁盘;DI_pdState为规则子项,表示物理磁盘的状态值;last为函数,表示最近一段时间的消息值,last括号里的内容表示消息的时间范围等,#1表示最近一次;最后面的#为运算符不等于。整个消息处理逻辑表达式意思为物理磁盘的状态值在最新的一次采集数据中的值不等于10。如果某个消息符合该表达式,就符合处理条件,可以触发对应的操作处理。该表达式只是简单的示例,在实际应用中利用表达式模块可以组装出更为复杂的消息处理逻辑表达式,例如包括多重表达式等。
[0064]在本发明的一些实施例中,表达式模块还用于:
[0065]判断用户建立的消息处理逻辑表达式是否满足表达式合法性规则,如果是,触发设置模块将消息处理逻辑表达式发送给同步模块,如果否,通过设置模块提示用户重新建立消息处理逻辑表达式。
[0066]对消息处理逻辑表达式进行合法性规则验证,以满足用户输入的消息处理逻辑表达式可以被实现,在本实施例中仅将合法的消息处理逻辑表达式发送给同步模块,可以保证后续消息处理流程的正确性。
[0067]同步模块103,用于将接收到的消息处理逻辑表达式实时发送给消息处理模块。
[0068]用户对消息处理逻辑表达式的修改,通过同步模块实时同步更新到各分布式消息处理模块上。同步模块可以使用socket通信方式,实时将用户消息处理模块更新到各个消息处理模块上。对于同步模块与消息处理模块之间的通信方式在此并不进行限制。
[0069]消息处理模块104,用于根据同步模块实时发送的消息处理逻辑表达式,更新自身保存的消息处理逻辑表达式;接收所需处理的消息,读取最新的消息处理逻辑表达式,按照最新的消息处理逻辑表达式对应的消息处理逻辑对所需处理的消息进行处理。
[0070]在本发明的一些实施例中,消息处理模块可以包括:
[0071]更新单元,用于根据同步模块实时发送的消息处理逻辑表达式,更新自身保存的消息处理逻辑表达式,这样实现了用户通过在线修改消息处理逻辑表达式能够使消息处理流程和逻辑实现进行变更。
[0072]消息初选单元,用于接收所需处理的消息,读取最新的消息处理逻辑表达式,对所需处理的消息进行是否符合最新的消息处理逻辑表达式的筛选,将筛选通过的消息放入消息队列。
[0073]综合处理单元,用于从消息队列读取筛选通过的消息,读取最新的消息处理逻辑表达式;按照最新的消息处理逻辑表达式对应的消息处理逻辑对筛选通过的消息进行处理。
[0074]这样,消息处理可以分为两层,一层为消息初选:对接收到的消息,使用消息处理逻辑表达式的处理逻辑,初步筛选符合消息处理逻辑表达式的消息,放入消息队列,等待后续处理;另外一层为消息综合处理:对筛选出的消息利用对应的消息处理逻辑表达式,按流程和处理逻辑处理数据。使用分层结构,是考虑到数据初选层会收集到大量的消息,这些消息中有很多不符合表达式匹配的消息,过滤掉这些消息,能够让消息综合处理层性能更高,逻辑更简单。
[0075]在本发明的一些实施例中,综合处理单元可以包括:
[0076]第一读取子单元,用于从消息队列读取筛选通过的消息;
[0077]第二读取子单元,用于读取最新的消息处理逻辑表达式;
[0078]拆分子单元,用于将最新的消息处理逻辑表达式拆分为表达式最小单元;
[0079]处理子单元,用于按照表达式最小单元对应的消息处理逻辑对筛选通过的消息进行处理。
[0080]也即综合处理单元从消息队列中取出筛选后的消息,对消息对应的消息处理逻辑表达式进行拆分出来,将复杂表达式分解为各个表达式最小单元,分解后的表达式子项都有对应的处理逻辑。按照实现各表达式子项的处理逻辑和处理函数逐项处理,直到整个表达式处理结束,一条消息处理完毕。这样,通过对表达式进行拆分为最小单元,而最小单元都有对应的消息处理逻辑,可以实现用户修改了消息处理逻辑表达式,但是消息处理逻辑依然可以正确运行,保证了系统的通用性。
[0081]消息初选单元与综合处理单元使用的消息处理逻辑表达式虽然相同,但处理方式与目的并不相同。继续使用消息处理逻辑表达式:{PD[DI_pdState].last (#1)} #10为例,在消息初选单元,读取一些消息后会映射ro[DI_pdState]段,确定消息类型是:物理磁盘状态值,同时确定该值是否#10 (不等于10),如果该条消息中物理磁盘状态值不等于10就符合条件,放入消息队列;在综合处理单元,使用拆分表达式的方式将该消息处理逻辑表达式拆分成4部分:PD、DI_pdState、last (#1)、#10,其中PD、DI_pdState两个字段表示消息类型,last(#l)是带参数的函数,表示该类型数据的最近一次的值,最后一个#10,表示该值不等于10,当满足这个条件后,会根据PD、DI_pdState,按预定义模板生成结果,送入结果队列。
[0082]在本发明的一些实施例中,消息处理单元还可以包括:
[0083]确认更新单元,用于在更新自身保存的消息处理逻辑表达式之后,判断更新是否成功,如果是,向同步模块发送同步成功信号,以使同步模块将同步成功信号发送给设置模块,如果否,向同步模块发送失败信号,以使同步模块将同步失败信号发送给设置模块,使设置模块重新将接收用户利用表达式模块建立的消息处理逻辑表达式发送给同步模块。
[0084]消息处理单元监听到消息处理逻辑表达式变更(例如增加、删除、修改)后,更新消息处理逻辑表达式内存结构体中的内容,如果成功修改,更新程序会发送一个确认消息到同步模块,如修改未成功,也会发送失败消息到同步模块,同时附加错误原因等。
[0085]基于上述实施例,在本发明的一些实施例中,本发明实施例提供的实现分布式消息处理的系统实施例还可以包括:
[0086]结果输出模块,用于将消息处理单元输出的处理结果发送至对应的结果接收装置,结果接收装置包括消息队列、短信发送系统、邮件发送系统、数据库以及系统模块。
[0087]消息被处理后会有相应的结果输出,包括处理结果和接收体,接收体例如短信系统、邮件系统、消息队列、系统模块、数据库等等。
[0088]这样,本发明实施例中通过提供一套表达式系统,抽象各种消息处理逻辑和实现方式,可以将处理逻辑中的规则项、函数、宏变量以及相互之间的关系组成消息处理逻辑表达式,也即消息的处理逻辑可以使用表达式系统中的内容组合实现,具有灵活性;通过实时接收用户利用表达式系统建立的消息处理逻辑表达式,并同步到消息处理模块,在系统不需要重启的情况下,用户在线修改消息处理逻辑表达式就能够实现消息处理逻辑和流程的变更。
[0089]本发明实施例将常见数据处理抽象为一套表达式系统,能够自由组合和实现逻辑,同时易于扩展;消息的处理逻辑都可使用消息处理逻辑表达式组合实现,消息处理逻辑可以随时变更;可以在系统运行中无缝变更和新增消息处理逻辑,不用重启系统,从而实现了一个通用的、高响应、灵活扩展的分布式消息处理系统。
[0090]参见图2所示,是本发明实施例中提供的实现分布式消息处理系统的数据流图。实现分布式消息处理可以分为消息处理逻辑表达式变更流程以及消息处理流程。
[0091]A、消息处理逻辑表达式变更流程:
[0092]Al、用户通过设置模块界面修改(增加、删除、修改、查询)消息处理逻辑表达式。
[0093]A2、表达式模块提供规则项、函数、运算符、表达式、宏变量、触发器。用户修改或新增的消息处理逻辑表达式由表达式模块提供组合、验证、存储等。
[0094]A3、消息处理逻辑表达式创建完成后,并且通过表达式模块的合法性验证,合法消息处理逻辑表达式会通过设置模块传送到同步模块。
[0095]A4、同步模块使用socket通信方式同步到分布式的各消息处理模块中的消息初选单元以及综合处理单元。消息同步之后,各消息处理模块内部自带的更新程序修改、合并、更新消息处理逻辑表达式。完成之后返回确认消息,同步模块把各消息处理模块同步结果返回给设置模块,设置模块通知用户消息处理逻辑表达式是否成功变更和同步,用户可以根据结果选择合适的操作。
[0096]B、消息处理流程:
[0097]B1、外部待处理的消息通过接口传入消息处理模块。(在分布式消息处理模块中,可以存在多个消息初选单元,取决于系统消息并发量和系统处理能力)。
[0098]B2、消息初选单元使用消息处理逻辑表达式,初次匹配数据,筛选出符合消息处理逻辑表达式的消息,送入消息队列中。初选模块只是做消息的初步筛选,真正的处理流程和处理逻辑由综合处理单元完成。采用这种分层处理的设计是因为在大量的消息中,可能存在大量不符合消息处理逻辑表达式的消息,由消息初选单元忽略这些消息,可以降低后续消息处理的压力。如果不分层,消息处理集中在一个处理模块完成,会造成单模块压力过大,处理并发量有限,同时,分布式消息处理系统可能需要综合处理消息,相互关联的数据分布在不同的节点上时候造成处理困难等。
[0099]B3、综合处理单元从消息队列中取出筛选后的数据(可并发多进程、多线程处理),对消息对应的消息处理逻辑表达式进行拆分出来,将复杂表达式分解为各个表达式最小单元,分解后的表达式子项都有对应的处理逻辑。按照实现各表达式子项的处理逻辑和处理函数逐项处理,直到整个表达式处理结束,一条消息处理完毕。
[0100]B4、综合处理单元对消息处理完成后,需要输出消息处理结果,可以放入结果队列,也可以持久化到数据库或文件中。
[0101]B5、邮件或短信发送程序从结果队列中取出结果,发送邮件或短信等。还可以扩展其他的结果输出方式。
[0102]相应的,参见图3所示,是本发明实施例提供的实现分布式消息处理的方法实施例一的流程图,可以包括以下步骤:
[0103]步骤301:设置模块接收用户利用表达式模块建立的消息处理逻辑表达式,并将消息处理逻辑表达式发送给同步模块;表达式模块中包括有规则项、函数、运算符、表达式、宏变量以及触发器;消息处理逻辑表达式由规则项、函数、运算符、表达式、宏变量中的一种或多种组成;触发器用于当所需处理的消息满足消息处理逻辑表达式时触发对应的处理操作。
[0104]步骤302:同步模块将接收到的消息处理逻辑表达式实时发送给消息处理模块。
[0105]步骤303:消息处理模块根据同步模块实时发送的消息处理逻辑表达式,更新自身保存的消息处理逻辑表达式;接收所需处理的消息,读取最新的消息处理逻辑表达式,按照最新的消息处理逻辑表达式对应的消息处理逻辑对所需处理的消息进行处理。
[0106]在本发明的一些实施例中,按照最新的消息处理逻辑表达式对应的消息处理逻辑对所需处理的消息进行处理的具体实现可以包括:
[0107]对所需处理的消息进行是否符合最新的消息处理逻辑表达式的筛选,将筛选通过的消息放入消息队列;
[0108]从消息队列读取筛选通过的消息,按照最新的消息处理逻辑表达式对应的消息处理逻辑对筛选通过的消息进行处理。
[0109]在本发明的一些实施例中,按照最新的消息处理逻辑表达式对应的消息处理逻辑对筛选通过的消息进行处理的具体实现可以包括:
[0110]将最新的消息处理逻辑表达式拆分为表达式最小单元;
[0111]按照表达式最小单元对应的消息处理逻辑对筛选通过的消息进行处理。
[0112]在本发明的一些实施例中,本发明实施例提供的实现分布式消息处理的方法实施例还可以包括:
[0113]结果输出模块将消息处理单元输出的处理结果发送至对应的结果接收装置,结果接收装置包括消息队列、短信发送系统、邮件发送系统、数据库以及系统模块。
[0114]在本发明的一些实施例中,在消息处理单元更新自身保存的消息处理逻辑表达式之后,本发明实施例提供的实现分布式消息处理的方法实施例还可以包括:
[0115]消息处理单元判断更新是否成功,如果是,向同步模块发送同步成功信号,以使同步模块将同步成功信号发送给设置模块,如果否,向同步模块发送失败信号,以使同步模块将同步失败信号发送给设置模块,使设置模块重新将接收用户利用表达式模块建立的消息处理逻辑表达式发送给同步模块。
[0116]本发明实施例提供的实现分布式消息处理的方法实施例还可以包括:
[0117]表达式模块判断用户建立的消息处理逻辑表达式是否满足表达式合法性规则,如果是,触发设置模块将消息处理逻辑表达式发送给同步模块,如果否,通过设置模块提示用户重新建立消息处理逻辑表达式。
[0118]参见图4所示,是本发明实施例提供的实现分布式消息处理的方法实施例二的流程图,可以包括以下步骤:
[0119]步骤401:设置模块接收用户利用表达式模块建立的消息处理逻辑表达式。[0120]步骤402:表达式模块判断用户建立的消息处理逻辑表达式是否满足表达式合法性规则,如果是,进入步骤403,如果否,进入步骤404。
[0121]步骤403:设置模块将消息处理逻辑表达式发送给同步模块,并进入步骤405。
[0122]步骤404:通过设置模块提示用户重新建立消息处理逻辑表达式,返回步骤401。
[0123]步骤405:同步模块将接收到的消息处理逻辑表达式实时发送给消息处理模块。
[0124]步骤406:消息处理模块根据同步模块实时发送的消息处理逻辑表达式,更新自身保存的消息处理逻辑表达式。
[0125]步骤407:消息处理单元判断更新是否成功,如果是,进入步骤408,如果否,执行步骤409。
[0126]步骤408:向同步模块发送同步成功信号,以使同步模块将同步成功信号发送给设置模块,并进入步骤410。[0127]步骤409:向同步模块发送失败信号,以使同步模块将同步失败信号发送给设置模块,返回步骤403。
[0128]步骤410:消息处理模块接收所需处理的消息,读取最新的消息处理逻辑表达式,按照最新的消息处理逻辑表达式对应的消息处理逻辑对所需处理的消息进行处理。
[0129]步骤411:结果输出模块将消息处理单元输出的处理结果发送至对应的结果接收装置,结果接收装置包括消息队列、短信发送系统、邮件发送系统、数据库以及系统模块。
[0130]这样,本发明实施例中提供一套表达式系统,抽象各种消息处理逻辑和实现方式,可以将处理逻辑中的规则项、函数、宏变量以及相互之间的关系组成消息处理逻辑表达式,也即消息的处理逻辑可以使用表达式系统中的内容组合实现,具有灵活性;通过实时接收用户利用表达式系统建立的消息处理逻辑表达式,并同步到消息处理模块,在系统不需要重启的情况下,用户在线修改消息处理逻辑表达式就能够实现消息处理逻辑和流程的变更。在本发明实施例中消息的处理逻辑都可以使用逻辑表达式灵活组合实现,且消息处理逻辑可以随时变更,从而实现了一个通用的、高响应、灵活扩展的分布式消息处理系统。
[0131]需要说明的是,本说明书中各个实施例采用递进的方式描述,每个实施例重点说明的都是与其他实施例的不同之处,各个实施例之间相同相似部分互相参见即可。对于实施例公开的系统或装置而言,由于其与实施例公开的方法相对应,所以描述的比较简单,相关之处参见方法部分说明即可。
[0132]还需要说明的是,在本文中,诸如第一和第二等之类的关系术语仅仅用来将一个实体或者操作与另一个实体或操作区分开来,而不一定要求或者暗示这些实体或操作之间存在任何这种实际的关系或者顺序。而且,术语“包括”、“包含”或者其任何其他变体意在涵盖非排他性的包含,从而使得包括一系列要素的过程、方法、物品或者设备不仅包括那些要素,而且还包括没有明确列出的其他要素,或者是还包括为这种过程、方法、物品或者设备所固有的要素。在没有更多限制的情况下,由语句“包括一个……”限定的要素,并不排除在包括所述要素的过程、方法、物品或者设备中还存在另外的相同要素。
[0133]结合本文中所公开的实施例描述的方法或算法的步骤可以直接用硬件、处理器执行的软件模块,或者二者的结合来实施。软件模块可以置于随机存储器(RAM)、内存、只读存储器(ROM)、电可编程ROM、电可擦除可编程ROM、寄存器、硬盘、可移动磁盘、CD-ROM、或【技术领域】内所公知的任意其它形式的存储介质中。[0134]对所公开的实施例的上述说明,使本领域专业技术人员能够实现或使用本发明。对这些实施例的多种修改对本领域的专业技术人员来说将是显而易见的,本文中所定义的一般原理可以在不脱离本发明的精神或范围的情况下,在其它实施例中实现。因此,本发明将不会被限制于本文所示的这些实施例,而是要符合与本文所公开的原理和新颖特点相一致的最宽的范围。
【权利要求】
1.一种实现分布式消息处理的系统,其特征在于,所述系统包括: 表达式模块、设置模块、同步模块以及至少一个消息处理模块; 所述表达式模块中包括有规则项、函数、运算符、表达式、宏变量以及触发器;所述触发器用于当所需处理的消息满足消息处理逻辑表达式时触发对应的处理操作; 所述设置模块,用于接收用户利用所述表达式模块建立的消息处理逻辑表达式,并将所述消息处理逻辑表达式发送给同步模块;所述消息处理逻辑表达式由所述规则项、所述函数、所述运算符、所述表达式、所述宏变量中的一种或多种组成; 所述同步模块,用于将接收到的所述消息处理逻辑表达式实时发送给所述消息处理模块; 消息处理模块,用于根据所述同步模块实时发送的消息处理逻辑表达式,更新自身保存的消息处理逻辑表达式;接收所需处理的消息,读取最新的消息处理逻辑表达式,按照所述最新的消息处理逻辑表达式对应的消息处理逻辑对所述所需处理的消息进行处理。
2.根据权利要求1所述的系统,其特征在于,所述系统还包括: 结果输出模块,用于将所述消息处理单元输出的处理结果发送至对应的结果接收装置,所述结果接收装置包括消息队列、短信发送系统、邮件发送系统、数据库以及系统模块。
3.根据权利要 求1所述的系统,其特征在于,所述消息处理模块包括: 更新单元,用于根据所述同步模块实时发送的消息处理逻辑表达式,更新自身保存的消息处理逻辑表达式; 消息初选单元,用于接收所需处理的消息,读取最新的消息处理逻辑表达式,对所述所需处理的消息进行是否符合所述最新的消息处理逻辑表达式的筛选,将筛选通过的消息放入消息队列; 综合处理单元,用于从所述消息队列读取所述筛选通过的消息,读取最新的消息处理逻辑表达式;按照所述最新的消息处理逻辑表达式对应的消息处理逻辑对所述筛选通过的消息进行处理。
4.根据权利要求3所述的系统,其特征在于,所述综合处理单元包括: 第一读取子单元,用于从所述消息队列读取所述筛选通过的消息; 第二读取子单元,用于读取最新的消息处理逻辑表达式; 拆分子单元,用于将所述最新的消息处理逻辑表达式拆分为表达式最小单元; 处理子单元,用于按照所述表达式最小单元对应的消息处理逻辑对所述筛选通过的消息进行处理。
5.根据权利要求3所述的系统,其特征在于,所述消息处理单元还包括: 确认更新单元,用于在更新自身保存的消息处理逻辑表达式之后,判断更新是否成功,如果是,向所述同步模块发送同步成功信号,以使所述同步模块将所述同步成功信号发送给所述设置模块,如果否,向所述同步模块发送失败信号,以使所述同步模块将所述同步失败信号发送给所述设置模块,使所述设置模块重新将接收用户利用所述表达式模块建立的消息处理逻辑表达式发送给所述同步模块。
6.根据权利要求1所述的系统,其特征在于,所述表达式模块还用于: 判断用户建立的消息处理逻辑表达式是否满足表达式合法性规则,如果是,触发所述设置模块将所述消息处理逻辑表达式发送给同步模块,如果否,通过所述设置模块提示用户重新建立消息处理逻辑表达式。
7.一种实现分布式消息处理的方法,其特征在于,所述方法包括: 设置模块接收用户利用表达式模块建立的消息处理逻辑表达式,并将所述消息处理逻辑表达式发送给同步模块;所述表达式模块中包括有规则项、函数、运算符、表达式、宏变量以及触发器;所述消息处理逻辑表达式由所述规则项、所述函数、所述运算符、所述表达式、所述宏变量中的一种或多种组成;所述触发器用于当所需处理的消息满足消息处理逻辑表达式时触发对应的处理操作; 所述同步模块将接收到的所述消息处理逻辑表达式实时发送给所述消息处理模块; 所述消息处理模块根据所述同步模块实时发送的消息处理逻辑表达式,更新自身保存的消息处理逻辑表达式;接收所需处理的消息,读取最新的消息处理逻辑表达式,按照所述最新的消息处理逻辑表达式对应的消息处理逻辑对所述所需处理的消息进行处理。
8.根据权利要求7所述的方法,其特征在于,所述方法还包括: 结果输出模块将所述消息处理单元输出的处理结果发送至对应的结果接收装置,所述结果接收装置包括消息队列、短信发送系统、邮件发送系统、数据库以及系统模块。
9.根据权利要求7所述的方法,其特征在于,所述按照所述最新的消息处理逻辑表达式对应的消息处理逻辑对所述所需处理的消息进行处理,包括: 对所述所需处理的消息进行是否符合所述最新的消息处理逻辑表达式的筛选,将筛选通过的消息放入消息队列; 从所述消息队列读取所述筛选通过的消息,按照所述最新的消息处理逻辑表达式对应的消息处理逻辑对所述筛选通过的消息进行处理。
10.根据权利要求9所述的方法,其特征在于,所述按照所述最新的消息处理逻辑表达式对应的消息处理逻辑对所述筛选通过的消息进行处理,包括: 将所述最新的消息处理逻辑表达式拆分为表达式最小单元; 按照所述表达式最小单元对应的消息处理逻辑对所述筛选通过的消息进行处理。
11.根据权利要求7所述的方法,其特征在于,在所述消息处理单元更新自身保存的消息处理逻辑表达式之后,所述方法还包括: 所述消息处理单元判断更新是否成功,如果是,向所述同步模块发送同步成功信号,以使所述同步模块将所述同步成功信号发送给所述设置模块,如果否,向所述同步模块发送失败信号,以使所述同步模块将所述同步失败信号发送给所述设置模块,使所述设置模块重新将接收用户利用所述表达式模块建立的消息处理逻辑表达式发送给所述同步模块。
12.根据权利要求7所述的方法,其特征在于,所述方法还包括: 所述表达式模块判断用户建立的消息处理逻辑表达式是否满足表达式合法性规则,如果是,触发所述设置模块将所述消息处理逻辑表达式发送给同步模块,如果否,通过所述设置模块提示用户重新建立消息处理逻辑表达式。
【文档编号】G06F9/50GK103914349SQ201410174706
【公开日】2014年7月9日 申请日期:2014年4月28日 优先权日:2014年4月28日
【发明者】王帅 申请人:北京搜狐新媒体信息技术有限公司

最新回复(0)