收集 Signal Sciences WAF 記錄
本文說明如何使用 Google Cloud Storage 將 Signal Sciences WAF 記錄匯入 Google 安全作業。剖析器會將 Signal Sciences 記錄從 JSON 格式轉換為 Chronicle 的統一資料模型 (UDM)。它會處理兩種主要的訊息結構:「RPC.PreRequest/PostRequest」訊息會使用 Grok 模式進行剖析,其他訊息則會以 JSON 物件的形式處理,擷取相關欄位並對應至 UDM 結構定義。
事前準備
請確認您已具備下列必要條件:
- Google SecOps 執行個體
- 在 Google Cloud 環境中設定並啟用 VPC Flow
- 可存取 Signal Sciences WAF 的特殊存取權
建立 Google Cloud 儲存空間值區
- 登入 Google Cloud 控制台。
前往「Cloud Storage Buckets」(Cloud Storage 值區) 頁面。
按一下 [建立]。
在「Create a bucket」(建立值區) 頁面中輸入值區資訊。完成下列每個步驟後,請按一下「繼續」繼續下一步:
在「開始使用」部分執行下列操作:
- 輸入符合值區名稱規定的不重複名稱 (例如 vpcflow-logs)。
- 如要啟用階層命名空間,請按一下展開箭頭,展開「為檔案導向和資料密集型工作負載進行最佳化」部分,然後選取「為這個值區啟用階層命名空間」。
- 如要新增分類標籤,請按一下展開箭頭,展開「Labels」部分。
- 按一下「Add label」(新增標籤),然後指定標籤的鍵和值。
在「Choose where to store your data」(選擇資料的儲存位置) 專區中執行下列操作:
- 選取「位置類型」。
- 使用位置類型選單選取位置,指定要永久儲存值區內物件資料的位置。
- 如要設定跨值區複製作業,請展開「設定跨值區複製作業」部分。
在「Choose a storage class for your data」專區中,為值區選取預設儲存空間級別,或選取「Autoclass」,讓系統自動管理值區的資料儲存空間級別。
在「選取如何控制物件的存取權」專區中,選取「否」來強制執行公開存取防護,然後為值區物件選取存取權控管模型。
在「Choose how to protect object data」(選擇保護物件資料的方式) 專區中執行下列操作:
- 選取「資料保護」下方的任一選項,設定值區。
- 如要選擇物件資料的加密方式,請按一下標示為「資料加密」的展開箭頭,然後選取「資料加密方法」。
按一下 [建立]。
設定 Signal Sciences WAF API 金鑰
- 登入 Signal Sciences WAF 網頁 UI。
- 依序前往「我的個人資料」>「API 存取權權杖」。
- 按一下「新增 API 存取權杖」。
- 請提供不重複的描述性名稱 (例如
Google SecOps
)。 - 按一下「Create API access token」。
- 複製並儲存權杖至安全的位置。
- 按一下「我知道了」,即可完成建立權杖程序。
在 Linux 主機上部署指令碼,從 Signal Sciences 提取記錄並儲存在 Google Cloud中
- 使用 SSH 登入 Linux 主機。
安裝 Python 程式庫,將 Signal Sciences WAF JSON 儲存至 Cloud Storage 值區:
pip install google-cloud-storage
設定這個環境變數,呼叫含有 Google Cloud憑證的 JSON 檔案:
export GOOGLE_APPLICATION_CREDENTIALS="path/to/your/service-account-key.json"
請設定下列環境變數,因為這些資訊不得硬式編碼:
export SIGSCI_EMAIL=<Signal_Sciences_account_email> export SIGSCI_TOKEN=<Signal_Sciences_API_token> export SIGSCI_CORP=<Corporation_name_in_Signal_Sciences>
執行下列指令碼:
import sys import requests import os import calendar import json from datetime import datetime, timedelta from google.cloud import storage # Check if all necessary environment variables are set if 'SIGSCI_EMAIL' not in os.environ or 'SIGSCI_TOKEN' not in os.environ or 'SIGSCI_CORP' not in os.environ: print("ERROR: You need to define SIGSCI_EMAIL, SIGSCI_TOKEN, and SIGSCI_CORP environment variables.") print("Please fix and run again. Existing...") sys.exit(1) # Exit if environment variables are not set # Define the Google Cloud Storage bucket name and output file name bucket_name = 'Your_GCS_Bucket' # Replace with your GCS bucket name output_file_name = 'signal_sciences_logs.json' # Initialize Google Cloud Storage client storage_client = storage.Client() # Function to upload data to Google Cloud Storage def upload_to_gcs(bucket_name, data, destination_blob_name): bucket = storage_client.bucket(bucket_name) blob = bucket.blob(destination_blob_name) blob.upload_from_string(data, content_type='application/json') print(f"Data uploaded to {destination_blob_name} in bucket {bucket_name}") # Signal Sciences API information api_host = 'https://dashboard.signalsciences.net' # email = 'user@domain.com' # Signal Sciences account email # token = 'XXXXXXXX-XXXX-XXX-XXXX-XXXXXXXXXXXX' # API token for authentication # corp_name = 'Domain' # Corporation name in Signal Sciences # site_names = ['testenv'] # Replace with your actual site names # List of comma-delimited sites that you want to extract data from site_names = [ 'site123', 'site345' ] # Define all sites to pull logs from email = os.environ.get('SIGSCI_EMAIL') # Signal Sciences account email token = os.environ.get('SIGSCI_TOKEN') # API token for authentication corp_name = os.environ.get('SIGSCI_CORP') # Corporation name in Signal Sciences # Calculate the start and end timestamps for the previous hour in UTC until_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0) from_time = until_time - timedelta(hours=1) until_time = calendar.timegm(until_time.utctimetuple()) from_time = calendar.timegm(from_time.utctimetuple()) # Prepare HTTP headers for the API request headers = { 'Content-Type': 'application/json', 'x-api-user': email, 'x-api-token': token } # Collect logs for each site collected_logs = [] for site_name in site_names: url = f"{api_host}/api/v0/corps/{corp_name}/sites/{site_name}/feed/requests?from={from_time}&until={until_time}" while True: response = requests.get(url, headers=headers) if response.status_code != 200: print(f"Error fetching logs: {response.text}", file=sys.stderr) break # Parse the JSON response data = response.json() collected_logs.extend(data['data']) # Add the log messages to our list # Pagination: check if there is a next page next_url = data.get('next', {}).get('uri') if not next_url: break url = api_host + next_url # Convert the collected logs to a newline-delimited JSON string json_data = '\n'.join(json.dumps(log) for log in collected_logs) # Save the newline-delimited JSON data to a GCS bucket upload_to_gcs(bucket_name, json_data, output_file_name)
在 Google SecOps 中設定動態饋給,以便擷取 Signal Sciences WAF 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「新增」。
- 在「動態饋給名稱」欄位中輸入動態饋給的名稱 (例如
Signal Sciences WAF Logs
)。 - 選取「Google Cloud Storage」做為「來源類型」。
- 選取「Signal Sciences WAF」做為「記錄類型」。
- 按一下「取得服務帳戶」,做為「Chronicle 服務帳戶」。
- 點按「Next」。
指定下列輸入參數的值:
- Storage Bucket URI: Google Cloud 以
gs://my-bucket/<value>
格式表示的儲存空間值區網址。 - URI 是:選取「Directory which includes subdirectories」。
來源刪除選項:根據個人偏好選取刪除選項。
資產命名空間:資產命名空間。
擷取標籤:套用至這個動態饋給事件的標籤。
- Storage Bucket URI: Google Cloud 以
點按「Next」。
在「完成」畫面中查看新的動態饋給設定,然後按一下「提交」。
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
CLIENT-IP | target.ip | 從 CLIENT-IP 標頭欄位擷取。 |
CLIENT-IP | target.port | 從 CLIENT-IP 標頭欄位擷取。 |
連線 | security_result.about.labels | 系統會從原始記錄 Connection 欄位擷取值,並對應至 security_result.about.labels 。 |
Content-Length | security_result.about.labels | 系統會從原始記錄 Content-Length 欄位擷取值,並對應至 security_result.about.labels 。 |
Content-Type | security_result.about.labels | 系統會從原始記錄 Content-Type 欄位擷取值,並對應至 security_result.about.labels 。 |
已建立 | metadata.event_timestamp | 系統會從原始記錄 created 欄位擷取值,並對應至 metadata.event_timestamp 。 |
details.headersIn | security_result.about.resource.attribute.labels | 系統會從原始記錄 details.headersIn 欄位擷取值,並對應至 security_result.about.resource.attribute.labels 。 |
details.headersOut | security_result.about.resource.attribute.labels | 系統會從原始記錄 details.headersOut 欄位擷取值,並對應至 security_result.about.resource.attribute.labels 。 |
details.id | principal.process.pid | 系統會從原始記錄 details.id 欄位擷取值,並對應至 principal.process.pid 。 |
details.method | network.http.method | 系統會從原始記錄 details.method 欄位擷取值,並對應至 network.http.method 。 |
details.protocol | network.application_protocol | 系統會從原始記錄 details.protocol 欄位擷取值,並對應至 network.application_protocol 。 |
details.remoteCountryCode | principal.location.country_or_region | 系統會從原始記錄 details.remoteCountryCode 欄位擷取值,並對應至 principal.location.country_or_region 。 |
details.remoteHostname | target.hostname | 系統會從原始記錄 details.remoteHostname 欄位擷取值,並對應至 target.hostname 。 |
details.remoteIP | target.ip | 系統會從原始記錄 details.remoteIP 欄位擷取值,並對應至 target.ip 。 |
details.responseCode | network.http.response_code | 系統會從原始記錄 details.responseCode 欄位擷取值,並對應至 network.http.response_code 。 |
details.responseSize | network.received_bytes | 系統會從原始記錄 details.responseSize 欄位擷取值,並對應至 network.received_bytes 。 |
details.serverHostname | principal.hostname | 系統會從原始記錄 details.serverHostname 欄位擷取值,並對應至 principal.hostname 。 |
details.serverName | principal.asset.network_domain | 系統會從原始記錄 details.serverName 欄位擷取值,並對應至 principal.asset.network_domain 。 |
details.tags | security_result.detection_fields | 系統會從原始記錄 details.tags 欄位擷取值,並對應至 security_result.detection_fields 。 |
details.tlsCipher | network.tls.cipher | 系統會從原始記錄 details.tlsCipher 欄位擷取值,並對應至 network.tls.cipher 。 |
details.tlsProtocol | network.tls.version | 系統會從原始記錄 details.tlsProtocol 欄位擷取值,並對應至 network.tls.version 。 |
details.userAgent | network.http.user_agent | 系統會從原始記錄 details.userAgent 欄位擷取值,並對應至 network.http.user_agent 。 |
details.uri | network.http.referral_url | 系統會從原始記錄 details.uri 欄位擷取值,並對應至 network.http.referral_url 。 |
eventType | metadata.product_event_type | 系統會從原始記錄 eventType 欄位擷取值,並對應至 metadata.product_event_type 。 |
headersIn | security_result.about.labels | 系統會從原始記錄 headersIn 欄位擷取值,並對應至 security_result.about.labels 。 |
headersOut | security_result.about.labels | 系統會從原始記錄 headersOut 欄位擷取值,並對應至 security_result.about.labels 。 |
id | principal.process.pid | 系統會從原始記錄 id 欄位擷取值,並對應至 principal.process.pid 。 |
訊息 | metadata.description | 系統會從原始記錄 message 欄位擷取值,並對應至 metadata.description 。 |
方法 | network.http.method | 系統會從原始記錄 method 欄位擷取值,並對應至 network.http.method 。 |
ModuleVersion | metadata.ingestion_labels | 系統會從原始記錄 ModuleVersion 欄位擷取值,並對應至 metadata.ingestion_labels 。 |
msgData.actions | security_result.action | 系統會從原始記錄 msgData.actions 欄位擷取值,並對應至 security_result.action 。 |
msgData.changes | target.resource.attribute.labels | 系統會從原始記錄 msgData.changes 欄位擷取值,並對應至 target.resource.attribute.labels 。 |
msgData.conditions | security_result.description | 系統會從原始記錄 msgData.conditions 欄位擷取值,並對應至 security_result.description 。 |
msgData.detailLink | network.http.referral_url | 系統會從原始記錄 msgData.detailLink 欄位擷取值,並對應至 network.http.referral_url 。 |
msgData.name | target.resource.name | 系統會從原始記錄 msgData.name 欄位擷取值,並對應至 target.resource.name 。 |
msgData.reason | security_result.summary | 系統會從原始記錄 msgData.reason 欄位擷取值,並對應至 security_result.summary 。 |
msgData.sites | network.http.user_agent | 系統會從原始記錄 msgData.sites 欄位擷取值,並對應至 network.http.user_agent 。 |
通訊協定 | network.application_protocol | 系統會從原始記錄 protocol 欄位擷取值,並對應至 network.application_protocol 。 |
remoteCountryCode | principal.location.country_or_region | 系統會從原始記錄 remoteCountryCode 欄位擷取值,並對應至 principal.location.country_or_region 。 |
remoteHostname | target.hostname | 系統會從原始記錄 remoteHostname 欄位擷取值,並對應至 target.hostname 。 |
remoteIP | target.ip | 系統會從原始記錄 remoteIP 欄位擷取值,並對應至 target.ip 。 |
responseCode | network.http.response_code | 系統會從原始記錄 responseCode 欄位擷取值,並對應至 network.http.response_code 。 |
responseSize | network.received_bytes | 系統會從原始記錄 responseSize 欄位擷取值,並對應至 network.received_bytes 。 |
serverHostname | principal.hostname | 系統會從原始記錄 serverHostname 欄位擷取值,並對應至 principal.hostname 。 |
serverName | principal.asset.network_domain | 系統會從原始記錄 serverName 欄位擷取值,並對應至 principal.asset.network_domain 。 |
標記 | security_result.detection_fields | 系統會從原始記錄 tags 欄位擷取值,並對應至 security_result.detection_fields 。 |
時間戳記 | metadata.event_timestamp | 系統會從原始記錄 timestamp 欄位擷取值,並對應至 metadata.event_timestamp 。 |
tlsCipher | network.tls.cipher | 系統會從原始記錄 tlsCipher 欄位擷取值,並對應至 network.tls.cipher 。 |
tlsProtocol | network.tls.version | 系統會從原始記錄 tlsProtocol 欄位擷取值,並對應至 network.tls.version 。 |
URI | target.url | 系統會從原始記錄 URI 欄位擷取值,並對應至 target.url 。 |
userAgent | network.http.user_agent | 系統會從原始記錄 userAgent 欄位擷取值,並對應至 network.http.user_agent 。 |
uri | network.http.referral_url | 系統會從原始記錄 uri 欄位擷取值,並對應至 network.http.referral_url 。 |
X-ARR-SSL | network.tls.client.certificate.issuer | 使用 grok 和 kv 篩選器,從 X-ARR-SSL 標頭欄位擷取值。 |
metadata.event_type | 事件類型是由剖析器根據目標和主要資訊的存在與否決定。如果目標和主要對象都存在,事件類型為 NETWORK_HTTP 。如果只有主體,事件類型為 STATUS_UPDATE 。否則,事件類型為 GENERIC_EVENT 。 |
|
metadata.log_type | 值為硬式編碼的 SIGNAL_SCIENCES_WAF 。 |
|
metadata.product_name | 值為硬式編碼的 Signal Sciences WAF 。 |
|
metadata.vendor_name | 值為硬式編碼的 Signal Sciences 。 |
|
principal.asset.hostname | 這個值取自 principal.hostname 欄位。 |
|
target.asset.hostname | 這個值取自 target.hostname 欄位。 |
|
target.asset.ip | 這個值取自 target.ip 欄位。 |
|
target.user.user_display_name | 系統會使用 grok 篩選器從 message_data 欄位擷取值。 |
|
target.user.userid | 系統會使用 grok 篩選器從 message_data 欄位擷取值。 |
還有其他問題嗎?向社群成員和 Google SecOps 專家尋求解答。