直接检测恶意内容并不现实, 只能从异常行为入手

非标端口(常见端口: 22 以及 xx22, 即以 22 结尾的端口; 其余端口视为非标端口)

-- SSH sessions on non-standard ports: for every client/server pair, list the distinct
-- destination ports seen and the number of log rows, busiest pairs first.
-- A port is treated as "standard" when its decimal form ends in 22 (22, 2222, 10022, ...).
-- endsWith() is used because match() performs an unanchored regex search: the former
-- pattern '.*22' also excluded ports that merely contain "22" (e.g. 2201, 1229).
SELECT
    src_ip_str,
    dst_ip_str,
    groupUniqArray(dst_port),
    count(*) AS n
FROM ssh_log_meta
WHERE NOT endsWith(toString(dst_port), '22')
GROUP BY src_ip_str, dst_ip_str
ORDER BY n DESC

SSH 非标协议(22 端口无法解析出 SSH 协议)

-- TCP sessions to server port 22 from the last day whose protocol id is not 826 or 600,
-- grouped per client/server pair and ordered by session count.
-- NOTE(review): 826 and 600 are presumably the protocol ids that denote SSH - confirm
-- against the protocol dictionary.
-- NOTE(review): `date` is compared with a seconds value multiplied by 1000, so it looks
-- like epoch milliseconds - confirm.
SELECT
    client_ip_addr,
    server_ip_addr,
    groupUniqArray(link_id),
    groupUniqArray(protocol) AS protocol_list,
    toDateTime(min(`date`) / 1000),
    count(*) AS n
FROM tcp_session
WHERE `date` > toInt64(now() - INTERVAL 1 DAY) * 1000
    AND server_port = 22
    -- only sessions where the server side sent more than 3 packets
    AND server_total_packet > 3
    AND protocol NOT IN [826, 600]
GROUP BY
    client_ip_addr,
    server_ip_addr
ORDER BY n DESC

客户端大量 UA (对同款软件的不同版本是否合并?某个服务器被多人使用,用的不同客户端)

-- Clients that present many different SSH client strings: per source IP, join all
-- client_support values with ',', split the result on ',' again and de-duplicate it,
-- then keep the sources with more than 2 distinct entries.
-- NOTE(review): the split on ',' suggests client_support may itself hold a
-- comma-separated list - confirm against the table schema.
SELECT
    src_ip_str,
    arrayDistinct(
        splitByChar(',', arrayStringConcat(groupArray(client_support), ','))
    ) AS UA,
    length(UA) AS n
FROM ssh_log_meta
GROUP BY src_ip_str
HAVING n > 2
ORDER BY n DESC

SSH 端口暴露统计(外部的地址访问)

-- SSH exposure to external clients: per client/server pair, the latest `date` value and
-- the number of log rows, restricted to sources outside the listed internal ranges
-- (10/8, 172.16/12, 192.168/16, 0/8, 127/8).
-- Every alternative in the regex ends with an escaped dot so that only whole octets
-- match; without the dot after the 172.16-31 group, public addresses such as
-- 172.160.x.x were classified as internal and silently dropped from this report.
SELECT
    src_ip_str,
    dst_ip_str,
    max(`date`) AS dt,
    count(*) AS n
FROM ssh_log_meta
WHERE match(src_ip_str, '^(10\.|172\.(1[6-9]|2[0-9]|3[0-1])\.|192\.168\.|0\.|127\.)') = 0
GROUP BY src_ip_str, dst_ip_str

ssh 爆破 (10 分钟>100 次)

-- SSH brute-force candidates: client/server pairs with more than 100 log rows inside
-- one fixed 10-minute bucket. Each output row is one (pair, bucket) combination with
-- the first and last access time observed in that bucket.
-- NOTE(review): `date` / 1000 is fed to toDateTime, so `date` is presumably epoch
-- milliseconds - confirm.
SELECT
    src_ip_str,
    dst_ip_str,
    min(toDateTime(`date` / 1000)) AS min_time,
    max(toDateTime(`date` / 1000)) AS max_time,
    count() AS access_count
FROM ssh_log_meta
GROUP BY
    src_ip_str,
    dst_ip_str,
    -- bucket key: start of the 10-minute interval the row falls into
    toStartOfInterval(toDateTime(`date` / 1000), INTERVAL 10 MINUTE)
HAVING access_count > 100
ORDER BY access_count DESC

ssh 密码喷洒(同一来源在 1 分钟内访问超过 10 个不同的 ssh 服务器)

-- SSH password-spraying candidates: sources that reach more than 10 distinct
-- destination IPs inside one fixed 1-minute bucket.
-- Output: src_ip_str, dst_ip_array (distinct destinations in the bucket),
-- access_time (earliest access in the bucket), server_num (size of dst_ip_array).
-- access_time is computed with min() rather than groupArray(...)[1]: the array form
-- materialised every timestamp of the bucket only to return an arbitrary element.
-- The inner column is named event_time so the outer access_time alias does not shadow it.
-- NOTE(review): `date` / 1000 is fed to toDateTime, so `date` is presumably epoch
-- milliseconds - confirm.
SELECT
    src_ip_str,
    groupUniqArray(dst_ip_str) AS dst_ip_array,
    min(event_time) AS access_time,
    length(dst_ip_array) AS server_num
FROM (
    SELECT
        src_ip_str,
        dst_ip_str,
        toDateTime(`date` / 1000) AS event_time,
        toStartOfInterval(event_time, INTERVAL 1 MINUTE) AS time_slice
    FROM ssh_log_meta
)
GROUP BY
    src_ip_str,
    time_slice
HAVING server_num > 10
ORDER BY server_num DESC

访问方向统计, 1:内网, 0:外网, direction 为 "源+目的" 两位标记, 例如 10 表示内网访问外网(长期未被外网访问的被访问,一直访问内网的突然访问外网?)

-- Access-direction statistics: per source IP, the distinct destinations and the distinct
-- direction codes, keeping only sources that show more than one direction code.
-- A direction code is two characters, source flag followed by destination flag, where
-- 1 = address matches the internal-range regex and 0 = it does not (e.g. '10' means an
-- internal source talking to an external destination).
-- Every alternative in the regex ends with an escaped dot so that only whole octets
-- match; without the dot after the 172.16-31 group, public addresses such as
-- 172.160.x.x were flagged as internal.
SELECT
    src_ip_str,
    groupUniqArray(dst_ip_str),
    groupUniqArray(concat(toString(src_net), toString(dst_net))) AS direction
FROM (
    SELECT
        src_ip_str,
        dst_ip_str,
        match(src_ip_str, '^(10\.|172\.(1[6-9]|2[0-9]|3[0-1])\.|192\.168\.|0\.|127\.)') = 1 AS src_net,
        match(dst_ip_str, '^(10\.|172\.(1[6-9]|2[0-9]|3[0-1])\.|192\.168\.|0\.|127\.)') = 1 AS dst_net
    FROM ssh_log_meta
)
GROUP BY src_ip_str
HAVING length(direction) > 1

心跳/周期访问(计算连接时间差值,然后注意考虑抖动量)

-- Heartbeat / periodic-access detection.
-- Per (link_id, src_ip_str, dst_ip_str) this returns:
--   dt_list   - the formatted connection times,
--   dt_diff   - the gap between each connection and the previous one,
--   diff_str  - one flag per connection, concatenated into a string; a flag is the
--               toString() of "|gap - previous gap| < 1% of gap" (expected '1' / '0'),
--   diff_mode - a frequently occurring gap value chosen by anyHeavy().
-- NOTE(review): the outer groupArray() calls rely on rows arriving in dt order from the
-- windowed subquery; no explicit ORDER BY guarantees that - confirm.
SELECT link_id, src_ip_str, dst_ip_str, groupArray(dt_str) as dt_list, groupArray(time_diff) as dt_diff, arrayStringConcat(groupArray(toString(abs(time_diff-lag2)<time_diff*0.01)), '') as diff_str, anyHeavy(time_diff) as diff_mode from(
    -- time_diff = gap to the previous connection; lag2 = the previous row's time_diff,
    -- read through a window frame that contains exactly the one preceding row.
    SELECT link_id, src_ip_str, dst_ip_str, FROM_UNIXTIME(CAST(dt AS bigint)) as dt_str, dt-lag as time_diff, any(time_diff) OVER (PARTITION By link_id, src_ip_str, dst_ip_str order by dt ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) as lag2 from(
        -- Expand time_array back to one row per connection time (dt); lag = the previous
        -- dt inside the same partition, ordered by dt.
        -- NOTE(review): for the first row of a partition the frame is empty; any() then
        -- presumably yields the type default (0), making the first time_diff equal to dt
        -- itself - confirm, since that value also feeds anyHeavy() above.
        SELECT link_id, src_ip_str, dst_ip_str, arrayJoin(time_array) as dt, any(dt) OVER (PARTITION By link_id, src_ip_str, dst_ip_str order by dt ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) as lag from(
            -- Collect `date` / 1000 per group and keep only groups with more than 5 entries.
            -- NOTE(review): `date` is presumably epoch milliseconds, so the values here
            -- would be seconds - confirm.
            SELECT link_id, src_ip_str, dst_ip_str, groupArray(`date`/1000) as time_array FROM ssh_log_meta
            group by src_ip_str, dst_ip_str, link_id HAVING LENGTH(time_array)>5
        )
    )
-- Keep groups whose flag string contains five consecutive '1' (five successive gaps each
-- within 1% of the gap before it) and whose diff_mode is greater than 10.
) group by link_id, src_ip_str, dst_ip_str HAVING diff_str like '%11111%' and diff_mode>10

内网 ssh 服务器,新增访问者(第一条查询: 昨天零点之前的历史访问者; 第二条查询: 昨天零点以来的访问者; 对比两者, 只出现在第二条结果中的即为新增访问者)

-- Baseline visitors of internal SSH servers: per internal destination IP, the distinct
-- source IPs seen before 00:00 of yesterday. Compare with the companion query that
-- covers the period since then to spot new visitors.
-- Every alternative in the regex ends with an escaped dot so that only whole octets
-- match; without the dot after the 172.16-31 group, public addresses such as
-- 172.160.x.x were treated as internal servers.
-- NOTE(review): `date` is compared with a seconds value multiplied by 1000, so it looks
-- like epoch milliseconds - confirm.
SELECT
    groupUniqArray(src_ip_str),
    dst_ip_str
FROM ssh_log_meta
WHERE match(dst_ip_str, '^(10\.|172\.(1[6-9]|2[0-9]|3[0-1])\.|192\.168\.|0\.|127\.)') = 1
    AND `date` < toInt64(toDateTime(today() - 1)) * 1000
GROUP BY dst_ip_str

-- Recent visitors of internal SSH servers: per internal destination IP, the distinct
-- source IPs seen since 00:00 of yesterday. Sources that appear here but not in the
-- baseline query (rows before that instant) are new visitors.
-- The lower bound is inclusive (>=) so that, together with the baseline's strict "<",
-- a row stamped exactly at the boundary is not lost from both result sets.
-- Every alternative in the regex ends with an escaped dot so that only whole octets
-- match; without the dot after the 172.16-31 group, public addresses such as
-- 172.160.x.x were treated as internal servers.
-- NOTE(review): `date` is compared with a seconds value multiplied by 1000, so it looks
-- like epoch milliseconds - confirm.
SELECT
    groupUniqArray(src_ip_str),
    dst_ip_str
FROM ssh_log_meta
WHERE match(dst_ip_str, '^(10\.|172\.(1[6-9]|2[0-9]|3[0-1])\.|192\.168\.|0\.|127\.)') = 1
    AND `date` >= toInt64(toDateTime(today() - 1)) * 1000
GROUP BY dst_ip_str

查询今天在非工作时间(23 点至次日 7 点)登录, 且近 30 天内其它日期没有非工作时间登录记录的访问

-- Off-hours logins that are new for the pair: client/server pairs that logged in today
-- during off hours (hour <= 7 or hour >= 23) and whose off-hours logins within the last
-- 30 days fall on exactly one distinct date.
-- NOTE(review): only off-hours rows are counted, so a pair with daytime logins on other
-- dates is still reported - confirm this matches the intent.
-- NOTE(review): `date` / 1000 is fed to toDateTime / toDate, so `date` is presumably
-- epoch milliseconds - confirm.
WITH off_hours_login AS (
    -- every log row that falls into the off-hours window, with its hour and calendar date
    SELECT
        src_ip_str,
        dst_ip_str,
        toHour(toDateTime(`date` / 1000)) AS h,
        toDate(`date` / 1000) AS dt
    FROM ssh_log_meta
    WHERE h <= 7 OR h >= 23
)
SELECT
    src_ip_str,
    dst_ip_str,
    groupUniqArray(dt) AS date_array
FROM off_hours_login
WHERE dt > today() - 30
    -- restrict to pairs that have an off-hours login today
    AND (src_ip_str, dst_ip_str) IN (
        SELECT
            src_ip_str,
            dst_ip_str
        FROM off_hours_login
        WHERE dt = today()
    )
GROUP BY
    src_ip_str,
    dst_ip_str
HAVING length(date_array) = 1