Atmosphere: multiple streaming XmlHttpRequests (XHR)/channels block?

Posted on 2024-10-11 00:31:54

I am building a web app that uses Comet. The backend is built with Atmosphere and Jersey. However, I ran into trouble when I wanted to subscribe to multiple channels. The jQuery plugin that Atmosphere supplies only supports one channel, so I started writing my own implementation, which for the moment covers Comet only.

THE PROBLEM

If I update channel 1 with the message "Hello", I don't get notified. However, when I update channel 2 with the message "World" afterwards, I receive both "Hello" and "World" at the same time.


var connection1 = new AtmosphereConnectionComet("http://localhost/b/product/status/1");
var connection2 = new AtmosphereConnectionComet("http://localhost/b/product/status/2");

var handleMessage = function(msg)
{
   alert(msg);
};

connection1.NewMessage.add(handleMessage);
connection2.NewMessage.add(handleMessage);

connection1.connect();
connection2.connect();

The AtmosphereConnectionComet implementation:

UPDATED

  • Added Ivo's fixes (scoping and instantiation issues)
  • Fixed the <--EOD--> indexOf to capture the atmosphere message
  • Added comments to the onIncoming XHR method

function AtmosphereConnectionComet(url)
{
    //signals for dispatching
    this.Connected = new signals.Signal();
    this.Disconnected = new signals.Signal();
    this.NewMessage = new signals.Signal();

    //private vars
    var xhr = null;
    var self = this;
    var gotWelcomeMessage = false;
    var readPosition;
    var url = url;

    //private methods
    var onIncomingXhr = function()
    {
        //check if we got some new data
        if (xhr.readyState == 3)
        {
            //if the status is oke
            if (xhr.status==200)  // Received a message
            {
                //get the message
                //this is like streaming.. each time we get readyState 3 and status 200 there will be text appended to xhr.responseText
                var message = xhr.responseText;

                console.log(message);

                //check if we dont have the welcome message yet and if its maybe there... (it doesn't come in one pull)
                if(!gotWelcomeMessage && message.indexOf("<--EOD-->") > -1)
                {
                    //we has it
                    gotWelcomeMessage = true;
                    //dispatch a signal
                    self.Connected.dispatch(sprintf("Connected to %s", url));
                }
                //welcome message set, from now on only messages (yes this will fail for larger date i presume)
                else
                {
                    //dispatch the new message by substr from the last readPosition
                    self.NewMessage.dispatch(message.substr(readPosition));
                }

                //update the readPosition to the size of this message
                readPosition = xhr.responseText.length;
            }
        }
        //ooh the connection got resumed, seems we got disconnected
        else if (xhr.readyState == 4)
        {
            //disconnect
            self.disconnect();
        }
    }

    var getXhr = function()
    {
        if ( window.location.protocol !== "file:" ) {
            try {
                return new window.XMLHttpRequest();
            } catch(xhrError) {}
        }

        try {
            return new window.ActiveXObject("Microsoft.XMLHTTP");
        } catch(activeError) {}
    }

    this.connect = function()
    {
        xhr = getXhr();
        xhr.onreadystatechange = onIncomingXhr;
        xhr.open("GET", url, true);
        xhr.send(null);
    }

    this.disconnect = function()
    {
        xhr.onreadystatechange = null;
        xhr.abort();
    }

    this.send = function(message)
    {

    }
}

UPDATE 9-1 23:00 GMT+1

It seems Atmosphere isn't writing the messages out.

ProductEventObserver

This is a ProductEventObserver, which observes SEAM events. The component is auto-created and lives in SEAM's APPLICATION context. It catches the events and uses broadCastToProduct to look up the right broadcaster (via the BroadcasterFactory) and broadcast the JSON message (I used Gson as the JSON serializer/marshaller) to the suspended connections.


package nl.ambrero.botenveiling.managers.product;

import com.google.gson.Gson;
import nl.ambrero.botenveiling.entity.product.Product;
import nl.ambrero.botenveiling.entity.product.ProductBid;
import nl.ambrero.botenveiling.entity.product.ProductBidRetraction;
import nl.ambrero.botenveiling.entity.product.ProductRetraction;
import nl.ambrero.botenveiling.managers.EventTypes;
import nl.ambrero.botenveiling.rest.vo.*;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterFactory;
import org.atmosphere.cpr.DefaultBroadcaster;
import org.jboss.seam.ScopeType;
import org.jboss.seam.annotations.*;
import org.jboss.seam.log.Log;

@Name("productEventObserver")
@Scope(ScopeType.APPLICATION)
@AutoCreate
public class ProductEventObserver
{
    @Logger
    Log logger;

    Gson gson;

    @Create
    public void init()
    {
        gson = new Gson();
    }

    private void broadCastToProduct(int id, ApplicationEvent message)
    {
        Broadcaster broadcaster = BroadcasterFactory.getDefault().lookup(DefaultBroadcaster.class, String.format("%s", id));

        logger.info(String.format("There are %s broadcasters active", BroadcasterFactory.getDefault().lookupAll().size()));

        if(broadcaster == null)
        {
            logger.info("No broadcaster found..");

            return;
        }

        logger.info(String.format("Broadcasting message of type '%s' to '%s' with scope '%s'", message.getEventType(), broadcaster.getID(), broadcaster.getScope().toString()));

        broadcaster.broadcast(gson.toJson(message));
    }

    @Observer(value = { EventTypes.PRODUCT_AUCTION_EXPIRED, EventTypes.PRODUCT_AUCTION_SOLD })
    public void handleProductAcutionEnded(Product product)
    {
        this.broadCastToProduct(
            product.getId(),
            new ProductEvent(ApplicationEventType.PRODUCT_AUCTION_ENDED, product)
        );
    }

    @Observer(value = EventTypes.PRODUCT_RETRACTED)
    public void handleProductRetracted(ProductRetraction productRetraction)
    {
        this.broadCastToProduct(
            productRetraction.getProduct().getId(),
            new ProductRetractionEvent(ApplicationEventType.PRODUCT_RETRACTED, productRetraction)
        );
    }

    @Observer(value = EventTypes.PRODUCT_AUCTION_STARTED)
    public void handleProductAuctionStarted(Product product)
    {
        this.broadCastToProduct(
            product.getId(),
            new ProductEvent(ApplicationEventType.PRODUCT_AUCTION_STARTED, product)
        );
    }

    @Observer(value = EventTypes.PRODUCT_BID_ADDED)
    public void handleProductNewBid(ProductBid bid)
    {
        this.broadCastToProduct(
            bid.getProduct().getId(),
            new ProductBidEvent(ApplicationEventType.PRODUCT_BID_ADDED, bid)
        );
    }

    @Observer(value = EventTypes.PRODUCT_BID_RETRACTED)
    public void handleProductRetractedBid(ProductBidRetraction bidRetraction)
    {
        this.broadCastToProduct(
            bidRetraction.getProductBid().getProduct().getId(),
            new ProductBidRetractionEvent(ApplicationEventType.PRODUCT_BID_RETRACTED, bidRetraction)
        );
    }
}

Web.xml

<servlet>
        <description>AtmosphereServlet</description>
        <servlet-name>AtmosphereServlet</servlet-name>
        <servlet-class>org.atmosphere.cpr.AtmosphereServlet</servlet-class>
        <init-param>
            <param-name>com.sun.jersey.config.property.packages</param-name>
            <param-value>nl.ambrero.botenveiling.rest</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.useWebSocket</param-name>
            <param-value>true</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.useNative</param-name>
            <param-value>true</param-value>
        </init-param>
        <load-on-startup>0</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>AtmosphereServlet</servlet-name>
        <url-pattern>/b/*</url-pattern>
    </servlet-mapping>

atmosphere.xml

<atmosphere-handlers>
    <atmosphere-handler context-root="/b" class-name="org.atmosphere.handler.ReflectorServletProcessor">
        <property name="servletClass" value="com.sun.jersey.spi.container.servlet.ServletContainer"/>
    </atmosphere-handler>
</atmosphere-handlers>

Broadcaster:


@Path("/product/status/{product}")
@Produces(MediaType.APPLICATION_JSON)
public class ProductEventBroadcaster
{
    @PathParam("product")
    private Broadcaster product;

    @GET
    public SuspendResponse subscribe()
    {
        return new SuspendResponse.SuspendResponseBuilder()
                .broadcaster(product)
                .build();
    }
}

UPDATE 10-1 4:18 GMT+1

  • The following console output shows
    that the broadcasters are found and
    are active.
  • I updated the broadcastToProduct to the full class code
  • Updated the start paragraph with the problem
  • Added web.xml and atmosphere.xml

Console output:


16:15:16,623 INFO  [STDOUT] 16:15:16,623 INFO  [ProductEventObserver] There are 3 broadcasters active
16:15:16,624 INFO  [STDOUT] 16:15:16,624 INFO  [ProductEventObserver] Broadcasting message of type 'productBidAdded' to '2' with scope 'APPLICATION'
16:15:47,580 INFO  [STDOUT] 16:15:47,580 INFO  [ProductEventObserver] There are 3 broadcasters active
16:15:47,581 INFO  [STDOUT] 16:15:47,581 INFO  [ProductEventObserver] Broadcasting message of type 'productBidAdded' to '1' with scope 'APPLICATION'

Comments (3)

悲歌长辞 2024-10-18 00:31:55

I sent my project to Jfarcand. He found out that Atmosphere 0.6.3, which I was using, contained a bug in the ThreadPool. The bug should not be present in 0.6.2. It was fixed in 0.7-SNAPSHOT as well, and I think he is working on 0.6.4, where the bug is also fixed.

装纯掩盖桑 2024-10-18 00:31:54

The value of product:

@PathParam("product")
private Broadcaster product;

Does it match the id of the broadCastToProduct(int id, ApplicationEvent message)?

Send me a test case I can look at (post it [email protected]).

浪漫之都 2024-10-18 00:31:54

Actually, the code you posted shouldn't work at all since AtmosphereConnectionComet does not create new objects.

function AtmosphereConnectionComet(url)
{
    this.Connected = new signals.Signal();
    this.Disconnected = new signals.Signal();
    this.NewMessage = new signals.Signal();

This is supposed to be a constructor, but you're not calling it as such:

var connection1 = AtmosphereConnectionComet(...);

You have to use the new keyword so that it works as a constructor; otherwise, this inside AtmosphereConnectionComet will not refer to a new object but to the window object(!).

 var connection1 = new AtmosphereConnectionComet(...);

Now you will really have two distinct connections; before, the second call simply overwrote the state of the first.

Have a look at how constructors and this work in JavaScript.
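
To make the constructor point concrete, here is a minimal sketch of a guard that recovers when the caller forgets new. Connection is a hypothetical stand-in, not the AtmosphereConnectionComet from the question, and the URLs are only illustrative.

```javascript
// Hypothetical constructor used only for illustration.
function Connection(url) {
    // If the function was called without `new`, `this` is not a fresh
    // Connection instance, so create one and return it instead.
    if (!(this instanceof Connection)) {
        return new Connection(url);
    }
    this.url = url;
}

var a = new Connection("/b/product/status/1");
var b = Connection("/b/product/status/2"); // `new` forgotten, the guard recovers

console.log(a.url, b.url, a !== b);
```

Without the guard, a call that omits new would write its properties onto the global object in non-strict code, so both "connections" would share one set of state.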

More problems

        readPosition = this.responseText.length;
    }
} 
else if (this.readyState == 4)

Those this references should rather be xhr. They will work, because the function gets called in the context of the request, but for clarity you should stick to either this or xhr.
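
As a rough illustration of why both forms work, the sketch below uses a hypothetical FakeRequest that, like an XHR, invokes its handler with itself as this. The names FakeRequest, advance, and watch are assumptions made for this example only.

```javascript
// Hypothetical stand-in for an XHR: calls its handler with itself as `this`.
function FakeRequest() {
    this.readyState = 0;
    this.onreadystatechange = null;
}

FakeRequest.prototype.advance = function (state) {
    this.readyState = state;
    if (this.onreadystatechange) {
        this.onreadystatechange.call(this);
    }
};

function watch(request, seen) {
    request.onreadystatechange = function () {
        // `this` and the captured `request` are the same object here,
        // so pick one of them and use it consistently.
        seen.push({ viaThis: this.readyState, viaClosure: request.readyState });
    };
}

var request = new FakeRequest();
var seen = [];
watch(request, seen);
request.advance(3);
```

Using the captured variable throughout keeps the handler correct even if it is later invoked from somewhere that does not set this to the request.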

Update

Another Bug.

  else
    {
        self.NewMessage.dispatch(message.substr(readPosition));
    }

    // this should be before the above if statement
    readPosition = xhr.responseText.length;
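
One way to keep the read position consistent is to move the bookkeeping into a small helper that is fed the cumulative responseText on every readyState 3 event. This is only a sketch under assumptions: createStreamReader and consume are hypothetical names, and it assumes the server's welcome padding ends with the <--EOD--> marker from the question.

```javascript
// Hypothetical helper: tracks how much of a growing responseText was consumed.
function createStreamReader(marker) {
    var readPosition = 0;
    var gotWelcome = false;

    // `responseText` is the full text received so far, not just the new part.
    return function consume(responseText) {
        var messages = [];

        if (!gotWelcome) {
            var end = responseText.indexOf(marker);
            if (end === -1) {
                return messages; // welcome padding has not fully arrived yet
            }
            gotWelcome = true;
            readPosition = end + marker.length; // skip everything up to the marker
        }

        if (responseText.length > readPosition) {
            // Hand out only the text appended since the last call.
            messages.push(responseText.substring(readPosition));
            readPosition = responseText.length;
        }
        return messages;
    };
}

var consume = createStreamReader("<--EOD-->");
```

In the handler from the question, this would be called as consume(xhr.responseText), dispatching each returned entry through NewMessage. It does not split several broadcasts that arrive in the same chunk; that would need a delimiter agreed with the server.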