Loading [MathJax]/jax/input/TeX/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >dubbo源码——服务提供者的服务暴露过程(二)

dubbo源码——服务提供者的服务暴露过程(二)

原创
作者头像
冰枫
发布于 2018-06-11 04:23:58
发布于 2018-06-11 04:23:58
1.7K00
代码可运行
举报
文章被收录于专栏:冰枫冰枫
运行总次数:0
代码可运行

上篇博客讲到了doExportUrls()方法

  • #1顾名思义加载注册中心。(后面详细讲解
  • #2获取并遍历所有协议,将服务根据不同协议暴露,并注册到每个注册中心上。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//ServiceConfig.java
//serviceConfig->doExportUrls()
@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrls() {
    List<URL> registryURLs = loadRegistries(true);   #1
    for (ProtocolConfig protocolConfig : protocols) { #2
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
  • #1检查registry,如果registries为空,则从环境中加载registry参数。
  • #2如果registry标签的address属性为空,则默认为0.0.0.0
  • #3从环境加载address,参数配置-Ddubbo.registry.address=127.0.0.1:2181,此优先级最高。
  • #4将application,config填充到map
  • #5将path设置为RegistryService类名。example:registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService/xxxxxxx
  • #6dubbo版本号、时间戳、pid,protocol
  • #7

##1将address进行分割,和属性填充例如在一个标签中中配置了多个地址,example:<dubbo:registry

address="dubbo://127.0.0.1:2181;zookeeper://127.0.0.2:2181"/>,则会被分割为dubbo://127.0.0.1:2181 , zookeeper://127.0.0.12:2181来处理。

##2对url的参数进行一些填充,如protocol、username、password、port、host,path,最终生成的url:example: registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo2-service&dubbo=2.6.1&pid=1773&timestamp=1528679207767

  • #8设置url参数,example: registry=zookeeper
  • #9设置协议为registry
  • #10我们在调用loadRegistries()时传入的参数为true,代表为服务提供者,那么register不应该为false,如果传入false,那说明为消费者,则subscribe不应该为false。

最终的url形式为example: registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo2-service&dubbo=2.6.1&pid=1922&registry=zookeeper&timestamp=1528679861114

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static List<URL> parseURLs(String address, Map<String, String> defaults) {
    if (address == null || address.length() == 0) {
        return null;
    }
    String[] addresses = Constants.REGISTRY_SPLIT_PATTERN.split(address);  ##1
    if (addresses == null || addresses.length == 0) {
        return null; //here won't be empty
    }
    List<URL> registries = new ArrayList<URL>();
    for (String addr : addresses) {
        registries.add(parseURL(addr, defaults));     ##2
    }
    return registries;
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static URL parseURL(String address, Map<String, String> defaults) {
    if (address == null || address.length() == 0) {
        return null;
    }
    String url;
    if (address.indexOf("://") >= 0) {
        url = address;
    } else {
        String[] addresses = Constants.COMMA_SPLIT_PATTERN.split(address);
        url = addresses[0];
        if (addresses.length > 1) {
            StringBuilder backup = new StringBuilder();
            for (int i = 1; i < addresses.length; i++) {
                if (i > 1) {
                    backup.append(",");
                }
                backup.append(addresses[i]);
            }
            url += "?" + Constants.BACKUP_KEY + "=" + backup.toString();
        }
    }
    String defaultProtocol = defaults == null ? null : defaults.get("protocol");
    if (defaultProtocol == null || defaultProtocol.length() == 0) {
        defaultProtocol = "dubbo";
    }
    String defaultUsername = defaults == null ? null : defaults.get("username");
    String defaultPassword = defaults == null ? null : defaults.get("password");
    int defaultPort = StringUtils.parseInteger(defaults == null ? null : defaults.get("port"));
    String defaultPath = defaults == null ? null : defaults.get("path");
    Map<String, String> defaultParameters = defaults == null ? null : new HashMap<String, String>(defaults);
    if (defaultParameters != null) {
        defaultParameters.remove("protocol");
        defaultParameters.remove("username");
        defaultParameters.remove("password");
        defaultParameters.remove("host");
        defaultParameters.remove("port");
        defaultParameters.remove("path");
    }
    URL u = URL.valueOf(url);
    boolean changed = false;
    String protocol = u.getProtocol();
    String username = u.getUsername();
    String password = u.getPassword();
    String host = u.getHost();
    int port = u.getPort();
    String path = u.getPath();
    Map<String, String> parameters = new HashMap<String, String>(u.getParameters());
    if ((protocol == null || protocol.length() == 0) && defaultProtocol != null && defaultProtocol.length() > 0) {
        changed = true;
        protocol = defaultProtocol;
    }
    if ((username == null || username.length() == 0) && defaultUsername != null && defaultUsername.length() > 0) {
        changed = true;
        username = defaultUsername;
    }
    if ((password == null || password.length() == 0) && defaultPassword != null && defaultPassword.length() > 0) {
        changed = true;
        password = defaultPassword;
    }
    /*if (u.isAnyHost() || u.isLocalHost()) {
        changed = true;
        host = NetUtils.getLocalHost();
    }*/
    if (port <= 0) {
        if (defaultPort > 0) {
            changed = true;
            port = defaultPort;
        } else {
            changed = true;
            port = 9090;
        }
    }
    if (path == null || path.length() == 0) {
        if (defaultPath != null && defaultPath.length() > 0) {
            changed = true;
            path = defaultPath;
        }
    }
    if (defaultParameters != null && defaultParameters.size() > 0) {
        for (Map.Entry<String, String> entry : defaultParameters.entrySet()) {
            String key = entry.getKey();
            String defaultValue = entry.getValue();
            if (defaultValue != null && defaultValue.length() > 0) {
                String value = parameters.get(key);
                if (value == null || value.length() == 0) {
                    changed = true;
                    parameters.put(key, defaultValue);
                }
            }
        }
    }
    if (changed) {
        u = new URL(protocol, username, password, host, port, path, parameters);
    }
    return u;
}

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//AbstractInterfaceConfig.java
//AbstractInterfaceConfig.loadRegistries()
    protected List<URL> loadRegistries(boolean provider) {
        checkRegistry();   #1
        List<URL> registryList = new ArrayList<URL>();
        if (registries != null && !registries.isEmpty()) {
            for (RegistryConfig config : registries) {
                String address = config.getAddress();
                if (address == null || address.length() == 0) {    #2
                    address = Constants.ANYHOST_VALUE;
                }
                String sysaddress = System.getProperty("dubbo.registry.address");  #3
                if (sysaddress != null && sysaddress.length() > 0) {
                    address = sysaddress;
                }
                if (address != null && address.length() > 0
                        && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                    Map<String, String> map = new HashMap<String, String>();
                    appendParameters(map, application);       #4
                    appendParameters(map, config);
                    map.put("path", RegistryService.class.getName());   #5
                    map.put("dubbo", Version.getVersion());  #6
                    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
                    if (ConfigUtils.getPid() > 0) {
                        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
                    }
                    if (!map.containsKey("protocol")) {
                        if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
                            map.put("protocol", "remote");
                        } else {
                            map.put("protocol", "dubbo");
                        }
                    }
                    List<URL> urls = UrlUtils.parseURLs(address, map); #7
                    for (URL url : urls) {
                        url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol()); #8
                        url = url.setProtocol(Constants.REGISTRY_PROTOCOL);   #9
                        if ((provider && url.getParameter(Constants.REGISTER_KEY, true))  #10
                                || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                            registryList.add(url);
                        }
                    }
                }
            }
        }
        return registryList;
    }

ok,既然注册中心URL构造好了,那么就该遍历每个协议,进行服务暴露和注册了!doExportUrlsFor1Protocol()

emmm,dubbo的方法命名和Spring的好像啊,do....就是方法也太长了吧。。

  • #1协议name,默认为dubbo。支持的协议还有rmi、hessian、http、webservice、thrift、redis
  • #2设置side,dubbo,timestamp,pid,ApplicationConfig,ModuleConfig,ProviderCofnig,ProtocolConfig,ServiceConfig等参数
  • #3如果在<dubbo:service/>标签中配置的子标签属性放入map。
  • #4如果generic为true,那么将generic=true,method=*作为参数
  • #5否则为普通接口,将当前服务的版本reversion=xxxx作为参数。
  • #6获取接口中的方法名,用,分割for example: methods=test1,test2;如果不存在方法则使用通配符*。
  • #7如果配置文件配置token为true或者default,则使用UUID作为token,否则将自定义。
  • #8获取host

①从系统参数中中获取host example: -DDUBBO_IP_TO_BIND=192.168.1.1

String hostToBind = getValueFromConfig(protocolConfig, Constants.DUBBO_IP_TO_BIND);

②从配置文件中获取host example <dubbo:protocol host="192.168.1.1"/>

hostToBind = provider.getHost()

③从本机中获取host

hostToBind = InetAddress.getLocalHost().getHostAddress();

④获取注册中心的host

hostToBind = socket.getLocalAddress().getHostAddress();

⑤遍历本地网卡,返回第一个合理的IP

hostToBind = getLocalHost();

host有效性验证:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static boolean isInvalidLocalHost(String host) {
    return host == null
            || host.length() == 0
            || host.equalsIgnoreCase("localhost")
            || host.equals("0.0.0.0")
            || (LOCAL_IP_PATTERN.matcher(host).matches());
}
  • #9 获取port

①从系统运行参数中获取 for example: -DDUBBO_PORT_TO_BIND=28080

String port = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_BIND);

②从配置文件中获取 for example: <dubbo:protocol port="28080"/>

portToBind = protocolConfig.getPort();

③从provider配置中获取端口

portToBind = provider.getPort();

④获取协议默认端口

final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();

⑤获取随机端口

portToBind = getRandomPort(name);

  • #10根据以上参数构造出协议对应的URL。
  • #11如果配置了scope为none,则不暴露服务。
  • #12如果scope配置为local,(默认为local),则在本地进行暴露,而不向注册中心进行注册服务。
  • #13如果scope配置为remote,则先在本地进行暴露,然后向注册中心注册服务。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String name = protocolConfig.getName();    #1
    if (name == null || name.length() == 0) {
        name = "dubbo";
    }

    Map<String, String> map = new HashMap<String, String>();
    map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); #2
    map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, provider, Constants.DEFAULT_KEY);
    appendParameters(map, protocolConfig);
    appendParameters(map, this);
    if (methods != null && !methods.isEmpty()) {   #3
        for (MethodConfig method : methods) {
            appendParameters(map, method, method.getName());
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            List<ArgumentConfig> arguments = method.getArguments();
            if (arguments != null && !arguments.isEmpty()) {
                for (ArgumentConfig argument : arguments) {
                    // convert argument type
                    if (argument.getType() != null && argument.getType().length() > 0) {
                        Method[] methods = interfaceClass.getMethods();
                        // visit all methods
                        if (methods != null && methods.length > 0) {
                            for (int i = 0; i < methods.length; i++) {
                                String methodName = methods[i].getName();
                                // target the method, and get its signature
                                if (methodName.equals(method.getName())) {
                                    Class<?>[] argtypes = methods[i].getParameterTypes();
                                    // one callback in the method
                                    if (argument.getIndex() != -1) {
                                        if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                        } else {
                                            throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                        }
                                    } else {
                                        // multiple callbacks in the method
                                        for (int j = 0; j < argtypes.length; j++) {
                                            Class<?> argclazz = argtypes[j];
                                            if (argclazz.getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + j);
                                                if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                    throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    } else if (argument.getIndex() != -1) {
                        appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                    } else {
                        throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                    }

                }
            }
        } // end of methods for
    }

    if (ProtocolUtils.isGeneric(generic)) {    #4
        map.put("generic", generic);
        map.put("methods", Constants.ANY_VALUE);
    } else {    #5
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision);
        }

        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); #6
        if (methods.length == 0) {
            logger.warn("NO method found in service interface " + interfaceClass.getName());
            map.put("methods", Constants.ANY_VALUE);
        } else {
            map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    if (!ConfigUtils.isEmpty(token)) {    #7
        if (ConfigUtils.isDefault(token)) {
            map.put("token", UUID.randomUUID().toString());
        } else {
            map.put("token", token);
        }
    }
    if ("injvm".equals(protocolConfig.getName())) {  
        protocolConfig.setRegister(false);
        map.put("notify", "false");
    }
    // export service
    String contextPath = protocolConfig.getContextpath();
    if ((contextPath == null || contextPath.length() == 0) && provider != null) {
        contextPath = provider.getContextpath();
    }

    String host = this.findConfigedHosts(protocolConfig, registryURLs, map); #8
    Integer port = this.findConfigedPorts(protocolConfig, name, map); #9
    URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
#10
    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    String scope = url.getParameter(Constants.SCOPE_KEY);
    // don't export when none is configured
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {  #11

        // export to local if the config is not remote (export to remote only when config is remote)
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {  #12
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {  #13
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            if (registryURLs != null && !registryURLs.isEmpty()) {
                for (URL registryURL : registryURLs) {
                    url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else {
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        }
    }
    this.urls.add(url);
}

既然这样, 那么我们只需查看remote的代码即可。

  • #1增加dynamic参数,dynamic:服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。 默认:true。
  • #2如果提供了监控中心,那么将会启用注册中心
  • #3使用ProxyFactory将实例包装成Invoker,dubbo默认使用JavaassistRpcProxyFactory
  • #4将invoker包装DelegateProviderMetaDataInvoker,包括invoker和ServiceConfig

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {  
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            if (registryURLs != null && !registryURLs.isEmpty()) {
                for (URL registryURL : registryURLs) {
                    url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));  #1
                    URL monitorUrl = loadMonitor(registryURL); #2
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }
                    #3
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); #4

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else {
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }

export()方法是核心方法,根据不同协议暴露服务。下篇博客讲解

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
大数据技术之_24_电影推荐系统项目_08_项目总结及补充
一 数据加载服务1、目标2、步骤二 离线推荐服务2.1 基于统计性算法1、目标2、步骤2.2 基于隐语义模型(LFM)的协同过滤推荐算法(ALS)1、目标2、步骤2.3 基于 ElasticSearch 的内容推荐算法1、目标2、步骤2.4 基于内容的推荐服务--电影标签三 实时推荐服务3.1 推荐算法解析3.2 实时推荐算法的实现过程3.3 日志的预处理四 综合业务服务4.1 后台架构4.2 Spring 框架搭建4.3 API 接口规划五 用户可视化服务5.1 前端框架搭建5.2 创建与运行项目5.2.1 创建项目骨架5.2.2 添加项目依赖5.2.3 创建模块、组件与服务5.2.4 调试项目5.2.5 发布项目六 项目重构6.1 核心模型提取6.2 通过配置的方式来获取硬编码的值6.3 项目打包6.3.1 AngularJS 前端文件打包6.3.2 businessServer 下的 java web 项目的打包方式6.3.3 核心模型 项目的打包方式6.3.4 recommender 下的后端文件打包方式6.4 系统部署
黑泽君
2019/05/26
2.7K0
大数据技术之_28_电商推荐系统项目_01
  项目以推荐系统建设领域知名的经过修改过的中文亚马逊电商数据集作为依托,以某电商网站真实业务数据架构为基础,构建了符合教学体系的一体化的电商推荐系统,包含了离线推荐与实时推荐体系,综合利用了协同过滤算法以及基于内容的推荐方法来提供混合推荐。提供了从前端应用、后台服务、算法设计实现、平台部署等多方位的闭环的业务实现。
黑泽君
2019/06/15
3.2K1
大数据技术之_24_电影推荐系统项目_06_项目体系架构设计 + 工具环境搭建 + 创建项目并初始化业务数据 + 离线推荐服务建设 + 实时推荐服务建设 + 基于内容的推荐服务建设
  用户可视化:主要负责实现和用户的交互以及业务数据的展示, 主体采用 AngularJS2 进行实现,部署在 Apache 服务上。(或者可以部署在 Nginx 上)   综合业务服务:主要实现 JavaEE 层面整体的业务逻辑,通过 Spring 进行构建,对接业务需求。部署在 Tomcat 上。 【数据存储部分】   业务数据库:项目采用广泛应用的文档数据库 MongDB 作为主数据库,主要负责平台业务逻辑数据的存储。   搜索服务器:项目采用 ElasticSearch 作为模糊检索服务器,通过利用 ES 强大的匹配查询能力实现基于内容的推荐服务。   缓存数据库:项目采用 Redis 作为缓存数据库,主要用来支撑实时推荐系统部分对于数据的高速获取需求。 【离线推荐部分】   离线统计服务:批处理统计性业务采用 Spark Core + Spark SQL 进行实现,实现对指标类数据的统计任务。   离线推荐服务:离线推荐业务采用 Spark Core + Spark MLlib 进行实现,采用 ALS 算法进行实现。   工作调度服务:对于离线推荐部分需要以一定的时间频率对算法进行调度,采用 Azkaban 进行任务的调度。 【实时推荐部分】   日志采集服务:通过利用 Flume-ng 对业务平台中用户对于电影的一次评分行为进行采集,实时发送到 Kafka 集群。   消息缓冲服务:项目采用 Kafka 作为流式数据的缓存组件,接受来自 Flume 的数据采集请求。并将数据推送到项目的实时推荐系统部分。   实时推荐服务:项目采用 Spark Streaming 作为实时推荐系统,通过接收 Kafka 中缓存的数据,通过设计的推荐算法实现对实时推荐的数据处理,并将结果合并更新到 MongoDB 数据库。
黑泽君
2019/05/23
5.4K0
大数据技术之_24_电影推荐系统项目_06_项目体系架构设计 + 工具环境搭建 + 创建项目并初始化业务数据 + 离线推荐服务建设 + 实时推荐服务建设 + 基于内容的推荐服务建设
大数据技术之_27_电商平台数据分析项目_03_项目概述 + 项目主体架构 + 模拟业务数据源 + 程序框架解析 + 需求解析 + 项目总结
1、user_visit_action user_visit_action 表,存放网站或者 APP 每天的点击流数据。通俗地讲,就是用户对 网站/APP 每点击一下,就会产生一条存放在这个表里面的数据。
黑泽君
2019/06/14
3.9K0
大数据技术之_27_电商平台数据分析项目_03_项目概述 + 项目主体架构 + 模拟业务数据源 + 程序框架解析 + 需求解析 + 项目总结
使用Spark MLlib给豆瓣用户推荐电影
问题导读: 1.常用的推荐算法有哪些? 2.推荐系统是什么样的流程? 3.从这个推荐系统我们能学到什么? 推荐算法就是利用用户的一些行为,通过一些数学算法,推测出用户可能喜欢的东西。 随着电子商务规模的不断扩大,商品数量和种类不断增长,用户对于检索和推荐提出了更高的要求。由于不同用户在兴趣爱好、关注领域、个人经历等方面的不同,以满足不同用户的不同推荐需求为目的、不同人可以获得不同推荐为重要特征的个性化推荐系统应运而生。 推荐系统成为一个相对独立的研究方向一般被认为始自1994年明尼苏达大学GroupLen
用户1410343
2018/03/27
2.1K0
使用Spark MLlib给豆瓣用户推荐电影
大数据技术周报第 003 期
一是客户端、服务端需要的内存会变多(需要维护一些分区的信息,如果分区越多,这些信息所占的内存就越大)
大数据学习指南
2022/05/26
2480
大数据技术周报第 003 期
大数据技术之_26_交通状态预测项目_01
该项目以车辆预测为基础,学习业务解决的方法论。 学习完本项目后,可以解决如下问题或适用于如下业务场景:   1、公路堵车预测   2、地铁人流量预测   3、共享单车聚集点预测等等
黑泽君
2019/05/30
1.3K0
利用Spark MLIB实现电影推荐
源码及数据集:https://github.com/luo948521848/BigData
Java架构师必看
2021/07/22
1.1K0
BigData--大数据技术之SparkStreaming
所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是 StreamContext 的批次间隔的整数倍。
MiChong
2020/09/24
9700
BigData--大数据技术之SparkStreaming
白话推荐系统——从原理到实践,还有福利赠送!
之前流水账似的介绍过一篇机器学习入门的文章,大致介绍了如何学习以及机器学习的入门方法并提供了一些博主自己整理的比较有用的资源。这篇就尽量以白话解释并介绍机器学习在推荐系统中的实践以及遇到的问题... 也许很多点在行家的眼里都是小菜一碟,但是对于刚刚接触机器学习来说,还有很多未知等待挑战。 所以读者可以把本篇当做是机器学习的玩具即可,如果文中有任何问题,还请不吝指教。 本篇将会以下面的步骤描述机器学习是如何在实践中应用的: 1 什么是推荐系统? 2 机器学习的作用 3 机器学习是如何使用的? 4 基于S
用户1154259
2018/01/17
8330
白话推荐系统——从原理到实践,还有福利赠送!
基于大数据技术的开源在线教育项目 三
本文代码可在开源项目https://github.com/SoundHearer/kuaiban中找到。
soundhearer
2020/12/18
6030
Spark2.3.1+Kafka0.9使用Direct模式消费信息异常
在验证kafka属性时不能使用scala默认的类,需要指定kafka带的类 createDirectStream[String, String, StringDecoder, StringDecoder]其中StringDecoder必须是kafka.serializer.StringDecoder
笨兔儿
2019/05/26
8670
Spark Core项目实战(1) | 准备数据与计算Top10 热门品类(附完整项目代码及注释)
  本实战项目的数据是采集自电商的用户行为数据.   主要包含用户的 4 种行为: 搜索, 点击, 下单和支付.   数据格式如下, 不同的字段使用下划线分割开_:
不温卜火
2020/10/28
1.1K0
Spark Core项目实战(1) | 准备数据与计算Top10 热门品类(附完整项目代码及注释)
Spark与mongodb整合完整版本
一,准备阶段 MongoDB Connector for spark是的spark操作mongodb数据很简单,这样方便使用spark去分析mongodb数据,sql分析,流式处理,机器学习,图计算。 要求: 1),要有mongodb和spark的基础 2),mongodb要求是2.6以上 3),Spark 1.6.x 4),Scala 2.10.x 使用mongo-spark-connector_2.10 5),Scala 2.11.x 使用mongo-spark-connector_2.11 <depe
Spark学习技巧
2018/01/30
9.4K0
Spark与mongodb整合完整版本
【从0开始の全记录】Flume+Kafka+Spark+Spring Boot 统计网页访问量项目
新建Scala文件——WebStatStreamingApp.scala,首先使用Direct模式连通Kafka:
王知无-import_bigdata
2020/08/20
2K0
【从0开始の全记录】Flume+Kafka+Spark+Spring Boot 统计网页访问量项目
Spark2Streaming读非Kerberos环境的Kafka并写数据到Kudu
在前面的文章Fayson介绍了在Kerberos环境下《Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu》,本篇文章Fayson主要介绍如何使用Spark2 Streaming访问非Kerberos环境的Kafka并将接收到的数据写入Kudu。
Fayson
2018/08/17
1K0
Spark2Streaming读非Kerberos环境的Kafka并写数据到Kudu
Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。 Fayson的github: https://github.com/fayson/cdhproject 提示:代码块部分可以左右滑动查看噢 1.文档编写目的 ---- 在前面的文章Fayson介绍了一些关于SparkStreaming的示例《如何使用Spark Streaming读取HBase的数据并写入到HDFS》、《SparkStreaming读Kafka数据写HBase》和《SparkStreaming读Kafka数据写Kudu》以上文章
Fayson
2018/07/12
2.6K0
2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
使用搜狗实验室提供【用户查询日志(SogouQ)】数据,使用Spark框架,将数据封装到RDD中进行业务数据处理分析。数据网址:http://www.sogou.com/labs/resource/q.php
Lansonli
2021/10/09
2.1K1
大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池
List 元素的追加 方式1-在列表的最后增加数据 方式2-在列表的最前面增加数据
黑泽君
2019/06/14
2.9K0
大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池
Flink入门学习笔记
创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
用砖头敲代码
2022/06/14
9580
推荐阅读
大数据技术之_24_电影推荐系统项目_08_项目总结及补充
2.7K0
大数据技术之_28_电商推荐系统项目_01
3.2K1
大数据技术之_24_电影推荐系统项目_06_项目体系架构设计 + 工具环境搭建 + 创建项目并初始化业务数据 + 离线推荐服务建设 + 实时推荐服务建设 + 基于内容的推荐服务建设
5.4K0
大数据技术之_27_电商平台数据分析项目_03_项目概述 + 项目主体架构 + 模拟业务数据源 + 程序框架解析 + 需求解析 + 项目总结
3.9K0
使用Spark MLlib给豆瓣用户推荐电影
2.1K0
大数据技术周报第 003 期
2480
大数据技术之_26_交通状态预测项目_01
1.3K0
利用Spark MLIB实现电影推荐
1.1K0
BigData--大数据技术之SparkStreaming
9700
白话推荐系统——从原理到实践,还有福利赠送!
8330
基于大数据技术的开源在线教育项目 三
6030
Spark2.3.1+Kafka0.9使用Direct模式消费信息异常
8670
Spark Core项目实战(1) | 准备数据与计算Top10 热门品类(附完整项目代码及注释)
1.1K0
Spark与mongodb整合完整版本
9.4K0
【从0开始の全记录】Flume+Kafka+Spark+Spring Boot 统计网页访问量项目
2K0
Spark2Streaming读非Kerberos环境的Kafka并写数据到Kudu
1K0
Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu
2.6K0
2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
2.1K1
大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池
2.9K0
Flink入门学习笔记
9580
相关推荐
大数据技术之_24_电影推荐系统项目_08_项目总结及补充
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档