diff --git a/.gitignore b/.gitignore index e81ea6b..9460bc7 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ tmp/* .idea *.iml checkmate-dist +/target/ +/logs/ diff --git a/README.md b/README.md index 20cd6eb..9430bb8 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,24 @@ -# checkmate-java +checkmate-java +========== -CheckMate is a cluster monitoring system. +# 프로젝트 설명 -currently there is a nodejs version of checkmate which monitors impala cluster. -due to many problems it's facing, the author decided to move to java. +- QueryCache를 한 눈에 모니터링할 수 있는 데시보드 -first supported system is QueryCache( https://github.com/izlley/querycache ) -impala will be moved to this project soon. +# 빌드 방법 -the author is planning to support more hadoop based sql engines. +```bash +$ ./mkdist.sh +$ ls checkmate-dist.tgz +``` -features planned: - 1. graphic chart showing various usage metrics - 2. authentication + +# 설치 방법 + +``` +$ upload checkmate-dist.tgz to hdfs@DICc-m002:/app/home/hdfs/admin/deploy_new/dipkg/checkmate +$ ./deploy_checkmate.sh server +``` + +# 위키 링크 +* [CheckMate](http://wiki.skplanet.com/display/DIT/CheckMate) diff --git a/bin/start-checkmate.sh b/bin/start-checkmate.sh new file mode 100755 index 0000000..2b34afe --- /dev/null +++ b/bin/start-checkmate.sh @@ -0,0 +1,37 @@ +#!/bin/bash + +bin=`dirname "$0"` +bin=`cd "${bin}/.."; pwd` + +export CM_HOME="${bin}" + +CM_LOG_DIR=${CM_HOME}/logs +CM_CONF_DIR=${CM_HOME}/conf +CM_PID_DIR=${CM_HOME}/pid + +if [ ! -d ${CM_LOG_DIR} ]; then + mkdir ${CM_LOG_DIR} + if [ "$?" != "0" ]; then + echo "ERROR mkdir ${CM_LOG_DIR}" + exit 1 + fi +fi +if [ ! -d ${CM_PID_DIR} ]; then + mkdir ${CM_PID_DIR} + if [ "$?" != "0" ]; then + echo "ERROR mkdir ${CM_PID_DIR}" + exit 1 + fi +fi +START_TIME=`date "+%Y%m%d_%H%M%S"` + +CLASSPATH=${CM_CONF_DIR}:${CM_HOME}/lib/*:${CLASSPATH} + +if [ "${JVM_ARGS}" == "" ]; then + JVM_ARGS="-enableassertions -enablesystemassertions -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -Xms256m -Xmx512m" +fi +JVM_ARGS="${JVM_ARGS} -DCM_HOME=${CM_HOME}" +DEBUG_ARGS="-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:${CM_LOG_DIR}/checkmate_${START_TIME}.gc" + +nohup java $JVM_ARGS -cp "$CLASSPATH" com.skplanet.checkmate.CheckMateServer "$@" > ${CM_LOG_DIR}/checkmate_${START_TIME}.out 2>&1 & +echo $! > ${CM_PID_DIR}/checkmate.pid diff --git a/bin/stop-checkmate.sh b/bin/stop-checkmate.sh new file mode 100755 index 0000000..b5db44a --- /dev/null +++ b/bin/stop-checkmate.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +bin=`dirname "$0"` +bin=`cd "${bin}/.."; pwd` + +export CM_HOME="${bin}" + +CM_PID_DIR=${CM_HOME}/pid + +if [ -f ${CM_PID_DIR}/checkmate.pid ]; then + pid=`cat ${CM_PID_DIR}/checkmate.pid` + echo "kill -9 ${pid}" + kill -9 "${pid}" + rm ${CM_PID_DIR}/checkmate.pid +fi diff --git a/conf/.gitignore b/conf/.gitignore new file mode 100644 index 0000000..df63422 --- /dev/null +++ b/conf/.gitignore @@ -0,0 +1,2 @@ +/checkmate.ini +/querycache.ini diff --git a/conf/checkmate.ini.sample b/conf/checkmate.ini.sample index c91cfa3..654e965 100644 --- a/conf/checkmate.ini.sample +++ b/conf/checkmate.ini.sample @@ -1 +1,9 @@ -WebServicePort = 60080 +[global] +WebPort = 8082 +UseWebSocket = false + +[email] +SmtpServer = +SmtpFrom = CheckMate +SmtpPort = +MailRecipients = diff --git a/conf/log4j.properties b/conf/log4j.properties index 6762096..5501ebe 100644 --- a/conf/log4j.properties +++ b/conf/log4j.properties @@ -1,4 +1,4 @@ -checkmate.root.logger=INFO,DRFA,RFA,console +checkmate.root.logger=INFO,DRFA,console checkmate.log.dir=${CM_HOME}/logs checkmate.log.file=checkmate.log checkmate.warn.log.file=warn.checkmate.log @@ -15,38 +15,10 @@ log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender log4j.appender.DRFA.File=${checkmate.log.dir}/${checkmate.log.file} log4j.appender.DRFA.DatePattern=.yyyy-MM-dd log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout -log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n - -# WARN -log4j.appender.RFA=org.apache.log4j.DailyRollingFileAppender -log4j.appender.RFA.File=${checkmate.log.dir}/${checkmate.warn.log.file} -log4j.appender.RFA.DatePattern=.yyyy-MM-dd -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n -log4j.appender.RFA.Threshold=WARN +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2}(%L): %m%n # Console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n - -# querycache log -checkmate.querycache.log.file=querycache.log -log4j.logger.querycache=DEBUG,DRFAquerycache -log4j.appender.DRFAquerycache=org.apache.log4j.DailyRollingFileAppender -log4j.appender.DRFAquerycache.File=${checkmate.log.dir}/${checkmate.querycache.log.file} -log4j.appender.DRFAquerycache.DatePattern=.yyyy-MM-dd -log4j.appender.DRFAquerycache.layout=org.apache.log4j.PatternLayout -log4j.appender.DRFAquerycache.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n - - -# querycache log -checkmate.api.log.file=api.log -log4j.logger.api=DEBUG,DRFAapi -log4j.appender.DRFAapi=org.apache.log4j.DailyRollingFileAppender -log4j.appender.DRFAapi.File=${checkmate.log.dir}/${checkmate.api.log.file} -log4j.appender.DRFAapi.DatePattern=.yyyy-MM-dd -log4j.appender.DRFAapi.layout=org.apache.log4j.PatternLayout -log4j.appender.DRFAapi.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n - +log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p %c{2}(%L): %m%n diff --git a/conf/querycache.ini.sample b/conf/querycache.ini.sample index 9ebdba7..e5cd112 100644 --- a/conf/querycache.ini.sample +++ b/conf/querycache.ini.sample @@ -1,7 +1,11 @@ +[global] PartialUpdateInterval = 3000 FullUpdateInterval = 10000 +UpdateFailSleepTime = 300000; Clusters = "myCluster" [myCluster] WebPort = 8080 Servers = localhost +MaxCompleteQueries = 100 +UseWebSocket = false \ No newline at end of file diff --git a/pom.xml b/pom.xml index 43ba042..89ff1ad 100644 --- a/pom.xml +++ b/pom.xml @@ -16,37 +16,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.2 - - 1.7 - 1.7 - - org.codehaus.mojo exec-maven-plugin @@ -54,8 +23,6 @@ java - - -cp ${basedir}/conf:${basedir}/lib/* -DCM_HOME=${basedir} @@ -65,28 +32,12 @@ org.apache.maven.plugins - maven-enforcer-plugin - 1.1.1 - - - enforce-java - - enforce - - - - - [3.0.0,) - - - [1.7,) - [ERROR] OLD JDK [${java.version}] in use. - Jetty ${jettyVersion} requires JDK 1.7 or newer - - - - - + maven-compiler-plugin + 3.5.1 + + 1.8 + 1.8 + @@ -168,21 +119,6 @@ javax.mail 1.5.4 - - - - - - - - - - - - - - - io.dropwizard.metrics metrics-core @@ -193,5 +129,10 @@ metrics-servlets ${metrics.version} + + com.skplanet.querycache + querycache-server + 0.26.15-SNAPSHOT + diff --git a/src/main/java/com/beans/ClustersBean.java b/src/main/java/com/beans/ClustersBean.java index 026bf66..d0f6294 100644 --- a/src/main/java/com/beans/ClustersBean.java +++ b/src/main/java/com/beans/ClustersBean.java @@ -1,21 +1,30 @@ package com.beans; -import com.skplanet.checkmate.querycache.QCClusterManager; - import java.util.ArrayList; import java.util.Collection; +import com.skplanet.checkmate.CheckMateServer; +import com.skplanet.checkmate.querycache.QCClusterOptions; + /** * Created by nazgul33 on 15. 2. 6. */ public class ClustersBean implements java.io.Serializable { - Collection qcClusters = new ArrayList<>(); + + + private static final long serialVersionUID = 2387849279567737354L; + + private Collection qcClusters = new ArrayList<>(); public ClustersBean() { - qcClusters.addAll( QCClusterManager.getInstance().getClusterList() ); + qcClusters.addAll( CheckMateServer.getClusterManager().getClusterNameList() ); } public Collection getQcClusters() { return qcClusters; } + + public QCClusterOptions getQcOption(String name) { + return CheckMateServer.getClusterManager().getCluster(name).getOptions(); + } } diff --git a/src/main/java/com/skplanet/checkmate/CheckMateServer.java b/src/main/java/com/skplanet/checkmate/CheckMateServer.java index d5d0a37..754e47d 100644 --- a/src/main/java/com/skplanet/checkmate/CheckMateServer.java +++ b/src/main/java/com/skplanet/checkmate/CheckMateServer.java @@ -1,270 +1,130 @@ package com.skplanet.checkmate; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.servlets.MetricsServlet; -import com.skplanet.checkmate.querycache.QCClusterManager; -import com.skplanet.checkmate.servlet.CMApiServletQC; -import com.skplanet.checkmate.servlet.CMWebSocketServletQC; -import com.skplanet.checkmate.utils.MailSender; -import com.skplanet.checkmate.yarn.YarnMonitor; +import static com.skplanet.checkmate.ConfigKeys.*; + +import java.io.File; + import org.apache.commons.configuration.HierarchicalINIConfiguration; import org.apache.commons.configuration.SubnodeConfiguration; -import org.apache.tomcat.InstanceManager; -import org.apache.tomcat.SimpleInstanceManager; -import org.eclipse.jetty.annotations.ServletContainerInitializersStarter; -import org.eclipse.jetty.apache.jsp.JettyJasperInitializer; -import org.eclipse.jetty.jsp.JettyJspServlet; -import org.eclipse.jetty.plus.annotation.ContainerInitializer; -import org.eclipse.jetty.server.Handler; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.ContextHandlerCollection; -import org.eclipse.jetty.server.handler.HandlerList; -import org.eclipse.jetty.servlet.DefaultServlet; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.eclipse.jetty.webapp.WebAppContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import com.codahale.metrics.MetricRegistry; +import com.skplanet.checkmate.querycache.QCClusterManager; +import com.skplanet.checkmate.utils.MailSender; /** * Hello world! * */ -public class CheckMateServer -{ - private static final Logger LOG = LoggerFactory.getLogger("main"); +public class CheckMateServer { + + private static final Logger LOG = LoggerFactory.getLogger(CheckMateServer.class); + private static final MetricRegistry metrics = new MetricRegistry(); - public static MetricRegistry getMetrics() { - return metrics; - } + private File confDir; + + private final HierarchicalINIConfiguration ini; - private void StartClusterManagers() { - LOG.info("Starting QueryCache Cluster Thread"); - QCClusterManager.getInstance().startThread(); + private QCClusterManager clusterManager; + private CheckMateWebServer webServer; -// LOG.info("Starting Yarn Monitor"); -// YarnMonitor.getInstance(); - } - private void ShutdownClusterManagers() { - LOG.info("Shutting down QueryCache Cluster Thread"); - QCClusterManager.getInstance().finishThread(); - } + private static CheckMateServer _this = null; + + private CheckMateServer(File homeDir) throws Exception { - private Server webServer = null; - private int webServicePort; - private String home = null; - private String wwwroot = null; - private static final String WEBROOT_INDEX = "/www/"; + confDir = new File(homeDir, "conf"); - private URI getWebRootResourceUri() throws FileNotFoundException, URISyntaxException, MalformedURLException - { - // try local resource first - wwwroot = home + "/www"; - URL indexUri = new File(wwwroot).toURI().toURL(); - if (indexUri == null) - { - // fallback to resources in jar - wwwroot = WEBROOT_INDEX; - indexUri = this.getClass().getResource(wwwroot); - throw new FileNotFoundException("Unable to find resource " + wwwroot); - } - // Points to wherever /webroot/ (the resource) is - return indexUri.toURI(); - } + File iniFile = new File(confDir, CHECKMATE_INI_FILE); + LOG.info("reading checkmate configuration from " + iniFile); - private File getScratchDir() throws IOException - { - File scratchDir = new File(home, "/tmp/jetty-jsp"); - - if (!scratchDir.exists()) + ini = new HierarchicalINIConfiguration(iniFile); + + SubnodeConfiguration global = ini.getSection(CHECKMATE_GLOBAL_SECTION); + global.setThrowExceptionOnMissing(true); + + // MailSender { - LOG.info("making scratch directory " + scratchDir.toString()); - if (!scratchDir.mkdirs()) - { - LOG.error("making scratch directory failed."); - throw new IOException("Unable to create scratch directory: " + scratchDir); + SubnodeConfiguration email = ini.getSection(CHECKMATE_EMAIL_SECTION); + email.setThrowExceptionOnMissing(true); + + String server = email.getString(CHECKMATE_EMAIL_SMTP_SERVER, null); + String from = email.getString(CHECKMATE_EMAIL_SMTP_FROM, null); + int port = email.getInt(CHECKMATE_EMAIL_SMTP_PORT, CHECKMATE_EMAIL_STMP_PORT_DEFAULT); + String tos = email.getString(CHECKMATE_EMAIL_RECIPIENTS, null); + + if (server != null && !server.isEmpty() && tos != null && !tos.isEmpty()) { + MailSender.initialize(server, port, from, tos.split(",")); + } else { + LOG.info("mail sender not initialized"); } } - return scratchDir; + + // ClusterManager + clusterManager = new QCClusterManager(confDir); + + // WebServer + { + int port = global.getInt(CHECKMATE_WEB_PORT, CHECKMATE_WEB_PORT_DEFAULT); + boolean useWebSocket = global.getBoolean(CHECKMATE_USER_WEBSOCET, CHECKMATE_USER_WEBSOCET_DEFAULT); + webServer = new CheckMateWebServer(port, useWebSocket, homeDir, metrics); + } } - - private List jspInitializers() - { - JettyJasperInitializer sci = new JettyJasperInitializer(); - ContainerInitializer initializer = new ContainerInitializer(sci, null); - List initializers = new ArrayList<>(); - initializers.add(initializer); - return initializers; + + public static CheckMateServer getInstance() { + return _this; } - - private ClassLoader getUrlClassLoader() - { - ClassLoader jspClassLoader = new URLClassLoader(new URL[0], this.getClass().getClassLoader()); - return jspClassLoader; + + public static QCClusterManager getClusterManager() { + return _this.clusterManager; } - - private ServletHolder jspServletHolder() - { - ServletHolder holderJsp = new ServletHolder("jsp", JettyJspServlet.class); - holderJsp.setInitOrder(0); - holderJsp.setInitParameter("logVerbosityLevel", "DEBUG"); - holderJsp.setInitParameter("fork", "false"); - holderJsp.setInitParameter("xpoweredBy", "false"); - holderJsp.setInitParameter("compilerTargetVM", "1.7"); - holderJsp.setInitParameter("compilerSourceVM", "1.7"); - holderJsp.setInitParameter("keepgenerated", "true"); - return holderJsp; + + public static MetricRegistry getMetrics() { + return metrics; } - private ServletHolder defaultServletHolder(URI baseUri) - { - ServletHolder holderDefault = new ServletHolder("default", DefaultServlet.class); - LOG.info("Base URI: " + baseUri); - holderDefault.setInitParameter("resourceBase", baseUri.toASCIIString()); - holderDefault.setInitParameter("dirAllowed", "false"); - holderDefault.setInitParameter("redirectWelcome", "true"); - holderDefault.setInitParameter("welcomeServlets", "false"); + private void startClusterManagers() { + + LOG.info("Starting QueryCache Cluster Thread"); + clusterManager.start(); - return holderDefault; +// LOG.info("Starting Yarn Monitor"); +// YarnMonitor.getInstance(); } - - private WebAppContext getWebAppContext(URI baseUri, File scratchDir) - { - WebAppContext context = new WebAppContext(); - context.setContextPath("/"); - context.setAttribute("javax.servlet.context.tempdir", scratchDir); - context.setAttribute("org.eclipse.jetty.server.webapp.ContainerIncludeJarPattern", - ".*/[^/]*servlet-api-[^/]*\\.jar$|.*/javax.servlet.jsp.jstl-.*\\.jar$|.*/.*taglibs.*\\.jar$"); - context.setResourceBase(baseUri.toASCIIString()); - context.setAttribute("org.eclipse.jetty.containerInitializers", jspInitializers()); - context.setAttribute(InstanceManager.class.getName(), new SimpleInstanceManager()); - context.addBean(new ServletContainerInitializersStarter(context), true); - context.setClassLoader(getUrlClassLoader()); - - context.addServlet(jspServletHolder(), "*.jsp"); - // Add Application Servlets - // context.addServlet(BeanMapper.class, "*.cm"); - context.addServlet(defaultServletHolder(baseUri), "/"); - return context; + + private void startWebServer() throws Exception { + webServer.runWebServer(); } - - public Server runWebInterface() throws Exception { - // create web service - LOG.info("Starting web interface..."); - webServer = new Server(webServicePort); - - URI baseUri = getWebRootResourceUri(); - - System.setProperty("org.apache.jasper.compiler.disablejsr199","false"); - -// ServletContextHandler apiImpala = new ServletContextHandler(ServletContextHandler.SESSIONS); -// apiImpala.setContextPath("/api/impala"); -// apiImpala.addServlet(new ServletHolder(new QCWebApiServlet()), "/*"); - - ServletContextHandler apiQc = new ServletContextHandler(ServletContextHandler.SESSIONS); - apiQc.setContextPath("/api/qc"); - apiQc.addServlet(new ServletHolder(new CMWebSocketServletQC()), "/websocket/*"); - apiQc.addServlet(new ServletHolder(new CMApiServletQC()), "/*"); - - ServletContextHandler metrics = new ServletContextHandler(ServletContextHandler.SESSIONS); - metrics.setContextPath("/api/metrics"); - metrics.addServlet(new ServletHolder(new MetricsServlet(getMetrics())), "/metrics"); - - File scratchDir = getScratchDir(); - ServletContextHandler webappHandler = getWebAppContext(baseUri, scratchDir); - - ContextHandlerCollection contextCollection = new ContextHandlerCollection(); - contextCollection.setHandlers( new Handler[] { apiQc, metrics, webappHandler } ); - - HandlerList handlers = new HandlerList(); - handlers.setHandlers(new Handler[]{contextCollection}); - webServer.setHandler(handlers); - webServer.start(); - LOG.info("Started web interface..."); - - return webServer; + + private void stopClusterManagers() { + LOG.info("Shutting down QueryCache Cluster Thread"); + clusterManager.stop(); } - private final HierarchicalINIConfiguration clusterConfigFile; - private MailSender mailSender = null; - private List mailRecipients = null; - - public CheckMateServer() throws Exception { - home = System.getenv("CM_HOME"); - if (home == null) { - home = "./"; - } - - String conf = home + "/conf/checkmate.ini"; - LOG.info("reading checkmate configuration from " + conf); - - clusterConfigFile = new HierarchicalINIConfiguration(conf); - SubnodeConfiguration globalSection = clusterConfigFile.getSection(null); - globalSection.setThrowExceptionOnMissing(true); - webServicePort = globalSection.getInt("WebServicePort", 8080); + public static void main( String[] args ) { - // init mail sender - String smtpServer = globalSection.getString("SmtpServer", null); - String smtpFrom = globalSection.getString("SmtpFrom", "CheckMate"); - int smtpPort = globalSection.getInt("SmtpPort", 25); - String recipients = globalSection.getString("MailRecipients", null); - - if (smtpServer != null && recipients != null) { - mailRecipients = Arrays.asList( recipients.split(",") ); - mailSender = new MailSender(smtpServer, smtpFrom, smtpPort, mailRecipients); - new Thread(mailSender).start(); + String cmHome = System.getenv("CM_HOME"); + if (cmHome == null) { + throw new RuntimeException("$CM_HOME undefined"); } - } - - public HierarchicalINIConfiguration getClusterConfigFile() { - return clusterConfigFile; - } - - public List getMailRecipients() { - return mailRecipients; - } - - public MailSender getMailSender() { - return mailSender; - } - - private static CheckMateServer server = null; - public static CheckMateServer getInstance() { - return server; - } - - public static void main( String[] args ) - { - try { - server = new CheckMateServer(); - server.StartClusterManagers(); + + File homeDir = new File(cmHome); + try { + _this = new CheckMateServer(homeDir); + _this.startClusterManagers(); } catch (Exception e) { LOG.error("error initializing CheckMateServer", e); System.exit(1); } try { - server.runWebInterface(); + _this.startWebServer(); } catch (Exception e) { - LOG.error("exception while starting web interface"); + LOG.error("exception while starting web interface", e); System.exit(1); } - - // run forever until interrupted. - try { - server.webServer.join(); - } catch (Exception e) { - LOG.error("Checkmate Interrupted." + e.toString()); - } - server.ShutdownClusterManagers(); + + _this.stopClusterManagers(); } } diff --git a/src/main/java/com/skplanet/checkmate/CheckMateWebServer.java b/src/main/java/com/skplanet/checkmate/CheckMateWebServer.java new file mode 100644 index 0000000..8373a97 --- /dev/null +++ b/src/main/java/com/skplanet/checkmate/CheckMateWebServer.java @@ -0,0 +1,133 @@ +package com.skplanet.checkmate; + +import java.io.File; +import java.io.FileNotFoundException; +import java.net.URI; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.List; + +import org.apache.tomcat.InstanceManager; +import org.apache.tomcat.SimpleInstanceManager; +import org.eclipse.jetty.annotations.ServletContainerInitializersStarter; +import org.eclipse.jetty.apache.jsp.JettyJasperInitializer; +import org.eclipse.jetty.jsp.JettyJspServlet; +import org.eclipse.jetty.plus.annotation.ContainerInitializer; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.webapp.WebAppContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.servlets.MetricsServlet; +import com.skplanet.checkmate.servlet.CMQCApiServlet; +import com.skplanet.checkmate.servlet.CMQCWebSocketServlet; + +public class CheckMateWebServer { + + private static final Logger LOG = LoggerFactory.getLogger(CheckMateWebServer.class); + + private Server webServer; + + public CheckMateWebServer(int port, boolean useWebSocket, File rootDir, MetricRegistry metric) throws Exception { + + LOG.info("Starting web interface..."); + webServer = new Server(port); + + File webRoot = new File(rootDir,"www"); + if (!webRoot.exists()) { + throw new FileNotFoundException("Unable to find resource " + webRoot); + } + + // Points to wherever /www/ (the resource) is + URI baseUri = webRoot.toURI(); + + System.setProperty("org.apache.jasper.compiler.disablejsr199","false"); + + // ServletContextHandler apiImpala = new ServletContextHandler(ServletContextHandler.SESSIONS); + // apiImpala.setContextPath("/api/impala"); + // apiImpala.addServlet(new ServletHolder(new QCWebApiServlet()), "/*"); + + ServletContextHandler apiQc = new ServletContextHandler(ServletContextHandler.SESSIONS); + apiQc.setContextPath("/api/qc"); + if (useWebSocket) { + apiQc.addServlet(new ServletHolder(new CMQCWebSocketServlet()), "/websocket/*"); + } + apiQc.addServlet(new ServletHolder(new CMQCApiServlet()), "/*"); + + ServletContextHandler metrics = new ServletContextHandler(ServletContextHandler.SESSIONS); + metrics.setContextPath("/api/metrics"); + metrics.addServlet(new ServletHolder(new MetricsServlet(metric)), "/metrics"); + + // File scratchDir = getScratchDir(); + WebAppContext context = new WebAppContext(); + context.setContextPath("/"); + // context.setAttribute("javax.servlet.context.tempdir", scratchDir); + context.setAttribute("org.eclipse.jetty.server.webapp.ContainerIncludeJarPattern", + ".*/[^/]*servlet-api-[^/]*\\.jar$|.*/javax.servlet.jsp.jstl-.*\\.jar$|.*/.*taglibs.*\\.jar$"); + context.setResourceBase(webRoot.toString()); + context.setAttribute("org.eclipse.jetty.containerInitializers", getJspInitializers()); + context.setAttribute(InstanceManager.class.getName(), new SimpleInstanceManager()); + context.addBean(new ServletContainerInitializersStarter(context), true); + context.setClassLoader(getUrlClassLoader()); + context.addServlet(getJspServletHolder(), "*.jsp"); + context.addServlet(getDefaultServletHolder(baseUri), "/"); + // ServletContextHandler webappHandler = getWebAppContext(baseUri, scratchDir); + + ContextHandlerCollection contextCollection = new ContextHandlerCollection(); + contextCollection.setHandlers( new Handler[] { apiQc, metrics, context } ); + + HandlerList handlers = new HandlerList(); + handlers.setHandlers(new Handler[]{contextCollection}); + webServer.setHandler(handlers); + } + + private List getJspInitializers() { + JettyJasperInitializer sci = new JettyJasperInitializer(); + ContainerInitializer initializer = new ContainerInitializer(sci, null); + List initializers = new ArrayList<>(); + initializers.add(initializer); + return initializers; + } + + private ClassLoader getUrlClassLoader() { + ClassLoader jspClassLoader = new URLClassLoader(new URL[0], this.getClass().getClassLoader()); + return jspClassLoader; + } + + private ServletHolder getJspServletHolder() { + ServletHolder holderJsp = new ServletHolder("jsp", JettyJspServlet.class); + holderJsp.setInitOrder(0); + holderJsp.setInitParameter("logVerbosityLevel", "DEBUG"); + holderJsp.setInitParameter("fork", "false"); + holderJsp.setInitParameter("xpoweredBy", "false"); + holderJsp.setInitParameter("compilerTargetVM", "1.7"); + holderJsp.setInitParameter("compilerSourceVM", "1.7"); + holderJsp.setInitParameter("keepgenerated", "true"); + return holderJsp; + } + + private ServletHolder getDefaultServletHolder(URI baseUri) { + ServletHolder holderDefault = new ServletHolder("default", DefaultServlet.class); + LOG.info("Base URI: " + baseUri); + holderDefault.setInitParameter("resourceBase", baseUri.toASCIIString()); + holderDefault.setInitParameter("dirAllowed", "false"); + holderDefault.setInitParameter("redirectWelcome", "true"); + holderDefault.setInitParameter("welcomeServlets", "false"); + + return holderDefault; + } + + public void runWebServer() throws Exception { + + webServer.start(); + webServer.join(); + } +} diff --git a/src/main/java/com/skplanet/checkmate/ConfigKeys.java b/src/main/java/com/skplanet/checkmate/ConfigKeys.java new file mode 100644 index 0000000..bb8e23b --- /dev/null +++ b/src/main/java/com/skplanet/checkmate/ConfigKeys.java @@ -0,0 +1,41 @@ +package com.skplanet.checkmate; + +public class ConfigKeys { + + public static final String CHECKMATE_INI_FILE = "checkmate.ini"; + + public static final String CHECKMATE_GLOBAL_SECTION = "global"; + public static final String CHECKMATE_WEB_PORT = "WebPort"; + public static final int CHECKMATE_WEB_PORT_DEFAULT = 8082; + public static final String CHECKMATE_USER_WEBSOCET = "UseWebSocket"; + public static final boolean CHECKMATE_USER_WEBSOCET_DEFAULT = false; + public static final String QUERYCACHE_INI_FILE = "querycache.ini"; + + public static final String CHECKMATE_EMAIL_SECTION = "email"; + public static final String CHECKMATE_EMAIL_SMTP_SERVER = "SmtpServer"; + public static final String CHECKMATE_EMAIL_SMTP_FROM = "SmtpFrom"; + public static final String CHECKMATE_EMAIL_SMTP_PORT = "SmtpPort"; + public static final int CHECKMATE_EMAIL_STMP_PORT_DEFAULT = 25; + public static final String CHECKMATE_EMAIL_RECIPIENTS = "MailRecipients"; + + // Global Section + public static final String PARTIAL_UPDATE_INTERAVAL = "PartialUpdateInterval"; + public static final long PARTIAL_UPDATE_INTERVAL_DEFAULT = 5*1000; + public static final String FULL_UPDATE_INTERVAL = "FullUpdateInterval"; + public static final long FULL_UPDATE_INTERVAL_DEFAULT = 10*1000; + public static final String UPDATE_FAIL_SLEEP_TIME = "UpdateFailSleepTime"; + public static final int UPDATE_FAIL_SLEEP_TIME_DEFAULT = 5*60*1000; + + // Cluster Section + public static final String CLUSTER_NAMES = "Clusters"; + public static final String CLUSTER_WEB_PORT = "WebPort"; + public static final int CLUSTER_WEB_PORT_DEFFAULT = 8081; + public static final String CLUSTER_SERVERS = "Servers"; + public static final String CLUSTER_MAX_COMPLETE_QUERIES = "MaxComleteQueries"; + public static final int CLUSTER_MAX_COMPLETE_QUERIES_DEFAULT = 100; + public static final String CLUSTER_USE_WEBSOCKET = "UseWebSocket"; + public static final boolean CLUSTER_USER_WEBSOCKE_DEFAULT = false; + + // Server Config + public static final int EVENT_QUEUE_MAX_LIMIT = 3000; +} diff --git a/src/main/java/com/skplanet/checkmate/querycache/QCClientWebSocket.java b/src/main/java/com/skplanet/checkmate/querycache/QCClientWebSocket.java deleted file mode 100644 index 2ca8086..0000000 --- a/src/main/java/com/skplanet/checkmate/querycache/QCClientWebSocket.java +++ /dev/null @@ -1,136 +0,0 @@ -package com.skplanet.checkmate.querycache; - -import com.google.gson.Gson; -import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.WebSocketListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import sun.misc.Request; - -/** - * Created by nazgul33 on 15. 2. 23. - */ -public class QCClientWebSocket implements WebSocketListener { - private static final boolean DEBUG = false; - private static final Logger LOG = LoggerFactory.getLogger("websocket-client-qc"); - - private Session outbound = null; - private QCServer server = null; - - private static class RequestMsg { - public String request; - public String channel; - public String data; - } - - public QCClientWebSocket(QCServer server) { - this.server = server; - } - - @Override - public void onWebSocketBinary(byte[] payload, int offset, int len) - { - /* not interested */ - } - - @Override - public void onWebSocketClose(int statusCode, String reason) - { - LOG.info("WebSocket to " + server.name + " closed"); - this.outbound = null; - server.removeRtWebSocket(); - } - - @Override - public void onWebSocketConnect(Session session) { - this.outbound = session; - LOG.info("WebSocket to " + server.name + " established"); - - RequestMsg req = new RequestMsg(); - req.request = "subscribe"; - req.channel = "runningQueries"; - - Gson gson = new Gson(); - String msg = gson.toJson(req); - - sendMessage(msg); - } - - @Override - public void onWebSocketError(Throwable cause) { - LOG.error("websocket error", cause); - if (this.outbound == null) { - // connection error. server's websocket should be cleared. - server.removeRtWebSocket(); - } - } - - public static final String evtStrQueryAdded = "runningQueryAdded"; - public static final String evtStrQueryUpdated = "runningQueryUpdated"; - public static final String evtStrQueryRemoved = "runningQueryRemoved"; - public static final String evtStrResult = "result"; - public static final String evtStrPong = "pong"; - private static class RunningQueryEvent { - public String msgType = null; - public String result = null; - public QCQuery.QueryImport query = null; - public String queryId = null; - } - - @Override - public void onWebSocketText(String message) { - Gson gson = new Gson(); - RunningQueryEvent evt = null; - try { - evt = gson.fromJson(message, RunningQueryEvent.class); - } catch (Exception e) { - LOG.error("Gson exception :", message); - } - - if (evt == null || evt.msgType == null) { - LOG.error("invalid event object received.", message); - return; - } - - QCQuery q; - switch (evt.msgType) { - case evtStrQueryAdded: - case evtStrQueryUpdated: - if (evt.query != null) { - LOG.debug("new RT query"); - q = new QCQuery(server, evt.query); - server.processAddQueryEvent(q); - } - break; - case evtStrQueryRemoved: - if (evt.query != null) { - LOG.debug("removing RT query"); - q = new QCQuery(server, evt.query); - server.processRemoveQueryEvent(q); - } - break; - case evtStrResult: - LOG.info("result " + evt.result); - break; - case evtStrPong: - LOG.debug("pong!!"); - break; - default: - LOG.warn("ignoring message", message); - break; - } - } - - public void sendMessage(String message) { - if (outbound == null || !outbound.isOpen()) - return; - outbound.getRemote().sendString(message, null); - } - - public void ping() { - Gson gson = new Gson(); - RequestMsg msg = new RequestMsg(); - msg.request = "ping"; - sendMessage(gson.toJson(msg)); - } -} diff --git a/src/main/java/com/skplanet/checkmate/querycache/QCCluster.java b/src/main/java/com/skplanet/checkmate/querycache/QCCluster.java index 8caaebb..4665f8b 100644 --- a/src/main/java/com/skplanet/checkmate/querycache/QCCluster.java +++ b/src/main/java/com/skplanet/checkmate/querycache/QCCluster.java @@ -1,384 +1,415 @@ package com.skplanet.checkmate.querycache; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.codahale.metrics.Histogram; import com.codahale.metrics.SlidingTimeWindowReservoir; import com.google.gson.Gson; import com.skplanet.checkmate.CheckMateServer; -import com.skplanet.checkmate.servlet.CMQCServerWebSocket; -import com.skplanet.checkmate.utils.MailSender; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.awt.*; -import java.util.*; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import com.skplanet.checkmate.ConfigKeys; +import com.skplanet.checkmate.querycache.data.ClusterServerPoolInfo; +import com.skplanet.checkmate.querycache.data.ClusterServerSysStats; +import com.skplanet.checkmate.servlet.CMQCWebSocket; +import com.skplanet.querycache.server.cli.ObjectPool.QCObjectPoolProfile; +import com.skplanet.querycache.server.cli.QueryProfile; +import com.skplanet.querycache.server.common.SQLUtil; +import com.skplanet.querycache.servlet.QCApiConDesc; +import com.skplanet.querycache.servlet.QCApiServerSystemStats; +import com.skplanet.querycache.servlet.QCWebSocket.ChangedProfile; /** * Created by nazgul33 on 15. 1. 27. */ public class QCCluster { - private static final boolean DEBUG = false; - private static final Logger LOG = LoggerFactory.getLogger("querycache"); - private static final int MAX_COMPLETE_QUERIES = 100; - private static final String RQE_ADDED = "runningQueryAdded"; - private static final String RQE_UPDATED = "runningQueryUpdated"; - private static final String RQE_REMOVED = "runningQueryRemoved"; - - static public class Options { - public int webPort = 8080; - public int partialUpdateInterval = 3000; - public int fullUpdateInterval = 10000; - public String[] servers; - } - - public final String name; - private Map servers; - private Options opt; - - private Timer notifyTimer = new Timer(); - Histogram histogram; - - public QCCluster(String name, Options opt) { - this.name = name; - this.opt = opt; - - servers = new HashMap<>(); - for (String serverName : opt.servers) { - if (servers.containsKey(serverName)) { - System.console().printf("duplicate server name %s\n", serverName); - System.exit(1); - } - servers.put(serverName, new QCServer(serverName, this)); - } - - TimerTask wsNotifyTask = new TimerTask() { - @Override - public void run() { - notifySubscribers(); - } - }; - notifyTimer.scheduleAtFixedRate(wsNotifyTask, 0, 50); - notifyTimer.scheduleAtFixedRate(queryAnomalyDetectorTask, 5000L, 5000L); - - histogram = new Histogram( new SlidingTimeWindowReservoir(5L, TimeUnit.MINUTES) ); - CheckMateServer.getMetrics().register("queries."+name, histogram); - } - - public void updateCounters() { - int count = 0; - for (Map.Entry entry : servers.entrySet()) { - count += entry.getValue().updateQueryCount(); - } - histogram.update(count); - } - public Map getServers() { - return servers; - } - - public QCServer getServer(String name) { - return servers.get(name); - } - - public void Update() { - // update all servers - Date start = new Date(); - for (Map.Entry entry : servers.entrySet()) { - QCServer server = entry.getValue(); - server.Update(); - - // update sysinfo - QCServer.SysInfoCollection sysInfo = server.getSysInfoCollection(); - if (sysInfo != null) { - SystemInfo mySysInfo = sysInfoMap.get(server.name); - if ( mySysInfo == null) { - mySysInfo = new SystemInfo(); - mySysInfo.server = server.name; - sysInfoMap.put(server.name, mySysInfo); - } - mySysInfo.jvm = sysInfo.jvm; - mySysInfo.system = sysInfo.system; - mySysInfo.threads = sysInfo.threads; - } - - QCServer.ObjectPool objPoolInfo = server.getObjectPool(); - Collection connPoolInfo = server.getConnections(); - if (objPoolInfo != null && connPoolInfo != null) { - PoolInfo myPoolInfo = poolInfoMap.get(server.name); - if ( myPoolInfo == null) { - myPoolInfo = new PoolInfo(); - myPoolInfo.server = server.name; - poolInfoMap.put(server.name, myPoolInfo); - } - myPoolInfo.objPool = objPoolInfo; - myPoolInfo.connPoolList = connPoolInfo; - } - } - Date end = new Date(); - if (DEBUG) { - LOG.debug("Updating cluster " + name + " took " + (end.getTime() - start.getTime()) + " ms"); - } - - refreshExportedCompleteQueries(); - } - - public Options getOpt() { - return opt; - } - - MailSender mailSender = CheckMateServer.getInstance().getMailSender(); - private class EventAnomalyDetector { - private static final long longRunningLimit = 900L * 1000L; // 900 sec - boolean longRunningQuery = false; - QCQuery.QueryExport query = null; - - void detect() { - long now = System.currentTimeMillis(); - if ( query == null ) - return; - - if (!longRunningQuery) { - if (query.startTime + longRunningLimit < now && - ("EXEC".equals(query.state) || "INIT".equals(query.state))) { - longRunningQuery = true; - - LOG.info("Slow query {}:{}", query.backend, query.id); - if (mailSender != null) mailSender.addMail( - new QCQueryMail(query, "Slow Query by " + query.user, "Slow query detected by CheckMate.") - ); - } - } - } - } - - private final HashMap detectorMap = new HashMap<>(); - - TimerTask queryAnomalyDetectorTask = new TimerTask() { - @Override - public void run() { - Map dMap; - synchronized (detectorMap) { - dMap = new HashMap<>(detectorMap); - } - for ( Map.Entry entry : dMap.entrySet() ) { - entry.getValue().detect(); - } - } - }; - - private static class RunningQueryEvent { - public long timestamp = System.currentTimeMillis(); - public String msgType = null; - public QCQuery.QueryExport query = null; - public String cuqId = null; - } - - private final Map exportedRunningQueriesMap = new HashMap<>(); - public QCQuery.QueryExport getExportedRunningQueryByCuqId(String cuqId) { - QCQuery.QueryExport q; - synchronized (exportedRunningQueriesMap) { - q = exportedRunningQueriesMap.get(cuqId); - } - return q; - } - - public void addExportedRunningQuery(QCQuery q) { - QCQuery.QueryExport eq = q.export(); - boolean queryUpdated; - synchronized (exportedRunningQueriesMap) { - queryUpdated = (exportedRunningQueriesMap.put(eq.cuqId, eq) != null); - } - synchronized (detectorMap) { - EventAnomalyDetector det = detectorMap.get(eq.cuqId); - if (det != null) - det.query = eq; - else { - det = new EventAnomalyDetector(); - det.query = eq; - detectorMap.put(eq.cuqId, det); - } - } - - RunningQueryEvent evt = new RunningQueryEvent(); - evt.msgType = queryUpdated? RQE_UPDATED:RQE_ADDED; - evt.query = eq; - evt.cuqId = eq.cuqId; - addRunningQueryEvent(evt); - } - - public void removeExportedRunningQuery(QCQuery q) { - synchronized (exportedRunningQueriesMap) { - exportedRunningQueriesMap.remove(q.cuqId); - } - synchronized (detectorMap) { - detectorMap.remove(q.cuqId); - } - QCQuery.QueryExport eq = q.export(); - RunningQueryEvent evt = new RunningQueryEvent(); - evt.msgType = RQE_REMOVED; - evt.query = eq; - evt.cuqId = eq.cuqId; - addRunningQueryEvent(evt); - } - - public Collection getExportedRunningQueries() { - List ql = new ArrayList<>(); - synchronized (exportedRunningQueriesMap) { - ql.addAll(exportedRunningQueriesMap.values()); - } - Collections.sort(ql, new Comparator() { - @Override - public int compare(QCQuery.QueryExport o1, QCQuery.QueryExport o2) { - return ((o1.startTime - o2.startTime) < 0) ? -1 : (o1.startTime == o2.startTime) ? 0 : 1; - } - }); - return ql; - } - - // complete query update accelerators - private final List tmpCompleteQueries = new ArrayList<>(); - void queueCompleteQuery(QCQuery q) { - synchronized (tmpCompleteQueries) { - tmpCompleteQueries.add(q); - } - } - - private final LinkedList exportedCompleteQueries = new LinkedList<>(); - private void refreshExportedCompleteQueries() { - synchronized (tmpCompleteQueries) { - Collections.sort(tmpCompleteQueries, new Comparator() { - @Override - public int compare(QCQuery o1, QCQuery o2) { - return (o2.endTime > o1.endTime) ? 1 : (o2.endTime == o1.endTime) ? 0 : -1; - } - }); - - synchronized (exportedCompleteQueries) { - for (QCQuery q: tmpCompleteQueries) { - // addFirst() to make most recent query comes first. - exportedCompleteQueries.addFirst(q.export()); - } - - // maintain size of exportedCompleteQueries - int toRemove = exportedCompleteQueries.size() - MAX_COMPLETE_QUERIES; - if (toRemove > 0) { - for (int i=0; i getExportedCompleteQueries() { - List ql = new ArrayList<>(); - synchronized (exportedCompleteQueries) { - ql.addAll(exportedCompleteQueries); - } - return ql; - } - - static public class SystemInfo { - public String server; - public QCServer.RuntimeInfo jvm; - public QCServer.SystemInfo system; - public QCServer.ThreadInfo threads; - } - private final Map sysInfoMap = new HashMap<>(); - public Collection getSystemInfo() { - List l = new ArrayList<>(); - synchronized (sysInfoMap) { - l.addAll(sysInfoMap.values()); - } - - Collections.sort(l, new Comparator() { - @Override - public int compare(SystemInfo o1, SystemInfo o2) { - return o1.server.compareTo(o2.server); - } - }); - return l; - } - - static public class PoolInfo { - public String server; - public QCServer.ObjectPool objPool; - public Collection connPoolList; - } - - private final Map poolInfoMap = new HashMap<>(); - public Collection getPoolInfo() { - List l = new ArrayList<>(); - synchronized (poolInfoMap) { - l.addAll(poolInfoMap.values()); - } - Collections.sort(l, new Comparator() { - @Override - public int compare(PoolInfo o1, PoolInfo o2) { - return o1.server.compareTo(o2.server); - } - }); - return l; - } - - private Map realtimeMessageReceivers = new HashMap<>(); - AtomicInteger realtimeMessageReceiversId = new AtomicInteger(0); - - public int subscribe(CMQCServerWebSocket ws) { - int id = realtimeMessageReceiversId.getAndAdd(1); - realtimeMessageReceivers.put(id, ws); - return id; - } - - public void unSubscribe(int id) { - realtimeMessageReceivers.remove(id); - } - - private final static long retentionTime = 200; - private final LinkedList evtQueue = new LinkedList<>(); - private void addRunningQueryEvent(RunningQueryEvent evt) { - synchronized (evtQueue) { - if (RQE_REMOVED.equals(evt.msgType)) { - Iterator it = evtQueue.iterator(); - while (it.hasNext()) { - RunningQueryEvent rqe = it.next(); - String cuqId = (rqe.cuqId != null)? rqe.cuqId:rqe.query.cuqId; - if (evt.cuqId.equals(cuqId)) { - it.remove(); - } - } - } - - evtQueue.addLast(evt); - } - } - public void notifySubscribers() { - long retainAfter = System.currentTimeMillis() - retentionTime; // send events older than 200ms - - LinkedList el = new LinkedList<>(); - synchronized (evtQueue) { - Iterator it = evtQueue.iterator(); - while (it.hasNext()) { - RunningQueryEvent evt = it.next(); - if (evt.timestamp < retainAfter) { - el.addLast(evt); - it.remove(); - } - } - } - - if (el.size()>0) { - Gson gson = new Gson(); - for (RunningQueryEvent evt: el) { - String msg = gson.toJson(evt); - for (CMQCServerWebSocket ws : realtimeMessageReceivers.values()) { - ws.sendMessage(msg); - } - } - el.clear(); - } - } + + private static final Logger LOG = LoggerFactory.getLogger(QCCluster.class); + + private int MAX_COMPLETE_QUERIES = 100; + + private String name; + private QCClusterOptions opts; + + private Map serverMap = new HashMap<>(); + + private Map runningMap = new HashMap<>(); + private ArrayDeque completeList = new ArrayDeque<>(); + + private Map objectPoolMap = new HashMap<>(); + private Map systemStatMap = new HashMap<>(); + private Map> connectionMap = new HashMap<>(); + + private EventManager eventManager; + + private Histogram histogram; + + private long partialUpdateInterval; + private long fullUpdateInterval; + + private Thread eventThread; + private Thread fullUpdateThread; + private Thread partialUpdateThread; + private Timer histogramTimer = new Timer(true); + + private boolean running = true; + + private Gson gson = new Gson(); + + public QCCluster(String name, QCClusterOptions opts) throws URISyntaxException, IOException { + + this.name = name; + this.opts = opts; + this.fullUpdateInterval = opts.getFullUpdateInterval(); + this.partialUpdateInterval = opts.getPartialUpdateInterval(); + + this.MAX_COMPLETE_QUERIES = opts.getMaxCompleteQueries(); + + for (String serverName : opts.getServers()) { + if (serverMap.containsKey(serverName)) { + throw new IOException("duplicate server name=" + serverName); + } else { + serverMap.put(serverName, new QCServer(this, serverName)); + } + } + + histogram = new Histogram( new SlidingTimeWindowReservoir(5L, TimeUnit.MINUTES) ); + + CheckMateServer.getMetrics().register("queries."+name, histogram); + + if (opts.isUseWebSocket()) { + eventManager = new EventManager(); + eventThread = new Thread() { + @Override + public void run() { + LOG.info("Cluster {} WebSocket Start", name); + + while (running) { + eventManager.send(); + try { + TimeUnit.MILLISECONDS.sleep(1000); + } catch (InterruptedException e) { + LOG.error("Cluster {} WebSocket interrupted", name); + break; + } + } + running = false; + LOG.info("Cluster {} WebSocket Stop", name); + } + }; + eventThread.start(); + } else { + this.partialUpdateInterval = Math.min(this.partialUpdateInterval, 3000); + this.fullUpdateInterval = Math.min(this.fullUpdateInterval, 5000); + } + } + + public String getName() { + return name; + } + + public QCServer getServer(String name) { + return serverMap.get(name); + } + + public QCClusterOptions getOptions() { + return opts; + } + + public void start() { + + for (QCServer server:serverMap.values()) { + server.start(); + } + + fullUpdateThread = new Thread() { + + @Override + public void run() { + LOG.info("Cluster {} Full Update API Start", name); + while (running) { + for (QCServer server:serverMap.values()) { + server.updateFull(); + } + try { + TimeUnit.MILLISECONDS.sleep(fullUpdateInterval); + } catch (InterruptedException e) { + break; + } + } + LOG.info("Cluster {} Full Update API Stop", name); + } + }; + fullUpdateThread.start(); + + partialUpdateThread = new Thread() { + + @Override + public void run() { + LOG.info("Cluster {} Partial Update API Start", name); + while (running) { + for (QCServer server:serverMap.values()) { + server.updatePartial(); + } + try { + TimeUnit.MILLISECONDS.sleep(partialUpdateInterval); + } catch (InterruptedException e) { + break; + } + } + LOG.info("Cluster {} Partial Update API Stop", name); + } + }; + partialUpdateThread.start(); + + histogramTimer.scheduleAtFixedRate(new TimerTask() { + + @Override + public void run() { + int count = 0; + for (QCServer server:serverMap.values()) { + count += server.getAndResetQueryCount(); + } + histogram.update(count); + } + }, 60*1000, 60*1000); + } + + public void close() { + running = false; + if (eventThread != null) { + eventThread.interrupt(); + } + if (fullUpdateThread != null) { + fullUpdateThread.interrupt(); + } + if (partialUpdateThread != null) { + partialUpdateThread.interrupt(); + } + histogramTimer.cancel(); + + for (QCServer server:serverMap.values()) { + server.stop(); + } + } + + public void putSystemStats(String server, QCApiServerSystemStats stat) { + systemStatMap.put(server, stat); + } + + public List getServerInfo() { + List list = new ArrayList<>(serverMap.size()); + for (QCServer server:serverMap.values()) { + list.add(new ClusterServerSysStats(server.getName(), systemStatMap.get(server.getName()))); + } + Collections.sort(list, new Comparator() { + @Override + public int compare(ClusterServerSysStats o1, ClusterServerSysStats o2) { + return o1.getServer().compareTo(o2.getServer()); + } + }); + return list; + } + + public void putObjectPools(String server, QCObjectPoolProfile info) { + objectPoolMap.put(server, info); + } + + public void putConnections(String server, List connections) { + connectionMap.put(server, connections); + } + + public List getPoolInfo() { + List list = new ArrayList<>(serverMap.size()); + for (QCServer server:serverMap.values()) { + list.add(new ClusterServerPoolInfo(server.getName(), + objectPoolMap.get(server.getName()), + connectionMap.get(server.getName()))); + } + Collections.sort(list, new Comparator() { + @Override + public int compare(ClusterServerPoolInfo o1, ClusterServerPoolInfo o2) { + return o1.getServer().compareTo(o2.getServer()); + } + }); + return list; + } + + public QueryProfile getRunningQuery(String clusterUniqueId) { + return runningMap.get(clusterUniqueId); + } + + public void addRunningQueries(List profileList) { + synchronized(runningMap) { + if (profileList != null && profileList.size()>0) { + String hostname = profileList.get(0).getServer(); + int port = profileList.get(0).getPort(); + Iterator> iter = runningMap.entrySet().iterator(); + while (iter.hasNext()) { + QueryProfile profile = iter.next().getValue(); + if (profile.getServer().equals(hostname) && profile.getPort() == port) { + iter.remove(); + } + } + for (QueryProfile profile:profileList) { + runningMap.put(profile.getCuqId(), profile); + } + } + } + } + + public List getRunningQueries() { + List list = new ArrayList<>(runningMap.values()); + Collections.sort(list, new Comparator() { + @Override + public int compare(QueryProfile o1, QueryProfile o2) { + return Long.compare(o2.getStartTime(), o1.getStartTime()); + } + }); + return list; + } + + public void addCompleteQueries(List profileList){ + synchronized(completeList) { + for (QueryProfile profile:profileList) { + if (completeList.size() > MAX_COMPLETE_QUERIES) { + completeList.removeFirst(); + } + runningMap.remove(profile.getCuqId()); + completeList.add(profile); + } + } + } + + public List getCompleteQueries() { + return new ArrayList<>(completeList); + } + + public void addSubscriber(long idx, CMQCWebSocket ws) { + if (eventManager != null) { + eventManager.addSubscriber(idx, ws); + } + } + + public void removeSubscriber(long idx) { + if (eventManager != null) { + eventManager.removeSubscriber(idx); + } + } + + public void processWebSocketEvent(List profileList, List changedList) { + synchronized(runningMap) { + for (QueryProfile profile:profileList) { + if (!runningMap.containsKey(profile.getCuqId())){ + runningMap.put(profile.getCuqId(), profile); + eventManager.add(profile); + } + } + for (ChangedProfile changed:changedList) { + QueryProfile profile = runningMap.get(changed.getCuqId()); + if (profile != null) { + profile.setState(changed.getState()); + profile.setRowCnt(changed.getRowCnt()); + profile.setEndTime(changed.getEndTime()); + if (changed.getEndTime()>0) { + runningMap.remove(changed.getCuqId()); + completeList.add(profile); + eventManager.add(profile); + } else { + eventManager.add(changed); + } + } + } + } + } + + private class EventManager { + + private Map eventSubscriberMap = new HashMap<>(); + private ArrayDeque profileQueueList = new ArrayDeque<>(); + private ArrayDeque changedQueueList = new ArrayDeque<>(); + private int subscriberCount; + private List profileList = new ArrayList<>(); + private List changedList = new ArrayList<>(); + private int count = 0; + private QueryProfile profile = null; + private ChangedProfile changed = null; + private CMQCWebSocket.Response resp = new CMQCWebSocket.Response(CMQCWebSocket.Response.DATA); + private String respMessage = null; + + public void add(QueryProfile profile) { + if (profileQueueList.size()+changedQueueList.size() < ConfigKeys.EVENT_QUEUE_MAX_LIMIT && running) { + profileQueueList.add(profile); + } + } + + public void add(ChangedProfile changed) { + if (profileQueueList.size()+changedQueueList.size() < ConfigKeys.EVENT_QUEUE_MAX_LIMIT && running) { + changedQueueList.add(changed); + } + } + + public void send() { + count = 0; + while (profileQueueList.size() > 0 && count++<1000) { + profile = profileQueueList.removeFirst(); + profileList.add(profile); + } + while (changedQueueList.size() > 0 && count++<1000) { + changed = changedQueueList.removeFirst(); + changedList.add(changed); + } + if (profileList.size() > 0 || changedList.size() > 0) { + synchronized(eventSubscriberMap) { + if (eventSubscriberMap.size() > 0) { + resp.setProfiles(profileList); + resp.setCprofiles(changedList); + respMessage = gson.toJson(resp); + + for (CMQCWebSocket ws: eventSubscriberMap.values()) { + try { + ws.send(respMessage); + } catch (Exception e) { + LOG.error("send event error={}, client={}", SQLUtil.getMessage(e), ws.getClient(), e); + } + } + } + } + profileList.clear(); + changedList.clear(); + } + } + + public void addSubscriber(long idx, CMQCWebSocket ws) { + synchronized (eventSubscriberMap) { + eventSubscriberMap.put(idx, ws); + subscriberCount = eventSubscriberMap.size(); + } + } + + public void removeSubscriber(long idx) { + synchronized (eventSubscriberMap) { + eventSubscriberMap.remove(idx); + subscriberCount = eventSubscriberMap.size(); + if (subscriberCount == 0) { + profileQueueList.clear(); + changedQueueList.clear(); + } + } + } + } } diff --git a/src/main/java/com/skplanet/checkmate/querycache/QCClusterManager.java b/src/main/java/com/skplanet/checkmate/querycache/QCClusterManager.java index 777bdd0..f084acb 100644 --- a/src/main/java/com/skplanet/checkmate/querycache/QCClusterManager.java +++ b/src/main/java/com/skplanet/checkmate/querycache/QCClusterManager.java @@ -1,193 +1,101 @@ package com.skplanet.checkmate.querycache; +import static com.skplanet.checkmate.ConfigKeys.*; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.HierarchicalINIConfiguration; import org.apache.commons.configuration.SubnodeConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; - /** * Created by nazgul33 on 15. 1. 27. */ public class QCClusterManager { - private static final Logger LOG = LoggerFactory.getLogger("querycache"); - private static QCClusterManager qcMgr = null; - - private Map clusterConfigMap = new HashMap<>(); - - private int fullUpdateInterval = 10 * 1000; - private int pingWebSocketsInterval = 5 * 1000; - - private void readConfiguration() { - String home = System.getenv("CM_HOME"); - if (home == null) { - home = "./"; - } - String conf = home + "/conf/querycache.ini"; - HierarchicalINIConfiguration clusterConfigFile = null; - String[] clusterNames; - QCCluster.Options options[]; - + + private static final Logger LOG = LoggerFactory.getLogger(QCClusterManager.class); + + private Map clusterMap = new HashMap<>(); + private Map optionMap = new HashMap<>(); + private AtomicLong eventSubscriberIdx = new AtomicLong(0); + + public QCClusterManager(File confDir) throws Exception { + + File iniFile = new File(confDir, QUERYCACHE_INI_FILE); + HierarchicalINIConfiguration ini = null; + try { - LOG.info("reading checkmate::querycache configuration from " + conf); - clusterConfigFile = new HierarchicalINIConfiguration(conf); - SubnodeConfiguration globalSection = clusterConfigFile.getSection(null); - globalSection.setThrowExceptionOnMissing(true); - clusterNames = globalSection.getStringArray("Clusters"); - fullUpdateInterval = globalSection.getInt("FullUpdateInterval"); - - for (String cn: clusterNames) { - SubnodeConfiguration sc = clusterConfigFile.getSection(cn); - QCCluster.Options opt = new QCCluster.Options(); - opt.webPort = sc.getInt("WebPort"); - opt.servers = sc.getStringArray("Servers"); - - if (clusterConfigFile.containsKey(cn)) { + LOG.info("reading checkmate::querycache configuration from " + iniFile); + ini = new HierarchicalINIConfiguration(iniFile); + + SubnodeConfiguration global = ini.getSection("global"); + global.setThrowExceptionOnMissing(true); + + long partialUpdateInterval = global.getLong(PARTIAL_UPDATE_INTERAVAL, PARTIAL_UPDATE_INTERVAL_DEFAULT); + long fullUpdateInterval = global.getLong(FULL_UPDATE_INTERVAL, FULL_UPDATE_INTERVAL_DEFAULT); + long updateFailSleepTime = global.getLong(UPDATE_FAIL_SLEEP_TIME, UPDATE_FAIL_SLEEP_TIME_DEFAULT); + String[] cnames = global.getStringArray(CLUSTER_NAMES); + for (String cname: cnames) { + + SubnodeConfiguration sc = ini.getSection(cname); + QCClusterOptions opts = new QCClusterOptions(); + opts.setWebPort(sc.getInt(CLUSTER_WEB_PORT, CLUSTER_WEB_PORT_DEFFAULT)); + opts.setServers(sc.getStringArray(CLUSTER_SERVERS)); + opts.setMaxCompleteQueries(sc.getInt(CLUSTER_MAX_COMPLETE_QUERIES, CLUSTER_MAX_COMPLETE_QUERIES_DEFAULT)); + opts.setUseWebSocket(sc.getBoolean(CLUSTER_USE_WEBSOCKET, CLUSTER_USER_WEBSOCKE_DEFAULT)); + opts.setFullUpdateInterval(fullUpdateInterval); + opts.setPartialUpdateInterval(partialUpdateInterval); + opts.setUpdateFailSleepTime(updateFailSleepTime); + + if (ini.containsKey(cname)) { throw new ConfigurationException("duplicate cluster definition"); } - clusterConfigMap.put(cn, opt); - } - } - catch (Exception e) { - LOG.error("error loading configuration.", e); - System.exit(1); - } - } - - public class QCClusterThread implements Runnable { - private class QCClusterFullUpdateTask extends TimerTask { - @Override - public void run() { - if ( !QCClusterThread.this.fullUpdateTaskRunning.compareAndSet(false, true) ) { - // update task is running. skip this time.... - LOG.warn("a full update task is running. skipping full update task"); - return; - } - - QCClusterManager.qcMgr.UpdateClusters(); - lastFullUpdate = new Date(); - QCClusterThread.this.fullUpdateTaskRunning.set(false); - } - } - - private class QCClusterWebSocketPingTask extends TimerTask { - @Override - public void run() { - QCClusterManager.qcMgr.pingWebSockets(); - } - } - - private class QCCounterUpdateTask extends TimerTask { - @Override - public void run() { - for (QCCluster cluster: clusters) { - cluster.updateCounters(); - } + optionMap.put(cname, opts); + + QCCluster cluster = new QCCluster(cname, optionMap.get(cname)); + clusterMap.put(cname, cluster); } + } catch (Exception e) { + throw new Exception("error loading configuration. "+iniFile, e); } - TimerTask qcTimerFull = new QCClusterFullUpdateTask(); - TimerTask qcTimerPingWebSockets = new QCClusterWebSocketPingTask(); - TimerTask qcCounterUpdateTask = new QCCounterUpdateTask(); - - AtomicBoolean fullUpdateTaskRunning = new AtomicBoolean(false); - Date lastFullUpdate = new Date(); - - @Override - public void run() { - Timer timerFull = new Timer(true); - Timer timerPingWebSockets = new Timer(true); - Timer timerCounterUpdateTask = new Timer(true); - - timerFull.scheduleAtFixedRate(qcTimerFull, 0, fullUpdateInterval); - timerPingWebSockets.scheduleAtFixedRate(qcTimerPingWebSockets, 0, pingWebSocketsInterval); - timerCounterUpdateTask.scheduleAtFixedRate(qcCounterUpdateTask, 60*1000, 60*1000); - while (!QCClusterManager.this.quitThread) { - try { - Thread.sleep(100); - } catch (Exception e) { - } - } - - timerFull.cancel(); - timerPingWebSockets.cancel(); - } - } - - public static QCClusterManager getInstance() { - if (qcMgr == null) { - qcMgr = new QCClusterManager(); - } - - return qcMgr; - } - - private List clusters = new ArrayList<>(); - private Thread qcClusterThread = new Thread(this.new QCClusterThread()); - private boolean quitThread = false; - - protected QCClusterManager() { - readConfiguration(); - - // initialize clusters; - for (Map.Entry entry: clusterConfigMap.entrySet()) { - QCCluster cluster = new QCCluster(entry.getKey(), entry.getValue()); - clusters.add(cluster); - } - - // initialize thread; - qcClusterThread = new Thread(this.new QCClusterThread()); } - public void startThread() { - qcClusterThread.start(); + public void start() { + for (QCCluster cluster:clusterMap.values()) { + cluster.start(); + } } // stop - public void finishThread() { - quitThread = true; - try { - qcClusterThread.join(); - } catch (Exception e) { - LOG.error("exception while waiting for thread " + e.toString()); - } + public void stop() { + for (QCCluster cluster:clusterMap.values()) { + cluster.close(); + } } - public void UpdateClusters() { - for (QCCluster cluster: clusters) { - cluster.Update(); - } + + public QCCluster getCluster(String name) { + return clusterMap.get(name); } - public QCCluster getCluster(String clusterName) { - for (QCCluster c: clusters) { - if (c.name.equals(clusterName)) { - return c; - } - } - return null; + public QCClusterOptions getClusterOptions(String name) { + return optionMap.get(name); } - - public Collection getClusterList() { - List cl = new ArrayList<>(); - for (QCCluster c: clusters) { - cl.add(c.name); - } - return cl; + + public List getClusterNameList() { + List list = new ArrayList<>(clusterMap.keySet()); + Collections.sort(list); + return list; } - - private void pingWebSockets() { - for (QCCluster cluster: clusters) { - for (QCServer server: cluster.getServers().values()) { - QCClientWebSocket ws = server.getRtWebSocket(); - if (ws == null) { - server.connectRealtimeWebSocket(); - } - else { - ws.ping(); - } - } - } + + public long getEventSubscriberIdx() { + return eventSubscriberIdx.incrementAndGet(); } } diff --git a/src/main/java/com/skplanet/checkmate/querycache/QCClusterOptions.java b/src/main/java/com/skplanet/checkmate/querycache/QCClusterOptions.java new file mode 100644 index 0000000..78b079f --- /dev/null +++ b/src/main/java/com/skplanet/checkmate/querycache/QCClusterOptions.java @@ -0,0 +1,69 @@ +package com.skplanet.checkmate.querycache; + +public class QCClusterOptions { + + private int webPort; + private long partialUpdateInterval; + private long fullUpdateInterval; + private int maxCompleteQueries; + private boolean useWebSocket; + private long updateFailSleepTime; + + public long getUpdateFailSleepTime() { + return updateFailSleepTime; + } + + public void setUpdateFailSleepTime(long updateFailSleepTime) { + this.updateFailSleepTime = updateFailSleepTime; + } + + private String[] servers; + + public int getWebPort() { + return webPort; + } + + public void setWebPort(int webPort) { + this.webPort = webPort; + } + + public long getPartialUpdateInterval() { + return partialUpdateInterval; + } + + public void setPartialUpdateInterval(long partialUpdateInterval) { + this.partialUpdateInterval = partialUpdateInterval; + } + + public long getFullUpdateInterval() { + return fullUpdateInterval; + } + + public void setFullUpdateInterval(long fullUpdateInterval) { + this.fullUpdateInterval = fullUpdateInterval; + } + + public String[] getServers() { + return servers; + } + + public void setServers(String[] servers) { + this.servers = servers; + } + + public int getMaxCompleteQueries() { + return maxCompleteQueries; + } + + public void setMaxCompleteQueries(int maxCompleteQueries) { + this.maxCompleteQueries = maxCompleteQueries; + } + + public boolean isUseWebSocket() { + return useWebSocket; + } + + public void setUseWebSocket(boolean useWebSocket) { + this.useWebSocket = useWebSocket; + } +} \ No newline at end of file diff --git a/src/main/java/com/skplanet/checkmate/querycache/QCQuery.java b/src/main/java/com/skplanet/checkmate/querycache/QCQuery.java deleted file mode 100644 index 3c9aa95..0000000 --- a/src/main/java/com/skplanet/checkmate/querycache/QCQuery.java +++ /dev/null @@ -1,169 +0,0 @@ -package com.skplanet.checkmate.querycache; - -import org.apache.commons.codec.binary.Base64; - -import java.util.Arrays; -import java.util.Date; - -/** - * Created by nazgul33 on 15. 1. 27. - */ -public class QCQuery { - private static final boolean DEBUG = false; - public static class QueryImport { - public String queryId; - public String connType; - public String user; - public String queryType; - public String queryStr; - public String clientIp; - public String stmtState; - public long rowCnt; - public long startTime; - public long endTime; - // 0:exec/1:getmeta/2:fetch/3:stmtclose or - // 1:getschemas/2:fetch - public long[] timeHistogram = {0,0,0,0}; - public long[] execProfile = null; - public long[] fetchProfile = {0,0,0,-1,-1,-1,-1,0,0,-1,-1}; - } - - public static class QueryExport { - public String cuqId; // cluster unique query id - public String server; - public String id; - public String backend; - public String user; - public String type; - public String statement; - public String client; - public String state; - public long rowCnt = -1; - public long startTime; - public long endTime = 0; - // 0:exec/1:getmeta/2:fetch/3:stmtclose or - // 1:getschemas/2:fetch - public long[] timeHistogram; - public long[] execProfile; - public long[] fetchProfile; - public String cancelUrl; - } - - private QCServer server; - - public final String cuqId; - public final String id; - public final String backend; - public final String user; - public String type; - public final String statement; - public final String client; - public String state; - public long rowCnt = -1; - public final long startTime; - public long endTime = 0; - // 0:exec/1:getmeta/2:fetch/3:stmtclose or - // 1:getschemas/2:fetch - public long[] timeHistogram; - public long[] execProfile; - public long[] fetchProfile; - - public long updateTime = 0; - - public QCQuery(QCServer server, QueryImport qObj) { - this.server = server; - this.id = qObj.queryId; - this.backend = qObj.connType; - this.user = qObj.user; - this.type = qObj.queryType; - this.statement = qObj.queryStr; - this.client = qObj.clientIp; - this.state = qObj.stmtState; - this.rowCnt = qObj.rowCnt; - this.startTime = qObj.startTime; - this.endTime = qObj.endTime; - this.timeHistogram = qObj.timeHistogram; - this.execProfile = qObj.execProfile; - this.fetchProfile = qObj.fetchProfile; - - this.updateTime = new Date().getTime(); - - this.cuqId = getCuqId(server, qObj); - } - - public boolean Update(QCQuery qObj) { - boolean updated = false; - if ( !this.type.equals(qObj.type) ) { - if (DEBUG) System.console().printf("query %s update type %s -> %s\n", id, type, qObj.type); - this.type = qObj.type; - updated = true; - } - if ( !this.state.equals(qObj.state) ) { - if (DEBUG) System.console().printf("query %s update state %s -> %s\n", id, state, qObj.state); - this.state = qObj.state; - updated = true; - } - if ( this.rowCnt != qObj.rowCnt ) { - if (DEBUG) System.console().printf("query %s update rowCnt %s -> %s\n", id, rowCnt, qObj.rowCnt); - this.rowCnt = qObj.rowCnt; - updated = true; - } - if ( this.endTime != qObj.endTime ) { - if (DEBUG) System.console().printf("query %s update rowCnt %s -> %s\n", id, endTime, qObj.endTime); - this.endTime = qObj.endTime; - updated = true; - } - -// if ( !Arrays.equals(this.timeHistogram, qObj.timeHistogram) ) { -// this.timeHistogram = qObj.timeHistogram; -// updated = true; -// } -// if ( !Arrays.equals(this.execProfile, qObj.execProfile) ) { -// this.execProfile = qObj.execProfile; -// updated = true; -// } -// if ( !Arrays.equals(this.fetchProfile, qObj.fetchProfile) ) { -// this.fetchProfile = qObj.fetchProfile; -// updated = true; -// } - - if (updated) { - this.updateTime = new Date().getTime(); - } - return updated; - } - - public static String getCuqId(QCServer server, QCQuery.QueryImport qi) { - String cuqid = new String(Base64.encodeBase64((server.name+qi.connType+qi.queryId).trim().toUpperCase().getBytes())); - return cuqid.replace('+',':').replace('/','.').replace('=','-'); - } - - public QueryExport export() { - QueryExport eq = new QueryExport(); - eq.cuqId = cuqId; - // remove characters not supported by html4 standard for id field. - - eq.server = server.name; - eq.id = id; - eq.backend = backend; - eq.user = user; - eq.type = type; - eq.statement = statement; - eq.client = client; - eq.state = state; - eq.rowCnt = rowCnt; - eq.startTime = startTime; - eq.endTime = endTime; - eq.timeHistogram = new long[timeHistogram.length]; - System.arraycopy(eq.timeHistogram, 0, timeHistogram, 0, timeHistogram.length); - if (execProfile != null) { - eq.execProfile = new long[execProfile.length]; - System.arraycopy(eq.execProfile, 0, execProfile, 0, execProfile.length); - } - eq.fetchProfile = new long[fetchProfile.length]; - System.arraycopy(eq.fetchProfile, 0, fetchProfile, 0, fetchProfile.length); - // eq.cancelUrl = "http://" + server.name + ":" + server.cluster.getOpt().webPort + "/api/cancelQuery?id="+id+"&driver="+backend; - eq.cancelUrl = "/api/qc/cancelQuery?cluster="+server.cluster.name+"&cuqId="+eq.cuqId; - return eq; - } -} diff --git a/src/main/java/com/skplanet/checkmate/querycache/QCQueryMail.java b/src/main/java/com/skplanet/checkmate/querycache/QCQueryMail.java index 6beae7f..39db7bf 100644 --- a/src/main/java/com/skplanet/checkmate/querycache/QCQueryMail.java +++ b/src/main/java/com/skplanet/checkmate/querycache/QCQueryMail.java @@ -1,52 +1,34 @@ package com.skplanet.checkmate.querycache; -import com.skplanet.checkmate.utils.MailSender; -import org.apache.commons.lang.time.DateUtils; - +import java.text.SimpleDateFormat; import java.util.Date; +import com.skplanet.checkmate.utils.MailSender; +import com.skplanet.querycache.server.cli.QueryProfile; + /** * Created by nazgul33 on 4/8/16. */ public class QCQueryMail extends MailSender.Mail { - public QCQueryMail(QCQuery.QueryExport query, String subject, String content) { - super(subject, content); - setContent( content + "\n\n" + queryToMailContent(query) ); - } + + public QCQueryMail(QueryProfile query, String subject, String content) { + super(subject); + setContent(content + "\n\n" + queryToMailContent(query) ); + } - static String queryToMailContent(QCQuery.QueryExport query) { - StringBuilder sb = new StringBuilder(); - /* - public String cuqId; // cluster unique query id - public String server; - public String id; - public String backend; - public String user; - public String type; - public String statement; - public String client; - public String state; - public long rowCnt = -1; - public long startTime; - public long endTime = 0; - // 0:exec/1:getmeta/2:fetch/3:stmtclose or - // 1:getschemas/2:fetch - public long[] timeHistogram; - public long[] execProfile; - public long[] fetchProfile; - public String cancelUrl; - */ - sb.append("Server : ").append(query.server).append('\n'); - sb.append("QueryId : ").append(query.id).append('\n'); - sb.append("Backend : ").append(query.backend).append('\n'); - sb.append("User : ").append(query.user).append('\n'); - sb.append("State : ").append(query.state).append('\n'); - sb.append("Fetched Rows : ").append(query.rowCnt).append('\n'); - sb.append("StartTime : ").append( new Date(query.startTime) ).append('\n'); - sb.append("Now : ").append( new Date() ).append('\n'); + public static String queryToMailContent(QueryProfile query) { - sb.append("SQL : ").append(query.statement).append('\n'); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + StringBuilder sb = new StringBuilder(); + sb.append("Server : ").append(query.getServer()).append('\n'); + sb.append("QueryId : ").append(query.getQueryId()).append('\n'); + sb.append("User : ").append(query.getUserId()).append('\n'); + sb.append("State : ").append(query.getState()).append('\n'); + sb.append("Fetched Rows : ").append(query.getRowCnt()).append('\n'); + sb.append("StartTime : ").append( sdf.format(new Date(query.getStartTime())) ).append('\n'); + sb.append("Now : ").append( sdf.format(new Date()) ).append('\n'); + sb.append("SQL : ").append(query.getQuery()).append('\n'); - return sb.toString(); - } + return sb.toString(); + } } diff --git a/src/main/java/com/skplanet/checkmate/querycache/QCServer.java b/src/main/java/com/skplanet/checkmate/querycache/QCServer.java index 9136a28..40f6384 100644 --- a/src/main/java/com/skplanet/checkmate/querycache/QCServer.java +++ b/src/main/java/com/skplanet/checkmate/querycache/QCServer.java @@ -1,436 +1,201 @@ package com.skplanet.checkmate.querycache; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; -import com.codahale.metrics.SlidingTimeWindowReservoir; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import com.skplanet.checkmate.CheckMateServer; -import com.skplanet.checkmate.utils.HttpUtil; +import static java.lang.String.format; +import java.lang.reflect.Type; import java.net.ConnectException; -import java.net.URI; import java.net.URISyntaxException; -import java.util.*; +import java.util.ArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; -import org.eclipse.jetty.websocket.client.WebSocketClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingTimeWindowReservoir; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import com.skplanet.checkmate.CheckMateServer; +import com.skplanet.checkmate.utils.HttpUtil; +import com.skplanet.querycache.server.cli.ObjectPool.QCObjectPoolProfile; +import com.skplanet.querycache.servlet.QCApiConDesc; +import com.skplanet.querycache.servlet.QCApiServerQueries; +import com.skplanet.querycache.servlet.QCApiServerSystemStats; + /** * Created by nazgul33 on 15. 1. 27. */ public class QCServer { - private static final boolean DEBUG = false; - private static final Logger LOG = LoggerFactory.getLogger("querycache"); - private static final int MAX_COMPLETE_QUERIES = 100; - private final Histogram histogram; - - // inner-classes below are for gson conversion from json object. - // NOTICE! - // sync between QCWebApiServlet - public static class ConDesc { - public String driver; - public int free; - public int using; - } - - public static class RuntimeInfo { - public int nProcessors; - public long memFree; - public long memTotal; - public long memMax; - } + + private static final Logger LOG = LoggerFactory.getLogger(QCServer.class); + + private Histogram histogram; - public static class ThreadInfo { - public int webServerThreads; - public int webServerThreadsIdle; - public int handlerThreads; - public int handlerThreadsIdle; - public int totalThreads; - } + private String name; + private QCCluster cluster; + private QCClusterOptions opts; + private String wsUrl; - public static class SystemInfo { - public double loadSystem; - public double loadProcess; - public long memPhysFree; - public long memPhysTotal; - public long swapFree; - public long swapTotal; - } + private long lastExceptionTime; - public static class ObjectPool { - // configuration - public long sReloadCycle = 0; - public int sMaxPoolSize = 0; - public float sReloadingThreshold = 0; - public int sCellCoeff = 0; - public int[] poolSize = {0, 0, 0, 0}; - } + private QCWebSocketClient wsClient; - public static class Queries { - public List runningQueries = new ArrayList<>(); - public List completeQueries = new ArrayList<>(); - } - - public static class SysInfoCollection { - public RuntimeInfo jvm; - public SystemInfo system; - public ThreadInfo threads; - } - - public final String name; - private Map queriesRunning; - private Map queriesComplete; - public final QCCluster cluster; - - private List connections = null; - private ObjectPool objectPool = null; - private SysInfoCollection sysInfoCollection = null; - - private long lastQueryUpdate = 0; + private AtomicInteger queryCount = new AtomicInteger(0); - private boolean online; - private String lastException; + private String queriesUrl; + private String systemUrl; + private String connectionUrl; + private String objectpoolUrl; - private AtomicInteger queryCount = new AtomicInteger(0); + private Gson gson = new Gson(); + private Type connectionType = new TypeToken>(){}.getType(); - QCServer(String name, QCCluster cluster) { - this.name = name; - queriesRunning = new HashMap<>(); - queriesComplete = new LinkedHashMap() { - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > MAX_COMPLETE_QUERIES; - } - }; - online = false; + public QCServer(QCCluster cluster, String name) throws URISyntaxException { + this.cluster = cluster; + this.opts = cluster.getOptions(); + this.name = name; - connectRealtimeWebSocket(); + int webPort = cluster.getOptions().getWebPort(); + + this.wsUrl = format("ws://%s:%s/api/websocket", name, webPort); + this.queriesUrl = format("http://%s:%s/api/queries", name, webPort); + this.systemUrl = format("http://%s:%s/api/system", name, webPort); + this.connectionUrl = format("http://%s:%s/api/connections", name, webPort); + this.objectpoolUrl = format("http://%s:%s/api/objectpool", name, webPort); - StringBuffer sb = new StringBuffer(); - sb.append("queries.").append(cluster.name).append('.').append(this.name); - histogram = new Histogram( new SlidingTimeWindowReservoir(5L, TimeUnit.MINUTES) ); - CheckMateServer.getMetrics().register(sb.toString(), histogram); - } + if (cluster.getOptions().isUseWebSocket()) { + this.wsClient = new QCWebSocketClient(this, wsUrl); + } - public int updateQueryCount() { - int count = queryCount.getAndSet(0); - histogram.update(count); - return count; + String metricName = format("queries.%s.%s",cluster.getName(),name); + histogram = new Histogram( new SlidingTimeWindowReservoir(5L, TimeUnit.MINUTES) ); + CheckMateServer.getMetrics().register(metricName, histogram); } - - public boolean isOnline() { - return online; + + public String getName() { + return name; } - - public void setOnline(boolean online) { - this.online = online; + + public QCCluster getCluster() { + return cluster; } - public String getLastException() { - return lastException; + public void start() { + if (wsClient != null) { + wsClient.start(); + } } - - public void setLastException(Exception e) { - this.lastException = e.getMessage(); + + public void stop() { + if (wsClient != null) { + wsClient.stop(); + } } - - public Collection getRunningQueries() { - Collection ql; - synchronized (queriesRunning) { - ql = queriesRunning.values(); - } - return ql; - } - - public Collection getCompleteQueries() { - Collection ql; - synchronized (queriesComplete) { - ql = queriesComplete.values(); - } - return ql; - } - - public void Update() { - // update routines below will set offline when update is failed. - this.setOnline(true); - - UpdateQueries(); - lastQueryUpdate = new Date().getTime(); - UpdateConnections(); - UpdateObjectPools(); - UpdateSystem(); + + public int getAndResetQueryCount() { + int count = queryCount.getAndSet(0); + histogram.update(count); + return count; } - protected String getUrl(String path) { - return "http://" + this.name + ":" + cluster.getOpt().webPort + path; + private void setLastException() { + this.lastExceptionTime = System.currentTimeMillis(); } - protected void processAddQueryEvent(QCQuery q) { - QCQuery qExisting; - synchronized (queriesRunning) { - qExisting = queriesRunning.get(q.cuqId); - if (qExisting != null) { - if( !qExisting.Update(q) ) { - // not updated : do nothing - return; - } - } else { - this.queriesRunning.put(q.cuqId, q); - queryCount.incrementAndGet(); - } - } - this.cluster.addExportedRunningQuery(q); + private boolean isCheckable() { + return lastExceptionTime == 0 || + lastExceptionTime + opts.getUpdateFailSleepTime() < System.currentTimeMillis() ; } - - protected void processRemoveQueryEvent(QCQuery q) { - synchronized (queriesRunning) { - this.queriesRunning.remove(q.cuqId); - } - this.cluster.removeExportedRunningQuery(q); + + public void updatePartial() { + updateQueries(); } - - protected void UpdateQueries() { - HttpUtil httpUtil = new HttpUtil(); - StringBuffer buf = new StringBuffer(); - int res = 0; - - try { - res = httpUtil.get( getUrl("/api/queries"), buf); - if (res == 200) { - Gson gSon = new Gson(); - Queries queries = gSon.fromJson(buf.toString(), Queries.class); - synchronized (queriesRunning) { - int added = 0; - int removed = 0; - int updated = 0; - List rqList = new ArrayList<>(); - Map rqMap = new HashMap<>(); - // process "current" running query list - for (QCQuery.QueryImport qi : queries.runningQueries) { - // add to map for reverse existence check below. - QCQuery q = new QCQuery(this, qi); - rqList.add(q); - rqMap.put(q.cuqId, q); - } - - // cross check queries in my list to remove "disappeared" queries. - Iterator> it = queriesRunning.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry entry = it.next(); - if (!rqMap.containsKey(entry.getKey())) { - this.cluster.removeExportedRunningQuery(entry.getValue()); - it.remove(); - removed++; - } - } - - for (QCQuery q : rqList) { - QCQuery qExisting = queriesRunning.get(q.cuqId); - if (qExisting != null) { - if ( qExisting.Update(q) ) { - updated++; - this.cluster.addExportedRunningQuery(qExisting); - } - } else { - this.queriesRunning.put(q.cuqId, q); - added++; - this.cluster.addExportedRunningQuery(q); - queryCount.incrementAndGet(); - } - } - - if (DEBUG) { - LOG.debug(this.name + ": running queries +" + added + " #" + updated + " -" + removed); - } - } - - synchronized (queriesComplete) { - int added = 0; - - // process "current" complete query list - // sort key endTime, ascending - Collections.sort(queries.completeQueries, new Comparator() { - @Override - public int compare(QCQuery.QueryImport o1, QCQuery.QueryImport o2) { - return (o2.endTime > o1.endTime) ? 1 : (o2.endTime == o1.endTime) ? 0 : -1; - } - }); - - for (QCQuery.QueryImport qi : queries.completeQueries) { - String cuqId = QCQuery.getCuqId(this, qi); - // complete query has no changes. process new complete queries only. - if (!this.queriesComplete.containsKey(cuqId)) { - QCQuery q = new QCQuery(this, qi); - this.queriesComplete.put(cuqId, q); - cluster.queueCompleteQuery(q); - added++; - } - } - - if (DEBUG) { - LOG.debug("{} : cq size = {}(+{})", - this.name, this.queriesComplete.size(), added); - } - } - } - } catch (Exception e) { - if (e instanceof ConnectException) { - LOG.error(this.name + ": updating queries: connection refused."); - this.setOnline(false); - this.setLastException(e); - } - else { - LOG.error(this.name + ": updating queries: ", e); - this.setLastException(e); - } - } + + public void updateFull() { + updateConnections(); + updateObjectPools(); + updateSystem(); } - protected void UpdateConnections() { - HttpUtil httpUtil = new HttpUtil(); - StringBuffer buf = new StringBuffer(); - int res = 0; + private void updateQueries() { + if (!isCheckable()) { + return; + } try { - res = httpUtil.get( getUrl("/api/connections"), buf); - if (res == 200) { - Gson gSon = new Gson(); - connections = gSon.fromJson(buf.toString(), new TypeToken>() {}.getType()); - if (DEBUG) { - for (ConDesc con: connections) { - LOG.debug(this.name + " con " + con.driver + " free " + con.free + " using " + con.using); - } - } - } + String content = HttpUtil.get(queriesUrl); + QCApiServerQueries queries = gson.fromJson(content, QCApiServerQueries.class); + cluster.addRunningQueries(queries.getRunningQueries()); + cluster.addCompleteQueries(queries.getCompleteQueries()); + } catch (ConnectException e) { + LOG.error("{}.{}: updating queries : {} {}", + cluster.getName(), name, queriesUrl, e.getMessage()); + setLastException(); } catch (Exception e) { - if (e instanceof ConnectException) { - LOG.error(this.name + ": updating connections: connection refused."); - this.setOnline(false); - this.setLastException(e); - } - else { - LOG.error(this.name + ": updating connections,", e); - this.setLastException(e); - } + LOG.error("{}.{}: updating queries : {} {}", + cluster.getName(), name, queriesUrl, e.getMessage(), e); + setLastException(); } } - - protected void UpdateObjectPools() { - HttpUtil httpUtil = new HttpUtil(); - StringBuffer buf = new StringBuffer(); - int res = 0; + + private void updateConnections() { + if (!isCheckable()) { + return; + } try { - res = httpUtil.get( getUrl("/api/objectpool"), buf); - if (res == 200) { - Gson gSon = new Gson(); - objectPool = gSon.fromJson(buf.toString(), ObjectPool.class); - if (DEBUG) { - LOG.debug(this.name + " pool " + objectPool.poolSize[0] + "," + - objectPool.poolSize[1] + "," + - objectPool.poolSize[2] + "," + - objectPool.poolSize[3]); - } - } + String content = HttpUtil.get(connectionUrl); + cluster.putConnections(name, gson.fromJson(content, connectionType)); + } catch (ConnectException e) { + cluster.putConnections(name, null); + LOG.error("{}.{}: updating connections : {} {}", + cluster.getName(), name, connectionUrl, e.getMessage()); + setLastException(); } catch (Exception e) { - if (e instanceof ConnectException) { - LOG.error(this.name + ": updating pools: connection refused."); - this.setOnline(false); - this.setLastException(e); - } - else { - LOG.error(this.name + ": updating pools,", e); - this.setLastException(e); - } + cluster.putConnections(name, null); + LOG.error("{}.{}: updating connections : {} {}", + cluster.getName(), name, connectionUrl, e.getMessage(), e); + setLastException(); } } - protected void UpdateSystem() { - HttpUtil httpUtil = new HttpUtil(); - StringBuffer buf = new StringBuffer(); - int res = 0; + private void updateObjectPools() { + if (!isCheckable()) { + return; + } try { - res = httpUtil.get( getUrl("/api/system"), buf); - if (res == 200) { - Gson gSon = new Gson(); - sysInfoCollection = gSon.fromJson(buf.toString(), SysInfoCollection.class); - if (DEBUG) { - LOG.debug(this.name + " total threads " + sysInfoCollection.threads.totalThreads); - LOG.debug(this.name + " runtime mem used " + (sysInfoCollection.jvm.memTotal - sysInfoCollection.jvm.memFree)); - } - } + String content = HttpUtil.get(objectpoolUrl); + cluster.putObjectPools(name, gson.fromJson(content, QCObjectPoolProfile.class)); + } catch (ConnectException e) { + cluster.putObjectPools(name, null); + LOG.error("{}.{}: updating object pools : {} {}", + cluster.getName(), name, objectpoolUrl, e.getMessage()); + setLastException(); } catch (Exception e) { - if (e instanceof ConnectException) { - LOG.error(this.name + ": updating system stats: connection refused."); - this.setOnline(false); - this.setLastException(e); - } - else { - LOG.error(this.name + ": updating system stats,", e); - this.setLastException(e); - } - } - } - - public Collection exportRunningQueries() { - List ql; - synchronized (queriesRunning) { - ql = new ArrayList<>(queriesRunning.size()); - for(QCQuery q: queriesRunning.values()) { - ql.add( q.export() ); - } - } - return ql; - } - - public Collection exportCompleteQueries() { - List ql; - synchronized (queriesComplete) { - ql = new ArrayList<>(queriesComplete.size()); - for(QCQuery q: queriesComplete.values()) { - ql.add( q.export() ); - } + cluster.putObjectPools(name, null); + LOG.error("{}.{}: updating object pools : {} {}", + cluster.getName(), name, objectpoolUrl, e.getMessage(), e); + setLastException(); } - return ql; - } - - public SysInfoCollection getSysInfoCollection() { - return sysInfoCollection; - } - - public ObjectPool getObjectPool() { - return objectPool; - } - - public List getConnections() { - return connections; } - private QCClientWebSocket rtWebSocket = null; - public QCClientWebSocket getRtWebSocket() { - return rtWebSocket; - } - public void removeRtWebSocket() { - rtWebSocket = null; - } - public void connectRealtimeWebSocket() { - String uri = "ws://" + this.name + ":" + cluster.getOpt().webPort + "/api/websocket"; - WebSocketClient c = new WebSocketClient(); - QCClientWebSocket s = new QCClientWebSocket(this); - rtWebSocket = null; + private void updateSystem() { + if (!isCheckable()) { + return; + } try { - c.start(); - URI rtUri = new URI(uri); - ClientUpgradeRequest req = new ClientUpgradeRequest(); - c.connect(s, rtUri, req); - rtWebSocket = s; - } catch (URISyntaxException e) { - LOG.error("invalid uri " + uri, e); + String content = HttpUtil.get(systemUrl); + cluster.putSystemStats(name, gson.fromJson(content, QCApiServerSystemStats.class)); + } catch (ConnectException e) { + cluster.putSystemStats(name, null); + LOG.error("{}.{}: updating system stats : {} {}", + cluster.getName(), name, systemUrl, e.getMessage()); + setLastException(); } catch (Exception e) { - LOG.error("websocket client error", e); + cluster.putSystemStats(name, null); + LOG.error("{}.{}: updating system stats : {} {}", + cluster.getName(), name, systemUrl, e.getMessage(), e); + setLastException(); } } } diff --git a/src/main/java/com/skplanet/checkmate/querycache/QCWebSocketClient.java b/src/main/java/com/skplanet/checkmate/querycache/QCWebSocketClient.java new file mode 100644 index 0000000..3bcd467 --- /dev/null +++ b/src/main/java/com/skplanet/checkmate/querycache/QCWebSocketClient.java @@ -0,0 +1,161 @@ +package com.skplanet.checkmate.querycache; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.skplanet.querycache.servlet.QCWebSocket; + +/** + * Created by nazgul33 on 15. 2. 23. + */ +public class QCWebSocketClient implements WebSocketListener { + + private static final Logger LOG = LoggerFactory.getLogger(QCWebSocketClient.class); + + private static final long MAX_ERROR_COUNT = 10; + + private WebSocketClient client; + private Session session; + private long errorCount = 0; + + private Timer timer = new Timer(true); + + private QCCluster listener; + private String cluster; + private String wsUrl; + private URI uri; + + private String pingMessage; + + private Gson gson = new Gson(); + + public QCWebSocketClient(QCServer server, String wsUrl) throws URISyntaxException { + + this.cluster = server.getCluster().getName(); + this.listener = server.getCluster(); + this.wsUrl = wsUrl; + this.uri = new URI(wsUrl); + + client = new WebSocketClient(); + client.setDaemon(true); + + pingMessage = gson.toJson(new QCWebSocket.Request(QCWebSocket.PING)); + } + + public void start() { + timer.schedule(new TimerTask() { + @Override + public void run() { + try { + if (session == null || !session.isOpen()) { + if (!client.isStarted()) { + client.start(); + } + client.connect(QCWebSocketClient.this, uri, new ClientUpgradeRequest()); + errorCount = 0; + } + } catch (Exception e) { + errorCount++; + LOG.error("ws error {} {} {}", cluster, wsUrl, e.getMessage()); + } + if (errorCount > MAX_ERROR_COUNT) { + try { + TimeUnit.MINUTES.sleep(5); + } catch (InterruptedException e) { + } + } + } + }, 0, 5*1000); + timer.schedule(new TimerTask() { + @Override + public void run() { + sendMessage(pingMessage); + } + }, 0, 5*1000); + } + + public void stop() { + timer.cancel(); + try { + if (session != null && session.isOpen()) { + session.close(); + } + } catch (Exception e) { + LOG.error("ws session error {} {} {}", cluster, wsUrl, e.getMessage()); + } + try { + if (client != null) { + client.stop(); + } + } catch (Exception e) { + LOG.error("ws client stop failed. {} {}", cluster, wsUrl, e); + } + } + + @Override + public void onWebSocketBinary(byte[] payload, int offset, int len) { + /* not interested */ + } + + @Override + public void onWebSocketClose(int statusCode, String reason) { + try { + session.disconnect(); + } catch (Exception e) { + LOG.error("ws close statusCode={}, reason={}, cluster={}, url={}", + statusCode, reason, cluster, wsUrl); + } + session = null; + } + + @Override + public void onWebSocketConnect(Session session) { + this.session = session; + } + + @Override + public void onWebSocketError(Throwable e) { + LOG.error("ws error={}, cluster={}, url={}", e.getMessage(), cluster, wsUrl); + } + + @Override + public void onWebSocketText(String message) { + + LOG.info(message); + try { + QCWebSocket.Response resp = gson.fromJson(message, QCWebSocket.Response.class); + switch(resp.getType()) { + case QCWebSocket.OK: + listener.processWebSocketEvent(resp.getProfiles(), resp.getCprofiles()); + break; + case QCWebSocket.PONG: + break; + case QCWebSocket.FAIL: + case QCWebSocket.ERROR : + LOG.error(resp.getMsg()); + break; + default: + LOG.error("unsupported response type={}, cluster={}, url={}", resp.getType(), cluster, wsUrl); + } + } catch (Exception e){ + LOG.error("ws error={}, cluster={}, url={}", e.getMessage(), cluster, wsUrl); + } + } + + private void sendMessage(String message) { + if (session != null && session.isOpen()) { + session.getRemote().sendString(message, null); + } + } +} diff --git a/src/main/java/com/skplanet/checkmate/querycache/data/ClusterServerPoolInfo.java b/src/main/java/com/skplanet/checkmate/querycache/data/ClusterServerPoolInfo.java new file mode 100644 index 0000000..028a595 --- /dev/null +++ b/src/main/java/com/skplanet/checkmate/querycache/data/ClusterServerPoolInfo.java @@ -0,0 +1,41 @@ +package com.skplanet.checkmate.querycache.data; + +import java.util.Collections; +import java.util.List; + +import com.skplanet.querycache.server.cli.ObjectPool.QCObjectPoolProfile; +import com.skplanet.querycache.servlet.QCApiConDesc; + +public class ClusterServerPoolInfo { + private String server; + private QCObjectPoolProfile objPool; + private List connPoolList; + + public ClusterServerPoolInfo(){} + + @SuppressWarnings("unchecked") + public ClusterServerPoolInfo(String server, QCObjectPoolProfile objPool, List connPoolList) { + this.server = server; + this.objPool = (objPool!=null?objPool:new QCObjectPoolProfile()); + this.connPoolList = (connPoolList!=null?connPoolList:Collections.EMPTY_LIST); + } + + public String getServer() { + return server; + } + public void setServer(String server) { + this.server = server; + } + public QCObjectPoolProfile getObjPool() { + return objPool; + } + public void setObjPool(QCObjectPoolProfile objPool) { + this.objPool = objPool; + } + public List getConnPoolList() { + return connPoolList; + } + public void setConnPoolList(List connPoolList) { + this.connPoolList = connPoolList; + } +} \ No newline at end of file diff --git a/src/main/java/com/skplanet/checkmate/querycache/data/ClusterServerSysStats.java b/src/main/java/com/skplanet/checkmate/querycache/data/ClusterServerSysStats.java new file mode 100644 index 0000000..615f0b2 --- /dev/null +++ b/src/main/java/com/skplanet/checkmate/querycache/data/ClusterServerSysStats.java @@ -0,0 +1,47 @@ +package com.skplanet.checkmate.querycache.data; + +import com.skplanet.querycache.servlet.QCApiJVMRuntimeInfo; +import com.skplanet.querycache.servlet.QCApiSystemInfo; +import com.skplanet.querycache.servlet.QCApiThreadInfo; +import com.skplanet.querycache.servlet.QCApiServerSystemStats; + +public class ClusterServerSysStats { + private String server; + private QCApiJVMRuntimeInfo jvm; + private QCApiSystemInfo system; + private QCApiThreadInfo threads; + + public ClusterServerSysStats(){} + + public ClusterServerSysStats(String server, QCApiServerSystemStats svrStats) { + this.server = server; + this.jvm = (svrStats!=null?svrStats.getJvm():new QCApiJVMRuntimeInfo()); + this.system = (svrStats!=null?svrStats.getSystem():new QCApiSystemInfo()); + this.threads = (svrStats!=null?svrStats.getThreads():new QCApiThreadInfo()); + } + + public String getServer() { + return server; + } + public void setServer(String server) { + this.server = server; + } + public QCApiJVMRuntimeInfo getJvm() { + return jvm; + } + public void setJvm(QCApiJVMRuntimeInfo jvm) { + this.jvm = jvm; + } + public QCApiSystemInfo getSystem() { + return system; + } + public void setSystem(QCApiSystemInfo system) { + this.system = system; + } + public QCApiThreadInfo getThreads() { + return threads; + } + public void setThreads(QCApiThreadInfo threads) { + this.threads = threads; + } +} \ No newline at end of file diff --git a/src/main/java/com/skplanet/checkmate/servlet/CMApiServletQC.java b/src/main/java/com/skplanet/checkmate/servlet/CMApiServletQC.java deleted file mode 100644 index 2c6e546..0000000 --- a/src/main/java/com/skplanet/checkmate/servlet/CMApiServletQC.java +++ /dev/null @@ -1,200 +0,0 @@ -package com.skplanet.checkmate.servlet; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import com.skplanet.checkmate.querycache.QCCluster; -import com.skplanet.checkmate.querycache.QCClusterManager; -import com.skplanet.checkmate.querycache.QCQuery; -import com.skplanet.checkmate.utils.HttpUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.AsyncContext; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.io.PrintWriter; -import java.lang.reflect.Type; -import java.util.Collection; -import java.util.Date; - -/** - * Created by nazgul33 on 15. 1. 29. - */ -public class CMApiServletQC extends HttpServlet { - private static final boolean DEBUG = false; - private static final Logger LOG = LoggerFactory.getLogger("api"); - - private static final String ASYNC_REQ_ATTR = CMApiServletQC.class.getName() + ".async"; - private static final String ASYNC_RETURN_ATTR = CMApiServletQC.class.getName() + ".async.return"; - protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { - response.setContentType("application/json; charset=utf-8"); - - if (DEBUG) { - LOG.debug(request.getRequestURI() + "?" + request.getQueryString()); - } - - PrintWriter writer = response.getWriter(); - String path = request.getRequestURI().substring(request.getContextPath().length()); - String urlWithParameter = request.getRequestURI(); - if (request.getQueryString() != null) { - urlWithParameter += "?" + request.getQueryString(); - } - - QCClusterManager qcMgr = QCClusterManager.getInstance(); - Gson gson = new Gson(); - switch (path) { - case "/clusterList": { - Collection cl = qcMgr.getClusterList(); - Type qListType = new TypeToken>() {}.getType(); - String jsonObj = gson.toJson(cl, qListType); - writer.printf("{\"result\":\"ok\", \"msg\":\"ok\", \"data\":%s}", jsonObj); - response.setStatus(HttpServletResponse.SC_OK); - break; - } - case "/runningQueries": { - QCCluster c = qcMgr.getCluster(request.getParameter("cluster")); - if (c!=null) { - Collection ql = c.getExportedRunningQueries(); - Type qListType = new TypeToken>() { - }.getType(); - String jsonObj = gson.toJson(ql, qListType); - writer.printf("{\"result\":\"ok\", \"msg\":\"ok\", \"data\":%s}", jsonObj); - response.setStatus(HttpServletResponse.SC_OK); - } - else { - LOG.error(urlWithParameter + " : invalid parameter"); - writer.print("{\"result\":\"error\", \"msg\":\"invalid parameter\"}"); - response.setStatus(HttpServletResponse.SC_BAD_REQUEST); - } - break; - } - - case "/completeQueries": { - QCCluster c = qcMgr.getCluster(request.getParameter("cluster")); - if (c!=null) { - Collection ql = c.getExportedCompleteQueries(); - - /* - // TODO: paging, filtering - int page_size = 100; - int page_number = 0; - int lastIdx = (page_number + 1)*page_size; - if (lastIdx > ql.size() ) - lastIdx = ql.size(); - if (page_number*page_size > ql.size()) { - // page out of bound - return - } - return ql.subList(page_number*page_size, (page_number + 1)*page_size); - */ - - Type qListType = new TypeToken>() {}.getType(); - String jsonObj = gson.toJson(ql, qListType); - writer.printf("{\"result\":\"ok\", \"msg\":\"ok\", \"data\":%s}", jsonObj); - response.setStatus(HttpServletResponse.SC_OK); - } - else { - LOG.error(urlWithParameter + " : invalid parameter"); - writer.print("{\"result\":\"error\", \"msg\":\"invalid parameter\"}"); - response.setStatus(HttpServletResponse.SC_BAD_REQUEST); - } - break; - } - case "/sysInfoList": { - QCCluster c = qcMgr.getCluster(request.getParameter("cluster")); - if (c!=null) { - Collection l = c.getSystemInfo(); - Type lType = new TypeToken>() { - }.getType(); - String jsonObj = gson.toJson(l, lType); - writer.printf("{\"result\":\"ok\", \"msg\":\"ok\", \"data\":%s}", jsonObj); - response.setStatus(HttpServletResponse.SC_OK); - } - else { - LOG.error(urlWithParameter + " : invalid parameter"); - writer.print("{\"result\":\"error\", \"msg\":\"invalid parameter\"}"); - response.setStatus(HttpServletResponse.SC_BAD_REQUEST); - } - break; - } - case "/poolInfoList": { - QCCluster c = qcMgr.getCluster(request.getParameter("cluster")); - if (c != null) { - Collection l = c.getPoolInfo(); - Type lType = new TypeToken>() {}.getType(); - String jsonObj = gson.toJson(l, lType); - writer.printf("{\"result\":\"ok\", \"msg\":\"ok\", \"data\":%s}", jsonObj); - response.setStatus(HttpServletResponse.SC_OK); - } - else { - LOG.error(urlWithParameter + " : invalid parameter"); - writer.print("{\"result\":\"error\", \"msg\":\"invalid parameter\"}"); - response.setStatus(HttpServletResponse.SC_BAD_REQUEST); - } - break; - } - case "/cancelQuery": { - if ( request.getAttribute(ASYNC_REQ_ATTR) == null ) { - final String cuqId = request.getParameter("cuqId"); - final QCCluster c = qcMgr.getCluster(request.getParameter("cluster")); - if (c == null || cuqId == null || cuqId.length() == 0) { - LOG.error(urlWithParameter + " : invalid parameter"); - writer.print("{\"result\":\"error\", \"msg\":\"invalid parameter\"}"); - response.setStatus(HttpServletResponse.SC_BAD_REQUEST); - break; - } - final QCQuery.QueryExport eq = c.getExportedRunningQueryByCuqId(cuqId); - if ( eq == null ) { - response.setContentType("application/json; charset=utf-8"); - response.setStatus(HttpServletResponse.SC_OK); - writer.printf("{\"result\":\"%s\", \"msg\":\"%s\"}", "error", "query finished already."); - LOG.error("cancelQuery invalid. cuqId " + cuqId + " not found"); - break; - } - - LOG.info("Starting cancel async job. " + eq.server + " " + eq.backend + " " + eq.id); - final AsyncContext async = request.startAsync(); - request.setAttribute(ASYNC_REQ_ATTR, new Boolean(true)); - async.setTimeout(30000); - new Thread(new Runnable() { - @Override - public void run() { - HttpUtil httpUtil = new HttpUtil(); - StringBuffer buf = new StringBuffer(); - int res = 0; - try { - String url = "http://" + eq.server + ":" + c.getOpt().webPort + "/api/cancelQuery?id="+eq.id+"&driver="+eq.backend; - - httpUtil.get( url, buf ); - async.getRequest().setAttribute(ASYNC_RETURN_ATTR, buf.toString()); - LOG.info("Cancel reply " + buf.toString()); - } catch (Exception e) { - async.getRequest().setAttribute(ASYNC_RETURN_ATTR, - "{\"result\":\"error\", \"msg\":\"Cancel failed.\"}"); - LOG.error("Cancel failed" + e); - } - async.dispatch(); - } - }).start(); - } else { - String reply = (String) request.getAttribute(ASYNC_RETURN_ATTR); - if (reply == null || reply.length() == 0) { - LOG.error("Cancel AsyncTask didn't set valid return message."); - reply="{\"result\":\"error\", \"msg\":\"Result was not set.\"}"; - } - response.setStatus(HttpServletResponse.SC_OK); - writer.print(reply); - } - break; - } - default: { - LOG.error(request.getRequestURI() + " not found.."); - response.setStatus(HttpServletResponse.SC_NOT_FOUND); - break; - } - } - } -} diff --git a/src/main/java/com/skplanet/checkmate/servlet/CMQCApiServlet.java b/src/main/java/com/skplanet/checkmate/servlet/CMQCApiServlet.java new file mode 100644 index 0000000..3898c6a --- /dev/null +++ b/src/main/java/com/skplanet/checkmate/servlet/CMQCApiServlet.java @@ -0,0 +1,198 @@ +package com.skplanet.checkmate.servlet; + +import static java.lang.String.format; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Collection; +import java.util.List; + +import javax.servlet.AsyncContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.reflect.TypeToken; +import com.skplanet.checkmate.CheckMateServer; +import com.skplanet.checkmate.querycache.QCCluster; +import com.skplanet.checkmate.querycache.QCClusterManager; +import com.skplanet.checkmate.querycache.data.ClusterServerPoolInfo; +import com.skplanet.checkmate.querycache.data.ClusterServerSysStats; +import com.skplanet.checkmate.utils.HttpUtil; +import com.skplanet.querycache.server.cli.QueryProfile; + +/** + * Created by nazgul33 on 15. 1. 29. + */ +public class CMQCApiServlet extends HttpServlet { + + private static final long serialVersionUID = 2564648276343514634L; + + private static final Logger LOG = LoggerFactory.getLogger(CMQCApiServlet.class); + + private static final String ASYNC_REQ_ATTR = CMQCApiServlet.class.getName() + ".async"; + private static final String ASYNC_RETURN_ATTR = CMQCApiServlet.class.getName() + ".async.return"; + + private Gson gson = new Gson(); + + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + resp.setContentType("application/json; charset=utf-8"); + PrintWriter out = resp.getWriter(); + + + String path = req.getRequestURI().substring(req.getContextPath().length()); + String urlWithParameter = req.getRequestURI(); + if (req.getQueryString() != null) { + urlWithParameter += "?" + req.getQueryString(); + } + + String clusterName = req.getParameter("cluster"); + QCClusterManager qcMgr = CheckMateServer.getClusterManager(); + QCCluster cluster = clusterName!=null?qcMgr.getCluster(clusterName):null; + + JsonObject json = new JsonObject(); + switch (path) { + case "/clusterList": { + List cnameList = qcMgr.getClusterNameList(); + json.addProperty("result", "ok"); + json.addProperty("msg", "ok"); + json.add("data", gson.toJsonTree(cnameList, new TypeToken>(){}.getType())); + out.print(json.toString()); + resp.setStatus(HttpServletResponse.SC_OK); + break; + } + case "/runningQueries": { + if (cluster!=null) { + List profileList = cluster.getRunningQueries(); + + json.addProperty("result", "ok"); + json.addProperty("msg", "ok"); + json.add("data", gson.toJsonTree(profileList, new TypeToken>(){}.getType()).getAsJsonArray()); + out.print(json.toString()); + resp.setStatus(HttpServletResponse.SC_OK); + } else { + json.addProperty("result", "error"); + json.addProperty("msg", "invalid parameter"); + out.print(json.toString()); + resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); + } + break; + } + case "/completeQueries": { + if (cluster!=null) { + List profileList = cluster.getCompleteQueries(); + json.addProperty("result", "ok"); + json.addProperty("msg", "ok"); + json.add("data", gson.toJsonTree(profileList, new TypeToken>(){}.getType()).getAsJsonArray()); + out.print(json.toString()); + resp.setStatus(HttpServletResponse.SC_OK); + } else { + json.addProperty("result", "error"); + json.addProperty("msg", "invalid parameter"); + out.print(json.toString()); + resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); + } + break; + } + case "/sysInfoList": { + if (cluster!=null) { + List list = cluster.getServerInfo(); + json.addProperty("result", "ok"); + json.addProperty("msg", "ok"); + json.add("data", gson.toJsonTree(list, new TypeToken>(){}.getType()).getAsJsonArray()); + out.print(json.toString()); + resp.setStatus(HttpServletResponse.SC_OK); + } else { + json.addProperty("result", "error"); + json.addProperty("msg", "invalid parameter"); + out.print(json.toString()); + resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); + } + break; + } + case "/poolInfoList": { + if (cluster != null) { + List list = cluster.getPoolInfo(); + json.addProperty("result", "ok"); + json.addProperty("msg", "ok"); + json.add("data", gson.toJsonTree(list , new TypeToken>(){}.getType()).getAsJsonArray()); + out.print(json.toString()); + resp.setStatus(HttpServletResponse.SC_OK); + } else { + json.addProperty("result", "error"); + json.addProperty("msg", "invalid parameter"); + out.print(json.toString()); + resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); + } + break; + } + case "/cancelQuery": { + if (req.getAttribute(ASYNC_REQ_ATTR) == null ) { + String cuqId = req.getParameter("cuqId"); + if (cluster != null && cuqId != null && cuqId.trim().length() > 0) { + QueryProfile profile = cluster.getRunningQuery(cuqId); + if (profile != null) { + String server = profile.getServer(); + int port = cluster.getOptions().getWebPort(); + String queryId = profile.getQueryId(); + final AsyncContext async = req.startAsync(); + final String url = format("http://%s:%s/api/cancelQuery?queryId=%s", server, port, queryId); + LOG.info("Starting cancel async job. {}:{}" + profile.getServer() + profile.getQueryId()); + req.setAttribute(ASYNC_REQ_ATTR, new Boolean(true)); + async.setTimeout(30000); + new Thread(){ + @Override + public void run() { + HttpUtil httpUtil = new HttpUtil(); + StringBuilder buf = new StringBuilder(); + try { + httpUtil.get( url, buf ); + async.getRequest().setAttribute(ASYNC_RETURN_ATTR, buf.toString()); + } catch (Exception e) { + JsonObject json = new JsonObject(); + json.addProperty("result", "error"); + json.addProperty("msg", "Cancel failed. +"+e.getMessage()); + async.getRequest().setAttribute(ASYNC_RETURN_ATTR, json.toString()); + } + async.dispatch(); + } + }.start(); + } else { + json.addProperty("result", "error"); + json.addProperty("msg", "query finished already"); + out.print(json.toString()); + resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); + } + } else { + json.addProperty("result", "error"); + json.addProperty("msg", "invalid parameter"); + out.print(json.toString()); + resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); + } + } else { + String reply = (String) req.getAttribute(ASYNC_RETURN_ATTR); + if (reply == null || reply.length() == 0) { + LOG.error("Cancel AsyncTask didn't set valid return message."); + json.addProperty("result", "error"); + json.addProperty("msg", "Result was not set."); + reply = json.toString(); + } + out.print(reply); + resp.setStatus(HttpServletResponse.SC_OK); + } + break; + } + default: { + LOG.error("invalid url request. {}", urlWithParameter); + resp.setStatus(HttpServletResponse.SC_NOT_FOUND); + break; + } + } + } +} diff --git a/src/main/java/com/skplanet/checkmate/servlet/CMQCServerWebSocket.java b/src/main/java/com/skplanet/checkmate/servlet/CMQCServerWebSocket.java deleted file mode 100644 index b45d1cb..0000000 --- a/src/main/java/com/skplanet/checkmate/servlet/CMQCServerWebSocket.java +++ /dev/null @@ -1,88 +0,0 @@ -package com.skplanet.checkmate.servlet; - - -import com.google.gson.Gson; -import com.skplanet.checkmate.querycache.QCCluster; -import com.skplanet.checkmate.querycache.QCClusterManager; -import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.WebSocketListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by nazgul33 on 15. 2. 13. - */ -public class CMQCServerWebSocket implements WebSocketListener { - private static final boolean DEBUG = false; - private static final Logger LOG = LoggerFactory.getLogger("websocket-server-qc"); - - private Session outbound; - private int id = -1; - private QCClusterManager qcMgr = QCClusterManager.getInstance(); - private QCCluster cluster = null; - - private static class RequestMsg { - public String request; - public String channel; - public String data; - } - - @Override - public void onWebSocketBinary(byte[] payload, int offset, int len) - { - /* only interested in test messages */ - } - - @Override - public void onWebSocketClose(int statusCode, String reason) - { - this.outbound = null; - if ( this.id >= 0 && this.cluster != null) { - cluster.unSubscribe(id); - } - this.id = -1; - this.cluster = null; - } - - @Override - public void onWebSocketConnect(Session session) { - this.outbound = session; - } - - @Override - public void onWebSocketError(Throwable cause) { - LOG.error("websocket error", cause); - } - - @Override - public void onWebSocketText(String message) { - if (outbound == null || !outbound.isOpen()) - return; - - Gson gson = new Gson(); - RequestMsg msg = gson.fromJson(message, RequestMsg.class); - switch (msg.request) { - case "subscribe": { - if ("cluster".equals(msg.channel)) { - this.cluster = qcMgr.getCluster(msg.data); - if (this.cluster != null) { - this.id = this.cluster.subscribe(this); - } - } - return; - } - case "ping": { - outbound.getRemote().sendString("{\"msgType\":\"pong\"}", null); - return; - } - } - - LOG.error("unknown message " + message); - } - - public void sendMessage(String message) { - if (outbound == null || !outbound.isOpen()) - return; - outbound.getRemote().sendString(message, null); - } -} diff --git a/src/main/java/com/skplanet/checkmate/servlet/CMQCWebSocket.java b/src/main/java/com/skplanet/checkmate/servlet/CMQCWebSocket.java new file mode 100644 index 0000000..48988b1 --- /dev/null +++ b/src/main/java/com/skplanet/checkmate/servlet/CMQCWebSocket.java @@ -0,0 +1,212 @@ +package com.skplanet.checkmate.servlet; + + +import java.util.List; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.skplanet.checkmate.CheckMateServer; +import com.skplanet.checkmate.querycache.QCCluster; +import com.skplanet.checkmate.querycache.QCClusterManager; +import com.skplanet.querycache.server.cli.QueryProfile; +import com.skplanet.querycache.servlet.QCWebSocket.ChangedProfile; + +/** + * Created by nazgul33 on 15. 2. 13. + */ +public class CMQCWebSocket implements WebSocketListener { + + private static final Logger LOG = LoggerFactory.getLogger(CMQCWebSocket.class); + + private String client; + private Session session; + private long idx; + private QCClusterManager qcMgr = CheckMateServer.getClusterManager(); + private QCCluster cluster; + private Gson gson = new Gson(); + + private String okMessage; + private String failMessage; + private String pongMessage; + private String unsupportMessage; + private String errorMessage; + + public CMQCWebSocket() { + okMessage = gson.toJson(new Response(Response.SUBSCRIBE, "OK")); + failMessage = gson.toJson(new Response(Response.SUBSCRIBE, "FAIL")); + pongMessage = gson.toJson(new Response(Response.PONG, "PONG")); + unsupportMessage = gson.toJson(new Response(Response.UNSUPPORT, "unsupport")); + errorMessage = gson.toJson(new Response(Response.UNSUPPORT, "message not json")); + } + + public String getClient() { + return client; + } + + @Override + public void onWebSocketBinary(byte[] payload, int offset, int len) { + /* only interested in test messages */ + } + + @Override + public void onWebSocketClose(int statusCode, String reason) { + if (session != null) { + try { + session.disconnect(); + } catch (Exception e) { + LOG.error("ws disconnect error. client={}, idx={}, statusCode={}, reason={}", client, idx, statusCode, reason, e); + } + session = null; + } + if (cluster != null){ + cluster.removeSubscriber(idx); + } + LOG.info("ws closed. client={}, idx={}, status={}, reason={}", client, idx, statusCode, reason); + } + + @Override + public void onWebSocketConnect(Session session) { + this.session = session; + this.client = session.getRemoteAddress().toString(); + this.idx = qcMgr.getEventSubscriberIdx(); + LOG.info("ws connected. client={}, idx={}", client, idx); + } + + @Override + public void onWebSocketError(Throwable cause) { + LOG.error("ws error", cause); + } + + @Override + public void onWebSocketText(String message) { + if (session == null || !session.isOpen()) + return; + + try { + Request req = gson.fromJson(message, Request.class); + + switch (req.getRequest()) { + case Request.SUBSCRIBE: { + if (Request.CLUSTER.equals(req.getChannel())) { + cluster = qcMgr.getCluster(req.getData()); + if (cluster != null) { + cluster.addSubscriber(idx, this); + send(okMessage); + } else { + send(failMessage); + } + } else { + send(failMessage); + } + break; + } + case Request.PING : { + send(pongMessage); + break; + } + default: + send(unsupportMessage); + break; + } + } catch (Exception e) { + LOG.error("ws error={}, client={}, idx={}", e.getMessage(), client, idx); + send(errorMessage); + } + } + + public void send(String message) { + if (session !=null && session.isOpen() && idx > 0) { + session.getRemote().sendString(message, null); + } + } + + public static class Request { + + public static final String SUBSCRIBE = "subscribe"; + public static final String PING = "ping"; + public static final String CLUSTER = "cluster"; + + private String r; + private String c; + private String d; + + public String getRequest() { + return r; + } + public void setRequest(String request) { + this.r = request; + } + public String getChannel() { + return c; + } + public void setChannel(String channel) { + this.c = channel; + } + public String getData() { + return d; + } + public void setData(String data) { + this.d = data; + } + } + + public static class Response { + + public static final String SUBSCRIBE = "subscribe"; + public static final String PONG = "pong"; + public static final String UNSUPPORT = "unsupport"; + public static final String DATA = "data"; + + private String t; + private String d; + private List q; + private List c; + + public Response(){} + + public Response(String type) { + this.t = type; + } + + public Response(String type, String data) { + this.t = type; + this.d = data; + } + + public String getType() { + return t; + } + + public void setType(String type) { + this.t = type; + } + + public String getData() { + return d; + } + + public void setData(String data) { + this.d = data; + } + + public List getProfiles() { + return q; + } + + public void setProfiles(List profiles) { + this.q = profiles; + } + + public List getCprofiles() { + return c; + } + + public void setCprofiles(List cprofiles) { + this.c = cprofiles; + } + } +} diff --git a/src/main/java/com/skplanet/checkmate/servlet/CMWebSocketServletQC.java b/src/main/java/com/skplanet/checkmate/servlet/CMQCWebSocketServlet.java similarity index 51% rename from src/main/java/com/skplanet/checkmate/servlet/CMWebSocketServletQC.java rename to src/main/java/com/skplanet/checkmate/servlet/CMQCWebSocketServlet.java index 97ff6ec..2391af8 100644 --- a/src/main/java/com/skplanet/checkmate/servlet/CMWebSocketServletQC.java +++ b/src/main/java/com/skplanet/checkmate/servlet/CMQCWebSocketServlet.java @@ -3,17 +3,20 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServlet; import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; -import javax.servlet.annotation.WebServlet; - /** * Created by nazgul33 on 15. 2. 13. */ -@WebServlet(name = "CheckMate WebSocket for QueryCache", urlPatterns = { "/api/qc/websocket" }) -public class CMWebSocketServletQC extends WebSocketServlet { - @Override +public class CMQCWebSocketServlet extends WebSocketServlet { + + private static final long serialVersionUID = -1985362762429496022L; + + private static final int MAX_TEXT_MESSAGE_SIZE = 10*1024*1024; + + @Override public void configure(WebSocketServletFactory factory) { factory.getPolicy().setIdleTimeout(120*1000); // 2min - factory.register(CMQCServerWebSocket.class); + factory.getPolicy().setMaxTextMessageSize(MAX_TEXT_MESSAGE_SIZE); + factory.register(CMQCWebSocket.class); } } diff --git a/src/main/java/com/skplanet/checkmate/utils/HttpUtil.java b/src/main/java/com/skplanet/checkmate/utils/HttpUtil.java index f246657..8113622 100644 --- a/src/main/java/com/skplanet/checkmate/utils/HttpUtil.java +++ b/src/main/java/com/skplanet/checkmate/utils/HttpUtil.java @@ -5,64 +5,92 @@ import java.net.HttpURLConnection; import java.net.URL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Created by nazgul33 on 15. 1. 27. */ public class HttpUtil { - private static boolean DEBUG = false; + + private static final Logger LOG = LoggerFactory.getLogger(HttpUtil.class); + private static int CONNECTION_TIMEOUT = 5000; private static int READ_TIMEOUT = 3000; public final String userAgent; + + public HttpUtil() { + this("CheckMate 1.0"); + } + public HttpUtil(String userAgent) { this.userAgent = userAgent; } - public HttpUtil() { - this.userAgent = "CheckMate 1.0"; - } // HTTP GET request // returns response code // if response code is 200, parameter "response" should have the result - public int get(String url, StringBuffer response) throws Exception { + public int get(String url, StringBuilder response) throws Exception { return get(url, response, null); } - public int getJson(String url, StringBuffer response) throws Exception { + public int getJson(String url, StringBuilder response) throws Exception { return get(url, response, "application/json"); } - public int get(String url, StringBuffer response, String accept) throws Exception { + public int get(String url, StringBuilder response, String accept) throws Exception { URL obj = new URL(url); - HttpURLConnection con = (HttpURLConnection) obj.openConnection(); - con.setConnectTimeout(CONNECTION_TIMEOUT); - con.setReadTimeout(READ_TIMEOUT); + HttpURLConnection con = null; + try { + con = (HttpURLConnection) obj.openConnection(); + con.setConnectTimeout(CONNECTION_TIMEOUT); + con.setReadTimeout(READ_TIMEOUT); + // optional default is GET + con.setRequestMethod("GET"); - // optional default is GET - con.setRequestMethod("GET"); + //add request header + con.setRequestProperty("User-Agent", userAgent); - //add request header - con.setRequestProperty("User-Agent", userAgent); + if (accept != null) { + con.setRequestProperty("Accept", accept); + } - if (accept != null) { - con.setRequestProperty("Accept", accept); - } + int responseCode = con.getResponseCode(); + LOG.debug("sending 'GET' request to URL : {}", url); + LOG.debug("Response Code : {}", responseCode); - int responseCode = con.getResponseCode(); - if (DEBUG) { - System.out.println("\nSending 'GET' request to URL : " + url); - System.out.println("Response Code : " + responseCode); - } + if (responseCode == 200) { + BufferedReader in = null; + try { + in = new BufferedReader( + new InputStreamReader(con.getInputStream())); + String inputLine; - if (responseCode == 200) { - BufferedReader in = new BufferedReader( - new InputStreamReader(con.getInputStream())); - String inputLine; - - while ((inputLine = in.readLine()) != null) { - response.append(inputLine); + while ((inputLine = in.readLine()) != null) { + response.append(inputLine).append("\n"); + } + } finally { + if (in != null) { + in.close(); + } + } } - in.close(); + return responseCode; + } finally { + if (con != null) { + con.disconnect(); + } } - return responseCode; + } + + public static String get(String url) throws Exception { + HttpUtil http = new HttpUtil(); + StringBuilder sb = new StringBuilder(); + int code = http.get(url, sb); + if (code == 200) { + return sb.toString(); + } else { + throw new RuntimeException("response code "+ code); + } } } diff --git a/src/main/java/com/skplanet/checkmate/utils/MailSender.java b/src/main/java/com/skplanet/checkmate/utils/MailSender.java index 692edb4..9980ee8 100644 --- a/src/main/java/com/skplanet/checkmate/utils/MailSender.java +++ b/src/main/java/com/skplanet/checkmate/utils/MailSender.java @@ -1,200 +1,181 @@ package com.skplanet.checkmate.utils; -/** - * Created by nazgul33 on 4/7/16. - */ - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; import javax.mail.Message; -import javax.mail.MessagingException; import javax.mail.Session; import javax.mail.Transport; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.*; + +/** + * Created by nazgul33 on 4/7/16. + */ + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Created by nazgul33 on 10/26/15. */ -public class MailSender implements Runnable { - protected static final Logger LOG = LoggerFactory.getLogger(MailSender.class); - - static public class Mail { - public final String subject; - private String content; - - public Mail(String subject, String content) { - this.subject = subject; - this.content = content; - } - - public Mail(String subject, Exception e) { - this.subject = subject; - - StringWriter errors = new StringWriter(); - errors.append("Exception : ").append(e.getClass().getCanonicalName()).append("\n"); - errors.append(e.getMessage()).append("\n"); - errors.flush(); - e.printStackTrace(new PrintWriter(errors)); - this.content = errors.toString(); - } - - public String getContent() { - return content; - } - - public void setContent(String content) { - this.content = content; - } - } - - private final String smtpServer, smtpFrom; - private final int smtpPort; - private final InternetAddress[] recipientAddresses; - - public MailSender(String smtpServer, String smtpFrom, int smtpPort, List recipients) throws Exception { - this.smtpServer = smtpServer; - this.smtpFrom = smtpFrom; - this.smtpPort = smtpPort; - - recipientAddresses = new InternetAddress[recipients.size()]; - for (int i=0; i mailQueue = new ArrayList<>(); - public void addMail(Mail mail) { - synchronized (mailQueue) { - mailQueue.add(mail); - } - LOG.info("mail queued. {}", mail.subject); - } - - public void run() { - try { - Properties properties = System.getProperties(); - properties.setProperty("mail.smtp.host", smtpServer); - properties.setProperty("mail.smtp.port", String.valueOf(smtpPort)); - - Session session = Session.getDefaultInstance(properties); - - InternetAddress iFrom = new InternetAddress(smtpFrom); - Collection mq = new ArrayList<>(); - - LOG.info("MailSender started."); - - while (!Thread.interrupted()) { - synchronized (mailQueue) { - if (mailQueue.size() > 0) { - for (Mail mail : mailQueue) { - mq.add(mail); - } - mailQueue.clear(); - } - } - if (mq.size() == 0) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - // ignore - } - continue; - } - - for (Mail mail : mq) { - LOG.info("sending mail '{}'", mail.subject); - - try { - MimeMessage message = new MimeMessage(session); - message.setFrom(iFrom); - message.addRecipients(Message.RecipientType.TO, recipientAddresses); - message.setSubject(mail.subject); - message.setText(mail.content); - - Transport.send(message); - LOG.info("Mail sent to {}", recipientAddresses); - } catch (MessagingException e) { - LOG.error("Exception sending mail", e); - } - } - mq.clear(); - } - } catch (Exception e) { - LOG.error("Unexpected exception", e); - } - } - - public static void main(String args[]) { - String server = null; - int port = 25; - String from = null; - List recipients = null; - String title = null; - String msg = null; - try { - for (int i = 0; i < args.length; i++) { - switch (args[i]) { - case "-s": - server = args[++i]; - System.out.println("Server : " + server); - break; - case "-p": - port = Integer.valueOf(args[++i]); - System.out.println("Port : " + port); - break; - case "-f": - from = args[++i]; - System.out.println("From : " + from); - break; - case "-r": - recipients = Arrays.asList(args[++i].split(",")); - System.out.println("Recipients : " + recipients); - break; - case "-t": - title = args[++i]; - System.out.println("Title : " + title); - break; - case "-m": - msg = args[++i]; - System.out.println("Message : " + msg); - break; - default: - System.out.println("unknown option"); - break; - } - } - } catch (Exception e) { - System.err.println("error parsing arguments."); - e.printStackTrace(); - System.exit(1); - } - - MailSender mailSender = null; - try { - mailSender = new MailSender(server, from, port, recipients); - } catch (Exception e) { - System.err.println("error instantiating MailSender"); - e.printStackTrace(); - System.exit(1); - } - - Thread t = new Thread(mailSender); - t.setDaemon(false); - t.start(); - - Mail mail = new Mail(title, msg); - mailSender.addMail(mail); - - try { - Thread.sleep(10); - t.interrupt(); - t.join(); - } catch (Exception e) { - // ignore - } - } +public class MailSender { + + private static final Logger LOG = LoggerFactory.getLogger(MailSender.class); + + private static MailSender _this; + + private List mailQueue = new ArrayList<>(); + + private String server; + private String from; + private int port; + private InternetAddress[] tos; + private MailSenderThread thread; + + private MailSender(String server, int port, String from, String[] toList) throws Exception { + this.server = server; + this.port = port; + this.from = from; + + tos = new InternetAddress[toList.length]; + for (int i=0;i mq = new ArrayList<>(); + + while (running) { + synchronized (mailQueue) { + if (mailQueue.size() > 0) { + mq.addAll(mailQueue); + mailQueue.clear(); + } + } + for (Mail mail : mq) { + LOG.info("sending mail '{}'", mail.getSubject()); + + try { + MimeMessage message = new MimeMessage(session); + message.setFrom(iFrom); + message.addRecipients(Message.RecipientType.TO, tos); + message.setSubject(mail.getSubject()); + message.setText(mail.getContent()); + + Transport.send(message); + LOG.info("Mail sent to {}", Arrays.toString(tos)); + } catch (Exception e) { + LOG.error("Exception sending mail", e); + } + } + mq.clear(); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + break; + } + } + } catch (Exception e) { + LOG.error("Unexpected exception", e); + } + + LOG.info("MailSender thread started."); + } + + public void close() { + running = false; + interrupt(); + } + } + + public static class Mail { + private String subject; + private String content; + + public Mail(String subject) { + this(subject, (String)null); + } + + public Mail(String subject, String content) { + this.subject = subject; + this.content = content; + } + + public Mail(String subject, Exception e) { + this.subject = subject; + + StringWriter errors = new StringWriter(); + errors.append("Exception : ").append(e.getClass().getCanonicalName()).append("\n"); + errors.append(e.getMessage()).append("\n"); + errors.flush(); + e.printStackTrace(new PrintWriter(errors)); + this.content = errors.toString(); + } + + public String getSubject() { + return subject; + } + + public String getContent() { + return content; + } + + public void setContent(String content) { + this.content = content; + } + } } diff --git a/src/main/java/com/skplanet/checkmate/yarn/YarnMonitor.java b/src/main/java/com/skplanet/checkmate/yarn/YarnMonitor.java index aaca8ac..36b0b25 100644 --- a/src/main/java/com/skplanet/checkmate/yarn/YarnMonitor.java +++ b/src/main/java/com/skplanet/checkmate/yarn/YarnMonitor.java @@ -7,26 +7,27 @@ * Created by nazgul33 on 8/5/15. */ public class YarnMonitor { - Timer updateTimer = new Timer(true); - YarnResourceManager yarn = new YarnResourceManager(); + + Timer updateTimer = new Timer(true); + YarnResourceManager yarn = new YarnResourceManager(); - private static YarnMonitor instance; - public static YarnMonitor getInstance() { - if (instance == null) { - instance = new YarnMonitor(); - } - return instance; - } + private static YarnMonitor instance; + public static YarnMonitor getInstance() { + if (instance == null) { + instance = new YarnMonitor(); + } + return instance; + } - private YarnMonitor() { - updateTimer.scheduleAtFixedRate( - new TimerTask() { - public void run() { - yarn.GetUnfinishedApplications(); - } - }, - 5000, - 5000 - ); - } + private YarnMonitor() { + updateTimer.scheduleAtFixedRate( + new TimerTask() { + public void run() { + yarn.GetUnfinishedApplications(); + } + }, + 5000, + 5000 + ); + } } diff --git a/src/main/java/com/skplanet/checkmate/yarn/YarnResourceManager.java b/src/main/java/com/skplanet/checkmate/yarn/YarnResourceManager.java index 8554ab1..f013a14 100644 --- a/src/main/java/com/skplanet/checkmate/yarn/YarnResourceManager.java +++ b/src/main/java/com/skplanet/checkmate/yarn/YarnResourceManager.java @@ -12,40 +12,41 @@ * Created by nazgul33 on 8/5/15. */ public class YarnResourceManager { - private static final Logger LOG = LoggerFactory.getLogger("yarn"); - private long lastUpdated = 0; + + private static final Logger LOG = LoggerFactory.getLogger(YarnResourceManager.class); + + private Gson gson = new Gson(); + + static class AppsContainer { + static class AppContainer { + List app; + } + AppContainer apps; + } - private Gson gson = new Gson(); - static class AppsContainer { - static class AppContainer { - List app; - } - AppContainer apps; - } + private List yarnApps = new ArrayList(); - private List yarnApps = new ArrayList(); + void GetUnfinishedApplications() { + final String url = "http://dicc-m002:10100/ws/v1/cluster/apps?states=NEW,RUNNING,"; + HttpUtil httpUtil = new HttpUtil(); + int responseCode = 0; + try { + StringBuilder buf = new StringBuilder(2048); + responseCode = httpUtil.getJson(url, buf); + LOG.info("http response {}", responseCode); + if (responseCode == 200) { + AppsContainer ac = gson.fromJson(buf.toString(), AppsContainer.class); + yarnApps.clear(); + if (ac != null && ac.apps != null && ac.apps.app != null) { + yarnApps.addAll(ac.apps.app); + } + } + } catch (Exception e) { + LOG.error("Getting active application list from resource manager", e); + } - void GetUnfinishedApplications() { - final String url = "http://dicc-m002:10100/ws/v1/cluster/apps?states=NEW,RUNNING,"; - HttpUtil httpUtil = new HttpUtil(); - int responseCode = 0; - try { - StringBuffer buf = new StringBuffer(2048); - responseCode = httpUtil.getJson(url, buf); - LOG.info("http response {}", responseCode); - if (responseCode == 200) { - AppsContainer ac = gson.fromJson(buf.toString(), AppsContainer.class); - yarnApps.clear(); - if (ac != null && ac.apps != null && ac.apps.app != null) { - yarnApps.addAll(ac.apps.app); - } - } - } catch (Exception e) { - LOG.error("Getting active application list from resource manager", e); - } - - for (YarnApplication a: yarnApps) { - LOG.info("app {} progress {}", a.id, a.progress); - } - } + for (YarnApplication a: yarnApps) { + LOG.info("app {} progress {}", a.id, a.progress); + } + } } diff --git a/www/css/querycache.css b/www/css/querycache.css index 681985c..488b8e2 100644 --- a/www/css/querycache.css +++ b/www/css/querycache.css @@ -94,90 +94,92 @@ table.fixed { table-layout: fixed; } + +td, +th{ + word-break: break-all; + padding-left: 1px; + padding-right: 1px; +} + +th { + text-align:left; +} + +td.qRight +{ + text-align:right; +} +td.qLeft +{ + text-align:left; +} +td.qCenter +{ + text-align:center; +} td.qServer, th.qServer { - width: 50px; - padding-left: 1px; - padding-right: 1px; + width: 170px; } td.qId, th.qId { - width: 52px; - padding-left: 1px; - padding-right: 1px; -} - -td.qBackend, -th.qBackend -{ - width: 72px; - padding-left: 1px; - padding-right: 1px; + width: 120px; } td.qUser, th.qUser { - width: 72px; - padding-left: 1px; - padding-right: 1px; + width: 70px; } -td.qStatement, -th.qStatement +td.qQuery, +th.qQuery { - padding-left: 1px; - padding-right: 1px; - word-wrap: break-word; + width:%; } td.qState, th.qState { - width: 48px; - padding-left: 1px; - padding-right: 1px; + width: 80px; } td.qClient, th.qClient { - width: 90px; - padding-left: 1px; - padding-right: 1px; + width: 100px; } td.qRows, th.qRows { - width: 72px; - padding-left: 1px; - padding-right: 1px; + width: 80px; } td.qStartTime, th.qStartTime { - width: 80px; - padding-left: 1px; - padding-right: 1px; + width: 90px; +} + +td.qEndTime, +th.qEndTime +{ + width: 90px; } td.qElapsedTime, th.qElapsedTime { - width: 90px; - padding-left: 1px; - padding-right: 1px; + width: 80px; } td.qCancel, th.qCancel { - width: 90px; - padding-left: 1px; - padding-right: 1px; -} + width: 80px; +} \ No newline at end of file diff --git a/www/qc/index.jsp b/www/qc/index.jsp index 847b4f8..b031425 100644 --- a/www/qc/index.jsp +++ b/www/qc/index.jsp @@ -8,16 +8,13 @@ <%@include file="header.jsp"%> - - - <%@include file="queries.funcs.html"%>
-
-
-

Connection Pool Stats

- - - - - - -
serverbackendfreein use
-
+
+

Connection Pool Stats

+ + + + + + +
serverbackendpool freepool useddirectcreated
+
-
-

Object Pool Stats

- - - - - - -
serverTROWSETTROWTCOLUMNVALUETSTRINGVALUE
-
+
+

Object Pool Stats

+ + + + + + +
serverTROWSETTROWTCOLUMNVALUETSTRINGVALUE
+
-
diff --git a/www/qc/queries.funcs.html b/www/qc/queries.funcs.html index 9007c4d..b8ad8f1 100644 --- a/www/qc/queries.funcs.html +++ b/www/qc/queries.funcs.html @@ -1,225 +1,203 @@ diff --git a/www/qc/queries.jsp b/www/qc/queries.jsp index 87d35f4..901dc39 100644 --- a/www/qc/queries.jsp +++ b/www/qc/queries.jsp @@ -1,37 +1,43 @@ -<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %> -<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%> - +<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core"%> +<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%> + +<% +boolean useWebSocket = clusters.getQcOption(request.getParameter("cluster")).isUseWebSocket(); +%> - - CheckMate :: QueryCache :: Queries + +CheckMate :: QueryCache :: Queries -<%@include file="header.jsp"%> - - - -<%@include file="queries.funcs.html"%> - -
-

In-Flight Queries

- - - - - - - - - - - - - - - - -
serveridtypeuserstatementstateclient iprowsstartTimeelapsedTimeCancel
+
+

In-Flight Queries

+ + + + + + + + + + + + + + + + + +
serveriduserquerystateclientrowsstartelapsedCancel
-

Complete Queries Manual Refresh

- - - - - - - - - - - - - - - -
serveridtypeuserstatementstateclient iprowsstartTimeelapsedTime
-
+

+ Complete Queries + Manual Refresh + +

+ + + + + + + + + + + + + + + + + +
serveriduserquerystateclientrowsstartendelapsed
+