カテゴリー「java」

javaのweb socketで、decoderとpathparamはある条件において不可能なようなだ。

 

以下の場合はダメ。

@ServerEndpoint(
  value = "/ws/{p1}/{p2}/"
  decoders = {HogeDecoder.class},
  encoders = {HogeEncoder.class})
public class HogeWebsocket {
  /**
   * open hander.
   */
  @OnOpen
  public void onOpen(@PathParam("p1") String p1, Session session, EndpointConfig config) {
    :
    :  
  }
}

 

これだと、p1がdecoderの対象になるみたい。

そのため、次のような方法で対処する。

 

@ServerEndpoint(
  value = "/ws/{p1}/{p2}/"
  decoders = {HogeDecoder.class},
  encoders = {HogeEncoder.class})
public class HogeWebsocket {
  /**
   * open hander.
   */
  @OnOpen
  public void onOpen(Session session, EndpointConfig config) {
    Map<String, String> pathParameters = session.getPathParameters();
    String p1 = pathParameters.get("p1");
    :  
  }
}

 

本件は、decoderでバイナリからテキストに変換しようとした際に起きた現象。

なので、decoderが何をするかによるとは思うが。。

javaのページを見たが、それらしき記述はなかった。。

 

投稿日時:2015年12月21日 18:38   カテゴリー:java, websocket   [コメントがあればどうぞ]

redisの冗長化を行うためには、

  • master <-> slave構成
  • master <-> slave構成 + sentinel
  • cluster

がある。

clusterはredis3より正式サポートされた機能である。

 

特徴としては、以下の通り。

 

master <-> slave構成だと、

masterが倒れたときのフェイルオーバが皆無である。

 

master <-> slave構成 + sentinelだと、

masterが倒れたときに、slaveが自動昇格できる。

ただ、slave x 2以上、sentinel x 3以上が望ましい感じがする。

 

cluster構成だと、

分散でデータを保持しているため、

slaveも同時に使い、自動昇格させる必要がある。(slaveがあれば自動昇格する)

master x 3以上、slave x 3以上にする必要がある。

 

と考えると、clusterが一番な気がするが、

clusterの欠点は以下の通り。

  1. selectはない
  2. multi – execができない?
  3. 対応しているクライアントライブラリが少ない
  4. リダイレクトが発生するため、速度が遅くなる?
  5. クラスター再構築が難しめ

などなど。

とくに、クライアントライブラリが少ないのは気がかりである。

python, rubyではあるらしい。

javaでもlettuceが対応している。

javaのコードは以下の通り。

List<RedisURI> list = new ArrayList<>();
list.add(new RedisURI("192.168.1.45", 16381, 1, TimeUnit.SECONDS));
list.add(new RedisURI("192.168.1.45", 16382, 1, TimeUnit.SECONDS));
list.add(new RedisURI("192.168.1.45", 16383, 1, TimeUnit.SECONDS));
list.add(new RedisURI("192.168.1.45", 16384, 1, TimeUnit.SECONDS));
list.add(new RedisURI("192.168.1.45", 16385, 1, TimeUnit.SECONDS));
list.add(new RedisURI("192.168.1.45", 16386, 1, TimeUnit.SECONDS));

RedisClusterClient client = RedisClusterClient.create(new Iterable<RedisURI>() {
  @Override
  public Iterator<RedisURI> iterator() {
    return list.iterator();
  }
});

AsyncExecutions<String> excutions = null;

RedisAdvancedClusterAsyncCommands<String, String> con = client.connect().async();

AsyncNodeSelection<String, String> masters = con.masters();

excutions = masters.commands().set("hoge", "fuga");

excutions.forEach(result -> result.thenAccept(ret -> System.out.println(ret)));
excutions = masters.commands().get("hoge");

excutions.forEach(result -> result.thenAccept(ret -> System.out.println(ret)));
con.close();
client.shutdown();

 

しかし、multi – execができないのは結構痛い。。。

うまいことやればできるのかな。。

ただ、分散してしまうから、無理なきがする。

 

このlettuceっていうライブラリは良さげ。

nettyをベースに使っていて、

ノンブロッキングをサポートしているしね。

 

投稿日時:2015年12月10日 13:06   カテゴリー:java, redis   [コメントがあればどうぞ]

昨日書いた記事を検証してみた。

 

以下3つの例を検証。

 

1.単純ケース

@WebServlet(name = "test01", urlPatterns = {"/test01"})
public class Test01 extends HttpServlet {

  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    String name = req.getParameter("name");

    try (PrintWriter writer = resp.getWriter();) {
      resp.setStatus(200);
      resp.setContentType("text/plain");
      writer.write("name is " + name);
      writer.flush();
      writer.close();
    }
  }
}

 

2.絶対ダメなケース

@WebServlet(name = "test02", urlPatterns = {"/test02"})
public class Test02 extends HttpServlet {

  private String name;

  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    name = req.getParameter("name");

    try (PrintWriter writer = resp.getWriter();) {
      resp.setStatus(200);
      resp.setContentType("text/plain");
      writer.write("name is " + name);
      writer.flush();
      writer.close();
    }
  }
}

 

3.今回検証したかったケース

@WebServlet(name = "test03", urlPatterns = {"/test03"})
public class Test03 extends HttpServlet {

  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    new MyClass(req, resp).execute();
  }

  class MyClass {

    private HttpServletRequest req;
    private HttpServletResponse resp;
    private String name;

    public MyClass(HttpServletRequest req, HttpServletResponse resp) {
      this.req = req;
      this.resp = resp;

      name = req.getParameter("name");
    }

    public void execute() throws IOException {
      try (PrintWriter writer = resp.getWriter();) {
        resp.setStatus(200);
        resp.setContentType("text/plain");
        writer.write("name is " + name);
        writer.flush();
        writer.close();
      }
    }
  }
}

 

結論としては、3は大丈夫であった。

jmeterで同時接続100を10回やって、一度も不整合は起きず。

 

投稿日時:2015年11月27日 14:52   カテゴリー:java   [コメントがあればどうぞ]

servletはエントリーポイントのインスタンスは1つしか作成しない。

そして、それが複数スレッドで共有される。

そのため、インスタンスフィールドはスレッドセーフにならない。

 

・スレッドセーフではない例

public class TestServlet extends HttpServlet {
  private String name;

  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {
    name = req.getParameter("name");

    PrintWriter writer = res.getWriter();
    writer.print(name);
  }
}

 

で、ここからがわからんところなのだが、

ローカル変数はスレッドセーフになるのだから、

doGetの処理の中で、以下のようなことをしたら、問題ないのか?

public class TestServlet extends HttpServlet {
  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {
    new MyClass(req, res).execute();
  }
}

public class MyClass {
  private HttpServletRequest req;
  private HttpServletResponse res;

  public MyClass(HttpServletRequest req, HttpServletResponse res) {
    this.req = req;
    this.res = res;
  }

  public void execute() {
    String name = req.getParameter("name");

    PrintWriter writer = res.getWriter();
    writer.print(name);
  } 
} 

 

もし、上記でスレッドセーフになるっていうなら、

それでよくね?って思っちゃう。

 

それとも、MyClassのインスタンスフィールドは実はスレッドセーフではないのか?

 

メモリ使用量としては、生成するインスタンス分大きいのはわかるが、

もし上記で解決するなら、煩わしいスレッド問題とはオサラバできると思うのだが、

問題あるのだろうか?

 

servletはフレームワークの世界だから、厳密にわかる人は少ないのだろうけど、

ここは教えて欲しい。。。

 

・・・後日

上の例は大丈夫だった。

新しく記事を書きました。

投稿日時:2015年11月26日 19:58   カテゴリー:java   [コメントがあればどうぞ]

ラムダ式が導入されて(java8)結構たちましたが、

あまりやる気がしなかったが、

重い腰を上げて、おれおれフレームワークに少しづつ適用中。

 

いろいろ例はあるが、やはりコレクションには適用しやすい。

二元ループをするケースで結構はまったので、

メモを残しておく。

 

(例)URLのBodyパラメータを手作業で分解する場合

Bodyパラメータはこんな感じを想定

hoge=1&fuga=2&piyo[]=1&piyo[]=2

 

ここではInputStreamから抜かないで、一度分解したのち、文字列に戻すということをやる。

これを従来の処理でやると以下になる。

Map<String, List<String>> paramsBody = new LinkedHashMap<String, List<String>>(); 
StringBuffer buffer = new StringBuffer(); 

// サーブレットリクエストからパラメータの情報をとる 
Map<String, String[]> params = servletRequest.getParameterMap(); 

// 回しながら、List型に変換して、さらに文字列を構築する 
String sep = ""; 
for (Iterator<Entry<String, String[]>> iterator = params.entrySet().iterator(); iterator.hasNext();) { 
  Map.Entry<String, String[]> entry = (Map.Entry<String, String[]>) iterator.next(); 
  String name = entry.getKey(); 
  String[] values = entry.getValue(); 
  if (!paramsBody.containsKey(name)) { 
    paramsBody.put(name, new ArrayList<String>(Arrays.asList(values))); 
  } 
  for (String value : values) { 
    buffer.append(sep).append(name).append("=").append(WsUrl.decode(value, encoding)); 
    sep = "&"; 
  } 
  body = buffer.toString(); 
}

 

おなじことをラムダ式をつかうと、

Map<String, List<String>> paramsBody = new LinkedHashMap<String, List<String>>();
StringBuffer buffer = new StringBuffer(); 

// サーブレットリクエストからパラメータの情報をとる 
Map<String, String[]> params = servletRequest.getParameterMap(); 

// 回しながら、List型に変換して、さらに文字列を構築する 
String paramString = params.entrySet() 
    .stream() 
    .map(entry -> { 
      String name = entry.getKey(); 
      return Arrays.stream(entry.getValue()) 
         .map(value -> String.format("%s=%s", name, WsUrl.decode(value, encoding))) 
         .collect(Collectors.joining("&")); 
    })
    .collect(Collectors.joining("&")); 
buffer.append(paramString); 
body = buffer.toString();

 

ここまでくるのに結構大変だった。。

投稿日時:2015年10月29日 16:28   カテゴリー:java   [コメントがあればどうぞ]

javaでwebsocketクライアントをやろうとしたら、

websocket-apiライブラリでは動かなかった。

なんでも、meta-infのserviceのところに何かを配備する必要があるらしい。

 

そこで、いろいろ調べたら、glassfishのライブラリを導入すれば、

一発で済むことがわかった。

 

pom.xmlは以下を書くだけ。

      <dependency>
        <groupId>org.glassfish.tyrus.bundles</groupId>
        <artifactId>tyrus-standalone-client</artifactId>
        <version>1.11</version>
      </dependency>

 

あとは、

WebSocketContainer cont = ContainerProvider.getWebSocketContainer();
URI uri = URI.create("ハンドシェイクのURL");
cont.connectToServer("websocketのインスタンス", uri);

で接続できる。

 

検証したのは、JDK8だが、JDK7でも大丈夫でしょう。

 

以上

投稿日時:2015年08月18日 09:29   カテゴリー:java, websocket   [コメントがあればどうぞ]

実行可能warをmavenで作る場合、

warパッケージの直下にmainファイルを配備する必要がある。

 

しかし、warアプリケーションをpackageすると、

WEB-INF/classes以下にmainファイルも配備されてしまう。

 

そのため、maven3では、

antrunプラグインを用いて、mainファイルを移動する必要がある。

 

pom.xmlはこんな感じかな。

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-antrun-plugin</artifactId>
        <executions>
          <execution>
            <id>move-main</id>
            <phase>prepare-package</phase>
            <configuration>
              <tasks>
                <move todir="target/${project.build.finalName}">
                  <fileset dir="target/classes">
                    <include name="app/AppMain.class" />
                  </fileset>
                </move>
              </tasks>
            </configuration>
            <goals>
              <goal>run</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

 

これにより、warファイル直下に実行可能クラスが配備される。

 

また、jettyなどのembedを組み込む場合、

一緒に依存関係も持って行ってやる必要がある。

こんな感じ。

 

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>
            <id>jetty-classpath</id>
            <phase>prepare-package</phase>
            <goals>
              <goal>unpack-dependencies</goal>
            </goals>
            <configuration>
              <includeGroupIds>org.eclipse.jetty,javax.servlet,javax.websocket</includeGroupIds>
              <excludes>META-INF/ECLIPSEF.*</excludes>
              <outputDirectory>${project.build.directory}/${project.build.finalName}</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>

 

そして、mainファイルはこんな感じ。

package app;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;

import java.net.URL;

public class AppMain {

  public static void main(String[] args) throws Exception {
    // 初期設定
    int port = 8080;
    System.setProperty("prefix", "/");

    // サーバ生成
    Server server = new Server(port);

    // warを読み込むコンテクスト生成
    URL warUrl = AppMain.class.getProtectionDomain().getCodeSource().getLocation();
    String warLocation = warUrl.toExternalForm();
    WebAppContext context = new WebAppContext();
    context.setWar(warLocation);
    context.setContextPath("/");

    // コンテクストにサーバをセットする
    // ※重要
    context.setServer(server);

    // websocketを使えるようにする
    WebSocketServerContainerInitializer.configureContext(context);

    // 開始
    server.setHandler(context);
    server.start();
    server.join();
  }
}

 

上記が非常に重要。

これで、websocketのendpointをServletContextListener側で指定してやるとうまくいく。

 

package app;

import websocket.WebsocketIndex;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.websocket.DeploymentException;
import javax.websocket.server.ServerContainer;

public class AppInitializeListener implements ServletContextListener {

  @Override
  public void contextInitialized(ServletContextEvent sce) {
    // embed用のwebsocketの追加
    try {
      ServerContainer wsContainer = (ServerContainer) sce.getServletContext().getAttribute(ServerContainer.class.getName());
      System.out.println(String.format("wsContainer => %s", wsContainer));
      if (wsContainer != null) {
        wsContainer.addEndpoint(WebsocketIndex.class);
      }
    } catch (DeploymentException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void contextDestroyed(ServletContextEvent sce) {
  }
}

 

実行可能jarでwebsocketを作成する場合、

mainクラス内でendpointを設定してやればよいのだが、

実行可能warの場合、

mainクラスが実際のwebsocketのendpointのクラスの配備位置が異なるため、

クラスのロードが出来ない状態となる。

そのため、上記のような方法で、動的に指定してやることで、

実行可能warにおいて、websocketが利用できる。

 

【参考】

・実行可能warの生成の詳細がかかれています。

http://qiita.com/k_ui/items/1d3bbbd7993c4c9adf71

・実行可能jarでのwebsocketの設定方法が書かれています。

https://github.com/jetty-project/embedded-jetty-websocket-examples/blob/master/javax.websocket-example/src/main/java/org/eclipse/jetty/demo/EventServer.java

 

以上

 

投稿日時:2015年05月15日 09:19   カテゴリー:java, websocket   [コメントがあればどうぞ]

まとめをしてみます。

  1. 第1回
  2. 第2回
  3. 第3回
  4. 第4回
  5. 第5回
  6. 第6回
  7. 第7回

という感じにまとまりました。

結局java側だけしかかけなったけど、

javaでは排他制御をかける(synchronizedブロック)がほんとに重要。

 

nodeはシングルスレッドだから、

排他制御は基本的には不要。

 

その他、大事なトピックとしては、

  • クライアントに送信しようとしても送信できないケースがある(たぶん避けらない)
  • バックエンドでDBを使う場合は、基本的にはつなぎっぱなし(ただし、RDBとRedisではちょっと扱いが違うので注意)

などでしょうか。

 

最後にスケールアウトについて。

単純にスケールアウトする場合、pub/subを使うしか手がない。

pub/subができるのは、

  • redis
  • ActiveMQ
  • Java Message Service

などがあるが、pub/subがあるとないとでは、

実装がかなり異なる。

重要なのは、

「sub」の接続を先に確立して、

処理を行い、

対象者に「pub」して、

「sub」でメッセージ送信を行う

という流れである。

 

pub/subを行うと、

データ自体もメモリに持てなくなるので、

十分注意しながらやっていただければと思います。

 

以上で、websocketの話はほんとにおしまい。

 

投稿日時:2015年04月02日 18:36   カテゴリー:java, node.js, websocket   [コメントがあればどうぞ]

ラウンド編。

また、websocketの処理だけ書く。

package trmpj.ws;

import java.util.concurrent.ConcurrentHashMap;

import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import trmpj.TPair;
import trmpj.TReq;
import trmpj.TSession;
import trmpj.TWs;
import trmpj.req.TReqEnd;
import trmpj.req.TReqGame;
import trmpj.req.TReqPrepared;
import trmpj.res.TResEnded;
import trmpj.res.TResError;
import trmpj.res.TResGame;
import trmpj.res.TResInvalid;
import trmpj.res.TResNone;
import trmpj.res.TResStart;
import trmpj.res.TResWait;

/**
 * 対戦用処理
 *
 */
@ServerEndpoint(value = "/ws/round/{pairId}")
public class TWsRound extends TWs {

    /**
     * pairId => TPairのマップ
     */
    private static ConcurrentHashMap<String, TPair> pairs = new ConcurrentHashMap<String, TPair>();

    /**
     * pairId => Threadのマップ
     */
    private static ConcurrentHashMap<String, Thread> threads = new ConcurrentHashMap<String, Thread>();

    /**
     * openハンドラ
     * @param pairId
     * @param session
     */
    @OnOpen
    public void onOpen(@PathParam("pairId") String pairId, Session session) {
        // セッションにアクセスIDを格納する
        String accessdId = generateAccessId();
        session.getUserProperties().put(KEY_ACCESS_ID, accessdId);

        // アクセスIDを、onMessage, onCloseで取得するときの関連を付ける
        // ※sessionsのキー側はsessionでもよいのですが、
        // runnableからの参照時に、セッションが破棄されていると困るので、
        // あえて文字列にしている
        TSession ts = new TSession(session);
        sessions.putIfAbsent(accessdId, ts);
    }

    /**
     * messageハンドラ
     * @param pairId
     * @param session
     * @param message
     */
    @OnMessage
    public void onMessage(@PathParam("pairId") String pairId, Session session, String message) {
        // アクセスIDを取得
        String accessdId = (String) session.getUserProperties().get(KEY_ACCESS_ID);
        TSession ts = null;

        try {
            // アクセスIDをもとに、openハンドラで登録したTSessionを呼び出す
            if (!sessions.containsKey(accessdId)) {
                throw new Exception();
            }
            ts = sessions.get(accessdId);

            // リクエストを解析
            TReq req = ts.receiveMessage(message, TReq.class);
            if (req == null) {
                throw new Exception();
            }

            // ペアIDに応じたペア情報をマップから取り出す
            if (!pairs.containsKey(pairId)) {
                pairs.putIfAbsent(pairId, new TPair(pairId));
            }
            TPair pair = pairs.get(pairId);

            // ペア情報に排他制御をかける
            // ※重要
            synchronized (pair) {
                if (req.command.equals(TReq.COMMAND_PREPARED)) {
                    // preparedコマンド
                    TReqPrepared reqPrepared = ts.receiveMessage(message, TReqPrepared.class);
                    if (reqPrepared == null) {
                        throw new Exception();
                    }

                    // 既にペアが揃った状態ならここで終了
                    if (pair.isPrepared()) {
                        throw new Exception();
                    }

                    // 自分の情報をセッションに追加
                    ts.setUser(reqPrepared.user);

                    // ペア情報に自分の情報を追加
                    pair.addTSession(ts);

                    // 同一ユーザIDはエラー
                    if (pair.isDuplicated()) {
                        throw new Exception();
                    }

                    // 2人そろった状態なら、2人にstartを返す
                    if (pair.isPrepared()) {
                        // スレッドを止める
                        threads.remove(pairId);

                        TResStart resStart = new TResStart();
                        resStart.users.addAll(pair.getUsers());
                        pair.sendMessage(resStart);
                        return;
                    }

                    // 揃っていない状態なので、自分にwaitを返す
                    TResWait resWait = new TResWait();
                    ts.sendMessage(resWait);

                    // 別スレッドで、相手がくるまで数秒待機する
                    if (!threads.containsKey(pairId)) {
                        threads.putIfAbsent(pairId, new Thread(new TRunnable(pairId)));
                        threads.get(pair).start();
                    }
                    return;

                } else if (req.command.equals(TReq.COMMAND_GAME)) {
                    // gameコマンド
                    TReqGame reqGame = ts.receiveMessage(message, TReqGame.class);
                    if (reqGame == null) {
                        throw new Exception();
                    }

                    // 同一ユーザIDはエラー
                    if (pair.isDuplicated()) {
                        throw new Exception();
                    }

                    // 揃った状態でない、または終了状態なら何もしない
                    if (!pair.isPrepared() || pair.isEnded()) {
                        throw new Exception();
                    }

                    // 2人に返す
                    TResGame resGame = new TResGame();
                    resGame.useId = ts.getUser().userId;
                    resGame.data = reqGame.data;
                    pair.sendMessage(resGame);
                    return;

                } else if (req.command.equals(TReq.COMMAND_END)) {
                    // endコマンド
                    TReqEnd reqEnd = ts.receiveMessage(message, TReqEnd.class);
                    if (reqEnd == null) {
                        throw new Exception();
                    }

                    // 同一ユーザIDはエラー
                    if (pair.isDuplicated()) {
                        throw new Exception();
                    }

                    // ペアを終了状態にする
                    pair.setEnded(true);

                    // 自分に返す
                    TResEnded resEnded = new TResEnded();
                    ts.sendMessage(resEnded);
                    return;

                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

        // エラー処理
        if (ts != null) {
            TResError resError = new TResError();
            ts.sendMessage(resError);
        }
    }

    /**
     * closeハンドラ
     * @param pairId
     * @param session
     * @param reason
     */
    @OnClose
    public void onClose(@PathParam("pairId") String pairId, Session session, CloseReason reason) {
        // アクセスIDを取得
        String accessdId = (String) session.getUserProperties().get(KEY_ACCESS_ID);
        TSession ts = null;

        try {
            // アクセスIDをもとに、openハンドラで登録したTSessionを呼び出す
            if (!sessions.containsKey(accessdId)) {
                throw new Exception();
            }
            ts = sessions.get(accessdId);

            // ペアIDに応じたペア情報をマップから取り出す
            if (!pairs.containsKey(pairId)) {
                throw new Exception();
            }
            TPair pair = pairs.get(pairId);

            // ペア情報に排他制御をかける
            // ※重要
            synchronized (pair) {
                // 自分の情報削除
                pair.removeTSession(ts);

                // 空になっていたら削除
                if (pair.isEmpty()) {
                    pairs.remove(pairId);
                    return;
                }

                // game中で、endでない場合は、残っている方にnoneを送信
                if (pair.isPrepared() && !pair.isEnded()) {
                    TResNone resNone = new TResNone();
                    pair.sendMessage(resNone);
                    return;
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

        // セッション情報も破棄する
        sessions.remove(accessdId);
    }

    /**
     * 2人が揃うまで待つ別スレッド
     * @author furuta
     *
     */
    public class TRunnable implements Runnable {

        /**
         * ペアID
         */
        private String pairId;

        /**
         * コンストラクタ
         * @param pairId
         */
        public TRunnable(String pairId) {
            this.pairId = pairId;
        }

        /**
         * 処理
         */
        public void run() {
            try {
                // 10秒まつ
                Thread.sleep(10000);

                // ペアを取得
                if (!pairs.containsKey(pairId)) {
                    throw new Exception();
                }
                TPair pair = pairs.get(pairId);

                synchronized (pair) {
                    if (!pair.isPrepared()) {
                        // 揃っていない状態であれば、残っている人にinvalidを返す
                        TResInvalid resInvalid = new TResInvalid();
                        pair.sendMessage(resInvalid);
                    }
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

            // スレッドを削除
            threads.remove(pairId);
        }
    }
}

ポイントはsynchronizedでしっかり排他制御をかけること。
これをやらないとだめだぜ。。

ソースコードはちゃんとテストしてないけど、
処理の流れはわかるはず。。

とりあえず、作成したコードをzipで固めておきます。

ダウンロード

 

これで、websocketは終了。

nodeは書く気力なし。。

投稿日時:2015年04月02日 18:23   カテゴリー:java, node.js, websocket   [コメントがあればどうぞ]

やっとかく。

とりあえず、websocketのメイン処理をさらす。

 

マッチング

package trmpj.ws;

import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;

import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import trmpj.TReq;
import trmpj.TSession;
import trmpj.TWs;
import trmpj.req.TReqEntry;
import trmpj.res.TResEntried;
import trmpj.res.TResError;
import trmpj.res.TResPrepare;

/**
 * マッチング用処理
 *
 */
@ServerEndpoint(value = "/ws/matching/{type}")
public class TWsMatching extends TWs {

    /**
     * type => Set<TSession>のマップ
     */
    private static ConcurrentHashMap<Integer, LinkedList<TSession>> entries = new ConcurrentHashMap<Integer, LinkedList<TSession>>();

    /**
     * openハンドラ
     * @param type
     * @param session
     */
    @OnOpen
    public void onOpen(@PathParam("type") int type, Session session) {
        // セッションにアクセスIDを格納する
        String accessdId = generateAccessId();
        session.getUserProperties().put(KEY_ACCESS_ID, accessdId);

        // アクセスIDを、onMessage, onCloseで取得するときの関連を付ける
        // ※sessionsのキー側はsessionでもよいのですが、
        // runnableからの参照時に、セッションが破棄されていると困るので、
        // あえて文字列にしている
        TSession ts = new TSession(session);
        sessions.putIfAbsent(accessdId, ts);
    }

    /**
     * messageハンドラ
     * @param type
     * @param session
     * @param message
     */
    @OnMessage
    public void onMessage(@PathParam("type") int type, Session session, String message) {
        // アクセスIDを取得
        String accessdId = (String) session.getUserProperties().get(KEY_ACCESS_ID);
        TSession ts = null;

        try {
            // アクセスIDをもとに、openハンドラで登録したTSessionを呼び出す
            if (!sessions.containsKey(accessdId)) {
                throw new Exception();
            }
            ts = sessions.get(accessdId);

            // リクエストを解析
            TReq req = ts.receiveMessage(message, TReq.class);
            if (req == null) {
                throw new Exception();
            }

            // タイプに応じたエントリー情報をマップから取り出す
            if (!entries.containsKey(type)) {
                entries.putIfAbsent(type, new LinkedList<TSession>());
            }
            LinkedList<TSession> es = entries.get(type);

            // タイプごとのエントリー情報に排他制御をかける
            // ※重要
            synchronized (es) {
                if (req.command.equals(TReq.COMMAND_ENTRY)) {
                    // entryコマンド
                    TReqEntry reqEntry = ts.receiveMessage(message, TReqEntry.class);
                    if (reqEntry == null) {
                        throw new Exception();
                    }

                    // 自分の情報をセッションに追加
                    ts.setUser(reqEntry.user);

                    try {
                        // 先にエントリーしている人の情報をとる
                        TSession tsPrev = es.pop();

                        // 情報をとった結果、同じユーザIDの場合は、エラーとする
                        if (tsPrev.getUser().userId.equals(ts.getUser().userId)) {
                            throw new Exception();
                        }

                        // エントリーしている人の情報と、自分の情報を、マッチング成立として両者に返す
                        TResPrepare resPrepare = new TResPrepare();
                        resPrepare.type = type;
                        resPrepare.seed = (int) (Math.random() * 10000) + 1;
                        resPrepare.users.add(ts.getUser());
                        resPrepare.users.add(tsPrev.getUser());

                        ts.sendMessage(resPrepare);
                        tsPrev.sendMessage(resPrepare);
                        return;

                    } catch (Exception e) {
                        // ここはpopで例外が発生するが、無視する
                    }

                    // 自分を加える
                    es.add(ts);

                    // 自分にエントリー完了を送信する
                    TResEntried resEntried = new TResEntried();
                    ts.sendMessage(resEntried);
                    return;

                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

        // エラー処理
        if (ts != null) {
            TResError resError = new TResError();
            ts.sendMessage(resError);
        }
    }

    /**
     * closeハンドラ
     * @param type
     * @param session
     * @param reason
     */
    @OnClose
    public void onClose(@PathParam("type") int type, Session session, CloseReason reason) {
        // アクセスIDを取得
        String accessdId = (String) session.getUserProperties().get(KEY_ACCESS_ID);
        TSession ts = null;

        try {
            // アクセスIDをもとに、openハンドラで登録したTSessionを呼び出す
            if (!sessions.containsKey(accessdId)) {
                throw new Exception();
            }
            ts = sessions.get(accessdId);

            // 自分が既に登録されていたら、削除する
            if (!entries.containsKey(type)) {
                throw new Exception();
            }
            LinkedList<TSession> es = entries.get(type);
            es.remove(ts);

        } catch (Exception e) {
            e.printStackTrace();
        }

        // セッション情報も破棄する
        sessions.remove(accessdId);
    }
}

最大のポイントは、
onMessageにおけるsynchronizedブロックである。

これでちゃんと排他制御をかけないと、

エラーになるぜ。。

 

ながいので、ラウンド編は次回に。

 

投稿日時:2015年04月02日 18:11   カテゴリー:java, node.js, websocket   [コメントがあればどうぞ]