Documentation Index Fetch the complete documentation index at: https://mintlify.com/fortra/impacket/llms.txt
Use this file to discover all available pages before exploring further.
MS-RPC Protocol
Impacket’s DCERPC (Distributed Computing Environment / Remote Procedure Call) implementation provides comprehensive support for Microsoft RPC protocols, enabling remote administration and exploitation of Windows services.
Overview
The DCERPC implementation is located in impacket/dcerpc/v5/ and includes:
Transport layers - SMB named pipes, TCP/IP, HTTP
40+ protocol interfaces - Service control, registry, SAM, Active Directory, and more
Authentication - NTLM, Kerberos, anonymous
Data marshalling - NDR (Network Data Representation)
Protocol Stack DCERPC sits on top of various transports (SMB, TCP, HTTP) and provides a framework for calling remote procedures on Windows systems. Each service exposes specific interfaces identified by UUIDs.
Transport Layers
DCERPC supports multiple transport protocols, configured via string bindings.
String Bindings
From transport.py:36-94, string bindings follow this format:
[uuid@]protocol:address[endpoint,options]
SMB Named Pipes (ncacn_np)
from impacket.dcerpc.v5 import transport, scmr
# Connect via SMB named pipe
string_binding = r 'ncacn_np:192 . 168 . 1 . 10 [ \\ pipe \\ svcctl ] '
rpc_transport = transport.DCERPCTransportFactory(string_binding)
# Set credentials
rpc_transport.set_credentials( 'username' , 'password' , 'DOMAIN' )
# Connect and bind
dce = rpc_transport.get_dce_rpc()
dce.connect()
dce.bind(scmr. MSRPC_UUID_SCMR )
# Call RPC method
resp = scmr.hROpenSCManagerW(dce)
print ( f "SCM Handle: { resp[ 'lpScHandle' ] } " )
dce.disconnect()
TCP/IP Transport (ncacn_ip_tcp)
# Direct TCP connection (requires port)
string_binding = 'ncacn_ip_tcp:192.168.1.10[135]' # EPM endpoint
rpc_transport = transport.DCERPCTransportFactory(string_binding)
# Or let endpoint mapper resolve
from impacket.dcerpc.v5 import epm
string_binding = 'ncacn_ip_tcp:192.168.1.10'
rpc_transport = transport.DCERPCTransportFactory(string_binding)
rpc_transport.set_credentials( 'user' , 'pass' , 'DOMAIN' )
dce = rpc_transport.get_dce_rpc()
dce.connect()
# Endpoint mapper will resolve the interface
dce.bind(scmr. MSRPC_UUID_SCMR )
HTTP Transport (ncacn_http)
# RPC over HTTP (RPC Proxy)
string_binding = 'ncacn_http:mail.corp.com[6001,RpcProxy=mail.corp.com:443]'
rpc_transport = transport.DCERPCTransportFactory(string_binding)
rpc_transport.set_credentials( 'user' , 'pass' , 'CORP' )
dce = rpc_transport.get_dce_rpc()
dce.connect()
Common RPC Interfaces
Service Control Manager (SCMR)
Manage Windows services remotely.
from impacket.dcerpc.v5 import transport, scmr
def list_services ( target , username , password , domain = '' ):
string_binding = rf 'ncacn_np: { target } [ \\ pipe \\ svcctl]'
rpc_transport = transport.DCERPCTransportFactory(string_binding)
rpc_transport.set_credentials(username, password, domain)
dce = rpc_transport.get_dce_rpc()
dce.connect()
dce.bind(scmr. MSRPC_UUID_SCMR )
# Open SCM
resp = scmr.hROpenSCManagerW(dce)
scm_handle = resp[ 'lpScHandle' ]
# Enumerate services
resp = scmr.hREnumServicesStatusW(
dce,
scm_handle,
scmr. SERVICE_WIN32_OWN_PROCESS | scmr. SERVICE_WIN32_SHARE_PROCESS
)
for service in resp[ 'lpServices' ]:
print ( f " { service[ 'lpServiceName' ] } : { service[ 'lpDisplayName' ] } " )
scmr.hRCloseServiceHandle(dce, scm_handle)
dce.disconnect()
# Create/start/stop service
def manage_service ( target , username , password ):
# ... connect and bind ...
resp = scmr.hROpenSCManagerW(dce)
scm_handle = resp[ 'lpScHandle' ]
# Open existing service
resp = scmr.hROpenServiceW(
dce,
scm_handle,
'EventLog' ,
scmr. SERVICE_ALL_ACCESS
)
svc_handle = resp[ 'lpServiceHandle' ]
# Query service status
resp = scmr.hRQueryServiceStatus(dce, svc_handle)
print ( f "Service State: { resp[ 'lpServiceStatus' ][ 'dwCurrentState' ] } " )
# Start service
scmr.hRStartServiceW(dce, svc_handle)
# Stop service
scmr.hRControlService(dce, svc_handle, scmr. SERVICE_CONTROL_STOP )
scmr.hRCloseServiceHandle(dce, svc_handle)
scmr.hRCloseServiceHandle(dce, scm_handle)
Remote Registry (RRP)
Access and modify Windows registry remotely.
from impacket.dcerpc.v5 import transport, rrp
def read_registry ( target , username , password ):
string_binding = rf 'ncacn_np: { target } [ \\ pipe \\ winreg]'
rpc_transport = transport.DCERPCTransportFactory(string_binding)
rpc_transport.set_credentials(username, password)
dce = rpc_transport.get_dce_rpc()
dce.connect()
dce.bind(rrp. MSRPC_UUID_RRP )
# Open HKLM
resp = rrp.hOpenLocalMachine(dce)
reg_handle = resp[ 'phKey' ]
# Open key
resp = rrp.hBaseRegOpenKey(
dce,
reg_handle,
'SOFTWARE \\ Microsoft \\ Windows NT \\ CurrentVersion'
)
key_handle = resp[ 'phkResult' ]
# Query value
resp = rrp.hBaseRegQueryValue(
dce,
key_handle,
'ProductName'
)
print ( f "Windows Version: { resp[ 'lpData' ] } " )
# Enumerate subkeys
i = 0
while True :
try :
resp = rrp.hBaseRegEnumKey(dce, key_handle, i)
print ( f "Subkey: { resp[ 'lpNameOut' ] } " )
i += 1
except Exception :
break
rrp.hBaseRegCloseKey(dce, key_handle)
rrp.hBaseRegCloseKey(dce, reg_handle)
dce.disconnect()
SAM Remote (SAMR)
Manage users and groups.
from impacket.dcerpc.v5 import transport, samr
def enumerate_users ( target , username , password ):
string_binding = rf 'ncacn_np: { target } [ \\ pipe \\ samr]'
rpc_transport = transport.DCERPCTransportFactory(string_binding)
rpc_transport.set_credentials(username, password)
dce = rpc_transport.get_dce_rpc()
dce.connect()
dce.bind(samr. MSRPC_UUID_SAMR )
# Connect to SAM
resp = samr.hSamrConnect(dce)
server_handle = resp[ 'ServerHandle' ]
# Open domain
resp = samr.hSamrLookupDomainInSamServer(
dce,
server_handle,
'Builtin'
)
domain_sid = resp[ 'DomainId' ]
resp = samr.hSamrOpenDomain(
dce,
server_handle,
domainId = domain_sid
)
domain_handle = resp[ 'DomainHandle' ]
# Enumerate users
resp = samr.hSamrEnumerateUsersInDomain(dce, domain_handle)
for user in resp[ 'Buffer' ][ 'Buffer' ]:
print ( f "User: { user[ 'Name' ] } (RID: { user[ 'RelativeId' ] } )" )
samr.hSamrCloseHandle(dce, domain_handle)
samr.hSamrCloseHandle(dce, server_handle)
dce.disconnect()
Server Service (SRVS)
Enumerate shares and sessions.
from impacket.dcerpc.v5 import transport, srvs
def enum_shares_rpc ( target , username , password ):
string_binding = rf 'ncacn_np: { target } [ \\ pipe \\ srvsvc]'
rpc_transport = transport.DCERPCTransportFactory(string_binding)
rpc_transport.set_credentials(username, password)
dce = rpc_transport.get_dce_rpc()
dce.connect()
dce.bind(srvs. MSRPC_UUID_SRVS )
# Enumerate shares (level 1)
resp = srvs.hNetrShareEnum(dce, 1 )
for share in resp[ 'InfoStruct' ][ 'ShareInfo' ][ 'Level1' ][ 'Buffer' ]:
print ( f " { share[ 'shi1_netname' ][: - 1 ] } : { share[ 'shi1_remark' ][: - 1 ] } " )
# Enumerate sessions
resp = srvs.hNetrSessionEnum(dce, NULL , NULL , 10 )
for session in resp[ 'InfoStruct' ][ 'SessionInfo' ][ 'Level10' ][ 'Buffer' ]:
print ( f "User: { session[ 'sesi10_username' ] } from { session[ 'sesi10_cname' ] } " )
dce.disconnect()
Advanced Interfaces
Active Directory (DRSUAPI)
Domain replication and DCSync.
from impacket.dcerpc.v5 import transport, drsuapi
from impacket.dcerpc.v5.dtyp import NULL
def dcsync_user ( target , username , password , target_user ):
"""
DCSync to retrieve user hash (requires replication rights)
"""
string_binding = rf 'ncacn_ip_tcp: { target } [135]'
rpc_transport = transport.DCERPCTransportFactory(string_binding)
rpc_transport.set_credentials(username, password, 'DOMAIN' )
rpc_transport.set_kerberos( True , kdcHost = target)
dce = rpc_transport.get_dce_rpc()
dce.connect()
dce.bind(drsuapi. MSRPC_UUID_DRSUAPI )
# DRS Bind
request = drsuapi.DRSBind()
request[ 'puuidClientDsa' ] = drsuapi. NTDSAPI_CLIENT_GUID
drs = drsuapi.DRS_EXTENSIONS_INT()
drs[ 'cb' ] = len (drs)
drs[ 'dwFlags' ] = drsuapi. DRS_EXT_GETCHGREQ_V6 | drsuapi. DRS_EXT_GETCHGREPLY_V6
request[ 'pextClient' ][ 'cb' ] = len (drs)
request[ 'pextClient' ][ 'rgb' ] = list (drs.getData())
resp = dce.request(request)
drs_handle = resp[ 'phDrs' ]
# Get NC changes (simplified)
# Full implementation in secretsdump.py
drsuapi.hDRSUnbind(dce, drs_handle)
dce.disconnect()
Task Scheduler (TSCH)
Schedule tasks remotely.
from impacket.dcerpc.v5 import transport, tsch
import time
def create_scheduled_task ( target , username , password ):
string_binding = rf 'ncacn_np: { target } [ \\ pipe \\ atsvc]'
rpc_transport = transport.DCERPCTransportFactory(string_binding)
rpc_transport.set_credentials(username, password)
dce = rpc_transport.get_dce_rpc()
dce.connect()
dce.bind(tsch. MSRPC_UUID_TSCHS )
# Create XML task definition
xml = f """
<?xml version="1.0" encoding="UTF-16"?>
<Task version="1.2">
<Actions>
<Exec>
<Command>cmd.exe</Command>
<Arguments>/c calc.exe</Arguments>
</Exec>
</Actions>
<Triggers>
<TimeTrigger>
<StartBoundary>2026-01-01T00:00:00</StartBoundary>
</TimeTrigger>
</Triggers>
</Task>
"""
tsch.hSchRpcRegisterTask(
dce,
' \\ MyTask' ,
xml,
tsch. TASK_CREATE ,
NULL ,
tsch. TASK_LOGON_NONE
)
# Run task immediately
tsch.hSchRpcRun(dce, ' \\ MyTask' )
# Delete task
tsch.hSchRpcDelete(dce, ' \\ MyTask' )
dce.disconnect()
Authentication
rpc_transport.set_credentials( 'user' , 'password' , 'DOMAIN' )
# Or with hashes
rpc_transport.set_credentials(
'user' ,
'' ,
'DOMAIN' ,
lmhash = 'aad3b435b51404eeaad3b435b51404ee' ,
nthash = '31d6cfe0d16ae931b73c59d7e0c089c0'
)
rpc_transport.set_credentials( 'user' , 'password' , 'DOMAIN' )
rpc_transport.set_kerberos( True , kdcHost = 'dc01.corp.local' )
# With AES key
rpc_transport.set_kerberos(
True ,
kdcHost = 'dc01.corp.local' ,
aesKey = 'e5c4d39c...'
)
# Anonymous/null session
rpc_transport.set_credentials( '' , '' )
dce = rpc_transport.get_dce_rpc()
dce.connect()
dce.bind(epm. MSRPC_UUID_PORTMAP )
Endpoint Mapper
The Endpoint Mapper (EPM) helps resolve RPC interfaces to network addresses.
from impacket.dcerpc.v5 import transport, epm
from impacket.dcerpc.v5.dtypes import NULL
def query_endpoint_mapper ( target ):
string_binding = f 'ncacn_ip_tcp: { target } [135]'
rpc_transport = transport.DCERPCTransportFactory(string_binding)
dce = rpc_transport.get_dce_rpc()
dce.connect()
dce.bind(epm. MSRPC_UUID_PORTMAP )
# Lookup all endpoints
resp = epm.hept_lookup(dce, inquiry_type = epm. RPC_C_EP_ALL_ELTS )
for entry in resp:
binding = epm.PrintStringBinding(entry[ 'tower' ][ 'Floors' ])
uuid_str = bin_to_uuidtup(entry[ 'tower' ][ 'Floors' ][ 0 ])
print ( f " { uuid_str } : { binding } " )
dce.disconnect()
# Map specific interface
def map_interface ( target , interface_uuid ):
string_binding = f 'ncacn_ip_tcp: { target } [135]'
rpc_transport = transport.DCERPCTransportFactory(string_binding)
dce = rpc_transport.get_dce_rpc()
dce.connect()
dce.bind(epm. MSRPC_UUID_PORTMAP )
resp = epm.hept_map(
dce,
interface_uuid,
protocol = 'ncacn_ip_tcp'
)
print ( f "Interface available at: { resp } " )
dce.disconnect()
NDR Data Types
DCERPC uses NDR (Network Data Representation) for data marshalling.
from impacket.dcerpc.v5.ndr import NDRCALL , NDRSTRUCT , NDRUniConformantArray
from impacket.dcerpc.v5.dtypes import DWORD , LPWSTR , ULONG , NULL
# Define custom RPC structure
class MyStruct ( NDRSTRUCT ):
structure = (
( 'dwValue' , DWORD ),
( 'lpName' , LPWSTR ),
)
# Define RPC call
class MyRPCCall ( NDRCALL ):
opnum = 10 # Operation number
structure = (
( 'hHandle' , DWORD ),
( 'pStruct' , MyStruct),
)
# Use in request
request = MyRPCCall()
request[ 'hHandle' ] = 0x 12345678
request[ 'pStruct' ][ 'dwValue' ] = 42
request[ 'pStruct' ][ 'lpName' ] = 'TestName'
resp = dce.request(request)
Error Handling
RPC calls can fail for various reasons: access denied, invalid parameters, service unavailable. Always handle DCERPCException.
from impacket.dcerpc.v5.rpcrt import DCERPCException, RPC_C_AUTHN_LEVEL_NONE
from impacket.dcerpc.v5 import transport, scmr
try :
string_binding = rf 'ncacn_np: { target } [ \\ pipe \\ svcctl]'
rpc_transport = transport.DCERPCTransportFactory(string_binding)
rpc_transport.set_credentials( 'user' , 'pass' , 'DOMAIN' )
dce = rpc_transport.get_dce_rpc()
dce.set_auth_level( RPC_C_AUTHN_LEVEL_PKT_PRIVACY )
dce.connect()
dce.bind(scmr. MSRPC_UUID_SCMR )
resp = scmr.hROpenSCManagerW(dce)
except DCERPCException as e:
print ( f "RPC Error: { e } " )
error_code = e.get_error_code()
if error_code == 0x 5 : # ERROR_ACCESS_DENIED
print ( "Access denied - insufficient privileges" )
elif error_code == 0x 6ba : # RPC_S_SERVER_UNAVAILABLE
print ( "RPC server unavailable" )
finally :
if dce:
dce.disconnect()
Complete Example: Service Manager
from impacket.dcerpc.v5 import transport, scmr
from impacket.dcerpc.v5.rpcrt import DCERPCException
import sys
def service_manager ( target , username , password , service_name , action ):
"""
Manage Windows services via RPC
Actions: list, start, stop, query
"""
string_binding = rf 'ncacn_np: { target } [ \\ pipe \\ svcctl]'
rpc_transport = transport.DCERPCTransportFactory(string_binding)
rpc_transport.set_credentials(username, password)
dce = rpc_transport.get_dce_rpc()
try :
dce.connect()
dce.bind(scmr. MSRPC_UUID_SCMR )
# Open SCM
resp = scmr.hROpenSCManagerW(dce)
scm_handle = resp[ 'lpScHandle' ]
if action == 'list' :
# Enumerate all services
resp = scmr.hREnumServicesStatusW(dce, scm_handle)
print ( f " \n [*] Services on { target } :" )
for svc in resp[ 'lpServices' ]:
state_map = {
1 : 'STOPPED' ,
2 : 'START_PENDING' ,
3 : 'STOP_PENDING' ,
4 : 'RUNNING'
}
state = state_map.get(svc[ 'ServiceStatus' ][ 'dwCurrentState' ], 'UNKNOWN' )
print ( f " { svc[ 'lpServiceName' ] :30} [ { state } ] - { svc[ 'lpDisplayName' ] } " )
else :
# Open specific service
resp = scmr.hROpenServiceW(
dce,
scm_handle,
service_name,
scmr. SERVICE_ALL_ACCESS
)
svc_handle = resp[ 'lpServiceHandle' ]
if action == 'query' :
resp = scmr.hRQueryServiceStatus(dce, svc_handle)
status = resp[ 'lpServiceStatus' ]
print ( f " \n [*] { service_name } Status:" )
print ( f " State: { status[ 'dwCurrentState' ] } " )
print ( f " Type: { status[ 'dwServiceType' ] } " )
print ( f " Controls Accepted: { status[ 'dwControlsAccepted' ] } " )
elif action == 'start' :
print ( f "[*] Starting { service_name } ..." )
scmr.hRStartServiceW(dce, svc_handle)
print ( "[+] Service started" )
elif action == 'stop' :
print ( f "[*] Stopping { service_name } ..." )
scmr.hRControlService(dce, svc_handle, scmr. SERVICE_CONTROL_STOP )
print ( "[+] Service stopped" )
scmr.hRCloseServiceHandle(dce, svc_handle)
scmr.hRCloseServiceHandle(dce, scm_handle)
except DCERPCException as e:
print ( f "[-] RPC Error: { e } " )
return 1
except Exception as e:
print ( f "[-] Error: { e } " )
return 1
finally :
dce.disconnect()
return 0
if __name__ == '__main__' :
if len (sys.argv) < 5 :
print ( f "Usage: { sys.argv[ 0 ] } <target> <user> <pass> <action> [service]" )
print ( "Actions: list, start, stop, query" )
sys.exit( 1 )
target = sys.argv[ 1 ]
user = sys.argv[ 2 ]
password = sys.argv[ 3 ]
action = sys.argv[ 4 ]
service = sys.argv[ 5 ] if len (sys.argv) > 5 else None
sys.exit(service_manager(target, user, password, service, action))
Available Interfaces
Impacket includes 40+ RPC interfaces in impacket/dcerpc/v5/:
Common Interfaces
scmr - Service Control Manager
rrp - Remote Registry Protocol
samr - Security Account Manager
srvs - Server Service
lsad/lsat - Local Security Authority
drsuapi - Directory Replication Service
tsch - Task Scheduler
epm - Endpoint Mapper
even/even6 - Event Log
wkst - Workstation Service
References
Source : impacket/dcerpc/v5/
[MS-RPCE] : Remote Procedure Call Protocol Extensions
[MS-DTYP] : Windows Data Types
[C706] : DCE 1.1: RPC Specification