要是说检测恶意内容纯属扯淡,只能看异常行为
非标端口(常见端口, 22, xx22),
SELECT src_ip_str , dst_ip_str, groupUniqArray(dst_port),count(*) as n FROM ssh_log_meta WHERE match(toString(dst_port) , '.*22')=0
group by src_ip_str , dst_ip_str
order by n desc
SSH 非标协议(22 端口无法解析出 SSH 协议)
SELECT client_ip_addr, server_ip_addr,groupUniqArray(link_id), groupUniqArray(protocol) as protocol_list, toDateTime(min(`date`)/1000) , count(*) as n FROM tcp_session
WHERE server_port=22 and server_total_packet>3 and protocol not in [826, 600]
and `date` > toInt64(now() - INTERVAL 1 DAY)*1000
group by client_ip_addr, server_ip_addr
order by n desc
客户端大量 UA (对同款软件的不同版本是否合并?某个服务器被多人使用,用的不同客户端)
SELECT src_ip_str, arrayDistinct(splitByChar(',',arrayStringConcat(groupArray(client_support), ','))) as UA, LENGTH(UA) as n from ssh_log_meta
group by src_ip_str
HAVING n>2
ORDER BY n desc
SSH 端口暴露统计(外部的地址访问)
SELECT src_ip_str, dst_ip_str, max(`date`) as dt,count(*) as n FROM ssh_log_meta
WHERE match(src_ip_str, '^(10\.|172\.(1[6-9]|2[0-9]|3[0-1])|192\.168|0\.|127\.)')=0
group by src_ip_str, dst_ip_str
ssh 爆破 (10 分钟>100 次)
SELECT
src_ip_str, dst_ip_str, min(access_time) as min_time, max(access_time) as max_time, COUNT(0) AS access_count
FROM (
SELECT
src_ip_str, dst_ip_str, toDateTime(`date`/1000) AS access_time,
toStartOfInterval(access_time, INTERVAL 10 MINUTE) AS time_slice
FROM
ssh_log_meta
)
GROUP BY
src_ip_str, dst_ip_str, time_slice
HAVING
access_count > 100
ORDER BY
access_count DESC
ssh 密码喷洒(短时间内访问超过 10 个 ssh)
SELECT
src_ip_str, groupUniqArray(dst_ip_str) as dst_ip_array, groupArray(access_time)[1] as access_time, LENGTH(dst_ip_array) AS server_num
FROM (
SELECT
src_ip_str, dst_ip_str, toDateTime(`date`/1000) AS access_time,
toStartOfInterval(access_time, INTERVAL 1 MINUTE) AS time_slice
FROM
ssh_log_meta
)
GROUP BY
src_ip_str, time_slice
HAVING
server_num > 10
ORDER BY
server_num DESC
访问方向统计, 1:内, 2:外(长期未被外网访问的被访问,一直访问内网的突然访问外网?)
SELECT src_ip_str, groupUniqArray(dst_ip_str), groupUniqArray(concat(toString(src_net), toString(dst_net))) as direction from
(
SELECT src_ip_str, dst_ip_str,
match(src_ip_str, '^(10\.|172\.(1[6-9]|2[0-9]|3[0-1])|192\.168|0\.|127\.)')=1 as src_net,
match(dst_ip_str, '^(10\.|172\.(1[6-9]|2[0-9]|3[0-1])|192\.168|0\.|127\.)')=1 as dst_net
FROM ssh_log_meta
)
group by src_ip_str
HAVING LENGTH(direction)>1
心跳/周期访问(计算连接时间差值,然后注意考虑抖动量)
SELECT link_id, src_ip_str, dst_ip_str, groupArray(dt_str) as dt_list, groupArray(time_diff) as dt_diff, arrayStringConcat(groupArray(toString(abs(time_diff-lag2)<time_diff*0.01)), '') as diff_str, anyHeavy(time_diff) as diff_mode from(
SELECT link_id, src_ip_str, dst_ip_str, FROM_UNIXTIME(CAST(dt AS bigint)) as dt_str, dt-lag as time_diff, any(time_diff) OVER (PARTITION By link_id, src_ip_str, dst_ip_str order by dt ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) as lag2 from(
SELECT link_id, src_ip_str, dst_ip_str, arrayJoin(time_array) as dt, any(dt) OVER (PARTITION By link_id, src_ip_str, dst_ip_str order by dt ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) as lag from(
SELECT link_id, src_ip_str, dst_ip_str, groupArray(`date`/1000) as time_array FROM ssh_log_meta
group by src_ip_str, dst_ip_str, link_id HAVING LENGTH(time_array)>5
)
)
) group by link_id, src_ip_str, dst_ip_str HAVING diff_str like '%11111%' and diff_mode>10
内网 ssh 服务器,新增访问者
SELECT groupUniqArray(src_ip_str), dst_ip_str FROM ssh_log_meta
WHERE match(dst_ip_str, '^(10\.|172\.(1[6-9]|2[0-9]|3[0-1])|192\.168|0\.|127\.)')=1
and `date` < toInt64(toDateTime(today()-1))*1000
group by dst_ip_str
SELECT groupUniqArray(src_ip_str), dst_ip_str FROM ssh_log_meta
WHERE match(dst_ip_str, '^(10\.|172\.(1[6-9]|2[0-9]|3[0-1])|192\.168|0\.|127\.)')=1
and `date` > toInt64(toDateTime(today()-1))*1000
group by dst_ip_str
查询非工作时间登录且一月内无其它日期登录记录
SELECT src_ip_str , dst_ip_str, groupUniqArray(dt) as date_array from
(SELECT src_ip_str , dst_ip_str, toHour(toDateTime(`date`/1000)) as h, toDate(`date`/1000) as dt FROM ssh_log_meta
WHERE (src_ip_str , dst_ip_str) in (
SELECT src_ip_str, dst_ip_str from (
SELECT src_ip_str , dst_ip_str, toHour(toDateTime(`date`/1000)) as h, toDate(`date`/1000) as dt FROM ssh_log_meta
WHERE (h<=7 or h>=23) and dt=today()
)
) and (h<=7 or h>=23) and dt > today() - 30
)group by src_ip_str , dst_ip_str
HAVING LENGTH(date_array)=1